Skip to content

Commit

Permalink
Merge 0c5a771 into 26a2c47
Browse files Browse the repository at this point in the history
  • Loading branch information
tgroh committed Jan 23, 2017
2 parents 26a2c47 + 0c5a771 commit 45fb8ad
Show file tree
Hide file tree
Showing 34 changed files with 385 additions and 197 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,9 @@

import org.apache.beam.runners.apex.translation.operators.ApexReadUnboundedInputOperator;
import org.apache.beam.runners.apex.translation.utils.ValuesSource;
import org.apache.beam.sdk.coders.CannotProvideCoderException;
import org.apache.beam.sdk.io.UnboundedSource;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.values.PBegin;

import org.apache.beam.sdk.values.PCollection;

/**
* Wraps elements from Create.Values into an {@link UnboundedSource}.
Expand All @@ -35,14 +33,10 @@ class CreateValuesTranslator<T> implements TransformTranslator<Create.Values<T>>

@Override
public void translate(Create.Values<T> transform, TranslationContext context) {
try {
UnboundedSource<T, ?> unboundedSource = new ValuesSource<>(transform.getElements(),
transform.getDefaultOutputCoder((PBegin) context.getInput()));
ApexReadUnboundedInputOperator<T, ?> operator = new ApexReadUnboundedInputOperator<>(
unboundedSource, context.getPipelineOptions());
context.addOperator(operator, operator.output);
} catch (CannotProvideCoderException e) {
throw new RuntimeException(e);
}
UnboundedSource<T, ?> unboundedSource = new ValuesSource<>(transform.getElements(),
((PCollection<T>) context.getOutput()).getCoder());
ApexReadUnboundedInputOperator<T, ?> operator =
new ApexReadUnboundedInputOperator<>(unboundedSource, context.getPipelineOptions());
context.addOperator(operator, operator.output);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,12 @@

package org.apache.beam.runners.apex.translation;

import com.google.common.collect.Lists;
import static com.google.common.base.Preconditions.checkArgument;

import com.google.common.collect.Lists;
import java.util.Collections;
import java.util.List;
import java.util.Map;

import org.apache.beam.runners.apex.translation.operators.ApexFlattenOperator;
import org.apache.beam.runners.apex.translation.operators.ApexReadUnboundedInputOperator;
import org.apache.beam.runners.apex.translation.utils.ValuesSource;
Expand All @@ -32,7 +32,7 @@
import org.apache.beam.sdk.io.UnboundedSource;
import org.apache.beam.sdk.transforms.Flatten;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionList;
import org.apache.beam.sdk.values.TaggedPValue;

/**
* {@link Flatten.FlattenPCollectionList} translation to Apex operator.
Expand All @@ -43,10 +43,9 @@ class FlattenPCollectionTranslator<T> implements

@Override
public void translate(Flatten.FlattenPCollectionList<T> transform, TranslationContext context) {
PCollectionList<T> input = context.getInput();
List<PCollection<T>> collections = input.getAll();
List<TaggedPValue> inputs = context.getInputs();

if (collections.isEmpty()) {
if (inputs.isEmpty()) {
// create a dummy source that never emits anything
@SuppressWarnings("unchecked")
UnboundedSource<T, ?> unboundedSource = new ValuesSource<>(Collections.EMPTY_LIST,
Expand All @@ -55,10 +54,23 @@ public void translate(Flatten.FlattenPCollectionList<T> transform, TranslationCo
unboundedSource, context.getPipelineOptions());
context.addOperator(operator, operator.output);
} else {
PCollection<T> output = context.getOutput();
PCollection<T> output = (PCollection<T>) context.getOutput();
Map<PCollection<?>, Integer> unionTags = Collections.emptyMap();
flattenCollections(collections, unionTags, output, context);
flattenCollections(extractPCollections(inputs), unionTags, output, context);
}
}

private List<PCollection<T>> extractPCollections(List<TaggedPValue> inputs) {
List<PCollection<T>> collections = Lists.newArrayList();
for (TaggedPValue pv : inputs) {
checkArgument(
pv.getValue() instanceof PCollection,
"Non-PCollection provided as input to flatten: %s of type %s",
pv.getValue(),
pv.getClass().getSimpleName());
collections.add((PCollection<T>) pv.getValue());
}
return collections;
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ class GroupByKeyTranslator<K, V> implements TransformTranslator<GroupByKey<K, V>

@Override
public void translate(GroupByKey<K, V> transform, TranslationContext context) {
PCollection<KV<K, V>> input = context.getInput();
PCollection<KV<K, V>> input = (PCollection<KV<K, V>>) context.getInput();
ApexGroupByKeyOperator<K, V> group = new ApexGroupByKeyOperator<>(context.getPipelineOptions(),
input, context.<K>stateInternalsFactory()
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,8 @@
import org.apache.beam.sdk.util.WindowedValue.FullWindowedValueCoder;
import org.apache.beam.sdk.util.WindowedValue.WindowedValueCoder;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionTuple;
import org.apache.beam.sdk.values.PCollectionView;
import org.apache.beam.sdk.values.TaggedPValue;
import org.apache.beam.sdk.values.TupleTag;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -76,8 +76,8 @@ public void translate(ParDo.BoundMulti<InputT, OutputT> transform, TranslationCo
ApexRunner.class.getSimpleName()));
}

PCollectionTuple output = context.getOutput();
PCollection<InputT> input = context.getInput();
List<TaggedPValue> outputs = context.getOutputs();
PCollection<InputT> input = (PCollection<InputT>) context.getInput();
List<PCollectionView<?>> sideInputs = transform.getSideInputs();
Coder<InputT> inputCoder = input.getCoder();
WindowedValueCoder<InputT> wvInputCoder =
Expand All @@ -90,21 +90,28 @@ public void translate(ParDo.BoundMulti<InputT, OutputT> transform, TranslationCo
doFn,
transform.getMainOutputTag(),
transform.getSideOutputTags().getAll(),
context.<PCollection<?>>getInput().getWindowingStrategy(),
((PCollection<InputT>) context.getInput()).getWindowingStrategy(),
sideInputs,
wvInputCoder,
context.<Void>stateInternalsFactory());

Map<TupleTag<?>, PCollection<?>> outputs = output.getAll();
Map<PCollection<?>, OutputPort<?>> ports = Maps.newHashMapWithExpectedSize(outputs.size());
for (Map.Entry<TupleTag<?>, PCollection<?>> outputEntry : outputs.entrySet()) {
if (outputEntry.getKey() == transform.getMainOutputTag()) {
ports.put(outputEntry.getValue(), operator.output);
for (TaggedPValue output : outputs) {
checkArgument(
output.getValue() instanceof PCollection,
"%s %s outputs non-PCollection %s of type %s",
ParDo.BoundMulti.class.getSimpleName(),
context.getFullName(),
output.getValue(),
output.getValue().getClass().getSimpleName());
PCollection<?> pc = (PCollection<?>) output.getValue();
if (output.getTag().equals(transform.getMainOutputTag())) {
ports.put(pc, operator.output);
} else {
int portIndex = 0;
for (TupleTag<?> tag : transform.getSideOutputTags().getAll()) {
if (tag == outputEntry.getKey()) {
ports.put(outputEntry.getValue(), operator.sideOutputPorts[portIndex]);
if (tag.equals(output.getTag())) {
ports.put(pc, operator.sideOutputPorts[portIndex]);
break;
}
portIndex++;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,8 +63,8 @@ public void translate(ParDo.Bound<InputT, OutputT> transform, TranslationContext
ApexRunner.class.getSimpleName()));
}

PCollection<OutputT> output = context.getOutput();
PCollection<InputT> input = context.getInput();
PCollection<OutputT> output = (PCollection<OutputT>) context.getOutput();
PCollection<InputT> input = (PCollection<InputT>) context.getInput();
List<PCollectionView<?>> sideInputs = transform.getSideInputs();
Coder<InputT> inputCoder = input.getCoder();
WindowedValueCoder<InputT> wvInputCoder =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import com.datatorrent.api.Operator;
import com.datatorrent.api.Operator.InputPort;
import com.datatorrent.api.Operator.OutputPort;
import com.google.common.collect.Iterables;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
Expand All @@ -40,7 +41,8 @@
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionView;
import org.apache.beam.sdk.values.PInput;
import org.apache.beam.sdk.values.POutput;
import org.apache.beam.sdk.values.PValue;
import org.apache.beam.sdk.values.TaggedPValue;
import org.apache.commons.lang3.tuple.ImmutablePair;
import org.apache.commons.lang3.tuple.Pair;

Expand Down Expand Up @@ -78,12 +80,24 @@ public ApexPipelineOptions getPipelineOptions() {
return pipelineOptions;
}

public <InputT extends PInput> InputT getInput() {
return (InputT) getCurrentTransform().getInput();
public String getFullName() {
return getCurrentTransform().getFullName();
}

public <OutputT extends POutput> OutputT getOutput() {
return (OutputT) getCurrentTransform().getOutput();
public List<TaggedPValue> getInputs() {
return getCurrentTransform().getInputs();
}

public PValue getInput() {
return Iterables.getOnlyElement(getCurrentTransform().getInputs()).getValue();
}

public List<TaggedPValue> getOutputs() {
return getCurrentTransform().getOutputs();
}

public PValue getOutput() {
return Iterables.getOnlyElement(getCurrentTransform().getOutputs()).getValue();
}

private AppliedPTransform<?, ?, ?> getCurrentTransform() {
Expand All @@ -92,7 +106,7 @@ public <OutputT extends POutput> OutputT getOutput() {
}

public void addOperator(Operator operator, OutputPort port) {
addOperator(operator, port, this.<PCollection<?>>getOutput());
addOperator(operator, port, (PCollection<?>) getOutput());
}

/**
Expand Down Expand Up @@ -170,5 +184,4 @@ public void populateDAG(DAG dag) {
public <K> StateInternalsFactory<K> stateInternalsFactory() {
return new ApexStateInternals.ApexStateInternalsFactory();
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -43,8 +43,8 @@ class WindowBoundTranslator<T> implements TransformTranslator<Window.Bound<T>> {

@Override
public void translate(Window.Bound<T> transform, TranslationContext context) {
PCollection<T> output = context.getOutput();
PCollection<T> input = context.getInput();
PCollection<T> output = (PCollection<T>) context.getOutput();
PCollection<T> input = (PCollection<T>) context.getInput();
@SuppressWarnings("unchecked")
WindowingStrategy<T, BoundedWindow> windowingStrategy =
(WindowingStrategy<T, BoundedWindow>) output.getWindowingStrategy();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import com.google.auto.value.AutoValue;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Iterables;
import com.google.common.util.concurrent.SettableFuture;
import java.io.IOException;
import java.util.Collection;
Expand Down Expand Up @@ -102,7 +103,7 @@ public void cleanup() {
*/
private static class BoundedReadEvaluator<OutputT>
implements TransformEvaluator<BoundedSourceShard<OutputT>> {
private final AppliedPTransform<?, PCollection<OutputT>, ?> transform;
private final PCollection<OutputT> outputPCollection;
private final EvaluationContext evaluationContext;
private Builder resultBuilder;

Expand All @@ -114,9 +115,10 @@ public BoundedReadEvaluator(
EvaluationContext evaluationContext,
long minimumDynamicSplitSize,
ExecutorService executor) {
this.transform = transform;
this.evaluationContext = evaluationContext;
resultBuilder = StepTransformResult.withoutHold(transform);
this.outputPCollection =
(PCollection<OutputT>) Iterables.getOnlyElement(transform.getOutputs()).getValue();
this.resultBuilder = StepTransformResult.withoutHold(transform);
this.minimumDynamicSplitSize = minimumDynamicSplitSize;
this.produceSplitExecutor = executor;
}
Expand All @@ -129,7 +131,7 @@ public void processElement(WindowedValue<BoundedSourceShard<OutputT>> element)
source.createReader(evaluationContext.getPipelineOptions())) {
boolean contentsRemaining = reader.start();
Future<BoundedSource<OutputT>> residualFuture = startDynamicSplitThread(source, reader);
UncommittedBundle<OutputT> output = evaluationContext.createBundle(transform.getOutput());
UncommittedBundle<OutputT> output = evaluationContext.createBundle(outputPCollection);
while (contentsRemaining) {
output.add(
WindowedValue.timestampedValueInGlobalWindow(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -420,7 +420,7 @@ public boolean isDone(AppliedPTransform<?, ?, ?> transform) {
}
// If the PTransform has any unbounded outputs, and unbounded producers should not be shut down,
// the PTransform may produce additional output. It is not done.
for (TaggedPValue output : transform.getOutput().expand()) {
for (TaggedPValue output : transform.getOutputs()) {
if (output.getValue() instanceof PCollection) {
IsBounded bounded = ((PCollection<?>) output.getValue()).isBounded();
if (bounded.equals(IsBounded.UNBOUNDED)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -457,7 +457,9 @@ private void fireTimers() throws Exception {
evaluationContext
.createKeyedBundle(
transformTimers.getKey(),
(PCollection) transformTimers.getTransform().getInput())
(PCollection)
Iterables.getOnlyElement(transformTimers.getTransform().getInputs())
.getValue())
.add(WindowedValue.valueInGlobalWindow(work))
.commit(evaluationContext.now());
scheduleConsumption(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
*/
package org.apache.beam.runners.direct;

import com.google.common.collect.Iterables;
import org.apache.beam.runners.direct.DirectRunner.CommittedBundle;
import org.apache.beam.runners.direct.DirectRunner.UncommittedBundle;
import org.apache.beam.sdk.transforms.AppliedPTransform;
Expand Down Expand Up @@ -55,7 +56,8 @@ private <InputT> TransformEvaluator<InputT> createInMemoryEvaluator(
PCollectionList<InputT>, PCollection<InputT>, FlattenPCollectionList<InputT>>
application) {
final UncommittedBundle<InputT> outputBundle =
evaluationContext.createBundle(application.getOutput());
evaluationContext.createBundle(
(PCollection<InputT>) Iterables.getOnlyElement(application.getOutputs()).getValue());
final TransformResult<InputT> result =
StepTransformResult.<InputT>withoutHold(application).addOutput(outputBundle).build();
return new FlattenEvaluator<>(outputBundle, result);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,10 @@ public void processElement(WindowedValue<KeyedWorkItem<K, V>> element) throws Ex
K key = workItem.key();

UncommittedBundle<KV<K, Iterable<V>>> bundle =
evaluationContext.createKeyedBundle(structuralKey, application.getOutput());
evaluationContext.createKeyedBundle(
structuralKey,
(PCollection<KV<K, Iterable<V>>>)
Iterables.getOnlyElement(application.getOutputs()).getValue());
outputBundles.add(bundle);
CopyOnAccessInMemoryStateInternals<K> stateInternals =
(CopyOnAccessInMemoryStateInternals<K>) stepContext.stateInternals();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import static com.google.common.base.Preconditions.checkState;
import static org.apache.beam.sdk.util.CoderUtils.encodeToByteArray;

import com.google.common.collect.Iterables;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
Expand Down Expand Up @@ -102,7 +103,10 @@ public GroupByKeyOnlyEvaluator(
DirectGroupByKeyOnly<K, V>> application) {
this.evaluationContext = evaluationContext;
this.application = application;
this.keyCoder = getKeyCoder(application.getInput().getCoder());
this.keyCoder =
getKeyCoder(
((PCollection<KV<K, V>>) Iterables.getOnlyElement(application.getInputs()).getValue())
.getCoder());
this.groupingMap = new HashMap<>();
}

Expand Down Expand Up @@ -152,7 +156,9 @@ public TransformResult<KV<K, V>> finishBundle() {
KeyedWorkItems.elementsWorkItem(key, groupedEntry.getValue());
UncommittedBundle<KeyedWorkItem<K, V>> bundle =
evaluationContext.createKeyedBundle(
StructuralKey.of(key, keyCoder), application.getOutput());
StructuralKey.of(key, keyCoder),
(PCollection<KeyedWorkItem<K, V>>)
Iterables.getOnlyElement(application.getOutputs()).getValue());
bundle.add(WindowedValue.valueInGlobalWindow(groupedKv));
resultBuilder.addOutput(bundle);
}
Expand Down

0 comments on commit 45fb8ad

Please sign in to comment.