Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Pulsar-IO: Added Kinesis Source Connector #3784

Merged
merged 6 commits into from
Sep 5, 2019

Conversation

david-streamlio
Copy link
Contributor

Added an AWS Kinesis Source connector to the existing Kinesis module.

Modifications

Added an AWS Kinesis Source connector and associated configuration class
Added test class for AWS Kinesis Source connector
Refactored Configuration hierarchy to promote code reused between the Sink and Source configurations.
Added test cases to confirm the above refactoring did NOT break anything in the Sink
Verifying this change

Make sure that the change passes the CI checks.
(Please pick either of the following options)

This change added tests and can be verified as follows:

Added unit tests
Does this pull request potentially affect one of the following parts:

If yes was chosen, please highlight the changes

Dependencies (does it add or upgrade a dependency): NO
The public API: NO
The schema: NO
The default values of configurations: NO
The wire protocol: NO
The rest endpoints: NO
The admin cli options: NO
Anything that affects deployment: NO
Documentation

Does this pull request introduce a new feature? YES
If yes, how is the feature documented? YES, I updated the docs

@feathj
Copy link

feathj commented Aug 19, 2019

@david-streamlio What is the status of this PR? We have a use case for a kinesis source, and I will probably need to add a modification (allow assume role). Just wondering if it makes sense to do on this branch or to wait until this lands on master.

@david-streamlio
Copy link
Contributor Author

@feathj It looks like this PR was never assigned a reviewer. It is ready to go, so I will see if I can get someone to review it

Copy link
Contributor

@ivankelly ivankelly left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

One major comment. The rest are minor.

private static final long serialVersionUID = 1L;

@FieldDoc(
required = true,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do you need endpoint and region? surely one or the other is sufficient.

private final HashMap<String, String> userProperties = new HashMap<String, String> ();

public KinesisRecord(com.amazonaws.services.kinesis.model.Record record) {
this.key = Optional.of(record.getPartitionKey() + "-" + record.getSequenceNumber());
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why are you adding the sequence number to the key? Seems like partition key -> key should be a 1-1 mapping.

for (int i = 0; i < numRetries; i++) {

try {
checkpointer.checkpoint();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This loop never breaks on success. It should break out if the checkpoint is successful, rather than running the operation multiple times.

}

// Checkpoint once every checkpoint interval.
if (System.currentTimeMillis() > nextCheckpointTimeInMillis) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

minor: currentTimeMillis isn't guaranteed to be monotonically increasing so people tend to use nanoTime for this.

try {
queue.put(new KinesisRecord(record));
} catch (InterruptedException e) {
log.error("unable to create KinesisRecord ", e);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

May not necessarily be an error. When the source is being shutdown, the thread would have to be interrupted.

Copy link
Contributor

@ivankelly ivankelly left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Minor comment around configuration. Otherwise gtg.

@@ -38,7 +37,7 @@
private String awsEndpoint;

@FieldDoc(
required = true,
required = false,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it should be either/or for this and endpoint. if region is set, you should be able to leave endpoint blank

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have set both of these to required = false and added logic in both the Source and Sink connector's open method to check that at least one of these is set to true.

@ivankelly
Copy link
Contributor

rerun java8 tests

@ivankelly ivankelly added this to the 2.5.0 milestone Sep 5, 2019
@ivankelly ivankelly added component/connect type/enhancement The enhancements for the existing features or docs. e.g. reduce memory usage of the delayed messages labels Sep 5, 2019
@ivankelly ivankelly merged commit 53fb055 into apache:master Sep 5, 2019
addisonj pushed a commit to instructure/pulsar that referenced this pull request Sep 17, 2019
addisonj pushed a commit to instructure/pulsar that referenced this pull request Sep 27, 2019
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
area/connector type/enhancement The enhancements for the existing features or docs. e.g. reduce memory usage of the delayed messages
Projects
None yet
Development

Successfully merging this pull request may close these issues.

None yet

4 participants