[GOBBLIN-492] Refactor LoopingDatasetFinderSource to make it extensible #2363

Closed · wants to merge 2 commits

@@ -30,7 +30,6 @@
import com.google.common.collect.Iterators;
import com.google.common.collect.Lists;
import com.google.common.collect.PeekingIterator;
import com.typesafe.config.Config;

import org.apache.gobblin.configuration.ConfigurationKeys;
import org.apache.gobblin.configuration.SourceState;
@@ -44,7 +43,6 @@
import org.apache.gobblin.source.workunit.BasicWorkUnitStream;
import org.apache.gobblin.source.workunit.WorkUnit;
import org.apache.gobblin.source.workunit.WorkUnitStream;
import org.apache.gobblin.util.ConfigUtils;

import javax.annotation.Nullable;
import lombok.extern.slf4j.Slf4j;
@@ -93,9 +91,8 @@ public WorkUnitStream getWorkunitStream(SourceState state) {
public WorkUnitStream getWorkunitStream(SourceState state, boolean isDatasetStateStoreEnabled) {
this.isDatasetStateStoreEnabled = isDatasetStateStoreEnabled;
try {
int maxWorkUnits = state.getPropAsInt(MAX_WORK_UNITS_PER_RUN_KEY, MAX_WORK_UNITS_PER_RUN);
Preconditions.checkArgument(maxWorkUnits > 0, "Max work units must be greater than 0!");
Config config = ConfigUtils.propertiesToConfig(state.getProperties());
int maximumWorkUnits = state.getPropAsInt(MAX_WORK_UNITS_PER_RUN_KEY, MAX_WORK_UNITS_PER_RUN);
Preconditions.checkArgument(maximumWorkUnits > 0, "Max work units must be greater than 0!");

List<WorkUnitState> previousWorkUnitStates = (this.isDatasetStateStoreEnabled) ? state
.getPreviousWorkUnitStates(ConfigurationKeys.GLOBAL_WATERMARK_DATASET_URN)
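
Aside: the cap read in this hunk bounds how many work units a single run may
emit, falling back to MAX_WORK_UNITS_PER_RUN when the property is unset. A
minimal sketch of a caller exercising it; the literal 100, the standalone
setup, and the constants' public visibility are illustration-only assumptions:

    // Sketch: configure and read the per-run work-unit cap.
    SourceState state = new SourceState();
    state.setProp(LoopingDatasetFinderSource.MAX_WORK_UNITS_PER_RUN_KEY, 100);
    int cap = state.getPropAsInt(LoopingDatasetFinderSource.MAX_WORK_UNITS_PER_RUN_KEY,
        LoopingDatasetFinderSource.MAX_WORK_UNITS_PER_RUN);
    Preconditions.checkArgument(cap > 0, "Max work units must be greater than 0!");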
@@ -109,42 +106,50 @@ public WorkUnitStream getWorkunitStream(SourceState state, boolean isDatasetStat
}
}

IterableDatasetFinder datasetsFinder = createDatasetsFinder(state);

Stream<Dataset> datasetStream =
datasetsFinder.getDatasetsStream(Spliterator.SORTED, this.lexicographicalComparator);
datasetStream = sortStreamLexicographically(datasetStream);

String previousDatasetUrnWatermark = null;
String previousPartitionUrnWatermark = null;
if (maxWorkUnit.isPresent() && !maxWorkUnit.get().getPropAsBoolean(END_OF_DATASETS_KEY, false)) {
previousDatasetUrnWatermark = maxWorkUnit.get().getProp(DATASET_URN);
previousPartitionUrnWatermark = maxWorkUnit.get().getProp(PARTITION_URN);
}
return new BasicWorkUnitStream.Builder(getWorkUnitIterator(datasetStream.iterator(), previousDatasetUrnWatermark,
previousPartitionUrnWatermark, maximumWorkUnits)).setFiniteStream(true).build();

IterableDatasetFinder datasetsFinder = createDatasetsFinder(state);

Stream<Dataset> datasetStream =
datasetsFinder.getDatasetsStream(Spliterator.SORTED, this.lexicographicalComparator);
datasetStream = sortStreamLexicographically(datasetStream);

return new BasicWorkUnitStream.Builder(
new DeepIterator(datasetStream.iterator(), previousDatasetUrnWatermark, previousPartitionUrnWatermark,
maxWorkUnits, config)).setFiniteStream(true).build();
} catch (IOException ioe) {
throw new RuntimeException(ioe);
}
}

/**
* A factory method that creates the {@link WorkUnit} iterator used to build the {@link WorkUnitStream},
* given a generic dataset iterator. Subclasses may override it to supply a custom iterator
* (a usage sketch follows this hunk).
* @throws IOException
*/
protected Iterator<WorkUnit> getWorkUnitIterator(Iterator<Dataset> datasetIterator, String previousDatasetUrnWatermark,
@Nullable String previousPartitionUrnWatermark, int maximumWorkUnits) throws IOException {
return new DeepIterator(datasetIterator, previousDatasetUrnWatermark, previousPartitionUrnWatermark,
maximumWorkUnits);
}

/**
* A deep iterator that advances the input dataset stream to the correct position, then, where applicable,
* iterates over the partitions of {@link PartitionableDataset}s.
*/
private class DeepIterator extends AbstractIterator<WorkUnit> {
private final Iterator<Dataset> baseIterator;
private final int maxWorkUnits;

protected class DeepIterator extends AbstractIterator<WorkUnit> {
protected final Iterator<Dataset> baseIterator;
protected final int maxWorkUnits;
protected int generatedWorkUnits = 0;
protected Dataset previousDataset;
private Iterator<PartitionableDataset.DatasetPartition> currentPartitionIterator;
private int generatedWorkUnits = 0;
private Dataset previousDataset;
private PartitionableDataset.DatasetPartition previousPartition;

public DeepIterator(Iterator<Dataset> baseIterator, String previousDatasetUrnWatermark,
String previousPartitionUrnWatermark, int maxWorkUnits, Config config)
String previousPartitionUrnWatermark, int maxWorkUnits)
throws IOException {
this.maxWorkUnits = maxWorkUnits;
this.baseIterator = baseIterator;
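
To show what the new protected hooks enable, a hedged sketch of a subclass that
wires in its own DeepIterator variant. MyLoopingSource, CustomDeepIterator, the
<String, String> type arguments, and the extra property key are invented for
illustration; only the overridden signatures come from this diff, and the class
is left abstract so abstract methods unrelated to this PR can be omitted:

    // Assumes the same package as LoopingDatasetFinderSource.
    import java.io.IOException;
    import java.util.Iterator;

    import javax.annotation.Nullable;

    import org.apache.gobblin.dataset.Dataset;
    import org.apache.gobblin.source.workunit.WorkUnit;

    // Hypothetical subclass; abstract so the factory/extractor methods that are
    // unrelated to this PR need not be stubbed here.
    public abstract class MyLoopingSource extends LoopingDatasetFinderSource<String, String> {

      @Override
      protected Iterator<WorkUnit> getWorkUnitIterator(Iterator<Dataset> datasetIterator,
          String previousDatasetUrnWatermark, @Nullable String previousPartitionUrnWatermark,
          int maximumWorkUnits) throws IOException {
        // Plug in the custom iterator instead of the default DeepIterator.
        return new CustomDeepIterator(datasetIterator, previousDatasetUrnWatermark,
            previousPartitionUrnWatermark, maximumWorkUnits);
      }

      // Inner subclass of the now-protected DeepIterator.
      protected class CustomDeepIterator extends DeepIterator {
        public CustomDeepIterator(Iterator<Dataset> baseIterator, String previousDatasetUrnWatermark,
            String previousPartitionUrnWatermark, int maxWorkUnits) throws IOException {
          super(baseIterator, previousDatasetUrnWatermark, previousPartitionUrnWatermark, maxWorkUnits);
        }

        @Override
        protected WorkUnit doComputeNext() {
          WorkUnit workUnit = super.doComputeNext();
          if (workUnit != null) {
            // Invented property key, purely to show where customization hooks in.
            workUnit.setProp("example.custom.flag", true);
          }
          return workUnit;
        }
      }
    }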
Expand Down Expand Up @@ -201,6 +206,21 @@ protected WorkUnit computeNext() {
return endOfData();
}

WorkUnit resultWU = doComputeNext();
if (resultWU == null) {
resultWU = generateNoopWorkUnit();
this.generatedWorkUnits = Integer.MAX_VALUE;
resultWU.setProp(END_OF_DATASETS_KEY, true);
}
return resultWU;
}

/**
* An extensible method that generates a work unit from the iterator created by {@link #getWorkUnitIterator}.
* It consults {@link #baseIterator} and {@link #currentPartitionIterator} to find the next
* dataset or partition to convert into a work unit, and returns null once both are exhausted.
*/
protected WorkUnit doComputeNext() {
while (this.baseIterator.hasNext() || this.currentPartitionIterator.hasNext()) {
if (this.currentPartitionIterator != null && this.currentPartitionIterator.hasNext()) {
PartitionableDataset.DatasetPartition partition = this.currentPartitionIterator.next();
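
The null contract above is what makes the source loop: once doComputeNext has
exhausted both iterators, computeNext emits one final noop work unit flagged
with END_OF_DATASETS_KEY and ends the stream. On the next run, the watermark
check earlier in getWorkunitStream sees that flag and restarts from the first
dataset. A short sketch restating that check from this diff:

    // If the previous run flagged END_OF_DATASETS_KEY, ignore its watermarks and
    // start over from the first dataset; otherwise resume after the watermarks.
    String previousDatasetUrnWatermark = null;
    String previousPartitionUrnWatermark = null;
    if (maxWorkUnit.isPresent() && !maxWorkUnit.get().getPropAsBoolean(END_OF_DATASETS_KEY, false)) {
      previousDatasetUrnWatermark = maxWorkUnit.get().getProp(DATASET_URN);
      previousPartitionUrnWatermark = maxWorkUnit.get().getProp(PARTITION_URN);
    }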
@@ -215,7 +235,6 @@ protected WorkUnit computeNext() {
this.generatedWorkUnits++;
return workUnit;
}

Dataset dataset = this.baseIterator.next();
if (drilldownIntoPartitions && dataset instanceof PartitionableDataset) {
this.currentPartitionIterator = getPartitionIterator((PartitionableDataset) dataset);
@@ -230,13 +249,14 @@
return workUnit;
}
}
WorkUnit workUnit = generateNoopWorkUnit();
this.generatedWorkUnits = Integer.MAX_VALUE;
workUnit.setProp(END_OF_DATASETS_KEY, true);
return workUnit;
return null;
}

private void addDatasetInfoToWorkUnit(WorkUnit workUnit, Dataset dataset) {
/**
* It is not necessarily the case that each work unit corresponds to a single {@link Dataset},
* so this method is made extensible (see the sketch below).
*/
protected void addDatasetInfoToWorkUnit(WorkUnit workUnit, Dataset dataset) {
if (isDatasetStateStoreEnabled) {
workUnit.setProp(ConfigurationKeys.DATASET_URN_KEY, dataset.getUrn());
}
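
To see why this hook matters, a hedged fragment of a subclass (such as the
MyLoopingSource sketch above) whose work units each cover a group of datasets;
the "dataset.group.urn" key and the grouping helper are invented for
illustration:

    @Override
    protected void addDatasetInfoToWorkUnit(WorkUnit workUnit, Dataset dataset) {
      super.addDatasetInfoToWorkUnit(workUnit, dataset); // keep the default URN handling
      workUnit.setProp("dataset.group.urn", groupUrn(dataset)); // invented property key
    }

    // Invented grouping scheme: the URN prefix up to its last '/'.
    private String groupUrn(Dataset dataset) {
      String urn = dataset.getUrn();
      int slash = urn.lastIndexOf('/');
      return slash > 0 ? urn.substring(0, slash) : urn;
    }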