
[Feature][seatunnel-connectors-v2][connector-kafka] Kafka supports custom schema #2371 #2720

Closed
Wants to merge 1 commit.

Conversation

@eyys (Contributor) commented Sep 12, 2022

Purpose of this pull request

[Feature][seatunnel-connectors-v2][connector-kafka] Kafka supports custom schema #2371

Check list

- Add e2e test
- Add example test

@@ -0,0 +1,51 @@
/*
Member: The seatunnel-examples module is only used by developers, so test cases should not be submitted there.

-public class KafkaSource implements SeaTunnelSource<SeaTunnelRow, KafkaSourceSplit, KafkaSourceState> {

     private static final String DEFAULT_CONSUMER_GROUP = "SeaTunnel-Consumer-Group";
+public class KafkaSource<T> implements SeaTunnelSource<T, KafkaSourceSplit, KafkaSourceState> {
Member: Why change the generic type?

Contributor (author): Refer to the Pulsar implementation.
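For context, the change discussed here makes the source's produced type a parameter T instead of fixing it to the row type, as the Pulsar connector does. A minimal sketch of that pattern, using hypothetical stand-in interfaces (not the real SeaTunnel or Pulsar API):

```java
// Hypothetical stand-ins, for illustration only -- not the real SeaTunnel API.
import java.util.function.Supplier;

public class GenericSourceDemo {
    interface Source<T> {
        T poll();
    }

    // Generic source, as in the PR: the produced type T is a type parameter
    // instead of being hard-coded to a row type, mirroring the Pulsar connector.
    static final class KafkaLikeSource<T> implements Source<T> {
        private final Supplier<T> deserializer;

        KafkaLikeSource(Supplier<T> deserializer) {
            this.deserializer = deserializer;
        }

        @Override
        public T poll() {
            // Delegate the record-to-T conversion to the pluggable deserializer.
            return deserializer.get();
        }
    }

    public static void main(String[] args) {
        Source<String> source = new KafkaLikeSource<>(() -> "value");
        System.out.println(source.poll());
    }
}
```

The produced type then follows whatever deserialization schema the user configures, rather than always being a row.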

}

 @Override
-public SeaTunnelRowType getProducedType() {
-    return this.typeInfo;
+public SeaTunnelDataType<T> getProducedType() {
Member: The same as above.

}

 @Override
-public SourceReader<SeaTunnelRow, KafkaSourceSplit> createReader(SourceReader.Context readerContext) throws Exception {
-    return new KafkaSourceReader(this.metadata, this.typeInfo, readerContext);
+public SourceReader<T, KafkaSourceSplit> createReader(SourceReader.Context readerContext) throws Exception {
Member: The same as above.

} else {
rowType = SeaTunnelSchema.buildSimpleTextSchema();
}
deserializationSchema = (DeserializationSchema<T>) new JsonDeserializationSchema(false, false, rowType);
Member: T should be replaced by SeaTunnelRow.
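The reviewer's point can be illustrated: JsonDeserializationSchema produces SeaTunnelRow, so casting it to DeserializationSchema&lt;T&gt; only compiles because of type erasure, and is safe only when T is bound to SeaTunnelRow. A minimal sketch with hypothetical stand-in types (not the real SeaTunnel classes):

```java
// Hypothetical stand-ins for the SeaTunnel interfaces, for illustration only.
public class CastDemo {
    interface DeserializationSchema<T> {
        T deserialize(byte[] message);
    }

    static final class SeaTunnelRow {
        final Object[] fields;
        SeaTunnelRow(Object[] fields) { this.fields = fields; }
    }

    // Produces SeaTunnelRow, like JsonDeserializationSchema in the PR.
    static final class JsonLikeSchema implements DeserializationSchema<SeaTunnelRow> {
        @Override
        public SeaTunnelRow deserialize(byte[] message) {
            return new SeaTunnelRow(new Object[]{new String(message)});
        }
    }

    // The PR's unchecked cast: it compiles due to erasure, but would fail at
    // use time if T were anything other than SeaTunnelRow.
    @SuppressWarnings("unchecked")
    static <T> DeserializationSchema<T> unsafeBind() {
        return (DeserializationSchema<T>) new JsonLikeSchema();
    }

    public static void main(String[] args) {
        // Safe here only because T is bound to SeaTunnelRow -- which is
        // exactly the reviewer's suggestion: fix T to SeaTunnelRow.
        DeserializationSchema<SeaTunnelRow> schema = unsafeBind();
        SeaTunnelRow row = schema.deserialize("x".getBytes());
        System.out.println(row.fields.length);
    }
}
```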

@@ -46,7 +47,7 @@
import java.util.Set;
import java.util.stream.Collectors;

-public class KafkaSourceReader implements SourceReader<SeaTunnelRow, KafkaSourceSplit> {
+public class KafkaSourceReader<T> implements SourceReader<T, KafkaSourceSplit> {
Member: The same as above.

-    // TODO support user custom type
-    private SeaTunnelRowType typeInfo;
+    private final DeserializationSchema<T> deserializationSchema;
Member: The same as above.

@@ -87,7 +89,7 @@ public void close() throws IOException {
}

 @Override
-public void pollNext(Collector<SeaTunnelRow> output) throws Exception {
+public void pollNext(Collector<T> output) throws Exception {
Member: The same as above.

} else {
String v = stringDeserializer.deserialize(partition.topic(), record.value());
String t = partition.topic();
output.collect((T) new SeaTunnelRow(new Object[]{t, v}));
Member: If the user does not assign a schema, it will generate a simple schema as shown below:

content
data

So the row only has one field, named content, but you generate a row with two fields.
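The mismatch the reviewer describes can be shown directly: under the default simple text schema the whole record value becomes the single content field, whereas the PR emits a (topic, value) pair. A sketch with a hypothetical stand-in row type (not the real SeaTunnelRow class):

```java
// Hypothetical stand-in for SeaTunnel's row type, for illustration only.
public class SimpleSchemaDemo {
    static final class Row {
        final Object[] fields;
        Row(Object[] fields) { this.fields = fields; }
    }

    // Matches the default simple text schema: one field named "content"
    // holding the raw record value; the topic is not part of the row.
    static Row schemaRow(String topic, String value) {
        return new Row(new Object[]{value});
    }

    // What the PR actually emits: two fields (topic, value), which does not
    // match the one-field schema above.
    static Row prRow(String topic, String value) {
        return new Row(new Object[]{topic, value});
    }

    public static void main(String[] args) {
        System.out.println(schemaRow("t", "v").fields.length); // one field
        System.out.println(prRow("t", "v").fields.length);     // two fields
    }
}
```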

@TyrantLucifer (Member): BTW, you should add spark-e2e test cases and update the doc of the kafka source connector too.

@ashulin (Member) left a comment: You need to check the code style and add the license header.

@hailin0 (Member) left a comment: Can you add docs & e2e test cases (spark)?

Comment on lines +33 to +34
name = "string"
age = "int"
Member: Test all data types?
