From 36e1159a80f539b8bd4a884e5c1cf304ec52c4f9 Mon Sep 17 00:00:00 2001 From: Dan LaRocque Date: Tue, 25 Oct 2016 19:37:17 -0500 Subject: [PATCH] Avoid starting VP worker iterations that never end SparkExecutor.executeVertexProgramIteration was written in such a way that an empty RDD partition would cause it to invoke VertexProgram.workerIterationStart without ever invoking VertexProgram.workerIterationEnd. This seems like a contract violation. I have at least one VP that relies on workerIterationStart/workerIterationEnd to allocate and release resources. Failing to invoke workerIterationEnd like this causes a leak in that VP, as it would for any VP that uses that resource management pattern. --- .../gremlin/spark/process/computer/SparkExecutor.java | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkExecutor.java b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkExecutor.java index 8dd23814141..6e65e2608c7 100644 --- a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkExecutor.java +++ b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkExecutor.java @@ -91,9 +91,15 @@ public static JavaPairRDD> executeVertexProgr // for each partition of vertices emit a view and their outgoing messages .mapPartitionsToPair(partitionIterator -> { KryoShimServiceLoader.applyConfiguration(graphComputerConfiguration); + + // if the partition is empty, return without starting a new VP iteration + if (!partitionIterator.hasNext()) + return Collections.emptyList(); + final VertexProgram workerVertexProgram = VertexProgram.createVertexProgram(HadoopGraph.open(graphComputerConfiguration), vertexProgramConfiguration); // each partition(Spark)/worker(TP3) has a local copy of the vertex program (a worker's task) final String[] vertexComputeKeysArray = 
VertexProgramHelper.vertexComputeKeysAsArray(workerVertexProgram.getVertexComputeKeys()); // the compute keys as an array final SparkMessenger messenger = new SparkMessenger<>(); + workerVertexProgram.workerIterationStart(memory.asImmutable()); // start the worker return () -> IteratorUtils.map(partitionIterator, vertexViewIncoming -> { final StarGraph.StarVertex vertex = vertexViewIncoming._2()._1().get(); // get the vertex from the vertex writable