Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion runners/apex/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -218,7 +218,6 @@
<groups>org.apache.beam.sdk.testing.ValidatesRunner</groups>
<excludedGroups>
org.apache.beam.sdk.testing.FlattenWithHeterogeneousCoders,
org.apache.beam.sdk.testing.UsesStatefulParDo,
org.apache.beam.sdk.testing.UsesTimersInParDo,
org.apache.beam.sdk.testing.UsesSplittableParDo,
org.apache.beam.sdk.testing.UsesAttemptedMetrics,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,14 +31,11 @@
import org.apache.beam.runners.apex.ApexRunner;
import org.apache.beam.runners.apex.translation.operators.ApexParDoOperator;
import org.apache.beam.runners.core.SplittableParDoViaKeyedWorkItems.ProcessElements;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.reflect.DoFnSignature;
import org.apache.beam.sdk.transforms.reflect.DoFnSignatures;
import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
import org.apache.beam.sdk.util.WindowedValue.FullWindowedValueCoder;
import org.apache.beam.sdk.util.WindowedValue.WindowedValueCoder;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionView;
import org.apache.beam.sdk.values.PValue;
Expand All @@ -64,15 +61,6 @@ public void translate(ParDo.MultiOutput<InputT, OutputT> transform, TranslationC
String.format(
"%s does not support splittable DoFn: %s", ApexRunner.class.getSimpleName(), doFn));
}
if (signature.stateDeclarations().size() > 0) {
throw new UnsupportedOperationException(
String.format(
"Found %s annotations on %s, but %s cannot yet be used with state in the %s.",
DoFn.StateId.class.getSimpleName(),
doFn.getClass().getName(),
DoFn.class.getSimpleName(),
ApexRunner.class.getSimpleName()));
}

