From a49ce62761d99e2d232f9fffe9703a4b315f35de Mon Sep 17 00:00:00 2001
From: yucx
Date: Thu, 1 Sep 2016 23:11:58 -0700
Subject: [PATCH] TINKERPOP-1389 Support Spark 2.0

---
 giraph-gremlin/pom.xml                         |  2 +-
 hadoop-gremlin/pom.xml                         |  2 +-
 spark-gremlin/pom.xml                          | 13 +++++-
 .../spark/process/computer/SparkExecutor.java  | 16 +++----
 .../SparkStarBarrierInterceptor.java           | 10 ++---
 .../structure/io/gryo/GryoSerializer.java      | 43 +++++++++++++++----
 6 files changed, 59 insertions(+), 27 deletions(-)

diff --git a/giraph-gremlin/pom.xml b/giraph-gremlin/pom.xml
index 2e83f899aa8..190078d8e47 100644
--- a/giraph-gremlin/pom.xml
+++ b/giraph-gremlin/pom.xml
@@ -127,7 +127,7 @@ limitations under the License.
             <groupId>javax.servlet</groupId>
             <artifactId>javax.servlet-api</artifactId>
-            <version>3.0.1</version>
+            <version>3.1.0</version>
diff --git a/hadoop-gremlin/pom.xml b/hadoop-gremlin/pom.xml
index 88133ebbdfc..2f9c1ca7a6d 100644
--- a/hadoop-gremlin/pom.xml
+++ b/hadoop-gremlin/pom.xml
@@ -128,7 +128,7 @@ limitations under the License.
             <groupId>javax.servlet</groupId>
             <artifactId>javax.servlet-api</artifactId>
-            <version>3.0.1</version>
+            <version>3.1.0</version>
diff --git a/spark-gremlin/pom.xml b/spark-gremlin/pom.xml
index fe8988c45c9..be8ac5eecf0 100644
--- a/spark-gremlin/pom.xml
+++ b/spark-gremlin/pom.xml
@@ -29,6 +29,11 @@
     <artifactId>spark-gremlin</artifactId>
     <name>Apache TinkerPop :: Spark Gremlin</name>
     <dependencies>
+        <dependency>
+            <groupId>com.google.guava</groupId>
+            <artifactId>guava</artifactId>
+            <version>14.0.1</version>
+        </dependency>
         <dependency>
             <groupId>org.apache.tinkerpop</groupId>
             <artifactId>gremlin-core</artifactId>
@@ -54,6 +59,10 @@
                     <groupId>javax.servlet</groupId>
                     <artifactId>servlet-api</artifactId>
                 </exclusion>
+                <exclusion>
+                    <groupId>javax.servlet</groupId>
+                    <artifactId>javax.servlet-api</artifactId>
+                </exclusion>
                 <exclusion>
                     <groupId>com.sun.jersey</groupId>
                     <artifactId>jersey-core</artifactId>
@@ -104,7 +113,7 @@
         <dependency>
             <groupId>org.apache.spark</groupId>
             <artifactId>spark-core_2.10</artifactId>
-            <version>1.6.1</version>
+            <version>2.0.0</version>
@@ -210,7 +219,7 @@
             <groupId>com.fasterxml.jackson.core</groupId>
             <artifactId>jackson-databind</artifactId>
-            <version>2.4.4</version>
+            <version>2.6.5</version>
         </dependency>
         <dependency>
             <groupId>commons-lang</groupId>
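Note on the dependency changes above: spark-core moves to 2.0.0, javax.servlet-api to 3.1.0 (the Servlet level Spark 2.0's Jetty 9.x line expects), jackson-databind to 2.6.5 (the Jackson line Spark 2.0.0 is built against), and Guava is pinned at 14.0.1, the version Spark itself depends on. If there is any doubt about which versions actually get resolved after these bumps, a small reflection probe can confirm it at runtime. The sketch below is illustrative only (DependencyProbe is not part of the patch); ServletContext#getVirtualServerName is a real method that exists only from Servlet 3.1 onward.

    public class DependencyProbe {
        public static void main(final String[] args) throws Exception {
            // getVirtualServerName() only exists from Servlet 3.1 on; this throws NoSuchMethodException on 3.0.x
            Class.forName("javax.servlet.ServletContext").getMethod("getVirtualServerName");
            System.out.println("javax.servlet-api 3.1+ resolved");
            // Implementation-Version is read from the Guava jar manifest (may be null for repackaged jars)
            System.out.println("guava: "
                    + Class.forName("com.google.common.collect.ImmutableList").getPackage().getImplementationVersion());
        }
    }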
diff --git a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkExecutor.java b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkExecutor.java
index 8dd23814141..8d32b36d4bc 100644
--- a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkExecutor.java
+++ b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkExecutor.java
@@ -18,7 +18,7 @@
  */
 package org.apache.tinkerpop.gremlin.spark.process.computer;
 
-import com.google.common.base.Optional;
+import org.apache.spark.api.java.Optional;
 import org.apache.commons.configuration.Configuration;
 import org.apache.spark.api.java.JavaPairRDD;
 import org.apache.spark.api.java.function.Function2;
@@ -65,7 +65,7 @@ private SparkExecutor() {
     public static JavaPairRDD<Object, VertexWritable> applyGraphFilter(final JavaPairRDD<Object, VertexWritable> graphRDD, final GraphFilter graphFilter) {
         return graphRDD.mapPartitionsToPair(partitionIterator -> {
             final GraphFilter gFilter = graphFilter.clone();
-            return () -> IteratorUtils.filter(partitionIterator, tuple -> (tuple._2().get().applyGraphFilter(gFilter)).isPresent());
+            return IteratorUtils.filter(partitionIterator, tuple -> (tuple._2().get().applyGraphFilter(gFilter)).isPresent());
         }, true);
     }
@@ -95,7 +95,7 @@ public static <M> JavaPairRDD<Object, ViewIncomingPayload<M>> executeVertexProgramIteration(
             final String[] vertexComputeKeysArray = VertexProgramHelper.vertexComputeKeysAsArray(workerVertexProgram.getVertexComputeKeys()); // the compute keys as an array
             final SparkMessenger<M> messenger = new SparkMessenger<>();
             workerVertexProgram.workerIterationStart(memory.asImmutable()); // start the worker
-            return () -> IteratorUtils.map(partitionIterator, vertexViewIncoming -> {
+            return IteratorUtils.map(partitionIterator, vertexViewIncoming -> {
                 final StarGraph.StarVertex vertex = vertexViewIncoming._2()._1().get(); // get the vertex from the vertex writable
                 final boolean hasViewAndMessages = vertexViewIncoming._2()._2().isPresent(); // if this is the first iteration, then there are no views or messages
                 final List<DetachedVertexProperty<Object>> previousView = hasViewAndMessages ? vertexViewIncoming._2()._2().get().getView() : memory.isInitialIteration() ? new ArrayList<>() : Collections.emptyList();
@@ -133,7 +133,7 @@ public static <M> JavaPairRDD<Object, ViewIncomingPayload<M>> executeVertexProgramIteration(
             /////////////////////////////////////////////////////////////
             /////////////////////////////////////////////////////////////
             final PairFlatMapFunction<Tuple2<Object, ViewOutgoingPayload<M>>, Object, Payload> messageFunction =
-                    tuple -> () -> IteratorUtils.concat(
+                    tuple -> IteratorUtils.concat(
                             IteratorUtils.of(new Tuple2<>(tuple._1(), tuple._2().getView())),      // emit the view payload
                             IteratorUtils.map(tuple._2().getOutgoingMessages().iterator(), message -> new Tuple2<>(message._1(), new MessagePayload<>(message._2()))));
             final MessageCombiner<M> messageCombiner = VertexProgram.<VertexProgram<M>>createVertexProgram(HadoopGraph.open(vertexProgramConfiguration), vertexProgramConfiguration).getMessageCombiner().orElse(null);
@@ -172,7 +172,7 @@ else if (payload instanceof ViewPayload) // this happens if there is a vertex
         newViewIncomingRDD
                 .foreachPartition(partitionIterator -> {
                     KryoShimServiceLoader.applyConfiguration(graphComputerConfiguration);
-                }); // need to complete a task so its BSP and the memory for this iteration is updated
+                }); // need to complete a task so its BSP and the memory for this iteration is updated
         return newViewIncomingRDD;
     }
@@ -207,7 +207,7 @@ public static <K, V> JavaPairRDD<K, V> executeMap(
             final Configuration graphComputerConfiguration) {
         JavaPairRDD<K, V> mapRDD = graphRDD.mapPartitionsToPair(partitionIterator -> {
             KryoShimServiceLoader.applyConfiguration(graphComputerConfiguration);
-            return () -> new MapIterator<>(MapReduce.<MapReduce<K, V, ?, ?, ?>>createMapReduce(HadoopGraph.open(graphComputerConfiguration), graphComputerConfiguration), partitionIterator);
+            return new MapIterator<>(MapReduce.<MapReduce<K, V, ?, ?, ?>>createMapReduce(HadoopGraph.open(graphComputerConfiguration), graphComputerConfiguration), partitionIterator);
         });
         if (mapReduce.getMapKeySort().isPresent())
             mapRDD = mapRDD.sortByKey(mapReduce.getMapKeySort().get(), true, 1);
@@ -218,7 +218,7 @@ public static <K, V, OK, OV> JavaPairRDD<OK, OV> executeCombine(final JavaPairRDD<K, V> mapRDD,
             final Configuration graphComputerConfiguration) {
         return mapRDD.mapPartitionsToPair(partitionIterator -> {
             KryoShimServiceLoader.applyConfiguration(graphComputerConfiguration);
-            return () -> new CombineIterator<>(MapReduce.<MapReduce<K, V, OK, OV, ?>>createMapReduce(HadoopGraph.open(graphComputerConfiguration), graphComputerConfiguration), partitionIterator);
+            return new CombineIterator<>(MapReduce.<MapReduce<K, V, OK, OV, ?>>createMapReduce(HadoopGraph.open(graphComputerConfiguration), graphComputerConfiguration), partitionIterator);
         });
     }
@@ -227,7 +227,7 @@ public static <K, V, OK, OV> JavaPairRDD<OK, OV> executeReduce(
             final Configuration graphComputerConfiguration) {
         JavaPairRDD<OK, OV> reduceRDD = mapOrCombineRDD.groupByKey().mapPartitionsToPair(partitionIterator -> {
             KryoShimServiceLoader.applyConfiguration(graphComputerConfiguration);
-            return () -> new ReduceIterator<>(MapReduce.<MapReduce<K, V, OK, OV, ?>>createMapReduce(HadoopGraph.open(graphComputerConfiguration), graphComputerConfiguration), partitionIterator);
+            return new ReduceIterator<>(MapReduce.<MapReduce<K, V, OK, OV, ?>>createMapReduce(HadoopGraph.open(graphComputerConfiguration), graphComputerConfiguration), partitionIterator);
         });
         if (mapReduce.getReduceKeySort().isPresent())
             reduceRDD = reduceRDD.sortByKey(mapReduce.getReduceKeySort().get(), true, 1);
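The repeated edit through SparkExecutor.java tracks two breaking changes in Spark 2.0's Java API. First, the functions handed to flatMap, mapPartitions and mapPartitionsToPair must now return a java.util.Iterator rather than a java.lang.Iterable, so the "() ->" wrappers that used to turn an iterator expression into an Iterable are simply dropped. Second, the Optional carried by joins and cogroups is now org.apache.spark.api.java.Optional instead of Guava's com.google.common.base.Optional, hence the import swap at the top of the file. A minimal, self-contained sketch of the new iterator contract (class name and data are illustrative, not from the patch):

    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.List;

    import org.apache.spark.SparkConf;
    import org.apache.spark.api.java.JavaPairRDD;
    import org.apache.spark.api.java.JavaSparkContext;

    import scala.Tuple2;

    public class IteratorContractExample {
        public static void main(final String[] args) {
            final SparkConf conf = new SparkConf().setAppName("iterator-contract").setMaster("local[1]");
            try (final JavaSparkContext sc = new JavaSparkContext(conf)) {
                final JavaPairRDD<String, Integer> lengths = sc.parallelize(Arrays.asList("a", "bb", "ccc"))
                        .mapPartitionsToPair(partition -> {
                            final List<Tuple2<String, Integer>> out = new ArrayList<>();
                            partition.forEachRemaining(s -> out.add(new Tuple2<>(s, s.length())));
                            return out.iterator(); // Spark 2.0: an Iterator; Spark 1.x wanted "() -> out.iterator()" (an Iterable)
                        });
                System.out.println(lengths.collectAsMap()); // {a=1, bb=2, ccc=3}
            }
        }
    }

The same reasoning explains why applyGraphFilter, executeVertexProgramIteration and the MapReduce helpers above now return the iterator expression directly.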
diff --git a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/traversal/strategy/optimization/interceptor/SparkStarBarrierInterceptor.java b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/traversal/strategy/optimization/interceptor/SparkStarBarrierInterceptor.java
index 8585e0dcdcb..dc22d47e401 100644
--- a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/traversal/strategy/optimization/interceptor/SparkStarBarrierInterceptor.java
+++ b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/traversal/strategy/optimization/interceptor/SparkStarBarrierInterceptor.java
@@ -85,13 +85,11 @@ public JavaPairRDD<Object, VertexWritable> apply(final TraversalVertexProgram vertexProgram,
                 .filter(vertexWritable -> ElementHelper.idExists(vertexWritable.get().id(), graphStepIds)) // ensure vertex ids are in V(x)
                 .flatMap(vertexWritable -> {
                     if (identityTraversal)                          // g.V.count()-style (identity)
-                        return () -> IteratorUtils.of(traversal.getTraverserGenerator().generate(vertexWritable.get(), (Step) graphStep, 1l));
+                        return IteratorUtils.of(traversal.getTraverserGenerator().generate(vertexWritable.get(), (Step) graphStep, 1l));
                     else {                                          // add the vertex to head of the traversal
-                        return () -> {                              // and iterate it for its results
-                            final Traversal.Admin clone = traversal.clone(); // need a unique clone for each vertex to isolate the computation
+                        final Traversal.Admin clone = traversal.clone(); // need a unique clone for each vertex to isolate the computation
                             clone.getStartStep().addStart(clone.getTraverserGenerator().generate(vertexWritable.get(), graphStep, 1l));
                             return (Step) clone.getEndStep();
-                        };
                     }
                 });
         // USE SPARK DSL FOR THE RESPECTIVE END REDUCING BARRIER STEP OF THE TRAVERSAL
@@ -133,14 +131,14 @@ else if (endStep instanceof FoldStep) {
                 result = ((GroupStep) endStep).generateFinalResult(nextRDD.
                         mapPartitions(partitions -> {
                             final GroupStep clone = (GroupStep) endStep.clone();
-                            return () -> IteratorUtils.map(partitions, clone::projectTraverser);
+                            return IteratorUtils.map(partitions, clone::projectTraverser);
                         }).fold(((GroupStep) endStep).getSeedSupplier().get(), biOperator::apply));
             } else if (endStep instanceof GroupCountStep) {
                 final GroupCountStep.GroupCountBiOperator biOperator = GroupCountStep.GroupCountBiOperator.instance();
                 result = nextRDD
                         .mapPartitions(partitions -> {
                             final GroupCountStep clone = (GroupCountStep) endStep.clone();
-                            return () -> IteratorUtils.map(partitions, clone::projectTraverser);
+                            return IteratorUtils.map(partitions, clone::projectTraverser);
                         })
                         .fold(((GroupCountStep) endStep).getSeedSupplier().get(), biOperator::apply);
             } else
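The interceptor follows the same Iterator-versus-Iterable change: in Spark 2.0 the flatMap function hands back an Iterator directly, so the identity branch returns IteratorUtils.of(...) as-is and the non-identity branch can return the cloned traversal's end step without wrapping it in an extra Iterable (a Step is itself an Iterator). The reducing-barrier handling that follows still collapses each partition locally and then folds the partial results together. A stripped-down sketch of that flatMap-then-fold shape against the Spark 2.0 Java API (all names and data are illustrative, not from the patch):

    import java.util.Arrays;
    import java.util.Collections;

    import org.apache.spark.SparkConf;
    import org.apache.spark.api.java.JavaRDD;
    import org.apache.spark.api.java.JavaSparkContext;

    public class FoldExample {
        public static void main(final String[] args) {
            final SparkConf conf = new SparkConf().setAppName("fold-barrier").setMaster("local[1]");
            try (final JavaSparkContext sc = new JavaSparkContext(conf)) {
                final JavaRDD<Integer> lengths = sc.parallelize(Arrays.asList("a", "bb", "ccc"))
                        // Spark 2.0 flatMap: the function returns an Iterator (here a singleton), not an Iterable
                        .flatMap(s -> Collections.singletonList(s.length()).iterator());
                // fold(seed, op) is how a reducing barrier (count, sum, group, groupCount, ...) is collapsed
                final Integer total = lengths.fold(0, Integer::sum);
                System.out.println(total); // 6
            }
        }
    }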
diff --git a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/gryo/GryoSerializer.java b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/gryo/GryoSerializer.java
index 28a4d55c4b8..6735fe51981 100644
--- a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/gryo/GryoSerializer.java
+++ b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/gryo/GryoSerializer.java
@@ -24,7 +24,7 @@
 import org.apache.commons.configuration.Configuration;
 import org.apache.spark.SparkConf;
 import org.apache.spark.api.python.PythonBroadcast;
-import org.apache.spark.broadcast.HttpBroadcast;
+import org.apache.spark.broadcast.TorrentBroadcast;
 import org.apache.spark.network.util.ByteUnit;
 import org.apache.spark.scheduler.CompressedMapStatus;
 import org.apache.spark.scheduler.HighlyCompressedMapStatus;
@@ -48,24 +48,32 @@
 import scala.collection.mutable.WrappedArray;
 import scala.runtime.BoxedUnit;
 
+import java.io.Serializable;
+import java.util.ArrayList;
 import java.util.Collections;
+import java.util.List;
 
 /**
  * @author Marko A. Rodriguez (http://markorodriguez.com)
  */
-public final class GryoSerializer extends Serializer {
+public final class GryoSerializer extends Serializer implements Serializable {
 
     //private final Option userRegistrator;
     private final int bufferSize;
     private final int maxBufferSize;
+    private final int poolSize;
+    private final ArrayList<String> ioRegList = new ArrayList<>();
+    private final boolean referenceTracking;
+    private final boolean registrationRequired;
 
-    private final GryoPool gryoPool;
+
+    private transient GryoPool gryoPool;
 
     public GryoSerializer(final SparkConf sparkConfiguration) {
         final long bufferSizeKb = sparkConfiguration.getSizeAsKb("spark.kryoserializer.buffer", "64k");
         final long maxBufferSizeMb = sparkConfiguration.getSizeAsMb("spark.kryoserializer.buffer.max", "64m");
-        final boolean referenceTracking = sparkConfiguration.getBoolean("spark.kryo.referenceTracking", true);
-        final boolean registrationRequired = sparkConfiguration.getBoolean("spark.kryo.registrationRequired", false);
+        referenceTracking = sparkConfiguration.getBoolean("spark.kryo.referenceTracking", true);
+        registrationRequired = sparkConfiguration.getBoolean("spark.kryo.registrationRequired", false);
         if (bufferSizeKb >= ByteUnit.GiB.toKiB(2L)) {
             throw new IllegalArgumentException("spark.kryoserializer.buffer must be less than 2048 mb, got: " + bufferSizeKb + " mb.");
         } else {
@@ -77,9 +85,19 @@ public GryoSerializer(final SparkConf sparkConfiguration) {
                 //this.userRegistrator = sparkConfiguration.getOption("spark.kryo.registrator");
             }
         }
-        this.gryoPool = GryoPool.build().
-                poolSize(sparkConfiguration.getInt(GryoPool.CONFIG_IO_GRYO_POOL_SIZE, GryoPool.CONFIG_IO_GRYO_POOL_SIZE_DEFAULT)).
-                ioRegistries(makeApacheConfiguration(sparkConfiguration).getList(GryoPool.CONFIG_IO_REGISTRY, Collections.emptyList())).
+        poolSize = sparkConfiguration.getInt(GryoPool.CONFIG_IO_GRYO_POOL_SIZE, GryoPool.CONFIG_IO_GRYO_POOL_SIZE_DEFAULT);
+        List list = makeApacheConfiguration(sparkConfiguration).getList(GryoPool.CONFIG_IO_REGISTRY, Collections.emptyList());
+        list.forEach(c -> {
+                    ioRegList.add(c.toString());
+                }
+        );
+    }
+
+    private GryoPool createPool() {
+        List list = new ArrayList<>(ioRegList);
+        return GryoPool.build().
+                poolSize(poolSize).
+                ioRegistries(list).
                 initializeMapper(builder -> {
                     try {
                         builder.addCustom(Tuple2.class, new Tuple2Serializer())
@@ -91,7 +109,7 @@ public GryoSerializer(final SparkConf sparkConfiguration) {
                                 .addCustom(CompressedMapStatus.class)
                                 .addCustom(BlockManagerId.class)
                                 .addCustom(HighlyCompressedMapStatus.class, new ExternalizableSerializer())    // externalizable implemented so its okay
-                                .addCustom(HttpBroadcast.class)
+                                .addCustom(TorrentBroadcast.class)
                                 .addCustom(PythonBroadcast.class)
                                 .addCustom(BoxedUnit.class)
                                 .addCustom(Class.forName("scala.reflect.ClassTag$$anon$1"), new JavaSerializer())
@@ -118,6 +136,13 @@ public Output newOutput() {
     }
 
     public GryoPool getGryoPool() {
+        if (gryoPool == null) {
+            synchronized (this) {
+                if (gryoPool == null) {
+                    gryoPool = createPool();
+                }
+            }
+        }
         return this.gryoPool;
     }
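Two Spark 2.0 changes drive the GryoSerializer rework above. HttpBroadcast was removed in Spark 2.0, leaving torrent broadcast as the only implementation, so the custom Kryo registration switches to TorrentBroadcast. And since Spark 2.0 may serialize the serializer instance itself when shipping work to executors (which is presumably why this patch makes the class Serializable), GryoSerializer now keeps only serializable state (pool size, IoRegistry class names, the Kryo flags), marks the GryoPool transient, and rebuilds it lazily with double-checked locking on first use. A generic sketch of that lazy-rebuild idiom with illustrative names, not the patch's actual classes (note that the textbook form of the idiom also marks the cached field volatile):

    import java.io.Serializable;
    import java.util.ArrayList;
    import java.util.List;

    public class LazyPoolHolder implements Serializable {
        private final int poolSize;                    // plain serializable state, like poolSize/ioRegList above
        private transient volatile List<Object> pool;  // never serialized; rebuilt after deserialization

        public LazyPoolHolder(final int poolSize) {
            this.poolSize = poolSize;
        }

        public List<Object> getPool() {
            if (pool == null) {                        // first check without locking
                synchronized (this) {
                    if (pool == null) {                // second check under the lock
                        pool = createPool();           // runs once per deserialized copy
                    }
                }
            }
            return pool;
        }

        private List<Object> createPool() {
            return new ArrayList<>(poolSize);          // stand-in for GryoPool.build()...create()
        }
    }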