Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Improve] Use new Factory API to create KafkaSource #5635

Merged
merged 1 commit into from
Oct 21, 2023

Conversation

ruanwenjun
Copy link
Member

Purpose of this pull request

KafkaSource use new Factory API to create, after this change the CatalogTable can be send to post task.

Does this PR introduce any user-facing change?

How was this patch tested?

Use e2e case to assert the catalog generate by KafkaSource

Check list

@ruanwenjun ruanwenjun changed the title KafkaSource use Factory to create source Use new Factory API to create KafkaSource Oct 16, 2023
@ruanwenjun ruanwenjun force-pushed the dev_wenjun_KafakSourceUseFactory branch 2 times, most recently from 9fbc1d0 to 5b12a51 Compare October 16, 2023 09:42
@ruanwenjun ruanwenjun force-pushed the dev_wenjun_KafakSourceUseFactory branch from 5b12a51 to 990ae3b Compare October 16, 2023 11:17
@ruanwenjun ruanwenjun changed the title Use new Factory API to create KafkaSource [Improve] Use new Factory API to create KafkaSource Oct 16, 2023
@@ -105,127 +67,36 @@ public String getPluginName() {

@Override
public void prepare(Config config) throws PrepareFailException {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

remove this method ?

#5625

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

}

setDeserialization(config);
kafkaSourceConfig = new KafkaSourceConfig(ReadonlyConfig.fromConfig(config));
}

@Override
public SeaTunnelRowType getProducedType() {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

remove this method ?

#5625

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This method can not be removed for now. I need to do some extra work on the translation layer in the next PR before I can properly remove it. For this PR, it is better to keep it.

@@ -66,6 +73,10 @@ public <T> T get(Option<T> option) {
return getOptional(option).orElseGet(option::defaultValue);
}

public <T> T get(Option<T> option, T defaultValue) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why not use the following:

  1. Define default value in Option

  2. getOptional(Option).orElse(defaultValue)

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, it's better way, I removed this method.

@ruanwenjun ruanwenjun force-pushed the dev_wenjun_KafakSourceUseFactory branch from 990ae3b to 9b4b165 Compare October 19, 2023 03:01
Comment on lines 101 to 108
CheckResult result = readonlyConfig.checkAllExists(TOPIC, BOOTSTRAP_SERVERS);
if (!result.isSuccess()) {
throw new KafkaConnectorException(
SeaTunnelAPIErrorCode.CONFIG_VALIDATION_FAILED,
String.format(
"PluginName: %s, PluginType: %s, Message: %s",
CONNECTOR_IDENTITY, PluginType.SOURCE, result.getMsg()));
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think checkAllExists is unnecessary. As ReadonlyConfig, if it implement TableSourceFactory, we will check whether all configs are legal. Please refer

ConfigValidator.of(context.getOptions()).validate(factory.optionRule());

c_bytes = bytes
c_date = date
c_timestamp = timestamp
columns = [
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you create new conf not replace old one? I think we should make sure both two config way worked fine for now.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Agree with you

@ruanwenjun ruanwenjun force-pushed the dev_wenjun_KafakSourceUseFactory branch 2 times, most recently from 410dd3c to 5d7344c Compare October 19, 2023 13:21
@ruanwenjun ruanwenjun force-pushed the dev_wenjun_KafakSourceUseFactory branch from 5d7344c to 3834b8a Compare October 20, 2023 05:38
Comment on lines +190 to +191
Collections.emptyMap(),
Collections.emptyList(),
Copy link
Member

@Hisoka-X Hisoka-X Oct 21, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It should be fine after #5681 merged.

Copy link
Member

@hailin0 hailin0 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM

@ruanwenjun ruanwenjun merged commit 1c6176e into apache:dev Oct 21, 2023
5 checks passed
@ruanwenjun ruanwenjun deleted the dev_wenjun_KafakSourceUseFactory branch October 21, 2023 02:22
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

None yet

3 participants