Skip to content

Commit

Permalink
[GOBBLIN-492] Refactor LoopingDatasetFinderSource to make it extensible
Browse files Browse the repository at this point in the history
Closes #2363 from
autumnust/refactorIteratorFactory
  • Loading branch information
autumnust authored and htran1 committed May 15, 2018
1 parent 249d5a1 commit 1c2d30b
Showing 1 changed file with 47 additions and 27 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@
import com.google.common.collect.Iterators;
import com.google.common.collect.Lists;
import com.google.common.collect.PeekingIterator;
import com.typesafe.config.Config;

import org.apache.gobblin.configuration.ConfigurationKeys;
import org.apache.gobblin.configuration.SourceState;
Expand All @@ -44,7 +43,6 @@
import org.apache.gobblin.source.workunit.BasicWorkUnitStream;
import org.apache.gobblin.source.workunit.WorkUnit;
import org.apache.gobblin.source.workunit.WorkUnitStream;
import org.apache.gobblin.util.ConfigUtils;

import javax.annotation.Nullable;
import lombok.extern.slf4j.Slf4j;
Expand Down Expand Up @@ -93,9 +91,8 @@ public WorkUnitStream getWorkunitStream(SourceState state) {
public WorkUnitStream getWorkunitStream(SourceState state, boolean isDatasetStateStoreEnabled) {
this.isDatasetStateStoreEnabled = isDatasetStateStoreEnabled;
try {
int maxWorkUnits = state.getPropAsInt(MAX_WORK_UNITS_PER_RUN_KEY, MAX_WORK_UNITS_PER_RUN);
Preconditions.checkArgument(maxWorkUnits > 0, "Max work units must be greater than 0!");
Config config = ConfigUtils.propertiesToConfig(state.getProperties());
int maximumWorkUnits = state.getPropAsInt(MAX_WORK_UNITS_PER_RUN_KEY, MAX_WORK_UNITS_PER_RUN);
Preconditions.checkArgument(maximumWorkUnits > 0, "Max work units must be greater than 0!");

List<WorkUnitState> previousWorkUnitStates = (this.isDatasetStateStoreEnabled) ? state
.getPreviousWorkUnitStates(ConfigurationKeys.GLOBAL_WATERMARK_DATASET_URN)
Expand All @@ -109,42 +106,50 @@ public WorkUnitStream getWorkunitStream(SourceState state, boolean isDatasetStat
}
}

IterableDatasetFinder datasetsFinder = createDatasetsFinder(state);

Stream<Dataset> datasetStream =
datasetsFinder.getDatasetsStream(Spliterator.SORTED, this.lexicographicalComparator);
datasetStream = sortStreamLexicographically(datasetStream);

String previousDatasetUrnWatermark = null;
String previousPartitionUrnWatermark = null;
if (maxWorkUnit.isPresent() && !maxWorkUnit.get().getPropAsBoolean(END_OF_DATASETS_KEY, false)) {
previousDatasetUrnWatermark = maxWorkUnit.get().getProp(DATASET_URN);
previousPartitionUrnWatermark = maxWorkUnit.get().getProp(PARTITION_URN);
}
return new BasicWorkUnitStream.Builder(getWorkUnitIterator(datasetStream.iterator(), previousDatasetUrnWatermark,
previousPartitionUrnWatermark, maximumWorkUnits)).setFiniteStream(true).build();

IterableDatasetFinder datasetsFinder = createDatasetsFinder(state);

Stream<Dataset> datasetStream =
datasetsFinder.getDatasetsStream(Spliterator.SORTED, this.lexicographicalComparator);
datasetStream = sortStreamLexicographically(datasetStream);

return new BasicWorkUnitStream.Builder(
new DeepIterator(datasetStream.iterator(), previousDatasetUrnWatermark, previousPartitionUrnWatermark,
maxWorkUnits, config)).setFiniteStream(true).build();
} catch (IOException ioe) {
throw new RuntimeException(ioe);
}
}

