Skip to content

Commit c203ef5

Browse files
authored
[hotfix][kafka] Fix partition information not being obtainable when Kafka is restored (#4764)
1 parent 23ed0fc commit c203ef5

File tree

3 files changed

+26
-23
lines changed

3 files changed

+26
-23
lines changed

seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSourceSplitEnumerator.java

Lines changed: 15 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -36,8 +36,10 @@
3636
import java.util.Arrays;
3737
import java.util.Collection;
3838
import java.util.HashMap;
39+
import java.util.HashSet;
3940
import java.util.List;
4041
import java.util.Map;
42+
import java.util.Objects;
4143
import java.util.Properties;
4244
import java.util.Set;
4345
import java.util.concurrent.ExecutionException;
@@ -57,18 +59,19 @@ public class KafkaSourceSplitEnumerator
5759
private final ConsumerMetadata metadata;
5860
private final Context<KafkaSourceSplit> context;
5961
private long discoveryIntervalMillis;
60-
private AdminClient adminClient;
62+
private final AdminClient adminClient;
6163

62-
private Map<TopicPartition, KafkaSourceSplit> pendingSplit;
64+
private final Map<TopicPartition, KafkaSourceSplit> pendingSplit;
6365
private final Map<TopicPartition, KafkaSourceSplit> assignedSplit;
6466
private ScheduledExecutorService executor;
65-
private ScheduledFuture scheduledFuture;
67+
private ScheduledFuture<?> scheduledFuture;
6668

6769
KafkaSourceSplitEnumerator(ConsumerMetadata metadata, Context<KafkaSourceSplit> context) {
6870
this.metadata = metadata;
6971
this.context = context;
7072
this.assignedSplit = new HashMap<>();
7173
this.pendingSplit = new HashMap<>();
74+
this.adminClient = initAdminClient(this.metadata.getProperties());
7275
}
7376

7477
KafkaSourceSplitEnumerator(
@@ -97,7 +100,6 @@ public class KafkaSourceSplitEnumerator
97100

98101
@Override
99102
public void open() {
100-
this.adminClient = initAdminClient(this.metadata.getProperties());
101103
if (discoveryIntervalMillis > 0) {
102104
this.executor =
103105
Executors.newScheduledThreadPool(
@@ -180,7 +182,6 @@ public void close() throws IOException {
180182
public void addSplitsBack(List<KafkaSourceSplit> splits, int subtaskId) {
181183
if (!splits.isEmpty()) {
182184
pendingSplit.putAll(convertToNextSplit(splits));
183-
assignSplit();
184185
}
185186
}
186187

@@ -191,6 +192,7 @@ public void addSplitsBack(List<KafkaSourceSplit> splits, int subtaskId) {
191192
listOffsets(
192193
splits.stream()
193194
.map(KafkaSourceSplit::getTopicPartition)
195+
.filter(Objects::nonNull)
194196
.collect(Collectors.toList()),
195197
OffsetSpec.latest());
196198
splits.forEach(
@@ -199,7 +201,7 @@ public void addSplitsBack(List<KafkaSourceSplit> splits, int subtaskId) {
199201
split.setEndOffset(listOffsets.get(split.getTopicPartition()));
200202
});
201203
return splits.stream()
202-
.collect(Collectors.toMap(split -> split.getTopicPartition(), split -> split));
204+
.collect(Collectors.toMap(KafkaSourceSplit::getTopicPartition, split -> split));
203205
} catch (Exception e) {
204206
throw new KafkaConnectorException(
205207
KafkaConnectorErrorCode.ADD_SPLIT_BACK_TO_ENUMERATOR_FAILED, e);
@@ -225,7 +227,7 @@ public void registerReader(int subtaskId) {
225227

226228
@Override
227229
public KafkaSourceState snapshotState(long checkpointId) throws Exception {
228-
return new KafkaSourceState(assignedSplit.values().stream().collect(Collectors.toSet()));
230+
return new KafkaSourceState(new HashSet<>(assignedSplit.values()));
229231
}
230232

231233
@Override
@@ -291,18 +293,12 @@ private synchronized void assignSplit() {
291293
readySplit.computeIfAbsent(taskID, id -> new ArrayList<>());
292294
}
293295

294-
pendingSplit
295-
.entrySet()
296-
.forEach(
297-
s -> {
298-
if (!assignedSplit.containsKey(s.getKey())) {
299-
readySplit
300-
.get(
301-
getSplitOwner(
302-
s.getKey(), context.currentParallelism()))
303-
.add(s.getValue());
304-
}
305-
});
296+
pendingSplit.forEach(
297+
(key, value) -> {
298+
if (!assignedSplit.containsKey(key)) {
299+
readySplit.get(getSplitOwner(key, context.currentParallelism())).add(value);
300+
}
301+
});
306302

307303
readySplit.forEach(
308304
(id, split) -> {

seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/context/SeaTunnelSplitEnumeratorContext.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,11 +25,14 @@
2525
import org.apache.seatunnel.engine.server.task.SourceSplitEnumeratorTask;
2626
import org.apache.seatunnel.engine.server.task.operation.source.AssignSplitOperation;
2727

28+
import lombok.extern.slf4j.Slf4j;
29+
2830
import java.util.Collections;
2931
import java.util.HashSet;
3032
import java.util.List;
3133
import java.util.Set;
3234

35+
@Slf4j
3336
public class SeaTunnelSplitEnumeratorContext<SplitT extends SourceSplit>
3437
implements SourceSplitEnumerator.Context<SplitT> {
3538

@@ -60,6 +63,10 @@ public Set<Integer> registeredReaders() {
6063

6164
@Override
6265
public void assignSplit(int subtaskIndex, List<SplitT> splits) {
66+
if (registeredReaders().isEmpty()) {
67+
log.warn("No reader is obtained, skip this assign!");
68+
return;
69+
}
6370
task.getExecutionContext()
6471
.sendToMember(
6572
new AssignSplitOperation<>(

seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/SourceFlowLifeCycle.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -168,7 +168,7 @@ private void register() {
168168
enumeratorTaskAddress)
169169
.get();
170170
} catch (InterruptedException | ExecutionException e) {
171-
log.warn("source register failed {}", e);
171+
log.warn("source register failed.", e);
172172
throw new RuntimeException(e);
173173
}
174174
}
@@ -182,7 +182,7 @@ public void requestSplit() {
182182
enumeratorTaskAddress)
183183
.get();
184184
} catch (InterruptedException | ExecutionException e) {
185-
log.warn("source request split failed [{}]", e);
185+
log.warn("source request split failed.", e);
186186
throw new RuntimeException(e);
187187
}
188188
}
@@ -197,7 +197,7 @@ public void sendSourceEventToEnumerator(SourceEvent sourceEvent) {
197197
enumeratorTaskAddress)
198198
.get();
199199
} catch (InterruptedException | ExecutionException e) {
200-
log.warn("source request split failed {}", e);
200+
log.warn("source request split failed.", e);
201201
throw new RuntimeException(e);
202202
}
203203
}
@@ -263,7 +263,7 @@ public void restoreState(List<ActionSubtaskState> actionStateList) throws Except
263263
enumeratorTaskAddress)
264264
.get();
265265
} catch (InterruptedException | ExecutionException e) {
266-
log.warn("source request split failed {}", e);
266+
log.warn("source request split failed.", e);
267267
throw new RuntimeException(e);
268268
}
269269
}

0 commit comments

Comments
 (0)