diff --git a/common/src/main/java/org/apache/nemo/common/dag/DAGBuilder.java b/common/src/main/java/org/apache/nemo/common/dag/DAGBuilder.java index a6269a085e..6a6ca4d438 100644 --- a/common/src/main/java/org/apache/nemo/common/dag/DAGBuilder.java +++ b/common/src/main/java/org/apache/nemo/common/dag/DAGBuilder.java @@ -20,7 +20,6 @@ import org.apache.nemo.common.exception.CompileTimeOptimizationException; import org.apache.nemo.common.ir.edge.IREdge; -import org.apache.nemo.common.ir.edge.executionproperty.BroadcastVariableIdProperty; import org.apache.nemo.common.ir.edge.executionproperty.DataFlowProperty; import org.apache.nemo.common.ir.edge.executionproperty.MetricCollectionProperty; import org.apache.nemo.common.ir.vertex.*; @@ -258,14 +257,6 @@ private void sinkCheck() { * Helper method to check that all execution properties are correct and makes sense. */ private void executionPropertyCheck() { - // SideInput is not compatible with Push - vertices.forEach(v -> incomingEdges.get(v).stream().filter(e -> e instanceof IREdge).map(e -> (IREdge) e) - .filter(e -> e.getPropertyValue(BroadcastVariableIdProperty.class).isPresent()) - .filter(e -> DataFlowProperty.Value.Push.equals(e.getPropertyValue(DataFlowProperty.class).get())) - .forEach(e -> { - throw new CompileTimeOptimizationException("DAG execution property check: " - + "Broadcast edge is not compatible with push" + e.getId()); - })); // DataSizeMetricCollection is not compatible with Push (All data have to be stored before the data collection) vertices.forEach(v -> incomingEdges.get(v).stream().filter(e -> e instanceof IREdge).map(e -> (IREdge) e) .filter(e -> Optional.of(MetricCollectionProperty.Value.DataSkewRuntimePass) diff --git a/common/src/main/java/org/apache/nemo/common/punctuation/Watermark.java b/common/src/main/java/org/apache/nemo/common/punctuation/Watermark.java index e676e6e8c9..1055f0bb9d 100644 --- a/common/src/main/java/org/apache/nemo/common/punctuation/Watermark.java +++ 
b/common/src/main/java/org/apache/nemo/common/punctuation/Watermark.java @@ -49,7 +49,7 @@ public boolean equals(final Object o) { @Override public String toString() { - return String.valueOf(timestamp); + return String.valueOf("Watermark(" + timestamp + ")"); } @Override diff --git a/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/InMemorySideInputReader.java b/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/InMemorySideInputReader.java new file mode 100644 index 0000000000..9a18b0c9f8 --- /dev/null +++ b/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/InMemorySideInputReader.java @@ -0,0 +1,108 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.nemo.compiler.frontend.beam; + +import org.apache.beam.runners.core.ReadyCheckingSideInputReader; +import org.apache.beam.sdk.transforms.ViewFn; +import org.apache.beam.sdk.transforms.windowing.BoundedWindow; +import org.apache.beam.sdk.util.WindowedValue; +import org.apache.beam.sdk.values.PCollectionView; +import org.apache.nemo.common.Pair; +import org.apache.nemo.compiler.frontend.beam.transform.CreateViewTransform; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.Nullable; +import java.util.*; + +/** + * Accumulates and provides side inputs in memory. + */ +public final class InMemorySideInputReader implements ReadyCheckingSideInputReader { + private static final Logger LOG = LoggerFactory.getLogger(InMemorySideInputReader.class.getName()); + + private long curWatermark = Long.MIN_VALUE; + + private final Collection> sideInputsToRead; + private final Map, BoundedWindow>, Object> inMemorySideInputs; + + public InMemorySideInputReader(final Collection> sideInputsToRead) { + this.sideInputsToRead = sideInputsToRead; + this.inMemorySideInputs = new HashMap<>(); + } + + @Override + public boolean isReady(final PCollectionView view, final BoundedWindow window) { + return window.maxTimestamp().getMillis() < curWatermark + || inMemorySideInputs.containsKey(Pair.of(view, window)); + } + + @Nullable + @Override + public T get(final PCollectionView view, final BoundedWindow window) { + // This gets called after isReady() + final T sideInputData = (T) inMemorySideInputs.get(Pair.of(view, window)); + return sideInputData == null + // The upstream gave us an empty sideInput + ? 
((ViewFn) view.getViewFn()).apply(new CreateViewTransform.MultiView(Collections.emptyList())) + // The upstream gave us a concrete sideInput + : sideInputData; + } + + @Override + public boolean contains(final PCollectionView view) { + return sideInputsToRead.contains(view); + } + + @Override + public boolean isEmpty() { + return sideInputsToRead.isEmpty(); + } + + /** + * Stores the side input in memory to be used with main inputs. + * @param view of the side input. + * @param sideInputElement to add. + */ + public void addSideInputElement(final PCollectionView view, + final WindowedValue> sideInputElement) { + for (final BoundedWindow bw : sideInputElement.getWindows()) { + inMemorySideInputs.put(Pair.of(view, bw), sideInputElement.getValue().getSideInputValue()); + } + } + + /** + * Say a DoFn of this reader has 3 main inputs and 4 side inputs. + * {@link org.apache.nemo.runtime.executor.datatransfer.InputWatermarkManager} guarantees that the watermark here + * is the minimum of the all 7 input streams. + * @param newWatermark to set. + */ + public void setCurrentWatermarkOfAllMainAndSideInputs(final long newWatermark) { + if (curWatermark > newWatermark) { + // Cannot go backwards in time. + throw new IllegalStateException(curWatermark + " > " + newWatermark); + } + + this.curWatermark = newWatermark; + // TODO #282: Handle late data + inMemorySideInputs.entrySet().removeIf(entry -> { + return entry.getKey().right().maxTimestamp().getMillis() <= this.curWatermark; // Discard old sideinputs. 
+ }); + } +} diff --git a/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/PipelineTranslationContext.java b/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/PipelineTranslationContext.java index 722f421469..d54a7cdf0d 100644 --- a/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/PipelineTranslationContext.java +++ b/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/PipelineTranslationContext.java @@ -35,6 +35,7 @@ import org.apache.nemo.common.ir.vertex.transform.Transform; import org.apache.nemo.compiler.frontend.beam.coder.BeamDecoderFactory; import org.apache.nemo.compiler.frontend.beam.coder.BeamEncoderFactory; +import org.apache.nemo.compiler.frontend.beam.coder.SideInputCoder; import org.apache.nemo.compiler.frontend.beam.transform.*; import java.util.*; @@ -91,6 +92,45 @@ void addVertex(final IRVertex vertex) { builder.addVertex(vertex, loopVertexStack); } + /** + * Say the dstIRVertex consumes three views: view0, view1, and view2. + * + * We translate that as the following: + * view0 -> SideInputTransform(index=0) -> + * view1 -> SideInputTransform(index=1) -> dstIRVertex(with a map from indices to PCollectionViews) + * view2 -> SideInputTransform(index=2) -> + * + * @param dstVertex vertex. + * @param sideInputs of the vertex. 
+ */ + void addSideInputEdges(final IRVertex dstVertex, final Map> sideInputs) { + for (final Map.Entry> entry : sideInputs.entrySet()) { + final int index = entry.getKey(); + final PCollectionView view = entry.getValue(); + + final IRVertex srcVertex = pValueToProducerVertex.get(view); + final IRVertex sideInputTransformVertex = new OperatorVertex(new SideInputTransform(index)); + addVertex(sideInputTransformVertex); + final Coder viewCoder = getCoderForView(view, this); + final Coder windowCoder = view.getPCollection().getWindowingStrategy().getWindowFn().windowCoder(); + + // First edge: view to transform + final IREdge firstEdge = + new IREdge(CommunicationPatternProperty.Value.OneToOne, srcVertex, sideInputTransformVertex); + addEdge(firstEdge, viewCoder, windowCoder); + + // Second edge: transform to the dstIRVertex + final IREdge secondEdge = + new IREdge(CommunicationPatternProperty.Value.OneToOne, sideInputTransformVertex, dstVertex); + final WindowedValue.FullWindowedValueCoder sideInputElementCoder = + WindowedValue.getFullCoder(SideInputCoder.of(viewCoder), windowCoder); + + secondEdge.setProperty(EncoderProperty.of(new BeamEncoderFactory(sideInputElementCoder))); + secondEdge.setProperty(DecoderProperty.of(new BeamDecoderFactory(sideInputElementCoder))); + builder.connectVertices(secondEdge); + } + } + /** * Add IR edge to the builder. 
* @@ -98,62 +138,38 @@ void addVertex(final IRVertex vertex) { * @param input the {@link PValue} {@code dst} consumes */ void addEdgeTo(final IRVertex dst, final PValue input) { - final Coder coder; if (input instanceof PCollection) { - coder = ((PCollection) input).getCoder(); - } else if (input instanceof PCollectionView) { - coder = getCoderForView((PCollectionView) input, this); - } else { - throw new RuntimeException(String.format("While adding an edge to %s, coder for PValue %s cannot " - + "be determined", dst, input)); - } - addEdgeTo(dst, input, coder); - } + final Coder elementCoder = ((PCollection) input).getCoder(); + final Coder windowCoder = ((PCollection) input).getWindowingStrategy().getWindowFn().windowCoder(); + final IRVertex src = pValueToProducerVertex.get(input); + if (src == null) { + throw new IllegalStateException(String.format("Cannot find a vertex that emits pValue %s", input)); + } - void addEdgeTo(final IRVertex dst, final PValue input, final Coder elementCoder) { - final IRVertex src = pValueToProducerVertex.get(input); - if (src == null) { - throw new IllegalStateException(String.format("Cannot find a vertex that emits pValue %s", input)); - } + final CommunicationPatternProperty.Value communicationPattern = getCommPattern(src, dst); + final IREdge edge = new IREdge(communicationPattern, src, dst); - final Coder windowCoder; - final CommunicationPatternProperty.Value communicationPattern = getCommPattern(src, dst); - final IREdge edge = new IREdge(communicationPattern, src, dst); + if (pValueToTag.containsKey(input)) { + edge.setProperty(AdditionalOutputTagProperty.of(pValueToTag.get(input).getId())); + } - if (pValueToTag.containsKey(input)) { - edge.setProperty(AdditionalOutputTagProperty.of(pValueToTag.get(input).getId())); - } - if (input instanceof PCollectionView) { - edge.setProperty(BroadcastVariableIdProperty.of((PCollectionView) input)); - } - if (input instanceof PCollection) { - windowCoder = ((PCollection) 
input).getWindowingStrategy().getWindowFn().windowCoder(); - } else if (input instanceof PCollectionView) { - windowCoder = ((PCollectionView) input).getPCollection() - .getWindowingStrategy().getWindowFn().windowCoder(); + addEdge(edge, elementCoder, windowCoder); } else { - throw new RuntimeException(String.format("While adding an edge from %s, to %s, coder for PValue %s cannot " - + "be determined", src, dst, input)); + throw new IllegalStateException(input.toString()); } - - addEdgeTo(edge, elementCoder, windowCoder); } - void addEdgeTo(final IREdge edge, - final Coder elementCoder, - final Coder windowCoder) { + void addEdge(final IREdge edge, final Coder elementCoder, final Coder windowCoder) { edge.setProperty(KeyExtractorProperty.of(new BeamKeyExtractor())); - if (elementCoder instanceof KvCoder) { Coder keyCoder = ((KvCoder) elementCoder).getKeyCoder(); edge.setProperty(KeyEncoderProperty.of(new BeamEncoderFactory(keyCoder))); edge.setProperty(KeyDecoderProperty.of(new BeamDecoderFactory(keyCoder))); } - edge.setProperty(EncoderProperty.of( - new BeamEncoderFactory<>(WindowedValue.getFullCoder(elementCoder, windowCoder)))); - edge.setProperty(DecoderProperty.of( - new BeamDecoderFactory<>(WindowedValue.getFullCoder(elementCoder, windowCoder)))); + final WindowedValue.FullWindowedValueCoder coder = WindowedValue.getFullCoder(elementCoder, windowCoder); + edge.setProperty(EncoderProperty.of(new BeamEncoderFactory<>(coder))); + edge.setProperty(DecoderProperty.of(new BeamDecoderFactory<>(coder))); builder.connectVertices(edge); } diff --git a/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/PipelineTranslator.java b/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/PipelineTranslator.java index 7a22ba8a99..a6ae6ceb89 100644 --- a/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/PipelineTranslator.java +++ 
b/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/PipelineTranslator.java @@ -25,6 +25,7 @@ import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.runners.AppliedPTransform; import org.apache.beam.sdk.runners.TransformHierarchy; +import org.apache.beam.sdk.transforms.display.DisplayData; import org.apache.beam.sdk.transforms.windowing.GlobalWindows; import org.apache.nemo.common.ir.edge.IREdge; import org.apache.nemo.common.ir.edge.executionproperty.CommunicationPatternProperty; @@ -48,7 +49,9 @@ import java.lang.reflect.InvocationTargetException; import java.lang.reflect.Method; import java.util.*; +import java.util.function.Function; import java.util.stream.Collectors; +import java.util.stream.IntStream; /** * A collection of translators for the Beam PTransforms. @@ -164,7 +167,7 @@ Pipeline.PipelineVisitor.CompositeBehavior translateComposite(final PipelineTran private static void unboundedReadTranslator(final PipelineTranslationContext ctx, final TransformHierarchy.Node beamNode, final Read.Unbounded transform) { - final IRVertex vertex = new BeamUnboundedSourceVertex<>(transform.getSource()); + final IRVertex vertex = new BeamUnboundedSourceVertex<>(transform.getSource(), DisplayData.from(transform)); ctx.addVertex(vertex); beamNode.getInputs().values().forEach(input -> ctx.addEdgeTo(vertex, input)); beamNode.getOutputs().values().forEach(output -> ctx.registerMainOutputFrom(beamNode, vertex, output)); @@ -174,7 +177,7 @@ private static void unboundedReadTranslator(final PipelineTranslationContext ctx private static void boundedReadTranslator(final PipelineTranslationContext ctx, final TransformHierarchy.Node beamNode, final Read.Bounded transform) { - final IRVertex vertex = new BeamBoundedSourceVertex<>(transform.getSource()); + final IRVertex vertex = new BeamBoundedSourceVertex<>(transform.getSource(), DisplayData.from(transform)); ctx.addVertex(vertex); beamNode.getInputs().values().forEach(input -> 
ctx.addEdgeTo(vertex, input)); beamNode.getOutputs().values().forEach(output -> ctx.registerMainOutputFrom(beamNode, vertex, output)); @@ -184,14 +187,15 @@ private static void boundedReadTranslator(final PipelineTranslationContext ctx, private static void parDoSingleOutputTranslator(final PipelineTranslationContext ctx, final TransformHierarchy.Node beamNode, final ParDo.SingleOutput transform) { - final DoFnTransform doFnTransform = createDoFnTransform(ctx, beamNode); + final Map> sideInputMap = getSideInputMap(transform.getSideInputs()); + final AbstractDoFnTransform doFnTransform = createDoFnTransform(ctx, beamNode, sideInputMap); final IRVertex vertex = new OperatorVertex(doFnTransform); ctx.addVertex(vertex); beamNode.getInputs().values().stream() .filter(input -> !transform.getAdditionalInputs().values().contains(input)) .forEach(input -> ctx.addEdgeTo(vertex, input)); - transform.getSideInputs().forEach(input -> ctx.addEdgeTo(vertex, input)); + ctx.addSideInputEdges(vertex, sideInputMap); beamNode.getOutputs().values().forEach(output -> ctx.registerMainOutputFrom(beamNode, vertex, output)); } @@ -199,13 +203,14 @@ private static void parDoSingleOutputTranslator(final PipelineTranslationContext private static void parDoMultiOutputTranslator(final PipelineTranslationContext ctx, final TransformHierarchy.Node beamNode, final ParDo.MultiOutput transform) { - final DoFnTransform doFnTransform = createDoFnTransform(ctx, beamNode); + final Map> sideInputMap = getSideInputMap(transform.getSideInputs()); + final AbstractDoFnTransform doFnTransform = createDoFnTransform(ctx, beamNode, sideInputMap); final IRVertex vertex = new OperatorVertex(doFnTransform); ctx.addVertex(vertex); beamNode.getInputs().values().stream() .filter(input -> !transform.getAdditionalInputs().values().contains(input)) .forEach(input -> ctx.addEdgeTo(vertex, input)); - transform.getSideInputs().forEach(input -> ctx.addEdgeTo(vertex, input)); + ctx.addSideInputEdges(vertex, sideInputMap); 
beamNode.getOutputs().entrySet().stream() .filter(pValueWithTupleTag -> pValueWithTupleTag.getKey().equals(transform.getMainOutputTag())) .forEach(pValueWithTupleTag -> ctx.registerMainOutputFrom(beamNode, vertex, pValueWithTupleTag.getValue())); @@ -237,7 +242,8 @@ private static void windowTranslator(final PipelineTranslationContext ctx, } else { throw new UnsupportedOperationException(String.format("%s is not supported", transform)); } - final IRVertex vertex = new OperatorVertex(new WindowFnTransform(windowFn)); + final IRVertex vertex = new OperatorVertex( + new WindowFnTransform(windowFn, DisplayData.from(beamNode.getTransform()))); ctx.addVertex(vertex); beamNode.getInputs().values().forEach(input -> ctx.addEdgeTo(vertex, input)); beamNode.getOutputs().values().forEach(output -> ctx.registerMainOutputFrom(beamNode, vertex, output)); @@ -317,7 +323,7 @@ private static Pipeline.PipelineVisitor.CompositeBehavior combinePerKeyTranslato final IRVertex finalCombine = new OperatorVertex(new CombineFnFinalTransform<>(combineFn)); ctx.addVertex(finalCombine); final IREdge edge = new IREdge(CommunicationPatternProperty.Value.Shuffle, partialCombine, finalCombine); - ctx.addEdgeTo( + ctx.addEdge( edge, KvCoder.of(inputCoder.getKeyCoder(), accumulatorCoder), input.getWindowingStrategy().getWindowFn().windowCoder()); @@ -348,27 +354,45 @@ private static Pipeline.PipelineVisitor.CompositeBehavior loopTranslator( //////////////////////////////////////////////////////////////////////////////////////////////////////// /////////////////////// HELPER METHODS - private static DoFnTransform createDoFnTransform(final PipelineTranslationContext ctx, - final TransformHierarchy.Node beamNode) { + private static Map> getSideInputMap(final List> viewList) { + return IntStream.range(0, viewList.size()).boxed().collect(Collectors.toMap(Function.identity(), viewList::get)); + } + + private static AbstractDoFnTransform createDoFnTransform(final PipelineTranslationContext ctx, + final 
TransformHierarchy.Node beamNode, + final Map> sideInputMap) { try { final AppliedPTransform pTransform = beamNode.toAppliedPTransform(ctx.getPipeline()); final DoFn doFn = ParDoTranslation.getDoFn(pTransform); final TupleTag mainOutputTag = ParDoTranslation.getMainOutputTag(pTransform); - final List> sideInputs = ParDoTranslation.getSideInputs(pTransform); final TupleTagList additionalOutputTags = ParDoTranslation.getAdditionalOutputTags(pTransform); final PCollection mainInput = (PCollection) Iterables.getOnlyElement(TransformInputs.nonAdditionalInputs(pTransform)); - return new DoFnTransform( - doFn, - mainInput.getCoder(), - getOutputCoders(pTransform), - mainOutputTag, - additionalOutputTags.getAll(), - mainInput.getWindowingStrategy(), - sideInputs, - ctx.getPipelineOptions()); + if (sideInputMap.isEmpty()) { + return new DoFnTransform( + doFn, + mainInput.getCoder(), + getOutputCoders(pTransform), + mainOutputTag, + additionalOutputTags.getAll(), + mainInput.getWindowingStrategy(), + ctx.getPipelineOptions(), + DisplayData.from(beamNode.getTransform())); + } else { + return new PushBackDoFnTransform( + doFn, + mainInput.getCoder(), + getOutputCoders(pTransform), + mainOutputTag, + additionalOutputTags.getAll(), + mainInput.getWindowingStrategy(), + sideInputMap, + ctx.getPipelineOptions(), + DisplayData.from(beamNode.getTransform())); + + } } catch (final IOException e) { throw new RuntimeException(e); } @@ -404,11 +428,10 @@ private static Transform createGBKTransform( return new GroupByKeyAndWindowDoFnTransform( getOutputCoders(pTransform), mainOutputTag, - Collections.emptyList(), /* GBK does not have additional outputs */ mainInput.getWindowingStrategy(), - Collections.emptyList(), /* GBK does not have additional side inputs */ ctx.getPipelineOptions(), - SystemReduceFn.buffering(mainInput.getCoder())); + SystemReduceFn.buffering(mainInput.getCoder()), + DisplayData.from(beamNode.getTransform())); } } diff --git 
a/common/src/main/java/org/apache/nemo/common/ir/edge/executionproperty/BroadcastVariableIdProperty.java b/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/SideInputElement.java similarity index 54% rename from common/src/main/java/org/apache/nemo/common/ir/edge/executionproperty/BroadcastVariableIdProperty.java rename to compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/SideInputElement.java index d7e8aa461f..22f8d72a46 100644 --- a/common/src/main/java/org/apache/nemo/common/ir/edge/executionproperty/BroadcastVariableIdProperty.java +++ b/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/SideInputElement.java @@ -16,31 +16,27 @@ * specific language governing permissions and limitations * under the License. */ -package org.apache.nemo.common.ir.edge.executionproperty; - -import org.apache.nemo.common.ir.executionproperty.EdgeExecutionProperty; - -import java.io.Serializable; +package org.apache.nemo.compiler.frontend.beam; /** - * Edges with this property fetch a broadcast variable. + * {@link org.apache.nemo.compiler.frontend.beam.transform.DoFnTransform} treats elements of this type as side inputs. + * TODO #289: Prevent using SideInputElement in UDFs + * @param type of the side input value. */ -public final class BroadcastVariableIdProperty extends EdgeExecutionProperty { +public final class SideInputElement { + private final int sideInputIndex; + private final T sideInputValue; + + public SideInputElement(final int sideInputIndex, final T sideInputValue) { + this.sideInputIndex = sideInputIndex; + this.sideInputValue = sideInputValue; + } - /** - * Constructor. - * @param value id. - */ - private BroadcastVariableIdProperty(final Serializable value) { - super(value); + public int getSideInputIndex() { + return sideInputIndex; } - /** - * Static method exposing constructor. - * @param value id. - * @return the newly created execution property. 
- */ - public static BroadcastVariableIdProperty of(final Serializable value) { - return new BroadcastVariableIdProperty(value); + public T getSideInputValue() { + return sideInputValue; } } diff --git a/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/coder/SideInputCoder.java b/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/coder/SideInputCoder.java new file mode 100644 index 0000000000..59a1792cd7 --- /dev/null +++ b/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/coder/SideInputCoder.java @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.nemo.compiler.frontend.beam.coder; + +import org.apache.beam.sdk.coders.AtomicCoder; +import org.apache.beam.sdk.coders.Coder; +import org.apache.nemo.compiler.frontend.beam.SideInputElement; + +import java.io.*; + +/** + * EncoderFactory for side inputs. + * @param type of the side input value. + */ +public final class SideInputCoder extends AtomicCoder> { + private final Coder valueCoder; + + /** + * Private constructor. 
+ */ + private SideInputCoder(final Coder valueCoder) { + this.valueCoder = valueCoder; + } + + /** + * @return a new coder + */ + public static SideInputCoder of(final Coder valueCoder) { + return new SideInputCoder<>(valueCoder); + } + + @Override + public void encode(final SideInputElement sideInputElement, final OutputStream outStream) throws IOException { + final DataOutputStream dataOutputStream = new DataOutputStream(outStream); + dataOutputStream.writeInt(sideInputElement.getSideInputIndex()); + valueCoder.encode(sideInputElement.getSideInputValue(), dataOutputStream); + } + + @Override + public SideInputElement decode(final InputStream inStream) throws IOException { + final DataInputStream dataInputStream = new DataInputStream(inStream); + final int index = dataInputStream.readInt(); + final T value = valueCoder.decode(inStream); + return new SideInputElement<>(index, value); + } +} diff --git a/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/source/BeamBoundedSourceVertex.java b/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/source/BeamBoundedSourceVertex.java index bc672b7b93..30947fd3ea 100644 --- a/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/source/BeamBoundedSourceVertex.java +++ b/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/source/BeamBoundedSourceVertex.java @@ -19,6 +19,7 @@ package org.apache.nemo.compiler.frontend.beam.source; import com.fasterxml.jackson.databind.node.ObjectNode; +import org.apache.beam.sdk.transforms.display.DisplayData; import org.apache.beam.sdk.util.WindowedValue; import org.apache.nemo.common.ir.Readable; @@ -42,17 +43,18 @@ public final class BeamBoundedSourceVertex extends SourceVertex> { private static final Logger LOG = LoggerFactory.getLogger(BeamBoundedSourceVertex.class.getName()); private BoundedSource source; - private final String sourceDescription; + private final DisplayData displayData; /** * 
Constructor of BeamBoundedSourceVertex. * * @param source BoundedSource to read from. + * @param displayData data to display. */ - public BeamBoundedSourceVertex(final BoundedSource source) { + public BeamBoundedSourceVertex(final BoundedSource source, final DisplayData displayData) { super(); this.source = source; - this.sourceDescription = source.toString(); + this.displayData = displayData; } /** @@ -63,7 +65,7 @@ public BeamBoundedSourceVertex(final BoundedSource source) { public BeamBoundedSourceVertex(final BeamBoundedSourceVertex that) { super(that); this.source = that.source; - this.sourceDescription = that.source.toString(); + this.displayData = that.displayData; } @Override @@ -94,7 +96,7 @@ public void clearInternalStates() { @Override public ObjectNode getPropertiesAsJsonNode() { final ObjectNode node = getIRVertexPropertiesAsJsonNode(); - node.put("source", sourceDescription); + node.put("source", displayData.toString()); return node; } diff --git a/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/source/BeamUnboundedSourceVertex.java b/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/source/BeamUnboundedSourceVertex.java index ad40d1b1f8..5942925b01 100644 --- a/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/source/BeamUnboundedSourceVertex.java +++ b/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/source/BeamUnboundedSourceVertex.java @@ -20,6 +20,7 @@ import com.fasterxml.jackson.databind.node.ObjectNode; import org.apache.beam.sdk.io.UnboundedSource; +import org.apache.beam.sdk.transforms.display.DisplayData; import org.apache.beam.sdk.transforms.windowing.GlobalWindow; import org.apache.beam.sdk.util.WindowedValue; import org.apache.nemo.common.ir.Readable; @@ -44,22 +45,23 @@ public final class BeamUnboundedSourceVertex source; - private final String sourceDescription; + private final DisplayData displayData; /** * The default 
constructor for beam unbounded source. * @param source unbounded source. */ - public BeamUnboundedSourceVertex(final UnboundedSource source) { + public BeamUnboundedSourceVertex(final UnboundedSource source, + final DisplayData displayData) { super(); this.source = source; - this.sourceDescription = source.toString(); + this.displayData = displayData; } private BeamUnboundedSourceVertex(final BeamUnboundedSourceVertex that) { super(that); this.source = that.source; - this.sourceDescription = that.source.toString(); + this.displayData = that.displayData; } @Override @@ -88,7 +90,7 @@ public void clearInternalStates() { @Override public ObjectNode getPropertiesAsJsonNode() { final ObjectNode node = getIRVertexPropertiesAsJsonNode(); - node.put("source", sourceDescription); + node.put("source", displayData.toString()); return node; } diff --git a/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/AbstractDoFnTransform.java b/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/AbstractDoFnTransform.java index dd5ca35be0..72113d6540 100644 --- a/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/AbstractDoFnTransform.java +++ b/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/AbstractDoFnTransform.java @@ -23,6 +23,7 @@ import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.display.DisplayData; import org.apache.beam.sdk.transforms.reflect.DoFnInvoker; import org.apache.beam.sdk.transforms.reflect.DoFnInvokers; import org.apache.beam.sdk.util.WindowedValue; @@ -31,11 +32,12 @@ import org.apache.beam.sdk.values.WindowingStrategy; import org.apache.nemo.common.ir.OutputCollector; import org.apache.nemo.common.ir.vertex.transform.Transform; +import org.apache.nemo.compiler.frontend.beam.InMemorySideInputReader; 
import org.apache.nemo.compiler.frontend.beam.NemoPipelineOptions; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.util.Collection; +import java.util.ArrayList; import java.util.List; import java.util.Map; @@ -52,7 +54,7 @@ public abstract class AbstractDoFnTransform implements private final TupleTag mainOutputTag; private final List> additionalOutputTags; - private final Collection> sideInputs; + private final Map> sideInputs; private final WindowingStrategy windowingStrategy; private final DoFn doFn; private final SerializablePipelineOptions serializedOptions; @@ -61,9 +63,13 @@ public abstract class AbstractDoFnTransform implements private transient OutputCollector> outputCollector; private transient DoFnRunner doFnRunner; - private transient SideInputReader sideInputReader; + + // null when there is no side input. + private transient PushbackSideInputDoFnRunner pushBackRunner; + private transient DoFnInvoker doFnInvoker; private transient DoFnRunners.OutputManager outputManager; + private transient InMemorySideInputReader sideInputReader; // Variables for bundle. // We consider count and time millis for start/finish bundle. @@ -74,6 +80,7 @@ public abstract class AbstractDoFnTransform implements private long prevBundleStartTime; private long currBundleCount = 0; private boolean bundleFinished = true; + private final DisplayData displayData; /** * AbstractDoFnTransform constructor. @@ -85,6 +92,7 @@ public abstract class AbstractDoFnTransform implements * @param windowingStrategy windowing strategy * @param sideInputs side inputs * @param options pipeline options + * @param displayData display data. 
*/ public AbstractDoFnTransform(final DoFn doFn, final Coder inputCoder, @@ -92,8 +100,9 @@ public AbstractDoFnTransform(final DoFn doFn, final TupleTag mainOutputTag, final List> additionalOutputTags, final WindowingStrategy windowingStrategy, - final Collection> sideInputs, - final PipelineOptions options) { + final Map> sideInputs, + final PipelineOptions options, + final DisplayData displayData) { this.doFn = doFn; this.inputCoder = inputCoder; this.outputCoders = outputCoders; @@ -102,28 +111,37 @@ public AbstractDoFnTransform(final DoFn doFn, this.sideInputs = sideInputs; this.serializedOptions = new SerializablePipelineOptions(options); this.windowingStrategy = windowingStrategy; + this.displayData = displayData; } - protected final DoFnRunners.OutputManager getOutputManager() { - return outputManager; + final Map> getSideInputs() { + return sideInputs; } - protected final WindowingStrategy getWindowingStrategy() { - return windowingStrategy; + final DoFnRunners.OutputManager getOutputManager() { + return outputManager; } - protected final SideInputReader getSideInputReader() { - return sideInputReader; + final WindowingStrategy getWindowingStrategy() { + return windowingStrategy; } - protected final TupleTag getMainOutputTag() { + final TupleTag getMainOutputTag() { return mainOutputTag; } - protected final DoFnRunner getDoFnRunner() { + final DoFnRunner getDoFnRunner() { return doFnRunner; } + final PushbackSideInputDoFnRunner getPushBackRunner() { + return pushBackRunner; + } + + final InMemorySideInputReader getSideInputReader() { + return sideInputReader; + } + public final DoFn getDoFn() { return doFn; } @@ -131,26 +149,51 @@ public final DoFn getDoFn() { /** * Checks whether the bundle is finished or not. * Starts the bundle if it is done. 
+ * + * TODO #263: Partial Combining for Beam Streaming + * We may want to use separate methods for doFnRunner/pushBackRunner + * (same applies to the other bundle-related methods) */ - protected final void checkAndInvokeBundle() { + final void checkAndInvokeBundle() { if (bundleFinished) { bundleFinished = false; - doFnRunner.startBundle(); + if (pushBackRunner == null) { + doFnRunner.startBundle(); + } else { + pushBackRunner.startBundle(); + } prevBundleStartTime = System.currentTimeMillis(); currBundleCount = 0; } currBundleCount += 1; } - /** * Checks whether it is time to finish the bundle and finish it. */ - protected final void checkAndFinishBundle() { + final void checkAndFinishBundle() { if (!bundleFinished) { if (currBundleCount >= bundleSize || System.currentTimeMillis() - prevBundleStartTime >= bundleMillis) { bundleFinished = true; + if (pushBackRunner == null) { + doFnRunner.finishBundle(); + } else { + pushBackRunner.finishBundle(); + } + } + } + } + + /** + * Finish bundle without checking for conditions. + */ + final void forceFinishBundle() { + if (!bundleFinished) { + bundleFinished = true; + if (pushBackRunner == null) { doFnRunner.finishBundle(); + } else { + pushBackRunner.finishBundle(); } } } @@ -168,11 +211,7 @@ public final void prepare(final Context context, final OutputCollector(outputCollector, mainOutputTag); // create side input reader - if (!sideInputs.isEmpty()) { - sideInputReader = new BroadcastVariableSideInputReader(context, sideInputs); - } else { - sideInputReader = NullSideInputReader.of(sideInputs); - } + sideInputReader = new InMemorySideInputReader(new ArrayList<>(sideInputs.values())); // this transform does not support state and timer. final StepContext stepContext = new StepContext() { @@ -205,6 +244,10 @@ public TimerInternals timerInternals() { inputCoder, outputCoders, windowingStrategy); + + pushBackRunner = sideInputs.isEmpty() + ? 
null + : SimplePushbackSideInputDoFnRunner.create(doFnRunner, sideInputs.values(), sideInputReader); } public final OutputCollector> getOutputCollector() { @@ -214,12 +257,15 @@ public final OutputCollector> getOutputCollector() { @Override public final void close() { beforeClose(); - if (!bundleFinished) { - doFnRunner.finishBundle(); - } + forceFinishBundle(); doFnInvoker.invokeTeardown(); } + @Override + public final String toString() { + return this.getClass().getSimpleName() + " / " + displayData.toString().replace(":", " / "); + } + /** * An abstract function that wraps the original doFn. * @param originalDoFn the original doFn. @@ -234,9 +280,6 @@ public final void close() { */ abstract OutputCollector wrapOutputCollector(final OutputCollector oc); - @Override - public abstract void onData(final WindowedValue data); - /** * An abstract function that is called before close. */ diff --git a/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/BroadcastVariableSideInputReader.java b/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/BroadcastVariableSideInputReader.java deleted file mode 100644 index 64460f9770..0000000000 --- a/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/BroadcastVariableSideInputReader.java +++ /dev/null @@ -1,63 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.nemo.compiler.frontend.beam.transform; - -import org.apache.beam.runners.core.SideInputReader; -import org.apache.beam.sdk.transforms.windowing.BoundedWindow; -import org.apache.beam.sdk.util.WindowedValue; -import org.apache.beam.sdk.values.PCollectionView; -import org.apache.nemo.common.ir.vertex.transform.Transform; - -import javax.annotation.Nullable; -import java.util.Collection; - -/** - * A sideinput reader that reads/writes side input values to context. - */ -public final class BroadcastVariableSideInputReader implements SideInputReader { - - // Nemo context for storing/getting side inputs - private final Transform.Context context; - - // The list of side inputs that we're handling - private final Collection> sideInputs; - - BroadcastVariableSideInputReader(final Transform.Context context, - final Collection> sideInputs) { - this.context = context; - this.sideInputs = sideInputs; - } - - @Nullable - @Override - public T get(final PCollectionView view, final BoundedWindow window) { - // TODO #216: implement side input and windowing - return ((WindowedValue) context.getBroadcastVariable(view)).getValue(); - } - - @Override - public boolean contains(final PCollectionView view) { - return sideInputs.contains(view); - } - - @Override - public boolean isEmpty() { - return sideInputs.isEmpty(); - } -} diff --git a/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/CreateViewTransform.java 
b/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/CreateViewTransform.java index 05e5af610d..03313c44e3 100644 --- a/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/CreateViewTransform.java +++ b/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/CreateViewTransform.java @@ -37,14 +37,12 @@ * @param input type * @param materialized output type */ -public final class CreateViewTransform implements - Transform>, WindowedValue> { - private OutputCollector> outputCollector; +public final class CreateViewTransform implements Transform>, WindowedValue> { private final ViewFn, O> viewFn; private final Map> windowListMap; - // TODO #259: we can remove this variable by implementing ReadyCheckingSideInputReader - private boolean isEmitted = false; + private OutputCollector> outputCollector; + private long currentOutputWatermark; /** @@ -75,7 +73,6 @@ public void onData(final WindowedValue> element) { @Override public void onWatermark(final Watermark inputWatermark) { - // If no data, just forwards the watermark if (windowListMap.size() == 0 && currentOutputWatermark < inputWatermark.getTimestamp()) { currentOutputWatermark = inputWatermark.getTimestamp(); @@ -90,11 +87,10 @@ public void onWatermark(final Watermark inputWatermark) { final Map.Entry> entry = iterator.next(); if (entry.getKey().maxTimestamp().getMillis() <= inputWatermark.getTimestamp()) { // emit the windowed data if the watermark timestamp > the window max boundary - final O view = viewFn.apply(new MultiView<>(entry.getValue())); + final O output = viewFn.apply(new MultiView<>(entry.getValue())); outputCollector.emit(WindowedValue.of( - view, entry.getKey().maxTimestamp(), entry.getKey(), PaneInfo.ON_TIME_AND_ONLY_FIRING)); + output, entry.getKey().maxTimestamp(), entry.getKey(), PaneInfo.ON_TIME_AND_ONLY_FIRING)); iterator.remove(); - isEmitted = true; minOutputTimestampOfEmittedWindows = 
Math.min(minOutputTimestampOfEmittedWindows, entry.getKey().maxTimestamp().getMillis()); @@ -112,20 +108,12 @@ public void onWatermark(final Watermark inputWatermark) { @Override public void close() { onWatermark(new Watermark(BoundedWindow.TIMESTAMP_MAX_VALUE.getMillis())); - - if (!isEmitted) { - // TODO #259: This is an ad-hoc code to resolve the view that has no data - // Currently, broadCastWorker reads the view data, but it throws exception if no data is available for a view. - // We should use watermark value to track whether the materialized data in a view is available or not. - final O view = viewFn.apply(new MultiView<>(Collections.emptyList())); - outputCollector.emit(WindowedValue.valueInGlobalWindow(view)); - } } @Override public String toString() { final StringBuilder sb = new StringBuilder(); - sb.append("CreateViewTransform:" + viewFn); + sb.append("CreateViewTransform " + viewFn.getClass().getName()); return sb.toString(); } @@ -133,13 +121,13 @@ public String toString() { * Represents {@code PrimitiveViewT} supplied to the {@link ViewFn}. * @param primitive view type */ - public final class MultiView implements Materializations.MultimapView, Serializable { + public static final class MultiView implements Materializations.MultimapView, Serializable { private final Iterable iterable; /** * Constructor. */ - MultiView(final Iterable iterable) { + public MultiView(final Iterable iterable) { // Create a placeholder for side input data. CreateViewTransform#onData stores data to this list. 
this.iterable = iterable; } diff --git a/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/DoFnTransform.java b/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/DoFnTransform.java index 9f7a4e0f74..699a0dd7fd 100644 --- a/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/DoFnTransform.java +++ b/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/DoFnTransform.java @@ -21,8 +21,8 @@ import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.display.DisplayData; import org.apache.beam.sdk.util.WindowedValue; -import org.apache.beam.sdk.values.PCollectionView; import org.apache.beam.sdk.values.TupleTag; import org.apache.beam.sdk.values.WindowingStrategy; import org.apache.nemo.common.ir.OutputCollector; @@ -30,12 +30,12 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.util.Collection; +import java.util.Collections; import java.util.List; import java.util.Map; /** - * DoFn transform implementation. + * DoFn transform implementation when there is no side input. * * @param input type. * @param output type. @@ -45,9 +45,6 @@ public final class DoFnTransform extends AbstractDoFnTransform< /** * DoFnTransform Constructor. - * - * @param doFn doFn. - * @param options Pipeline options. 
*/ public DoFnTransform(final DoFn doFn, final Coder inputCoder, @@ -55,10 +52,10 @@ public DoFnTransform(final DoFn doFn, final TupleTag mainOutputTag, final List> additionalOutputTags, final WindowingStrategy windowingStrategy, - final Collection> sideInputs, - final PipelineOptions options) { + final PipelineOptions options, + final DisplayData displayData) { super(doFn, inputCoder, outputCoders, mainOutputTag, - additionalOutputTags, windowingStrategy, sideInputs, options); + additionalOutputTags, windowingStrategy, Collections.emptyMap(), options, displayData); } @Override @@ -68,6 +65,7 @@ protected DoFn wrapDoFn(final DoFn initDoFn) { @Override public void onData(final WindowedValue data) { + // Do not need any push-back logic. checkAndInvokeBundle(); getDoFnRunner().processElement(data); checkAndFinishBundle(); @@ -76,26 +74,16 @@ public void onData(final WindowedValue data) { @Override public void onWatermark(final Watermark watermark) { checkAndInvokeBundle(); - // TODO #216: We should consider push-back data that waits for side input - // TODO #216: If there are push-back data, input watermark >= output watermark getOutputCollector().emitWatermark(watermark); checkAndFinishBundle(); } @Override protected void beforeClose() { - // nothing } @Override OutputCollector wrapOutputCollector(final OutputCollector oc) { return oc; } - - @Override - public String toString() { - final StringBuilder sb = new StringBuilder(); - sb.append("DoTransform:" + getDoFn()); - return sb.toString(); - } } diff --git a/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/GroupByKeyAndWindowDoFnTransform.java b/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/GroupByKeyAndWindowDoFnTransform.java index 84b6835b95..06dde58618 100644 --- a/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/GroupByKeyAndWindowDoFnTransform.java +++ 
b/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/GroupByKeyAndWindowDoFnTransform.java @@ -22,9 +22,9 @@ import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.display.DisplayData; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.util.WindowedValue; -import org.apache.beam.sdk.values.PCollectionView; import org.apache.beam.sdk.values.TupleTag; import org.apache.beam.sdk.values.WindowingStrategy; import org.apache.beam.sdk.values.KV; @@ -57,19 +57,19 @@ public final class GroupByKeyAndWindowDoFnTransform */ public GroupByKeyAndWindowDoFnTransform(final Map, Coder> outputCoders, final TupleTag>> mainOutputTag, - final List> additionalOutputTags, final WindowingStrategy windowingStrategy, - final Collection> sideInputs, final PipelineOptions options, - final SystemReduceFn reduceFn) { + final SystemReduceFn reduceFn, + final DisplayData displayData) { super(null, /* doFn */ null, /* inputCoder */ outputCoders, mainOutputTag, - additionalOutputTags, + Collections.emptyList(), /* GBK does not have additional outputs */ windowingStrategy, - sideInputs, - options); + Collections.emptyMap(), /* GBK does not have side inputs */ + options, + displayData); this.keyToValues = new HashMap<>(); this.reduceFn = reduceFn; this.prevOutputWatermark = new Watermark(Long.MIN_VALUE); @@ -93,7 +93,7 @@ protected DoFn wrapDoFn(final DoFn doFn) { getWindowingStrategy(), inMemoryStateInternalsFactory, inMemoryTimerInternalsFactory, - getSideInputReader(), + null, // GBK has no side input. 
reduceFn, getOutputManager(), getMainOutputTag()); @@ -163,23 +163,19 @@ private void processElementsAndTriggerTimers(final Watermark inputWatermark, * @param inputWatermark input watermark */ private void emitOutputWatermark(final Watermark inputWatermark) { - - if (keyAndWatermarkHoldMap.isEmpty()) { - return; - } - // Find min watermark hold - final Watermark minWatermarkHold = Collections.min(keyAndWatermarkHoldMap.values()); + final Watermark minWatermarkHold = keyAndWatermarkHoldMap.isEmpty() + ? new Watermark(Long.MAX_VALUE) // set this to MAX, in order to just use the input watermark. + : Collections.min(keyAndWatermarkHoldMap.values()); + final Watermark outputWatermarkCandidate = new Watermark( + Math.max(prevOutputWatermark.getTimestamp(), + Math.min(minWatermarkHold.getTimestamp(), inputWatermark.getTimestamp()))); if (LOG.isDebugEnabled()) { LOG.debug("Watermark hold: {}, " + "inputWatermark: {}, outputWatermark: {}", minWatermarkHold, inputWatermark, prevOutputWatermark); } - final Watermark outputWatermarkCandidate = new Watermark( - Math.max(prevOutputWatermark.getTimestamp(), - Math.min(minWatermarkHold.getTimestamp(), inputWatermark.getTimestamp()))); - if (outputWatermarkCandidate.getTimestamp() > prevOutputWatermark.getTimestamp()) { // progress! prevOutputWatermark = outputWatermarkCandidate; @@ -211,8 +207,6 @@ protected void beforeClose() { // Finish any pending windows by advancing the input watermark to infinity. 
processElementsAndTriggerTimers(new Watermark(BoundedWindow.TIMESTAMP_MAX_VALUE.getMillis()), BoundedWindow.TIMESTAMP_MAX_VALUE, BoundedWindow.TIMESTAMP_MAX_VALUE); - // Emit watermark to downstream operators - emitOutputWatermark(new Watermark(BoundedWindow.TIMESTAMP_MAX_VALUE.getMillis())); } /** @@ -258,13 +252,6 @@ private void triggerTimers(final K key, } } - @Override - public String toString() { - final StringBuilder sb = new StringBuilder(); - sb.append("GroupByKeyAndWindowDoFnTransform:"); - return sb.toString(); - } - /** * Get timer data. */ diff --git a/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/GroupByKeyTransform.java b/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/GroupByKeyTransform.java index 0f4cf5b647..71c68eab38 100644 --- a/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/GroupByKeyTransform.java +++ b/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/GroupByKeyTransform.java @@ -69,12 +69,4 @@ public void close() { } } } - - @Override - public String toString() { - final StringBuilder sb = new StringBuilder(); - sb.append("GroupByKeyTransform:"); - sb.append(super.toString()); - return sb.toString(); - } } diff --git a/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/PushBackDoFnTransform.java b/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/PushBackDoFnTransform.java new file mode 100644 index 0000000000..d8f0d8ff49 --- /dev/null +++ b/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/PushBackDoFnTransform.java @@ -0,0 +1,160 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.nemo.compiler.frontend.beam.transform; + +import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.options.PipelineOptions; +import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.display.DisplayData; +import org.apache.beam.sdk.transforms.windowing.BoundedWindow; +import org.apache.beam.sdk.util.WindowedValue; +import org.apache.beam.sdk.values.PCollectionView; +import org.apache.beam.sdk.values.TupleTag; +import org.apache.beam.sdk.values.WindowingStrategy; +import org.apache.nemo.common.ir.OutputCollector; +import org.apache.nemo.common.punctuation.Watermark; +import org.apache.nemo.compiler.frontend.beam.SideInputElement; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.List; +import java.util.Map; + +/** + * DoFn transform implementation with push backs for side inputs. + * + * @param input type. + * @param output type. + */ +public final class PushBackDoFnTransform extends AbstractDoFnTransform { + private static final Logger LOG = LoggerFactory.getLogger(PushBackDoFnTransform.class.getName()); + + private List> curPushedBacks; + private long curPushedBackWatermark; // Long.MAX_VALUE when no pushed-back exists. + private long curInputWatermark; + private long curOutputWatermark; + + /** + * PushBackDoFnTransform Constructor. 
+ */ + public PushBackDoFnTransform(final DoFn doFn, + final Coder inputCoder, + final Map, Coder> outputCoders, + final TupleTag mainOutputTag, + final List> additionalOutputTags, + final WindowingStrategy windowingStrategy, + final Map> sideInputs, + final PipelineOptions options, + final DisplayData displayData) { + super(doFn, inputCoder, outputCoders, mainOutputTag, + additionalOutputTags, windowingStrategy, sideInputs, options, displayData); + this.curPushedBacks = new ArrayList<>(); + this.curPushedBackWatermark = Long.MAX_VALUE; + this.curInputWatermark = Long.MIN_VALUE; + this.curOutputWatermark = Long.MIN_VALUE; + } + + @Override + protected DoFn wrapDoFn(final DoFn initDoFn) { + return initDoFn; + } + + @Override + public void onData(final WindowedValue data) { + // Need to distinguish side/main inputs and push-back main inputs. + if (data.getValue() instanceof SideInputElement) { + // This element is a Side Input + // TODO #287: Consider Explicit Multi-Input IR Transform + final WindowedValue sideInputElement = (WindowedValue) data; + final PCollectionView view = getSideInputs().get(sideInputElement.getValue().getSideInputIndex()); + getSideInputReader().addSideInputElement(view, data); + + handlePushBacks(); + + // See if we can emit a new watermark, as we may have processed some pushed-back elements + onWatermark(new Watermark(curInputWatermark)); + } else { + // This element is the Main Input + checkAndInvokeBundle(); + final Iterable> pushedBack = + getPushBackRunner().processElementInReadyWindows(data); + for (final WindowedValue wv : pushedBack) { + curPushedBackWatermark = Math.min(curPushedBackWatermark, wv.getTimestamp().getMillis()); + curPushedBacks.add(wv); + } + checkAndFinishBundle(); + } + } + + private void handlePushBacks() { + // Force-finish, before (possibly) processing pushed-back data. 
+ // + // Main reason: + // {@link org.apache.beam.runners.core.SimplePushbackSideInputDoFnRunner} + // caches for each bundle the side inputs that are not ready. + // We need to re-start the bundle to advertise the (possibly) newly available side input. + forceFinishBundle(); // forced + + // With the new side input added, we may be able to process some pushed-back elements. + final List> pushedBackAgain = new ArrayList<>(); + long pushedBackAgainWatermark = Long.MAX_VALUE; + for (final WindowedValue curPushedBack : curPushedBacks) { + checkAndInvokeBundle(); + final Iterable> pushedBack = + getPushBackRunner().processElementInReadyWindows(curPushedBack); + checkAndFinishBundle(); + for (final WindowedValue wv : pushedBack) { + pushedBackAgainWatermark = Math.min(pushedBackAgainWatermark, wv.getTimestamp().getMillis()); + pushedBackAgain.add(wv); + } + } + curPushedBacks = pushedBackAgain; + curPushedBackWatermark = pushedBackAgainWatermark; + } + + @Override + public void onWatermark(final Watermark watermark) { + // TODO #298: Consider Processing DoFn PushBacks on Watermark + checkAndInvokeBundle(); + curInputWatermark = watermark.getTimestamp(); + getSideInputReader().setCurrentWatermarkOfAllMainAndSideInputs(curInputWatermark); + + final long outputWatermarkCandidate = Math.min(curInputWatermark, curPushedBackWatermark); + if (outputWatermarkCandidate > curOutputWatermark) { + // Watermark advances! + getOutputCollector().emitWatermark(new Watermark(outputWatermarkCandidate)); + curOutputWatermark = outputWatermarkCandidate; + } + checkAndFinishBundle(); + } + + @Override + protected void beforeClose() { + // This treats all still-unavailable side inputs as available, empty side inputs. + onWatermark(new Watermark(BoundedWindow.TIMESTAMP_MAX_VALUE.getMillis())); + // All push-backs should be processed here. 
+ handlePushBacks(); + } + + @Override + OutputCollector wrapOutputCollector(final OutputCollector oc) { + return oc; + } +} diff --git a/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/SideInputTransform.java b/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/SideInputTransform.java new file mode 100644 index 0000000000..b1536e6774 --- /dev/null +++ b/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/SideInputTransform.java @@ -0,0 +1,72 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.nemo.compiler.frontend.beam.transform; + +import org.apache.beam.sdk.util.WindowedValue; +import org.apache.nemo.common.ir.OutputCollector; +import org.apache.nemo.common.ir.vertex.transform.Transform; +import org.apache.nemo.common.punctuation.Watermark; +import org.apache.nemo.compiler.frontend.beam.SideInputElement; + +/** + * Side input transform implementation. + * TODO #297: Consider Removing SideInputTransform + * @param input/output type. 
+ */ +public final class SideInputTransform implements Transform, WindowedValue>> { + private OutputCollector>> outputCollector; + private final int index; + + /** + * Constructor. + */ + public SideInputTransform(final int index) { + this.index = index; + } + + @Override + public void prepare(final Context context, final OutputCollector>> oc) { + this.outputCollector = oc; + } + + @Override + public void onData(final WindowedValue element) { + outputCollector.emit(element.withValue(new SideInputElement<>(index, element.getValue()))); + } + + @Override + public void onWatermark(final Watermark watermark) { + outputCollector.emitWatermark(watermark); + } + + @Override + public void close() { + } + + @Override + public String toString() { + final StringBuilder sb = new StringBuilder(); + sb.append("SideInputTransform:"); + sb.append("(index-"); + sb.append(String.valueOf(index)); + sb.append(")"); + sb.append(super.toString()); + return sb.toString(); + } +} diff --git a/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/WindowFnTransform.java b/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/WindowFnTransform.java index a4346182ec..f8f5c9fb6f 100644 --- a/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/WindowFnTransform.java +++ b/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/WindowFnTransform.java @@ -19,6 +19,7 @@ package org.apache.nemo.compiler.frontend.beam.transform; import com.google.common.collect.Iterables; +import org.apache.beam.sdk.transforms.display.DisplayData; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.transforms.windowing.PaneInfo; import org.apache.beam.sdk.util.WindowedValue; @@ -40,14 +41,16 @@ public final class WindowFnTransform implements Transform, WindowedValue> { private final WindowFn windowFn; + private final DisplayData displayData; private 
OutputCollector> outputCollector; /** * Default Constructor. * @param windowFn windowFn for the Transform. */ - public WindowFnTransform(final WindowFn windowFn) { + public WindowFnTransform(final WindowFn windowFn, final DisplayData displayData) { this.windowFn = windowFn; + this.displayData = displayData; } @Override @@ -101,7 +104,7 @@ public void close() { @Override public String toString() { final StringBuilder sb = new StringBuilder(); - sb.append("WindowFnTransform:" + windowFn); + sb.append("WindowFnTransform / " + displayData.toString().replaceAll(":", " / ")); return sb.toString(); } } diff --git a/compiler/test/src/test/java/org/apache/nemo/compiler/frontend/beam/BeamFrontendALSTest.java b/compiler/test/src/test/java/org/apache/nemo/compiler/frontend/beam/BeamFrontendALSTest.java index be3f0083c2..278c83c8ca 100644 --- a/compiler/test/src/test/java/org/apache/nemo/compiler/frontend/beam/BeamFrontendALSTest.java +++ b/compiler/test/src/test/java/org/apache/nemo/compiler/frontend/beam/BeamFrontendALSTest.java @@ -41,7 +41,7 @@ public void testALSDAG() throws Exception { final DAG producedDAG = CompilerTestUtil.compileALSDAG(); assertEquals(producedDAG.getTopologicalSort(), producedDAG.getTopologicalSort()); - assertEquals(38, producedDAG.getVertices().size()); + assertEquals(44, producedDAG.getVertices().size()); // producedDAG.getTopologicalSort().forEach(v -> System.out.println(v.getId())); final IRVertex vertexX = producedDAG.getTopologicalSort().get(5); diff --git a/compiler/test/src/test/java/org/apache/nemo/compiler/frontend/beam/BeamFrontendMLRTest.java b/compiler/test/src/test/java/org/apache/nemo/compiler/frontend/beam/BeamFrontendMLRTest.java index 632c30aff0..c6f100d4e6 100644 --- a/compiler/test/src/test/java/org/apache/nemo/compiler/frontend/beam/BeamFrontendMLRTest.java +++ b/compiler/test/src/test/java/org/apache/nemo/compiler/frontend/beam/BeamFrontendMLRTest.java @@ -41,7 +41,7 @@ public void testMLRDAG() throws Exception { final DAG 
producedDAG = CompilerTestUtil.compileMLRDAG(); assertEquals(producedDAG.getTopologicalSort(), producedDAG.getTopologicalSort()); - assertEquals(36, producedDAG.getVertices().size()); + assertEquals(39, producedDAG.getVertices().size()); final IRVertex vertexX = producedDAG.getTopologicalSort().get(5); assertEquals(1, producedDAG.getIncomingEdgesOf(vertexX).size()); @@ -51,6 +51,6 @@ public void testMLRDAG() throws Exception { final IRVertex vertexY = producedDAG.getTopologicalSort().get(13); assertEquals(1, producedDAG.getIncomingEdgesOf(vertexY).size()); assertEquals(1, producedDAG.getIncomingEdgesOf(vertexY.getId()).size()); - assertEquals(2, producedDAG.getOutgoingEdgesOf(vertexY).size()); + assertEquals(1, producedDAG.getOutgoingEdgesOf(vertexY).size()); } } diff --git a/compiler/test/src/test/java/org/apache/nemo/compiler/frontend/beam/transform/CreateViewTransformTest.java b/compiler/test/src/test/java/org/apache/nemo/compiler/frontend/beam/transform/CreateViewTransformTest.java index 762e327fd0..702ca9d6ee 100644 --- a/compiler/test/src/test/java/org/apache/nemo/compiler/frontend/beam/transform/CreateViewTransformTest.java +++ b/compiler/test/src/test/java/org/apache/nemo/compiler/frontend/beam/transform/CreateViewTransformTest.java @@ -21,6 +21,7 @@ import org.apache.beam.sdk.transforms.Materialization; import org.apache.beam.sdk.transforms.Materializations; import org.apache.beam.sdk.transforms.ViewFn; +import org.apache.beam.sdk.transforms.display.DisplayData; import org.apache.beam.sdk.transforms.windowing.FixedWindows; import org.apache.beam.sdk.transforms.windowing.PaneInfo; import org.apache.beam.sdk.util.WindowedValue; @@ -54,6 +55,7 @@ public final class CreateViewTransformTest { public void test() { final FixedWindows fixedwindows = FixedWindows.of(Duration.standardSeconds(1)); + final CreateViewTransform viewTransform = new CreateViewTransform(new SumViewFn()); diff --git 
a/compiler/test/src/test/java/org/apache/nemo/compiler/frontend/beam/transform/DoFnTransformTest.java b/compiler/test/src/test/java/org/apache/nemo/compiler/frontend/beam/transform/DoFnTransformTest.java index bad7584855..fa1169c3f8 100644 --- a/compiler/test/src/test/java/org/apache/nemo/compiler/frontend/beam/transform/DoFnTransformTest.java +++ b/compiler/test/src/test/java/org/apache/nemo/compiler/frontend/beam/transform/DoFnTransformTest.java @@ -20,14 +20,14 @@ import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; -import com.google.common.collect.Maps; import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.options.PipelineOptionsFactory; import org.apache.beam.sdk.transforms.Create; import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.View; -import org.apache.beam.sdk.transforms.windowing.BoundedWindow; +import org.apache.beam.sdk.transforms.display.DisplayData; +import org.apache.beam.sdk.transforms.windowing.GlobalWindow; import org.apache.beam.sdk.util.WindowedValue; import org.apache.beam.sdk.values.PCollectionView; import org.apache.beam.sdk.values.TupleTag; @@ -36,17 +36,17 @@ import org.apache.nemo.common.ir.vertex.transform.Transform; import org.apache.nemo.common.punctuation.Watermark; import org.apache.nemo.compiler.frontend.beam.NemoPipelineOptions; +import org.apache.nemo.compiler.frontend.beam.SideInputElement; import org.apache.reef.io.Tuple; import org.junit.Before; import org.junit.Test; import java.util.*; -import static java.util.Collections.emptyList; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.when; public final class DoFnTransformTest { @@ -78,8 +78,8 @@ public void testSingleOutput() { outputTag, Collections.emptyList(), WindowingStrategy.globalDefault(), - 
emptyList(), /* side inputs */ - PipelineOptionsFactory.as(NemoPipelineOptions.class)); + PipelineOptionsFactory.as(NemoPipelineOptions.class), + DisplayData.none()); final Transform.Context context = mock(Transform.Context.class); final OutputCollector> oc = new TestOutputCollector<>(); @@ -112,8 +112,8 @@ public void testCountBundle() { outputTag, Collections.emptyList(), WindowingStrategy.globalDefault(), - emptyList(), /* side inputs */ - pipelineOptions); + pipelineOptions, + DisplayData.none()); final Transform.Context context = mock(Transform.Context.class); final OutputCollector> oc = new TestOutputCollector<>(); @@ -156,8 +156,8 @@ public void testTimeBundle() { outputTag, Collections.emptyList(), WindowingStrategy.globalDefault(), - emptyList(), /* side inputs */ - pipelineOptions); + pipelineOptions, + DisplayData.none()); final Transform.Context context = mock(Transform.Context.class); final OutputCollector> oc = new TestOutputCollector<>(); @@ -208,8 +208,8 @@ public void testMultiOutputOutput() { mainOutput, tags, WindowingStrategy.globalDefault(), - emptyList(), /* side inputs */ - PipelineOptionsFactory.as(NemoPipelineOptions.class)); + PipelineOptionsFactory.as(NemoPipelineOptions.class), + DisplayData.none()); // mock context final Transform.Context context = mock(Transform.Context.class); @@ -244,48 +244,70 @@ public void testMultiOutputOutput() { doFnTransform.close(); } - // TODO #216: implement side input and windowing @Test public void testSideInputs() { // mock context final Transform.Context context = mock(Transform.Context.class); - when(context.getBroadcastVariable(view1)).thenReturn( - WindowedValue.valueInGlobalWindow(ImmutableList.of("1"))); - when(context.getBroadcastVariable(view2)).thenReturn( - WindowedValue.valueInGlobalWindow(ImmutableList.of("2"))); - TupleTag>> outputTag = new TupleTag<>("main-output"); - WindowedValue first = WindowedValue.valueInGlobalWindow("first"); - WindowedValue second = 
WindowedValue.valueInGlobalWindow("second"); + WindowedValue firstElement = WindowedValue.valueInGlobalWindow("first"); + WindowedValue secondElement = WindowedValue.valueInGlobalWindow("second"); - final Map>> eventAndViewMap = - ImmutableMap.of(first.getValue(), view1, second.getValue(), view2); + SideInputElement firstSideinput = new SideInputElement<>(0, ImmutableList.of("1")); + SideInputElement secondSideinput = new SideInputElement(1, ImmutableList.of("2")); - final DoFnTransform>> doFnTransform = - new DoFnTransform<>( - new SimpleSideInputDoFn<>(eventAndViewMap), + final Map> sideInputMap = new HashMap<>(); + sideInputMap.put(firstSideinput.getSideInputIndex(), view1); + sideInputMap.put(secondSideinput.getSideInputIndex(), view2); + final PushBackDoFnTransform doFnTransform = + new PushBackDoFnTransform( + new SimpleSideInputDoFn(view1, view2), NULL_INPUT_CODER, NULL_OUTPUT_CODERS, outputTag, Collections.emptyList(), WindowingStrategy.globalDefault(), - ImmutableList.of(view1, view2), /* side inputs */ - PipelineOptionsFactory.as(NemoPipelineOptions.class)); + sideInputMap, /* side inputs */ + PipelineOptionsFactory.as(NemoPipelineOptions.class), + DisplayData.none()); - final OutputCollector>>> oc = new TestOutputCollector<>(); + final TestOutputCollector oc = new TestOutputCollector<>(); doFnTransform.prepare(context, oc); - doFnTransform.onData(first); - doFnTransform.onData(second); - - assertEquals(WindowedValue.valueInGlobalWindow(new Tuple<>("first", ImmutableList.of("1"))), - ((TestOutputCollector>>) oc).getOutput().get(0)); - - assertEquals(WindowedValue.valueInGlobalWindow(new Tuple<>("second", ImmutableList.of("2"))), - ((TestOutputCollector>>) oc).getOutput().get(1)); - + // Main input first, Side inputs later + doFnTransform.onData(firstElement); + + doFnTransform.onData(WindowedValue.valueInGlobalWindow(firstSideinput)); + doFnTransform.onData(WindowedValue.valueInGlobalWindow(secondSideinput)); + assertEquals( + 
WindowedValue.valueInGlobalWindow( + concat(firstElement.getValue(), firstSideinput.getSideInputValue(), secondSideinput.getSideInputValue())), + oc.getOutput().get(0)); + + // Side inputs first, Main input later + doFnTransform.onData(secondElement); + assertEquals( + WindowedValue.valueInGlobalWindow( + concat(secondElement.getValue(), firstSideinput.getSideInputValue(), secondSideinput.getSideInputValue())), + oc.getOutput().get(1)); + + // There should be only 2 final outputs + assertEquals(2, oc.getOutput().size()); + + // The side inputs should be "READY" + assertTrue(doFnTransform.getSideInputReader().isReady(view1, GlobalWindow.INSTANCE)); + assertTrue(doFnTransform.getSideInputReader().isReady(view2, GlobalWindow.INSTANCE)); + + // This watermark should remove the side inputs. (Now should be "NOT READY") + doFnTransform.onWatermark(new Watermark(GlobalWindow.TIMESTAMP_MAX_VALUE.getMillis())); + Iterable materializedSideInput1 = doFnTransform.getSideInputReader().get(view1, GlobalWindow.INSTANCE); + Iterable materializedSideInput2 = doFnTransform.getSideInputReader().get(view2, GlobalWindow.INSTANCE); + assertFalse(materializedSideInput1.iterator().hasNext()); + assertFalse(materializedSideInput2.iterator().hasNext()); + + // There should be only 2 final outputs doFnTransform.close(); + assertEquals(2, oc.getOutput().size()); } @@ -334,21 +356,30 @@ public void processElement(final ProcessContext c) throws Exception { * Side input do fn. 
* @param type */ - private static class SimpleSideInputDoFn extends DoFn> { - private final Map> idAndViewMap; - - public SimpleSideInputDoFn(final Map> idAndViewMap) { - this.idAndViewMap = idAndViewMap; + private static class SimpleSideInputDoFn extends DoFn { + private final PCollectionView view1; + private final PCollectionView view2; + + public SimpleSideInputDoFn(final PCollectionView view1, + final PCollectionView view2) { + this.view1 = view1; + this.view2 = view2; } @ProcessElement public void processElement(final ProcessContext c) throws Exception { - final PCollectionView view = idAndViewMap.get(c.element()); - final V sideInput = c.sideInput(view); - c.output(new Tuple<>(c.element(), sideInput)); + final T element = c.element(); + final Object view1Value = c.sideInput(view1); + final Object view2Value = c.sideInput(view2); + + c.output(concat(element, view1Value, view2Value)); } } + private static String concat(final Object obj1, final Object obj2, final Object obj3) { + return obj1.toString() + " / " + obj2 + " / " + obj3; + } + /** * Multi output do fn. 
diff --git a/compiler/test/src/test/java/org/apache/nemo/compiler/frontend/beam/transform/GroupByKeyAndWindowDoFnTransformTest.java b/compiler/test/src/test/java/org/apache/nemo/compiler/frontend/beam/transform/GroupByKeyAndWindowDoFnTransformTest.java index f9a44ec968..474c79c0d7 100644 --- a/compiler/test/src/test/java/org/apache/nemo/compiler/frontend/beam/transform/GroupByKeyAndWindowDoFnTransformTest.java +++ b/compiler/test/src/test/java/org/apache/nemo/compiler/frontend/beam/transform/GroupByKeyAndWindowDoFnTransformTest.java @@ -21,6 +21,7 @@ import org.apache.beam.runners.core.SystemReduceFn; import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.options.PipelineOptionsFactory; +import org.apache.beam.sdk.transforms.display.DisplayData; import org.apache.beam.sdk.transforms.windowing.*; import org.apache.beam.sdk.util.WindowedValue; import org.apache.beam.sdk.values.KV; @@ -89,11 +90,10 @@ public void test() { new GroupByKeyAndWindowDoFnTransform( NULL_OUTPUT_CODERS, outputTag, - Collections.emptyList(), /* additional outputs */ WindowingStrategy.of(slidingWindows), - emptyList(), /* side inputs */ PipelineOptionsFactory.as(NemoPipelineOptions.class), - SystemReduceFn.buffering(NULL_INPUT_CODER)); + SystemReduceFn.buffering(NULL_INPUT_CODER), + DisplayData.none()); final Instant ts1 = new Instant(1); final Instant ts2 = new Instant(100); @@ -167,10 +167,17 @@ public void test() { doFnTransform.onData(WindowedValue.of( KV.of("1", "a"), ts4, slidingWindows.assignWindows(ts4), PaneInfo.NO_FIRING)); - // do not emit anything + doFnTransform.onWatermark(watermark2); - assertEquals(0, oc.outputs.size()); - assertEquals(0, oc.watermarks.size()); + + assertEquals(0, oc.outputs.size()); // do not emit anything + assertEquals(1, oc.watermarks.size()); + + // check output watermark + assertEquals(1400, + oc.watermarks.get(0).getTimestamp()); + + oc.watermarks.clear(); doFnTransform.onData(WindowedValue.of( KV.of("3", "a"), ts5, 
slidingWindows.assignWindows(ts5), PaneInfo.NO_FIRING)); diff --git a/compiler/test/src/test/java/org/apache/nemo/compiler/optimizer/pass/compiletime/reshaping/LoopInvariantCodeMotionPassTest.java b/compiler/test/src/test/java/org/apache/nemo/compiler/optimizer/pass/compiletime/reshaping/LoopInvariantCodeMotionPassTest.java index bdc2a859d5..e2f2c0ab2d 100644 --- a/compiler/test/src/test/java/org/apache/nemo/compiler/optimizer/pass/compiletime/reshaping/LoopInvariantCodeMotionPassTest.java +++ b/compiler/test/src/test/java/org/apache/nemo/compiler/optimizer/pass/compiletime/reshaping/LoopInvariantCodeMotionPassTest.java @@ -63,7 +63,7 @@ public void setUp() throws Exception { final LoopVertex alsLoop = alsLoopOpt.get(); final IRVertex vertex7 = groupedDAG.getTopologicalSort().get(3); - final IRVertex vertex15 = alsLoop.getDAG().getTopologicalSort().get(4); + final IRVertex vertex15 = alsLoop.getDAG().getTopologicalSort().get(5); final Set oldDAGIncomingEdges = alsLoop.getDagIncomingEdges().get(vertex15); final List newDAGIncomingEdge = groupedDAG.getIncomingEdgesOf(vertex7); diff --git a/examples/beam/src/main/java/org/apache/nemo/examples/beam/AlternatingLeastSquare.java b/examples/beam/src/main/java/org/apache/nemo/examples/beam/AlternatingLeastSquare.java index 84da0ddde8..76bdc03aef 100644 --- a/examples/beam/src/main/java/org/apache/nemo/examples/beam/AlternatingLeastSquare.java +++ b/examples/beam/src/main/java/org/apache/nemo/examples/beam/AlternatingLeastSquare.java @@ -21,8 +21,6 @@ import com.github.fommil.netlib.BLAS; import com.github.fommil.netlib.LAPACK; import org.apache.nemo.compiler.frontend.beam.NemoPipelineRunner; -import org.apache.nemo.compiler.frontend.beam.coder.FloatArrayCoder; -import org.apache.nemo.compiler.frontend.beam.coder.IntArrayCoder; import org.apache.nemo.compiler.frontend.beam.transform.LoopCompositeTransform; import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.coders.CoderProviders; diff --git 
a/examples/beam/src/main/java/org/apache/nemo/examples/beam/AlternatingLeastSquareInefficient.java b/examples/beam/src/main/java/org/apache/nemo/examples/beam/AlternatingLeastSquareInefficient.java index e431019730..ab1760fb53 100644 --- a/examples/beam/src/main/java/org/apache/nemo/examples/beam/AlternatingLeastSquareInefficient.java +++ b/examples/beam/src/main/java/org/apache/nemo/examples/beam/AlternatingLeastSquareInefficient.java @@ -18,8 +18,6 @@ */ package org.apache.nemo.examples.beam; -import org.apache.nemo.compiler.frontend.beam.coder.FloatArrayCoder; -import org.apache.nemo.compiler.frontend.beam.coder.IntArrayCoder; import org.apache.nemo.compiler.frontend.beam.transform.LoopCompositeTransform; import org.apache.nemo.compiler.frontend.beam.NemoPipelineRunner; import org.apache.beam.sdk.Pipeline; diff --git a/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/coder/FloatArrayCoder.java b/examples/beam/src/main/java/org/apache/nemo/examples/beam/FloatArrayCoder.java similarity index 97% rename from compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/coder/FloatArrayCoder.java rename to examples/beam/src/main/java/org/apache/nemo/examples/beam/FloatArrayCoder.java index dff48eed82..104e994495 100644 --- a/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/coder/FloatArrayCoder.java +++ b/examples/beam/src/main/java/org/apache/nemo/examples/beam/FloatArrayCoder.java @@ -16,7 +16,7 @@ * specific language governing permissions and limitations * under the License. 
*/ -package org.apache.nemo.compiler.frontend.beam.coder; +package org.apache.nemo.examples.beam; import org.apache.beam.sdk.coders.AtomicCoder; diff --git a/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/coder/IntArrayCoder.java b/examples/beam/src/main/java/org/apache/nemo/examples/beam/IntArrayCoder.java similarity index 97% rename from compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/coder/IntArrayCoder.java rename to examples/beam/src/main/java/org/apache/nemo/examples/beam/IntArrayCoder.java index ac9205b4c7..f7201378c7 100644 --- a/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/coder/IntArrayCoder.java +++ b/examples/beam/src/main/java/org/apache/nemo/examples/beam/IntArrayCoder.java @@ -16,7 +16,7 @@ * specific language governing permissions and limitations * under the License. */ -package org.apache.nemo.compiler.frontend.beam.coder; +package org.apache.nemo.examples.beam; import org.apache.beam.sdk.coders.AtomicCoder; diff --git a/examples/beam/src/main/java/org/apache/nemo/examples/beam/WindowedBroadcast.java b/examples/beam/src/main/java/org/apache/nemo/examples/beam/WindowedBroadcast.java new file mode 100644 index 0000000000..30ee405cd0 --- /dev/null +++ b/examples/beam/src/main/java/org/apache/nemo/examples/beam/WindowedBroadcast.java @@ -0,0 +1,90 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.nemo.examples.beam; + +import org.apache.beam.sdk.Pipeline; +import org.apache.beam.sdk.io.GenerateSequence; +import org.apache.beam.sdk.options.PipelineOptions; +import org.apache.beam.sdk.options.PipelineOptionsFactory; +import org.apache.beam.sdk.transforms.*; +import org.apache.beam.sdk.transforms.windowing.SlidingWindows; +import org.apache.beam.sdk.transforms.windowing.Window; +import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.PCollectionView; +import org.apache.nemo.compiler.frontend.beam.NemoPipelineOptions; +import org.apache.nemo.compiler.frontend.beam.NemoPipelineRunner; +import org.joda.time.Duration; +import org.joda.time.Instant; + +import java.util.List; + +/** + * A Windowed WordCount application. + */ +public final class WindowedBroadcast { + /** + * Private Constructor. + */ + private WindowedBroadcast() { + } + + private static PCollection getSource(final Pipeline p) { + return p.apply(GenerateSequence + .from(1) + .withRate(2, Duration.standardSeconds(1)) + .withTimestampFn(num -> new Instant(num * 500))); // 0.5 second between subsequent elements + } + /** + * Main function for the MR BEAM program. + * @param args arguments. 
+ */ + public static void main(final String[] args) { + final String outputFilePath = args[0]; + + final Window windowFn = Window + .into(SlidingWindows.of(Duration.standardSeconds(2)) + .every(Duration.standardSeconds(1))); + + final PipelineOptions options = PipelineOptionsFactory.create().as(NemoPipelineOptions.class); + options.setRunner(NemoPipelineRunner.class); + options.setJobName("WindowedBroadcast"); + + final Pipeline p = Pipeline.create(options); + + final PCollection windowedElements = getSource(p).apply(windowFn); + final PCollectionView> windowedView = windowedElements.apply(View.asList()); + + windowedElements.apply(ParDo.of(new DoFn() { + @ProcessElement + public void processElement(final ProcessContext c) { + final Long anElementInTheWindow = c.element(); + final List allElementsInTheWindow = c.sideInput(windowedView); + System.out.println(anElementInTheWindow + " / " + allElementsInTheWindow); + if (!allElementsInTheWindow.contains(anElementInTheWindow)) { + throw new RuntimeException(anElementInTheWindow + " not in " + allElementsInTheWindow.toString()); + } else { + c.output(anElementInTheWindow + " is in " + allElementsInTheWindow); + } + } + }).withSideInputs(windowedView) + ).apply(new WriteOneFilePerWindow(outputFilePath, 1)); + + p.run(); + } +} diff --git a/examples/beam/src/main/java/org/apache/nemo/examples/beam/WindowedWordCount.java b/examples/beam/src/main/java/org/apache/nemo/examples/beam/WindowedWordCount.java index d7f8c85c8b..0f13dc4653 100644 --- a/examples/beam/src/main/java/org/apache/nemo/examples/beam/WindowedWordCount.java +++ b/examples/beam/src/main/java/org/apache/nemo/examples/beam/WindowedWordCount.java @@ -78,7 +78,7 @@ public KV apply(final String line) { return p.apply(GenerateSequence .from(1) .withRate(2, Duration.standardSeconds(1)) - .withTimestampFn(num -> new Instant(num * 500))) + .withTimestampFn(num -> new Instant(num * 500))) // 0.5 second between subsequent elements .apply(MapElements.via(new 
SimpleFunction>() { @Override public KV apply(final Long val) { diff --git a/examples/beam/src/test/java/org/apache/nemo/examples/beam/WindowedBroadcastITCase.java b/examples/beam/src/test/java/org/apache/nemo/examples/beam/WindowedBroadcastITCase.java new file mode 100644 index 0000000000..5e2fba38f0 --- /dev/null +++ b/examples/beam/src/test/java/org/apache/nemo/examples/beam/WindowedBroadcastITCase.java @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.nemo.examples.beam; + +import org.apache.nemo.client.JobLauncher; +import org.apache.nemo.common.test.ArgBuilder; +import org.apache.nemo.common.test.ExampleTestUtil; +import org.apache.nemo.examples.beam.policy.StreamingPolicyParallelismFive; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.powermock.core.classloader.annotations.PrepareForTest; +import org.powermock.modules.junit4.PowerMockRunner; + +/** + * Test Windowed word count program with JobLauncher. 
+ * TODO #291: ITCase for Empty PCollectionViews + */ +@RunWith(PowerMockRunner.class) +@PrepareForTest(JobLauncher.class) +public final class WindowedBroadcastITCase { + + private static final int TIMEOUT = 120000; + private static ArgBuilder builder; + private static final String fileBasePath = System.getProperty("user.dir") + "/../resources/"; + + private static final String outputFileName = "test_output_windowed_broadcast"; + private static final String expectedOutputFileName = "expected_output_windowed_broadcast"; + private static final String expectedSlidingWindowOutputFileName = "expected_output_sliding_windowed_broadcast"; + private static final String executorResourceFileName = fileBasePath + "beam_test_executor_resources.json"; + private static final String outputFilePath = fileBasePath + outputFileName; + + // TODO #271: We currently disable this test because we cannot force close Nemo + // @Test (timeout = TIMEOUT) + public void testUnboundedSlidingWindow() throws Exception { + builder = new ArgBuilder() + .addScheduler("org.apache.nemo.runtime.master.scheduler.StreamingScheduler") + .addUserMain(WindowedBroadcast.class.getCanonicalName()) + .addUserArgs(outputFilePath); + + JobLauncher.main(builder + .addResourceJson(executorResourceFileName) + .addJobId(WindowedBroadcastITCase.class.getSimpleName()) + .addOptimizationPolicy(StreamingPolicyParallelismFive.class.getCanonicalName()) + .build()); + + try { + ExampleTestUtil.ensureOutputValidity(fileBasePath, outputFileName, expectedSlidingWindowOutputFileName); + } finally { + } + } +} diff --git a/examples/beam/src/test/java/org/apache/nemo/examples/beam/WindowedWordCountITCase.java b/examples/beam/src/test/java/org/apache/nemo/examples/beam/WindowedWordCountITCase.java index c0134aa332..7ef0e0281b 100644 --- a/examples/beam/src/test/java/org/apache/nemo/examples/beam/WindowedWordCountITCase.java +++ b/examples/beam/src/test/java/org/apache/nemo/examples/beam/WindowedWordCountITCase.java @@ -33,6 +33,7 
@@ /** * Test Windowed word count program with JobLauncher. + * TODO #299: WindowedWordCountITCase Hangs (Heisenbug) */ @RunWith(PowerMockRunner.class) @PrepareForTest(JobLauncher.class) @@ -58,7 +59,7 @@ public void testBatchFixedWindow() throws Exception { JobLauncher.main(builder .addResourceJson(executorResourceFileName) - .addJobId(WindowedWordCountITCase.class.getSimpleName()) + .addJobId(WindowedWordCountITCase.class.getSimpleName() + "testBatchFixedWindow") .addOptimizationPolicy(DefaultPolicyParallelismFive.class.getCanonicalName()) .build()); @@ -78,7 +79,7 @@ public void testBatchSlidingWindow() throws Exception { JobLauncher.main(builder .addResourceJson(executorResourceFileName) - .addJobId(WindowedWordCountITCase.class.getSimpleName()) + .addJobId(WindowedWordCountITCase.class.getSimpleName() + "testBatchSlidingWindow") .addOptimizationPolicy(DefaultPolicy.class.getCanonicalName()) .build()); @@ -98,7 +99,7 @@ public void testStreamingSchedulerAndPipeFixedWindow() throws Exception { JobLauncher.main(builder .addResourceJson(executorResourceFileName) - .addJobId(WindowedWordCountITCase.class.getSimpleName()) + .addJobId(WindowedWordCountITCase.class.getSimpleName() + "testStreamingSchedulerAndPipeFixedWindow") .addOptimizationPolicy(StreamingPolicyParallelismFive.class.getCanonicalName()) .build()); @@ -119,7 +120,7 @@ public void testStreamingSchedulerAndPipeSlidingWindow() throws Exception { JobLauncher.main(builder .addResourceJson(executorResourceFileName) - .addJobId(WindowedWordCountITCase.class.getSimpleName()) + .addJobId(WindowedWordCountITCase.class.getSimpleName() + "testStreamingSchedulerAndPipeSlidingWindow") .addOptimizationPolicy(StreamingPolicyParallelismFive.class.getCanonicalName()) .build()); @@ -132,7 +133,7 @@ public void testStreamingSchedulerAndPipeSlidingWindow() throws Exception { // TODO #271: We currently disable this test because we cannot force close Nemo - //@Test (timeout = TIMEOUT) + // @Test (timeout = TIMEOUT) public 
void testUnboundedSlidingWindow() throws Exception { builder = new ArgBuilder() .addScheduler("org.apache.nemo.runtime.master.scheduler.StreamingScheduler") diff --git a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/Executor.java b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/Executor.java index f7aa184a80..f972ce067b 100644 --- a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/Executor.java +++ b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/Executor.java @@ -120,7 +120,7 @@ private synchronized void onTaskReceived(final Task task) { * @param task to launch. */ private void launchTask(final Task task) { - LOG.info("Launch task: {}", task); + LOG.info("Launch task: {}", task.getTaskId()); try { final DAG> irDag = SerializationUtils.deserialize(task.getSerializedIRDag()); diff --git a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/data/BroadcastManagerWorker.java b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/data/BroadcastManagerWorker.java index 17a62c5cdb..42806b7de0 100644 --- a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/data/BroadcastManagerWorker.java +++ b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/data/BroadcastManagerWorker.java @@ -27,7 +27,6 @@ import org.apache.nemo.runtime.common.comm.ControlMessage; import org.apache.nemo.runtime.common.message.MessageEnvironment; import org.apache.nemo.runtime.common.message.PersistentConnectionToMasterMap; -import org.apache.nemo.runtime.executor.datatransfer.InputReader; import net.jcip.annotations.ThreadSafe; import org.apache.commons.lang.SerializationUtils; import org.apache.reef.tang.annotations.Parameter; @@ -36,9 +35,7 @@ import javax.inject.Inject; import java.io.Serializable; -import java.util.List; import java.util.concurrent.CompletableFuture; -import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutionException; import 
java.util.concurrent.TimeUnit; @@ -50,7 +47,6 @@ public final class BroadcastManagerWorker { private static final Logger LOG = LoggerFactory.getLogger(BroadcastManagerWorker.class.getName()); private static BroadcastManagerWorker staticReference; - private final ConcurrentHashMap idToReader; private final LoadingCache idToVariableCache; /** @@ -63,65 +59,33 @@ public final class BroadcastManagerWorker { */ @Inject private BroadcastManagerWorker(@Parameter(JobConf.ExecutorId.class) final String executorId, - final PersistentConnectionToMasterMap toMaster) { + final PersistentConnectionToMasterMap toMaster) { staticReference = this; - this.idToReader = new ConcurrentHashMap<>(); this.idToVariableCache = CacheBuilder.newBuilder() .maximumSize(100) .expireAfterWrite(10, TimeUnit.MINUTES) .build( new CacheLoader() { public Object load(final Serializable id) throws Exception { - LOG.info("Start to load broadcast {}", id.toString()); - if (idToReader.containsKey(id)) { - // Get from reader - final InputReader inputReader = idToReader.get(id); - final List> iterators = inputReader.read(); - if (iterators.size() != 1) { - throw new IllegalStateException(id.toString()); - } - final DataUtil.IteratorWithNumBytes iterator = iterators.get(0).get(); - if (!iterator.hasNext()) { - throw new IllegalStateException(id.toString() + " (no element) " + iterator.toString()); - } - final Object result = iterator.next(); - if (iterator.hasNext()) { - throw new IllegalStateException(id.toString() + " (more than single element) " + iterator.toString()); - } - return result; - } else { - // Get from master - final CompletableFuture responseFromMasterFuture = toMaster - .getMessageSender(MessageEnvironment.RUNTIME_MASTER_MESSAGE_LISTENER_ID).request( - ControlMessage.Message.newBuilder() - .setId(RuntimeIdManager.generateMessageId()) - .setListenerId(MessageEnvironment.RUNTIME_MASTER_MESSAGE_LISTENER_ID) - .setType(ControlMessage.MessageType.RequestBroadcastVariable) - 
.setRequestbroadcastVariableMsg( - ControlMessage.RequestBroadcastVariableMessage.newBuilder() - .setExecutorId(executorId) - .setBroadcastId(ByteString.copyFrom(SerializationUtils.serialize(id))) - .build()) - .build()); - return SerializationUtils.deserialize( - responseFromMasterFuture.get().getBroadcastVariableMsg().getVariable().toByteArray()); - } + // Get from master + final CompletableFuture responseFromMasterFuture = toMaster + .getMessageSender(MessageEnvironment.RUNTIME_MASTER_MESSAGE_LISTENER_ID).request( + ControlMessage.Message.newBuilder() + .setId(RuntimeIdManager.generateMessageId()) + .setListenerId(MessageEnvironment.RUNTIME_MASTER_MESSAGE_LISTENER_ID) + .setType(ControlMessage.MessageType.RequestBroadcastVariable) + .setRequestbroadcastVariableMsg( + ControlMessage.RequestBroadcastVariableMessage.newBuilder() + .setExecutorId(executorId) + .setBroadcastId(ByteString.copyFrom(SerializationUtils.serialize(id))) + .build()) + .build()); + return SerializationUtils.deserialize( + responseFromMasterFuture.get().getBroadcastVariableMsg().getVariable().toByteArray()); } }); } - /** - * When the broadcast variable can be read by an input reader. - * (i.e., the variable is expressed as an IREdge, and reside in a executor as a block) - * - * @param id of the broadcast variable. - * @param inputReader the {@link InputReader} to register. - */ - public void registerInputReader(final Serializable id, - final InputReader inputReader) { - this.idToReader.put(id, inputReader); - } - /** * Get the variable with the id. * @param id of the variable. 
diff --git a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/data/partitioner/DedicatedKeyPerElementPartitioner.java b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/data/partitioner/DedicatedKeyPerElementPartitioner.java index ed6073ef6b..43d11cfc45 100644 --- a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/data/partitioner/DedicatedKeyPerElementPartitioner.java +++ b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/data/partitioner/DedicatedKeyPerElementPartitioner.java @@ -33,6 +33,7 @@ public final class DedicatedKeyPerElementPartitioner implements Partitioner void emit(final String dstVertexId, final T output) { @Override public void emitWatermark(final Watermark watermark) { - if (LOG.isDebugEnabled()) { LOG.debug("{} emits watermark {}", irVertex.getId(), watermark); } diff --git a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/datatransfer/PipeOutputWriter.java b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/datatransfer/PipeOutputWriter.java index dd70394167..03d7470f0c 100644 --- a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/datatransfer/PipeOutputWriter.java +++ b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/datatransfer/PipeOutputWriter.java @@ -78,7 +78,6 @@ public final class PipeOutputWriter implements OutputWriter { private void writeData(final Object element, final List pipeList) { pipeList.forEach(pipe -> { - try (final ByteOutputContext.ByteOutputStream pipeToWriteTo = pipe.newOutputStream()) { // Serialize (Do not compress) final DirectByteArrayOutputStream bytesOutputStream = new DirectByteArrayOutputStream(); @@ -149,10 +148,14 @@ private void doInitialize() { } private List getPipeToWrite(final Object element) { - return runtimeEdge.getPropertyValue(CommunicationPatternProperty.class) - .get() - .equals(CommunicationPatternProperty.Value.OneToOne) - ? 
Collections.singletonList(pipes.get(0)) - : Collections.singletonList(pipes.get((int) partitioner.partition(element))); + final CommunicationPatternProperty.Value comm = + (CommunicationPatternProperty.Value) runtimeEdge.getPropertyValue(CommunicationPatternProperty.class).get(); + if (comm.equals(CommunicationPatternProperty.Value.OneToOne)) { + return Collections.singletonList(pipes.get(0)); + } else if (comm.equals(CommunicationPatternProperty.Value.BroadCast)) { + return pipes; + } else { + return Collections.singletonList(pipes.get((int) partitioner.partition(element))); + } } } diff --git a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/datatransfer/SingleInputWatermarkManager.java b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/datatransfer/SingleInputWatermarkManager.java index e8135f9f05..e3562ed395 100644 --- a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/datatransfer/SingleInputWatermarkManager.java +++ b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/datatransfer/SingleInputWatermarkManager.java @@ -20,12 +20,15 @@ import org.apache.nemo.common.ir.OutputCollector; import org.apache.nemo.common.punctuation.Watermark; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * This is a special implementation for single input data stream for optimization. 
*/ public final class SingleInputWatermarkManager implements InputWatermarkManager { + private static final Logger LOG = LoggerFactory.getLogger(SingleInputWatermarkManager.class.getName()); private final OutputCollector watermarkCollector; diff --git a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/task/DataFetcher.java b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/task/DataFetcher.java index 2ca3df86c1..215a6a85ff 100644 --- a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/task/DataFetcher.java +++ b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/task/DataFetcher.java @@ -31,6 +31,10 @@ abstract class DataFetcher implements AutoCloseable { private final IRVertex dataSource; private final OutputCollector outputCollector; + /** + * @param dataSource to fetch from. + * @param outputCollector for the data fetched. + */ DataFetcher(final IRVertex dataSource, final OutputCollector outputCollector) { this.dataSource = dataSource; @@ -48,4 +52,8 @@ abstract class DataFetcher implements AutoCloseable { OutputCollector getOutputCollector() { return outputCollector; } + + IRVertex getDataSource() { + return dataSource; + } } diff --git a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/task/MultiThreadParentTaskDataFetcher.java b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/task/MultiThreadParentTaskDataFetcher.java index 001060c7a0..7ce1ed91f6 100644 --- a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/task/MultiThreadParentTaskDataFetcher.java +++ b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/task/MultiThreadParentTaskDataFetcher.java @@ -63,6 +63,7 @@ class MultiThreadParentTaskDataFetcher extends DataFetcher { // A watermark manager private InputWatermarkManager inputWatermarkManager; + MultiThreadParentTaskDataFetcher(final IRVertex dataSource, final InputReader readerForParentTask, final OutputCollector outputCollector) { @@ 
-113,8 +114,6 @@ private void fetchDataLazily() { // Consume this iterator to the end. while (iterator.hasNext()) { // blocked on the iterator. final Object element = iterator.next(); - - if (element instanceof WatermarkWithIndex) { // watermark element // the input watermark manager is accessed by multiple threads @@ -177,17 +176,14 @@ public void close() throws Exception { * It receives the watermark from InputWatermarkManager. */ private final class WatermarkCollector implements OutputCollector { - @Override public void emit(final Object output) { throw new IllegalStateException("Should not be called"); } - @Override public void emitWatermark(final Watermark watermark) { elementQueue.offer(watermark); } - @Override public void emit(final String dstVertexId, final Object output) { throw new IllegalStateException("Should not be called"); diff --git a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/task/ParentTaskDataFetcher.java b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/task/ParentTaskDataFetcher.java index 80bbea2413..d3c223f66b 100644 --- a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/task/ParentTaskDataFetcher.java +++ b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/task/ParentTaskDataFetcher.java @@ -50,7 +50,8 @@ class ParentTaskDataFetcher extends DataFetcher { private long serBytes = 0; private long encodedBytes = 0; - ParentTaskDataFetcher(final IRVertex dataSource, final InputReader readerForParentTask, + ParentTaskDataFetcher(final IRVertex dataSource, + final InputReader readerForParentTask, final OutputCollector outputCollector) { super(dataSource, outputCollector); this.readersForParentTask = readerForParentTask; diff --git a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/task/TaskExecutor.java b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/task/TaskExecutor.java index 518bff36f3..b08b12a251 100644 --- 
a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/task/TaskExecutor.java +++ b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/task/TaskExecutor.java @@ -25,12 +25,11 @@ import org.apache.nemo.common.ir.OutputCollector; import org.apache.nemo.common.ir.Readable; import org.apache.nemo.common.ir.edge.executionproperty.AdditionalOutputTagProperty; -import org.apache.nemo.common.ir.edge.executionproperty.BroadcastVariableIdProperty; import org.apache.nemo.common.ir.vertex.*; import org.apache.nemo.common.ir.vertex.transform.AggregateMetricTransform; import org.apache.nemo.common.ir.vertex.transform.Transform; -import org.apache.nemo.runtime.executor.datatransfer.MultiInputWatermarkManager; import org.apache.nemo.common.punctuation.Watermark; +import org.apache.nemo.runtime.executor.datatransfer.MultiInputWatermarkManager; import org.apache.nemo.common.punctuation.Finishmark; import org.apache.nemo.runtime.common.RuntimeIdManager; import org.apache.nemo.runtime.common.comm.ControlMessage; @@ -71,7 +70,7 @@ public final class TaskExecutor { private boolean isExecuted; private final String taskId; private final TaskStateManager taskStateManager; - private final List nonBroadcastDataFetchers; + private final List dataFetchers; private final BroadcastManagerWorker broadcastManagerWorker; private final List sortedHarnesses; @@ -121,7 +120,7 @@ public TaskExecutor(final Task task, // Prepare data structures final Pair, List> pair = prepare(task, irVertexDag, intermediateDataIOFactory); - this.nonBroadcastDataFetchers = pair.left(); + this.dataFetchers = pair.left(); this.sortedHarnesses = pair.right(); } @@ -188,7 +187,6 @@ private Pair, List> prepare( // in {@link this#getInternalMainOutputs and this#internalMainOutputs} final Map operatorWatermarkManagerMap = new HashMap<>(); reverseTopologicallySorted.forEach(childVertex -> { - if (childVertex instanceof OperatorVertex) { final List edges = getAllIncomingEdges(task, irVertexDag, 
childVertex); if (edges.size() == 1) { @@ -201,11 +199,10 @@ private Pair, List> prepare( new OperatorWatermarkCollector((OperatorVertex) childVertex))); } } - }); // Create a harness for each vertex - final List nonBroadcastDataFetcherList = new ArrayList<>(); + final List dataFetcherList = new ArrayList<>(); final Map vertexIdToHarness = new HashMap<>(); reverseTopologicallySorted.forEach(irVertex -> { @@ -250,38 +247,17 @@ irVertex, outputCollector, new TransformContextImpl(broadcastManagerWorker), // Source read if (irVertex instanceof SourceVertex) { // Source vertex read - nonBroadcastDataFetcherList.add(new SourceVertexDataFetcher( - (SourceVertex) irVertex, sourceReader.get(), outputCollector)); + dataFetcherList.add(new SourceVertexDataFetcher( + (SourceVertex) irVertex, + sourceReader.get(), + outputCollector)); } - // Parent-task read (broadcasts) - final List inEdgesForThisVertex = task.getTaskIncomingEdges() - .stream() - .filter(inEdge -> inEdge.getDstIRVertex().getId().equals(irVertex.getId())) - .collect(Collectors.toList()); - final List broadcastInEdges = inEdgesForThisVertex - .stream() - .filter(stageEdge -> stageEdge.getPropertyValue(BroadcastVariableIdProperty.class).isPresent()) - .collect(Collectors.toList()); - final List broadcastReaders = - getParentTaskReaders(taskIndex, broadcastInEdges, intermediateDataIOFactory); - if (broadcastInEdges.size() != broadcastReaders.size()) { - throw new IllegalStateException(broadcastInEdges.toString() + ", " + broadcastReaders.toString()); - } - for (int i = 0; i < broadcastInEdges.size(); i++) { - final StageEdge inEdge = broadcastInEdges.get(i); - broadcastManagerWorker.registerInputReader( - inEdge.getPropertyValue(BroadcastVariableIdProperty.class) - .orElseThrow(() -> new IllegalStateException(inEdge.toString())), - broadcastReaders.get(i)); - } - - // Parent-task read (non-broadcasts) - final List nonBroadcastInEdges = new ArrayList<>(inEdgesForThisVertex); - 
nonBroadcastInEdges.removeAll(broadcastInEdges); - - nonBroadcastInEdges + // Parent-task read + // TODO #285: Cache broadcasted data + task.getTaskIncomingEdges() .stream() + .filter(inEdge -> inEdge.getDstIRVertex().getId().equals(irVertex.getId())) // edge to this vertex .map(incomingEdge -> Pair.of(incomingEdge, intermediateDataIOFactory .createReader(taskIndex, incomingEdge.getSrcIRVertex(), incomingEdge))) @@ -291,14 +267,21 @@ irVertex, outputCollector, new TransformContextImpl(broadcastManagerWorker), final int edgeIndex = edgeIndexMap.get(edge); final InputWatermarkManager watermarkManager = operatorWatermarkManagerMap.get(irVertex); final InputReader parentTaskReader = pair.right(); + final OutputCollector dataFetcherOutputCollector = + new DataFetcherOutputCollector((OperatorVertex) irVertex, edgeIndex, watermarkManager); + if (parentTaskReader instanceof PipeInputReader) { - nonBroadcastDataFetcherList.add( - new MultiThreadParentTaskDataFetcher(parentTaskReader.getSrcIrVertex(), parentTaskReader, - new DataFetcherOutputCollector((OperatorVertex) irVertex, edgeIndex, watermarkManager))); + dataFetcherList.add( + new MultiThreadParentTaskDataFetcher( + parentTaskReader.getSrcIrVertex(), + parentTaskReader, + dataFetcherOutputCollector)); } else { - nonBroadcastDataFetcherList.add( - new ParentTaskDataFetcher(parentTaskReader.getSrcIrVertex(), parentTaskReader, - new DataFetcherOutputCollector((OperatorVertex) irVertex, edgeIndex, watermarkManager))); + dataFetcherList.add( + new ParentTaskDataFetcher( + parentTaskReader.getSrcIrVertex(), + parentTaskReader, + dataFetcherOutputCollector)); } } }); @@ -309,7 +292,7 @@ irVertex, outputCollector, new TransformContextImpl(broadcastManagerWorker), .map(vertex -> vertexIdToHarness.get(vertex.getId())) .collect(Collectors.toList()); - return Pair.of(nonBroadcastDataFetcherList, sortedHarnessList); + return Pair.of(dataFetcherList, sortedHarnessList); } /** @@ -319,7 +302,8 @@ private void processElement(final 
OutputCollector outputCollector, final Object outputCollector.emit(dataElement); } - private void processWatermark(final OutputCollector outputCollector, final Watermark watermark) { + private void processWatermark(final OutputCollector outputCollector, + final Watermark watermark) { outputCollector.emitWatermark(watermark); } @@ -338,7 +322,7 @@ public void execute() { /** * The task is executed in the following two phases. - * - Phase 1: Consume task-external input data (non-broadcasts) + * - Phase 1: Consume task-external input data * - Phase 2: Finalize task-internal states and data elements */ private void doExecute() { @@ -349,8 +333,8 @@ private void doExecute() { LOG.info("{} started", taskId); taskStateManager.onTaskStateChanged(TaskState.State.EXECUTING, Optional.empty(), Optional.empty()); - // Phase 1: Consume task-external input data. (non-broadcasts) - if (!handleDataFetchers(nonBroadcastDataFetchers)) { + // Phase 1: Consume task-external input data. + if (!handleDataFetchers(dataFetchers)) { return; } @@ -383,14 +367,14 @@ private void finalizeVertex(final VertexHarness vertexHarness) { } /** - * Process an element generated from the dataFetcher. - * If the element is an instance of Finishmark, we remove the dataFetcher from the current list. - * @param element element + * Process an event generated from the dataFetcher. + * If the event is an instance of Finishmark, we remove the dataFetcher from the current list. + * @param event event * @param dataFetcher current data fetcher */ - private void handleElement(final Object element, - final DataFetcher dataFetcher) { - if (element instanceof Finishmark) { + private void onEventFromDataFetcher(final Object event, + final DataFetcher dataFetcher) { + if (event instanceof Finishmark) { // We've consumed all the data from this data fetcher. 
if (dataFetcher instanceof SourceVertexDataFetcher) { boundedSourceReadTime += ((SourceVertexDataFetcher) dataFetcher).getBoundedSourceReadTime(); @@ -401,12 +385,12 @@ private void handleElement(final Object element, serializedReadBytes += ((MultiThreadParentTaskDataFetcher) dataFetcher).getSerializedBytes(); encodedReadBytes += ((MultiThreadParentTaskDataFetcher) dataFetcher).getEncodedBytes(); } - } else if (element instanceof Watermark) { + } else if (event instanceof Watermark) { // Watermark - processWatermark(dataFetcher.getOutputCollector(), (Watermark) element); + processWatermark(dataFetcher.getOutputCollector(), (Watermark) event); } else { // Process data element - processElement(dataFetcher.getOutputCollector(), element); + processElement(dataFetcher.getOutputCollector(), event); } } @@ -457,7 +441,7 @@ private boolean handleDataFetchers(final List fetchers) { final DataFetcher dataFetcher = availableIterator.next(); try { final Object element = dataFetcher.fetchDataElement(); - handleElement(element, dataFetcher); + onEventFromDataFetcher(element, dataFetcher); if (element instanceof Finishmark) { availableIterator.remove(); } @@ -485,7 +469,7 @@ && isPollingTime(pollingInterval, currentTime, prevPollingTime)) { final DataFetcher dataFetcher = pendingIterator.next(); try { final Object element = dataFetcher.fetchDataElement(); - handleElement(element, dataFetcher); + onEventFromDataFetcher(element, dataFetcher); // We processed data. This means the data fetcher is now available. 
// Add current data fetcher to available diff --git a/runtime/executor/src/test/java/org/apache/nemo/runtime/executor/task/ParentTaskDataFetcherTest.java b/runtime/executor/src/test/java/org/apache/nemo/runtime/executor/task/ParentTaskDataFetcherTest.java index 6cbf69401d..6b94ca754a 100644 --- a/runtime/executor/src/test/java/org/apache/nemo/runtime/executor/task/ParentTaskDataFetcherTest.java +++ b/runtime/executor/src/test/java/org/apache/nemo/runtime/executor/task/ParentTaskDataFetcherTest.java @@ -24,6 +24,7 @@ import org.apache.nemo.runtime.executor.data.DataUtil; import org.apache.nemo.runtime.executor.datatransfer.BlockInputReader; import org.apache.nemo.runtime.executor.datatransfer.InputReader; +import org.apache.nemo.runtime.executor.datatransfer.InputWatermarkManager; import org.junit.Test; import org.junit.runner.RunWith; import org.mockito.Mockito; @@ -121,9 +122,9 @@ public void testErrorWhenReadingData() throws Exception { private ParentTaskDataFetcher createFetcher(final InputReader readerForParentTask) { return new ParentTaskDataFetcher( - mock(IRVertex.class), - readerForParentTask, // This is the only argument that affects the behavior of ParentTaskDataFetcher - mock(OutputCollector.class)); + mock(IRVertex.class), + readerForParentTask, // This is the only argument that affects the behavior of ParentTaskDataFetcher + mock(OutputCollector.class)); } private InputReader generateInputReader(final CompletableFuture completableFuture) { diff --git a/runtime/executor/src/test/java/org/apache/nemo/runtime/executor/task/TaskExecutorTest.java b/runtime/executor/src/test/java/org/apache/nemo/runtime/executor/task/TaskExecutorTest.java index 6ae716ae2d..919143b3c5 100644 --- a/runtime/executor/src/test/java/org/apache/nemo/runtime/executor/task/TaskExecutorTest.java +++ b/runtime/executor/src/test/java/org/apache/nemo/runtime/executor/task/TaskExecutorTest.java @@ -26,7 +26,6 @@ import org.apache.nemo.common.ir.Readable; import 
org.apache.nemo.common.ir.edge.IREdge; import org.apache.nemo.common.ir.edge.executionproperty.AdditionalOutputTagProperty; -import org.apache.nemo.common.ir.edge.executionproperty.BroadcastVariableIdProperty; import org.apache.nemo.common.ir.edge.executionproperty.CommunicationPatternProperty; import org.apache.nemo.common.ir.edge.executionproperty.DataStoreProperty; import org.apache.nemo.common.ir.executionproperty.EdgeExecutionProperty; @@ -66,7 +65,6 @@ import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.atomic.AtomicInteger; -import java.util.concurrent.atomic.AtomicLong; import java.util.stream.Collectors; import java.util.stream.IntStream; @@ -422,16 +420,16 @@ public void testTwoOperatorsWithBroadcastVariable() { .buildWithoutSourceSinkCheck(); final StageEdge taskOutEdge = mockStageEdgeFrom(operatorIRVertex2); + final StageEdge taskInEdge = mockStageEdgeTo(operatorIRVertex1); - final StageEdge broadcastInEdge = mockBroadcastVariableStageEdgeTo( - new OperatorVertex(singleListTransform), operatorIRVertex2, broadcastId, elements); + when(broadcastManagerWorker.get(broadcastId)).thenReturn(new ArrayList<>(elements)); final Task task = new Task( "testSourceVertexDataFetching", generateTaskId(), TASK_EXECUTION_PROPERTY_MAP, new byte[0], - Arrays.asList(mockStageEdgeTo(operatorIRVertex1), broadcastInEdge), + Collections.singletonList(taskInEdge), Collections.singletonList(taskOutEdge), Collections.emptyMap()); @@ -541,23 +539,6 @@ private StageEdge mockStageEdgeTo(final IRVertex irVertex) { mock(Stage.class)); } - private StageEdge mockBroadcastVariableStageEdgeTo(final IRVertex srcVertex, - final IRVertex dstVertex, - final Serializable broadcastVariableId, - final Object broadcastVariable) { - when(broadcastManagerWorker.get(broadcastVariableId)).thenReturn(broadcastVariable); - - final ExecutionPropertyMap executionPropertyMap = - ExecutionPropertyMap.of(mock(IREdge.class), 
CommunicationPatternProperty.Value.OneToOne); - executionPropertyMap.put(BroadcastVariableIdProperty.of(broadcastVariableId)); - return new StageEdge("runtime outgoing edge id", - executionPropertyMap, - srcVertex, - dstVertex, - mock(Stage.class), - mock(Stage.class)); - } - /** * Represents the answer return an inter-stage {@link InputReader}, * which will have multiple iterable according to the source parallelism.