[FLINK-3229] Flink streaming consumer for AWS Kinesis #1911
Conversation
Thank you for opening a pull request for the consumer.
@rmetzger Sure, that seems reasonable. I'll wait until the producer is merged and resubmit a new PR for the integrated consumer :)
Cool. You don't need to resubmit a new PR. By pushing new commits to your branch, this pull request will be updated automatically.
this.regionId = configProps.getProperty(KinesisConfigConstants.CONFIG_AWS_REGION, KinesisConfigConstants.DEFAULT_AWS_REGION);
AmazonKinesisClient client = new AmazonKinesisClient(AWSUtil.getCredentialsProvider(configProps).getCredentials());
client.setRegion(Region.getRegion(Regions.fromName(this.regionId)));
I had to set the endpoint here as well to make it work.
Which AWS region were you using? (Maybe there's something like a default endpoint that works for one region)
I'm using the "ap-northeast-1" region, which isn't the default.
Setting the region on the AmazonKinesisClient should set the endpoint too, no?
I thought so too, but it didn't work for me.
I'll investigate the issue further ...
I found out what I was doing wrong. The code was using the default region ID because I forgot to set it.
I'm currently fixing some issues in the consumer and I'll make the region a required argument.
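Making the region a required argument could look roughly like the sketch below. The property key, class name, and exception message here are illustrative assumptions; the PR defines its own constants in KinesisConfigConstants.

```java
import java.util.Properties;

public class RegionCheck {
    // Hypothetical config key; the actual constant lives in KinesisConfigConstants.
    public static final String CONFIG_AWS_REGION = "aws.region";

    /** Fails fast when no region is configured, instead of silently falling back to a default. */
    public static String requireRegion(Properties configProps) {
        String regionId = configProps.getProperty(CONFIG_AWS_REGION);
        if (regionId == null || regionId.trim().isEmpty()) {
            throw new IllegalArgumentException(
                    "The AWS region ('" + CONFIG_AWS_REGION + "') must be set in the config.");
        }
        return regionId;
    }
}
```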
Quick update on our plan: I've merged the Kinesis producer. If you want, you can rebase this pull request on the current master.
@rmetzger Hi Robert,
@tzulitai The producer wasn't merged yet.
Ah ok :) I see it now, thanks.
The problem was that the GitHub mirror needed some time to sync the commit. But now it's there.
…rtiesConfig` to protected for testing purposes
…aming.connectors.kinesis.serialization package
Quick update:
Great, thank you. I'll review the PR soon.
@@ -45,6 +45,7 @@ under the License.
<module>flink-connector-rabbitmq</module>
<module>flink-connector-twitter</module>
<module>flink-connector-nifi</module>
<module>flink-connector-kinesis</module>
I think we have to remove this line again. The module is already included in the profile below (you have to activate the "include-kinesis" Maven build profile).
Thanks, I missed the "include-kinesis" profile defined below. We'll probably need a more general profile name in the future though (e.g. include-aws-connectors), for example when we start including more Amazon-licensed libraries for other connectors such as DynamoDB.
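For reference, the profile-gated module inclusion discussed here follows the standard Maven pattern; a minimal sketch (the profile id matches the one mentioned above, the surrounding pom structure is assumed):

```xml
<!-- Sketch: the kinesis module is only built when the profile is activated,
     e.g. via `mvn clean install -Pinclude-kinesis` -->
<profiles>
  <profile>
    <id>include-kinesis</id>
    <modules>
      <module>flink-connector-kinesis</module>
    </modules>
  </profile>
</profiles>
```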
… module (only include when profile "include-kinesis" is activated)
GetRecordsResult getRecordsResult = kinesisProxy.getRecords(nextShardItr, 100);

final List<Record> fetchedRecords = getRecordsResult.getRecords();
The records returned here might be aggregated: http://docs.aws.amazon.com/streams/latest/dev/kinesis-kpl-consumer-deaggregation.html
So we need to add code to deaggregate them here.
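For context, KPL-aggregated records can be recognized by a 4-byte magic prefix (0xF3 0x89 0x9A 0xC2) followed by a protobuf-encoded record set, which is what the KCL's deaggregation code checks for. A minimal sketch of such a check; the class and method names are hypothetical, not part of this PR:

```java
import java.util.Arrays;

public class KplRecordCheck {
    // Magic bytes that prefix KPL-aggregated record payloads (0xF3 0x89 0x9A 0xC2).
    private static final byte[] AGGREGATED_RECORD_MAGIC =
            new byte[]{(byte) 0xF3, (byte) 0x89, (byte) 0x9A, (byte) 0xC2};

    /** Returns true if the raw record data starts with the KPL aggregation magic prefix. */
    public static boolean isAggregated(byte[] data) {
        return data != null
                && data.length >= AGGREGATED_RECORD_MAGIC.length
                && Arrays.equals(
                        Arrays.copyOf(data, AGGREGATED_RECORD_MAGIC.length),
                        AGGREGATED_RECORD_MAGIC);
    }
}
```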
Noted, thanks for pointing this out; I hadn't realized it. I'll include deaggregation in the implementation.
@rmetzger It seems that to determine whether or not a record is aggregated, we will need to rely on some protobuf magic. The KCL has implemented this implicitly in the above-mentioned code, starting from line 201. I don't really understand why the AGGREGATED_RECORD_MAGIC is set this way. Should we import the KCL solely to use this class?
It seems that we have to add the KCL as a dependency for this class, yes.
When you add the dependency, can you exclude the transitive dependency on aws-java-sdk-dynamodb? I don't want to pull in too many dependencies ;) (maybe there are more deps we can safely remove)
Sure, no problem. I'll investigate if there are more deps that we can remove along with the new dependency.
Update:
Ended up excluding aws-java-sdk-dynamodb and aws-java-sdk-cloudwatch.
I could not get the example to work with the current Jackson version. Only after upgrading it to …
this.deserializer = checkNotNull(deserializer, "deserializer can not be null");

this.shards = new KinesisProxy(configProps).getShardList(streams);
I wonder if we need to query the list of shards from the client. The problem (also of the Kafka consumer) is that we expect the client to be able to connect to Kinesis.
Imagine somebody submitting a Flink job from their laptop to an EC2 instance running Flink. The laptop would need to be able to access Kinesis as well.
I think we can get the shard list on the parallel tasks as well, and then modulo by the hash of the shard id, so that there is a defined assignment to a parallel worker.
This would also allow us to elegantly handle reshards (if a worker detects a reshard, it queries again for the shard list and assigns the appropriate shards)
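The assignment scheme described above ("modulo by the hash of the shard id") could be sketched as follows; the helper class and method names are illustrative, not the PR's actual code:

```java
public class ShardAssigner {
    /** Deterministically maps a shard id to one of `parallelism` subtask indices. */
    public static int assignToSubtask(String shardId, int parallelism) {
        // Mask the sign bit rather than using Math.abs, which overflows for Integer.MIN_VALUE.
        return (shardId.hashCode() & 0x7FFFFFFF) % parallelism;
    }

    /** A subtask keeps a discovered shard only if it maps to the subtask's own index. */
    public static boolean isOwnedBy(String shardId, int subtaskIndex, int parallelism) {
        return assignToSubtask(shardId, parallelism) == subtaskIndex;
    }
}
```

Because every subtask computes the same function over the same shard list, the assignment is consistent without any coordination, and a re-discovered shard list after a reshard yields a well-defined new assignment.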
My main concern when deciding to fetch the complete shard list at the client was Amazon's limit on the DescribeStream operation (needed to access the shard list) of 10 transactions per second: http://docs.aws.amazon.com/kinesis/latest/APIReference/API_DescribeStream.html.
Many parallel tasks calling this operation simultaneously for big Kinesis streams might cause issues, which would make it hard to decide on an appropriate backoff time & retry limit for the DescribeStream operation (the current defaults in the consumer are a 1 second backoff and a retry limit of 3).
Other than this concern, I think it would be absolutely fine to implement this only on the parallel tasks. And certainly the implementation would be much cleaner and friendlier to future enhancements for Kinesis-side resharding, as you mentioned :)
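The backoff-and-retry behaviour mentioned above (defaults of a 1 second backoff and a retry limit of 3) could be sketched like this. The class name and the Callable-based shape are illustrative assumptions, not the PR's actual code:

```java
import java.util.concurrent.Callable;

public class RetryUtil {
    /**
     * Calls `operation`, retrying up to `maxRetries` additional times with a
     * fixed backoff between attempts, e.g. for throttled DescribeStream calls.
     * Rethrows the last failure if all attempts fail.
     */
    public static <T> T callWithRetry(Callable<T> operation, int maxRetries, long backoffMillis)
            throws Exception {
        Exception lastFailure = null;
        for (int attempt = 0; attempt <= maxRetries; attempt++) {
            try {
                return operation.call();
            } catch (Exception e) {
                lastFailure = e;
                Thread.sleep(backoffMillis);
            }
        }
        throw lastFailure;
    }
}
```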
Oh, that limit is indeed a problem when running Flink with a parallelism higher than 10. Let's leave this as it is then.
Thanks for the explanation.
I'm currently trying to mock the KinesisProxy here for unit testing the whole FlinkKinesisConsumer.
Currently considering two ways of modifying this part to make the class testable:

1. Introduce a serializable concrete KinesisProxyFactory class which will be instantiated as a default field within FlinkKinesisConsumer. The KinesisProxyFactory will have a non-static method to help create the proxy. This line will then be changed to this.shards = this.kinesisProxyFactory.createProxy(configProps).getShardList(streams), which is much easier to mock.
2. Change this line to this.shards = createProxy(configProps).getShardList(streams), where createProxy is a protected method within FlinkKinesisConsumer that does the actual instantiation of KinesisProxy. In our tests, we can write a TestableFlinkKinesisConsumer that extends FlinkKinesisConsumer and overrides the protected createProxy to return a mocked KinesisProxy.

The downside of approach 1 is that FlinkKinesisConsumer will be carrying an extra KinesisProxyFactory field. On the other hand, approach 2 is also quite tedious because TestableFlinkKinesisConsumer will need to implement dummies for all variants of FlinkKinesisConsumer's constructors ...

Which one might be better? Or are there better solutions besides the above?
I'm not quite familiar with writing good tests, so any suggestion here would be quite helpful!
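Approach 2 in a minimal, self-contained sketch. All class names here are simplified stand-ins for the real FlinkKinesisConsumer / KinesisProxy, not the PR's actual code:

```java
import java.util.Properties;

public class ProxyOverrideSketch {
    /** Stand-in for KinesisProxy. */
    static class Proxy {
        String getShardList() { return "real-shards"; }
    }

    /** Stand-in for FlinkKinesisConsumer, with a protected factory method as a test seam. */
    static class Consumer {
        final String shards;

        Consumer(Properties configProps) {
            // The constructor goes through the overridable factory instead of `new Proxy()`.
            this.shards = createProxy(configProps).getShardList();
        }

        protected Proxy createProxy(Properties configProps) {
            return new Proxy();
        }
    }

    /** Stand-in for TestableFlinkKinesisConsumer: overrides the factory to return a mock. */
    static class TestableConsumer extends Consumer {
        TestableConsumer(Properties configProps) { super(configProps); }

        @Override
        protected Proxy createProxy(Properties configProps) {
            return new Proxy() {
                @Override String getShardList() { return "mock-shards"; }
            };
        }
    }
}
```

Note the tedium mentioned above: each constructor variant of the real consumer would need a matching pass-through constructor in the test subclass.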
I would go for approach 1). I think it's cleaner for the production code.
Thanks for the advice :)
Update for this:
I ended up leaving this line as it is and used PowerMock for the tests. I tried approach 1, but testing was still hard since the factory still resides in the constructor, which made it difficult to inject mocks for either the factory or the returned KinesisProxy.
…'s Shard as a field This change also includes, 1) Remove regionName from KinesisStreamShard since we won't be needing it for the consumer as all streams are limited to be in the same AWS region. 2) Introduce access methods getStartingHashKey() and getEndingHashKey(), since this info is carried as well with AWS's Shard. 3) Reimplement ReferenceKinesisShardTopologies in the tests according to the changes to how KinesisStreamShard is instantiated.
1) Update AWS Java SDK version to 1.10.71 2) Add AWS KCL library dependency, excluding dependencies for DynamoDB and CloudWatch 3) Rename property `kinesis-producer.version` to `aws.kinesis-kpl.version` for a more consistent property naming pattern with the Java SDK and KCL library.
…PL, should deaggregate after fetching
…ly on AWS SDK responses and not on timeout implementation
…erializable classes
1) Remove javadoc for non-public facing classes / methods 2) Finish off any WIP javadoc messages 3) Make note in javadocs of some classes of the fact that the AWS libraries have similar implementations
@rmetzger I've addressed your comments with the latest commits. Thanks in advance for your help on reviewing them :) Please let me know if there is anything else to address!
I wonder why one of the Travis CI builds is failing on …
The build failure is unrelated to your changes. It's just an instability of the testing infrastructure.
…ucerTest from rmetzger/FLINK-3229-pr-after-rework add manual exactly once test
…public class for easier testing
…ink.streaming.connectors.kinesis.manualtests
I'm currently busy with some other ongoing tasks. I hope to get back to this PR soon.
…asier mocking in tests
Changes include: 1. Consolidate all FlinkKinesisConsumer related tests within FlinkKinesisConsumerTest 2. Add a new TestableFlinkKinesisConsumer that helps with tests in FlinkKinesisConsumerTest 3. Add tests for KinesisDataFetcher and ShardConsumerThread that uses mocked KinesisProxy behaviours
…() call to shards user configurable
…alue configurations are given parsable values
@rmetzger
I'm currently working on a custom branch based on this pull request. It seems that there is a clash between the protobuf versions (Kinesis needs 2.6.x, but Flink has 2.5.0 in its classpath). I'll keep you posted.
Thanks Robert. I'll keep an eye on your FLINK-3229-review branch for the changes (I'm assuming you're working on FLINK-3229-review for the protobuf problem, please tell me if I'm wrong :)) Also, if there is anything I can do to help (e.g. tests on other environments) to further improve the PR, please don't hesitate to let me know :)
Yep, that's the right branch.
As discussed in the JIRA, I'm going to follow the "relocation approach" for fixing the protobuf issue. But we won't release the Kinesis connector to Maven central.
I've been using this consumer for a while in off-production environments.
I understand we should have good test coverage for each PR, but since Kinesis is a hosted service, reliable integration tests are hard to pull off. To speed up merging the Kinesis connector for the next release, I'm submitting the consumer now for some early reviews.
On the other hand, since @rmetzger is submitting a separate PR for the Kinesis producer, I'd like to postpone writing more tests for the consumer, as well as the corresponding documentation changes, until both the consumer and producer are in place.