[Feature][connectors-v2][kafka] Kafka supports custom schema #2371 #2783
Conversation
fields {
  name = "string"
  age = "int"
}
OK, thanks for the pointers.
}

sink {
  Console {}
sql {
  sql = "select name,age from kafka"
}
Remove this transform, or use select * from kafka, which selects all fields.
}else{
    format = config.getString(FORMAT);
    this.deserializationSchema = null;
}
}else {
Please check the code style here.
  }
}
topic = "test_csv"
bootstrap.server = "hadoop101:9092,hadoop102:9092,hadoop103:9092"
- Start the Kafka test container in the test case
- Connect to your test container and initialize the data
- Test your logic

Reference:
https://github.com/apache/incubator-seatunnel/blob/dev/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-mongodb-flink-e2e/src/test/java/org/apache/seatunnel/e2e/flink/v2/mongodb/MongodbSourceToConsoleIT.java#L70
https://github.com/apache/incubator-seatunnel/blob/dev/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-mongodb-flink-e2e/src/test/resources/mongodb/mongodb_to_console.conf#L33
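Following that reference, the e2e job config could look roughly like this. This is a minimal sketch only: the network alias "kafkaCluster", the topic name, and the field set are assumptions, and the point is that bootstrap.server should resolve to the started test container rather than hardcoded hadoop hosts.

```hocon
# Hedged sketch of an e2e config; alias "kafkaCluster" and the topic are assumptions.
env {
  execution.parallelism = 1
}

source {
  Kafka {
    result_table_name = "kafka"
    topic = "test_csv"
    # network alias of the Kafka test container, not a hardcoded cluster host
    bootstrap.server = "kafkaCluster:9092"
    schema = {
      fields {
        name = "string"
        age = "int"
      }
    }
  }
}

sink {
  Console {}
}
```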
Okay, I'll try to set it up
result_table_name = "kafka"
schema = {
  fields {
    name = "string"
as above
  }
}
topic = "test_csv"
bootstrap.server = "hadoop101:9092,hadoop102:9092,hadoop103:9092"
as above
}

transform {
  sql {
    sql = "select name,age from kafka"
as above
}

sink {
  Console {}
}
as above
(force-pushed from c4321a4 to 7f30f2a)
Can you move the e2e IT to the seatunnel-connector-v2-e2e module? See #2924.
Can I submit this first? The branches are a bit messy.
(force-pushed from 87c53d3 to aef883c)
(force-pushed from 0a8c1b3 to 9e804ca)
Please make sure CI passes first, thanks.
(force-pushed from 6cdeaa6 to 0dafb56)
LGTM
kafkaContainer = new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:6.2.1"))
        .withNetwork(NETWORK)
        .withNetworkAliases(KAFKA_HOST)
        .withLogConsumer(new Slf4jLogConsumer(log));
Suggested change:
- .withLogConsumer(new Slf4jLogConsumer(log));
+ .withLogConsumer(new Slf4jLogConsumer(DockerLoggerFactory.getLogger("confluentinc/cp-kafka:6.2.1")));
...rk-e2e/src/test/java/org/apache/seatunnel/e2e/spark/v2/kafka/KafkaSourceTextToConsoleIT.java
(force-pushed from 532fcd8 to ba4b6c4)
    }
} else {
    typeInfo = SeaTunnelSchema.buildSimpleTextSchema();
    this.deserializationSchema = null;
Suggested change:
- this.deserializationSchema = null;
+ this.deserializationSchema = TextDeserializationSchema.builder()
+         .seaTunnelRowType(typeInfo)
+         .delimiter(String.valueOf('\002'))
+         .build();
If the user does not assign the schema option, the connector will treat the data as shown below:

| content |
| --- |
| xxxxxxxx |

Since the upstream data does not need to be delimited, just pass in an impossible delimiter when initializing the TextDeserializationSchema.
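The effect of the "impossible" delimiter can be sketched in plain Java. The class and method below are illustrative only, not SeaTunnel's actual API: ordinary text never contains the control character '\u0002', so splitting on it leaves the whole payload in a single field, which matches the single-column simple text schema.

```java
// Illustrative sketch, not SeaTunnel code: splitting on the control
// character '\u0002' leaves ordinary text as one single field.
public class DelimiterSketch {

    // Mimics a row deserializer splitting a payload into fields.
    static String[] splitFields(String content, char delimiter) {
        // limit -1 keeps trailing empty fields, like a real row splitter would
        return content.split(String.valueOf(delimiter), -1);
    }

    public static void main(String[] args) {
        // Plain text never contains '\u0002', so everything stays in one column.
        String[] fields = splitFields("hello,world 42", '\u0002');
        System.out.println(fields.length); // prints 1
    }
}
```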
if (deserializationSchema != null) {
    deserializationSchema.deserialize(record.value(), output);
} else {
    String content = stringDeserializer.deserialize(partition.topic(), record.value());
    output.collect(new SeaTunnelRow(new Object[]{content}));
}
Suggested change (the null branch is unnecessary once deserializationSchema is always set):
- if (deserializationSchema != null) {
-     deserializationSchema.deserialize(record.value(), output);
- } else {
-     String content = stringDeserializer.deserialize(partition.topic(), record.value());
-     output.collect(new SeaTunnelRow(new Object[]{content}));
- }
+ deserializationSchema.deserialize(record.value(), output);
LGTM, thank you for your contribution. Let's wait for CI.
LGTM, waiting CI
LGTM, Thank you for your contribution. CC @Hisoka-X
Purpose of this pull request
[Feature][seatunnel-connectors-v2][connector-kafka] Kafka supports custom schema #2371
The previous PR, now closed: #2720
Check list
New License Guide
Purpose of this pull request
Kafka supports custom schema #2371
The previous closed PR: #2720
Check list
Add e2e tests
Add example tests