[improve][connector][fake] supports setting the number of split rows and reading interval #3098

Merged (1 commit) on Oct 15, 2022
30 changes: 20 additions & 10 deletions docs/en/connector-v2/source/FakeSource.md
@@ -18,15 +18,17 @@ just for some test cases such as type conversion or connector new feature testing

## Options

| name | type | required | default value |
| -------------- | ------ | -------- | ------------- |
| schema | config | yes | - |
| row.num | int | no | 5 |
| map.size | int | no | 5 |
| array.size | int | no | 5 |
| bytes.length | int | no | 5 |
| string.length | int | no | 5 |
| common-options | | no | - |
| name | type | required | default value |
|---------------------|--------|----------|---------------|
| schema | config | yes | - |
| row.num | int | no | 5 |
| split.num | int | no | 1 |
| split.read-interval | long | no | 1 |
| map.size | int | no | 5 |
| array.size | int | no | 5 |
| bytes.length | int | no | 5 |
| string.length | int | no | 5 |
| common-options | | no | - |

### schema [config]

@@ -81,7 +83,15 @@ Source plugin common parameters, please refer to [Source Common Options](common-

### row.num

Total num of data that connector generated
The total number of rows generated per degree of parallelism

### split.num

The number of splits generated by the enumerator for each degree of parallelism

### split.read-interval

The interval (in milliseconds) between two split reads in a reader
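
A minimal sketch of how the new options combine (the values are illustrative rather than defaults, and the schema fields are placeholders):

```hocon
FakeSource {
  result_table_name = "fake"
  # Each parallel reader generates 100 rows in total.
  row.num = 100
  # Those rows are divided into 4 splits of ceil(100 / 4) = 25 rows each.
  split.num = 4
  # A reader waits at least 2000 ms between reading two splits.
  split.read-interval = 2000
  schema = {
    fields {
      name = "string"
      age = "int"
    }
  }
}
```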

### map.size

FakeConfig.java
@@ -28,6 +28,8 @@
@Getter
public class FakeConfig implements Serializable {
public static final String ROW_NUM = "row.num";
public static final String SPLIT_NUM = "split.num";
public static final String SPLIT_READ_INTERVAL = "split.read-interval";
public static final String MAP_SIZE = "map.size";
public static final String ARRAY_SIZE = "array.size";
public static final String BYTES_LENGTH = "bytes.length";
@@ -40,6 +42,10 @@ public class FakeConfig implements Serializable {
@Builder.Default
private int rowNum = DEFAULT_ROW_NUM;
@Builder.Default
private int splitNum = 1;
@Builder.Default
private int splitReadInterval = 1;
@Builder.Default
private int mapSize = DEFAULT_MAP_SIZE;
@Builder.Default
private int arraySize = DEFAULT_ARRAY_SIZE;
@@ -53,6 +59,12 @@ public static FakeConfig buildWithConfig(Config config) {
if (config.hasPath(ROW_NUM)) {
builder.rowNum(config.getInt(ROW_NUM));
}
if (config.hasPath(SPLIT_NUM)) {
builder.splitNum(config.getInt(SPLIT_NUM));
}
if (config.hasPath(SPLIT_READ_INTERVAL)) {
builder.splitReadInterval(config.getInt(SPLIT_READ_INTERVAL));
}
if (config.hasPath(MAP_SIZE)) {
builder.mapSize(config.getInt(MAP_SIZE));
}
FakeDataGenerator.java
@@ -58,9 +58,9 @@ private SeaTunnelRow randomRow() {
return new SeaTunnelRow(randomRow.toArray());
}

public List<SeaTunnelRow> generateFakedRows() {
public List<SeaTunnelRow> generateFakedRows(int rowNum) {
ArrayList<SeaTunnelRow> seaTunnelRows = new ArrayList<>();
for (int i = 0; i < fakeConfig.getRowNum(); i++) {
for (int i = 0; i < rowNum; i++) {
seaTunnelRows.add(randomRow());
}
return seaTunnelRows;
FakeSource.java
@@ -27,15 +27,16 @@
import org.apache.seatunnel.common.constants.JobMode;
import org.apache.seatunnel.connectors.seatunnel.common.schema.SeaTunnelSchema;
import org.apache.seatunnel.connectors.seatunnel.fake.config.FakeConfig;
import org.apache.seatunnel.connectors.seatunnel.fake.state.FakeSourceState;

import org.apache.seatunnel.shade.com.typesafe.config.Config;

import com.google.auto.service.AutoService;

import java.io.Serializable;
import java.util.Collections;

@AutoService(SeaTunnelSource.class)
public class FakeSource implements SeaTunnelSource<SeaTunnelRow, FakeSourceSplit, Serializable> {
public class FakeSource implements SeaTunnelSource<SeaTunnelRow, FakeSourceSplit, FakeSourceState> {

private JobContext jobContext;
private SeaTunnelSchema schema;
@@ -52,18 +53,18 @@ public SeaTunnelRowType getProducedType() {
}

@Override
public SourceSplitEnumerator<FakeSourceSplit, Serializable> createEnumerator(SourceSplitEnumerator.Context<FakeSourceSplit> enumeratorContext) throws Exception {
return new FakeSourceSplitEnumerator(enumeratorContext);
public SourceSplitEnumerator<FakeSourceSplit, FakeSourceState> createEnumerator(SourceSplitEnumerator.Context<FakeSourceSplit> enumeratorContext) throws Exception {
return new FakeSourceSplitEnumerator(enumeratorContext, fakeConfig, Collections.emptySet());
}

@Override
public SourceSplitEnumerator<FakeSourceSplit, Serializable> restoreEnumerator(SourceSplitEnumerator.Context<FakeSourceSplit> enumeratorContext, Serializable checkpointState) throws Exception {
return new FakeSourceSplitEnumerator(enumeratorContext);
public SourceSplitEnumerator<FakeSourceSplit, FakeSourceState> restoreEnumerator(SourceSplitEnumerator.Context<FakeSourceSplit> enumeratorContext, FakeSourceState checkpointState) throws Exception {
return new FakeSourceSplitEnumerator(enumeratorContext, fakeConfig, checkpointState.getAssignedSplits());
}

@Override
public SourceReader<SeaTunnelRow, FakeSourceSplit> createReader(SourceReader.Context readerContext) throws Exception {
return new FakeSourceReader(readerContext, new FakeDataGenerator(schema, fakeConfig));
return new FakeSourceReader(readerContext, schema, fakeConfig);
}

@Override
FakeSourceReader.java
@@ -21,9 +21,12 @@
import org.apache.seatunnel.api.source.Collector;
import org.apache.seatunnel.api.source.SourceReader;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.connectors.seatunnel.common.schema.SeaTunnelSchema;
import org.apache.seatunnel.connectors.seatunnel.fake.config.FakeConfig;

import lombok.extern.slf4j.Slf4j;

import java.time.Instant;
import java.util.ArrayList;
import java.util.Deque;
import java.util.LinkedList;
@@ -34,12 +37,16 @@ public class FakeSourceReader implements SourceReader<SeaTunnelRow, FakeSourceSplit> {

private final SourceReader.Context context;
private final Deque<FakeSourceSplit> splits = new LinkedList<>();

private final FakeConfig config;
private final FakeDataGenerator fakeDataGenerator;
boolean noMoreSplit;
private volatile boolean noMoreSplit;
private volatile long latestTimestamp = 0;

public FakeSourceReader(SourceReader.Context context, FakeDataGenerator fakeDataGenerator) {
public FakeSourceReader(SourceReader.Context context, SeaTunnelSchema schema, FakeConfig fakeConfig) {
this.context = context;
this.fakeDataGenerator = fakeDataGenerator;
this.config = fakeConfig;
this.fakeDataGenerator = new FakeDataGenerator(schema, fakeConfig);
}

@Override
@@ -53,27 +60,32 @@ public void close() {
}

@Override
@SuppressWarnings("magicnumber")
@SuppressWarnings("MagicNumber")
public void pollNext(Collector<SeaTunnelRow> output) throws InterruptedException {
long currentTimestamp = Instant.now().toEpochMilli();
if (currentTimestamp <= latestTimestamp + config.getSplitReadInterval()) {
return;
}
latestTimestamp = currentTimestamp;
synchronized (output.getCheckpointLock()) {
FakeSourceSplit split = splits.poll();
if (null != split) {
// Generate the number of rows configured for this split and emit them.
List<SeaTunnelRow> seaTunnelRows = fakeDataGenerator.generateFakedRows();
List<SeaTunnelRow> seaTunnelRows = fakeDataGenerator.generateFakedRows(split.getRowNum());
for (SeaTunnelRow seaTunnelRow : seaTunnelRows) {
output.collect(seaTunnelRow);
}
log.info("{} rows of data have been generated in split({}). Generation time: {}", split.getRowNum(), split.splitId(), latestTimestamp);
} else {
if (noMoreSplit && Boundedness.BOUNDED.equals(context.getBoundedness())) {
// signal to the source that we have reached the end of the data.
log.info("Closed the bounded fake source");
context.signalNoMoreElement();
}
if (!noMoreSplit) {
log.info("wait split!");
}
}

}
if (splits.isEmpty() && noMoreSplit && Boundedness.BOUNDED.equals(context.getBoundedness())) {
// signal to the source that we have reached the end of the data.
log.info("Closed the bounded fake source");
context.signalNoMoreElement();
}
Thread.sleep(1000L);
}
FakeSourceSplit.java
@@ -27,6 +27,8 @@
public class FakeSourceSplit implements SourceSplit {
private int splitId;

private int rowNum;

@Override
public String splitId() {
return String.valueOf(splitId);
FakeSourceSplitEnumerator.java
@@ -18,28 +18,40 @@
package org.apache.seatunnel.connectors.seatunnel.fake.source;

import org.apache.seatunnel.api.source.SourceSplitEnumerator;
import org.apache.seatunnel.connectors.seatunnel.fake.config.FakeConfig;
import org.apache.seatunnel.connectors.seatunnel.fake.state.FakeSourceState;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

public class FakeSourceSplitEnumerator implements SourceSplitEnumerator<FakeSourceSplit, Serializable> {
public class FakeSourceSplitEnumerator implements SourceSplitEnumerator<FakeSourceSplit, FakeSourceState> {

private static final Logger LOG = LoggerFactory.getLogger(FakeSourceSplitEnumerator.class);
private final SourceSplitEnumerator.Context<FakeSourceSplit> enumeratorContext;
private final Map<Integer, Set<FakeSourceSplit>> pendingSplits;

public FakeSourceSplitEnumerator(SourceSplitEnumerator.Context<FakeSourceSplit> enumeratorContext) {
private final FakeConfig fakeConfig;
/**
* Splits that have been assigned to readers.
*/
private final Set<FakeSourceSplit> assignedSplits;

public FakeSourceSplitEnumerator(SourceSplitEnumerator.Context<FakeSourceSplit> enumeratorContext,
FakeConfig config,
Set<FakeSourceSplit> assignedSplits) {
this.enumeratorContext = enumeratorContext;
this.pendingSplits = new HashMap<>();
this.fakeConfig = config;
this.assignedSplits = new HashSet<>(assignedSplits);
}

@Override
Expand All @@ -60,12 +72,12 @@ public void close() throws IOException {

@Override
public void addSplitsBack(List<FakeSourceSplit> splits, int subtaskId) {

addSplitChangeToPendingAssignments(splits);
}

@Override
public int currentUnassignedSplitSize() {
return 0;
return pendingSplits.size();
}

@Override
@@ -79,8 +91,8 @@ public void registerReader(int subtaskId) {
}

@Override
public Serializable snapshotState(long checkpointId) throws Exception {
return null;
public FakeSourceState snapshotState(long checkpointId) throws Exception {
return new FakeSourceState(assignedSplits);
}

@Override
@@ -89,19 +101,31 @@ public void notifyCheckpointComplete(long checkpointId) throws Exception {
}

private void discoverySplits() {
List<FakeSourceSplit> allSplit = new ArrayList<>();
Set<FakeSourceSplit> allSplit = new HashSet<>();
LOG.info("Starting to calculate splits.");
int numReaders = enumeratorContext.currentParallelism();
int readerRowNum = fakeConfig.getRowNum();
int splitNum = fakeConfig.getSplitNum();
int splitRowNum = (int) Math.ceil((double) readerRowNum / splitNum);
for (int i = 0; i < numReaders; i++) {
allSplit.add(new FakeSourceSplit(i));
int index = i;
for (int num = 0; num < readerRowNum; index += numReaders, num += splitRowNum) {
allSplit.add(new FakeSourceSplit(index, Math.min(splitRowNum, readerRowNum - num)));
}
}
for (FakeSourceSplit split : allSplit) {
int ownerReader = split.getSplitId() % numReaders;

assignedSplits.forEach(allSplit::remove);
addSplitChangeToPendingAssignments(allSplit);
LOG.debug("Assigned {} to {} readers.", allSplit, numReaders);
LOG.info("Calculated splits successfully, the size of splits is {}.", allSplit.size());
}

private void addSplitChangeToPendingAssignments(Collection<FakeSourceSplit> newSplits) {
for (FakeSourceSplit split : newSplits) {
int ownerReader = split.getSplitId() % enumeratorContext.currentParallelism();
pendingSplits.computeIfAbsent(ownerReader, r -> new HashSet<>())
.add(split);
}
LOG.debug("Assigned {} to {} readers.", allSplit, numReaders);
LOG.info("Calculated splits successfully, the size of splits is {}.", allSplit.size());
}

private void assignPendingSplits() {
FakeSourceState.java
@@ -17,7 +17,16 @@

package org.apache.seatunnel.connectors.seatunnel.fake.state;

import org.apache.seatunnel.connectors.seatunnel.fake.source.FakeSourceSplit;

import lombok.AllArgsConstructor;
import lombok.Getter;

import java.io.Serializable;
import java.util.Set;

@Getter
@AllArgsConstructor
public class FakeSourceState implements Serializable {
private final Set<FakeSourceSplit> assignedSplits;
}
@@ -47,7 +47,7 @@ public void testComplexSchemaParse(String conf) throws FileNotFoundException, URISyntaxException {
SeaTunnelRowType seaTunnelRowType = seaTunnelSchema.getSeaTunnelRowType();
FakeConfig fakeConfig = FakeConfig.buildWithConfig(testConfig);
FakeDataGenerator fakeDataGenerator = new FakeDataGenerator(seaTunnelSchema, fakeConfig);
List<SeaTunnelRow> seaTunnelRows = fakeDataGenerator.generateFakedRows();
List<SeaTunnelRow> seaTunnelRows = fakeDataGenerator.generateFakedRows(fakeConfig.getRowNum());
Assertions.assertNotNull(seaTunnelRows);
Assertions.assertEquals(seaTunnelRows.size(), 10);
for (SeaTunnelRow seaTunnelRow : seaTunnelRows) {
@@ -26,6 +26,9 @@ source {
# This is a example source plugin **only for test and demonstrate the feature source plugin**
FakeSource {
result_table_name = "fake"
row.num = 100
split.num = 25
split.read-interval = 2000
schema = {
fields {
name = "string"
@@ -53,7 +56,7 @@ sink {
row_rules = [
{
rule_type = MAX_ROW
rule_value = 10
rule_value = 100
},
{
rule_type = MIN_ROW
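
For reference, under the split arithmetic introduced in FakeSourceSplitEnumerator, and assuming the job runs with a parallelism of 1 (the env block is not shown in this hunk), this configuration spreads each reader's 100 rows over 25 splits of ceil(100 / 25) = 4 rows, waits at least 2000 ms between two split reads, and so eventually emits exactly the 100 rows that the MAX_ROW rule asserts.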