
Commit 1784c01

[Fix][Connector-V2] Fix kafka batch mode can not read all message (#7135)
1 parent bb9fd51 commit 1784c01

12 files changed (+237 -44 lines)


.github/workflows/backend.yml

Lines changed: 2 additions & 4 deletions

@@ -1052,7 +1052,7 @@ jobs:
 
   kafka-connector-it:
     needs: [ changes, sanity-check ]
-    if: needs.changes.outputs.api == 'true'
+    if: needs.changes.outputs.api == 'true' || contains(needs.changes.outputs.it-modules, 'connector-kafka-e2e')
     runs-on: ${{ matrix.os }}
     strategy:
       matrix:
@@ -1068,15 +1068,14 @@ jobs:
          distribution: 'temurin'
          cache: 'maven'
      - name: run kafka connector integration test
-        if: needs.changes.outputs.api == 'true'
        run: |
          ./mvnw -B -T 1 verify -DskipUT=true -DskipIT=false -D"license.skipAddThirdParty"=true --no-snapshot-updates -pl :connector-kafka-e2e -am -Pci
        env:
          MAVEN_OPTS: -Xmx4096m

  rocketmq-connector-it:
    needs: [ changes, sanity-check ]
-    if: needs.changes.outputs.api == 'true'
+    if: needs.changes.outputs.api == 'true' || contains(needs.changes.outputs.it-modules, 'connector-rocketmq-e2e')
    runs-on: ${{ matrix.os }}
    strategy:
      matrix:
@@ -1092,7 +1091,6 @@ jobs:
          distribution: 'temurin'
          cache: 'maven'
      - name: run rocket connector integration test
-        if: needs.changes.outputs.api == 'true'
        run: |
          ./mvnw -B -T 1 verify -DskipUT=true -DskipIT=false -D"license.skipAddThirdParty"=true --no-snapshot-updates -pl :connector-rocketmq-e2e -am -Pci
        env:

docs/en/connector-v2/formats/avro.md

Lines changed: 1 addition & 1 deletion

@@ -77,7 +77,7 @@ source {
     bootstrap.servers = "kafkaCluster:9092"
     topic = "test_avro_topic"
     result_table_name = "kafka_table"
-    kafka.auto.offset.reset = "earliest"
+    start_mode = "earliest"
     format = avro
     format_error_handle_way = skip
     schema = {

docs/zh/connector-v2/formats/avro.md

Lines changed: 1 addition & 1 deletion

@@ -77,7 +77,7 @@ source {
     bootstrap.servers = "kafkaCluster:9092"
     topic = "test_avro_topic"
     result_table_name = "kafka_table"
-    kafka.auto.offset.reset = "earliest"
+    start_mode = "earliest"
     format = avro
     format_error_handle_way = skip
     schema = {

seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSourceReader.java

Lines changed: 21 additions & 10 deletions

@@ -45,6 +45,7 @@
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.LinkedBlockingQueue;
@@ -103,7 +104,7 @@ public void pollNext(Collector<SeaTunnelRow> output) throws Exception {
             return;
         }

-        while (pendingPartitionsQueue.size() != 0) {
+        while (!pendingPartitionsQueue.isEmpty()) {
             sourceSplits.add(pendingPartitionsQueue.poll());
         }
         sourceSplits.forEach(
@@ -120,9 +121,10 @@ public void pollNext(Collector<SeaTunnelRow> output) throws Exception {
                             executorService.submit(thread);
                             return thread;
                         }));
+        List<KafkaSourceSplit> finishedSplits = new CopyOnWriteArrayList<>();
         sourceSplits.forEach(
                 sourceSplit -> {
-                    CompletableFuture<Void> completableFuture = new CompletableFuture<>();
+                    CompletableFuture<Boolean> completableFuture = new CompletableFuture<>();
                     TablePath tablePath = sourceSplit.getTablePath();
                     DeserializationSchema<SeaTunnelRow> deserializationSchema =
                             tablePathMetadataMap.get(tablePath).getDeserializationSchema();
@@ -148,9 +150,14 @@ public void pollNext(Collector<SeaTunnelRow> output) throws Exception {
                                         for (TopicPartition partition : partitions) {
                                             List<ConsumerRecord<byte[], byte[]>>
                                                     recordList = records.records(partition);
+                                            if (Boundedness.BOUNDED.equals(
+                                                            context.getBoundedness())
+                                                    && recordList.isEmpty()) {
+                                                completableFuture.complete(true);
+                                                return;
+                                            }
                                             for (ConsumerRecord<byte[], byte[]> record :
                                                     recordList) {
-
                                                 try {
                                                     if (deserializationSchema
                                                             instanceof
@@ -180,7 +187,8 @@ public void pollNext(Collector<SeaTunnelRow> output) throws Exception {
                                                             && record.offset()
                                                                     >= sourceSplit
                                                                             .getEndOffset()) {
-                                                        break;
+                                                        completableFuture.complete(true);
+                                                        return;
                                                     }
                                                 }
                                                 long lastOffset = -1;
@@ -199,18 +207,21 @@ public void pollNext(Collector<SeaTunnelRow> output) throws Exception {
                                     } catch (Exception e) {
                                         completableFuture.completeExceptionally(e);
                                     }
-                                    completableFuture.complete(null);
+                                    completableFuture.complete(false);
                                 });
-                    } catch (InterruptedException e) {
+                        if (completableFuture.get()) {
+                            finishedSplits.add(sourceSplit);
+                        }
+                    } catch (Exception e) {
                         throw new KafkaConnectorException(
                                 KafkaConnectorErrorCode.CONSUME_DATA_FAILED, e);
                     }
-                    completableFuture.join();
                 });
-
         if (Boundedness.BOUNDED.equals(context.getBoundedness())) {
-            // signal to the source that we have reached the end of the data.
-            context.signalNoMoreElement();
+            finishedSplits.forEach(sourceSplits::remove);
+            if (sourceSplits.isEmpty()) {
+                context.signalNoMoreElement();
+            }
         }
     }
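This hunk is the substance of the fix: each split's poll task now completes its future with a Boolean ("is this split finished?") instead of Void, finished splits are collected in a CopyOnWriteArrayList, and in BOUNDED mode signalNoMoreElement() is only called once every split has been drained and removed from sourceSplits, so a batch job no longer stops after the first poll. The standalone sketch below illustrates that completion pattern in isolation; the class name, the Split type, and the poll simulation are hypothetical stand-ins for illustration, not SeaTunnel or Kafka APIs.

import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/** Hypothetical, simplified illustration of the bounded-read completion pattern. */
public class BoundedSplitDrainer {

    /** Stand-in for a Kafka split: a range of offsets drained in small batches. */
    static class Split {
        final String name;
        final long endOffset;
        long nextOffset;

        Split(String name, long endOffset) {
            this.name = name;
            this.endOffset = endOffset;
        }

        /** Simulates one poll: returns at most maxPollRecords offsets, empty when drained. */
        List<Long> poll(int maxPollRecords) {
            List<Long> batch = new ArrayList<>();
            while (nextOffset < endOffset && batch.size() < maxPollRecords) {
                batch.add(nextOffset++);
            }
            return batch;
        }
    }

    public static void main(String[] args) throws Exception {
        Set<Split> sourceSplits = ConcurrentHashMap.newKeySet();
        sourceSplits.add(new Split("topic-0", 100));
        sourceSplits.add(new Split("topic-1", 3));

        ExecutorService executor = Executors.newCachedThreadPool();
        boolean noMoreElementSignalled = false;

        // Each "pollNext" round processes every split once and records which ones finished.
        while (!noMoreElementSignalled) {
            List<Split> finishedSplits = new CopyOnWriteArrayList<>();
            for (Split split : sourceSplits) {
                // The future carries "finished?" rather than Void.
                CompletableFuture<Boolean> done = new CompletableFuture<>();
                executor.submit(
                        () -> {
                            try {
                                List<Long> records = split.poll(1); // like max.poll.records = 1
                                if (records.isEmpty()) {
                                    done.complete(true); // bounded + empty poll => split finished
                                    return;
                                }
                                records.forEach(
                                        offset -> System.out.println(split.name + " -> " + offset));
                                done.complete(false); // more data may remain, keep the split
                            } catch (Exception e) {
                                done.completeExceptionally(e);
                            }
                        });
                if (done.get()) {
                    finishedSplits.add(split);
                }
            }
            // Only signal "no more elements" once every split has been fully drained.
            finishedSplits.forEach(sourceSplits::remove);
            if (sourceSplits.isEmpty()) {
                noMoreElementSignalled = true;
                System.out.println("signalNoMoreElement()");
            }
        }
        executor.shutdown();
    }
}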

seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/java/org/apache/seatunnel/e2e/connector/kafka/KafkaIT.java

Lines changed: 48 additions & 22 deletions

@@ -212,6 +212,27 @@ public void testSourceKafkaTextToConsole(TestContainer container)
         Assertions.assertEquals(0, execResult.getExitCode(), execResult.getStderr());
     }

+    @TestTemplate
+    public void testSourceKafkaToAssertWithMaxPollRecords1(TestContainer container)
+            throws IOException, InterruptedException {
+        TextSerializationSchema serializer =
+                TextSerializationSchema.builder()
+                        .seaTunnelRowType(SEATUNNEL_ROW_TYPE)
+                        .delimiter(",")
+                        .build();
+        generateTestData(
+                row ->
+                        new ProducerRecord<>(
+                                "test_topic_text_max_poll_records_1",
+                                null,
+                                serializer.serialize(row)),
+                0,
+                100);
+        Container.ExecResult execResult =
+                container.executeJob("/kafka/kafka_source_to_assert_with_max_poll_records_1.conf");
+        Assertions.assertEquals(0, execResult.getExitCode(), execResult.getStderr());
+    }
+
     @TestTemplate
     public void testSourceKafkaTextToConsoleAssertCatalogTable(TestContainer container)
             throws IOException, InterruptedException {
@@ -538,29 +559,34 @@ private Properties kafkaByteConsumerConfig() {
     }

     private void generateTestData(ProducerRecordConverter converter, int start, int end) {
-        for (int i = start; i < end; i++) {
-            SeaTunnelRow row =
-                    new SeaTunnelRow(
-                            new Object[] {
-                                Long.valueOf(i),
-                                Collections.singletonMap("key", Short.parseShort("1")),
-                                new Byte[] {Byte.parseByte("1")},
-                                "string",
-                                Boolean.FALSE,
-                                Byte.parseByte("1"),
-                                Short.parseShort("1"),
-                                Integer.parseInt("1"),
-                                Long.parseLong("1"),
-                                Float.parseFloat("1.1"),
-                                Double.parseDouble("1.1"),
-                                BigDecimal.valueOf(11, 1),
-                                "test".getBytes(),
-                                LocalDate.of(2024, 1, 1),
-                                LocalDateTime.of(2024, 1, 1, 12, 59, 23)
-                            });
-            ProducerRecord<byte[], byte[]> producerRecord = converter.convert(row);
-            producer.send(producerRecord);
+        try {
+            for (int i = start; i < end; i++) {
+                SeaTunnelRow row =
+                        new SeaTunnelRow(
+                                new Object[] {
+                                    Long.valueOf(i),
+                                    Collections.singletonMap("key", Short.parseShort("1")),
+                                    new Byte[] {Byte.parseByte("1")},
+                                    "string",
+                                    Boolean.FALSE,
+                                    Byte.parseByte("1"),
+                                    Short.parseShort("1"),
+                                    Integer.parseInt("1"),
+                                    Long.parseLong("1"),
+                                    Float.parseFloat("1.1"),
+                                    Double.parseDouble("1.1"),
+                                    BigDecimal.valueOf(11, 1),
+                                    "test".getBytes(),
+                                    LocalDate.of(2024, 1, 1),
+                                    LocalDateTime.of(2024, 1, 1, 12, 59, 23)
+                                });
+                ProducerRecord<byte[], byte[]> producerRecord = converter.convert(row);
+                producer.send(producerRecord).get();
+            }
+        } catch (Exception e) {
+            throw new RuntimeException(e);
         }
+        producer.flush();
     }

     private static final SeaTunnelRowType SEATUNNEL_ROW_TYPE =
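The test-harness half of the change matters too: generateTestData previously fired producer.send() asynchronously and returned, so a batch job could start (and finish) before all records were acknowledged by the broker. Blocking on each send with Future.get() and flushing the producer guarantees the 100 rows exist before the assertion job runs. Below is a minimal standalone sketch of the same synchronous-produce pattern using the plain Kafka client; the topic name and bootstrap address are placeholders taken from the test config, and the payload format is illustrative only.

import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.serialization.ByteArraySerializer;

/** Minimal sketch of synchronous test-data production; topic and servers are placeholders. */
public class SyncProduceExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafkaCluster:9092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());

        try (KafkaProducer<byte[], byte[]> producer = new KafkaProducer<>(props)) {
            for (int i = 0; i < 100; i++) {
                ProducerRecord<byte[], byte[]> record =
                        new ProducerRecord<>(
                                "test_topic_text_max_poll_records_1",
                                null,
                                ("row-" + i).getBytes());
                // send() is asynchronous; get() blocks until the broker acknowledges the record,
                // so a failure surfaces here instead of being silently dropped.
                RecordMetadata metadata = producer.send(record).get();
                System.out.println("wrote offset " + metadata.offset());
            }
            // flush() is a belt-and-braces guarantee that nothing is left in the client buffer.
            producer.flush();
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }
}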

seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/avro/kafka_avro_to_assert.conf

Lines changed: 1 addition & 1 deletion

@@ -32,7 +32,7 @@ source {
     bootstrap.servers = "kafkaCluster:9092"
     topic = "test_avro_topic"
     result_table_name = "kafka_table"
-    kafka.auto.offset.reset = "earliest"
+    start_mode = "earliest"
     format = avro
     format_error_handle_way = skip
     schema = {

seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/jsonFormatIT/kafka_source_json_to_console.conf

Lines changed: 1 addition & 1 deletion

@@ -32,7 +32,7 @@ source {
     bootstrap.servers = "kafkaCluster:9092"
     topic = "test_topic_json"
     result_table_name = "kafka_table"
-    kafka.auto.offset.reset = "earliest"
+    start_mode = "earliest"
     format_error_handle_way = skip
     schema = {
       fields {

New file (loaded by the test above as /kafka/kafka_source_to_assert_with_max_poll_records_1.conf)

Lines changed: 160 additions & 0 deletions

@@ -0,0 +1,160 @@
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

env {
  parallelism = 1
  job.mode = "BATCH"

  # You can set spark configuration here
  spark.app.name = "SeaTunnel"
  spark.executor.instances = 1
  spark.executor.cores = 1
  spark.executor.memory = "1g"
  spark.master = local
}

source {
  Kafka {
    bootstrap.servers = "kafkaCluster:9092"
    topic = "test_topic_text_max_poll_records_1"
    result_table_name = "kafka_table"
    start_mode = "earliest"
    format_error_handle_way = fail
    kafka.config = {
      max.poll.records = 1
    }
    schema = {
      columns = [
        {
          name = id
          type = bigint
        }
        {
          name = c_map
          type = "map<string, smallint>"
        }
        {
          name = c_array
          type = "array<tinyint>"
        }
        {
          name = c_string
          type = "string"
        }
        {
          name = c_boolean
          type = "boolean"
        }
        {
          name = c_tinyint
          type = "tinyint"
        }
        {
          name = c_smallint
          type = "smallint"
        }
        {
          name = c_int
          type = "int"
        }
        {
          name = c_bigint
          type = "bigint"
        }
        {
          name = c_float
          type = "float"
        }
        {
          name = c_double
          type = "double"
        }
        {
          name = c_decimal
          type = "decimal(2, 1)"
        }
        {
          name = c_bytes
          type = "bytes"
        }
        {
          name = c_date
          type = "date"
        }
        {
          name = c_timestamp
          type = "timestamp"
        }
      ]
      primaryKey = {
        name = "primary key"
        columnNames = ["id"]
      }
      constraintKeys = [
        {
          constraintName = "unique_c_string"
          constraintType = UNIQUE_KEY
          constraintColumns = [
            {
              columnName = "c_string"
              sortType = ASC
            }
          ]
        }
      ]
    }
    format = text
    field_delimiter = ","
  }
}

sink {
  console {
    source_table_name = "kafka_table"
  }
  Assert {
    source_table_name = "kafka_table"
    rules =
      {
        field_rules = [
          {
            field_name = id
            field_type = bigint
            field_value = [
              {
                rule_type = NOT_NULL
              },
              {
                rule_type = MIN
                rule_value = 0
              },
              {
                rule_type = MAX
                rule_value = 99
              }
            ]
          }
        ]
        row_rules = [
          {
            rule_type = MIN_ROW
            rule_value = 100
          }
        ]
      }
  }
}
