Skip to content

Commit efabe6a

Browse files
authored
[improve][connector][fake] supports setting the number of split rows and reading interval (#3098)
1 parent 0bc1208 commit efabe6a

File tree

10 files changed

+118
-45
lines changed

10 files changed

+118
-45
lines changed

docs/en/connector-v2/source/FakeSource.md

Lines changed: 20 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -18,15 +18,17 @@ just for some test cases such as type conversion or connector new feature testin
1818

1919
## Options
2020

21-
| name | type | required | default value |
22-
| -------------- | ------ | -------- | ------------- |
23-
| schema | config | yes | - |
24-
| row.num | int | no | 5 |
25-
| map.size | int | no | 5 |
26-
| array.size | int | no | 5 |
27-
| bytes.length | int | no | 5 |
28-
| string.length | int | no | 5 |
29-
| common-options | | no | - |
21+
| name | type | required | default value |
22+
|---------------------|--------|----------|---------------|
23+
| schema | config | yes | - |
24+
| row.num | int | no | 5 |
25+
| split.num | int | no | 1 |
26+
| split.read-interval | int | no | 1 |
27+
| map.size | int | no | 5 |
28+
| array.size | int | no | 5 |
29+
| bytes.length | int | no | 5 |
30+
| string.length | int | no | 5 |
31+
| common-options | | no | - |
3032

3133
### schema [config]
3234

@@ -81,7 +83,15 @@ Source plugin common parameters, please refer to [Source Common Options](common-
8183

8284
### row.num
8385

84-
Total num of data that connector generated
86+
The total number of rows generated per degree of parallelism
87+
88+
### split.num
89+
90+
The number of splits generated by the enumerator for each degree of parallelism
91+
92+
### split.read-interval
93+
94+
The interval (in milliseconds) between two split reads in a reader
8595

8696
### map.size
8797

seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/config/FakeConfig.java

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,8 @@
2828
@Getter
2929
public class FakeConfig implements Serializable {
3030
public static final String ROW_NUM = "row.num";
31+
public static final String SPLIT_NUM = "split.num";
32+
public static final String SPLIT_READ_INTERVAL = "split.read-interval";
3133
public static final String MAP_SIZE = "map.size";
3234
public static final String ARRAY_SIZE = "array.size";
3335
public static final String BYTES_LENGTH = "bytes.length";
@@ -40,6 +42,10 @@ public class FakeConfig implements Serializable {
4042
@Builder.Default
4143
private int rowNum = DEFAULT_ROW_NUM;
4244
@Builder.Default
45+
private int splitNum = 1;
46+
@Builder.Default
47+
private int splitReadInterval = 1;
48+
@Builder.Default
4349
private int mapSize = DEFAULT_MAP_SIZE;
4450
@Builder.Default
4551
private int arraySize = DEFAULT_ARRAY_SIZE;
@@ -53,6 +59,12 @@ public static FakeConfig buildWithConfig(Config config) {
5359
if (config.hasPath(ROW_NUM)) {
5460
builder.rowNum(config.getInt(ROW_NUM));
5561
}
62+
if (config.hasPath(SPLIT_NUM)) {
63+
builder.splitNum(config.getInt(SPLIT_NUM));
64+
}
65+
if (config.hasPath(SPLIT_READ_INTERVAL)) {
66+
builder.splitReadInterval(config.getInt(SPLIT_READ_INTERVAL));
67+
}
5668
if (config.hasPath(MAP_SIZE)) {
5769
builder.mapSize(config.getInt(MAP_SIZE));
5870
}

seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeDataGenerator.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -58,9 +58,9 @@ private SeaTunnelRow randomRow() {
5858
return new SeaTunnelRow(randomRow.toArray());
5959
}
6060

61-
public List<SeaTunnelRow> generateFakedRows() {
61+
public List<SeaTunnelRow> generateFakedRows(int rowNum) {
6262
ArrayList<SeaTunnelRow> seaTunnelRows = new ArrayList<>();
63-
for (int i = 0; i < fakeConfig.getRowNum(); i++) {
63+
for (int i = 0; i < rowNum; i++) {
6464
seaTunnelRows.add(randomRow());
6565
}
6666
return seaTunnelRows;

seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeSource.java

Lines changed: 8 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -27,15 +27,16 @@
2727
import org.apache.seatunnel.common.constants.JobMode;
2828
import org.apache.seatunnel.connectors.seatunnel.common.schema.SeaTunnelSchema;
2929
import org.apache.seatunnel.connectors.seatunnel.fake.config.FakeConfig;
30+
import org.apache.seatunnel.connectors.seatunnel.fake.state.FakeSourceState;
3031

3132
import org.apache.seatunnel.shade.com.typesafe.config.Config;
3233

3334
import com.google.auto.service.AutoService;
3435

35-
import java.io.Serializable;
36+
import java.util.Collections;
3637

3738
@AutoService(SeaTunnelSource.class)
38-
public class FakeSource implements SeaTunnelSource<SeaTunnelRow, FakeSourceSplit, Serializable> {
39+
public class FakeSource implements SeaTunnelSource<SeaTunnelRow, FakeSourceSplit, FakeSourceState> {
3940

4041
private JobContext jobContext;
4142
private SeaTunnelSchema schema;
@@ -52,18 +53,18 @@ public SeaTunnelRowType getProducedType() {
5253
}
5354

5455
@Override
55-
public SourceSplitEnumerator<FakeSourceSplit, Serializable> createEnumerator(SourceSplitEnumerator.Context<FakeSourceSplit> enumeratorContext) throws Exception {
56-
return new FakeSourceSplitEnumerator(enumeratorContext);
56+
public SourceSplitEnumerator<FakeSourceSplit, FakeSourceState> createEnumerator(SourceSplitEnumerator.Context<FakeSourceSplit> enumeratorContext) throws Exception {
57+
return new FakeSourceSplitEnumerator(enumeratorContext, fakeConfig, Collections.emptySet());
5758
}
5859

5960
@Override
60-
public SourceSplitEnumerator<FakeSourceSplit, Serializable> restoreEnumerator(SourceSplitEnumerator.Context<FakeSourceSplit> enumeratorContext, Serializable checkpointState) throws Exception {
61-
return new FakeSourceSplitEnumerator(enumeratorContext);
61+
public SourceSplitEnumerator<FakeSourceSplit, FakeSourceState> restoreEnumerator(SourceSplitEnumerator.Context<FakeSourceSplit> enumeratorContext, FakeSourceState checkpointState) throws Exception {
62+
return new FakeSourceSplitEnumerator(enumeratorContext, fakeConfig, checkpointState.getAssignedSplits());
6263
}
6364

6465
@Override
6566
public SourceReader<SeaTunnelRow, FakeSourceSplit> createReader(SourceReader.Context readerContext) throws Exception {
66-
return new FakeSourceReader(readerContext, new FakeDataGenerator(schema, fakeConfig));
67+
return new FakeSourceReader(readerContext, schema, fakeConfig);
6768
}
6869

6970
@Override

seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeSourceReader.java

Lines changed: 23 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -21,9 +21,12 @@
2121
import org.apache.seatunnel.api.source.Collector;
2222
import org.apache.seatunnel.api.source.SourceReader;
2323
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
24+
import org.apache.seatunnel.connectors.seatunnel.common.schema.SeaTunnelSchema;
25+
import org.apache.seatunnel.connectors.seatunnel.fake.config.FakeConfig;
2426

2527
import lombok.extern.slf4j.Slf4j;
2628

29+
import java.time.Instant;
2730
import java.util.ArrayList;
2831
import java.util.Deque;
2932
import java.util.LinkedList;
@@ -34,12 +37,16 @@ public class FakeSourceReader implements SourceReader<SeaTunnelRow, FakeSourceSp
3437

3538
private final SourceReader.Context context;
3639
private final Deque<FakeSourceSplit> splits = new LinkedList<>();
40+
41+
private final FakeConfig config;
3742
private final FakeDataGenerator fakeDataGenerator;
38-
boolean noMoreSplit;
43+
private volatile boolean noMoreSplit;
44+
private volatile long latestTimestamp = 0;
3945

40-
public FakeSourceReader(SourceReader.Context context, FakeDataGenerator fakeDataGenerator) {
46+
public FakeSourceReader(SourceReader.Context context, SeaTunnelSchema schema, FakeConfig fakeConfig) {
4147
this.context = context;
42-
this.fakeDataGenerator = fakeDataGenerator;
48+
this.config = fakeConfig;
49+
this.fakeDataGenerator = new FakeDataGenerator(schema, fakeConfig);
4350
}
4451

4552
@Override
@@ -53,27 +60,32 @@ public void close() {
5360
}
5461

5562
@Override
56-
@SuppressWarnings("magicnumber")
63+
@SuppressWarnings("MagicNumber")
5764
public void pollNext(Collector<SeaTunnelRow> output) throws InterruptedException {
65+
long currentTimestamp = Instant.now().toEpochMilli();
66+
if (currentTimestamp <= latestTimestamp + config.getSplitReadInterval()) {
67+
return;
68+
}
69+
latestTimestamp = currentTimestamp;
5870
synchronized (output.getCheckpointLock()) {
5971
FakeSourceSplit split = splits.poll();
6072
if (null != split) {
6173
// Generate a random number of rows to emit.
62-
List<SeaTunnelRow> seaTunnelRows = fakeDataGenerator.generateFakedRows();
74+
List<SeaTunnelRow> seaTunnelRows = fakeDataGenerator.generateFakedRows(split.getRowNum());
6375
for (SeaTunnelRow seaTunnelRow : seaTunnelRows) {
6476
output.collect(seaTunnelRow);
6577
}
78+
log.info("{} rows of data have been generated in split({}). Generation time: {}", split.getRowNum(), split.splitId(), latestTimestamp);
6679
} else {
67-
if (noMoreSplit && Boundedness.BOUNDED.equals(context.getBoundedness())) {
68-
// signal to the source that we have reached the end of the data.
69-
log.info("Closed the bounded fake source");
70-
context.signalNoMoreElement();
71-
}
7280
if (!noMoreSplit) {
7381
log.info("wait split!");
7482
}
7583
}
76-
84+
}
85+
if (splits.isEmpty() && noMoreSplit && Boundedness.BOUNDED.equals(context.getBoundedness())) {
86+
// signal to the source that we have reached the end of the data.
87+
log.info("Closed the bounded fake source");
88+
context.signalNoMoreElement();
7789
}
7890
Thread.sleep(1000L);
7991
}

seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeSourceSplit.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,8 @@
2727
public class FakeSourceSplit implements SourceSplit {
2828
private int splitId;
2929

30+
private int rowNum;
31+
3032
@Override
3133
public String splitId() {
3234
return String.valueOf(splitId);

seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeSourceSplitEnumerator.java

Lines changed: 37 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -18,28 +18,40 @@
1818
package org.apache.seatunnel.connectors.seatunnel.fake.source;
1919

2020
import org.apache.seatunnel.api.source.SourceSplitEnumerator;
21+
import org.apache.seatunnel.connectors.seatunnel.fake.config.FakeConfig;
22+
import org.apache.seatunnel.connectors.seatunnel.fake.state.FakeSourceState;
2123

2224
import org.slf4j.Logger;
2325
import org.slf4j.LoggerFactory;
2426

2527
import java.io.IOException;
26-
import java.io.Serializable;
2728
import java.util.ArrayList;
29+
import java.util.Collection;
2830
import java.util.HashMap;
2931
import java.util.HashSet;
3032
import java.util.List;
3133
import java.util.Map;
3234
import java.util.Set;
3335

34-
public class FakeSourceSplitEnumerator implements SourceSplitEnumerator<FakeSourceSplit, Serializable> {
36+
public class FakeSourceSplitEnumerator implements SourceSplitEnumerator<FakeSourceSplit, FakeSourceState> {
3537

3638
private static final Logger LOG = LoggerFactory.getLogger(FakeSourceSplitEnumerator.class);
3739
private final SourceSplitEnumerator.Context<FakeSourceSplit> enumeratorContext;
3840
private final Map<Integer, Set<FakeSourceSplit>> pendingSplits;
3941

40-
public FakeSourceSplitEnumerator(SourceSplitEnumerator.Context<FakeSourceSplit> enumeratorContext) {
42+
private final FakeConfig fakeConfig;
43+
/**
44+
* Partitions that have been assigned to readers.
45+
*/
46+
private final Set<FakeSourceSplit> assignedSplits;
47+
48+
public FakeSourceSplitEnumerator(SourceSplitEnumerator.Context<FakeSourceSplit> enumeratorContext,
49+
FakeConfig config,
50+
Set<FakeSourceSplit> assignedSplits) {
4151
this.enumeratorContext = enumeratorContext;
4252
this.pendingSplits = new HashMap<>();
53+
this.fakeConfig = config;
54+
this.assignedSplits = new HashSet<>(assignedSplits);
4355
}
4456

4557
@Override
@@ -60,12 +72,12 @@ public void close() throws IOException {
6072

6173
@Override
6274
public void addSplitsBack(List<FakeSourceSplit> splits, int subtaskId) {
63-
75+
addSplitChangeToPendingAssignments(splits);
6476
}
6577

6678
@Override
6779
public int currentUnassignedSplitSize() {
68-
return 0;
80+
return pendingSplits.size();
6981
}
7082

7183
@Override
@@ -79,8 +91,8 @@ public void registerReader(int subtaskId) {
7991
}
8092

8193
@Override
82-
public Serializable snapshotState(long checkpointId) throws Exception {
83-
return null;
94+
public FakeSourceState snapshotState(long checkpointId) throws Exception {
95+
return new FakeSourceState(assignedSplits);
8496
}
8597

8698
@Override
@@ -89,19 +101,31 @@ public void notifyCheckpointComplete(long checkpointId) throws Exception {
89101
}
90102

91103
private void discoverySplits() {
92-
List<FakeSourceSplit> allSplit = new ArrayList<>();
104+
Set<FakeSourceSplit> allSplit = new HashSet<>();
93105
LOG.info("Starting to calculate splits.");
94106
int numReaders = enumeratorContext.currentParallelism();
107+
int readerRowNum = fakeConfig.getRowNum();
108+
int splitNum = fakeConfig.getSplitNum();
109+
int splitRowNum = (int) Math.ceil((double) readerRowNum / splitNum);
95110
for (int i = 0; i < numReaders; i++) {
96-
allSplit.add(new FakeSourceSplit(i));
111+
int index = i;
112+
for (int num = 0; num < readerRowNum; index += numReaders, num += splitRowNum) {
113+
allSplit.add(new FakeSourceSplit(index, Math.min(splitRowNum, readerRowNum - num)));
114+
}
97115
}
98-
for (FakeSourceSplit split : allSplit) {
99-
int ownerReader = split.getSplitId() % numReaders;
116+
117+
assignedSplits.forEach(allSplit::remove);
118+
addSplitChangeToPendingAssignments(allSplit);
119+
LOG.debug("Assigned {} to {} readers.", allSplit, numReaders);
120+
LOG.info("Calculated splits successfully, the size of splits is {}.", allSplit.size());
121+
}
122+
123+
private void addSplitChangeToPendingAssignments(Collection<FakeSourceSplit> newSplits) {
124+
for (FakeSourceSplit split : newSplits) {
125+
int ownerReader = split.getSplitId() % enumeratorContext.currentParallelism();
100126
pendingSplits.computeIfAbsent(ownerReader, r -> new HashSet<>())
101127
.add(split);
102128
}
103-
LOG.debug("Assigned {} to {} readers.", allSplit, numReaders);
104-
LOG.info("Calculated splits successfully, the size of splits is {}.", allSplit.size());
105129
}
106130

107131
private void assignPendingSplits() {

seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/state/FakeSourceState.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,16 @@
1717

1818
package org.apache.seatunnel.connectors.seatunnel.fake.state;
1919

20+
import org.apache.seatunnel.connectors.seatunnel.fake.source.FakeSourceSplit;
21+
22+
import lombok.AllArgsConstructor;
23+
import lombok.Getter;
24+
2025
import java.io.Serializable;
26+
import java.util.Set;
2127

28+
@Getter
29+
@AllArgsConstructor
2230
public class FakeSourceState implements Serializable {
31+
private final Set<FakeSourceSplit> assignedSplits;
2332
}

seatunnel-connectors-v2/connector-fake/src/test/java/org.apache.seatunnel.connectors.seatunnel.fake.source/FakeDataGeneratorTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,7 @@ public void testComplexSchemaParse(String conf) throws FileNotFoundException, UR
4747
SeaTunnelRowType seaTunnelRowType = seaTunnelSchema.getSeaTunnelRowType();
4848
FakeConfig fakeConfig = FakeConfig.buildWithConfig(testConfig);
4949
FakeDataGenerator fakeDataGenerator = new FakeDataGenerator(seaTunnelSchema, fakeConfig);
50-
List<SeaTunnelRow> seaTunnelRows = fakeDataGenerator.generateFakedRows();
50+
List<SeaTunnelRow> seaTunnelRows = fakeDataGenerator.generateFakedRows(fakeConfig.getRowNum());
5151
Assertions.assertNotNull(seaTunnelRows);
5252
Assertions.assertEquals(seaTunnelRows.size(), 10);
5353
for (SeaTunnelRow seaTunnelRow : seaTunnelRows) {

seatunnel-e2e/seatunnel-connector-v2-e2e/connector-assert-e2e/src/test/resources/assertion/fakesource_to_assert.conf

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,9 @@ source {
2626
# This is a example source plugin **only for test and demonstrate the feature source plugin**
2727
FakeSource {
2828
result_table_name = "fake"
29+
row.num = 100
30+
split.row = 25
31+
split.read-interval = 2000
2932
schema = {
3033
fields {
3134
name = "string"
@@ -53,7 +56,7 @@ sink {
5356
row_rules = [
5457
{
5558
rule_type = MAX_ROW
56-
rule_value = 10
59+
rule_value = 100
5760
},
5861
{
5962
rule_type = MIN_ROW

0 commit comments

Comments
 (0)