
[FLINK-4582] [kinesis] Consuming data from DynamoDB streams to flink #6968

Merged: 15 commits, Dec 22, 2018

Conversation

yxu-valleytider
Contributor

What is the purpose of the change

This PR introduces a new Flink source that consumes directly from DynamoDB streams. The new source is built on top of the existing Kinesis connector and interacts with DynamoDB streams via the dynamodb-streams-kinesis-adapter client.

Brief change log

A new data stream can be constructed to pull data directly from DynamoDB streams, for example:

DataStream<String> dynamodbStreams = env.addSource(new FlinkDynamodbStreamsConsumer(streamName, ..., ...))
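
For illustration, a fuller sketch of how such a job might be wired up, assuming the constructor mirrors FlinkKinesisConsumer's (stream name, deserialization schema, properties) signature; the stream name and region below are placeholders, and the import of the new consumer class is omitted since its final package name is discussed later in this PR:

import java.util.Properties;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kinesis.config.AWSConfigConstants;

public class DynamoDBStreamsExample {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        Properties config = new Properties();
        // Placeholder region; credentials are resolved through the usual AWS mechanisms.
        config.setProperty(AWSConfigConstants.AWS_REGION, "us-east-1");

        // "myTableStream" is a placeholder for the DynamoDB stream name or ARN.
        DataStream<String> dynamodbStreams = env.addSource(
                new FlinkDynamodbStreamsConsumer<>("myTableStream", new SimpleStringSchema(), config));

        dynamodbStreams.print();
        env.execute("DynamoDB streams consumer example");
    }
}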

Other changes include:

  • A DynamodbStreamsProxy class, which uses a dynamodb-streams-kinesis-adapter client to interact with DynamoDB streams.
  • Porting of the describeStream API from the flink-1.5 branch into DynamodbStreamsProxy.
  • A minimalist DynamodbStreamsSchema class which helps deserialize DynamoDB streams records (a rough sketch of the idea follows this list).
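
As a rough illustration of the deserialization idea only (this is not the actual DynamodbStreamsSchema class; the element type and parsing below are assumptions), a minimal Flink DeserializationSchema that parses each record payload into a Jackson tree could look like:

import java.io.IOException;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;

import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;

/** Sketch only: deserializes a raw record payload into a Jackson tree. */
public class JsonNodeSchema implements DeserializationSchema<JsonNode> {
    private static final ObjectMapper MAPPER = new ObjectMapper();

    @Override
    public JsonNode deserialize(byte[] message) throws IOException {
        return MAPPER.readTree(message);
    }

    @Override
    public boolean isEndOfStream(JsonNode nextElement) {
        return false; // unbounded stream
    }

    @Override
    public TypeInformation<JsonNode> getProducedType() {
        return TypeInformation.of(JsonNode.class);
    }
}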

Verifying this change

This change is already covered by most of the existing Flink Kinesis connector tests.

Manual tests are provided to verify that the DynamoDB streams connector can:

  • consume from a stream given its ARN
  • consume from a given table with a newly created stream

Does this pull request potentially affect one of the following parts:

  • Dependencies (does it add or upgrade a dependency): (yes)
    Added optional dependency on AWS dynamodb-streams-kinesis-adapter.
  • The public API, i.e., is any changed class annotated with @Public(Evolving): (no)
  • The serializers: (no)
  • The runtime per-record code paths (performance sensitive): (no)
  • Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)
  • The S3 file system connector: (no)

Documentation

  • Does this pull request introduce a new feature? (yes)
  • If yes, how is the feature documented? (JavaDocs)

@zentol zentol changed the title Consuming data from DynamoDB streams to flink [FLINK-4582] Consuming data from DynamoDB streams to flink Oct 31, 2018
@zentol
Contributor

zentol commented Oct 31, 2018

@yxu-valleytider Please make sure that the Pull Request title references the corresponding JIRA. I have modified the title accordingly this time.

@tweise
Contributor

tweise commented Oct 31, 2018

@yxu-valleytider please include the [kinesis] tag in the PR title and also format the commit messages per the convention [<JIRA>] [kinesis] add nice feature

* limitations under the License.
*/

package org.apache.flink.streaming.connectors.dynamodbstreams;
Contributor

All of the newly added packages should be under org.apache.flink.streaming.connectors.kinesis as they are part of the flink-connector-kinesis module.

Contributor Author

Hmm, both the dynamodbstreams and kinesis modules are under the parent module org.apache.flink.streaming.connectors. This is also consistent with the directory structure.

Contributor

These are just packages within the flink-connector-kinesis module, and the package prefix should reflect that.

Contributor Author

In that case I don't mind putting the files into a separate directory, e.g., flink-connector-dynamodbstreams. WDYT?

Contributor

I don't think that will work: We had already determined in our internal work that we need a single shaded AWS dependency. I don't think it's desirable either, because it is just a slight variation of the Kinesis consumer that does not deserve a separate module.

DynamodbStreamsShardHandle.compareShardIds(shardIdInvalid, shardIdValid);
fail("invalid shard Id" + shardIdInvalid + " should trigger exception");
} catch (IllegalArgumentException e) {
// ignore
Contributor

this should be // expected instead?

Contributor Author

Yes, // expected is clearer.
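
For context, a self-contained sketch of the try/fail/catch idiom being discussed (the shard id values and test class name are placeholders, not the actual test fixtures; the import of DynamodbStreamsShardHandle from this PR is omitted):

import static org.junit.Assert.fail;

import org.junit.Test;

public class ShardIdFormatSketchTest {

    @Test
    public void invalidShardIdTriggersException() {
        // Placeholder values; the real test uses its own fixtures.
        String shardIdValid = "shardId-000000000000";
        String shardIdInvalid = "not-a-shard-id";
        try {
            DynamodbStreamsShardHandle.compareShardIds(shardIdInvalid, shardIdValid);
            fail("invalid shard Id " + shardIdInvalid + " should trigger exception");
        } catch (IllegalArgumentException e) {
            // expected: the malformed shard id is rejected
        }
    }
}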

@@ -73,7 +73,7 @@
private static final Logger LOG = LoggerFactory.getLogger(KinesisProxy.class);

/** The actual Kinesis client from the AWS SDK that we will be using to make calls. */
private final AmazonKinesis kinesisClient;
protected final AmazonKinesis kinesisClient;
Contributor

why this modifier change?

Contributor Author

It is used inside DynamodbStreamsProxy to execute the describeStream call.

Contributor

If describeStream moves to KinesisProxy, this change becomes unnecessary.

@yxu-valleytider yxu-valleytider changed the title [FLINK-4582] Consuming data from DynamoDB streams to flink [FLINK-4582] [kinesis] Consuming data from DynamoDB streams to flink Oct 31, 2018
@yxu-valleytider
Contributor Author

PTAL @tzulitai

@yxu-valleytider
Contributor Author

Adjusted package name based on comments.

PTAL @tweise @tzulitai

super(streams, deserializer, config);
}

public static <T> FlinkDynamodbStreamsConsumer<T> create(String stream,
Contributor

Please remove these static create methods, they don't provide any benefit since the constructors are public (which is consistent with the base class).

*
* @return the result of the describe stream operation
*/
private DescribeStreamResult describeStream(String streamName, @Nullable String startShardId)
Contributor

Unless there is anything specific to DynamoDB in this implementation, move it to KinesisProxy. (DescribeStream is standard Kinesis, and it was used for discovery before listShards got introduced.)

Contributor Author

yxu-valleytider commented Dec 20, 2018

OK to move. In that case describeStream will need to be marked at least protected, so that it is visible to DynamoDBStreamsProxy.

Keeping it inside the DynamodbStreamsProxy layer would have allowed this logic to stay private and specific to interacting with DynamoDB streams.

@tweise

Contributor

Making it protected is fine. Since these are common capabilities of the Kinesis API, it is better to keep them in one place.
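
For illustration only, a rough sketch of what a protected describeStream in KinesisProxy could look like, using the (now protected) kinesisClient and a simple retry on LimitExceededException; the backoff values are assumptions, not the PR's actual configuration:

import com.amazonaws.services.kinesis.model.DescribeStreamRequest;
import com.amazonaws.services.kinesis.model.DescribeStreamResult;
import com.amazonaws.services.kinesis.model.LimitExceededException;

// Sketch of the call shape only; not the PR's exact implementation.
protected DescribeStreamResult describeStream(String streamName, String startShardId)
        throws InterruptedException {
    DescribeStreamRequest request = new DescribeStreamRequest()
            .withStreamName(streamName)
            .withExclusiveStartShardId(startShardId);

    DescribeStreamResult result = null;
    long backoffMillis = 1000L;           // assumed base backoff
    final long maxBackoffMillis = 30000L; // assumed cap
    while (result == null) {
        try {
            result = kinesisClient.describeStream(request);
        } catch (LimitExceededException e) {
            // Throttled by the service; back off and retry.
            Thread.sleep(backoffMillis);
            backoffMillis = Math.min(backoffMillis * 2, maxBackoffMillis);
        }
    }
    return result;
}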

@@ -73,7 +73,7 @@
private static final Logger LOG = LoggerFactory.getLogger(KinesisProxy.class);

/** The actual Kinesis client from the AWS SDK that we will be using to make calls. */
private final AmazonKinesis kinesisClient;
protected final AmazonKinesis kinesisClient;
Contributor

If describeStream moves to KinesisProxy, this change becomes unnecessary.

* Different tag name to distinguish from "flink.stream.describe.backoff.base"
* since the latter is deprecated.
*/
public static final String DYNAMODB_STREAMS_DESCRIBE_BACKOFF_BASE =
Contributor

Remove these and remove the @Deprecated annotations from the prior properties.

Contributor Author

@tweise are you OK with removing the function KinesisProxy::replaceDeprecatedConsumerKeys as well?

Contributor

It would be good to modify the method to map/replicate the keys but not remove them (since you need them for DynamoDB). Add a comment that this is for backward compatibility with the regular proxy, which now uses listShards instead of describeStream. We can remove the mapping in a later release.
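
A minimal sketch of that map-but-keep behavior (the helper name and key handling are hypothetical, not the PR's actual code):

import java.util.Properties;

// Sketch: replicate a deprecated key under its new name for backward compatibility,
// but keep the old key so the DynamoDB streams proxy can still read it.
static void mapDeprecatedKey(Properties configProps, String deprecatedKey, String newKey) {
    if (configProps.containsKey(deprecatedKey) && !configProps.containsKey(newKey)) {
        configProps.setProperty(newKey, configProps.getProperty(deprecatedKey));
        // Intentionally not removing the deprecated key.
    }
}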

Contributor Author

OK.

* Updates the last discovered shard of a subscribed stream; only updates if the update is valid.
*/
@Override
public void advanceLastDiscoveredShardOfStream(String stream, String shardId) {
Contributor

Rather than duplicating complete logic from the base class, can we just extract what is unique to DynamoDB? That might also eliminate the need to expose subscribedStreamsToLastDiscoveredShardIds?
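
For illustration, one way to read this suggestion as a sketch: keep the common bookkeeping in the base class and expose only a small hook for a subclass to override (all names here are hypothetical, not the PR's actual fetcher classes):

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical template-method sketch; names do not match the real classes.
public class BaseFetcherSketch {
    private final Map<String, String> lastDiscoveredShardIds = new ConcurrentHashMap<>();

    /** Common bookkeeping stays in the base class. */
    public void advanceLastDiscoveredShardOfStream(String stream, String shardId) {
        String lastSeen = lastDiscoveredShardIds.get(stream);
        if (lastSeen == null || compareShardIds(shardId, lastSeen) > 0) {
            lastDiscoveredShardIds.put(stream, shardId);
        }
    }

    /** Only this hook would be overridden by a DynamoDB-streams-specific subclass. */
    protected int compareShardIds(String first, String second) {
        return first.compareTo(second);
    }
}

A DynamoDB-streams-specific fetcher would then override only the comparison (or an equivalent validation hook) rather than duplicating the whole update method, which would also avoid exposing subscribedStreamsToLastDiscoveredShardIds.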

@yxu-valleytider
Contributor Author

PTAL @tweise

super(configProps);
}


Contributor

Remove extra line.

@@ -55,6 +58,12 @@
import java.util.Properties;
import java.util.Random;

import static org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants.DEFAULT_STREAM_DESCRIBE_BACKOFF_BASE;
Contributor

Let's stay consistent with the existing implementation and not add these imports.

* Boolean to indicate whether to compare/enforce shardId format based on the one defined in
* DynamodbStreamsShardHandle.
*/
public static final String DYNAMODB_STREAMS_SHARDID_FORMAT_CHECK =
Contributor

This property isn't actually used? If so, please remove.

Contributor Author

The property is currently used inside DynamoDBStreamsDataFetcher.

Contributor

The key is never used to set the property; that is what "not used" refers to. The documentation also does not mention why anyone would use it. If there is no use case for this, let's remove it.

Contributor Author

Changed the default to true and removed the config property.

@tweise
Contributor

tweise commented Dec 21, 2018

Does the PR need to be rebased?

/**
* DynamoDB streams proxy: interface interacting with the DynamoDB streams.
*/
public class DynamodbStreamsProxy extends KinesisProxy {
Contributor

DynamodbStreamsProxy => DynamoDBStreamsProxy (same may apply elsewhere also)

@@ -469,6 +496,61 @@ private ListShardsResult listShards(String streamName, @Nullable String startSha
return listShardsResults;
}

/**
* Get metainfo for a Dynamodb stream, which contains information about which shards this
Contributor

Remove DynamoDB reference here. This is applicable to proper Kinesis also.

configProps.setProperty(newKey, configProps.getProperty(deprecatedOldKey));
configProps.remove(deprecatedOldKey);
if (configProps.containsKey(oldKey)) {
LOG.warn("Backfill the property key {} based on the original key {}", newKey, oldKey);
Contributor

Remove this warning since it isn't applicable any more (the key is expected to be used for DynamoDB).

*
* In the context of 1), the set of configurations needs to be translated to the corresponding
* configurations in the Kinesis listShards API. In the mean time, keep these configs since
* they may be useful in the context of 2), i.e., polling data from a DynamoDB stream.
Contributor

"may be useful" => "are applicable"

@tweise
Contributor

tweise commented Dec 21, 2018

Dependency diff shows the following:

< [INFO] |  |  +- com.fasterxml.jackson.core:jackson-databind:jar:2.6.7.1:compile
< [INFO] |  |  |  +- com.fasterxml.jackson.core:jackson-annotations:jar:2.6.0:compile
< [INFO] |  |  |  \- com.fasterxml.jackson.core:jackson-core:jar:2.6.7:compile
79a77,79
> [INFO] |  +- com.amazonaws:aws-java-sdk-dynamodb:jar:1.11.272:compile
> [INFO] |  |  \- com.amazonaws:aws-java-sdk-s3:jar:1.11.272:compile
> [INFO] |  |     \- com.amazonaws:aws-java-sdk-kms:jar:1.11.272:compile
80a81,84
> [INFO] +- com.amazonaws:dynamodb-streams-kinesis-adapter:jar:1.4.0:compile
> [INFO] |  \- com.fasterxml.jackson.core:jackson-databind:jar:2.6.6:compile
> [INFO] |     +- com.fasterxml.jackson.core:jackson-annotations:jar:2.6.0:compile
> [INFO] |     \- com.fasterxml.jackson.core:jackson-core:jar:2.6.6:compile

The version of com.fasterxml.jackson.core:jackson-databind that comes with com.amazonaws:dynamodb-streams-kinesis-adapter is 2.6.6 vs. 2.6.7 from com.amazonaws:aws-java-sdk-core previously.

@tweise tweise merged commit bc3eb82 into apache:master Dec 22, 2018
@tweise
Contributor

tweise commented Dec 22, 2018

@yxu-valleytider thanks for the contribution!

tisonkun pushed a commit to tisonkun/flink that referenced this pull request Jan 17, 2019
…pache#6968)

Introduces a new Flink source to consume from DynamoDB streams. This new source is built on top of the existing Kinesis connector. It interacts with the DynamoDB streams via a dynamodb-streams-kinesis-adapter client.