
Commit 68b0504

[Feature][Connectors-V2] Add end_timestamp for timestamp start mode (#9318)
1 parent 0e3c1bc commit 68b0504

File tree

9 files changed: +191 -24 lines changed

docs/en/connector-v2/source/Kafka.md

Lines changed: 1 addition & 0 deletions
@@ -52,6 +52,7 @@ They can be downloaded via install-plugin.sh or from the Maven central repositor
 | start_mode | StartMode[earliest],[group_offsets],[latest],[specific_offsets],[timestamp] | No | group_offsets | The initial consumption pattern of consumers. |
 | start_mode.offsets | Config | No | - | The offset required for consumption mode to be specific_offsets. |
 | start_mode.timestamp | Long | No | - | The time required for consumption mode to be "timestamp". |
+| start_mode.end_timestamp | Long | No | - | The end time required when consumption mode is "timestamp"; only takes effect in batch mode. |
 | partition-discovery.interval-millis | Long | No | -1 | The interval for dynamically discovering topics and partitions. |
 | common-options | | No | - | Source plugin common parameters, please refer to [Source Common Options](../source-common-options.md) for details |
 | protobuf_message_name | String | No | - | Effective when the format is set to protobuf, specifies the Message name |
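
For a complete usage example of the new option, see the e2e job config kafkasource_endTimestamp_to_console.conf added at the end of this commit, which pairs start_mode.timestamp with start_mode.end_timestamp in a BATCH job.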

docs/zh/connector-v2/source/Kafka.md

Lines changed: 25 additions & 24 deletions
Large diffs are not rendered by default.

seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/config/KafkaSourceOptions.java

Lines changed: 7 additions & 0 deletions
@@ -109,4 +109,11 @@ public class KafkaSourceOptions extends KafkaBaseOptions {
                             "The processing method of data format error. The default value is fail, and the optional value is (fail, skip). "
                                     + "When fail is selected, data format error will block and an exception will be thrown. "
                                     + "When skip is selected, data format error will skip this line data.");
+
+    public static final Option<Long> START_MODE_END_TIMESTAMP =
+            Options.key("start_mode.end_timestamp")
+                    .longType()
+                    .noDefaultValue()
+                    .withDescription(
+                            "The end timestamp of the messages to consume when start_mode is timestamp; only applicable in batch mode.");
 }
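
For orientation, a minimal sketch of how the new Option is read downstream (mirroring the KafkaSourceConfig hunk further below); ReadonlyConfig.fromMap and the demo class are assumptions for illustration, not part of this commit:

import java.util.HashMap;
import java.util.Map;

import org.apache.seatunnel.api.configuration.ReadonlyConfig;
import org.apache.seatunnel.connectors.seatunnel.kafka.config.KafkaSourceOptions;

public class EndTimestampOptionDemo {
    public static void main(String[] args) {
        // Build a config the way the engine does from user options (values are illustrative).
        Map<String, Object> options = new HashMap<>();
        options.put("start_mode.end_timestamp", 1738395900000L);
        ReadonlyConfig config = ReadonlyConfig.fromMap(options);

        // Typed access through the Option; this returns null when the key is absent,
        // since START_MODE_END_TIMESTAMP declares no default value.
        Long endTimestamp = config.get(KafkaSourceOptions.START_MODE_END_TIMESTAMP);
        System.out.println(endTimestamp); // 1738395900000
    }
}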

seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/serialize/DefaultSeaTunnelRowSerializer.java

Lines changed: 15 additions & 0 deletions
@@ -92,6 +92,21 @@ public static DefaultSeaTunnelRowSerializer create(
                 headersExtractor(rowType));
     }

+    public static DefaultSeaTunnelRowSerializer create(
+            String topic,
+            MessageFormat format,
+            SeaTunnelRowType rowType,
+            String delimiter,
+            ReadonlyConfig pluginConfig) {
+        return new DefaultSeaTunnelRowSerializer(
+                topicExtractor(topic, rowType, format),
+                partitionExtractor(null),
+                timestampExtractor(rowType),
+                keyExtractor(null, rowType, format, null, null),
+                valueExtractor(rowType, format, delimiter, pluginConfig),
+                headersExtractor());
+    }
+
     public static DefaultSeaTunnelRowSerializer create(
             String topic,
             SeaTunnelRowType rowType,

seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/ConsumerMetadata.java

Lines changed: 1 addition & 0 deletions
@@ -42,4 +42,5 @@ public class ConsumerMetadata implements Serializable {
     private Long startOffsetsTimestamp;
     private DeserializationSchema<SeaTunnelRow> deserializationSchema;
     private CatalogTable catalogTable;
+    private Long endOffsetsTimestamp;
 }

seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSourceConfig.java

Lines changed: 14 additions & 0 deletions
@@ -65,6 +65,7 @@
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Objects;
 import java.util.Optional;
 import java.util.Properties;
 import java.util.stream.Collectors;
@@ -92,6 +93,7 @@
 import static org.apache.seatunnel.connectors.seatunnel.kafka.config.KafkaSourceOptions.PROTOBUF_SCHEMA;
 import static org.apache.seatunnel.connectors.seatunnel.kafka.config.KafkaSourceOptions.READER_CACHE_QUEUE_SIZE;
 import static org.apache.seatunnel.connectors.seatunnel.kafka.config.KafkaSourceOptions.START_MODE;
+import static org.apache.seatunnel.connectors.seatunnel.kafka.config.KafkaSourceOptions.START_MODE_END_TIMESTAMP;
 import static org.apache.seatunnel.connectors.seatunnel.kafka.config.KafkaSourceOptions.START_MODE_OFFSETS;
 import static org.apache.seatunnel.connectors.seatunnel.kafka.config.KafkaSourceOptions.START_MODE_TIMESTAMP;
 import static org.apache.seatunnel.connectors.seatunnel.kafka.config.KafkaSourceOptions.TOPIC;
@@ -187,6 +189,18 @@ private ConsumerMetadata createConsumerMetadata(ReadonlyConfig readonlyConfig) {
                             }
                             consumerMetadata.setStartOffsetsTimestamp(
                                     startOffsetsTimestamp);
+                            if (Objects.nonNull(
+                                    readonlyConfig.get(START_MODE_END_TIMESTAMP))) {
+                                long endOffsetsTimestamp =
+                                        readonlyConfig.get(START_MODE_END_TIMESTAMP);
+                                if (endOffsetsTimestamp < 0
+                                        || endOffsetsTimestamp > currentTimestamp) {
+                                    throw new IllegalArgumentException(
+                                            "The value of start_mode.end_timestamp is negative or later than the current time");
+                                }
+                                consumerMetadata.setEndOffsetsTimestamp(
+                                        endOffsetsTimestamp);
+                            }
                             break;
                         case SPECIFIC_OFFSETS:
                             // Key is topic-partition, value is offset
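
In effect, the new guard accepts any end_timestamp in [0, current time]: for example, given a current timestamp of 1738395950000, an end_timestamp of 1738395900000 is accepted, while -1 or any future value throws IllegalArgumentException. No ordering check against start_mode.timestamp is performed here.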

seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSourceSplitEnumerator.java

Lines changed: 16 additions & 0 deletions
@@ -155,6 +155,7 @@ public void run() throws ExecutionException, InterruptedException {
     private void setPartitionStartOffset() throws ExecutionException, InterruptedException {
         Set<TopicPartition> pendingTopicPartitions = pendingSplit.keySet();
         Map<TopicPartition, Long> topicPartitionOffsets = new HashMap<>();
+        Map<TopicPartition, Long> topicPartitionEndOffsets = new HashMap<>();
         // Set kafka TopicPartition based on the topicPath granularity
         Map<TablePath, Set<TopicPartition>> tablePathPartitionMap =
                 pendingTopicPartitions.stream()
@@ -182,6 +183,13 @@ private void setPartitionStartOffset() throws ExecutionException, InterruptedException {
                         listOffsets(
                                 topicPartitions,
                                 OffsetSpec.forTimestamp(metadata.getStartOffsetsTimestamp())));
+                if (Objects.nonNull(metadata.getEndOffsetsTimestamp())) {
+                    topicPartitionEndOffsets.putAll(
+                            listOffsets(
+                                    topicPartitions,
+                                    OffsetSpec.forTimestamp(
+                                            metadata.getEndOffsetsTimestamp())));
+                }
                 break;
             case SPECIFIC_OFFSETS:
                 topicPartitionOffsets.putAll(metadata.getSpecificStartOffsets());
@@ -197,6 +205,14 @@ private void setPartitionStartOffset() throws ExecutionException, InterruptedException {
                         pendingSplit.get(key).setStartOffset(value);
                     }
                 });
+        if (!isStreamingMode && !topicPartitionEndOffsets.isEmpty()) {
+            topicPartitionEndOffsets.forEach(
+                    (key, value) -> {
+                        if (pendingSplit.containsKey(key)) {
+                            pendingSplit.get(key).setEndOffset(value);
+                        }
+                    });
+        }
     }

     @Override
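
The hunks above rely on a private listOffsets helper whose body is not part of this diff. For context, a minimal sketch of how a timestamp is typically resolved to per-partition offsets with Kafka's AdminClient; this is an assumed helper shape, not the connector's actual implementation:

import java.util.Map;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.function.Function;
import java.util.stream.Collectors;

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.OffsetSpec;
import org.apache.kafka.common.TopicPartition;

final class OffsetLookupSketch {
    private OffsetLookupSketch() {}

    // For each partition, ask the broker for the earliest offset whose record
    // timestamp is >= the timestamp carried by the OffsetSpec.
    static Map<TopicPartition, Long> listOffsets(
            AdminClient adminClient, Set<TopicPartition> partitions, OffsetSpec spec)
            throws ExecutionException, InterruptedException {
        Map<TopicPartition, OffsetSpec> request =
                partitions.stream().collect(Collectors.toMap(Function.identity(), tp -> spec));
        return adminClient.listOffsets(request).all().get().entrySet().stream()
                // Brokers may report -1 when no record at or after the timestamp exists.
                .collect(Collectors.toMap(Map.Entry::getKey, e -> e.getValue().offset()));
    }
}

In batch mode the enumerator then stores the offset resolved for the end timestamp as each split's end offset, bounding the read range.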

seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/java/org/apache/seatunnel/e2e/connector/kafka/KafkaIT.java

Lines changed: 38 additions & 0 deletions
@@ -409,6 +409,22 @@ public void testSourceKafka(TestContainer container) throws IOException, InterruptedException {
         testKafkaTimestampToConsole(container);
     }

+    @TestTemplate
+    public void testSourceKafkaWithEndTimestamp(TestContainer container)
+            throws IOException, InterruptedException {
+        DefaultSeaTunnelRowSerializer serializer =
+                DefaultSeaTunnelRowSerializer.create(
+                        "test_topic_source",
+                        DEFAULT_FORMAT,
+                        new SeaTunnelRowType(
+                                new String[] {"id", "timestamp"},
+                                new SeaTunnelDataType[] {BasicType.LONG_TYPE, BasicType.LONG_TYPE}),
+                        "",
+                        null);
+        generateWithTimestampTestData(serializer::serializeRow, 0, 100, 1738395840000L);
+        testKafkaWithEndTimestampToConsole(container);
+    }
+
     @TestTemplate
     public void testSourceKafkaStartConfig(TestContainer container)
             throws IOException, InterruptedException {
@@ -1084,6 +1100,13 @@ public void testKafkaTimestampToConsole(TestContainer container)
         Assertions.assertEquals(0, execResult.getExitCode(), execResult.getStderr());
     }

+    public void testKafkaWithEndTimestampToConsole(TestContainer container)
+            throws IOException, InterruptedException {
+        Container.ExecResult execResult =
+                container.executeJob("/kafka/kafkasource_endTimestamp_to_console.conf");
+        Assertions.assertEquals(0, execResult.getExitCode(), execResult.getStderr());
+    }
+
     private AdminClient createKafkaAdmin() {
         Properties props = new Properties();
         String bootstrapServers = kafkaContainer.getBootstrapServers();
@@ -1163,6 +1186,21 @@ private void generateTestData(ProducerRecordConverter converter, int start, int end) {
         producer.flush();
     }

+    private void generateWithTimestampTestData(
+            ProducerRecordConverter converter, int start, int end, long startTimestamp) {
+        try {
+            for (int i = start; i < end; i++) {
+                SeaTunnelRow row =
+                        new SeaTunnelRow(new Object[] {Long.valueOf(i), startTimestamp + i * 1000});
+                ProducerRecord<byte[], byte[]> producerRecord = converter.convert(row);
+                producer.send(producerRecord).get();
+            }
+        } catch (Exception e) {
+            throw new RuntimeException(e);
+        }
+        producer.flush();
+    }
+
     private void generateNativeTestData(String topic, int start, int end) {
         try {
             for (int i = start; i < end; i++) {
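
One detail of the generator worth noting: producer.send(record).get() blocks on every send, so records reach the broker in id order. The record timestamps themselves presumably come from the serializer's timestampExtractor(rowType) reading each row's "timestamp" field (the extractor body is not in this diff), which is why the test embeds 1738395840000 + i * 1000 in every row.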
seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/kafka/kafkasource_endTimestamp_to_console.conf

Lines changed: 74 additions & 0 deletions

@@ -0,0 +1,74 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+######
+###### This config file is a demonstration of batch processing in seatunnel config
+######
+
+env {
+  parallelism = 1
+  job.mode = "BATCH"
+}
+
+source {
+  Kafka {
+    bootstrap.servers = "kafkaCluster:9092"
+    topic = "test_topic_source"
+    plugin_output = "kafka_table"
+    # The default format is json, which is optional
+    format = json
+    start_mode = timestamp
+    schema = {
+      fields {
+        id = bigint
+      }
+    }
+    start_mode.timestamp = 1738395840000
+    start_mode.end_timestamp = 1738395900000
+  }
+
+  # If you would like to get more information about how to configure seatunnel and see full list of source plugins,
+  # please go to https://seatunnel.apache.org/docs/connector-v2/source/KafkaSource
+}
+
+transform {
+}
+
+sink {
+  Assert {
+    plugin_input = "kafka_table"
+    rules =
+      {
+        field_rules = [
+          {
+            field_name = id
+            field_type = bigint
+            field_value = [
+
+              {
+                rule_type = MIN
+                rule_value = 0
+              },
+              {
+                rule_type = MAX
+                rule_value = 60
+              }
+            ]
+          }
+        ]
+      }
+  }
+}
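
Why the Assert bounds are 0 and 60: the e2e test produces 100 records whose embedded timestamps advance one second per id starting at 1738395840000, and start_mode.end_timestamp (1738395900000) is exactly 60 seconds after start_mode.timestamp, so only records with id 0 through 60 can fall inside the consumed window.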
