Skip to content

Commit c28c44b

Browse files
liugddx, TyrantLucifer, ashulin, hailin0
authored
[feature][connector][fake] Support multi splits for fake source connector (#2974)
* feature Support more than splits and parallelism for fake connector close #2961 * test parallelism * Not practical average * fix a bug * remove split rowNum * [Feature][Connector-V2] Improve class attribute * fine-tuning * Remove useless blank line. * support STREAMING * [feature][connector][fake] Support multi-splits for fake source * [connector][fake] Make sure the bounded flow is correct * Update seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeSourceReader.java Co-authored-by: hailin0 <hailin088@gmail.com> Co-authored-by: TyrantLucifer <tyrantlucifer@apache.org> Co-authored-by: Zongwen Li <zongwen@apache.org> Co-authored-by: hailin0 <hailin088@gmail.com>
1 parent bfde596 commit c28c44b

File tree

9 files changed

+255
-31
lines changed

9 files changed

+255
-31
lines changed
Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@
1515
* limitations under the License.
1616
*/
1717

18-
package org.apache.seatunnel.connectors.seatunnel.fake.source;
18+
package org.apache.seatunnel.connectors.seatunnel.fake.config;
1919

2020
import org.apache.seatunnel.shade.com.typesafe.config.Config;
2121

@@ -27,11 +27,11 @@
2727
@Builder
2828
@Getter
2929
public class FakeConfig implements Serializable {
30-
private static final String ROW_NUM = "row.num";
31-
private static final String MAP_SIZE = "map.size";
32-
private static final String ARRAY_SIZE = "array.size";
33-
private static final String BYTES_LENGTH = "bytes.length";
34-
private static final String STRING_LENGTH = "string.length";
30+
public static final String ROW_NUM = "row.num";
31+
public static final String MAP_SIZE = "map.size";
32+
public static final String ARRAY_SIZE = "array.size";
33+
public static final String BYTES_LENGTH = "bytes.length";
34+
public static final String STRING_LENGTH = "string.length";
3535
private static final int DEFAULT_ROW_NUM = 5;
3636
private static final int DEFAULT_MAP_SIZE = 5;
3737
private static final int DEFAULT_ARRAY_SIZE = 5;

seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeDataGenerator.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
2626
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
2727
import org.apache.seatunnel.connectors.seatunnel.common.schema.SeaTunnelSchema;
28+
import org.apache.seatunnel.connectors.seatunnel.fake.config.FakeConfig;
2829

2930
import org.apache.commons.lang3.RandomStringUtils;
3031
import org.apache.commons.lang3.RandomUtils;

seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeSource.java

Lines changed: 17 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -20,22 +20,23 @@
2020
import org.apache.seatunnel.api.common.JobContext;
2121
import org.apache.seatunnel.api.source.Boundedness;
2222
import org.apache.seatunnel.api.source.SeaTunnelSource;
23+
import org.apache.seatunnel.api.source.SourceReader;
24+
import org.apache.seatunnel.api.source.SourceSplitEnumerator;
2325
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
2426
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
2527
import org.apache.seatunnel.common.constants.JobMode;
2628
import org.apache.seatunnel.connectors.seatunnel.common.schema.SeaTunnelSchema;
27-
import org.apache.seatunnel.connectors.seatunnel.common.source.AbstractSingleSplitReader;
28-
import org.apache.seatunnel.connectors.seatunnel.common.source.AbstractSingleSplitSource;
29-
import org.apache.seatunnel.connectors.seatunnel.common.source.SingleSplitReaderContext;
29+
import org.apache.seatunnel.connectors.seatunnel.fake.config.FakeConfig;
3030

3131
import org.apache.seatunnel.shade.com.typesafe.config.Config;
3232

3333
import com.google.auto.service.AutoService;
3434

35+
import java.io.Serializable;
36+
3537
@AutoService(SeaTunnelSource.class)
36-
public class FakeSource extends AbstractSingleSplitSource<SeaTunnelRow> {
38+
public class FakeSource implements SeaTunnelSource<SeaTunnelRow, FakeSourceSplit, Serializable> {
3739

38-
private Config pluginConfig;
3940
private JobContext jobContext;
4041
private SeaTunnelSchema schema;
4142
private FakeConfig fakeConfig;
@@ -51,7 +52,17 @@ public SeaTunnelRowType getProducedType() {
5152
}
5253

5354
@Override
54-
public AbstractSingleSplitReader<SeaTunnelRow> createReader(SingleSplitReaderContext readerContext) throws Exception {
55+
public SourceSplitEnumerator<FakeSourceSplit, Serializable> createEnumerator(SourceSplitEnumerator.Context<FakeSourceSplit> enumeratorContext) throws Exception {
56+
return new FakeSourceSplitEnumerator(enumeratorContext);
57+
}
58+
59+
@Override
60+
public SourceSplitEnumerator<FakeSourceSplit, Serializable> restoreEnumerator(SourceSplitEnumerator.Context<FakeSourceSplit> enumeratorContext, Serializable checkpointState) throws Exception {
61+
return new FakeSourceSplitEnumerator(enumeratorContext);
62+
}
63+
64+
@Override
65+
public SourceReader<SeaTunnelRow, FakeSourceSplit> createReader(SourceReader.Context readerContext) throws Exception {
5566
return new FakeSourceReader(readerContext, new FakeDataGenerator(schema, fakeConfig));
5667
}
5768

@@ -62,7 +73,6 @@ public String getPluginName() {
6273

6374
@Override
6475
public void prepare(Config pluginConfig) {
65-
this.pluginConfig = pluginConfig;
6676
assert pluginConfig.hasPath(FakeDataGenerator.SCHEMA);
6777
this.schema = SeaTunnelSchema.buildWithConfig(pluginConfig.getConfig(FakeDataGenerator.SCHEMA));
6878
this.fakeConfig = FakeConfig.buildWithConfig(pluginConfig);

seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeSourceReader.java

Lines changed: 49 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -19,24 +19,27 @@
1919

2020
import org.apache.seatunnel.api.source.Boundedness;
2121
import org.apache.seatunnel.api.source.Collector;
22+
import org.apache.seatunnel.api.source.SourceReader;
2223
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
23-
import org.apache.seatunnel.connectors.seatunnel.common.source.AbstractSingleSplitReader;
24-
import org.apache.seatunnel.connectors.seatunnel.common.source.SingleSplitReaderContext;
2524

2625
import lombok.extern.slf4j.Slf4j;
2726

27+
import java.util.ArrayList;
28+
import java.util.Deque;
29+
import java.util.LinkedList;
2830
import java.util.List;
2931

3032
@Slf4j
31-
public class FakeSourceReader extends AbstractSingleSplitReader<SeaTunnelRow> {
32-
33-
private final SingleSplitReaderContext context;
33+
public class FakeSourceReader implements SourceReader<SeaTunnelRow, FakeSourceSplit> {
3434

35+
private final SourceReader.Context context;
36+
private final Deque<FakeSourceSplit> splits = new LinkedList<>();
3537
private final FakeDataGenerator fakeDataGenerator;
38+
boolean noMoreSplit;
3639

37-
public FakeSourceReader(SingleSplitReaderContext context, FakeDataGenerator randomData) {
40+
public FakeSourceReader(SourceReader.Context context, FakeDataGenerator fakeDataGenerator) {
3841
this.context = context;
39-
this.fakeDataGenerator = randomData;
42+
this.fakeDataGenerator = fakeDataGenerator;
4043
}
4144

4245
@Override
@@ -52,16 +55,46 @@ public void close() {
5255
@Override
5356
@SuppressWarnings("magicnumber")
5457
public void pollNext(Collector<SeaTunnelRow> output) throws InterruptedException {
55-
// Generate a random number of rows to emit.
56-
List<SeaTunnelRow> seaTunnelRows = fakeDataGenerator.generateFakedRows();
57-
for (SeaTunnelRow seaTunnelRow : seaTunnelRows) {
58-
output.collect(seaTunnelRow);
59-
}
60-
if (Boundedness.BOUNDED.equals(context.getBoundedness())) {
61-
// signal to the source that we have reached the end of the data.
62-
log.info("Closed the bounded fake source");
63-
context.signalNoMoreElement();
58+
synchronized (output.getCheckpointLock()) {
59+
FakeSourceSplit split = splits.poll();
60+
if (null != split) {
61+
// Generate a random number of rows to emit.
62+
List<SeaTunnelRow> seaTunnelRows = fakeDataGenerator.generateFakedRows();
63+
for (SeaTunnelRow seaTunnelRow : seaTunnelRows) {
64+
output.collect(seaTunnelRow);
65+
}
66+
} else {
67+
if (noMoreSplit && Boundedness.BOUNDED.equals(context.getBoundedness())) {
68+
// signal to the source that we have reached the end of the data.
69+
log.info("Closed the bounded fake source");
70+
context.signalNoMoreElement();
71+
}
72+
if (!noMoreSplit) {
73+
log.info("wait split!");
74+
}
75+
}
76+
6477
}
6578
Thread.sleep(1000L);
6679
}
80+
81+
@Override
82+
public List<FakeSourceSplit> snapshotState(long checkpointId) throws Exception {
83+
return new ArrayList<>(splits);
84+
}
85+
86+
@Override
87+
public void addSplits(List<FakeSourceSplit> splits) {
88+
this.splits.addAll(splits);
89+
}
90+
91+
@Override
92+
public void handleNoMoreSplits() {
93+
noMoreSplit = true;
94+
}
95+
96+
@Override
97+
public void notifyCheckpointComplete(long checkpointId) throws Exception {
98+
99+
}
67100
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,34 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.seatunnel.connectors.seatunnel.fake.source;
19+
20+
import org.apache.seatunnel.api.source.SourceSplit;
21+
22+
import lombok.AllArgsConstructor;
23+
import lombok.Data;
24+
25+
@Data
26+
@AllArgsConstructor
27+
public class FakeSourceSplit implements SourceSplit {
28+
private int splitId;
29+
30+
@Override
31+
public String splitId() {
32+
return String.valueOf(splitId);
33+
}
34+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,122 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.seatunnel.connectors.seatunnel.fake.source;
19+
20+
import org.apache.seatunnel.api.source.SourceSplitEnumerator;
21+
22+
import org.slf4j.Logger;
23+
import org.slf4j.LoggerFactory;
24+
25+
import java.io.IOException;
26+
import java.io.Serializable;
27+
import java.util.ArrayList;
28+
import java.util.HashMap;
29+
import java.util.HashSet;
30+
import java.util.List;
31+
import java.util.Map;
32+
import java.util.Set;
33+
34+
public class FakeSourceSplitEnumerator implements SourceSplitEnumerator<FakeSourceSplit, Serializable> {
35+
36+
private static final Logger LOG = LoggerFactory.getLogger(FakeSourceSplitEnumerator.class);
37+
private final SourceSplitEnumerator.Context<FakeSourceSplit> enumeratorContext;
38+
private final Map<Integer, Set<FakeSourceSplit>> pendingSplits;
39+
40+
public FakeSourceSplitEnumerator(SourceSplitEnumerator.Context<FakeSourceSplit> enumeratorContext) {
41+
this.enumeratorContext = enumeratorContext;
42+
this.pendingSplits = new HashMap<>();
43+
}
44+
45+
@Override
46+
public void open() {
47+
// No connection needs to be opened
48+
}
49+
50+
@Override
51+
public void run() throws Exception {
52+
discoverySplits();
53+
assignPendingSplits();
54+
}
55+
56+
@Override
57+
public void close() throws IOException {
58+
// nothing
59+
}
60+
61+
@Override
62+
public void addSplitsBack(List<FakeSourceSplit> splits, int subtaskId) {
63+
64+
}
65+
66+
@Override
67+
public int currentUnassignedSplitSize() {
68+
return 0;
69+
}
70+
71+
@Override
72+
public void handleSplitRequest(int subtaskId) {
73+
74+
}
75+
76+
@Override
77+
public void registerReader(int subtaskId) {
78+
// nothing
79+
}
80+
81+
@Override
82+
public Serializable snapshotState(long checkpointId) throws Exception {
83+
return null;
84+
}
85+
86+
@Override
87+
public void notifyCheckpointComplete(long checkpointId) throws Exception {
88+
89+
}
90+
91+
private void discoverySplits() {
92+
List<FakeSourceSplit> allSplit = new ArrayList<>();
93+
LOG.info("Starting to calculate splits.");
94+
int numReaders = enumeratorContext.currentParallelism();
95+
for (int i = 0; i < numReaders; i++) {
96+
allSplit.add(new FakeSourceSplit(i));
97+
}
98+
for (FakeSourceSplit split : allSplit) {
99+
int ownerReader = split.getSplitId() % numReaders;
100+
pendingSplits.computeIfAbsent(ownerReader, r -> new HashSet<>())
101+
.add(split);
102+
}
103+
LOG.debug("Assigned {} to {} readers.", allSplit, numReaders);
104+
LOG.info("Calculated splits successfully, the size of splits is {}.", allSplit.size());
105+
}
106+
107+
private void assignPendingSplits() {
108+
// Check if there's any pending splits for given readers
109+
for (int pendingReader : enumeratorContext.registeredReaders()) {
110+
// Remove pending assignment for the reader
111+
final Set<FakeSourceSplit> pendingAssignmentForReader =
112+
pendingSplits.remove(pendingReader);
113+
114+
if (pendingAssignmentForReader != null && !pendingAssignmentForReader.isEmpty()) {
115+
// Assign pending splits to reader
116+
LOG.info("Assigning splits to readers {}", pendingAssignmentForReader);
117+
enumeratorContext.assignSplit(pendingReader, new ArrayList<>(pendingAssignmentForReader));
118+
enumeratorContext.signalNoMoreSplits(pendingReader);
119+
}
120+
}
121+
}
122+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,23 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.seatunnel.connectors.seatunnel.fake.state;
19+
20+
import java.io.Serializable;
21+
22+
public class FakeSourceState implements Serializable {
23+
}

seatunnel-connectors-v2/connector-fake/src/test/java/org.apache.seatunnel.connectors.seatunnel.fake.source/FakeDataGeneratorTest.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
2121
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
2222
import org.apache.seatunnel.connectors.seatunnel.common.schema.SeaTunnelSchema;
23+
import org.apache.seatunnel.connectors.seatunnel.fake.config.FakeConfig;
2324

2425
import org.apache.seatunnel.shade.com.typesafe.config.Config;
2526
import org.apache.seatunnel.shade.com.typesafe.config.ConfigFactory;

seatunnel-examples/seatunnel-flink-connector-v2-example/src/main/resources/examples/fake_to_console.conf

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,8 +20,8 @@
2020

2121
env {
2222
# You can set flink configuration here
23-
execution.parallelism = 1
24-
#job.mode = "BATCH"
23+
execution.parallelism = 2
24+
job.mode = "STREAMING"
2525
#execution.checkpoint.interval = 10000
2626
#execution.checkpoint.data-uri = "hdfs://localhost:9000/checkpoint"
2727
}

0 commit comments

Comments
 (0)