
[FLINK-24229][connectors/dynamodb] Added DynamoDB connector #18518

Conversation

YuriGusev

What is the purpose of the change

User stories:

As a Flink user, I’d like to use DynamoDB as a sink for my data pipeline.

Scope:

  • Implement an asynchronous sink for DynamoDB by inheriting the AsyncSinkBase class.
  • The implementation can for now reside in its own module in flink-connectors.
  • Implement an asynchronous sink writer for DynamoDB by extending the AsyncSinkWriter. The implementation must deal with failed requests and retry them using the requeueFailedRequestEntry method.
  • The implementation should utilize DynamoDb batch API.
  • The implemented Sink Writer will be used by the Sink class that is created as part of this story.
  • Java / code-level docs.
  • Unit/Integration testing.
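The writer shape described in the scope above can be sketched as follows. This is a minimal stand-in, not Flink's actual classes or signatures: a batch of write requests is submitted, and failed entries are handed back through a requeue method, mirroring the AsyncSinkWriter / requeueFailedRequestEntry contract.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified sketch of the retry flow; all names other than
// requeueFailedRequestEntry are illustrative stand-ins.
public class WriterSketch {
    record WriteRequest(String key, boolean willFail) {}

    final List<WriteRequest> requeued = new ArrayList<>();

    // stands in for AsyncSinkWriter#requeueFailedRequestEntry
    void requeueFailedRequestEntry(WriteRequest entry) {
        requeued.add(entry);
    }

    // stands in for AsyncSinkWriter#submitRequestEntries: "send" the
    // batch and requeue every entry that failed so it is retried later
    List<WriteRequest> submitRequestEntries(List<WriteRequest> batch) {
        List<WriteRequest> failed = new ArrayList<>();
        for (WriteRequest r : batch) {
            if (r.willFail()) {
                failed.add(r);
            }
        }
        failed.forEach(this::requeueFailedRequestEntry);
        return failed;
    }

    public static void main(String[] args) {
        WriterSketch writer = new WriterSketch();
        writer.submitRequestEntries(
                List.of(new WriteRequest("a", false), new WriteRequest("b", true)));
        System.out.println(writer.requeued.size()); // one entry requeued for retry
    }
}
```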

Brief change log

  • Added a new DynamoDB sink in a new module, flink-connectors/flink-connector-dynamodb

Verifying this change

This change added tests and can be verified as follows:

  • Integration tests in org.apache.flink.streaming.connectors.dynamodb.sink.DynamoDbSinkITCase

Does this pull request potentially affect one of the following parts:

  • Dependencies (does it add or upgrade a dependency): no
  • The public API, i.e., is any changed class annotated with @Public(Evolving): no
  • The serializers: no
  • The runtime per-record code paths (performance sensitive): yes
  • Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: no
  • The S3 file system connector: no

Documentation

  • Does this pull request introduce a new feature? yes
  • If yes, how is the feature documented? JavaDocs

@flinkbot
Collaborator

Thanks a lot for your contribution to the Apache Flink project. I'm the @flinkbot. I help the community
to review your pull request. We will use this comment to track the progress of the review.

Automated Checks

Last check on commit 8ec1f57 (Wed Jan 26 09:57:33 UTC 2022)

Warnings:

  • 3 pom.xml files were touched: Check for build and licensing issues.
  • No documentation files were touched! Remember to keep the Flink docs up to date!

Mention the bot in a comment to re-run the automated checks.

Review Progress

  • ❓ 1. The [description] looks good.
  • ❓ 2. There is [consensus] that the contribution should go into Flink.
  • ❓ 3. Needs [attention] from.
  • ❓ 4. The change fits into the overall [architecture].
  • ❓ 5. Overall code [quality] is good.

Please see the Pull Request Review Guide for a full explanation of the review process.


The Bot is tracking the review progress through labels. Labels are applied according to the order of the review items. For consensus, approval by a Flink committer or PMC member is required.

Bot commands
The @flinkbot bot supports the following commands:

  • @flinkbot approve description to approve one or more aspects (aspects: description, consensus, architecture and quality)
  • @flinkbot approve all to approve all aspects
  • @flinkbot approve-until architecture to approve everything until architecture
  • @flinkbot attention @username1 [@username2 ..] to require somebody's attention
  • @flinkbot disapprove architecture to remove an approval you gave earlier

@flinkbot
Collaborator

flinkbot commented Jan 26, 2022

CI report:

Bot commands
The @flinkbot bot supports the following commands:
  • @flinkbot run azure re-run the last Azure build

@@ -0,0 +1,184 @@
/*
Author

As an alternative, we could let users construct the client themselves and pass it via a provider:

interface DynamoDbClientProvider { DynamoDbAsyncClient getClient(); }

Then we would not have to create the wrapper util class, and the user would have full flexibility over which configuration options to set.

The downside would be that we leak DynamoDB SDK interfaces.

In the end I took the same approach as we use in other places, but maybe we should consider this option.

What do you think @CrynetLogistics ?
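The provider alternative above can be sketched like this. The real DynamoDbAsyncClient comes from the AWS SDK; a marker interface stands in for it here so the example is self-contained. The key point is that the provider itself is Serializable while the client is only created lazily on the task managers.

```java
import java.io.Serializable;

// Hedged sketch of the DynamoDbClientProvider idea; types are stand-ins.
public class ProviderSketch {
    interface DynamoDbAsyncClient {}

    // Serializable so it can travel with the sink; the client itself
    // is built on demand and never serialized.
    @FunctionalInterface
    interface DynamoDbClientProvider extends Serializable {
        DynamoDbAsyncClient getClient();
    }

    public static void main(String[] args) {
        // the sink would store the provider and call getClient() only
        // when constructing the writer, avoiding client serialization
        DynamoDbClientProvider provider = () -> new DynamoDbAsyncClient() {};
        System.out.println(provider.getClient() != null);
    }
}
```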

Contributor

@CrynetLogistics CrynetLogistics Jan 26, 2022

I like your current approach more, like you said, we'd not leak any ddb sdk interfaces.

Plus, I'm not sure there is a way to pass DynamoDbClientProvider to the DynamoDbSinkWriter without first making it a field in DynamoDbSink. If I'm not wrong, I believe the DynamoDbSink will complain that DynamoDbAsyncClient in your DynamoDbClientProvider is not serialisable during runtime (it compiles ok).

I think all fields of the sink must be serialisable... so even if we wanted to accept a customer-ready-made DynamoDbAsyncClient, I'm not sure there's a good way to do it...

Author

@YuriGusev YuriGusev Jan 26, 2022

I tested with a provider in the previous sink implementation: I pass the provider to the constructor and retrieve the client only when constructing the writer itself, so the serialization issue does not occur at runtime.

But I will stick with the current approach for consistency and the other reasons you mentioned.

Contributor

Ah good to know. Thanks, agreed.

Contributor

@CrynetLogistics CrynetLogistics left a comment

Thanks Yuri, really appreciate your contribution.

  1. I was wondering if you were planning to add a new page to the documentation in a separate jira?
  2. FYI, I have a pending PR to make some minor modifications to the elementConverter that would allow sink implementers (like us!) to hide the elementConverter from the user completely.

return new DynamoDbSinkBuilder<>();
}

@Experimental
Contributor

I think @Internal might be more appropriate here, since we're not expecting to remove this method... flink-architecture-tests are cool with @Internal here too.

Author

Thank you @CrynetLogistics for the review. I wasn't sure about the documentation that is published on the website. I'll read up about the process and add it in this PR if possible

Author

Can you please give me a reference to the changes for the elements converter?

Contributor

Can you please give me a reference to the changes for the elements converter?

Sure, here is the link to the commit on the master branch.

Author

@YuriGusev YuriGusev Jan 27, 2022

I can see how it is useful for the KinesisSink, as we do not leak the Kinesis Put request to the user, but I think it would be more complicated to implement for the DynamoDB sink.

We would have to implement something that looks exactly like the DynamoDB WriteRequest, with all its attributes, types, request types, etc., and translate it internally into the WriteRequest object (by the converter). I cannot see any other solution at the moment. Do you have any ideas?

It would be a lot of functionality that DynamoDB already provides which we would have to "repeat", and it would limit the user if the DynamoDB client evolves in the future.

Do you think it is worth doing, instead of letting the user define the converter themselves? @CrynetLogistics

@nirtsruya nirtsruya Feb 4, 2022

@CrynetLogistics @dannycranmer apologies for the ping. We have prepared a sample PR in case hiding the ElementConverter is a requirement.
Here we don't let the user interact with the ElementConverter or any class from the AWS SDK.
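The idea of hiding the element converter can be sketched as below: the sink exposes only a user-facing request record and converts it to the SDK's write request internally, so no AWS SDK class appears in the public API. All types here are illustrative stand-ins, not the actual classes from the linked sample PR.

```java
import java.util.Map;

// Hedged sketch: UserWriteRequest and SdkWriteRequest are hypothetical names.
public class ConverterSketch {
    // user-facing request, free of SDK types
    record UserWriteRequest(String tableName, Map<String, String> item) {}

    // stands in for the SDK's WriteRequest
    record SdkWriteRequest(String tableName, Map<String, String> attributes) {}

    interface ElementConverter<InputT, RequestT> {
        RequestT apply(InputT element);
    }

    // built inside the sink, never supplied by the user
    static ElementConverter<UserWriteRequest, SdkWriteRequest> internalConverter() {
        return e -> new SdkWriteRequest(e.tableName(), e.item());
    }

    public static void main(String[] args) {
        SdkWriteRequest req = internalConverter()
                .apply(new UserWriteRequest("orders", Map.of("id", "1")));
        System.out.println(req.tableName()); // orders
    }
}
```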

dynamoDbClientProperties);
}

@Experimental
Contributor

Same here, @Internal might be better.

Author

fixed

*
* <ul>
* <li>{@code maxBatchSize} will be 25
* <li>{@code maxInFlightRequests} will be 10
Contributor

Shall we set the maxInFlightRequests to 50? Since that's the default for the AWS Async Client ... .

Author

fixed

import java.util.function.Consumer;

/**
* TODO.
Contributor

A short description :-)

Author

oh missed that :( thanks a lot! :)

package org.apache.flink.streaming.connectors.dynamodb.sink;

/** Exception is thrown when DynamoDb sink failed to write data. */
public class DynamoDbSinkException extends RuntimeException {
Contributor

Missing compatibility annotation

Author

thanks, fixed

* Represents a single DynamoDb {@link WriteRequest}. Contains the name of the DynamoDb table
* to write to as well as the {@link WriteRequest}.
*/
public class DynamoDbWriteRequest implements Serializable {
Contributor

Missing compatibility annotation, might be worth having a look across the whole PR

Author

added for all classes

}

public static PrimaryKey build(DynamoDbTablesConfig.TableConfig config, WriteRequest request) {
if (config != null) {
Contributor

Are we expecting a nullable parameter?
might be worth adding a @Nullable annotation

Author

added

requestItems.get(config.getPartitionKeyName());
AttributeValue sortKeyAttributeValue = requestItems.get(config.getSortKeyName());

if (config.getPartitionKeyName() != null && partitionKeyAttributeValue == null) {
Contributor

Can config.getPartitionKeyName() ever be null?

Author

no, you are right, it is mandatory in the config class. Will remove it.

class TableRequestsContainer {

private final DynamoDbTablesConfig tablesConfig;
private final LinkedHashMap<String, Map<PrimaryKey, WriteRequest>> container;
Contributor

nit: Do we need container to be a LinkedHashMap?

Author

good point, thanks, it is fixed now

Contributor

Does not look fixed in this PR

Author

Yes, strange, probably got lost on a previous rebase. It is fixed now.

* @param <InputT> Type of the elements handled by this sink
*/
@PublicEvolving
public class DynamoDbSink<InputT> extends AsyncSinkBase<InputT, DynamoDbWriteRequest> {


Why didn't you name it DynamoDbAsyncSink? Shouldn't users be aware of this?


Thanks for the comment, it makes sense. I was looking at other sinks in the project that already inherit from AsyncSinkBase, and they are not using the Async naming convention.
@CrynetLogistics @dannycranmer it makes sense to me, is that something we want to do moving forward?

Contributor

Let's keep it as DynamoDbSink for consistency with the code base. I do not think the underlying implementation of sync vs async is necessarily a concern to the user and therefore adding it to the class name does not add much value.


@dannycranmer So how would you expect a user to know whether a connector is sync/async without looking at the underlying code? It should at least be added to the docs. Somewhere it needs to be specified.


@nirtsruya what other async connectors aren't specified with async in their names? Can you give some examples?


Contributor

@almogtavor I would not expect a user to care whether the connector is sync/async. This is an implementation detail of the underlying connector. What is the reason to expose async to the user in the classname?

Contributor

@dannycranmer dannycranmer left a comment

I did not get a chance to go through all of the tests yet, but have added a bunch of comments.

flink-connectors/flink-connector-dynamodb/pom.xml Outdated Show resolved Hide resolved

<modelVersion>4.0.0</modelVersion>

<artifactId>flink-connector-dynamodb</artifactId>
Contributor

Please rename in line with the other new AWS connectors: flink-connector-aws-dynamodb

Author

done

<name>Flink : Connectors : DynamoDB</name>
<properties>
<aws.sdk.version>2.17.116</aws.sdk.version>
<commons-lang3.version>3.11</commons-lang3.version>
Contributor

Can you use the managed version from parent instead? https://github.com/apache/flink/blob/master/pom.xml#L478

Author

thanks, fixed now

<properties>
<aws.sdk.version>2.17.116</aws.sdk.version>
<commons-lang3.version>3.11</commons-lang3.version>
<testcontainers.version>1.16.2</testcontainers.version>
Contributor

Can you use the managed version from parent instead?
https://github.com/apache/flink/blob/master/pom.xml#L139

Author

thanks, fixed now

<executions>
<execution>
<goals>
<goal>test-jar</goal>
Contributor

Why are we building a test jar? I cannot see any consumers of this

Author

removed test jar configuration

Comment on lines 103 to 107
TableRequestsContainer container = new TableRequestsContainer(tablesConfig);
requestEntries.forEach(container::put);
Contributor

nit: Since you are deduplicating here, the actual batch size will be less than the configured batch size. I cannot see a better way to do it though

Author

yes good point, this is the same as how boto3 behaves. Maybe we should rename the batch size property to MAX batch size
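The de-duplication behaviour discussed here can be sketched as follows: within one batch, later writes for the same primary key replace earlier ones, so the submitted batch can end up smaller than the configured maximum batch size. PrimaryKey is simplified to a String here, and the types are stand-ins for the PR's classes.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Minimal sketch of keying a batch by primary key before submission.
public class DedupSketch {
    record WriteRequest(String primaryKey, String payload) {}

    static Map<String, WriteRequest> dedupe(List<WriteRequest> batch) {
        Map<String, WriteRequest> container = new HashMap<>();
        for (WriteRequest r : batch) {
            container.put(r.primaryKey(), r); // last write for a key wins
        }
        return container;
    }

    public static void main(String[] args) {
        List<WriteRequest> batch = List.of(
                new WriteRequest("k1", "a"),
                new WriteRequest("k2", "b"),
                new WriteRequest("k1", "c")); // duplicate primary key
        System.out.println(dedupe(batch).size()); // 2, not the batch size of 3
    }
}
```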

Comment on lines 100 to 126
@VisibleForTesting
static RetryPolicy getRetryPolicy(Properties properties) {
if (hasRetryConfiguration(properties)) {
RetryPolicy.Builder builder = RetryPolicy.builder();

if (properties.containsKey(AWSDynamoDbConfigConstants.NUMBER_RETRIES)) {
builder.numRetries(
Integer.parseInt(
properties.getProperty(AWSDynamoDbConfigConstants.NUMBER_RETRIES)));
}

if (properties.containsKey(AWSDynamoDbConfigConstants.BACKOFF_STRATEGY)) {
builder.backoffStrategy(
getBackoffStrategy(
properties, AWSDynamoDbConfigConstants.BACKOFF_STRATEGY));
}

if (properties.containsKey(AWSDynamoDbConfigConstants.THROTTLING_BACKOFF_STRATEGY)) {
builder.throttlingBackoffStrategy(
getBackoffStrategy(
properties,
AWSDynamoDbConfigConstants.THROTTLING_BACKOFF_STRATEGY));
}
return builder.build();
}
return RetryPolicy.defaultRetryPolicy();
}
Contributor

I believe these configs can be made generic and pulled in to AWSGeneralUtil?

Comment on lines 138 to 179
@VisibleForTesting
static BackoffStrategy getBackoffStrategy(Properties properties, String strategy) {
AWSDynamoDbConfigConstants.BackoffStrategy backoffStrategy =
AWSDynamoDbConfigConstants.BackoffStrategy.valueOf(
properties.getProperty(strategy));

switch (backoffStrategy) {
case FULL_JITTER:
return FullJitterBackoffStrategy.builder()
.baseDelay(
getDuration(
properties.getProperty(
AWSDynamoDbConfigConstants
.FULL_JITTER_BASE_DELAY_MS)))
.maxBackoffTime(
getDuration(
properties.getProperty(
AWSDynamoDbConfigConstants
.FULL_JITTER_MAX_BACKOFF_TIME_MS)))
.build();
case EQUAL_JITTER:
return EqualJitterBackoffStrategy.builder()
.baseDelay(
getDuration(
properties.getProperty(
AWSDynamoDbConfigConstants
.EQUAL_JITTER_BASE_DELAY_MS)))
.maxBackoffTime(
getDuration(
properties.getProperty(
AWSDynamoDbConfigConstants
.EQUAL_JITTER_MAX_BACKOFF_TIME_MS)))
.build();
case FIXED_DELAY:
return FixedDelayBackoffStrategy.create(
getDuration(
properties.getProperty(
AWSDynamoDbConfigConstants.FIXED_DELAY_BACKOFF_MS)));
default:
return BackoffStrategy.defaultStrategy();
}
}
Contributor

I believe these configs can be made generic and pulled in to AWSGeneralUtil?


/** A collection of utility functions to simplify work with DynamoDB service exceptions. */
@Internal
public class DynamoDbExceptionUtils {
Contributor

As discussed previously, we have implemented some error handling classes in AsyncSink to help with the common things, for example https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-aws-base/src/main/java/org/apache/flink/connector/aws/util/AWSCredentialFatalExceptionClassifiers.java
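The classifier idea referenced above can be sketched as below. This is a simplified stand-in, not the real helper in flink-connector-aws-base: a classifier checks whether a throwable matches a known fatal type and, if so, wraps and forwards it to a consumer. The return-false-when-fatal convention is part of this sketch only; consult the actual Flink class for its API.

```java
import java.util.function.Consumer;

// Hedged sketch of a fatal-exception classifier; names are illustrative.
public class ClassifierSketch {
    @FunctionalInterface
    interface FatalExceptionClassifier {
        // returns false if the error was classified as fatal and
        // forwarded to the consumer, true if processing may continue
        boolean isFatal(Throwable err, Consumer<Exception> fatalConsumer);
    }

    // builds a classifier that treats instances of the given class as fatal
    static FatalExceptionClassifier forClass(
            Class<? extends Throwable> clazz, String message) {
        return (err, consumer) -> {
            if (clazz.isInstance(err)) {
                consumer.accept(new RuntimeException(message, err));
                return false;
            }
            return true;
        };
    }

    public static void main(String[] args) {
        FatalExceptionClassifier classifier =
                forClass(IllegalStateException.class, "Fatal error in DynamoDb sink");
        boolean mayContinue =
                classifier.isFatal(new IllegalStateException("boom"), e -> {});
        System.out.println(mayContinue); // false: classified as fatal
    }
}
```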

@YuriGusev YuriGusev force-pushed the FLINK-24229-dynamodb-connector-async-writer branch 2 times, most recently from 63f13be to f92d953 Compare April 1, 2022 17:17
YuriGusev and others added 18 commits June 2, 2022 13:59
… TestMapper and DynamoDbElementConverter, do not retry items that are bigger than the DynamoDB maximum allowed record size
…anged the Scenario implementation, to create the test table with name of the variable testTableName from DynamoDbSinkITCase, but create the DynamoDbSink with table name passed by the tableName property.
… flink-connector-aws-dynamodb, similar to other modules
…versions for testcontainers and apache commons
Add support for DynamoDb write request.
Currently WriteRequest is not serializable (although it implements the Serializable interface). The reason is that the AttributeValue class is not serializable, despite also implementing the Serializable interface. This is because its Set fields are initialized as a software.amazon.awssdk.core.util.DefaultSdkAutoConstructList, which is not Serializable.
There is an open issue - aws/aws-sdk-java-v2#3143
@YuriGusev YuriGusev force-pushed the FLINK-24229-dynamodb-connector-async-writer branch from 38caf77 to c770f76 Compare June 2, 2022 16:15
nirtsruya and others added 2 commits July 8, 2022 15:26
…sync-writer-element-converter

Flink 24229 dynamodb connector async writer element converter
@YuriGusev
Author

Closing this pull request as it was moved to a separate flink-connector-dynamodb repository: apache/flink-connector-aws#1

@YuriGusev YuriGusev closed this Oct 18, 2022
9 participants