/**
 * A factory method that creates the {@link Iterator} of {@link WorkUnit}s backing the
 * {@link WorkUnitStream} returned by {@code getWorkunitStream}. Subclasses may override this
 * to supply a customized iterator implementation for a generic dataset iterator.
 *
 * @param datasetIterator iterator over the datasets to convert into work units
 *        (expected to be lexicographically sorted by the caller)
 * @param previousDatasetUrnWatermark URN of the dataset where the previous run stopped
 *        (taken from the max work unit's {@code DATASET_URN} prop), or null if starting fresh
 * @param previousPartitionUrnWatermark URN of the partition where the previous run stopped,
 *        or null if not partitioned / starting fresh
 * @param maximumWorkUnits upper bound on the number of work units to generate in this run
 * @return an iterator producing the work units for this run
 * @throws IOException if the underlying iterator cannot be constructed
 */
protected Iterator<WorkUnit> getWorkUnitIterator(Iterator<Dataset> datasetIterator, String previousDatasetUrnWatermark,
@Nullable String previousPartitionUrnWatermark, int maximumWorkUnits) throws IOException {
return new DeepIterator(datasetIterator, previousDatasetUrnWatermark, previousPartitionUrnWatermark,
maximumWorkUnits);
}

/**
* A deep iterator that advances input streams until the correct position, then possibly iterates over partitions
* of {@link PartitionableDataset}s.
*/
private class DeepIterator extends AbstractIterator<WorkUnit> {
private final Iterator<Dataset> baseIterator;
private final int maxWorkUnits;

protected class DeepIterator extends AbstractIterator<WorkUnit> {
protected final Iterator<Dataset> baseIterator;
protected final int maxWorkUnits;
protected int generatedWorkUnits = 0;
protected Dataset previousDataset;
private Iterator<PartitionableDataset.DatasetPartition> currentPartitionIterator;
private int generatedWorkUnits = 0;
private Dataset previousDataset;
private PartitionableDataset.DatasetPartition previousPartition;

public DeepIterator(Iterator<Dataset> baseIterator, String previousDatasetUrnWatermark,
String previousPartitionUrnWatermark, int maxWorkUnits, Config config)
String previousPartitionUrnWatermark, int maxWorkUnits)
throws IOException {
this.maxWorkUnits = maxWorkUnits;
this.baseIterator = baseIterator;
Expand Down Expand Up @@ -201,6 +206,21 @@ protected WorkUnit computeNext() {
return endOfData();
}

WorkUnit resultWU = doComputeNext();
if (resultWU == null) {
resultWU = generateNoopWorkUnit();
this.generatedWorkUnits = Integer.MAX_VALUE;
resultWU.setProp(END_OF_DATASETS_KEY, true);
}
return resultWU;
}

/**
* An extensible method that generates a workunit based on the Iterator returned from {@link #getWorkUnitIterator}.
* It interacts with {@link #baseIterator} and {@link #currentPartitionIterator} to know the very next
* dataset/partition to be converted into a workunit.
*/
protected WorkUnit doComputeNext() {
while (this.baseIterator.hasNext() || this.currentPartitionIterator.hasNext()) {
if (this.currentPartitionIterator != null && this.currentPartitionIterator.hasNext()) {
PartitionableDataset.DatasetPartition partition = this.currentPartitionIterator.next();
Expand All @@ -215,7 +235,6 @@ protected WorkUnit computeNext() {
this.generatedWorkUnits++;
return workUnit;
}

Dataset dataset = this.baseIterator.next();
if (drilldownIntoPartitions && dataset instanceof PartitionableDataset) {
this.currentPartitionIterator = getPartitionIterator((PartitionableDataset) dataset);
Expand All @@ -230,13 +249,14 @@ protected WorkUnit computeNext() {
return workUnit;
}
}
WorkUnit workUnit = generateNoopWorkUnit();
this.generatedWorkUnits = Integer.MAX_VALUE;
workUnit.setProp(END_OF_DATASETS_KEY, true);
return workUnit;
return null;
}

private void addDatasetInfoToWorkUnit(WorkUnit workUnit, Dataset dataset) {
/**
* It is not necessarily the case that each workunit corresponds to a single {@link Dataset};
* thus we make this method extensible.
*/
protected void addDatasetInfoToWorkUnit(WorkUnit workUnit, Dataset dataset) {
if (isDatasetStateStoreEnabled) {
workUnit.setProp(ConfigurationKeys.DATASET_URN_KEY, dataset.getUrn());
}
Expand Down

0 comments on commit 1c2d30b

Please sign in to comment.