Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[FLINK-35022][Connector/DynamoDB] Add TypeInformed DDB Element Converter as default element converter #136

Open
wants to merge 1 commit into
base: main
Choose a base branch
from

Conversation

vahmed-hamdy
Copy link
Contributor

Purpose of the change

Add DynamoDbTypeInformedElementConverter to convert Elements to dynamoDb Sink using its provided type Info.

Verifying this change

This change added tests and can be verified as follows:

  • Added unit tests

Significant changes

(Please check any boxes [x] if the answer is "yes". You can first publish the PR and check them afterwards, for convenience.)

  • Dependencies have been added or upgraded
  • Public API has been changed (Public API is any class annotated with @Public(Evolving))
  • Serializers have been changed
  • New feature has been introduced
    • If yes, how is this documented? (not applicable / docs / JavaDocs / not documented)

@dannycranmer
Copy link
Contributor

Please change commit message to include the component: [FLINK-35022][Connectors/DynamoDB]

@dannycranmer
Copy link
Contributor

I have left a comment regarding the need for this on the Jira: https://issues.apache.org/jira/browse/FLINK-35022

@vahmed-hamdy vahmed-hamdy force-pushed the type-informed-element-converter branch 3 times, most recently from c1c4bb6 to a44f9a6 Compare April 20, 2024 01:15
@vahmed-hamdy vahmed-hamdy changed the title [FLINK-35022] Add TypeInformed DDB Element Converter [FLINK-35022][Connector/DynamoDB] Add TypeInformed DDB Element Converter Apr 20, 2024
@vahmed-hamdy
Copy link
Contributor Author

@dannycranmer Thanks for the feedback, I have addressed your comments. Alternatively I decided to follow an approach similar to RowDataToAttributeValueConverter to avoid using reflection. PTAL

@vahmed-hamdy vahmed-hamdy force-pushed the type-informed-element-converter branch 2 times, most recently from 58f0618 to 88fe1cc Compare April 25, 2024 11:16
Copy link
Contributor

@dannycranmer dannycranmer left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the update @vahmed-hamdy. Left a few comments

@dannycranmer
Copy link
Contributor

@vahmed-hamdy can you also update the docs please?

@vahmed-hamdy vahmed-hamdy force-pushed the type-informed-element-converter branch from edbf9f3 to 086538d Compare May 17, 2024 10:08
@vahmed-hamdy vahmed-hamdy changed the title [FLINK-35022][Connector/DynamoDB] Add TypeInformed DDB Element Converter [FLINK-35022][Connector/DynamoDB] Add TypeInformed DDB Element Converter as default element converter May 17, 2024
Copy link
Contributor

@dannycranmer dannycranmer left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM, thanks @vahmed-hamdy

Comment on lines +54 to +55
@Override
public void open(Sink.InitContext context) {}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: Do we need this? I think it has a no-op default implementation

@dannycranmer
Copy link
Contributor

@hlteoh37 / @z3d1k can you please take a look at this one too?

Comment on lines +202 to +243
if (typeInfo instanceof BasicTypeInfo) {
return AttributeConverterProvider.defaultProvider()
.converterFor(EnhancedType.of(typeInfo.getTypeClass()));
} else if (typeInfo.equals(PrimitiveArrayTypeInfo.BYTE_PRIMITIVE_ARRAY_TYPE_INFO)) {
return getAttributeConverter(
AttributeValueType.B,
bytes ->
bytes instanceof SdkBytes
? AttributeValue.fromB((SdkBytes) bytes)
: AttributeValue.fromB(SdkBytes.fromByteArray((byte[]) bytes)));
} else if (typeInfo instanceof BasicArrayTypeInfo) {
BasicArrayTypeInfo<AttributeT, ?> basicArrayTypeInfo =
(BasicArrayTypeInfo<AttributeT, ?>) typeInfo;
if (basicArrayTypeInfo.getComponentInfo().equals(BasicTypeInfo.STRING_TYPE_INFO)) {
return getAttributeConverter(
AttributeValueType.SS,
array -> AttributeValue.fromSs(Arrays.asList((String[]) array)));
} else if (basicArrayTypeInfo.getComponentInfo() instanceof NumericTypeInfo) {
return getAttributeConverter(
AttributeValueType.NS,
array ->
AttributeValue.fromNs(
convertObjectArrayToStringList((Object[]) array)));
}

return new ArrayAttributeConverterProvider()
.converterFor(EnhancedType.of(typeInfo.getTypeClass()));
} else if (typeInfo instanceof ObjectArrayTypeInfo) {
return getObjectArrayTypeConverter((ObjectArrayTypeInfo<AttributeT, ?>) typeInfo);
} else if (typeInfo instanceof PrimitiveArrayTypeInfo) {
PrimitiveArrayTypeInfo<AttributeT> primitiveArrayTypeInfo =
(PrimitiveArrayTypeInfo<AttributeT>) typeInfo;
if (primitiveArrayTypeInfo.getComponentType() instanceof NumericTypeInfo) {
return getAttributeConverter(
AttributeValueType.NS,
array -> AttributeValue.fromNs(convertPrimitiveArrayToStringList(array)));
} else {
throw new IllegalArgumentException(
String.format(
"Unsupported primitive array typeInfo %s",
primitiveArrayTypeInfo.getComponentType()));
}
Copy link
Contributor

@hlteoh37 hlteoh37 Jun 11, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Have we considered using the .attributeConverterProviders() method in the table schema builder and reusing the constructs we use to DDB Table API? It might be good to reuse code here instead of duplicating!

TableSchema.builder(...)
                      .attributeConverterProviders(
                            new ArrayAttributeConverterProvider(), 
                            ... (add any more here) ..., 
                            AttributeConverterProvider.defaultProvider());

We have an example here:
https://github.com/apache/flink-connector-aws/blob/main/flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/table/RowDataToAttributeValueConverter.java#L56-L72

Copy link
Contributor

@hlteoh37 hlteoh37 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the PR @vahmed-hamdy ! Added a comment

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
3 participants