Commit 57342c6

[hotfix][connector][jdbc] fix JDBC split exception (#2904)
* [hotfix][connector][jdbc] fix JDBC split exception

* [hotfix][connector][jdbc] Make sure Spark doesn't end when a single split is not finished reading
ashulin committed Sep 27, 2022
1 parent 8dc1c60 commit 57342c6
Showing 3 changed files with 59 additions and 28 deletions.
@@ -58,20 +58,22 @@ public void close() throws IOException {
     @Override
     @SuppressWarnings("magicnumber")
     public void pollNext(Collector<SeaTunnelRow> output) throws Exception {
-        JdbcSourceSplit split = splits.poll();
-        if (null != split) {
-            inputFormat.open(split);
-            while (!inputFormat.reachedEnd()) {
-                SeaTunnelRow seaTunnelRow = inputFormat.nextRecord();
-                output.collect(seaTunnelRow);
+        synchronized (output.getCheckpointLock()) {
+            JdbcSourceSplit split = splits.poll();
+            if (null != split) {
+                inputFormat.open(split);
+                while (!inputFormat.reachedEnd()) {
+                    SeaTunnelRow seaTunnelRow = inputFormat.nextRecord();
+                    output.collect(seaTunnelRow);
+                }
+                inputFormat.close();
+            } else if (noMoreSplit) {
+                // signal to the source that we have reached the end of the data.
+                LOG.info("Closed the bounded jdbc source");
+                context.signalNoMoreElement();
+            } else {
+                Thread.sleep(1000L);
             }
-            inputFormat.close();
-        } else if (noMoreSplit) {
-            // signal to the source that we have reached the end of the data.
-            LOG.info("Closed the bounded jdbc source");
-            context.signalNoMoreElement();
-        } else {
-            Thread.sleep(1000L);
-        }
         }
     }

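The rewritten pollNext above wraps the whole poll-and-emit cycle in the collector's checkpoint lock, so a checkpoint can never observe a half-emitted split, and it keeps the reader alive (sleeping one second) until the enumerator explicitly signals that no more splits are coming; this is what prevents the job from finishing while a split is still unread. Below is a minimal sketch of the same control flow. The PollingReaderSketch class and its Output interface are hypothetical stand-ins, not the SeaTunnel SourceReader API.

import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;

final class PollingReaderSketch {

    interface Output {
        Object getCheckpointLock();   // lock shared with the checkpointing thread
        void collect(String row);
    }

    private final Queue<String> splits = new ConcurrentLinkedQueue<>();
    private volatile boolean noMoreSplit;
    private volatile boolean reachedEnd;

    void pollNext(Output output) throws InterruptedException {
        // Hold the checkpoint lock while one split is consumed and its rows emitted,
        // so a checkpoint cannot see a half-read split.
        synchronized (output.getCheckpointLock()) {
            String split = splits.poll();
            if (split != null) {
                output.collect("rows of " + split);   // stand-in for reading the split
            } else if (noMoreSplit) {
                reachedEnd = true;                    // only now report end of input
            } else {
                Thread.sleep(1000L);                  // wait for more splits instead of finishing
            }
        }
    }
}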
@@ -28,47 +28,81 @@
 import java.io.IOException;
 import java.io.Serializable;
 import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
-import java.util.stream.Collectors;
+import java.util.Map;
+import java.util.Set;
 
 public class JdbcSourceSplitEnumerator implements SourceSplitEnumerator<JdbcSourceSplit, JdbcSourceState> {
     private static final Logger LOG = LoggerFactory.getLogger(JdbcSourceSplitEnumerator.class);
     private final SourceSplitEnumerator.Context<JdbcSourceSplit> enumeratorContext;
-    private List<JdbcSourceSplit> allSplit = new ArrayList<>();
+
+    private final Map<Integer, Set<JdbcSourceSplit>> pendingSplits;
+
     private JdbcSourceOptions jdbcSourceOptions;
     private final PartitionParameter partitionParameter;
-    private final int parallelism;
 
     public JdbcSourceSplitEnumerator(SourceSplitEnumerator.Context<JdbcSourceSplit> enumeratorContext, JdbcSourceOptions jdbcSourceOptions, PartitionParameter partitionParameter) {
         this.enumeratorContext = enumeratorContext;
         this.jdbcSourceOptions = jdbcSourceOptions;
         this.partitionParameter = partitionParameter;
-        this.parallelism = enumeratorContext.currentParallelism();
+        this.pendingSplits = new HashMap<>();
     }
 
     @Override
     public void open() {
         // No connection needs to be opened
     }
 
     @Override
     public void run() throws Exception {
+        discoverySplits();
+        assignPendingSplits();
+    }
+
+    private void discoverySplits() {
+        List<JdbcSourceSplit> allSplit = new ArrayList<>();
         LOG.info("Starting to calculate splits.");
         if (null != partitionParameter) {
             JdbcNumericBetweenParametersProvider jdbcNumericBetweenParametersProvider =
-                new JdbcNumericBetweenParametersProvider(partitionParameter.minValue, partitionParameter.maxValue).ofBatchNum(parallelism);
+                new JdbcNumericBetweenParametersProvider(partitionParameter.minValue, partitionParameter.maxValue).ofBatchNum(enumeratorContext.currentParallelism());
             Serializable[][] parameterValues = jdbcNumericBetweenParametersProvider.getParameterValues();
             for (int i = 0; i < parameterValues.length; i++) {
                 allSplit.add(new JdbcSourceSplit(parameterValues[i], i));
             }
         } else {
             allSplit.add(new JdbcSourceSplit(null, 0));
         }
+        int numReaders = enumeratorContext.currentParallelism();
+        for (JdbcSourceSplit split : allSplit) {
+            int ownerReader = split.splitId % numReaders;
+            pendingSplits.computeIfAbsent(ownerReader, r -> new HashSet<>())
+                .add(split);
+        }
+        LOG.debug("Assigned {} to {} readers.", allSplit, numReaders);
         LOG.info("Calculated splits successfully, the size of splits is {}.", allSplit.size());
     }
 
+    private void assignPendingSplits() {
+        // Check if there's any pending splits for given readers
+        for (int pendingReader : enumeratorContext.registeredReaders()) {
+            // Remove pending assignment for the reader
+            final Set<JdbcSourceSplit> pendingAssignmentForReader =
+                pendingSplits.remove(pendingReader);
+
+            if (pendingAssignmentForReader != null && !pendingAssignmentForReader.isEmpty()) {
+                // Assign pending splits to reader
+                LOG.info("Assigning splits to readers {}", pendingAssignmentForReader);
+                enumeratorContext.assignSplit(pendingReader, new ArrayList<>(pendingAssignmentForReader));
+                enumeratorContext.signalNoMoreSplits(pendingReader);
+            }
+        }
+    }
+
     @Override
     public void close() throws IOException {
-
+        // nothing
     }
 
     @Override
@@ -87,12 +121,7 @@ public void handleSplitRequest(int subtaskId) {
 
     @Override
     public void registerReader(int subtaskId) {
-        // Filter the split that the current task needs to run
-        List<JdbcSourceSplit> splits = allSplit.stream()
-            .filter(p -> p.splitId % parallelism == subtaskId)
-            .collect(Collectors.toList());
-        enumeratorContext.assignSplit(subtaskId, splits);
-        enumeratorContext.signalNoMoreSplits(subtaskId);
+        // nothing
     }
 
     @Override
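With this change, JdbcSourceSplitEnumerator no longer assigns splits inside registerReader(). Instead, discoverySplits() buckets every discovered split under an owning reader chosen by splitId % parallelism and stores the buckets in pendingSplits, and assignPendingSplits() hands each bucket to the readers that have actually registered before calling signalNoMoreSplits for them. The following stand-alone sketch shows only that ownership rule; it uses plain integers in place of the SeaTunnel split and enumerator-context types.

import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

final class PendingSplitSketch {
    public static void main(String[] args) {
        int parallelism = 3;
        List<Integer> splitIds = Arrays.asList(0, 1, 2, 3, 4);

        // discoverySplits(): bucket every split under its owning reader.
        Map<Integer, Set<Integer>> pendingSplits = new HashMap<>();
        for (int splitId : splitIds) {
            int ownerReader = splitId % parallelism;
            pendingSplits.computeIfAbsent(ownerReader, r -> new HashSet<>()).add(splitId);
        }

        // assignPendingSplits(): only readers that have registered receive their bucket.
        Set<Integer> registeredReaders = new HashSet<>(Arrays.asList(0, 1, 2));
        for (int reader : registeredReaders) {
            Set<Integer> assignment = pendingSplits.remove(reader);
            if (assignment != null && !assignment.isEmpty()) {
                System.out.println("reader " + reader + " <- splits " + assignment);
            }
        }
    }
}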
@@ -42,7 +42,7 @@ public ParallelEnumeratorContext(ParallelSource<?, SplitT, ?> parallelSource,
 
     @Override
     public int currentParallelism() {
-        return running ? parallelism : 0;
+        return parallelism;
     }
 
     @Override
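Before this change, currentParallelism() returned 0 until the context was marked as running, so code that asked for the parallelism during enumerator construction could end up splitting a numeric range into zero batches; returning the configured parallelism unconditionally closes that window, and is presumably the source of the JDBC split exception named in the commit title. The sketch below is a generic numeric range splitter, not the actual JdbcNumericBetweenParametersProvider, and only illustrates why a batch count of 0 fails.

final class RangeSplitSketch {
    static long[][] split(long min, long max, int batchNum) {
        long[][] ranges = new long[batchNum][2];
        long step = (max - min + 1) / batchNum;   // ArithmeticException if batchNum == 0
        for (int i = 0; i < batchNum; i++) {
            long lo = min + i * step;
            long hi = (i == batchNum - 1) ? max : lo + step - 1;
            ranges[i] = new long[]{lo, hi};
        }
        return ranges;
    }

    public static void main(String[] args) {
        // Splits 1..100 into 4 ranges; split(1, 100, 0) would throw.
        for (long[] r : split(1, 100, 4)) {
            System.out.println(r[0] + " .. " + r[1]);
        }
    }
}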
