Skip to content

Commit c7c596a

Browse files
liugddx, zhilinli, zhilinli123
authored
[Improve][Connector-fake] Optimizing Data Generation Strategies refer to #4004 (#4061)
---------
Co-authored-by: zhilinli <lzl15844876351@163.com>
Co-authored-by: zhilinli <76689593+zhilinli123@users.noreply.github.com>
1 parent aee2c58 commit c7c596a

File tree

4 files changed

+41
-16
lines changed

4 files changed

+41
-16
lines changed

seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeDataGenerator.java

Lines changed: 12 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717

1818
package org.apache.seatunnel.connectors.seatunnel.fake.source;
1919

20+
import org.apache.seatunnel.api.source.Collector;
2021
import org.apache.seatunnel.api.table.type.ArrayType;
2122
import org.apache.seatunnel.api.table.type.BasicType;
2223
import org.apache.seatunnel.api.table.type.DecimalType;
@@ -77,19 +78,21 @@ private SeaTunnelRow randomRow() {
7778
return new SeaTunnelRow(randomRow.toArray());
7879
}
7980

80-
public List<SeaTunnelRow> generateFakedRows(int rowNum) {
81-
List<SeaTunnelRow> seaTunnelRows = new ArrayList<>();
81+
/**
82+
* @param rowNum The number of random rows to generate for the current task; ignored when fake rows are configured manually
83+
* @param output Collector that receives each generated row
84+
**/
85+
public void collectFakedRows(int rowNum, Collector<SeaTunnelRow> output) {
86+
// Use manual configuration data preferentially
8287
if (fakeConfig.getFakeRows() != null) {
8388
for (FakeConfig.RowData rowData : fakeConfig.getFakeRows()) {
84-
seaTunnelRows.add(convertRow(rowData));
89+
output.collect(convertRow(rowData));
90+
}
91+
} else {
92+
for (int i = 0; i < rowNum; i++) {
93+
output.collect(randomRow());
8594
}
86-
return seaTunnelRows;
87-
}
88-
89-
for (int i = 0; i < rowNum; i++) {
90-
seaTunnelRows.add(randomRow());
9195
}
92-
return seaTunnelRows;
9396
}
9497

9598
@SuppressWarnings("magicnumber")

seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeSourceReader.java

Lines changed: 2 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -70,11 +70,8 @@ public void pollNext(Collector<SeaTunnelRow> output) throws InterruptedException
7070
synchronized (output.getCheckpointLock()) {
7171
FakeSourceSplit split = splits.poll();
7272
if (null != split) {
73-
// Generate a random number of rows to emit.
74-
List<SeaTunnelRow> seaTunnelRows = fakeDataGenerator.generateFakedRows(split.getRowNum());
75-
for (SeaTunnelRow seaTunnelRow : seaTunnelRows) {
76-
output.collect(seaTunnelRow);
77-
}
73+
// Randomly generated data are sent directly to the downstream operator
74+
fakeDataGenerator.collectFakedRows(split.getRowNum(), output);
7875
log.info("{} rows of data have been generated in split({}). Generation time: {}", split.getRowNum(), split.splitId(), latestTimestamp);
7976
} else {
8077
if (!noMoreSplit) {

seatunnel-connectors-v2/connector-fake/src/test/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeDataGeneratorTest.java

Lines changed: 26 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717

1818
package org.apache.seatunnel.connectors.seatunnel.fake.source;
1919

20+
import org.apache.seatunnel.api.source.Collector;
2021
import org.apache.seatunnel.api.table.type.RowKind;
2122
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
2223
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
@@ -35,6 +36,7 @@
3536
import java.net.URISyntaxException;
3637
import java.net.URL;
3738
import java.nio.file.Paths;
39+
import java.util.ArrayList;
3840
import java.util.Arrays;
3941
import java.util.List;
4042
import java.util.Map;
@@ -49,7 +51,18 @@ public void testComplexSchemaParse(String conf) throws FileNotFoundException, UR
4951
SeaTunnelRowType seaTunnelRowType = seaTunnelSchema.getSeaTunnelRowType();
5052
FakeConfig fakeConfig = FakeConfig.buildWithConfig(testConfig);
5153
FakeDataGenerator fakeDataGenerator = new FakeDataGenerator(seaTunnelSchema, fakeConfig);
52-
List<SeaTunnelRow> seaTunnelRows = fakeDataGenerator.generateFakedRows(fakeConfig.getRowNum());
54+
List<SeaTunnelRow> seaTunnelRows = new ArrayList<>();
55+
fakeDataGenerator.collectFakedRows(fakeConfig.getRowNum(), new Collector<SeaTunnelRow>() {
56+
@Override
57+
public void collect(SeaTunnelRow record) {
58+
seaTunnelRows.add(record);
59+
}
60+
61+
@Override
62+
public Object getCheckpointLock() {
63+
throw new UnsupportedOperationException();
64+
}
65+
});
5366
Assertions.assertNotNull(seaTunnelRows);
5467
Assertions.assertEquals(seaTunnelRows.size(), 10);
5568
for (SeaTunnelRow seaTunnelRow : seaTunnelRows) {
@@ -96,7 +109,18 @@ public void testRowDataParse(String conf) throws FileNotFoundException, URISynta
96109
SeaTunnelSchema seaTunnelSchema = SeaTunnelSchema.buildWithConfig(testConfig.getConfig(SeaTunnelSchema.SCHEMA.key()));
97110
FakeConfig fakeConfig = FakeConfig.buildWithConfig(testConfig);
98111
FakeDataGenerator fakeDataGenerator = new FakeDataGenerator(seaTunnelSchema, fakeConfig);
99-
List<SeaTunnelRow> seaTunnelRows = fakeDataGenerator.generateFakedRows(fakeConfig.getRowNum());
112+
List<SeaTunnelRow> seaTunnelRows = new ArrayList<>();
113+
fakeDataGenerator.collectFakedRows(fakeConfig.getRowNum(), new Collector<SeaTunnelRow>() {
114+
@Override
115+
public void collect(SeaTunnelRow record) {
116+
seaTunnelRows.add(record);
117+
}
118+
119+
@Override
120+
public Object getCheckpointLock() {
121+
throw new UnsupportedOperationException();
122+
}
123+
});
100124
Assertions.assertIterableEquals(expected, seaTunnelRows);
101125
}
102126

seatunnel-e2e/seatunnel-connector-v2-e2e/connector-elasticsearch-e2e/src/test/java/org/apache/seatunnel/e2e/connector/elasticsearch/ElasticsearchIT.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -68,6 +68,7 @@ public class ElasticsearchIT extends TestSuiteBase implements TestResource {
6868
public void startUp() throws Exception {
6969
container = new ElasticsearchContainer(DockerImageName.parse("elasticsearch:8.0.0").asCompatibleSubstituteFor("docker.elastic.co/elasticsearch/elasticsearch"))
7070
.withNetwork(NETWORK)
71+
.withEnv("cluster.routing.allocation.disk.threshold_enabled", "false")
7172
.withNetworkAliases("elasticsearch")
7273
.withPassword("elasticsearch")
7374
.withLogConsumer(new Slf4jLogConsumer(DockerLoggerFactory.getLogger("elasticsearch:8.0.0")));

0 commit comments

Comments
 (0)