
TypeError while using dynamodb_to_s3 operator #28103


Description


Discussed in #28102

Originally posted by p-madduri December 1, 2022

Apache Airflow version

Other Airflow 2 version (please specify below)

What happened

The operator's `_convert_item_to_json_bytes` helper (defined at line 39 of `dynamodb_to_s3.py`):

```python
def _convert_item_to_json_bytes(item: dict[str, Any]) -> bytes:
    return (json.dumps(item) + "\n").encode("utf-8")
```

throws the following error whenever a scanned item contains a `Decimal` value:

```
TypeError: Object of type Decimal is not JSON serializable
```
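For context, the failure can be reproduced without Airflow at all: boto3's DynamoDB resource deserializes number attributes as `decimal.Decimal`, which the stock `json` module refuses to serialize. A minimal sketch (the item contents are made up for illustration):

```python
import json
from decimal import Decimal

# boto3's DynamoDB resource returns number attributes as Decimal,
# so a scanned item looks roughly like this (illustrative values):
item = {"id": "abc", "price": Decimal("19.99")}

try:
    # This mirrors what the operator's helper does
    (json.dumps(item) + "\n").encode("utf-8")
except TypeError as exc:
    print(exc)  # Object of type Decimal is not JSON serializable
```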

Can we use an encoder like the following instead?

```python
import json
from collections.abc import Iterable, Mapping
from decimal import Decimal

class DecimalEncoder(json.JSONEncoder):
    def encode(self, obj):
        if isinstance(obj, Mapping):
            return '{' + ', '.join(f'{self.encode(k)}: {self.encode(v)}' for k, v in obj.items()) + '}'
        elif isinstance(obj, Iterable) and not isinstance(obj, str):
            return '[' + ', '.join(map(self.encode, obj)) + ']'
        elif isinstance(obj, Decimal):
            # normalize() strips trailing zeros; ':f' prevents scientific notation
            return f'{obj.normalize():f}'
        else:
            return super().encode(obj)
```

and update the `process_func` default accordingly:

```python
process_func: Callable[[dict[str, Any]], bytes] = _convert_item_to_json_bytes,
```

This solution is suggested in this article:
https://randomwits.com/blog/export-dynamodb-s3
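As a simpler alternative to overriding `encode()`, `json.JSONEncoder.default()` can be overridden instead; `json.dumps` calls it only for objects it cannot serialize natively. A sketch (the operator wiring in the trailing comment is illustrative only, based on the `process_func` parameter quoted above; `str()` is used so the exact digits DynamoDB stored are preserved, at the cost of emitting the number as a JSON string):

```python
import json
from decimal import Decimal
from typing import Any

class DecimalEncoder(json.JSONEncoder):
    def default(self, o):
        # default() is invoked only for types json.dumps cannot handle natively;
        # str() keeps DynamoDB's exact digits (no float rounding), but note the
        # value is then written as a JSON string rather than a JSON number
        if isinstance(o, Decimal):
            return str(o)
        return super().default(o)

def _convert_item_to_json_bytes(item: dict[str, Any]) -> bytes:
    return (json.dumps(item, cls=DecimalEncoder) + "\n").encode("utf-8")

# Illustrative wiring (not tested here):
# DynamoDBToS3Operator(..., process_func=_convert_item_to_json_bytes)
```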

Airflow version on MWAA: 2.0.2

What you think should happen instead

See the "What happened" section above.

How to reproduce

See the "What happened" section above.

Operating System

macOS

Versions of Apache Airflow Providers

from airflow.providers.amazon.aws.transfers.dynamodb_to_s3 import DynamoDBToS3Operator

Deployment

MWAA

Deployment details

n/a

Anything else

n/a

Are you willing to submit PR?

  • Yes I am willing to submit a PR!
