Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[FLINK-26203] Basic table factory for Pulsar connector #56

Merged
merged 10 commits into from Aug 31, 2023

Conversation

tisonkun
Copy link
Member

This patch consists work from multiple contributors. I'll respect all their credits before merging.

Significant changes

(Please check any boxes [x] if the answer is "yes". You can first publish the PR and check them afterwards, for
convenience.)

  • Dependencies have been added or upgraded
  • Public API has been changed (Public API is any class annotated with @Public(Evolving))
  • Serializers have been changed
  • New feature has been introduced
    • If yes, how is this documented? (not applicable / docs / JavaDocs / not documented)

Signed-off-by: tison <wander4096@gmail.com>
Signed-off-by: tison <wander4096@gmail.com>
Signed-off-by: tison <wander4096@gmail.com>
Signed-off-by: tison <wander4096@gmail.com>
Signed-off-by: tison <wander4096@gmail.com>

@ParameterizedTest
@ValueSource(strings = {JSON_FORMAT, AVRO_FORMAT, CSV_FORMAT})
void pulsarSourceSinkWithKeyAndPartialValue(String format) throws Exception {
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This test still doesn't pass. But maybe we don't support this use case at all.

…other staff works well

Signed-off-by: tison <wander4096@gmail.com>
Signed-off-by: tison <wander4096@gmail.com>
@tisonkun
Copy link
Member Author

cc @syhily @cbornet I'll appreciate it if you can help with review and suggest on code or features.

I'm pending to add you two and @imaffe to the Co-authored-by columns.

@@ -18,7 +18,7 @@

# Set root logger level to OFF to not flood build logs
# set manually to INFO for debugging purposes
rootLogger.level=OFF
rootLogger.level=INFO
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

To-be reverted.

@syhily
Copy link
Contributor

syhily commented Jul 25, 2023

This is a huge PR. I may have time this weekend to glance at it.

@MartijnVisser
Copy link
Contributor

I'll respect all their credits before merging.

@tisonkun This seems to indicate that this contains code that's been contributed by people outside of the ASF, is that true? Because we need to take copyright considerations, licenses etc into account, before that code can be merged into an ASF project.

@tisonkun
Copy link
Member Author

tisonkun commented Jul 27, 2023

outside of the ASF

@MartijnVisser those patches are licensed with ALv2 also, and with the copyright of StreamNative Inc. where we agree to contribute it to the upstream.

@tisonkun tisonkun changed the title [FLINK-XXXXX] Basic table factory for Pulsar connector [FLINK-26203] Basic table factory for Pulsar connector Jul 27, 2023
@tisonkun
Copy link
Member Author

Update JIRA number, this should cover several subtasks under FLINK-26203.

@MartijnVisser
Copy link
Contributor

Because we need to take copyright considerations, licenses etc into account, before that code can be merged into an ASF project.

@tisonkun and I talked about this offline and checked it, there's no problem to accept these contributions

Signed-off-by: tison <wander4096@gmail.com>
@tisonkun tisonkun marked this pull request as ready for review July 28, 2023 06:52
@tisonkun
Copy link
Member Author

Support keybytes for now to minimize code change. Following work -

  1. Add SQL client E2E test
  2. Add upsert table factory
  3. Revisit key serialization support. I know that users can have biz data in keys, but I don't know if it's common. @leonardBang @PatrickRen does Kafka supports mapping message key to Flink SQL columns?

@tisonkun
Copy link
Member Author

This patch itself is ready for review and technically mergable. Feel free to leave a comment.

cc @nlu90.

* <p>By default all {@link org.apache.flink.connector.pulsar.common.config.PulsarOptions} are
* included in the validateExcept() method./p>
*/
public static final ConfigOption<String> ADMIN_URL =
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we should look into removing the admin url

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah. I saw your issue here - https://issues.apache.org/jira/browse/FLINK-32938.

This is an inherited issue that we should handle it separately.

Copy link
Contributor

@leonardBang leonardBang left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks @tisonkun for the great work, I'm not familiar with Pulsar, I try to review this PR and left some comments.

private PulsarTableOptions() {}

public static final ConfigOption<List<String>> TOPICS =
ConfigOptions.key("topics")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we use topic and topic-pattern in kafka connector, I wonder could we unified this concept?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No. Support topic-pattern can be an advanced feature. I don't think it's a blocker of this patch.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We can file a JIRA ticket to support "topic-pattern" in the SQL connector. It's already supported by the DataStream connector.

Comment on lines +65 to +81
ConfigOptions.key("source.subscription-type")
.enumType(SubscriptionType.class)
.defaultValue(SubscriptionType.Exclusive)
.withDescription(
Description.builder()
.text(
"The [subscription type](https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/datastream/pulsar/#pulsar-subscriptions) that is supported by the [Pulsar DataStream source connector](https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/datastream/pulsar/#pulsar-source). Currently, only %s and %s subscription types are supported.",
code("Exclusive"), code("Shared"))
.build());

/**
* Exactly same as {@link
* org.apache.flink.connector.pulsar.source.PulsarSourceOptions#PULSAR_SUBSCRIPTION_NAME}.
* Copied because we want to have a default value for it.
*/
public static final ConfigOption<String> SOURCE_SUBSCRIPTION_NAME =
ConfigOptions.key("source.subscription-name")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm a little confused about the two configurations from users' perspective, why subscription-type has a default value and the later one does not?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

They are totally different concepts. You may think of Flink deployment type can be default to application mode, but there is no reason for a default deployment name.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

... and we actually fallback to a default subscription name in code:

                        .getOptional(SOURCE_SUBSCRIPTION_NAME)
                        .orElse(DEFAULT_SUBSCRIPTION_NAME_PREFIX + randomAlphabetic(5)));

Comment on lines +101 to +103
code("earliest"),
code("latest"),
code("ledgerId:entryId:partitionId"),
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The earliest and latest worth a public configuration for users from my side, these startup modes are similar to specific-offset in kafka connector. I don't think the configuration in Kafka connector is good one, I just want to align them for decreasing the users' understanding cost.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't understand your suggestion. Earliest and lastest can be regarded as specific positions that the Pulsar broker can understand. They are barely positions that don't need a separate config option.

pom.xml Outdated Show resolved Hide resolved
Signed-off-by: tison <wander4096@gmail.com>
@tisonkun
Copy link
Member Author

@leonardBang All comments addressed. Also pushed a commit for including upsert mode code. Please give another look.

Signed-off-by: tison <wander4096@gmail.com>
Copy link
Contributor

@leonardBang leonardBang left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks @tisonkun for the update and great work, LGTM.
Btw, the task to for sql jar and task for docs can start as well next.

@tisonkun tisonkun merged commit 75c79ad into apache:main Aug 31, 2023
3 checks passed
@tisonkun tisonkun deleted the table-connector branch August 31, 2023 02:50
tisonkun added a commit that referenced this pull request Aug 31, 2023
Signed-off-by: tison <wander4096@gmail.com>
Co-authored-by: Christophe Bornet <cbornet@hotmail.com>
Co-authored-by: Yufei Zhang <affeisme@gmail.com>
Co-authored-by: Yufan Sheng <yufan@streamnative.io>
@tisonkun
Copy link
Member Author

@leonardBang Thank you! Merged.

sql jar and task for docs can start as well next

Yep. Let me create the related JIRA tickets..

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
5 participants