From 6428d2f58e938ef96fde6c54cfbdb1a672da6af2 Mon Sep 17 00:00:00 2001 From: Siyuan Hua Date: Thu, 20 Oct 2016 10:52:04 -0700 Subject: [PATCH] APEXMALHAR-2203 support control tuple and watermark in high-level API Test control tuple connection Add test for watermark injection --- .../apache/apex/malhar/stream/api/Option.java | 64 +++++++- .../stream/api/annotation/ControlPort.java | 33 ++++ .../apex/malhar/stream/api/impl/DagMeta.java | 143 +++++++++++++++--- .../malhar/stream/api/impl/IDGenerator.java | 5 + .../api/operator/PostprocessOperator.java | 114 ++++++++++++++ .../stream/api/impl/ApexStreamImplTest.java | 139 +++++++++++++++++ 6 files changed, 470 insertions(+), 28 deletions(-) create mode 100644 stream/src/main/java/org/apache/apex/malhar/stream/api/annotation/ControlPort.java create mode 100644 stream/src/main/java/org/apache/apex/malhar/stream/api/operator/PostprocessOperator.java diff --git a/stream/src/main/java/org/apache/apex/malhar/stream/api/Option.java b/stream/src/main/java/org/apache/apex/malhar/stream/api/Option.java index 6aa829ef51..27615aa2fb 100644 --- a/stream/src/main/java/org/apache/apex/malhar/stream/api/Option.java +++ b/stream/src/main/java/org/apache/apex/malhar/stream/api/Option.java @@ -18,31 +18,79 @@ */ package org.apache.apex.malhar.stream.api; -import org.apache.hadoop.classification.InterfaceStability; +import org.apache.apex.malhar.lib.window.ControlTuple; import com.datatorrent.api.Attribute; +import com.datatorrent.api.Component; +import com.datatorrent.api.Context; /** - * Options for the operators in the dag - * - * @since 3.5.0 + * Options for each transformation in the dag */ -@InterfaceStability.Evolving public interface Option { + abstract class WatermarkGenerator implements Option, Component + { + + @Override + public void setup(Context.OperatorContext context) + { + + } + + @Override + public void teardown() + { + + } + /** + * The current watermark generation will be called each endWindow to generate watermark + * 
@return null if you don't want to generate watermark for this window + */ + public abstract ControlTuple.Watermark currentWatermark(); + + /** + * Generate the watermark from current tuple + * @param input + * @return null if you can't generate watermark from current tuple + */ + public abstract ControlTuple.Watermark getWatermarkFromTuple(T input); + } + + /** + * Factory class to generate options + */ class Options { + /** + * Create a name option so the operator will be created with the given name + * @param name operator name + * @return an {@link OpName} option carrying the name + */ public static Option name(String name) { return new OpName(name); } + /** + * Create a property option to set a property of the operator + * @param name property name + * @param value property value + * @return a {@link PropSetting} option carrying the name and value + */ public static Option prop(String name, Object value) { return new PropSetting(name, value); } + /** + * Create an attribute option to set an operator attribute + * @param attr attribute to set + * @param obj attribute value + * @param T type of the attribute value + * @return an {@link AttributeSetting} option carrying the attribute and value + */ public static Option attr(Attribute attr, T obj) { return new AttributeSetting<>(attr, obj); @@ -50,7 +98,7 @@ public static Option attr(Attribute attr, T obj) } /** - * An option used to set the name of the operator + * An option to set the operator name */ class OpName implements Option { @@ -69,7 +117,7 @@ public String getName() } /** - * An option used to set the property value of the operator + * An option to set an operator property value */ class PropSetting implements Option { @@ -96,7 +144,7 @@ public Object getVal() } /** - * An option used to set the {@link Attribute} + * An option to set an attribute value * @param */ class AttributeSetting implements Option diff --git a/stream/src/main/java/org/apache/apex/malhar/stream/api/annotation/ControlPort.java b/stream/src/main/java/org/apache/apex/malhar/stream/api/annotation/ControlPort.java new file mode 100644 index 0000000000..89aa56ffd5 --- /dev/null +++ b/stream/src/main/java/org/apache/apex/malhar/stream/api/annotation/ControlPort.java @@ -0,0 +1,33 @@ +/** + * Licensed 
to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.apex.malhar.stream.api.annotation; + +import java.lang.annotation.Retention; +import java.lang.annotation.RetentionPolicy; + +import com.datatorrent.api.Operator; + +/** + * Mark an {@link Operator.InputPort} or an {@link Operator.OutputPort} as for control tuple only + * So the port would be recognized and connected by high-level API + */ +@Retention(RetentionPolicy.RUNTIME) +public @interface ControlPort +{ +} diff --git a/stream/src/main/java/org/apache/apex/malhar/stream/api/impl/DagMeta.java b/stream/src/main/java/org/apache/apex/malhar/stream/api/impl/DagMeta.java index eae8b1533b..7d5804ce6a 100644 --- a/stream/src/main/java/org/apache/apex/malhar/stream/api/impl/DagMeta.java +++ b/stream/src/main/java/org/apache/apex/malhar/stream/api/impl/DagMeta.java @@ -18,6 +18,7 @@ */ package org.apache.apex.malhar.stream.api.impl; +import java.lang.annotation.Annotation; import java.lang.reflect.Field; import java.lang.reflect.Modifier; import java.util.HashMap; @@ -27,29 +28,34 @@ import java.util.Map; import java.util.Set; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - +import org.apache.apex.malhar.lib.window.ControlTuple; +import 
org.apache.apex.malhar.lib.window.impl.AbstractWindowedOperator; import org.apache.apex.malhar.stream.api.Option; +import org.apache.apex.malhar.stream.api.annotation.ControlPort; +import org.apache.apex.malhar.stream.api.operator.PostprocessOperator; import org.apache.commons.lang3.tuple.MutablePair; import org.apache.commons.lang3.tuple.Pair; import org.apache.hadoop.classification.InterfaceStability; import com.datatorrent.api.Attribute; import com.datatorrent.api.DAG; +import com.datatorrent.api.DefaultInputPort; +import com.datatorrent.api.DefaultOutputPort; import com.datatorrent.api.Operator; import com.datatorrent.stram.plan.logical.LogicalPlan; /** - * Logical graph data structure for DAG
- * - * With the build method({@link #buildDAG()}, {@link #buildDAG(DAG)}) to convert it to Apex DAG + * Graph data structure for DAG + * With this data structure, the framework can do lazy load and optimization * * @since 3.4.0 */ @InterfaceStability.Evolving public class DagMeta { + + public static final String POST_PROCESS_OPERATOR_SUFFIX = "_PostProcess"; + private List heads = new LinkedList<>(); List> dagAttributes = new LinkedList<>(); @@ -132,6 +138,11 @@ public String getOperatorName() } return operator.toString(); } + + public Option[] getOptions() + { + return options; + } } public DagMeta() @@ -149,37 +160,67 @@ public DAG buildDAG() public void buildDAG(DAG dag) { for (NodeMeta nm : heads) { - visitNode(nm, dag); + visitNode(nm, dag, null); } - logger.debug("Finish building the dag:\n {}", dag.toString()); } - private void visitNode(NodeMeta nm, DAG dag) + private void visitNode(NodeMeta nm, DAG dag, DefaultOutputPort controlTupleOutputPort) { - String opName = nm.getOperatorName(); - logger.debug("Building DAG: add operator {}: {}", opName, nm.operator); - dag.addOperator(opName, nm.operator); + + DefaultOutputPort possibleControlTuplePort = controlTupleOutputPort; + // add logical operator + dag.addOperator(nm.getOperatorName(), nm.operator); + + possibleControlTuplePort = connectControlPort(dag, possibleControlTuplePort, nm.getOperator()); + + PostprocessOperator postprocessOperator = null; + //deal with options + for (Option opt : nm.getOptions()) { + if (opt instanceof Option.WatermarkGenerator) { + postprocessOperator = getOrNew(postprocessOperator); + postprocessOperator.setWatermarkGenerator((Option.WatermarkGenerator)opt); + } + } + + if (postprocessOperator != null) { + dag.addOperator(nm.getOperatorName() + POST_PROCESS_OPERATOR_SUFFIX, postprocessOperator); + possibleControlTuplePort = connectControlPort(dag, possibleControlTuplePort, postprocessOperator); + Map.Entry, DAG.Locality>> toReplace = null; + for (Map.Entry, DAG.Locality>> entry : 
nm.nodeStreams.entrySet()) { + if (entry.getKey() == null || entry.getValue().getKey() == null || 0 == entry.getValue().getKey().size()) { + continue; + } else { + DAG.StreamMeta streamMeta = dag.addStream(nm.getOperatorName() + "_inner_stream_post", entry.getKey(), + postprocessOperator.dataInput); + // always run postprocess operator to thread local + streamMeta.setLocality(DAG.Locality.THREAD_LOCAL); + toReplace = entry; + } + } + if (toReplace != null) { + Pair, DAG.Locality> val = nm.nodeStreams.remove(toReplace.getKey()); + nm.nodeStreams.put(postprocessOperator.dataOutput, val); + } + } + for (NodeMeta child : nm.children) { - visitNode(child, dag); + visitNode(child, dag, possibleControlTuplePort); } for (Map.Entry, DAG.Locality>> entry : nm.nodeStreams.entrySet()) { if (entry.getKey() == null || entry.getValue().getKey() == null || 0 == entry.getValue().getKey().size()) { continue; } - logger.debug("Building DAG: add stream {} from {} to {}", entry.getKey().toString(), entry.getKey(), entry.getValue().getLeft().toArray(new Operator.InputPort[]{})); DAG.StreamMeta streamMeta = dag.addStream(entry.getKey().toString(), entry.getKey(), entry.getValue().getLeft().toArray(new Operator.InputPort[]{})); // set locality if (entry.getValue().getRight() != null) { - logger.debug("Building DAG: set locality of the stream {} to {}", entry.getKey().toString(), entry.getValue().getRight()); streamMeta.setLocality(entry.getValue().getRight()); } //set attributes for output port if (nm.outputPortAttributes.containsKey(entry.getKey())) { for (Pair attr : nm.outputPortAttributes.get(entry.getKey())) { - logger.debug("Building DAG: set port attribute {} to {} for port {}", attr.getLeft(), attr.getValue(), entry.getKey()); dag.setOutputPortAttribute(entry.getKey(), attr.getLeft(), attr.getValue()); } } @@ -190,7 +231,6 @@ private void visitNode(NodeMeta nm, DAG dag) //set input port attributes if (nm.inputPortAttributes.containsKey(input)) { for (Pair attr : nm.inputPortAttributes.get(input)) { - 
logger.debug("Building DAG: set port attribute {} to {} for port {}", attr.getLeft(), attr.getValue(), input); dag.setInputPortAttribute(input, attr.getLeft(), attr.getValue()); } } @@ -198,12 +238,77 @@ private void visitNode(NodeMeta nm, DAG dag) // set operator attributes for (Pair attr : nm.operatorAttributes) { - logger.debug("Building DAG: set operator attribute {} to {} for operator {}", attr.getLeft(), attr.getValue(), nm.operator); dag.setAttribute(nm.operator, attr.getLeft(), attr.getValue()); } } + /** + * Connect transitiveControlTupleOutputPort to operator if the operator has control tuple port + * and return the new control output port if possible + * @param dag + * @param transitiveControlTupleOutputPort + * @param operator + * @return + */ + private DefaultOutputPort connectControlPort(DAG dag, DefaultOutputPort transitiveControlTupleOutputPort, Operator operator) + { + if (operator == null) { + return transitiveControlTupleOutputPort; + } + + DefaultOutputPort newOutput = null; + DefaultInputPort inputToBeConnected = null; + + if (operator instanceof AbstractWindowedOperator) { + inputToBeConnected = ((AbstractWindowedOperator)operator).controlInput; + newOutput = ((AbstractWindowedOperator)operator).controlOutput; + } else { + Operator.Port[] inputOutput = findControlTuplePort(operator); + inputToBeConnected = (DefaultInputPort)inputOutput[0]; + newOutput = (DefaultOutputPort)inputOutput[1]; + } + + + // connect control tuple port if possible + if (transitiveControlTupleOutputPort != null && inputToBeConnected != null) { + dag.addStream(IDGenerator.generateControlStreamNameWithUUID(), transitiveControlTupleOutputPort, inputToBeConnected); + } + + return newOutput != null ? 
newOutput : transitiveControlTupleOutputPort; + } + + private Operator.Port[] findControlTuplePort(Operator operator) + { + Operator.Port[] result = new Operator.Port[2]; + try { + for (Field f : operator.getClass().getFields()) { + for (Annotation an : f.getDeclaredAnnotations()) { + if (an instanceof ControlPort) { + if (Operator.InputPort.class.isAssignableFrom(f.getType())) { + result[0] = (Operator.Port)f.get(operator); + } else if (Operator.OutputPort.class.isAssignableFrom(f.getType())) { + result[1] = (Operator.Port)f.get(operator); + } + } + } + } + } catch (IllegalAccessException e) { + // getFields() only yields public fields, so this is unexpected; keep the cause for diagnosis + throw new RuntimeException("Port field needs to be public", e); + } + return result; + } + + private PostprocessOperator getOrNew(PostprocessOperator postprocessOperator) + { + if (postprocessOperator == null) { + return new PostprocessOperator(); + } else { + return postprocessOperator; + } + } + public NodeMeta addNode(Operator operator, NodeMeta parent, Operator.OutputPort parentOutput, Operator.InputPort inputPort, Option... 
options) { @@ -218,6 +323,4 @@ public NodeMeta addNode(Operator operator, NodeMeta parent, Operator.OutputPort return newNode; } - private static final Logger logger = LoggerFactory.getLogger(DagMeta.class); - } diff --git a/stream/src/main/java/org/apache/apex/malhar/stream/api/impl/IDGenerator.java b/stream/src/main/java/org/apache/apex/malhar/stream/api/impl/IDGenerator.java index b5bc286e4b..885c10029e 100644 --- a/stream/src/main/java/org/apache/apex/malhar/stream/api/impl/IDGenerator.java +++ b/stream/src/main/java/org/apache/apex/malhar/stream/api/impl/IDGenerator.java @@ -34,6 +34,7 @@ @InterfaceStability.Evolving public class IDGenerator { + public static final String STREAM_PREFIX = "ControlPassThrough"; public static String generateOperatorIDWithClock(Class operatorClazz) { @@ -45,5 +46,9 @@ public static String generateOperatorIDWithUUID(Class operat return operatorClazz.getName() + UUID.randomUUID(); } + public static String generateControlStreamNameWithUUID() { + return STREAM_PREFIX + UUID.randomUUID(); + } + } diff --git a/stream/src/main/java/org/apache/apex/malhar/stream/api/operator/PostprocessOperator.java b/stream/src/main/java/org/apache/apex/malhar/stream/api/operator/PostprocessOperator.java new file mode 100644 index 0000000000..c477130288 --- /dev/null +++ b/stream/src/main/java/org/apache/apex/malhar/stream/api/operator/PostprocessOperator.java @@ -0,0 +1,114 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.apex.malhar.stream.api.operator; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.apex.malhar.lib.window.ControlTuple; +import org.apache.apex.malhar.stream.api.Option.WatermarkGenerator; + +import com.datatorrent.api.Context; +import com.datatorrent.api.DefaultInputPort; +import com.datatorrent.api.DefaultOutputPort; +import com.datatorrent.api.Operator; + +/** + * A Post-Process operator that can be attached to direct upstream operator to generate {@link ControlTuple.Watermark}; data tuples pass through from dataInput to dataOutput unchanged. NOTE(review): controlInput/controlOutput are not marked with the ControlPort annotation that DagMeta looks for - confirm the generated watermarks are actually wired downstream + */ +public class PostprocessOperator implements Operator +{ + private static final Logger logger = LoggerFactory.getLogger(PostprocessOperator.class); + + + public final transient DefaultOutputPort controlOutput = new DefaultOutputPort<>(); + + public final transient DefaultInputPort dataInput = new DefaultInputPort() + { + @Override + public void process(T o) + { + if (watermarkGenerator != null) { + ControlTuple.Watermark watermark = watermarkGenerator.getWatermarkFromTuple(o); + if (watermark != null) { + controlOutput.emit(watermark); + } + } + // data is just passing through + dataOutput.emit(o); + } + }; + + public final transient DefaultInputPort controlInput = new DefaultInputPort() + { + @Override + public void process(ControlTuple controlTuple) + { + controlOutput.emit(controlTuple); + } + }; + + public final transient DefaultOutputPort dataOutput = new DefaultOutputPort<>(); + + @Override + public void setup(Context.OperatorContext context) + { + if (watermarkGenerator != null) { + 
watermarkGenerator.setup(context); + } + } + + @Override + public void teardown() + { + if (watermarkGenerator != null) { + watermarkGenerator.teardown(); + } + } + + @Override + public void beginWindow(long l) + { + } + + @Override + public void endWindow() + { + if (watermarkGenerator != null) { + ControlTuple.Watermark w = watermarkGenerator.currentWatermark(); + if (w != null) { + logger.debug("Emitting watermark {}", w); + controlOutput.emit(w); + } + } + } + + private WatermarkGenerator watermarkGenerator; + + public void setWatermarkGenerator(WatermarkGenerator watermarkGenerator) + { + this.watermarkGenerator = watermarkGenerator; + } + + public WatermarkGenerator getWatermarkGenerator() + { + return watermarkGenerator; + } + +} diff --git a/stream/src/test/java/org/apache/apex/malhar/stream/api/impl/ApexStreamImplTest.java b/stream/src/test/java/org/apache/apex/malhar/stream/api/impl/ApexStreamImplTest.java index 99d5ca6c0a..81cee87065 100644 --- a/stream/src/test/java/org/apache/apex/malhar/stream/api/impl/ApexStreamImplTest.java +++ b/stream/src/test/java/org/apache/apex/malhar/stream/api/impl/ApexStreamImplTest.java @@ -23,13 +23,26 @@ import java.util.HashSet; import java.util.Set; +import javax.annotation.Nullable; + import org.junit.Assert; import org.junit.Test; +import org.apache.apex.malhar.lib.window.ControlTuple; +import org.apache.apex.malhar.stream.api.Option; +import org.apache.apex.malhar.stream.api.annotation.ControlPort; +import org.apache.apex.malhar.stream.api.function.Function; + +import com.esotericsoftware.kryo.io.Output; +import com.google.common.base.Predicate; +import com.google.common.base.Predicates; +import com.google.common.collect.Iterables; + import com.datatorrent.api.Context; import com.datatorrent.api.DAG; import com.datatorrent.api.DefaultInputPort; import com.datatorrent.api.DefaultOutputPort; +import com.datatorrent.api.InputOperator; import com.datatorrent.common.util.BaseOperator; import 
com.datatorrent.stram.plan.logical.LogicalPlan; @@ -88,6 +101,75 @@ public void testAddOperator() } + + @Test + public void testControlPortConnection() + { + LogicalPlan dag = new LogicalPlan(); + + OperatorWithControlPorts first = new OperatorWithControlPorts<>(); + OperatorWithControlPorts second = new OperatorWithControlPorts<>(); + OperatorWithControlPorts last = new OperatorWithControlPorts<>(); + + new ApexStreamImpl().addOperator(first, null, first.output, name("first")) + .addOperator(second, second.input, second.output, name("second")) + .map(new Function.MapFunction() + { + @Override + public String f(String input) + { + // mock function + return input; + } + }, name("third")).endWith(last, last.input, name("fourth")).populateDag(dag); + + // There are 3 data streams and 2 control streams (The map operator is skipped in this case) + Assert.assertEquals(5, dag.getAllStreams().size()); + for (LogicalPlan.StreamMeta sm : dag.getAllStreams()) { + if (!sm.getSource().getPortName().equals("controlTupleOutputPort")) { + continue; + } + + if (sm.getSource().getOperatorMeta().getName().equals("first")) { + Assert.assertEquals(1, sm.getSinks().size()); + Assert.assertEquals("second", sm.getSinks().get(0).getOperatorWrapper().getName()); + } else if (sm.getSource().getOperatorMeta().getName().equals("second")) { + Assert.assertEquals(1, sm.getSinks().size()); + // third operator is skipped because it has no control tuple port + Assert.assertEquals("fourth", sm.getSinks().get(0).getOperatorWrapper().getName()); + } + } + + } + + + @Test + public void testWatermarkInjection() + { + LogicalPlan dag = new LogicalPlan(); + + OperatorWithControlPorts first = new OperatorWithControlPorts<>(); + OperatorWithControlPorts second = new OperatorWithControlPorts<>(); + + new ApexStreamImpl().addOperator(first, null, first.output, name("first"), new TestWatermarkGenerator()) + .addOperator(second, second.input, second.output, name("second")).populateDag(dag); + + // There are 3 operators: 
first, the injected post-process operator and second + Assert.assertEquals(3, dag.getAllOperators().size()); + // 2 data stream first -> first_postprocess -> second + // 1 control stream first_postprocess -> second + Assert.assertEquals(3, dag.getAllStreams().size()); + Assert.assertTrue(Iterables.any(dag.getAllOperators(), new Predicate() + { + @Override + public boolean apply(@Nullable LogicalPlan.OperatorMeta operatorMeta) + { + return operatorMeta.getName().endsWith(DagMeta.POST_PROCESS_OPERATOR_SUFFIX); + } + })); + + } + /** * A mock operator for test * @param @@ -121,4 +203,61 @@ public void process(T o) } + public static class OperatorWithControlPorts extends TestOperator + { + @Override + public void beginWindow(long l) + { + + } + + @Override + public void endWindow() + { + + } + + @Override + public void setup(Context.OperatorContext context) + { + + } + + @Override + public void teardown() + { + + } + + @ControlPort + public final transient OutputPort controlTupleOutputPort = new DefaultOutputPort<>(); + + @ControlPort + public final transient DefaultInputPort controlTupleInputOperator = new DefaultInputPort() + { + @Override + public void process(ControlTuple o) + { + + } + }; + + } + + public static class TestWatermarkGenerator extends Option.WatermarkGenerator + { + + @Override + public ControlTuple.Watermark currentWatermark() + { + return null; + } + + @Override + public ControlTuple.Watermark getWatermarkFromTuple(T input) + { + return null; + } + } + }