
Merge e0a5202 into 4927cc1
amitsela committed Dec 14, 2016
2 parents 4927cc1 + e0a5202 commit 950f0d4
Showing 3 changed files with 191 additions and 5 deletions.
TestSparkRunner.java
@@ -19,16 +19,26 @@
package org.apache.beam.runners.spark;

import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.not;

import org.apache.beam.runners.core.UnboundedReadFromBoundedSource;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.BoundedReadFromUnboundedSource;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsValidator;
import org.apache.beam.sdk.runners.PipelineRunner;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.TestPipelineOptions;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.util.ValueWithRecordId;
import org.apache.beam.sdk.values.PBegin;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PInput;
import org.apache.beam.sdk.values.POutput;


/**
* The SparkRunner translates operations defined on a pipeline into a representation executable
* by Spark, and then submits the job to Spark for execution. If we wanted to run a Beam
@@ -53,9 +63,11 @@
public final class TestSparkRunner extends PipelineRunner<SparkPipelineResult> {

private SparkRunner delegate;
private boolean isStreaming;

private TestSparkRunner(SparkPipelineOptions options) {
this.delegate = SparkRunner.fromOptions(options);
this.isStreaming = options.isStreaming();
}

public static TestSparkRunner fromOptions(PipelineOptions options) {
@@ -65,11 +77,23 @@ public static TestSparkRunner fromOptions(PipelineOptions options) {
return new TestSparkRunner(sparkOptions);
}

/**
* Overrides for the test runner.
*/
@SuppressWarnings("unchecked")
@Override
public <OutputT extends POutput, InputT extends PInput>
OutputT apply(PTransform<InputT, OutputT> transform, InputT input) {
return delegate.apply(transform, input);
};
public <OutputT extends POutput, InputT extends PInput> OutputT apply(
PTransform<InputT, OutputT> transform, InputT input) {
// if the pipeline is required to execute in streaming mode, and the source is an
// unbounded source that was adapted to a bounded one, read it as an unbounded
// source again via UnboundedReadFromBoundedSource.
if (isStreaming && transform instanceof BoundedReadFromUnboundedSource) {
return (OutputT) delegate.apply(new AdaptedBoundedAsUnbounded(
(BoundedReadFromUnboundedSource) transform), input);
} else {
return delegate.apply(transform, input);
}
}

@Override
public SparkPipelineResult run(Pipeline pipeline) {
@@ -78,6 +102,33 @@ public SparkPipelineResult run(Pipeline pipeline) {
result.waitUntilFinish();
assertThat(result, testPipelineOptions.getOnCreateMatcher());
assertThat(result, testPipelineOptions.getOnSuccessMatcher());
// if the pipeline was executed in streaming mode, validate aggregators.
if (isStreaming) {
// validate assertion succeeded (at least once).
int success = result.getAggregatorValue(PAssert.SUCCESS_COUNTER, Integer.class);
assertThat("Success aggregator should be greater than zero.", success, not(0));
// validate assertion didn't fail.
int failure = result.getAggregatorValue(PAssert.FAILURE_COUNTER, Integer.class);
assertThat("Failure aggregator should be zero.", failure, is(0));
}
return result;
}

private static class AdaptedBoundedAsUnbounded<T> extends PTransform<PBegin, PCollection<T>> {
private final BoundedReadFromUnboundedSource<T> source;

AdaptedBoundedAsUnbounded(BoundedReadFromUnboundedSource<T> source) {
this.source = source;
}

@SuppressWarnings("unchecked")
@Override
public PCollection<T> expand(PBegin input) {
PTransform<PBegin, ? extends PCollection<ValueWithRecordId<T>>> replacingTransform =
new UnboundedReadFromBoundedSource<>(source.getAdaptedSource());
return (PCollection<T>) input.apply(replacingTransform)
.apply("StripIds", ParDo.of(new ValueWithRecordId.StripIdsDoFn()));
}
}

}
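For context, a minimal sketch (not part of this commit) of the scenario the new apply() override and the streaming-mode aggregator checks target. FakeUnboundedSource stands in for any UnboundedSource and is the fake defined in the new ForceStreamingTest below; the pipeline is only constructed, not executed, since the fake source returns no usable readers.

SparkPipelineOptions options =
    PipelineOptionsFactory.create().as(SparkPipelineOptions.class);
options.setRunner(TestSparkRunner.class);
// force the runner into streaming mode.
options.setStreaming(true);

Pipeline pipeline = Pipeline.create(options);
// withMaxNumRecords(...) produces a BoundedReadFromUnboundedSource; because
// streaming is forced, TestSparkRunner#apply replaces it with
// AdaptedBoundedAsUnbounded, so the source is read as an unbounded source.
pipeline.apply(Read.from(new FakeUnboundedSource()).withMaxNumRecords(-1));

In a test that actually executes, a PAssert on the read output is what feeds PAssert.SUCCESS_COUNTER and PAssert.FAILURE_COUNTER, which run() above validates when streaming is forced.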
ForceStreamingTest.java (new file)
@@ -0,0 +1,123 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.beam.runners.spark;

import static org.hamcrest.MatcherAssert.assertThat;

import java.io.IOException;
import java.util.List;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.io.BoundedReadFromUnboundedSource;
import org.apache.beam.sdk.io.Read;
import org.apache.beam.sdk.io.UnboundedSource;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.runners.TransformHierarchy;
import org.apache.beam.sdk.transforms.PTransform;
import org.junit.Test;


/**
* Test that we can "force streaming" on pipelines with {@link BoundedReadFromUnboundedSource}
* inputs using the {@link TestSparkRunner}.
*
* <p>The test validates that when a pipeline reads from a {@link BoundedReadFromUnboundedSource},
* with {@link SparkPipelineOptions#setStreaming(boolean)} set to true
* and using the {@link TestSparkRunner}, the {@link Read.Bounded} transform
* is replaced by a {@link Read.Unbounded} transform.
*
* <p>This test does not execute a pipeline.
*/
public class ForceStreamingTest {

@Test
public void test() throws IOException {
SparkPipelineOptions options = PipelineOptionsFactory.create().as(SparkPipelineOptions.class);
options.setRunner(TestSparkRunner.class);
// force streaming.
options.setStreaming(true);

Pipeline pipeline = Pipeline.create(options);

// apply the BoundedReadFromUnboundedSource.
@SuppressWarnings("unchecked")
BoundedReadFromUnboundedSource boundedRead =
Read.from(new FakeUnboundedSource()).withMaxNumRecords(-1);
//noinspection unchecked
pipeline.apply(boundedRead);

UnboundedReadDetector unboundedReadDetector = new UnboundedReadDetector();
pipeline.traverseTopologically(unboundedReadDetector);

// assert that the applied BoundedReadFromUnboundedSource
// is being treated as an unbounded read.
assertThat("Expected to have an unbounded read.", unboundedReadDetector.isUnbounded);
}

/**
* Traverses the Pipeline to check if the input is indeed a {@link Read.Unbounded}.
*/
private class UnboundedReadDetector extends Pipeline.PipelineVisitor.Defaults {
private boolean isUnbounded = false;

@Override
public void visitPrimitiveTransform(TransformHierarchy.Node node) {
Class<? extends PTransform> transformClass = node.getTransform().getClass();
if (transformClass == Read.Unbounded.class) {
isUnbounded = true;
}
}

}

/**
* A fake {@link UnboundedSource} to satisfy the compiler.
*/
private static class FakeUnboundedSource extends UnboundedSource {

@Override
public List<? extends UnboundedSource> generateInitialSplits(
int desiredNumSplits,
PipelineOptions options) throws Exception {
return null;
}

@Override
public UnboundedReader createReader(
PipelineOptions options,
CheckpointMark checkpointMark) throws IOException {
return null;
}

@Override
public Coder getCheckpointMarkCoder() {
return null;
}

@Override
public void validate() { }

@Override
public Coder getDefaultOutputCoder() {
return null;
}
}

}
BoundedReadFromUnboundedSource.java
@@ -27,6 +27,7 @@
import java.util.NoSuchElementException;
import java.util.concurrent.TimeUnit;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.annotations.Experimental;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.transforms.Distinct;
@@ -50,6 +51,7 @@ public class BoundedReadFromUnboundedSource<T> extends PTransform<PBegin, PColle
private final UnboundedSource<T, ?> source;
private final long maxNumRecords;
private final Duration maxReadTime;
private final BoundedSource<ValueWithRecordId<T>> adaptedSource;
private static final FluentBackoff BACKOFF_FACTORY =
FluentBackoff.DEFAULT
.withInitialBackoff(Duration.millis(10))
@@ -81,12 +83,22 @@ public BoundedReadFromUnboundedSource<T> withMaxReadTime(Duration maxReadTime) {
this.source = source;
this.maxNumRecords = maxNumRecords;
this.maxReadTime = maxReadTime;
this.adaptedSource = new UnboundedToBoundedSourceAdapter<>(source, maxNumRecords, maxReadTime);
}

/**
* Returns an adapted {@link BoundedSource} wrapping the underlying {@link UnboundedSource},
* with the specified bounds on number of records and read time.
*/
@Experimental
public BoundedSource<ValueWithRecordId<T>> getAdaptedSource() {
return adaptedSource;
}

@Override
public PCollection<T> expand(PBegin input) {
PCollection<ValueWithRecordId<T>> read = Pipeline.applyTransform(input,
Read.from(new UnboundedToBoundedSourceAdapter<>(source, maxNumRecords, maxReadTime)));
Read.from(getAdaptedSource()));
if (source.requiresDeduping()) {
read = read.apply(Distinct.withRepresentativeValueFn(
new SerializableFunction<ValueWithRecordId<T>, byte[]>() {

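As a hedged illustration of the new getAdaptedSource() accessor (not code from this commit; MyRecord, myUnboundedSource, and pipeline are placeholders), a runner that wants to execute a capped read as an unbounded read can recover the adapted source and re-wrap it, mirroring TestSparkRunner.AdaptedBoundedAsUnbounded above:

BoundedReadFromUnboundedSource<MyRecord> capped =
    Read.from(myUnboundedSource).withMaxNumRecords(1000);
// Recover the bounded adapter and re-wrap it so the capped read executes as an
// unbounded read, then strip the record ids the adapter attaches to each value.
PCollection<MyRecord> records = pipeline
    .apply(new UnboundedReadFromBoundedSource<>(capped.getAdaptedSource()))
    .apply("StripIds", ParDo.of(new ValueWithRecordId.StripIdsDoFn<MyRecord>()));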