-
Notifications
You must be signed in to change notification settings - Fork 1.7k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[feature][connector][fake] Support mutil splits for fake source connector #2974
Conversation
|
||
public FakeSourceReader(SingleSplitReaderContext context, FakeDataGenerator randomData) { | ||
public FakeSourceReader(SourceReader.Context context, FakeDataGenerator randomData) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
public FakeSourceReader(SourceReader.Context context, FakeDataGenerator randomData) { | |
public FakeSourceReader(SourceReader.Context context, FakeDataGenerator fakeDataGenerator) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Rename randomData
to fakeDataGenerator
?
private final Map<Integer, Set<FakeSourceSplit>> pendingSplits; | ||
private final Integer totalRowNum; | ||
|
||
public FakeSourceSplitEnumerator(SourceSplitEnumerator.Context<FakeSourceSplit> enumeratorContext, Integer totalRowNum) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
public FakeSourceSplitEnumerator(SourceSplitEnumerator.Context<FakeSourceSplit> enumeratorContext, Integer totalRowNum) { | |
public FakeSourceSplitEnumerator(SourceSplitEnumerator.Context<FakeSourceSplit> enumeratorContext, int totalRowNum) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
According to the review comment in FakeSourceSplitEnumerator, this parameter maybe need to be removed.
private static final Logger LOG = LoggerFactory.getLogger(FakeSourceSplitEnumerator.class); | ||
private final SourceSplitEnumerator.Context<FakeSourceSplit> enumeratorContext; | ||
private final Map<Integer, Set<FakeSourceSplit>> pendingSplits; | ||
private final Integer totalRowNum; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
private final Integer totalRowNum; | |
private final int totalRowNum; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
According to the review comment in FakeSourceSplitEnumerator, this parameter maybe need to be removed.
@Data | ||
@AllArgsConstructor | ||
public class FakeSourceSplit implements SourceSplit { | ||
private Integer rowNum; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
private Integer rowNum; | |
private int rowNum; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
According to the review comment in FakeSourceSplitEnumerator
, this parameter maybe need to be removed.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Removed?
...ake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeSourceSplit.java
Outdated
Show resolved
Hide resolved
LOG.info("Starting to calculate splits."); | ||
int numReaders = enumeratorContext.currentParallelism(); | ||
|
||
if (null != totalRowNum) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
How about using the parameter row.size
to control the row num of each reader not the total number? The disadvantage of slicing now is that the user needs to calculate how many data will be generated by each parallel task during configuration, our original intention is to make user configuration simpler, the user only needs to configure how many data will be generated by each slice, so that the slice only needs one parameter id, then only need to use the id to take the balance of the parallelism when the slice is assigned.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think fake should be simple,so a simple sharding approach is needed here, and the configuration file does not need to be changed.
@@ -57,9 +58,9 @@ private SeaTunnelRow randomRow() { | |||
return new SeaTunnelRow(randomRow.toArray()); | |||
} | |||
|
|||
public List<SeaTunnelRow> generateFakedRows() { | |||
public List<SeaTunnelRow> generateFakedRows(int rowNum) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
According to the review comment in FakeSourceSplitEnumerator
, this method maybe need to revert because every split has the same row num.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Revert?
The reason why CI error is #2984 , please review. @TyrantLucifer @EricJoy2048 |
@Data | ||
@AllArgsConstructor | ||
public class FakeSourceSplit implements SourceSplit { | ||
private Integer rowNum; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Removed?
...ake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeSourceSplit.java
Outdated
Show resolved
Hide resolved
@@ -57,9 +58,9 @@ private SeaTunnelRow randomRow() { | |||
return new SeaTunnelRow(randomRow.toArray()); | |||
} | |||
|
|||
public List<SeaTunnelRow> generateFakedRows() { | |||
public List<SeaTunnelRow> generateFakedRows(int rowNum) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Revert?
@@ -51,7 +51,17 @@ public SeaTunnelRowType getProducedType() { | |||
} | |||
|
|||
@Override | |||
public AbstractSingleSplitReader<SeaTunnelRow> createReader(SingleSplitReaderContext readerContext) throws Exception { | |||
public SourceSplitEnumerator<FakeSourceSplit, FakeSourceState> createEnumerator(SourceSplitEnumerator.Context<FakeSourceSplit> enumeratorContext) throws Exception { | |||
return new FakeSourceSplitEnumerator(enumeratorContext, fakeConfig.getRowNum()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
return new FakeSourceSplitEnumerator(enumeratorContext, fakeConfig.getRowNum()); | |
return new FakeSourceSplitEnumerator(enumeratorContext, fakeConfig); |
|
||
@Override | ||
public SourceSplitEnumerator<FakeSourceSplit, FakeSourceState> restoreEnumerator(SourceSplitEnumerator.Context<FakeSourceSplit> enumeratorContext, FakeSourceState checkpointState) throws Exception { | ||
return new FakeSourceSplitEnumerator(enumeratorContext, fakeConfig.getRowNum()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
return new FakeSourceSplitEnumerator(enumeratorContext, fakeConfig.getRowNum()); | |
return new FakeSourceSplitEnumerator(enumeratorContext, fakeConfig); |
FakeSourceSplit split = splits.poll(); | ||
if (null != split) { | ||
// Generate a random number of rows to emit. | ||
List<SeaTunnelRow> seaTunnelRows = fakeDataGenerator.generateFakedRows(split.getRowNum()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
List<SeaTunnelRow> seaTunnelRows = fakeDataGenerator.generateFakedRows(split.getRowNum()); | |
List<SeaTunnelRow> seaTunnelRows = fakeDataGenerator.generateFakedRows(); |
LOG.info("Starting to calculate splits."); | ||
int numReaders = enumeratorContext.currentParallelism(); | ||
for (int i = 0; i < numReaders; i++) { | ||
allSplit.add(new FakeSourceSplit(rowNum, i)); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
allSplit.add(new FakeSourceSplit(rowNum, i)); | |
allSplit.add(new FakeSourceSplit(i)); |
@@ -46,7 +47,7 @@ public void testComplexSchemaParse(String conf) throws FileNotFoundException, UR | |||
SeaTunnelRowType seaTunnelRowType = seaTunnelSchema.getSeaTunnelRowType(); | |||
FakeConfig fakeConfig = FakeConfig.buildWithConfig(testConfig); | |||
FakeDataGenerator fakeDataGenerator = new FakeDataGenerator(seaTunnelSchema, fakeConfig); | |||
List<SeaTunnelRow> seaTunnelRows = fakeDataGenerator.generateFakedRows(); | |||
List<SeaTunnelRow> seaTunnelRows = fakeDataGenerator.generateFakedRows(fakeConfig.getRowNum()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
List<SeaTunnelRow> seaTunnelRows = fakeDataGenerator.generateFakedRows(fakeConfig.getRowNum()); | |
List<SeaTunnelRow> seaTunnelRows = fakeDataGenerator.generateFakedRows(fakeConfig); |
please review again @TyrantLucifer |
|
||
public FakeSourceReader(SingleSplitReaderContext context, FakeDataGenerator randomData) { | ||
public FakeSourceReader(SourceReader.Context context, FakeDataGenerator randomData) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Rename randomData
to fakeDataGenerator
?
|
||
@Override | ||
public String splitId() { | ||
return splitId.toString(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
return splitId.toString(); | |
return String.valueOf(splitId); |
… improve-supportparallelism
@Override | ||
public void handleNoMoreSplits() { | ||
noMoreSplit = true; | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Remove useless blank line.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
+1, cc @EricJoy2048
...tor-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/config/FakeConfig.java
Show resolved
Hide resolved
Please fix the CI error |
Error: JobMasterTest.testHandleCheckpointTimeout:128 » ConditionTimeout Assertion con... Maybe time setting is too short? |
rerun CI Thanks. @EricJoy2048 |
Some bug we must fix to resolve the CI problems. Please wait for us. |
rerun CI please. @EricJoy2048 |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM
@ashulin help to review . |
b991a6b
to
c2e8e0b
Compare
|
||
private final SourceReader.Context context; | ||
private final Deque<FakeSourceSplit> splits = new LinkedList<>(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
private final Deque<FakeSourceSplit> splits = new LinkedList<>(); | |
private final Queue<FakeSourceSplit> splits = new LinkedList<>(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
unused deque api?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
unused deque api?
cc @ashulin
...ke/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeSourceReader.java
Outdated
Show resolved
Hide resolved
…e/seatunnel/connectors/seatunnel/fake/source/FakeSourceReader.java Co-authored-by: hailin0 <hailin088@gmail.com>
… improve-supportparallelism # Conflicts: # seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeSource.java # seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeSourceReader.java # seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeSourceSplitEnumerator.java
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
+1
+1 |
close #2961
Purpose of this pull request
feature Support more than splits and parallelism for fake connector.
Check list
New License Guide