Skip to content

Commit fdf1bea

Browse files
authored
[Improve] Refactor file enumerator to prevent duplicate put split (#8989)
1 parent 5f85d76 commit fdf1bea

File tree

8 files changed

+113
-72
lines changed

8 files changed

+113
-72
lines changed

seatunnel-connectors-v2/connector-common/src/test/java/org/apache/seatunnel/connectors/seatunnel/source/SourceFlowTestUtils.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -131,6 +131,7 @@ public EventListener getEventListener() {
131131
return event -> {};
132132
}
133133
});
134+
enumerator.open();
134135
for (int i = 0; i < parallelism; i++) {
135136
int finalI = i;
136137
SourceReader<Object, SourceSplit> reader =
@@ -176,6 +177,7 @@ public EventListener getEventListener() {
176177
readers.add(reader);
177178
enumerator.registerReader(i);
178179
}
180+
enumerator.run();
179181

180182
List<SeaTunnelRow> rows = new ArrayList<>();
181183
while (!unfinishedReaders.isEmpty()) {

seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/split/FileSourceSplit.java

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,8 @@
2121

2222
import lombok.Getter;
2323

24+
import java.util.Objects;
25+
2426
public class FileSourceSplit implements SourceSplit {
2527
private static final long serialVersionUID = 1L;
2628

@@ -46,4 +48,21 @@ public String splitId() {
4648
}
4749
return tableId + "_" + filePath;
4850
}
51+
52+
@Override
53+
public boolean equals(Object o) {
54+
if (this == o) {
55+
return true;
56+
}
57+
if (o == null || getClass() != o.getClass()) {
58+
return false;
59+
}
60+
FileSourceSplit that = (FileSourceSplit) o;
61+
return Objects.equals(tableId, that.tableId) && Objects.equals(filePath, that.filePath);
62+
}
63+
64+
@Override
65+
public int hashCode() {
66+
return Objects.hash(tableId, filePath);
67+
}
4968
}

seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/split/MultipleTableFileSourceSplitEnumerator.java

Lines changed: 16 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -74,6 +74,17 @@ public MultipleTableFileSourceSplitEnumerator(
7474
this.assignedSplit.addAll(fileSourceState.getAssignedSplit());
7575
}
7676

77+
@Override
78+
public void open() {
79+
for (Map.Entry<String, List<String>> filePathEntry : filePathMap.entrySet()) {
80+
String tableId = filePathEntry.getKey();
81+
List<String> filePaths = filePathEntry.getValue();
82+
for (String filePath : filePaths) {
83+
allSplit.add(new FileSourceSplit(tableId, filePath));
84+
}
85+
}
86+
}
87+
7788
@Override
7889
public void addSplitsBack(List<FileSourceSplit> splits, int subtaskId) {
7990
if (CollectionUtils.isEmpty(splits)) {
@@ -92,16 +103,7 @@ public int currentUnassignedSplitSize() {
92103
public void handleSplitRequest(int subtaskId) {}
93104

94105
@Override
95-
public void registerReader(int subtaskId) {
96-
for (Map.Entry<String, List<String>> filePathEntry : filePathMap.entrySet()) {
97-
String tableId = filePathEntry.getKey();
98-
List<String> filePaths = filePathEntry.getValue();
99-
for (String filePath : filePaths) {
100-
allSplit.add(new FileSourceSplit(tableId, filePath));
101-
}
102-
}
103-
assignSplit(subtaskId);
104-
}
106+
public void registerReader(int subtaskId) {}
105107

106108
@Override
107109
public FileSourceState snapshotState(long checkpointId) {
@@ -149,14 +151,12 @@ private static int getSplitOwner(int assignCount, int numReaders) {
149151
return assignCount % numReaders;
150152
}
151153

152-
@Override
153-
public void open() {
154-
// do nothing
155-
}
156-
157154
@Override
158155
public void run() throws Exception {
159-
// do nothing
156+
for (int i = 0; i < context.currentParallelism(); i++) {
157+
log.info("Assigned splits to reader [{}]", i);
158+
assignSplit(i);
159+
}
160160
}
161161

162162
@Override

seatunnel-connectors-v2/connector-file/connector-file-base/src/test/java/org/apache/seatunnel/connectors/seatunnel/file/source/split/MultipleTableFileSourceSplitEnumeratorTest.java

Lines changed: 21 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828

2929
import org.junit.jupiter.api.Assertions;
3030
import org.junit.jupiter.api.Test;
31+
import org.mockito.ArgumentCaptor;
3132
import org.mockito.Mockito;
3233

3334
import lombok.extern.slf4j.Slf4j;
@@ -37,14 +38,13 @@
3738
import java.util.HashMap;
3839
import java.util.List;
3940
import java.util.Map;
40-
import java.util.concurrent.atomic.AtomicInteger;
4141
import java.util.stream.IntStream;
4242

4343
@Slf4j
4444
public class MultipleTableFileSourceSplitEnumeratorTest {
4545

4646
@Test
47-
void assignSplitTest() {
47+
void assignSplitTest() throws Exception {
4848
int parallelism = 4;
4949
int fileSize = 50;
5050

@@ -80,24 +80,25 @@ void assignSplitTest() {
8080
new MultipleTableFileSourceSplitEnumerator(
8181
context, baseMultipleTableFileSourceConfig);
8282

83-
AtomicInteger unAssignedSplitSize = new AtomicInteger(fileSize);
84-
IntStream.range(0, parallelism)
85-
.forEach(
86-
id -> {
87-
enumerator.registerReader(id);
88-
89-
// check the number of files assigned each time
90-
Assertions.assertEquals(
91-
enumerator.currentUnassignedSplitSize(),
92-
unAssignedSplitSize.get()
93-
- allocateFiles(id, parallelism, fileSize));
94-
unAssignedSplitSize.set(enumerator.currentUnassignedSplitSize());
95-
96-
log.info(
97-
"unAssigned splits => {}, allocate files => {}",
98-
enumerator.currentUnassignedSplitSize(),
99-
allocateFiles(id, parallelism, fileSize));
100-
});
83+
enumerator.open();
84+
Assertions.assertEquals(50, enumerator.currentUnassignedSplitSize());
85+
IntStream.range(0, parallelism).forEach(enumerator::registerReader);
86+
enumerator.run();
87+
88+
ArgumentCaptor<Integer> subtaskId = ArgumentCaptor.forClass(Integer.class);
89+
ArgumentCaptor<List> split = ArgumentCaptor.forClass(List.class);
90+
91+
Mockito.verify(context, Mockito.times(parallelism))
92+
.assignSplit(subtaskId.capture(), split.capture());
93+
94+
List<Integer> subTaskAllValues = subtaskId.getAllValues();
95+
List<List> splitAllValues = split.getAllValues();
96+
97+
for (int i = 0; i < parallelism; i++) {
98+
Assertions.assertEquals(i, subTaskAllValues.get(i));
99+
Assertions.assertEquals(
100+
allocateFiles(i, parallelism, fileSize), splitAllValues.get(i).size());
101+
}
101102

102103
// check no duplicate file assigned
103104
Assertions.assertEquals(0, enumerator.currentUnassignedSplitSize());

seatunnel-connectors-v2/connector-file/connector-file-local/src/test/java/org/apache/seatunnel/connectors/seatunnel/file/local/LocalFileTest.java

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -293,4 +293,19 @@ void testWriteFileWithCustomFileExtension() throws Exception {
293293

294294
Assertions.assertEquals(0, emptyRows.size());
295295
}
296+
297+
@Test
298+
void testReadOneFileButHasTwoParallelism() throws Exception {
299+
Map<String, Object> readOptions =
300+
new HashMap<String, Object>() {
301+
{
302+
put("path", LocalFileTest.class.getResource("/test_data.txt").getPath());
303+
put("file_format_type", "text");
304+
}
305+
};
306+
List<SeaTunnelRow> rows =
307+
SourceFlowTestUtils.runParallelSubtasksBatchWithCheckpointDisabled(
308+
ReadonlyConfig.fromMap(readOptions), new LocalFileSourceFactory(), 2);
309+
Assertions.assertEquals(3, rows.size());
310+
}
296311
}
Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
1,a,a,1
2+
2,a,a,1
3+
3,a,a,1

seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/source/split/MultipleTableHiveSourceSplitEnumerator.java

Lines changed: 16 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -75,6 +75,17 @@ public MultipleTableHiveSourceSplitEnumerator(
7575
this.assignedSplit.addAll(localFileSourceState.getAssignedSplit());
7676
}
7777

78+
@Override
79+
public void open() {
80+
for (Map.Entry<String, List<String>> filePathEntry : filePathMap.entrySet()) {
81+
String tableId = filePathEntry.getKey();
82+
List<String> filePaths = filePathEntry.getValue();
83+
for (String filePath : filePaths) {
84+
allSplit.add(new FileSourceSplit(tableId, filePath));
85+
}
86+
}
87+
}
88+
7889
@Override
7990
public void addSplitsBack(List<FileSourceSplit> splits, int subtaskId) {
8091
if (CollectionUtils.isEmpty(splits)) {
@@ -93,16 +104,7 @@ public int currentUnassignedSplitSize() {
93104
public void handleSplitRequest(int subtaskId) {}
94105

95106
@Override
96-
public void registerReader(int subtaskId) {
97-
for (Map.Entry<String, List<String>> filePathEntry : filePathMap.entrySet()) {
98-
String tableId = filePathEntry.getKey();
99-
List<String> filePaths = filePathEntry.getValue();
100-
for (String filePath : filePaths) {
101-
allSplit.add(new FileSourceSplit(tableId, filePath));
102-
}
103-
}
104-
assignSplit(subtaskId);
105-
}
107+
public void registerReader(int subtaskId) {}
106108

107109
@Override
108110
public FileSourceState snapshotState(long checkpointId) {
@@ -150,14 +152,12 @@ private static int getSplitOwner(int assignCount, int numReaders) {
150152
return assignCount % numReaders;
151153
}
152154

153-
@Override
154-
public void open() {
155-
// do nothing
156-
}
157-
158155
@Override
159156
public void run() throws Exception {
160-
// do nothing
157+
for (int i = 0; i < context.currentParallelism(); i++) {
158+
log.info("Assigned splits to reader [{}]", i);
159+
assignSplit(i);
160+
}
161161
}
162162

163163
@Override

seatunnel-connectors-v2/connector-hive/src/test/java/org/apache/seatunnel/connectors/seatunnel/hive/split/MultipleTableHiveSourceSplitEnumeratorTest.java

Lines changed: 21 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@
3030

3131
import org.junit.jupiter.api.Assertions;
3232
import org.junit.jupiter.api.Test;
33+
import org.mockito.ArgumentCaptor;
3334
import org.mockito.Mockito;
3435

3536
import lombok.extern.slf4j.Slf4j;
@@ -39,14 +40,13 @@
3940
import java.util.HashMap;
4041
import java.util.List;
4142
import java.util.Map;
42-
import java.util.concurrent.atomic.AtomicInteger;
4343
import java.util.stream.IntStream;
4444

4545
@Slf4j
4646
public class MultipleTableHiveSourceSplitEnumeratorTest {
4747

4848
@Test
49-
void assignSplitRoundTest() {
49+
void assignSplitRoundTest() throws Exception {
5050
int parallelism = 4;
5151
int fileSize = 50;
5252

@@ -81,24 +81,25 @@ void assignSplitRoundTest() {
8181
MultipleTableHiveSourceSplitEnumerator enumerator =
8282
new MultipleTableHiveSourceSplitEnumerator(context, mockConfig);
8383

84-
AtomicInteger unAssignedSplitSize = new AtomicInteger(fileSize);
85-
IntStream.range(0, parallelism)
86-
.forEach(
87-
id -> {
88-
enumerator.registerReader(id);
89-
90-
// check the number of files assigned each time
91-
Assertions.assertEquals(
92-
enumerator.currentUnassignedSplitSize(),
93-
unAssignedSplitSize.get()
94-
- allocateFiles(id, parallelism, fileSize));
95-
unAssignedSplitSize.set(enumerator.currentUnassignedSplitSize());
96-
97-
log.info(
98-
"unAssigned splits => {}, allocate files => {}",
99-
enumerator.currentUnassignedSplitSize(),
100-
allocateFiles(id, parallelism, fileSize));
101-
});
84+
enumerator.open();
85+
Assertions.assertEquals(50, enumerator.currentUnassignedSplitSize());
86+
IntStream.range(0, parallelism).forEach(enumerator::registerReader);
87+
enumerator.run();
88+
89+
ArgumentCaptor<Integer> subtaskId = ArgumentCaptor.forClass(Integer.class);
90+
ArgumentCaptor<List> split = ArgumentCaptor.forClass(List.class);
91+
92+
Mockito.verify(context, Mockito.times(parallelism))
93+
.assignSplit(subtaskId.capture(), split.capture());
94+
95+
List<Integer> subTaskAllValues = subtaskId.getAllValues();
96+
List<List> splitAllValues = split.getAllValues();
97+
98+
for (int i = 0; i < parallelism; i++) {
99+
Assertions.assertEquals(i, subTaskAllValues.get(i));
100+
Assertions.assertEquals(
101+
allocateFiles(i, parallelism, fileSize), splitAllValues.get(i).size());
102+
}
102103

103104
// check no duplicate file assigned
104105
Assertions.assertEquals(0, enumerator.currentUnassignedSplitSize());

0 commit comments

Comments
 (0)