if (signature.timerDeclarations().size() > 0) {
throw new UnsupportedOperationException(
Expand All @@ -87,10 +75,6 @@ public void translate(ParDo.MultiOutput<InputT, OutputT> transform, TranslationC
Map<TupleTag<?>, PValue> outputs = context.getOutputs();
PCollection<InputT> input = context.getInput();
List<PCollectionView<?>> sideInputs = transform.getSideInputs();
Coder<InputT> inputCoder = input.getCoder();
WindowedValueCoder<InputT> wvInputCoder =
FullWindowedValueCoder.of(
inputCoder, input.getWindowingStrategy().getWindowFn().windowCoder());

ApexParDoOperator<InputT, OutputT> operator = new ApexParDoOperator<>(
context.getPipelineOptions(),
Expand All @@ -99,7 +83,7 @@ public void translate(ParDo.MultiOutput<InputT, OutputT> transform, TranslationC
transform.getAdditionalOutputTags().getAll(),
input.getWindowingStrategy(),
sideInputs,
wvInputCoder,
input.getCoder(),
context.getStateBackend());

Map<PCollection<?>, OutputPort<?>> ports = Maps.newHashMapWithExpectedSize(outputs.size());
Expand Down Expand Up @@ -144,10 +128,6 @@ public void translate(
Map<TupleTag<?>, PValue> outputs = context.getOutputs();
PCollection<InputT> input = context.getInput();
List<PCollectionView<?>> sideInputs = transform.getSideInputs();
Coder<InputT> inputCoder = input.getCoder();
WindowedValueCoder<InputT> wvInputCoder =
FullWindowedValueCoder.of(
inputCoder, input.getWindowingStrategy().getWindowFn().windowCoder());

@SuppressWarnings({ "rawtypes", "unchecked" })
DoFn<InputT, OutputT> doFn = (DoFn) transform.newProcessFn(transform.getFn());
Expand All @@ -158,7 +138,7 @@ public void translate(
transform.getAdditionalOutputTags().getAll(),
input.getWindowingStrategy(),
sideInputs,
wvInputCoder,
input.getCoder(),
context.getStateBackend());

Map<PCollection<?>, OutputPort<?>> ports = Maps.newHashMapWithExpectedSize(outputs.size());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,11 +73,14 @@
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.reflect.DoFnInvoker;
import org.apache.beam.sdk.transforms.reflect.DoFnInvokers;
import org.apache.beam.sdk.transforms.reflect.DoFnSignature;
import org.apache.beam.sdk.transforms.reflect.DoFnSignatures;
import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.PaneInfo;
import org.apache.beam.sdk.util.UserCodeException;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.util.WindowedValue.FullWindowedValueCoder;
import org.apache.beam.sdk.util.WindowedValue.WindowedValueCoder;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollectionView;
Expand Down Expand Up @@ -133,7 +136,7 @@ public ApexParDoOperator(
List<TupleTag<?>> additionalOutputTags,
WindowingStrategy<?, ?> windowingStrategy,
List<PCollectionView<?>> sideInputs,
Coder<WindowedValue<InputT>> inputCoder,
Coder<InputT> linputCoder,
ApexStateBackend stateBackend
) {
this.pipelineOptions = new SerializablePipelineOptions(pipelineOptions);
Expand All @@ -151,10 +154,13 @@ public ApexParDoOperator(
throw new UnsupportedOperationException(msg);
}

Coder<List<WindowedValue<InputT>>> listCoder = ListCoder.of(inputCoder);
WindowedValueCoder<InputT> wvCoder =
FullWindowedValueCoder.of(
linputCoder, this.windowingStrategy.getWindowFn().windowCoder());
Coder<List<WindowedValue<InputT>>> listCoder = ListCoder.of(wvCoder);
this.pushedBack = new ValueAndCoderKryoSerializable<>(new ArrayList<WindowedValue<InputT>>(),
listCoder);
this.inputCoder = inputCoder;
this.inputCoder = wvCoder;

TimerInternals.TimerDataCoder timerCoder =
TimerInternals.TimerDataCoder.of(windowingStrategy.getWindowFn().windowCoder());
Expand All @@ -165,8 +171,16 @@ public ApexParDoOperator(
Coder<?> keyCoder = StringUtf8Coder.of();
this.currentKeyStateInternals = new StateInternalsProxy<>(
stateBackend.newStateInternalsFactory(keyCoder));
} else {
DoFnSignature signature = DoFnSignatures.getSignature(doFn.getClass());
if (signature.usesState()) {
checkArgument(linputCoder instanceof KvCoder, "keyed input required for stateful DoFn");
@SuppressWarnings("rawtypes")
Coder<?> keyCoder = ((KvCoder) linputCoder).getKeyCoder();
this.currentKeyStateInternals = new StateInternalsProxy<>(
stateBackend.newStateInternalsFactory(keyCoder));
}
}

}

@SuppressWarnings("unused") // for Kryo
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,6 @@
import org.apache.beam.runners.core.StateTag;
import org.apache.beam.runners.core.StateTag.StateBinder;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.Coder.Context;
import org.apache.beam.sdk.coders.CoderException;
import org.apache.beam.sdk.coders.InstantCoder;
import org.apache.beam.sdk.coders.ListCoder;
Expand Down Expand Up @@ -141,7 +140,6 @@ public <KeyT, ValueT> MapState<KeyT, ValueT> bindMap(
namespace,
address,
accumCoder,
key,
combineFn
);
}
Expand Down Expand Up @@ -184,7 +182,7 @@ protected T readValue() {
// TODO: reuse input
Input input = new Input(buf);
try {
return coder.decode(input, Context.OUTER);
return coder.decode(input);
} catch (IOException e) {
throw new RuntimeException(e);
}
Expand All @@ -195,7 +193,7 @@ protected T readValue() {
public void writeValue(T input) {
ByteArrayOutputStream output = new ByteArrayOutputStream();
try {
coder.encode(input, output, Context.OUTER);
coder.encode(input, output);
stateTable.put(namespace.stringKey(), address.getId(), output.toByteArray());
} catch (IOException e) {
throw new RuntimeException(e);
Expand Down Expand Up @@ -306,15 +304,13 @@ public TimestampCombiner getTimestampCombiner() {
private final class ApexCombiningState<K, InputT, AccumT, OutputT>
extends AbstractState<AccumT>
implements CombiningState<InputT, AccumT, OutputT> {
private final K key;
private final CombineFn<InputT, AccumT, OutputT> combineFn;

private ApexCombiningState(StateNamespace namespace,
StateTag<CombiningState<InputT, AccumT, OutputT>> address,
Coder<AccumT> coder,
K key, CombineFn<InputT, AccumT, OutputT> combineFn) {
CombineFn<InputT, AccumT, OutputT> combineFn) {
super(namespace, address, coder);
this.key = key;
this.combineFn = combineFn;
}

Expand All @@ -330,8 +326,7 @@ public OutputT read() {

@Override
public void add(InputT input) {
AccumT accum = getAccum();
combineFn.addInput(accum, input);
AccumT accum = combineFn.addInput(getAccum(), input);
writeValue(accum);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,6 @@ public class FlattenPCollectionTranslatorTest {
@Test
public void test() throws Exception {
ApexPipelineOptions options = PipelineOptionsFactory.as(ApexPipelineOptions.class);
options.setApplicationName("FlattenPCollection");
options.setRunner(ApexRunner.class);
Pipeline p = Pipeline.create(options);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,6 @@
import org.apache.beam.runners.apex.translation.utils.ApexStateInternals;
import org.apache.beam.runners.apex.translation.utils.ApexStreamTuple;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.SerializableCoder;
import org.apache.beam.sdk.coders.VarIntCoder;
import org.apache.beam.sdk.coders.VoidCoder;
Expand Down Expand Up @@ -202,7 +201,6 @@ public void testSerialization() throws Exception {
.as(ApexPipelineOptions.class);
options.setRunner(TestApexRunner.class);
Pipeline pipeline = Pipeline.create(options);
Coder<WindowedValue<Integer>> coder = WindowedValue.getValueOnlyCoder(VarIntCoder.of());

PCollectionView<Integer> singletonView = pipeline.apply(Create.of(1))
.apply(Sum.integersGlobally().asSingletonView());
Expand All @@ -215,7 +213,7 @@ public void testSerialization() throws Exception {
TupleTagList.empty().getAll(),
WindowingStrategy.globalDefault(),
Collections.<PCollectionView<?>>singletonList(singletonView),
coder,
VarIntCoder.of(),
new ApexStateInternals.ApexStateBackend());
operator.setup(null);
operator.beginWindow(0);
Expand Down