13 changes: 12 additions & 1 deletion airflow/providers/amazon/aws/transfers/dynamodb_to_s3.py
@@ -23,6 +23,7 @@

 import json
 from copy import copy
+from decimal import Decimal
 from os.path import getsize
 from tempfile import NamedTemporaryFile
 from typing import IO, TYPE_CHECKING, Any, Callable, Sequence
@@ -36,8 +37,18 @@
 from airflow.utils.context import Context
 
 
+class JSONEncoder(json.JSONEncoder):
+    """Custom json encoder implementation"""
+
+    def default(self, obj):
+        """Convert decimal objects in a json serializable format."""
+        if isinstance(obj, Decimal):
+            return float(obj)
+        return super().default(obj)
+
+
 def _convert_item_to_json_bytes(item: dict[str, Any]) -> bytes:
-    return (json.dumps(item) + "\n").encode("utf-8")
+    return (json.dumps(item, cls=JSONEncoder) + "\n").encode("utf-8")
 
 
 def _upload_file_to_s3(
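For context, a minimal standalone sketch of the failure mode this encoder addresses (an editorial addition, not part of the PR; the item dict is hypothetical). boto3's DynamoDB resource deserializes every number attribute as decimal.Decimal, which the stdlib json module refuses to serialize:

import json
from decimal import Decimal

item = {"price": Decimal("10.048")}  # shaped like one item from a table scan

try:
    json.dumps(item)
except TypeError as exc:
    print(exc)  # Object of type Decimal is not JSON serializable

# Mirror of the encoder added above: fall back to float for Decimal values.
class JSONEncoder(json.JSONEncoder):
    def default(self, obj):
        if isinstance(obj, Decimal):
            return float(obj)
        return super().default(obj)

print(json.dumps(item, cls=JSONEncoder))  # {"price": 10.048}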
43 changes: 42 additions & 1 deletion tests/providers/amazon/aws/transfers/test_dynamodb_to_s3.py
@@ -19,9 +19,21 @@

 import json
 import unittest
+from decimal import Decimal
 from unittest.mock import MagicMock, patch
 
-from airflow.providers.amazon.aws.transfers.dynamodb_to_s3 import DynamoDBToS3Operator
+from airflow.providers.amazon.aws.transfers.dynamodb_to_s3 import DynamoDBToS3Operator, JSONEncoder
 
 
+class JSONEncoderTest(unittest.TestCase):
+    def test_jsonencoder_with_decimal(self):
+        """Test JSONEncoder correctly encodes and decodes decimal values."""
+
+        for i in ["102938.3043847474", 1.010001, 10, "100", "1E-128", 1e-128]:
+            org = Decimal(i)
+            encoded = json.dumps(org, cls=JSONEncoder)
+            decoded = json.loads(encoded, parse_float=Decimal)
+            self.assertAlmostEqual(decoded, org)
+
+
 class DynamodbToS3Test(unittest.TestCase):
@@ -65,6 +77,35 @@ def test_dynamodb_to_s3_success(self, mock_aws_dynamodb_hook, mock_s3_hook):

         assert [{"a": 1}, {"b": 2}, {"c": 3}] == self.output_queue
 
+    @patch("airflow.providers.amazon.aws.transfers.dynamodb_to_s3.S3Hook")
+    @patch("airflow.providers.amazon.aws.transfers.dynamodb_to_s3.DynamoDBHook")
+    def test_dynamodb_to_s3_success_with_decimal(self, mock_aws_dynamodb_hook, mock_s3_hook):
+        a = Decimal(10.028)
+        b = Decimal("10.048")
+        responses = [
+            {
+                "Items": [{"a": a}, {"b": b}],
+            }
+        ]
+        table = MagicMock()
+        table.return_value.scan.side_effect = responses
+        mock_aws_dynamodb_hook.return_value.get_conn.return_value.Table = table
+
+        s3_client = MagicMock()
+        s3_client.return_value.upload_file = self.mock_upload_file
+        mock_s3_hook.return_value.get_conn = s3_client
+
+        dynamodb_to_s3_operator = DynamoDBToS3Operator(
+            task_id="dynamodb_to_s3",
+            dynamodb_table_name="airflow_rocks",
+            s3_bucket_name="airflow-bucket",
+            file_size=4000,
+        )
+
+        dynamodb_to_s3_operator.execute(context={})
+
+        assert [{"a": float(a)}, {"b": float(b)}] == self.output_queue
+
     @patch("airflow.providers.amazon.aws.transfers.dynamodb_to_s3.S3Hook")
     @patch("airflow.providers.amazon.aws.transfers.dynamodb_to_s3.DynamoDBHook")
     def test_dynamodb_to_s3_with_different_aws_conn_id(self, mock_aws_dynamodb_hook, mock_s3_hook):
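The tests compare with assertAlmostEqual rather than == because the Decimal-to-float fallback is lossy: a double holds only about 15-17 significant digits. A standalone sketch (an editorial addition, not part of the PR) of why exact equality can fail after the round trip:

import json
from decimal import Decimal

org = Decimal("1.0000000000000000001")       # more precision than a float can hold
encoded = json.dumps(float(org))             # serialized as 1.0
decoded = json.loads(encoded, parse_float=Decimal)

print(decoded == org)                        # False: trailing digits were lost
print(abs(decoded - org) < Decimal("1e-7"))  # True: within assertAlmostEqual's default 7-place tolerance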