From f77814f2eef931e3f2fdf82250fc8f1175d9ea8a Mon Sep 17 00:00:00 2001 From: Thomas Groh Date: Thu, 31 Mar 2016 10:40:37 -0700 Subject: [PATCH 1/2] Explicitly track the Source a ReadEvaluator is using This permits use of sources that are not the initial source used in the transform. BoundedSource#splitIntoBundles and UnboundedSource#generateInitialSplits generate multiple source objects for the same transform in order to permit parallelism. --- .../BoundedReadEvaluatorFactory.java | 20 +++++++++++-------- .../UnboundedReadEvaluatorFactory.java | 14 ++++++++++--- 2 files changed, 23 insertions(+), 11 deletions(-) diff --git a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/BoundedReadEvaluatorFactory.java b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/BoundedReadEvaluatorFactory.java index caec1fc7c13da..9db2d5aba5bf4 100644 --- a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/BoundedReadEvaluatorFactory.java +++ b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/BoundedReadEvaluatorFactory.java @@ -62,8 +62,7 @@ public TransformEvaluator forApplication( private TransformEvaluator getTransformEvaluator( final AppliedPTransform, Bounded> transform, - final InProcessEvaluationContext evaluationContext) - throws IOException { + final InProcessEvaluationContext evaluationContext) { BoundedReadEvaluator evaluator = getTransformEvaluatorQueue(transform, evaluationContext).poll(); if (evaluator == null) { @@ -93,8 +92,9 @@ private Queue> getTransformEvaluatorQueu if (sourceEvaluators.putIfAbsent(key, evaluatorQueue) == null) { // If no queue existed in the evaluators, add an evaluator to initialize the evaluator // factory for this transform + BoundedSource source = transform.getTransform().getSource(); BoundedReadEvaluator evaluator = - new BoundedReadEvaluator(transform, evaluationContext); + new BoundedReadEvaluator(transform, evaluationContext, source); evaluatorQueue.offer(evaluator); } else { // otherwise return the existing Queue that arrived before us @@ -117,12 +117,19 @@ private static class BoundedReadEvaluator implements TransformEvaluator private final AppliedPTransform, Bounded> transform; private final InProcessEvaluationContext evaluationContext; private boolean contentsRemaining; + /** + * The source being read from by this {@link BoundedReadEvaluator}. This may not be the same + * as the source derived from {@link #transform} due to splitting. + */ + private BoundedSource source; public BoundedReadEvaluator( AppliedPTransform, Bounded> transform, - InProcessEvaluationContext evaluationContext) { + InProcessEvaluationContext evaluationContext, + BoundedSource source) { this.transform = transform; this.evaluationContext = evaluationContext; + this.source = source; } @Override @@ -131,10 +138,7 @@ public void processElement(WindowedValue element) {} @Override public InProcessTransformResult finishBundle() throws IOException { try (final Reader reader = - transform - .getTransform() - .getSource() - .createReader(evaluationContext.getPipelineOptions());) { + source.createReader(evaluationContext.getPipelineOptions());) { contentsRemaining = reader.start(); UncommittedBundle output = evaluationContext.createRootBundle(transform.getOutput()); diff --git a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/UnboundedReadEvaluatorFactory.java b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/UnboundedReadEvaluatorFactory.java index fa162903c25a1..0f2e4f4879486 100644 --- a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/UnboundedReadEvaluatorFactory.java +++ b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/UnboundedReadEvaluatorFactory.java @@ -90,8 +90,10 @@ private Queue> getTransformEvaluatorQu if (sourceEvaluators.putIfAbsent(key, evaluatorQueue) == null) { // If no queue existed in the evaluators, add an evaluator to initialize the evaluator // factory for this transform + UnboundedSource source = transform.getTransform().getSource(); UnboundedReadEvaluator evaluator = - new UnboundedReadEvaluator(transform, evaluationContext, evaluatorQueue); + new UnboundedReadEvaluator( + transform, evaluationContext, source, evaluatorQueue); evaluatorQueue.offer(evaluator); } else { // otherwise return the existing Queue that arrived before us @@ -116,15 +118,22 @@ private static class UnboundedReadEvaluator implements TransformEvaluat private final AppliedPTransform, Unbounded> transform; private final InProcessEvaluationContext evaluationContext; private final Queue> evaluatorQueue; + /** + * The source being read from by this {@link UnboundedReadEvaluator}. This may not be the same + * source as derived from {@link #transform} due to splitting. + */ + private final UnboundedSource source; private CheckpointMark checkpointMark; public UnboundedReadEvaluator( AppliedPTransform, Unbounded> transform, InProcessEvaluationContext evaluationContext, + UnboundedSource source, Queue> evaluatorQueue) { this.transform = transform; this.evaluationContext = evaluationContext; this.evaluatorQueue = evaluatorQueue; + this.source = source; this.checkpointMark = null; } @@ -135,8 +144,7 @@ public void processElement(WindowedValue element) {} public InProcessTransformResult finishBundle() throws IOException { UncommittedBundle output = evaluationContext.createRootBundle(transform.getOutput()); try (UnboundedReader reader = - createReader( - transform.getTransform().getSource(), evaluationContext.getPipelineOptions());) { + createReader(source, evaluationContext.getPipelineOptions());) { int numElements = 0; if (reader.start()) { do { From a8f862d1e8d6ff3ef007d7a973fe40747e1bc1f6 Mon Sep 17 00:00:00 2001 From: Thomas Groh Date: Thu, 31 Mar 2016 10:43:56 -0700 Subject: [PATCH 2/2] Use proper scoping, interfaces in BoundedReadEvaluator Use BoundedReader instead of Reader. contentsRemaining should be method-scoped not instance-scoped. --- .../sdk/runners/inprocess/BoundedReadEvaluatorFactory.java | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/BoundedReadEvaluatorFactory.java b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/BoundedReadEvaluatorFactory.java index 9db2d5aba5bf4..f034e2fc0a147 100644 --- a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/BoundedReadEvaluatorFactory.java +++ b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/BoundedReadEvaluatorFactory.java @@ -20,7 +20,6 @@ import com.google.cloud.dataflow.sdk.io.BoundedSource; import com.google.cloud.dataflow.sdk.io.BoundedSource.BoundedReader; import com.google.cloud.dataflow.sdk.io.Read.Bounded; -import com.google.cloud.dataflow.sdk.io.Source.Reader; import com.google.cloud.dataflow.sdk.runners.inprocess.InProcessPipelineRunner.CommittedBundle; import com.google.cloud.dataflow.sdk.runners.inprocess.InProcessPipelineRunner.UncommittedBundle; import com.google.cloud.dataflow.sdk.transforms.AppliedPTransform; @@ -116,7 +115,6 @@ private Queue> getTransformEvaluatorQueu private static class BoundedReadEvaluator implements TransformEvaluator { private final AppliedPTransform, Bounded> transform; private final InProcessEvaluationContext evaluationContext; - private boolean contentsRemaining; /** * The source being read from by this {@link BoundedReadEvaluator}. This may not be the same * as the source derived from {@link #transform} due to splitting. @@ -137,9 +135,9 @@ public void processElement(WindowedValue element) {} @Override public InProcessTransformResult finishBundle() throws IOException { - try (final Reader reader = + try (final BoundedReader reader = source.createReader(evaluationContext.getPipelineOptions());) { - contentsRemaining = reader.start(); + boolean contentsRemaining = reader.start(); UncommittedBundle output = evaluationContext.createRootBundle(transform.getOutput()); while (contentsRemaining) {