Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Issue 10445][pulsar-io] Exposed SubscriptionType in the SinkContext #10446

Merged
merged 3 commits into from
May 8, 2021

Conversation

dlg99
Copy link
Contributor

@dlg99 dlg99 commented Apr 29, 2021

Fixes #10445

Motivation

SinkContext should expose Subscription type to the Sink
More context: #9927 (comment)

Needed for #9927

Modifications

Added getSubscriptionType() to the SinkContext interface and ContextImpl

Verifying this change

  • Make sure that the change passes the CI checks.

This change is already covered by existing unit tests

Does this pull request potentially affect one of the following parts:

If yes was chosen, please highlight the changes

  • The public API: yes

New method added to the interface that is a public API. Default method implementation is provided.

Documentation

  • Does this pull request introduce a new feature? yes
  • If yes, how is the feature documented? JavaDocs

Copy link
Contributor

@eolivelli eolivelli left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

overall is good.

I would also add a (unit, non integration) test in order to ensure that the Sink receives the value.

@dlg99 dlg99 requested a review from eolivelli April 30, 2021 17:45
Copy link
Contributor

@eolivelli eolivelli left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Lgtm

@dlg99
Copy link
Contributor Author

dlg99 commented May 3, 2021

rebased on current master

@dlg99 dlg99 changed the title [Issue 10445][pulsar-io] Exposed SubscriptionType in the ConnectorContext [Issue 10445][pulsar-io] Exposed SubscriptionType in the SinkContext May 3, 2021
Copy link
Member

@sijie sijie left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

before merging this pull request, we should get stakeholders like @jerrypeng and @srkukarni to confirm if this API works for them.

@dlg99 dlg99 requested a review from sijie May 4, 2021 02:42
@dlg99
Copy link
Contributor Author

dlg99 commented May 4, 2021

@jerrypeng @srkukarni Do you have any feedback on this? (I cannot add you to reviewers, no permissions)

@eolivelli
Copy link
Contributor

@jerrypeng @srkukarni ping.
this patch is blocking another bigger work.
can you please take a quick look ?

@jerrypeng
Copy link
Contributor

jerrypeng commented May 7, 2021

I don't see any red flags in this PR but I also don't understand adding this functionality will be useful.

@dlg99 based the comments

#9927 (comment)

It seem like the subscription type should be checked at submission time and be rejected if a wrong subscription type is used.

If we are checking the subscription type during the sink open, what are we going to do if the subscription type is wrong? Just go into a crash loop?

@dlg99
Copy link
Contributor Author

dlg99 commented May 7, 2021

If we are checking the subscription type during the sink open, what are we going to do if the subscription type is wrong? Just go into a crash loop?

@jerrypeng This is not different from what's happening in other connectors now. I agree we can consider adding something to handle it better, but this is out of the scope of this PR.

if (kafkaSinkConfig.getBatchSize() <= 0) {
throw new IllegalArgumentException("Invalid Kafka Producer batchSize : "
+ kafkaSinkConfig.getBatchSize());
}
if (kafkaSinkConfig.getMaxRequestSize() <= 0) {
throw new IllegalArgumentException("Invalid Kafka Producer maxRequestSize : "
+ kafkaSinkConfig.getMaxRequestSize());
}

checkArgument(isNotBlank(kinesisSinkConfig.getAwsKinesisStreamName()), "empty kinesis-stream name");
checkArgument(isNotBlank(kinesisSinkConfig.getAwsEndpoint()) ||
isNotBlank(kinesisSinkConfig.getAwsRegion()),
"Either the aws-end-point or aws-region must be set");
checkArgument(isNotBlank(kinesisSinkConfig.getAwsCredentialPluginParam()), "empty aws-credential param");

etc

@jerrypeng
Copy link
Contributor

I am not a fan of adding new interfaces to support a not well thought out behavior that might be changed in the future but I don't see a major problem with this as well.

@eolivelli
Copy link
Contributor

Thank you @jerrypeng

@sijie you left "request changes" status.
Do you agree to merge this patch ?

@sijie sijie added this to the 2.8.0 milestone May 8, 2021
@sijie sijie merged commit b5fd8ef into apache:master May 8, 2021
eolivelli pushed a commit to eolivelli/pulsar that referenced this pull request May 11, 2021
…pache#10446)

Fixes apache#10445

### Motivation

SinkContext should expose Subscription type to the Sink
More context: apache#9927 (comment)

Needed for apache#9927

### Modifications

Added `getSubscriptionType()` to the `SinkContext` interface and `ContextImpl`
@dlg99 dlg99 deleted the context_subscription branch October 14, 2021 23:30
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

SinkContext should expose Subscription type to the Sink
4 participants