From f8d1aa966ba907b2e3a306f5fe498ed8ac886656 Mon Sep 17 00:00:00 2001 From: Greg Hogan Date: Mon, 25 Jul 2016 09:09:27 -0400 Subject: [PATCH 1/2] [FLINK-4257 to be squashed] [gelly] Handle delegating algorithm change of class A class created by ProxyFactory can intercept and reinterpret method calls using its MethodHandler, but is restricted in that * the type of the proxy class cannot be changed * method return types must be honored We have algorithms such as VertexDegree and TriangleListing that change return type depending on configuration, even between single and dual input functions. This can be problematic, e.g. in OperatorTranslation where we test dataSet instanceof SingleInputOperator or dataSet instanceof TwoInputOperator. Even simply changing operator can be problematic, e.g. MapOperator.translateToDataFlow returns MapOperatorBase whereas ReduceOperator.translateToDataFlow returns SingleInputOperator. Making changes only within Gelly we can append a "no-op" pass-through MapFunction to any algorithm output which is not a SingleInputOperator. And Delegate can also walk the superclass hierarchy such we are always proxying SingleInputOperator. There is one additional issue. When we call DataSet.output the delegate's MethodHandler must reinterpret this call to add itself to the list of sinks. --- .../flink/graph/utils/proxy/Delegate.java | 54 +++++++++++++++++-- .../GraphAlgorithmDelegatingDataSet.java | 23 +++++++- .../proxy/GraphAlgorithmDelegatingGraph.java | 31 ++++++++++- .../apache/flink/graph/utils/proxy/NoOp.java | 35 ++++++++++++ 4 files changed, 135 insertions(+), 8 deletions(-) create mode 100644 flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/utils/proxy/NoOp.java diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/utils/proxy/Delegate.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/utils/proxy/Delegate.java index a2d724dbe0be4..785c56113df98 100644 --- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/utils/proxy/Delegate.java +++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/utils/proxy/Delegate.java @@ -22,6 +22,14 @@ import javassist.util.proxy.MethodHandler; import javassist.util.proxy.ProxyFactory; import javassist.util.proxy.ProxyObject; +import org.apache.flink.api.common.io.OutputFormat; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.java.DataSet; +import org.apache.flink.api.java.ExecutionEnvironment; +import org.apache.flink.api.java.operators.DataSink; +import org.apache.flink.api.java.operators.Operator; +import org.apache.flink.api.java.typeutils.InputTypeConfigurable; +import org.apache.flink.util.Preconditions; import org.objenesis.ObjenesisStd; import java.lang.reflect.Method; @@ -68,8 +76,13 @@ public X getProxy() { return proxy; } + Class superclass = obj.getClass(); + while (! superclass.getSuperclass().equals(Operator.class)) { + superclass = superclass.getSuperclass(); + } + ProxyFactory factory = new ProxyFactory(); - factory.setSuperclass(obj.getClass()); + factory.setSuperclass(superclass); factory.setInterfaces(new Class[]{ReferentProxy.class}); // create the class and instantiate an instance without calling a constructor @@ -89,9 +102,42 @@ public Object invoke(Object self, Method thisMethod, Method proceed, Object[] ar // this method is provided by the ReferentProxy interface return obj; } else { - // method visibility may be restricted - thisMethod.setAccessible(true); - return thisMethod.invoke(obj, args); + Method objMethod; + try { + // lookup public method from class hierarchy + objMethod = obj.getClass().getMethod(thisMethod.getName(), thisMethod.getParameterTypes()); + } catch(NoSuchMethodException ignored) { + // lookup private method defined on class + objMethod = obj.getClass().getDeclaredMethod(thisMethod.getName(), thisMethod.getParameterTypes()); + } + + Class[] parameterTypes = thisMethod.getParameterTypes(); + + // reinterpret DataSet.output to add the delegate to the list of data sinks; otherwise, the proxy + // object will be added to the list of sinks and cannot be replaced + if (thisMethod.getName().equals("output") && parameterTypes.length == 1 && parameterTypes[0].equals(OutputFormat.class)) { + OutputFormat outputFormat = (OutputFormat)args[0]; + Preconditions.checkNotNull(outputFormat); + + ExecutionEnvironment context = ((DataSet)obj).getExecutionEnvironment(); + TypeInformation type = ((DataSet)obj).getType(); + + // configure the type if needed + if (outputFormat instanceof InputTypeConfigurable) { + ((InputTypeConfigurable) outputFormat).setInputType(type, context.getConfig() ); + } + + DataSink sink = new DataSink((DataSet)self, outputFormat, type); + + Method outputMethod = ExecutionEnvironment.class.getDeclaredMethod("registerDataSink", DataSink.class); + outputMethod.setAccessible(true); + outputMethod.invoke(context, sink); + return sink; + } else { + // method visibility may be restricted + objMethod.setAccessible(true); + return objMethod.invoke(obj, args); + } } } }); diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/utils/proxy/GraphAlgorithmDelegatingDataSet.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/utils/proxy/GraphAlgorithmDelegatingDataSet.java index 8e796e6258ef5..1081bfccb909b 100644 --- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/utils/proxy/GraphAlgorithmDelegatingDataSet.java +++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/utils/proxy/GraphAlgorithmDelegatingDataSet.java @@ -21,6 +21,7 @@ import org.apache.commons.lang3.builder.EqualsBuilder; import org.apache.commons.lang3.builder.HashCodeBuilder; import org.apache.flink.api.java.DataSet; +import org.apache.flink.api.java.operators.SingleInputOperator; import org.apache.flink.graph.Graph; import org.apache.flink.graph.GraphAlgorithm; @@ -121,7 +122,7 @@ public final DataSet run(Graph input) for (GraphAlgorithmDelegatingDataSet other : cache.get(this)) { if (mergeConfiguration(other)) { // configuration has been merged so generate new output - DataSet output = runInternal(input); + DataSet output = checkOutput(runInternal(input)); // update delegatee object and reuse delegate other.delegate.setObject(output); @@ -133,7 +134,7 @@ public final DataSet run(Graph input) } // no mergeable configuration found so generate new output - DataSet output = runInternal(input); + DataSet output = checkOutput(runInternal(input)); // create a new delegate to wrap the algorithm output delegate = new Delegate<>(output); @@ -147,4 +148,22 @@ public final DataSet run(Graph input) return delegate.getProxy(); } + + /** + * If the given DataSet is not of type SingleInputOperator then a "no-op" + * map is appended which simply passes input to output. This guarantees + * that the delegating class type does not change. + * + * @param output user-defined function output + * @return a SingleInputOperator + */ + private DataSet checkOutput(DataSet output) { + if (output instanceof SingleInputOperator) { + return output; + } else { + return output + .map(new NoOp()) + .name("No-op"); + } + } } diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/utils/proxy/GraphAlgorithmDelegatingGraph.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/utils/proxy/GraphAlgorithmDelegatingGraph.java index 705510a6a1377..3209cb2d98899 100644 --- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/utils/proxy/GraphAlgorithmDelegatingGraph.java +++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/utils/proxy/GraphAlgorithmDelegatingGraph.java @@ -21,6 +21,7 @@ import org.apache.commons.lang3.builder.EqualsBuilder; import org.apache.commons.lang3.builder.HashCodeBuilder; import org.apache.flink.api.java.DataSet; +import org.apache.flink.api.java.operators.SingleInputOperator; import org.apache.flink.graph.Edge; import org.apache.flink.graph.Graph; import org.apache.flink.graph.GraphAlgorithm; @@ -127,7 +128,7 @@ public final Graph run(Graph input) for (GraphAlgorithmDelegatingGraph other : cache.get(this)) { if (mergeConfiguration(other)) { // configuration has been merged so generate new output - Graph output = runInternal(input); + Graph output = checkOutput(runInternal(input)); // update delegatee object and reuse delegate other.verticesDelegate.setObject(output.getVertices()); @@ -142,7 +143,7 @@ public final Graph run(Graph input) } // no mergeable configuration found so generate new output - Graph output = runInternal(input); + Graph output = checkOutput(runInternal(input)); // create a new delegate to wrap the algorithm output verticesDelegate = new Delegate<>(output.getVertices()); @@ -157,4 +158,30 @@ public final Graph run(Graph input) return Graph.fromDataSet(verticesDelegate.getProxy(), edgesDelegate.getProxy(), output.getContext()); } + + /** + * If the given Graph vertex and edge DataSets are not of type SingleInputOperator + * then a "no-op" map is appended which simply passes input to output. This + * guarantees that the delegating class type does not change. + * + * @param output user-defined function output + * @return a Graph with vertex and edge sets of type SingleInputOperator + */ + private Graph checkOutput(Graph output) { + DataSet> vertices = output.getVertices(); + if (!(vertices instanceof SingleInputOperator)) { + vertices = vertices + .map(new NoOp>()) + .name("No-op"); + } + + DataSet> edges = output.getEdges(); + if (!(edges instanceof SingleInputOperator)) { + edges = edges + .map(new NoOp>()) + .name("No-op"); + } + + return Graph.fromDataSet(vertices, edges, output.getContext()); + } } diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/utils/proxy/NoOp.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/utils/proxy/NoOp.java new file mode 100644 index 0000000000000..8f76793667b91 --- /dev/null +++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/utils/proxy/NoOp.java @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.graph.utils.proxy; + +import org.apache.flink.api.common.functions.MapFunction; + +/** + * A map which passes input to output. + * + * @param input and output type + */ +public class NoOp +implements MapFunction { + + @Override + public T map(T value) throws Exception { + return value; + } +} From 3209b05ab5575aba08f9cd6284b720ef9dfc5a02 Mon Sep 17 00:00:00 2001 From: Greg Hogan Date: Mon, 25 Jul 2016 09:41:31 -0400 Subject: [PATCH 2/2] [FLINK-4257] [gelly] Handle delegating algorithm change of class Replaces Delegate with NoOpOperator. --- .../annotate/directed/EdgeDegreesPair.java | 6 +- .../annotate/directed/EdgeSourceDegrees.java | 6 +- .../annotate/directed/EdgeTargetDegrees.java | 6 +- .../annotate/directed/VertexDegrees.java | 6 +- .../annotate/directed/VertexInDegree.java | 6 +- .../annotate/directed/VertexOutDegree.java | 6 +- .../annotate/undirected/EdgeDegreePair.java | 6 +- .../annotate/undirected/EdgeSourceDegree.java | 6 +- .../annotate/undirected/EdgeTargetDegree.java | 6 +- .../annotate/undirected/VertexDegree.java | 6 +- .../filter/undirected/MaximumDegree.java | 6 +- .../graph/asm/simple/directed/Simplify.java | 6 +- .../graph/asm/simple/undirected/Simplify.java | 6 +- .../asm/translate/TranslateEdgeValues.java | 6 +- .../asm/translate/TranslateGraphIds.java | 6 +- .../asm/translate/TranslateVertexValues.java | 6 +- .../directed/LocalClusteringCoefficient.java | 6 +- .../clustering/directed/TriangleListing.java | 6 +- .../LocalClusteringCoefficient.java | 6 +- .../undirected/TriangleListing.java | 6 +- .../graph/library/link_analysis/HITS.java | 6 +- .../graph/library/similarity/AdamicAdar.java | 6 +- .../library/similarity/JaccardIndex.java | 6 +- .../flink/graph/utils/proxy/Delegate.java | 158 ------------------ ...ava => GraphAlgorithmWrappingDataSet.java} | 64 +++---- ....java => GraphAlgorithmWrappingGraph.java} | 80 +++------ .../apache/flink/graph/utils/proxy/NoOp.java | 35 ---- 27 files changed, 119 insertions(+), 356 deletions(-) delete mode 100644 flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/utils/proxy/Delegate.java rename flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/utils/proxy/{GraphAlgorithmDelegatingDataSet.java => GraphAlgorithmWrappingDataSet.java} (63%) rename flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/utils/proxy/{GraphAlgorithmDelegatingGraph.java => GraphAlgorithmWrappingGraph.java} (56%) delete mode 100644 flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/utils/proxy/NoOp.java diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/EdgeDegreesPair.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/EdgeDegreesPair.java index be19613168f79..408516bc586ae 100644 --- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/EdgeDegreesPair.java +++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/EdgeDegreesPair.java @@ -27,7 +27,7 @@ import org.apache.flink.graph.Vertex; import org.apache.flink.graph.asm.degree.annotate.DegreeAnnotationFunctions.JoinEdgeDegreeWithVertexDegree; import org.apache.flink.graph.asm.degree.annotate.directed.VertexDegrees.Degrees; -import org.apache.flink.graph.utils.proxy.GraphAlgorithmDelegatingDataSet; +import org.apache.flink.graph.utils.proxy.GraphAlgorithmWrappingDataSet; import org.apache.flink.util.Preconditions; import static org.apache.flink.api.common.ExecutionConfig.PARALLELISM_DEFAULT; @@ -41,7 +41,7 @@ * @param edge value type */ public class EdgeDegreesPair -extends GraphAlgorithmDelegatingDataSet>> { +extends GraphAlgorithmWrappingDataSet>> { // Optional configuration private int parallelism = PARALLELISM_DEFAULT; @@ -64,7 +64,7 @@ protected String getAlgorithmName() { } @Override - protected boolean mergeConfiguration(GraphAlgorithmDelegatingDataSet other) { + protected boolean mergeConfiguration(GraphAlgorithmWrappingDataSet other) { Preconditions.checkNotNull(other); if (! EdgeDegreesPair.class.isAssignableFrom(other.getClass())) { diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/EdgeSourceDegrees.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/EdgeSourceDegrees.java index ee3175e88c563..e55e3c6170b44 100644 --- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/EdgeSourceDegrees.java +++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/EdgeSourceDegrees.java @@ -26,7 +26,7 @@ import org.apache.flink.graph.Vertex; import org.apache.flink.graph.asm.degree.annotate.DegreeAnnotationFunctions.JoinEdgeWithVertexDegree; import org.apache.flink.graph.asm.degree.annotate.directed.VertexDegrees.Degrees; -import org.apache.flink.graph.utils.proxy.GraphAlgorithmDelegatingDataSet; +import org.apache.flink.graph.utils.proxy.GraphAlgorithmWrappingDataSet; import org.apache.flink.util.Preconditions; import static org.apache.flink.api.common.ExecutionConfig.PARALLELISM_DEFAULT; @@ -40,7 +40,7 @@ * @param edge value type */ public class EdgeSourceDegrees -extends GraphAlgorithmDelegatingDataSet>> { +extends GraphAlgorithmWrappingDataSet>> { // Optional configuration private int parallelism = PARALLELISM_DEFAULT; @@ -63,7 +63,7 @@ protected String getAlgorithmName() { } @Override - protected boolean mergeConfiguration(GraphAlgorithmDelegatingDataSet other) { + protected boolean mergeConfiguration(GraphAlgorithmWrappingDataSet other) { Preconditions.checkNotNull(other); if (! EdgeSourceDegrees.class.isAssignableFrom(other.getClass())) { diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/EdgeTargetDegrees.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/EdgeTargetDegrees.java index 6ba47f2dd4f14..ed48f989fb1cc 100644 --- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/EdgeTargetDegrees.java +++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/EdgeTargetDegrees.java @@ -26,7 +26,7 @@ import org.apache.flink.graph.Vertex; import org.apache.flink.graph.asm.degree.annotate.DegreeAnnotationFunctions.JoinEdgeWithVertexDegree; import org.apache.flink.graph.asm.degree.annotate.directed.VertexDegrees.Degrees; -import org.apache.flink.graph.utils.proxy.GraphAlgorithmDelegatingDataSet; +import org.apache.flink.graph.utils.proxy.GraphAlgorithmWrappingDataSet; import org.apache.flink.util.Preconditions; import static org.apache.flink.api.common.ExecutionConfig.PARALLELISM_DEFAULT; @@ -40,7 +40,7 @@ * @param edge value type */ public class EdgeTargetDegrees -extends GraphAlgorithmDelegatingDataSet>> { +extends GraphAlgorithmWrappingDataSet>> { // Optional configuration private int parallelism = PARALLELISM_DEFAULT; @@ -63,7 +63,7 @@ protected String getAlgorithmName() { } @Override - protected boolean mergeConfiguration(GraphAlgorithmDelegatingDataSet other) { + protected boolean mergeConfiguration(GraphAlgorithmWrappingDataSet other) { Preconditions.checkNotNull(other); if (! EdgeTargetDegrees.class.isAssignableFrom(other.getClass())) { diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexDegrees.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexDegrees.java index 84873bc9c126a..f4d734e660991 100644 --- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexDegrees.java +++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexDegrees.java @@ -33,7 +33,7 @@ import org.apache.flink.graph.Vertex; import org.apache.flink.graph.asm.degree.annotate.directed.VertexDegrees.Degrees; import org.apache.flink.graph.utils.Murmur3_32; -import org.apache.flink.graph.utils.proxy.GraphAlgorithmDelegatingDataSet; +import org.apache.flink.graph.utils.proxy.GraphAlgorithmWrappingDataSet; import org.apache.flink.graph.utils.proxy.OptionalBoolean; import org.apache.flink.types.ByteValue; import org.apache.flink.types.LongValue; @@ -50,7 +50,7 @@ * @param edge value type */ public class VertexDegrees -extends GraphAlgorithmDelegatingDataSet> { +extends GraphAlgorithmWrappingDataSet> { // Optional configuration private OptionalBoolean includeZeroDegreeVertices = new OptionalBoolean(false, true); @@ -90,7 +90,7 @@ protected String getAlgorithmName() { } @Override - protected boolean mergeConfiguration(GraphAlgorithmDelegatingDataSet other) { + protected boolean mergeConfiguration(GraphAlgorithmWrappingDataSet other) { Preconditions.checkNotNull(other); if (! VertexDegrees.class.isAssignableFrom(other.getClass())) { diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexInDegree.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexInDegree.java index f7ac18b6054e1..3f842a63a3d9a 100644 --- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexInDegree.java +++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexInDegree.java @@ -25,7 +25,7 @@ import org.apache.flink.graph.asm.degree.annotate.DegreeAnnotationFunctions.DegreeCount; import org.apache.flink.graph.asm.degree.annotate.DegreeAnnotationFunctions.JoinVertexWithVertexDegree; import org.apache.flink.graph.asm.degree.annotate.DegreeAnnotationFunctions.MapEdgeToTargetId; -import org.apache.flink.graph.utils.proxy.GraphAlgorithmDelegatingDataSet; +import org.apache.flink.graph.utils.proxy.GraphAlgorithmWrappingDataSet; import org.apache.flink.graph.utils.proxy.OptionalBoolean; import org.apache.flink.types.LongValue; import org.apache.flink.util.Preconditions; @@ -40,7 +40,7 @@ * @param edge value type */ public class VertexInDegree -extends GraphAlgorithmDelegatingDataSet> { +extends GraphAlgorithmWrappingDataSet> { // Optional configuration private OptionalBoolean includeZeroDegreeVertices = new OptionalBoolean(false, true); @@ -83,7 +83,7 @@ protected String getAlgorithmName() { } @Override - protected boolean mergeConfiguration(GraphAlgorithmDelegatingDataSet other) { + protected boolean mergeConfiguration(GraphAlgorithmWrappingDataSet other) { Preconditions.checkNotNull(other); if (! VertexInDegree.class.isAssignableFrom(other.getClass())) { diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexOutDegree.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexOutDegree.java index e235f6aa3654c..0ec4fc1366b05 100644 --- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexOutDegree.java +++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexOutDegree.java @@ -25,7 +25,7 @@ import org.apache.flink.graph.asm.degree.annotate.DegreeAnnotationFunctions.DegreeCount; import org.apache.flink.graph.asm.degree.annotate.DegreeAnnotationFunctions.JoinVertexWithVertexDegree; import org.apache.flink.graph.asm.degree.annotate.DegreeAnnotationFunctions.MapEdgeToSourceId; -import org.apache.flink.graph.utils.proxy.GraphAlgorithmDelegatingDataSet; +import org.apache.flink.graph.utils.proxy.GraphAlgorithmWrappingDataSet; import org.apache.flink.graph.utils.proxy.OptionalBoolean; import org.apache.flink.types.LongValue; import org.apache.flink.util.Preconditions; @@ -40,7 +40,7 @@ * @param edge value type */ public class VertexOutDegree -extends GraphAlgorithmDelegatingDataSet> { +extends GraphAlgorithmWrappingDataSet> { // Optional configuration private OptionalBoolean includeZeroDegreeVertices = new OptionalBoolean(false, true); @@ -83,7 +83,7 @@ protected String getAlgorithmName() { } @Override - protected boolean mergeConfiguration(GraphAlgorithmDelegatingDataSet other) { + protected boolean mergeConfiguration(GraphAlgorithmWrappingDataSet other) { Preconditions.checkNotNull(other); if (! VertexOutDegree.class.isAssignableFrom(other.getClass())) { diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/undirected/EdgeDegreePair.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/undirected/EdgeDegreePair.java index 1f78566d02822..09ef9756eda4c 100644 --- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/undirected/EdgeDegreePair.java +++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/undirected/EdgeDegreePair.java @@ -26,7 +26,7 @@ import org.apache.flink.graph.Graph; import org.apache.flink.graph.Vertex; import org.apache.flink.graph.asm.degree.annotate.DegreeAnnotationFunctions.JoinEdgeDegreeWithVertexDegree; -import org.apache.flink.graph.utils.proxy.GraphAlgorithmDelegatingDataSet; +import org.apache.flink.graph.utils.proxy.GraphAlgorithmWrappingDataSet; import org.apache.flink.graph.utils.proxy.OptionalBoolean; import org.apache.flink.types.LongValue; import org.apache.flink.util.Preconditions; @@ -42,7 +42,7 @@ * @param edge value type */ public class EdgeDegreePair -extends GraphAlgorithmDelegatingDataSet>> { +extends GraphAlgorithmWrappingDataSet>> { // Optional configuration private OptionalBoolean reduceOnTargetId = new OptionalBoolean(false, false); @@ -85,7 +85,7 @@ protected String getAlgorithmName() { } @Override - protected boolean mergeConfiguration(GraphAlgorithmDelegatingDataSet other) { + protected boolean mergeConfiguration(GraphAlgorithmWrappingDataSet other) { Preconditions.checkNotNull(other); if (! EdgeSourceDegree.class.isAssignableFrom(other.getClass())) { diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/undirected/EdgeSourceDegree.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/undirected/EdgeSourceDegree.java index 520723c6d29ae..702fead867dda 100644 --- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/undirected/EdgeSourceDegree.java +++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/undirected/EdgeSourceDegree.java @@ -25,7 +25,7 @@ import org.apache.flink.graph.Graph; import org.apache.flink.graph.Vertex; import org.apache.flink.graph.asm.degree.annotate.DegreeAnnotationFunctions.JoinEdgeWithVertexDegree; -import org.apache.flink.graph.utils.proxy.GraphAlgorithmDelegatingDataSet; +import org.apache.flink.graph.utils.proxy.GraphAlgorithmWrappingDataSet; import org.apache.flink.graph.utils.proxy.OptionalBoolean; import org.apache.flink.types.LongValue; import org.apache.flink.util.Preconditions; @@ -40,7 +40,7 @@ * @param edge value type */ public class EdgeSourceDegree -extends GraphAlgorithmDelegatingDataSet>> { +extends GraphAlgorithmWrappingDataSet>> { // Optional configuration private OptionalBoolean reduceOnTargetId = new OptionalBoolean(false, false); @@ -83,7 +83,7 @@ protected String getAlgorithmName() { } @Override - protected boolean mergeConfiguration(GraphAlgorithmDelegatingDataSet other) { + protected boolean mergeConfiguration(GraphAlgorithmWrappingDataSet other) { Preconditions.checkNotNull(other); if (! EdgeSourceDegree.class.isAssignableFrom(other.getClass())) { diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/undirected/EdgeTargetDegree.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/undirected/EdgeTargetDegree.java index 123c1dc002ca8..724567e9f56f6 100644 --- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/undirected/EdgeTargetDegree.java +++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/undirected/EdgeTargetDegree.java @@ -25,7 +25,7 @@ import org.apache.flink.graph.Graph; import org.apache.flink.graph.Vertex; import org.apache.flink.graph.asm.degree.annotate.DegreeAnnotationFunctions.JoinEdgeWithVertexDegree; -import org.apache.flink.graph.utils.proxy.GraphAlgorithmDelegatingDataSet; +import org.apache.flink.graph.utils.proxy.GraphAlgorithmWrappingDataSet; import org.apache.flink.graph.utils.proxy.OptionalBoolean; import org.apache.flink.types.LongValue; import org.apache.flink.util.Preconditions; @@ -40,7 +40,7 @@ * @param edge value type */ public class EdgeTargetDegree -extends GraphAlgorithmDelegatingDataSet>> { +extends GraphAlgorithmWrappingDataSet>> { // Optional configuration private OptionalBoolean reduceOnSourceId = new OptionalBoolean(false, false); @@ -83,7 +83,7 @@ protected String getAlgorithmName() { } @Override - protected boolean mergeConfiguration(GraphAlgorithmDelegatingDataSet other) { + protected boolean mergeConfiguration(GraphAlgorithmWrappingDataSet other) { Preconditions.checkNotNull(other); if (! EdgeSourceDegree.class.isAssignableFrom(other.getClass())) { diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/undirected/VertexDegree.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/undirected/VertexDegree.java index 42f084d7f872f..0f753fc972981 100644 --- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/undirected/VertexDegree.java +++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/undirected/VertexDegree.java @@ -21,7 +21,7 @@ import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.api.common.operators.base.ReduceOperatorBase.CombineHint; import org.apache.flink.api.java.DataSet; -import org.apache.flink.graph.utils.proxy.GraphAlgorithmDelegatingDataSet; +import org.apache.flink.graph.utils.proxy.GraphAlgorithmWrappingDataSet; import org.apache.flink.graph.Edge; import org.apache.flink.graph.Graph; import org.apache.flink.graph.Vertex; @@ -43,7 +43,7 @@ * @param edge value type */ public class VertexDegree -extends GraphAlgorithmDelegatingDataSet> { +extends GraphAlgorithmWrappingDataSet> { // Optional configuration private OptionalBoolean includeZeroDegreeVertices = new OptionalBoolean(false, true); @@ -103,7 +103,7 @@ protected String getAlgorithmName() { } @Override - protected boolean mergeConfiguration(GraphAlgorithmDelegatingDataSet other) { + protected boolean mergeConfiguration(GraphAlgorithmWrappingDataSet other) { Preconditions.checkNotNull(other); if (! VertexDegree.class.isAssignableFrom(other.getClass())) { diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/filter/undirected/MaximumDegree.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/filter/undirected/MaximumDegree.java index f9cfae9f93014..be19ffd0798b9 100644 --- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/filter/undirected/MaximumDegree.java +++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/filter/undirected/MaximumDegree.java @@ -29,7 +29,7 @@ import org.apache.flink.graph.Graph; import org.apache.flink.graph.Vertex; import org.apache.flink.graph.asm.degree.annotate.undirected.VertexDegree; -import org.apache.flink.graph.utils.proxy.GraphAlgorithmDelegatingGraph; +import org.apache.flink.graph.utils.proxy.GraphAlgorithmWrappingGraph; import org.apache.flink.graph.utils.proxy.OptionalBoolean; import org.apache.flink.types.LongValue; import org.apache.flink.util.Collector; @@ -47,7 +47,7 @@ * @param edge value type */ public class MaximumDegree -extends GraphAlgorithmDelegatingGraph { +extends GraphAlgorithmWrappingGraph { // Required configuration private long maximumDegree; @@ -120,7 +120,7 @@ protected String getAlgorithmName() { } @Override - protected boolean mergeConfiguration(GraphAlgorithmDelegatingGraph other) { + protected boolean mergeConfiguration(GraphAlgorithmWrappingGraph other) { Preconditions.checkNotNull(other); if (! MaximumDegree.class.isAssignableFrom(other.getClass())) { diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/simple/directed/Simplify.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/simple/directed/Simplify.java index 983dac9cf2eef..99ffe0d0c8934 100644 --- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/simple/directed/Simplify.java +++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/simple/directed/Simplify.java @@ -22,7 +22,7 @@ import org.apache.flink.api.java.DataSet; import org.apache.flink.graph.Edge; import org.apache.flink.graph.Graph; -import org.apache.flink.graph.utils.proxy.GraphAlgorithmDelegatingGraph; +import org.apache.flink.graph.utils.proxy.GraphAlgorithmWrappingGraph; import org.apache.flink.types.CopyableValue; import org.apache.flink.util.Preconditions; @@ -36,7 +36,7 @@ * @param edge value type */ public class Simplify & CopyableValue, VV, EV> -extends GraphAlgorithmDelegatingGraph { +extends GraphAlgorithmWrappingGraph { // Optional configuration private int parallelism = PARALLELISM_DEFAULT; @@ -62,7 +62,7 @@ protected String getAlgorithmName() { } @Override - protected boolean mergeConfiguration(GraphAlgorithmDelegatingGraph other) { + protected boolean mergeConfiguration(GraphAlgorithmWrappingGraph other) { Preconditions.checkNotNull(other); if (! Simplify.class.isAssignableFrom(other.getClass())) { diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/simple/undirected/Simplify.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/simple/undirected/Simplify.java index ce78cfa8faf35..45cd3f9b69efb 100644 --- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/simple/undirected/Simplify.java +++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/simple/undirected/Simplify.java @@ -22,7 +22,7 @@ import org.apache.flink.api.java.DataSet; import org.apache.flink.graph.Edge; import org.apache.flink.graph.Graph; -import org.apache.flink.graph.utils.proxy.GraphAlgorithmDelegatingGraph; +import org.apache.flink.graph.utils.proxy.GraphAlgorithmWrappingGraph; import org.apache.flink.types.CopyableValue; import org.apache.flink.util.Collector; import org.apache.flink.util.Preconditions; @@ -38,7 +38,7 @@ * @param edge value type */ public class Simplify & CopyableValue, VV, EV> -extends GraphAlgorithmDelegatingGraph { +extends GraphAlgorithmWrappingGraph { // Required configuration private boolean clipAndFlip; @@ -80,7 +80,7 @@ protected String getAlgorithmName() { } @Override - protected boolean mergeConfiguration(GraphAlgorithmDelegatingGraph other) { + protected boolean mergeConfiguration(GraphAlgorithmWrappingGraph other) { Preconditions.checkNotNull(other); if (! Simplify.class.isAssignableFrom(other.getClass())) { diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/translate/TranslateEdgeValues.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/translate/TranslateEdgeValues.java index 6003c9a67603f..bde826e7041bb 100644 --- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/translate/TranslateEdgeValues.java +++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/translate/TranslateEdgeValues.java @@ -21,7 +21,7 @@ import org.apache.flink.api.java.DataSet; import org.apache.flink.graph.Edge; import org.apache.flink.graph.Graph; -import org.apache.flink.graph.utils.proxy.GraphAlgorithmDelegatingGraph; +import org.apache.flink.graph.utils.proxy.GraphAlgorithmWrappingGraph; import org.apache.flink.util.Preconditions; import static org.apache.flink.api.common.ExecutionConfig.PARALLELISM_DEFAULT; @@ -36,7 +36,7 @@ * @param new edge value type */ public class TranslateEdgeValues -extends GraphAlgorithmDelegatingGraph { +extends GraphAlgorithmWrappingGraph { // Required configuration private TranslateFunction translator; @@ -76,7 +76,7 @@ protected String getAlgorithmName() { } @Override - protected boolean mergeConfiguration(GraphAlgorithmDelegatingGraph other) { + protected boolean mergeConfiguration(GraphAlgorithmWrappingGraph other) { Preconditions.checkNotNull(other); if (! TranslateEdgeValues.class.isAssignableFrom(other.getClass())) { diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/translate/TranslateGraphIds.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/translate/TranslateGraphIds.java index 6ea56eb2c412a..2c67c5a016f30 100644 --- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/translate/TranslateGraphIds.java +++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/translate/TranslateGraphIds.java @@ -22,7 +22,7 @@ import org.apache.flink.graph.Edge; import org.apache.flink.graph.Graph; import org.apache.flink.graph.Vertex; -import org.apache.flink.graph.utils.proxy.GraphAlgorithmDelegatingGraph; +import org.apache.flink.graph.utils.proxy.GraphAlgorithmWrappingGraph; import org.apache.flink.util.Preconditions; import static org.apache.flink.api.common.ExecutionConfig.PARALLELISM_DEFAULT; @@ -38,7 +38,7 @@ * @param edge value type */ public class TranslateGraphIds -extends GraphAlgorithmDelegatingGraph { +extends GraphAlgorithmWrappingGraph { // Required configuration private TranslateFunction translator; @@ -78,7 +78,7 @@ protected String getAlgorithmName() { } @Override - protected boolean mergeConfiguration(GraphAlgorithmDelegatingGraph other) { + protected boolean mergeConfiguration(GraphAlgorithmWrappingGraph other) { Preconditions.checkNotNull(other); if (! TranslateGraphIds.class.isAssignableFrom(other.getClass())) { diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/translate/TranslateVertexValues.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/translate/TranslateVertexValues.java index 3a493248a4a7e..9e6784e09855a 100644 --- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/translate/TranslateVertexValues.java +++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/translate/TranslateVertexValues.java @@ -21,7 +21,7 @@ import org.apache.flink.api.java.DataSet; import org.apache.flink.graph.Graph; import org.apache.flink.graph.Vertex; -import org.apache.flink.graph.utils.proxy.GraphAlgorithmDelegatingGraph; +import org.apache.flink.graph.utils.proxy.GraphAlgorithmWrappingGraph; import org.apache.flink.util.Preconditions; import static org.apache.flink.api.common.ExecutionConfig.PARALLELISM_DEFAULT; @@ -36,7 +36,7 @@ * @param edge value type */ public class TranslateVertexValues -extends GraphAlgorithmDelegatingGraph { +extends GraphAlgorithmWrappingGraph { // Required configuration private TranslateFunction translator; @@ -76,7 +76,7 @@ protected String getAlgorithmName() { } @Override - protected boolean mergeConfiguration(GraphAlgorithmDelegatingGraph other) { + protected boolean mergeConfiguration(GraphAlgorithmWrappingGraph other) { Preconditions.checkNotNull(other); if (! TranslateVertexValues.class.isAssignableFrom(other.getClass())) { diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/directed/LocalClusteringCoefficient.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/directed/LocalClusteringCoefficient.java index 22c8b41d940da..9d323a8762613 100644 --- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/directed/LocalClusteringCoefficient.java +++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/directed/LocalClusteringCoefficient.java @@ -31,8 +31,8 @@ import org.apache.flink.graph.asm.degree.annotate.directed.VertexDegrees.Degrees; import org.apache.flink.graph.library.clustering.directed.LocalClusteringCoefficient.Result; import org.apache.flink.graph.utils.Murmur3_32; -import org.apache.flink.graph.utils.proxy.GraphAlgorithmDelegatingDataSet; import org.apache.flink.graph.utils.proxy.OptionalBoolean; +import org.apache.flink.graph.utils.proxy.GraphAlgorithmWrappingDataSet; import org.apache.flink.types.CopyableValue; import org.apache.flink.types.LongValue; import org.apache.flink.util.Collector; @@ -57,7 +57,7 @@ * @param edge value type */ public class LocalClusteringCoefficient & CopyableValue, VV, EV> -extends GraphAlgorithmDelegatingDataSet> { +extends GraphAlgorithmWrappingDataSet> { // Optional configuration private OptionalBoolean includeZeroDegreeVertices = new OptionalBoolean(true, true); @@ -99,7 +99,7 @@ protected String getAlgorithmName() { } @Override - protected boolean mergeConfiguration(GraphAlgorithmDelegatingDataSet other) { + protected boolean mergeConfiguration(GraphAlgorithmWrappingDataSet other) { Preconditions.checkNotNull(other); if (! LocalClusteringCoefficient.class.isAssignableFrom(other.getClass())) { diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/directed/TriangleListing.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/directed/TriangleListing.java index 14c731a2a34e6..7df288a44710e 100644 --- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/directed/TriangleListing.java +++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/directed/TriangleListing.java @@ -36,7 +36,7 @@ import org.apache.flink.graph.asm.degree.annotate.directed.EdgeDegreesPair; import org.apache.flink.graph.asm.degree.annotate.directed.VertexDegrees.Degrees; import org.apache.flink.graph.library.clustering.directed.TriangleListing.Result; -import org.apache.flink.graph.utils.proxy.GraphAlgorithmDelegatingDataSet; +import org.apache.flink.graph.utils.proxy.GraphAlgorithmWrappingDataSet; import org.apache.flink.graph.utils.proxy.OptionalBoolean; import org.apache.flink.types.ByteValue; import org.apache.flink.types.CopyableValue; @@ -62,7 +62,7 @@ * @param edge value type */ public class TriangleListing & CopyableValue, VV, EV> -extends GraphAlgorithmDelegatingDataSet> { +extends GraphAlgorithmWrappingDataSet> { // Optional configuration private OptionalBoolean sortTriangleVertices = new OptionalBoolean(false, false); @@ -103,7 +103,7 @@ protected String getAlgorithmName() { } @Override - protected boolean mergeConfiguration(GraphAlgorithmDelegatingDataSet other) { + protected boolean mergeConfiguration(GraphAlgorithmWrappingDataSet other) { Preconditions.checkNotNull(other); if (! TriangleListing.class.isAssignableFrom(other.getClass())) { diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/undirected/LocalClusteringCoefficient.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/undirected/LocalClusteringCoefficient.java index 4b4bf07866ff7..293e3f90f9997 100644 --- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/undirected/LocalClusteringCoefficient.java +++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/undirected/LocalClusteringCoefficient.java @@ -31,8 +31,8 @@ import org.apache.flink.graph.asm.degree.annotate.undirected.VertexDegree; import org.apache.flink.graph.library.clustering.undirected.LocalClusteringCoefficient.Result; import org.apache.flink.graph.utils.Murmur3_32; -import org.apache.flink.graph.utils.proxy.GraphAlgorithmDelegatingDataSet; import org.apache.flink.graph.utils.proxy.OptionalBoolean; +import org.apache.flink.graph.utils.proxy.GraphAlgorithmWrappingDataSet; import org.apache.flink.types.CopyableValue; import org.apache.flink.types.LongValue; import org.apache.flink.util.Collector; @@ -57,7 +57,7 @@ * @param edge value type */ public class LocalClusteringCoefficient & CopyableValue, VV, EV> -extends GraphAlgorithmDelegatingDataSet> { +extends GraphAlgorithmWrappingDataSet> { // Optional configuration private OptionalBoolean includeZeroDegreeVertices = new OptionalBoolean(true, true); @@ -100,7 +100,7 @@ protected String getAlgorithmName() { } @Override - protected boolean mergeConfiguration(GraphAlgorithmDelegatingDataSet other) { + protected boolean mergeConfiguration(GraphAlgorithmWrappingDataSet other) { Preconditions.checkNotNull(other); if (! LocalClusteringCoefficient.class.isAssignableFrom(other.getClass())) { diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/undirected/TriangleListing.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/undirected/TriangleListing.java index 89b86fec166f0..c3dbf3eb8f1be 100644 --- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/undirected/TriangleListing.java +++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/undirected/TriangleListing.java @@ -33,7 +33,7 @@ import org.apache.flink.graph.Edge; import org.apache.flink.graph.Graph; import org.apache.flink.graph.asm.degree.annotate.undirected.EdgeDegreePair; -import org.apache.flink.graph.utils.proxy.GraphAlgorithmDelegatingDataSet; +import org.apache.flink.graph.utils.proxy.GraphAlgorithmWrappingDataSet; import org.apache.flink.graph.utils.proxy.OptionalBoolean; import org.apache.flink.types.CopyableValue; import org.apache.flink.types.LongValue; @@ -63,7 +63,7 @@ * @param edge value type */ public class TriangleListing & CopyableValue, VV, EV> -extends GraphAlgorithmDelegatingDataSet> { +extends GraphAlgorithmWrappingDataSet> { // Optional configuration private OptionalBoolean sortTriangleVertices = new OptionalBoolean(false, false); @@ -104,7 +104,7 @@ protected String getAlgorithmName() { } @Override - protected boolean mergeConfiguration(GraphAlgorithmDelegatingDataSet other) { + protected boolean mergeConfiguration(GraphAlgorithmWrappingDataSet other) { Preconditions.checkNotNull(other); if (! TriangleListing.class.isAssignableFrom(other.getClass())) { diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/link_analysis/HITS.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/link_analysis/HITS.java index 60e99bd80d439..7ba6feea95b44 100644 --- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/link_analysis/HITS.java +++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/link_analysis/HITS.java @@ -40,7 +40,7 @@ import org.apache.flink.graph.Vertex; import org.apache.flink.graph.library.link_analysis.HITS.Result; import org.apache.flink.graph.utils.Murmur3_32; -import org.apache.flink.graph.utils.proxy.GraphAlgorithmDelegatingDataSet; +import org.apache.flink.graph.utils.proxy.GraphAlgorithmWrappingDataSet; import org.apache.flink.types.DoubleValue; import org.apache.flink.util.Collector; import org.apache.flink.util.Preconditions; @@ -64,7 +64,7 @@ * @param edge value type */ public class HITS -extends GraphAlgorithmDelegatingDataSet> { +extends GraphAlgorithmWrappingDataSet> { private static final String CHANGE_IN_SCORES = "change in scores"; @@ -135,7 +135,7 @@ protected String getAlgorithmName() { } @Override - protected boolean mergeConfiguration(GraphAlgorithmDelegatingDataSet other) { + protected boolean mergeConfiguration(GraphAlgorithmWrappingDataSet other) { Preconditions.checkNotNull(other); if (! HITS.class.isAssignableFrom(other.getClass())) { diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/similarity/AdamicAdar.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/similarity/AdamicAdar.java index 512a7a08f9459..1514866fb2a8a 100644 --- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/similarity/AdamicAdar.java +++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/similarity/AdamicAdar.java @@ -37,7 +37,7 @@ import org.apache.flink.graph.asm.degree.annotate.undirected.VertexDegree; import org.apache.flink.graph.library.similarity.AdamicAdar.Result; import org.apache.flink.graph.utils.Murmur3_32; -import org.apache.flink.graph.utils.proxy.GraphAlgorithmDelegatingDataSet; +import org.apache.flink.graph.utils.proxy.GraphAlgorithmWrappingDataSet; import org.apache.flink.types.CopyableValue; import org.apache.flink.types.FloatValue; import org.apache.flink.types.IntValue; @@ -71,7 +71,7 @@ * @param edge value type */ public class AdamicAdar, VV, EV> -extends GraphAlgorithmDelegatingDataSet> { +extends GraphAlgorithmWrappingDataSet> { private static final int GROUP_SIZE = 64; @@ -133,7 +133,7 @@ protected String getAlgorithmName() { } @Override - protected boolean mergeConfiguration(GraphAlgorithmDelegatingDataSet other) { + protected boolean mergeConfiguration(GraphAlgorithmWrappingDataSet other) { Preconditions.checkNotNull(other); if (! AdamicAdar.class.isAssignableFrom(other.getClass())) { diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/similarity/JaccardIndex.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/similarity/JaccardIndex.java index 7783e6b4f8388..1e406fabe3107 100644 --- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/similarity/JaccardIndex.java +++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/similarity/JaccardIndex.java @@ -31,7 +31,7 @@ import org.apache.flink.graph.asm.degree.annotate.undirected.EdgeTargetDegree; import org.apache.flink.graph.library.similarity.JaccardIndex.Result; import org.apache.flink.graph.utils.Murmur3_32; -import org.apache.flink.graph.utils.proxy.GraphAlgorithmDelegatingDataSet; +import org.apache.flink.graph.utils.proxy.GraphAlgorithmWrappingDataSet; import org.apache.flink.types.CopyableValue; import org.apache.flink.types.IntValue; import org.apache.flink.types.LongValue; @@ -61,7 +61,7 @@ * @param edge value type */ public class JaccardIndex, VV, EV> -extends GraphAlgorithmDelegatingDataSet> { +extends GraphAlgorithmWrappingDataSet> { public static final int DEFAULT_GROUP_SIZE = 64; @@ -159,7 +159,7 @@ protected String getAlgorithmName() { } @Override - protected boolean mergeConfiguration(GraphAlgorithmDelegatingDataSet other) { + protected boolean mergeConfiguration(GraphAlgorithmWrappingDataSet other) { Preconditions.checkNotNull(other); if (! JaccardIndex.class.isAssignableFrom(other.getClass())) { diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/utils/proxy/Delegate.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/utils/proxy/Delegate.java deleted file mode 100644 index 785c56113df98..0000000000000 --- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/utils/proxy/Delegate.java +++ /dev/null @@ -1,158 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.graph.utils.proxy; - -import javassist.util.proxy.MethodFilter; -import javassist.util.proxy.MethodHandler; -import javassist.util.proxy.ProxyFactory; -import javassist.util.proxy.ProxyObject; -import org.apache.flink.api.common.io.OutputFormat; -import org.apache.flink.api.common.typeinfo.TypeInformation; -import org.apache.flink.api.java.DataSet; -import org.apache.flink.api.java.ExecutionEnvironment; -import org.apache.flink.api.java.operators.DataSink; -import org.apache.flink.api.java.operators.Operator; -import org.apache.flink.api.java.typeutils.InputTypeConfigurable; -import org.apache.flink.util.Preconditions; -import org.objenesis.ObjenesisStd; - -import java.lang.reflect.Method; - -/** - * Wraps an object with a proxy delegate whose method handler invokes all - * method calls on the wrapped object. This object can be later replaced. - * - * @param the type of the proxied object - */ -public class Delegate { - private X obj; - - private X proxy = null; - - /** - * Set the initial delegated object. - * - * @param obj delegated object - */ - public Delegate(X obj) { - setObject(obj); - } - - /** - * Change the delegated object. - * - * @param obj delegated object - */ - public void setObject(X obj) { - this.obj = (obj instanceof ReferentProxy) ? ((ReferentProxy) obj).getProxiedObject() : obj; - } - - /** - * Instantiates and returns a proxy object which subclasses the - * delegated object. The proxy's method handler invokes all methods - * on the delegated object that is set at the time of invocation. - * - * @return delegating proxy - */ - @SuppressWarnings("unchecked") - public X getProxy() { - if (proxy != null) { - return proxy; - } - - Class superclass = obj.getClass(); - while (! superclass.getSuperclass().equals(Operator.class)) { - superclass = superclass.getSuperclass(); - } - - ProxyFactory factory = new ProxyFactory(); - factory.setSuperclass(superclass); - factory.setInterfaces(new Class[]{ReferentProxy.class}); - - // create the class and instantiate an instance without calling a constructor - Class proxyClass = factory.createClass(new MethodFilter() { - @Override - public boolean isHandled(Method method) { - return true; - } - }); - proxy = new ObjenesisStd().newInstance(proxyClass); - - // create and set a handler to invoke all method calls on the delegated object - ((ProxyObject) proxy).setHandler(new MethodHandler() { - @Override - public Object invoke(Object self, Method thisMethod, Method proceed, Object[] args) throws Throwable { - if (thisMethod.getName().equals("getProxiedObject")) { - // this method is provided by the ReferentProxy interface - return obj; - } else { - Method objMethod; - try { - // lookup public method from class hierarchy - objMethod = obj.getClass().getMethod(thisMethod.getName(), thisMethod.getParameterTypes()); - } catch(NoSuchMethodException ignored) { - // lookup private method defined on class - objMethod = obj.getClass().getDeclaredMethod(thisMethod.getName(), thisMethod.getParameterTypes()); - } - - Class[] parameterTypes = thisMethod.getParameterTypes(); - - // reinterpret DataSet.output to add the delegate to the list of data sinks; otherwise, the proxy - // object will be added to the list of sinks and cannot be replaced - if (thisMethod.getName().equals("output") && parameterTypes.length == 1 && parameterTypes[0].equals(OutputFormat.class)) { - OutputFormat outputFormat = (OutputFormat)args[0]; - Preconditions.checkNotNull(outputFormat); - - ExecutionEnvironment context = ((DataSet)obj).getExecutionEnvironment(); - TypeInformation type = ((DataSet)obj).getType(); - - // configure the type if needed - if (outputFormat instanceof InputTypeConfigurable) { - ((InputTypeConfigurable) outputFormat).setInputType(type, context.getConfig() ); - } - - DataSink sink = new DataSink((DataSet)self, outputFormat, type); - - Method outputMethod = ExecutionEnvironment.class.getDeclaredMethod("registerDataSink", DataSink.class); - outputMethod.setAccessible(true); - outputMethod.invoke(context, sink); - return sink; - } else { - // method visibility may be restricted - objMethod.setAccessible(true); - return objMethod.invoke(obj, args); - } - } - } - }); - - return proxy; - } - - /** - * This interface provides access via the proxy handler to the original - * object being proxied. This is necessary since we cannot and should not - * create a proxy of a proxy but must instead proxy the original object. - * - * @param the type of the proxied object - */ - protected interface ReferentProxy { - Y getProxiedObject(); - } -} diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/utils/proxy/GraphAlgorithmDelegatingDataSet.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/utils/proxy/GraphAlgorithmWrappingDataSet.java similarity index 63% rename from flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/utils/proxy/GraphAlgorithmDelegatingDataSet.java rename to flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/utils/proxy/GraphAlgorithmWrappingDataSet.java index 1081bfccb909b..7a4a0e6a5b79f 100644 --- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/utils/proxy/GraphAlgorithmDelegatingDataSet.java +++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/utils/proxy/GraphAlgorithmWrappingDataSet.java @@ -21,7 +21,7 @@ import org.apache.commons.lang3.builder.EqualsBuilder; import org.apache.commons.lang3.builder.HashCodeBuilder; import org.apache.flink.api.java.DataSet; -import org.apache.flink.api.java.operators.SingleInputOperator; +import org.apache.flink.api.java.operators.NoOpOperator; import org.apache.flink.graph.Graph; import org.apache.flink.graph.GraphAlgorithm; @@ -33,27 +33,28 @@ /** * A {@link GraphAlgorithm} transforms an input {@link Graph} into an output of - * type {@code T}. A {@code GraphAlgorithmDelegatingDataSet} wraps the resultant - * {@link DataSet} with a delegating proxy object. The delegated object can be - * replaced when the same algorithm is run on the same input with a mergeable - * configuration. This allows algorithms to be composed of implicitly reusable - * algorithms without publicly sharing intermediate {@link DataSet}s. + * type {@code T}. A {@code GraphAlgorithmWrappingDataSet} wraps the resultant + * {@link DataSet} with a {@code NoOpOperator}. The input to the wrapped + * operator can be replaced when the same algorithm is run on the same input + * with a mergeable configuration. This allows algorithms to be composed of + * implicitly reusable algorithms without publicly sharing intermediate + * {@link DataSet}s. * * @param ID type * @param vertex value type * @param edge value type * @param output type */ -public abstract class GraphAlgorithmDelegatingDataSet +public abstract class GraphAlgorithmWrappingDataSet implements GraphAlgorithm> { // each algorithm and input pair may map to multiple configurations - private static Map> cache = - Collections.synchronizedMap(new HashMap>()); + private static Map> cache = + Collections.synchronizedMap(new HashMap>()); private Graph input; - private Delegate> delegate; + private NoOpOperator wrappingOperator; /** * Algorithms are identified by name rather than by class to allow subclassing. @@ -71,7 +72,7 @@ public abstract class GraphAlgorithmDelegatingDataSet * @return true if and only if configuration has been merged and the * algorithm's output can be reused */ - protected abstract boolean mergeConfiguration(GraphAlgorithmDelegatingDataSet other); + protected abstract boolean mergeConfiguration(GraphAlgorithmWrappingDataSet other); /** * The implementation of the algorithm, renamed from {@link GraphAlgorithm#run(Graph)}. @@ -100,11 +101,11 @@ public final boolean equals(Object obj) { return true; } - if (! GraphAlgorithmDelegatingDataSet.class.isAssignableFrom(obj.getClass())) { + if (! GraphAlgorithmWrappingDataSet.class.isAssignableFrom(obj.getClass())) { return false; } - GraphAlgorithmDelegatingDataSet rhs = (GraphAlgorithmDelegatingDataSet) obj; + GraphAlgorithmWrappingDataSet rhs = (GraphAlgorithmWrappingDataSet) obj; return new EqualsBuilder() .append(input, rhs.input) @@ -119,25 +120,24 @@ public final DataSet run(Graph input) this.input = input; if (cache.containsKey(this)) { - for (GraphAlgorithmDelegatingDataSet other : cache.get(this)) { + for (GraphAlgorithmWrappingDataSet other : cache.get(this)) { if (mergeConfiguration(other)) { // configuration has been merged so generate new output - DataSet output = checkOutput(runInternal(input)); + DataSet output = runInternal(input); - // update delegatee object and reuse delegate - other.delegate.setObject(output); - delegate = other.delegate; + other.wrappingOperator.setInput(output); + wrappingOperator = other.wrappingOperator; - return delegate.getProxy(); + return wrappingOperator; } } } // no mergeable configuration found so generate new output - DataSet output = checkOutput(runInternal(input)); + DataSet output = runInternal(input); - // create a new delegate to wrap the algorithm output - delegate = new Delegate<>(output); + // create a new operator to wrap the algorithm output + wrappingOperator = new NoOpOperator<>(output, output.getType()); // cache this result if (cache.containsKey(this)) { @@ -146,24 +146,6 @@ public final DataSet run(Graph input) cache.put(this, new ArrayList(Collections.singletonList(this))); } - return delegate.getProxy(); - } - - /** - * If the given DataSet is not of type SingleInputOperator then a "no-op" - * map is appended which simply passes input to output. This guarantees - * that the delegating class type does not change. - * - * @param output user-defined function output - * @return a SingleInputOperator - */ - private DataSet checkOutput(DataSet output) { - if (output instanceof SingleInputOperator) { - return output; - } else { - return output - .map(new NoOp()) - .name("No-op"); - } + return wrappingOperator; } } diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/utils/proxy/GraphAlgorithmDelegatingGraph.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/utils/proxy/GraphAlgorithmWrappingGraph.java similarity index 56% rename from flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/utils/proxy/GraphAlgorithmDelegatingGraph.java rename to flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/utils/proxy/GraphAlgorithmWrappingGraph.java index 3209cb2d98899..69a6c37846d38 100644 --- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/utils/proxy/GraphAlgorithmDelegatingGraph.java +++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/utils/proxy/GraphAlgorithmWrappingGraph.java @@ -21,7 +21,7 @@ import org.apache.commons.lang3.builder.EqualsBuilder; import org.apache.commons.lang3.builder.HashCodeBuilder; import org.apache.flink.api.java.DataSet; -import org.apache.flink.api.java.operators.SingleInputOperator; +import org.apache.flink.api.java.operators.NoOpOperator; import org.apache.flink.graph.Edge; import org.apache.flink.graph.Graph; import org.apache.flink.graph.GraphAlgorithm; @@ -35,11 +35,12 @@ /** * A {@link GraphAlgorithm} transforms an input {@link Graph} into an output of - * type {@code T}. A {@code GraphAlgorithmDelegatingGraph} wraps the resultant - * {@link Graph} with a delegating proxy object. The delegated object can be - * replaced when the same algorithm is run on the same input with a mergeable - * configuration. This allows algorithms to be composed of implicitly reusable - * algorithms without publicly sharing intermediate {@link DataSet}s. + * type {@code T}. A {@code GraphAlgorithmWrappingDataSet} wraps the resultant + * {@link Graph} vertex and edge sets with a {@code NoOpOperator}. The input to + * the wrapped operators can be replaced when the same algorithm is run on the + * same input with a mergeable configuration. This allows algorithms to be + * composed of implicitly reusable algorithms without publicly sharing + * intermediate {@link DataSet}s. * * @param input ID type * @param input vertex value type @@ -48,18 +49,18 @@ * @param output vertex value type * @param output edge value type */ -public abstract class GraphAlgorithmDelegatingGraph +public abstract class GraphAlgorithmWrappingGraph implements GraphAlgorithm> { // each algorithm and input pair may map to multiple configurations - private static Map> cache = - Collections.synchronizedMap(new HashMap>()); + private static Map> cache = + Collections.synchronizedMap(new HashMap>()); private Graph input; - private Delegate>> verticesDelegate; + private NoOpOperator> verticesWrappingOperator; - private Delegate>> edgesDelegate; + private NoOpOperator> edgesWrappingOperator; /** * Algorithms are identified by name rather than by class to allow subclassing. @@ -77,7 +78,7 @@ public abstract class GraphAlgorithmDelegatingGraph run(Graph input) this.input = input; if (cache.containsKey(this)) { - for (GraphAlgorithmDelegatingGraph other : cache.get(this)) { + for (GraphAlgorithmWrappingGraph other : cache.get(this)) { if (mergeConfiguration(other)) { // configuration has been merged so generate new output - Graph output = checkOutput(runInternal(input)); + Graph output = runInternal(input); - // update delegatee object and reuse delegate - other.verticesDelegate.setObject(output.getVertices()); - verticesDelegate = other.verticesDelegate; + other.verticesWrappingOperator.setInput(output.getVertices()); + other.edgesWrappingOperator.setInput(output.getEdges()); - other.edgesDelegate.setObject(output.getEdges()); - edgesDelegate = other.edgesDelegate; + verticesWrappingOperator = other.verticesWrappingOperator; + edgesWrappingOperator = other.edgesWrappingOperator; - return Graph.fromDataSet(verticesDelegate.getProxy(), edgesDelegate.getProxy(), output.getContext()); + return Graph.fromDataSet(verticesWrappingOperator, edgesWrappingOperator, output.getContext()); } } } // no mergeable configuration found so generate new output - Graph output = checkOutput(runInternal(input)); + Graph output = runInternal(input); - // create a new delegate to wrap the algorithm output - verticesDelegate = new Delegate<>(output.getVertices()); - edgesDelegate = new Delegate<>(output.getEdges()); + // create a new operator to wrap the algorithm output + verticesWrappingOperator = new NoOpOperator<>(output.getVertices(), output.getVertices().getType()); + edgesWrappingOperator = new NoOpOperator<>(output.getEdges(), output.getEdges().getType()); // cache this result if (cache.containsKey(this)) { @@ -156,32 +156,6 @@ public final Graph run(Graph input) cache.put(this, new ArrayList(Collections.singletonList(this))); } - return Graph.fromDataSet(verticesDelegate.getProxy(), edgesDelegate.getProxy(), output.getContext()); - } - - /** - * If the given Graph vertex and edge DataSets are not of type SingleInputOperator - * then a "no-op" map is appended which simply passes input to output. This - * guarantees that the delegating class type does not change. - * - * @param output user-defined function output - * @return a Graph with vertex and edge sets of type SingleInputOperator - */ - private Graph checkOutput(Graph output) { - DataSet> vertices = output.getVertices(); - if (!(vertices instanceof SingleInputOperator)) { - vertices = vertices - .map(new NoOp>()) - .name("No-op"); - } - - DataSet> edges = output.getEdges(); - if (!(edges instanceof SingleInputOperator)) { - edges = edges - .map(new NoOp>()) - .name("No-op"); - } - - return Graph.fromDataSet(vertices, edges, output.getContext()); + return Graph.fromDataSet(verticesWrappingOperator, edgesWrappingOperator, output.getContext()); } } diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/utils/proxy/NoOp.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/utils/proxy/NoOp.java deleted file mode 100644 index 8f76793667b91..0000000000000 --- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/utils/proxy/NoOp.java +++ /dev/null @@ -1,35 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.graph.utils.proxy; - -import org.apache.flink.api.common.functions.MapFunction; - -/** - * A map which passes input to output. - * - * @param input and output type - */ -public class NoOp -implements MapFunction { - - @Override - public T map(T value) throws Exception { - return value; - } -}