From e954408b6f30f33d17316551c6d014cd63ab83c2 Mon Sep 17 00:00:00 2001 From: "Marko A. Rodriguez" Date: Tue, 1 Dec 2015 09:25:14 -0700 Subject: [PATCH 1/9] HadoopElementIterator now supports ANY InputFormat, not just FileInputFormat. Sweet. Also, if you are using an RDD in Spark (and thus, not really doing Hadoop InputFormat stuffs), we have InputRDDFormat which wraps an RDD in an InputFormat so HadoopElementIterator works as well. This solves the HadoopGraph OLTP problem for ALL InputFormats and it allows ComputerResultStep to Attach elements for more than just FileInputFormats. Good stuff. --- .../structure/hdfs/HadoopEdgeIterator.java | 4 - .../structure/hdfs/HadoopElementIterator.java | 35 +++-- .../structure/hdfs/HadoopVertexIterator.java | 4 - .../HadoopGraphStructureStandardTest.java | 2 +- .../spark/structure/io/InputRDDFormat.java | 136 ++++++++++++++++++ .../spark/structure/io/InputRDDTest.java | 21 +++ 6 files changed, 174 insertions(+), 28 deletions(-) rename hadoop-gremlin/src/test/java/org/apache/tinkerpop/gremlin/hadoop/structure/{io => }/HadoopGraphStructureStandardTest.java (95%) create mode 100644 spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/InputRDDFormat.java diff --git a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/hdfs/HadoopEdgeIterator.java b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/hdfs/HadoopEdgeIterator.java index acdc0a4562c..59a4d2cf4a1 100644 --- a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/hdfs/HadoopEdgeIterator.java +++ b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/hdfs/HadoopEdgeIterator.java @@ -39,10 +39,6 @@ public final class HadoopEdgeIterator extends HadoopElementIterator { private Iterator edgeIterator = Collections.emptyIterator(); - public HadoopEdgeIterator(final HadoopGraph graph, final InputFormat inputFormat, final Path path) throws IOException, 
InterruptedException { - super(graph, inputFormat, path); - } - public HadoopEdgeIterator(final HadoopGraph graph) throws IOException { super(graph); } diff --git a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/hdfs/HadoopElementIterator.java b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/hdfs/HadoopElementIterator.java index a9f287bb678..f9ffea2137c 100644 --- a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/hdfs/HadoopElementIterator.java +++ b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/hdfs/HadoopElementIterator.java @@ -19,13 +19,14 @@ package org.apache.tinkerpop.gremlin.hadoop.structure.hdfs; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.mapreduce.InputFormat; +import org.apache.hadoop.mapreduce.InputSplit; +import org.apache.hadoop.mapreduce.JobID; import org.apache.hadoop.mapreduce.RecordReader; import org.apache.hadoop.mapreduce.TaskAttemptID; -import org.apache.hadoop.mapreduce.lib.input.FileSplit; +import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; +import org.apache.hadoop.mapreduce.task.JobContextImpl; import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl; import org.apache.tinkerpop.gremlin.hadoop.Constants; import org.apache.tinkerpop.gremlin.hadoop.structure.HadoopGraph; @@ -36,35 +37,31 @@ import java.io.IOException; import java.util.Iterator; import java.util.LinkedList; +import java.util.List; import java.util.Queue; +import java.util.UUID; /** * @author Marko A. Rodriguez (http://markorodriguez.com) */ public abstract class HadoopElementIterator implements Iterator { - // TODO: Generalize so it works for more than just FileFormats. 
- protected final HadoopGraph graph; protected final Queue> readers = new LinkedList<>(); - public HadoopElementIterator(final HadoopGraph graph, final InputFormat inputFormat, final Path path) throws IOException, InterruptedException { - this.graph = graph; - final Configuration configuration = ConfUtil.makeHadoopConfiguration(this.graph.configuration()); - for (final Path path2 : HDFSTools.getAllFilePaths(FileSystem.get(configuration), path, HiddenFileFilter.instance())) { - this.readers.add(inputFormat.createRecordReader(new FileSplit(path2, 0, Long.MAX_VALUE, new String[]{}), new TaskAttemptContextImpl(configuration, new TaskAttemptID()))); - } - } - public HadoopElementIterator(final HadoopGraph graph) throws IOException { try { this.graph = graph; - if (this.graph.configuration().containsKey(Constants.GREMLIN_HADOOP_INPUT_LOCATION)) { - final Configuration configuration = ConfUtil.makeHadoopConfiguration(this.graph.configuration()); - final InputFormat inputFormat = this.graph.configuration().getGraphInputFormat().getConstructor().newInstance(); - for (final Path path : HDFSTools.getAllFilePaths(FileSystem.get(configuration), new Path(graph.configuration().getInputLocation()), HiddenFileFilter.instance())) { - this.readers.add(inputFormat.createRecordReader(new FileSplit(path, 0, Long.MAX_VALUE, new String[]{}), new TaskAttemptContextImpl(configuration, new TaskAttemptID()))); - } + final Configuration configuration = ConfUtil.makeHadoopConfiguration(this.graph.configuration()); + final InputFormat inputFormat = this.graph.configuration().getGraphInputFormat().getConstructor().newInstance(); + if (inputFormat instanceof FileInputFormat) { + if (!this.graph.configuration().containsKey(Constants.GREMLIN_HADOOP_INPUT_LOCATION)) + return; // there is not input location and thus, no data (empty graph) + configuration.set(Constants.MAPREDUCE_INPUT_FILEINPUTFORMAT_INPUTDIR, this.graph.configuration().getInputLocation()); + } + final List splits = 
inputFormat.getSplits(new JobContextImpl(configuration, new JobID(UUID.randomUUID().toString(), 1))); + for (final InputSplit split : splits) { + this.readers.add(inputFormat.createRecordReader(split, new TaskAttemptContextImpl(configuration, new TaskAttemptID()))); } } catch (Exception e) { throw new IllegalStateException(e.getMessage(), e); diff --git a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/hdfs/HadoopVertexIterator.java b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/hdfs/HadoopVertexIterator.java index 8977692d3bc..8f13c592795 100644 --- a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/hdfs/HadoopVertexIterator.java +++ b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/hdfs/HadoopVertexIterator.java @@ -36,10 +36,6 @@ public final class HadoopVertexIterator extends HadoopElementIterator { private HadoopVertex nextVertex = null; - public HadoopVertexIterator(final HadoopGraph graph, final InputFormat inputFormat, final Path path) throws IOException, InterruptedException { - super(graph, inputFormat, path); - } - public HadoopVertexIterator(final HadoopGraph graph) throws IOException { super(graph); } diff --git a/hadoop-gremlin/src/test/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/HadoopGraphStructureStandardTest.java b/hadoop-gremlin/src/test/java/org/apache/tinkerpop/gremlin/hadoop/structure/HadoopGraphStructureStandardTest.java similarity index 95% rename from hadoop-gremlin/src/test/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/HadoopGraphStructureStandardTest.java rename to hadoop-gremlin/src/test/java/org/apache/tinkerpop/gremlin/hadoop/structure/HadoopGraphStructureStandardTest.java index ccb2d64ee87..459c900ed6a 100644 --- a/hadoop-gremlin/src/test/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/HadoopGraphStructureStandardTest.java +++ 
b/hadoop-gremlin/src/test/java/org/apache/tinkerpop/gremlin/hadoop/structure/HadoopGraphStructureStandardTest.java @@ -16,7 +16,7 @@ * specific language governing permissions and limitations * under the License. */ -package org.apache.tinkerpop.gremlin.hadoop.structure.io; +package org.apache.tinkerpop.gremlin.hadoop.structure; import org.apache.tinkerpop.gremlin.GraphProviderClass; import org.apache.tinkerpop.gremlin.hadoop.HadoopGraphProvider; diff --git a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/InputRDDFormat.java b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/InputRDDFormat.java new file mode 100644 index 00000000000..3952c661631 --- /dev/null +++ b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/InputRDDFormat.java @@ -0,0 +1,136 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.tinkerpop.gremlin.spark.structure.io; + +import org.apache.hadoop.io.NullWritable; +import org.apache.hadoop.mapreduce.InputFormat; +import org.apache.hadoop.mapreduce.InputSplit; +import org.apache.hadoop.mapreduce.JobContext; +import org.apache.hadoop.mapreduce.RecordReader; +import org.apache.hadoop.mapreduce.TaskAttemptContext; +import org.apache.spark.SparkConf; +import org.apache.spark.SparkContext; +import org.apache.spark.api.java.JavaSparkContext; +import org.apache.tinkerpop.gremlin.hadoop.Constants; +import org.apache.tinkerpop.gremlin.hadoop.structure.io.VertexWritable; +import org.apache.tinkerpop.gremlin.hadoop.structure.util.ConfUtil; +import scala.Tuple2; + +import java.io.IOException; +import java.util.Collections; +import java.util.Iterator; +import java.util.List; +import java.util.UUID; + +/** + * @author Marko A. Rodriguez (http://markorodriguez.com) + */ +public final class InputRDDFormat extends InputFormat { + + public InputRDDFormat() { + + } + + @Override + public List getSplits(final JobContext jobContext) throws IOException, InterruptedException { + return Collections.singletonList(new InputSplit() { + @Override + public long getLength() throws IOException, InterruptedException { + return 0; + } + + @Override + public String[] getLocations() throws IOException, InterruptedException { + return new String[0]; + } + }); + } + + @Override + public RecordReader createRecordReader(final InputSplit inputSplit, final TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException { + try { + final org.apache.hadoop.conf.Configuration hadoopConfiguration = taskAttemptContext.getConfiguration(); + final SparkConf sparkConfiguration = new SparkConf(); + sparkConfiguration.setAppName(UUID.randomUUID().toString()); + hadoopConfiguration.forEach(entry -> sparkConfiguration.set(entry.getKey(), entry.getValue())); + InputRDD inputRDD = (InputRDD) 
Class.forName(sparkConfiguration.get(Constants.GREMLIN_SPARK_GRAPH_INPUT_RDD)).newInstance(); + JavaSparkContext javaSparkContext = new JavaSparkContext(SparkContext.getOrCreate(sparkConfiguration)); + final Iterator> iterator = inputRDD.readGraphRDD(ConfUtil.makeApacheConfiguration(taskAttemptContext.getConfiguration()), javaSparkContext).toLocalIterator(); + return new RecordReader() { + @Override + public void initialize(final InputSplit inputSplit, final TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException { + + } + + @Override + public boolean nextKeyValue() throws IOException, InterruptedException { + return iterator.hasNext(); + } + + @Override + public NullWritable getCurrentKey() throws IOException, InterruptedException { + return NullWritable.get(); + } + + @Override + public VertexWritable getCurrentValue() throws IOException, InterruptedException { + return iterator.next()._2(); + } + + @Override + public float getProgress() throws IOException, InterruptedException { + return 1.0f; // TODO: make this dynamic (how? its an iterator.) 
+ } + + @Override + public void close() throws IOException { + if (!hadoopConfiguration.getBoolean(Constants.GREMLIN_SPARK_PERSIST_CONTEXT, false)) + javaSparkContext.close(); + } + }; + } catch (final ClassNotFoundException | InstantiationException | IllegalAccessException e) { + throw new IOException(e.getMessage(), e); + } + + } + + /*private static class PartitionInputSplit extends InputSplit { + + private final Partition partition; + + public PartitionInputSplit(final Partition partition) { + this.partition = partition; + } + + @Override + public long getLength() throws IOException, InterruptedException { + return 0; + } + + @Override + public String[] getLocations() throws IOException, InterruptedException { + return new String[0]; + } + + public Partition getPartition() { + return this.partition; + } + }*/ +} diff --git a/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/structure/io/InputRDDTest.java b/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/structure/io/InputRDDTest.java index 5ba3b12d6af..98a2b9fc81c 100644 --- a/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/structure/io/InputRDDTest.java +++ b/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/structure/io/InputRDDTest.java @@ -24,6 +24,7 @@ import org.apache.tinkerpop.gremlin.hadoop.Constants; import org.apache.tinkerpop.gremlin.hadoop.structure.HadoopGraph; import org.apache.tinkerpop.gremlin.hadoop.structure.io.gryo.GryoOutputFormat; +import org.apache.tinkerpop.gremlin.process.traversal.P; import org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.GraphTraversalSource; import org.apache.tinkerpop.gremlin.spark.process.computer.SparkGraphComputer; import org.apache.tinkerpop.gremlin.structure.Graph; @@ -52,4 +53,24 @@ public void shouldReadFromArbitraryRDD() { assertEquals(123l, graph.traversal(GraphTraversalSource.computer(SparkGraphComputer.class)).V().values("age").sum().next()); assertEquals(Long.valueOf(4l), 
graph.traversal(GraphTraversalSource.computer(SparkGraphComputer.class)).V().count().next()); } + + @Test + public void shouldSupportHadoopGraphOLTP() { + final Configuration configuration = new BaseConfiguration(); + configuration.setProperty("spark.master", "local[4]"); + configuration.setProperty("spark.serializer", KryoSerializer.class.getCanonicalName()); + configuration.setProperty(Graph.GRAPH, HadoopGraph.class.getName()); + configuration.setProperty(Constants.GREMLIN_SPARK_GRAPH_INPUT_RDD, ExampleInputRDD.class.getCanonicalName()); + configuration.setProperty(Constants.GREMLIN_HADOOP_GRAPH_INPUT_FORMAT, InputRDDFormat.class.getCanonicalName()); + configuration.setProperty(Constants.GREMLIN_HADOOP_GRAPH_OUTPUT_FORMAT, GryoOutputFormat.class.getCanonicalName()); + configuration.setProperty(Constants.GREMLIN_HADOOP_OUTPUT_LOCATION, "target/test-output"); + configuration.setProperty(Constants.GREMLIN_HADOOP_JARS_IN_DISTRIBUTED_CACHE, false); + //////// + Graph graph = GraphFactory.open(configuration); + GraphTraversalSource g = graph.traversal(); // OLTP; + assertEquals("person", g.V().has("age", 29).next().label()); + assertEquals(Long.valueOf(4), g.V().count().next()); + assertEquals(Long.valueOf(0), g.E().count().next()); + assertEquals(Long.valueOf(2), g.V().has("age", P.gt(30)).count().next()); + } } From c374f4ecb1c2c82f911b5fb0a19a66ed663eea60 Mon Sep 17 00:00:00 2001 From: "Marko A. Rodriguez" Date: Wed, 2 Dec 2015 12:02:09 -0700 Subject: [PATCH 2/9] I have the SparkIntegrationTestSuite now testing either from Gryo FileInputFormat, GraphSON FileInputFormat, or an InputRDD. This gives us super coverage and proves that InputRDD (bypassing Hadoop) is working as expected. I also fixed up some other tests that used KryoSerializer instead of GryoSerializer as I learned how to deal with Scala's WrappedArray class. It was insane. This is really good stuff. 
--- .../structure/hdfs/HadoopElementIterator.java | 6 ++- .../computer/payload/MessagePayload.java | 5 +- .../computer/payload/ViewIncomingPayload.java | 6 +-- .../computer/payload/ViewOutgoingPayload.java | 12 +++-- .../structure/io/gryo/GryoSerializer.java | 6 +++ .../io/gryo/WrappedArraySerializer.java | 46 +++++++++++++++++ .../computer/SparkHadoopGraphProvider.java | 40 +++++++++++++-- .../spark/structure/io/ClassicInputRDD.java | 39 +++++++++++++++ .../spark/structure/io/GratefulInputRDD.java | 50 +++++++++++++++++++ .../structure/io/InputOutputRDDTest.java | 3 +- .../spark/structure/io/InputRDDTest.java | 5 +- .../spark/structure/io/ModernInputRDD.java | 41 +++++++++++++++ .../spark/structure/io/OutputRDDTest.java | 3 +- .../spark/structure/io/TheCrewInputRDD.java | 39 +++++++++++++++ 14 files changed, 285 insertions(+), 16 deletions(-) create mode 100644 spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/gryo/WrappedArraySerializer.java create mode 100644 spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/structure/io/ClassicInputRDD.java create mode 100644 spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/structure/io/GratefulInputRDD.java create mode 100644 spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/structure/io/ModernInputRDD.java create mode 100644 spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/structure/io/TheCrewInputRDD.java diff --git a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/hdfs/HadoopElementIterator.java b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/hdfs/HadoopElementIterator.java index f9ffea2137c..45f3c5554fe 100644 --- a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/hdfs/HadoopElementIterator.java +++ b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/hdfs/HadoopElementIterator.java @@ -19,6 +19,8 @@ package 
org.apache.tinkerpop.gremlin.hadoop.structure.hdfs; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.mapreduce.InputFormat; import org.apache.hadoop.mapreduce.InputSplit; @@ -56,7 +58,9 @@ public HadoopElementIterator(final HadoopGraph graph) throws IOException { final InputFormat inputFormat = this.graph.configuration().getGraphInputFormat().getConstructor().newInstance(); if (inputFormat instanceof FileInputFormat) { if (!this.graph.configuration().containsKey(Constants.GREMLIN_HADOOP_INPUT_LOCATION)) - return; // there is not input location and thus, no data (empty graph) + return; // there is no input location and thus, no data (empty graph) + if (!FileSystem.get(configuration).exists(new Path(this.graph.configuration().getInputLocation()))) + return; // there is no data at the input location (empty graph) configuration.set(Constants.MAPREDUCE_INPUT_FILEINPUTFORMAT_INPUTDIR, this.graph.configuration().getInputLocation()); } final List splits = inputFormat.getSplits(new JobContextImpl(configuration, new JobID(UUID.randomUUID().toString(), 1))); diff --git a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/payload/MessagePayload.java b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/payload/MessagePayload.java index 09e25993e95..f32ec446e85 100644 --- a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/payload/MessagePayload.java +++ b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/payload/MessagePayload.java @@ -23,7 +23,10 @@ */ public final class MessagePayload implements Payload { - private final M message; + private M message; + + private MessagePayload() { + } public MessagePayload(final M message) { this.message = message; diff --git 
a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/payload/ViewIncomingPayload.java b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/payload/ViewIncomingPayload.java index 911fc7bd8ac..a2c92057ea6 100644 --- a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/payload/ViewIncomingPayload.java +++ b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/payload/ViewIncomingPayload.java @@ -31,11 +31,11 @@ public final class ViewIncomingPayload implements Payload { private List> view = null; - private final List incomingMessages; + private List incomingMessages; - public ViewIncomingPayload() { - this.incomingMessages = null; + private ViewIncomingPayload() { + } public ViewIncomingPayload(final MessageCombiner messageCombiner) { diff --git a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/payload/ViewOutgoingPayload.java b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/payload/ViewOutgoingPayload.java index fc4aeed4f18..20c8e09900b 100644 --- a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/payload/ViewOutgoingPayload.java +++ b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/payload/ViewOutgoingPayload.java @@ -28,10 +28,14 @@ */ public final class ViewOutgoingPayload implements Payload { - private final List> view; - private final List> outgoingMessages; + private List> view; + private List> outgoingMessages; - public ViewOutgoingPayload(final List> view, final List> outgoingMessages) { + private ViewOutgoingPayload() { + + } + + public ViewOutgoingPayload(final List> view, final List> outgoingMessages) { this.view = view; this.outgoingMessages = outgoingMessages; } @@ -40,7 +44,7 @@ public ViewPayload getView() { return new ViewPayload(this.view); } - public List> getOutgoingMessages() { + public List> 
getOutgoingMessages() { return this.outgoingMessages; } } diff --git a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/gryo/GryoSerializer.java b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/gryo/GryoSerializer.java index 720289273a2..29fde9faf24 100644 --- a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/gryo/GryoSerializer.java +++ b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/gryo/GryoSerializer.java @@ -41,6 +41,8 @@ import org.apache.tinkerpop.shaded.kryo.io.Output; import org.apache.tinkerpop.shaded.kryo.serializers.JavaSerializer; import scala.Tuple2; +import scala.Tuple3; +import scala.collection.mutable.WrappedArray; import scala.runtime.BoxedUnit; import java.util.Collections; @@ -79,11 +81,15 @@ public GryoSerializer(final SparkConf sparkConfiguration) { try { builder.addCustom(SerializableWritable.class, new JavaSerializer()) .addCustom(Tuple2.class, new JavaSerializer()) + .addCustom(Tuple2[].class, new JavaSerializer()) + .addCustom(Tuple3.class, new JavaSerializer()) + .addCustom(Tuple3[].class, new JavaSerializer()) .addCustom(CompressedMapStatus.class, new JavaSerializer()) .addCustom(HttpBroadcast.class, new JavaSerializer()) .addCustom(PythonBroadcast.class, new JavaSerializer()) .addCustom(BoxedUnit.class, new JavaSerializer()) .addCustom(Class.forName("scala.reflect.ClassTag$$anon$1"), new JavaSerializer()) + .addCustom(WrappedArray.ofRef.class, new WrappedArraySerializer()) .addCustom(MessagePayload.class) .addCustom(ViewIncomingPayload.class) .addCustom(ViewOutgoingPayload.class) diff --git a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/gryo/WrappedArraySerializer.java b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/gryo/WrappedArraySerializer.java new file mode 100644 index 00000000000..0e9f03fd2a5 --- /dev/null +++ 
b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/gryo/WrappedArraySerializer.java @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.tinkerpop.gremlin.spark.structure.io.gryo; + +import org.apache.tinkerpop.shaded.kryo.Kryo; +import org.apache.tinkerpop.shaded.kryo.Serializer; +import org.apache.tinkerpop.shaded.kryo.io.Input; +import org.apache.tinkerpop.shaded.kryo.io.Output; +import scala.collection.JavaConversions; +import scala.collection.mutable.WrappedArray; + +import java.util.ArrayList; +import java.util.List; + +/** + * @author Marko A. 
Rodriguez (http://markorodriguez.com) + */ +public final class WrappedArraySerializer extends Serializer> { + + @Override + public void write(final Kryo kryo, final Output output, final WrappedArray iterable) { + kryo.writeClassAndObject(output,new ArrayList<>(JavaConversions.asJavaList(iterable))); + } + + @Override + public WrappedArray read(final Kryo kryo, final Input input, final Class> aClass) { + return new WrappedArray.ofRef<>((T[]) ((List) kryo.readClassAndObject(input)).toArray()); + } +} diff --git a/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkHadoopGraphProvider.java b/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkHadoopGraphProvider.java index c81ea92185c..232817645f5 100644 --- a/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkHadoopGraphProvider.java +++ b/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkHadoopGraphProvider.java @@ -20,12 +20,19 @@ import org.apache.tinkerpop.gremlin.GraphProvider; import org.apache.tinkerpop.gremlin.LoadGraphWith; +import org.apache.tinkerpop.gremlin.hadoop.Constants; import org.apache.tinkerpop.gremlin.hadoop.HadoopGraphProvider; import org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.GraphTraversalSource; import org.apache.tinkerpop.gremlin.process.traversal.engine.ComputerTraversalEngine; +import org.apache.tinkerpop.gremlin.spark.structure.io.ClassicInputRDD; +import org.apache.tinkerpop.gremlin.spark.structure.io.GratefulInputRDD; +import org.apache.tinkerpop.gremlin.spark.structure.io.InputRDDFormat; +import org.apache.tinkerpop.gremlin.spark.structure.io.ModernInputRDD; +import org.apache.tinkerpop.gremlin.spark.structure.io.TheCrewInputRDD; import org.apache.tinkerpop.gremlin.structure.Graph; import java.util.Map; +import java.util.Random; /** * @author Marko A. 
Rodriguez (http://markorodriguez.com) @@ -33,13 +40,41 @@ @GraphProvider.Descriptor(computer = SparkGraphComputer.class) public final class SparkHadoopGraphProvider extends HadoopGraphProvider { + private static final Random RANDOM = new Random(); + @Override public Map getBaseConfiguration(final String graphName, final Class test, final String testMethodName, final LoadGraphWith.GraphData loadGraphWith) { final Map config = super.getBaseConfiguration(graphName, test, testMethodName, loadGraphWith); - config.put("mapreduce.job.reduces", 4); + if (null != loadGraphWith) { + if (loadGraphWith.equals(LoadGraphWith.GraphData.MODERN)) { + if (RANDOM.nextBoolean()) { + config.remove(Constants.GREMLIN_HADOOP_INPUT_LOCATION); + config.put(Constants.GREMLIN_SPARK_GRAPH_INPUT_RDD, ModernInputRDD.class.getCanonicalName()); + config.put(Constants.GREMLIN_HADOOP_GRAPH_INPUT_FORMAT, InputRDDFormat.class.getCanonicalName()); + } + } else if (loadGraphWith.equals(LoadGraphWith.GraphData.CREW)) { + if (RANDOM.nextBoolean()) { + config.remove(Constants.GREMLIN_HADOOP_INPUT_LOCATION); + config.put(Constants.GREMLIN_SPARK_GRAPH_INPUT_RDD, TheCrewInputRDD.class.getCanonicalName()); + config.put(Constants.GREMLIN_HADOOP_GRAPH_INPUT_FORMAT, InputRDDFormat.class.getCanonicalName()); + } + } else if (loadGraphWith.equals(LoadGraphWith.GraphData.CLASSIC)) { + if (RANDOM.nextBoolean()) { + config.remove(Constants.GREMLIN_HADOOP_INPUT_LOCATION); + config.put(Constants.GREMLIN_SPARK_GRAPH_INPUT_RDD, ClassicInputRDD.class.getCanonicalName()); + config.put(Constants.GREMLIN_HADOOP_GRAPH_INPUT_FORMAT, InputRDDFormat.class.getCanonicalName()); + } + } else if (loadGraphWith.equals(LoadGraphWith.GraphData.GRATEFUL)) { + if (RANDOM.nextBoolean()) { + config.remove(Constants.GREMLIN_HADOOP_INPUT_LOCATION); + config.put(Constants.GREMLIN_SPARK_GRAPH_INPUT_RDD, GratefulInputRDD.class.getCanonicalName()); + config.put(Constants.GREMLIN_HADOOP_GRAPH_INPUT_FORMAT, InputRDDFormat.class.getCanonicalName()); 
+ } + } + } /// spark configuration config.put("spark.master", "local[4]"); - // put("spark.serializer", "org.apache.spark.serializer.KryoSerializer"); + //config.put("spark.serializer", "org.apache.spark.serializer.KryoSerializer"); config.put("spark.serializer", "org.apache.tinkerpop.gremlin.spark.structure.io.gryo.GryoSerializer"); config.put("spark.kryo.registrationRequired", true); return config; @@ -49,5 +84,4 @@ public Map getBaseConfiguration(final String graphName, final Cl public GraphTraversalSource traversal(final Graph graph) { return GraphTraversalSource.build().engine(ComputerTraversalEngine.build().computer(SparkGraphComputer.class)).create(graph); } - } \ No newline at end of file diff --git a/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/structure/io/ClassicInputRDD.java b/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/structure/io/ClassicInputRDD.java new file mode 100644 index 00000000000..4512b616e36 --- /dev/null +++ b/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/structure/io/ClassicInputRDD.java @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.tinkerpop.gremlin.spark.structure.io; + +import org.apache.commons.configuration.Configuration; +import org.apache.spark.api.java.JavaPairRDD; +import org.apache.spark.api.java.JavaSparkContext; +import org.apache.tinkerpop.gremlin.hadoop.structure.io.VertexWritable; +import org.apache.tinkerpop.gremlin.tinkergraph.structure.TinkerFactory; +import org.apache.tinkerpop.gremlin.util.iterator.IteratorUtils; +import scala.Tuple2; + +/** + * @author Marko A. Rodriguez (http://markorodriguez.com) + */ +public final class ClassicInputRDD implements InputRDD { + + @Override + public JavaPairRDD readGraphRDD(final Configuration configuration, final JavaSparkContext sparkContext) { + return sparkContext.parallelize(IteratorUtils.list(IteratorUtils.map(TinkerFactory.createClassic().vertices(), VertexWritable::new))).mapToPair(vertex -> new Tuple2<>(vertex.get().id(), vertex)); + } +} diff --git a/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/structure/io/GratefulInputRDD.java b/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/structure/io/GratefulInputRDD.java new file mode 100644 index 00000000000..396aa758028 --- /dev/null +++ b/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/structure/io/GratefulInputRDD.java @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.tinkerpop.gremlin.spark.structure.io; + +import org.apache.commons.configuration.Configuration; +import org.apache.spark.api.java.JavaPairRDD; +import org.apache.spark.api.java.JavaSparkContext; +import org.apache.tinkerpop.gremlin.hadoop.structure.io.VertexWritable; +import org.apache.tinkerpop.gremlin.structure.Graph; +import org.apache.tinkerpop.gremlin.structure.io.gryo.GryoIo; +import org.apache.tinkerpop.gremlin.structure.io.gryo.GryoResourceAccess; +import org.apache.tinkerpop.gremlin.tinkergraph.structure.TinkerGraph; +import org.apache.tinkerpop.gremlin.util.iterator.IteratorUtils; +import scala.Tuple2; + +import java.io.IOException; + +/** + * @author Marko A. 
Rodriguez (http://markorodriguez.com) + */ +public final class GratefulInputRDD implements InputRDD { + + @Override + public JavaPairRDD readGraphRDD(final Configuration configuration, final JavaSparkContext sparkContext) { + try { + final Graph graph = TinkerGraph.open(); + graph.io(GryoIo.build()).readGraph(GryoResourceAccess.class.getResource("grateful-dead.kryo").getFile()); + return sparkContext.parallelize(IteratorUtils.list(IteratorUtils.map(graph.vertices(), VertexWritable::new))).mapToPair(vertex -> new Tuple2<>(vertex.get().id(), vertex)); + } catch (final IOException e) { + throw new IllegalStateException(e.getMessage(), e); + } + } +} diff --git a/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/structure/io/InputOutputRDDTest.java b/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/structure/io/InputOutputRDDTest.java index 3691aba0245..ea621141ef5 100644 --- a/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/structure/io/InputOutputRDDTest.java +++ b/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/structure/io/InputOutputRDDTest.java @@ -28,6 +28,7 @@ import org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.GraphTraversalSource; import org.apache.tinkerpop.gremlin.process.traversal.engine.ComputerTraversalEngine; import org.apache.tinkerpop.gremlin.spark.process.computer.SparkGraphComputer; +import org.apache.tinkerpop.gremlin.spark.structure.io.gryo.GryoSerializer; import org.apache.tinkerpop.gremlin.structure.Graph; import org.apache.tinkerpop.gremlin.structure.util.GraphFactory; import org.junit.Test; @@ -41,7 +42,7 @@ public class InputOutputRDDTest { public void shouldReadFromWriteToArbitraryRDD() throws Exception { final Configuration configuration = new BaseConfiguration(); configuration.setProperty("spark.master", "local[4]"); - configuration.setProperty("spark.serializer", KryoSerializer.class.getCanonicalName()); + configuration.setProperty("spark.serializer", 
GryoSerializer.class.getCanonicalName()); configuration.setProperty(Graph.GRAPH, HadoopGraph.class.getName()); configuration.setProperty(Constants.GREMLIN_SPARK_GRAPH_INPUT_RDD, ExampleInputRDD.class.getCanonicalName()); configuration.setProperty(Constants.GREMLIN_SPARK_GRAPH_OUTPUT_RDD, ExampleOutputRDD.class.getCanonicalName()); diff --git a/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/structure/io/InputRDDTest.java b/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/structure/io/InputRDDTest.java index 98a2b9fc81c..2cbfd66ec70 100644 --- a/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/structure/io/InputRDDTest.java +++ b/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/structure/io/InputRDDTest.java @@ -27,6 +27,7 @@ import org.apache.tinkerpop.gremlin.process.traversal.P; import org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.GraphTraversalSource; import org.apache.tinkerpop.gremlin.spark.process.computer.SparkGraphComputer; +import org.apache.tinkerpop.gremlin.spark.structure.io.gryo.GryoSerializer; import org.apache.tinkerpop.gremlin.structure.Graph; import org.apache.tinkerpop.gremlin.structure.util.GraphFactory; import org.junit.Test; @@ -42,7 +43,7 @@ public class InputRDDTest { public void shouldReadFromArbitraryRDD() { final Configuration configuration = new BaseConfiguration(); configuration.setProperty("spark.master", "local[4]"); - configuration.setProperty("spark.serializer", KryoSerializer.class.getCanonicalName()); + configuration.setProperty("spark.serializer", GryoSerializer.class.getCanonicalName()); configuration.setProperty(Graph.GRAPH, HadoopGraph.class.getName()); configuration.setProperty(Constants.GREMLIN_SPARK_GRAPH_INPUT_RDD, ExampleInputRDD.class.getCanonicalName()); configuration.setProperty(Constants.GREMLIN_HADOOP_GRAPH_OUTPUT_FORMAT, GryoOutputFormat.class.getCanonicalName()); @@ -58,7 +59,7 @@ public void shouldReadFromArbitraryRDD() { public void 
shouldSupportHadoopGraphOLTP() { final Configuration configuration = new BaseConfiguration(); configuration.setProperty("spark.master", "local[4]"); - configuration.setProperty("spark.serializer", KryoSerializer.class.getCanonicalName()); + configuration.setProperty("spark.serializer", GryoSerializer.class.getCanonicalName()); configuration.setProperty(Graph.GRAPH, HadoopGraph.class.getName()); configuration.setProperty(Constants.GREMLIN_SPARK_GRAPH_INPUT_RDD, ExampleInputRDD.class.getCanonicalName()); configuration.setProperty(Constants.GREMLIN_HADOOP_GRAPH_INPUT_FORMAT, InputRDDFormat.class.getCanonicalName()); diff --git a/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/structure/io/ModernInputRDD.java b/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/structure/io/ModernInputRDD.java new file mode 100644 index 00000000000..849e3e655d1 --- /dev/null +++ b/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/structure/io/ModernInputRDD.java @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.tinkerpop.gremlin.spark.structure.io; + +import org.apache.commons.configuration.Configuration; +import org.apache.spark.api.java.JavaPairRDD; +import org.apache.spark.api.java.JavaSparkContext; +import org.apache.tinkerpop.gremlin.hadoop.structure.io.VertexWritable; +import org.apache.tinkerpop.gremlin.tinkergraph.structure.TinkerFactory; +import org.apache.tinkerpop.gremlin.util.iterator.IteratorUtils; +import scala.Tuple2; + +/** + * @author Marko A. Rodriguez (http://markorodriguez.com) + */ +public final class ModernInputRDD implements InputRDD { + + @Override + public JavaPairRDD readGraphRDD(final Configuration configuration, final JavaSparkContext sparkContext) { + return sparkContext. + parallelize(IteratorUtils.list(IteratorUtils.map(TinkerFactory.createModern().vertices(), VertexWritable::new))). + mapToPair(vertex -> new Tuple2<>(vertex.get().id(), vertex)); + } +} diff --git a/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/structure/io/OutputRDDTest.java b/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/structure/io/OutputRDDTest.java index 10eecb3c6dc..f9b6f393bb6 100644 --- a/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/structure/io/OutputRDDTest.java +++ b/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/structure/io/OutputRDDTest.java @@ -30,6 +30,7 @@ import org.apache.tinkerpop.gremlin.process.traversal.engine.ComputerTraversalEngine; import org.apache.tinkerpop.gremlin.spark.process.computer.SparkGraphComputer; import org.apache.tinkerpop.gremlin.spark.process.computer.SparkHadoopGraphProvider; +import org.apache.tinkerpop.gremlin.spark.structure.io.gryo.GryoSerializer; import org.apache.tinkerpop.gremlin.structure.Graph; import org.apache.tinkerpop.gremlin.structure.util.GraphFactory; import org.junit.Test; @@ -43,7 +44,7 @@ public class OutputRDDTest { public void shouldWriteToArbitraryRDD() throws Exception { final Configuration configuration = new 
BaseConfiguration(); configuration.setProperty("spark.master", "local[4]"); - configuration.setProperty("spark.serializer", KryoSerializer.class.getCanonicalName()); + configuration.setProperty("spark.serializer", GryoSerializer.class.getCanonicalName()); configuration.setProperty(Graph.GRAPH, HadoopGraph.class.getName()); configuration.setProperty(Constants.GREMLIN_HADOOP_INPUT_LOCATION, SparkHadoopGraphProvider.PATHS.get("tinkerpop-modern.kryo")); configuration.setProperty(Constants.GREMLIN_HADOOP_GRAPH_INPUT_FORMAT, GryoInputFormat.class.getCanonicalName()); diff --git a/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/structure/io/TheCrewInputRDD.java b/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/structure/io/TheCrewInputRDD.java new file mode 100644 index 00000000000..ff5a274df89 --- /dev/null +++ b/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/structure/io/TheCrewInputRDD.java @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.tinkerpop.gremlin.spark.structure.io; + +import org.apache.commons.configuration.Configuration; +import org.apache.spark.api.java.JavaPairRDD; +import org.apache.spark.api.java.JavaSparkContext; +import org.apache.tinkerpop.gremlin.hadoop.structure.io.VertexWritable; +import org.apache.tinkerpop.gremlin.tinkergraph.structure.TinkerFactory; +import org.apache.tinkerpop.gremlin.util.iterator.IteratorUtils; +import scala.Tuple2; + +/** + * @author Marko A. Rodriguez (http://markorodriguez.com) + */ +public final class TheCrewInputRDD implements InputRDD { + + @Override + public JavaPairRDD readGraphRDD(final Configuration configuration, final JavaSparkContext sparkContext) { + return sparkContext.parallelize(IteratorUtils.list(IteratorUtils.map(TinkerFactory.createTheCrew().vertices(), VertexWritable::new))).mapToPair(vertex -> new Tuple2<>(vertex.get().id(), vertex)); + } +} From e20ff91995f83efd4ebbe9388b0aec1535669ecb Mon Sep 17 00:00:00 2001 From: "Marko A. Rodriguez" Date: Wed, 2 Dec 2015 13:11:29 -0700 Subject: [PATCH 3/9] some organization and clean up. Stuff is lookin SOLID. Time to run full integration tests. 
--- .../process/computer/SparkContextHelper.java} | 20 +++--- .../process/computer/SparkGraphComputer.java | 15 ++-- .../spark/structure/io/InputRDDFormat.java | 8 +-- .../process/computer/LocalPropertyTest.java | 4 +- .../computer/SparkHadoopGraphProvider.java | 38 +++------- .../spark/structure/io/GratefulInputRDD.java | 50 -------------- .../structure/io/InputOutputRDDTest.java | 1 - .../spark/structure/io/InputRDDTest.java | 1 - .../spark/structure/io/ModernInputRDD.java | 41 ----------- .../spark/structure/io/OutputRDDTest.java | 1 - .../io/PersistedInputOutputRDDTest.java | 10 +-- .../spark/structure/io/TheCrewInputRDD.java | 39 ----------- .../spark/structure/io/ToyGraphInputRDD.java | 69 +++++++++++++++++++ 13 files changed, 105 insertions(+), 192 deletions(-) rename spark-gremlin/src/{test/java/org/apache/tinkerpop/gremlin/spark/structure/io/ClassicInputRDD.java => main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkContextHelper.java} (57%) delete mode 100644 spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/structure/io/GratefulInputRDD.java delete mode 100644 spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/structure/io/ModernInputRDD.java delete mode 100644 spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/structure/io/TheCrewInputRDD.java create mode 100644 spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/structure/io/ToyGraphInputRDD.java diff --git a/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/structure/io/ClassicInputRDD.java b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkContextHelper.java similarity index 57% rename from spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/structure/io/ClassicInputRDD.java rename to spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkContextHelper.java index 4512b616e36..ed553c51d68 100644 --- 
a/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/structure/io/ClassicInputRDD.java +++ b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkContextHelper.java @@ -17,23 +17,23 @@ * under the License. */ -package org.apache.tinkerpop.gremlin.spark.structure.io; +package org.apache.tinkerpop.gremlin.spark.process.computer; import org.apache.commons.configuration.Configuration; -import org.apache.spark.api.java.JavaPairRDD; import org.apache.spark.api.java.JavaSparkContext; -import org.apache.tinkerpop.gremlin.hadoop.structure.io.VertexWritable; -import org.apache.tinkerpop.gremlin.tinkergraph.structure.TinkerFactory; -import org.apache.tinkerpop.gremlin.util.iterator.IteratorUtils; -import scala.Tuple2; +import org.apache.tinkerpop.gremlin.hadoop.Constants; /** * @author Marko A. Rodriguez (http://markorodriguez.com) */ -public final class ClassicInputRDD implements InputRDD { +public final class SparkContextHelper { - @Override - public JavaPairRDD readGraphRDD(final Configuration configuration, final JavaSparkContext sparkContext) { - return sparkContext.parallelize(IteratorUtils.list(IteratorUtils.map(TinkerFactory.createClassic().vertices(), VertexWritable::new))).mapToPair(vertex -> new Tuple2<>(vertex.get().id(), vertex)); + private SparkContextHelper() { + + } + + public static void tryToCloseContext(final JavaSparkContext context, final Configuration configuration) { + if (context != null && !configuration.getBoolean(Constants.GREMLIN_SPARK_PERSIST_CONTEXT, false)) + context.close(); } } diff --git a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkGraphComputer.java b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkGraphComputer.java index e7566d5def2..c20f1b08227 100644 --- a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkGraphComputer.java +++ 
b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkGraphComputer.java @@ -228,8 +228,7 @@ private Future submitWithExecutor(Executor exec) { finalMemory.setRuntime(System.currentTimeMillis() - startTime); return new DefaultComputerResult(InputOutputHelper.getOutputGraph(apacheConfiguration, this.resultGraph, this.persist), finalMemory.asImmutable()); } finally { - if (sparkContext != null && !apacheConfiguration.getBoolean(Constants.GREMLIN_SPARK_PERSIST_CONTEXT, false)) - sparkContext.stop(); + SparkContextHelper.tryToCloseContext(sparkContext, apacheConfiguration); } }, exec); } @@ -268,14 +267,14 @@ private void updateLocalConfiguration(final JavaSparkContext sparkContext, final * Execution rather than applying the entire configuration. */ final String[] validPropertyNames = { - "spark.job.description", - "spark.jobGroup.id", - "spark.job.interruptOnCancel", - "spark.scheduler.pool" + "spark.job.description", + "spark.jobGroup.id", + "spark.job.interruptOnCancel", + "spark.scheduler.pool" }; - for (String propertyName: validPropertyNames){ - if (sparkConfiguration.contains(propertyName)){ + for (String propertyName : validPropertyNames) { + if (sparkConfiguration.contains(propertyName)) { String propertyValue = sparkConfiguration.get(propertyName); this.logger.info("Setting Thread Local SparkContext Property - " + propertyName + " : " + propertyValue); diff --git a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/InputRDDFormat.java b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/InputRDDFormat.java index 3952c661631..d1a198e2aed 100644 --- a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/InputRDDFormat.java +++ b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/InputRDDFormat.java @@ -31,6 +31,7 @@ import org.apache.tinkerpop.gremlin.hadoop.Constants; import 
org.apache.tinkerpop.gremlin.hadoop.structure.io.VertexWritable; import org.apache.tinkerpop.gremlin.hadoop.structure.util.ConfUtil; +import org.apache.tinkerpop.gremlin.spark.process.computer.SparkContextHelper; import scala.Tuple2; import java.io.IOException; @@ -70,8 +71,8 @@ public RecordReader createRecordReader(final Input final SparkConf sparkConfiguration = new SparkConf(); sparkConfiguration.setAppName(UUID.randomUUID().toString()); hadoopConfiguration.forEach(entry -> sparkConfiguration.set(entry.getKey(), entry.getValue())); - InputRDD inputRDD = (InputRDD) Class.forName(sparkConfiguration.get(Constants.GREMLIN_SPARK_GRAPH_INPUT_RDD)).newInstance(); - JavaSparkContext javaSparkContext = new JavaSparkContext(SparkContext.getOrCreate(sparkConfiguration)); + final InputRDD inputRDD = (InputRDD) Class.forName(sparkConfiguration.get(Constants.GREMLIN_SPARK_GRAPH_INPUT_RDD)).newInstance(); + final JavaSparkContext javaSparkContext = new JavaSparkContext(SparkContext.getOrCreate(sparkConfiguration)); final Iterator> iterator = inputRDD.readGraphRDD(ConfUtil.makeApacheConfiguration(taskAttemptContext.getConfiguration()), javaSparkContext).toLocalIterator(); return new RecordReader() { @Override @@ -101,8 +102,7 @@ public float getProgress() throws IOException, InterruptedException { @Override public void close() throws IOException { - if (!hadoopConfiguration.getBoolean(Constants.GREMLIN_SPARK_PERSIST_CONTEXT, false)) - javaSparkContext.close(); + SparkContextHelper.tryToCloseContext(javaSparkContext, ConfUtil.makeApacheConfiguration(hadoopConfiguration)); } }; } catch (final ClassNotFoundException | InstantiationException | IllegalAccessException e) { diff --git a/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/process/computer/LocalPropertyTest.java b/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/process/computer/LocalPropertyTest.java index e0fe796a2f6..671bee8904b 100644 --- 
a/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/process/computer/LocalPropertyTest.java +++ b/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/process/computer/LocalPropertyTest.java @@ -25,7 +25,6 @@ import org.apache.spark.SparkContext; import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.api.java.JavaSparkStatusTracker; -import org.apache.spark.serializer.KryoSerializer; import org.apache.tinkerpop.gremlin.hadoop.Constants; import org.apache.tinkerpop.gremlin.hadoop.structure.HadoopGraph; import org.apache.tinkerpop.gremlin.hadoop.structure.io.gryo.GryoInputFormat; @@ -36,6 +35,7 @@ import org.apache.tinkerpop.gremlin.process.traversal.engine.ComputerTraversalEngine; import org.apache.tinkerpop.gremlin.spark.structure.io.PersistedInputRDD; import org.apache.tinkerpop.gremlin.spark.structure.io.PersistedOutputRDD; +import org.apache.tinkerpop.gremlin.spark.structure.io.gryo.GryoSerializer; import org.apache.tinkerpop.gremlin.structure.Graph; import org.apache.tinkerpop.gremlin.structure.util.GraphFactory; import org.junit.Test; @@ -53,7 +53,7 @@ public void shouldSetThreadLocalProperties() throws Exception { final String rddName = "target/test-output/" + UUID.randomUUID(); final Configuration configuration = new BaseConfiguration(); configuration.setProperty("spark.master", "local[4]"); - configuration.setProperty("spark.serializer", KryoSerializer.class.getCanonicalName()); + configuration.setProperty("spark.serializer", GryoSerializer.class.getCanonicalName()); configuration.setProperty(Graph.GRAPH, HadoopGraph.class.getName()); configuration.setProperty(Constants.GREMLIN_HADOOP_INPUT_LOCATION, SparkHadoopGraphProvider.PATHS.get("tinkerpop-modern.kryo")); configuration.setProperty(Constants.GREMLIN_HADOOP_GRAPH_INPUT_FORMAT, GryoInputFormat.class.getCanonicalName()); diff --git a/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkHadoopGraphProvider.java 
b/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkHadoopGraphProvider.java index 232817645f5..2cfeea379bd 100644 --- a/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkHadoopGraphProvider.java +++ b/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkHadoopGraphProvider.java @@ -24,11 +24,9 @@ import org.apache.tinkerpop.gremlin.hadoop.HadoopGraphProvider; import org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.GraphTraversalSource; import org.apache.tinkerpop.gremlin.process.traversal.engine.ComputerTraversalEngine; -import org.apache.tinkerpop.gremlin.spark.structure.io.ClassicInputRDD; -import org.apache.tinkerpop.gremlin.spark.structure.io.GratefulInputRDD; import org.apache.tinkerpop.gremlin.spark.structure.io.InputRDDFormat; -import org.apache.tinkerpop.gremlin.spark.structure.io.ModernInputRDD; -import org.apache.tinkerpop.gremlin.spark.structure.io.TheCrewInputRDD; +import org.apache.tinkerpop.gremlin.spark.structure.io.ToyGraphInputRDD; +import org.apache.tinkerpop.gremlin.spark.structure.io.gryo.GryoSerializer; import org.apache.tinkerpop.gremlin.structure.Graph; import java.util.Map; @@ -46,36 +44,16 @@ public final class SparkHadoopGraphProvider extends HadoopGraphProvider { public Map getBaseConfiguration(final String graphName, final Class test, final String testMethodName, final LoadGraphWith.GraphData loadGraphWith) { final Map config = super.getBaseConfiguration(graphName, test, testMethodName, loadGraphWith); if (null != loadGraphWith) { - if (loadGraphWith.equals(LoadGraphWith.GraphData.MODERN)) { - if (RANDOM.nextBoolean()) { - config.remove(Constants.GREMLIN_HADOOP_INPUT_LOCATION); - config.put(Constants.GREMLIN_SPARK_GRAPH_INPUT_RDD, ModernInputRDD.class.getCanonicalName()); - config.put(Constants.GREMLIN_HADOOP_GRAPH_INPUT_FORMAT, InputRDDFormat.class.getCanonicalName()); - } - } else if 
(loadGraphWith.equals(LoadGraphWith.GraphData.CREW)) { - if (RANDOM.nextBoolean()) { - config.remove(Constants.GREMLIN_HADOOP_INPUT_LOCATION); - config.put(Constants.GREMLIN_SPARK_GRAPH_INPUT_RDD, TheCrewInputRDD.class.getCanonicalName()); - config.put(Constants.GREMLIN_HADOOP_GRAPH_INPUT_FORMAT, InputRDDFormat.class.getCanonicalName()); - } - } else if (loadGraphWith.equals(LoadGraphWith.GraphData.CLASSIC)) { - if (RANDOM.nextBoolean()) { - config.remove(Constants.GREMLIN_HADOOP_INPUT_LOCATION); - config.put(Constants.GREMLIN_SPARK_GRAPH_INPUT_RDD, ClassicInputRDD.class.getCanonicalName()); - config.put(Constants.GREMLIN_HADOOP_GRAPH_INPUT_FORMAT, InputRDDFormat.class.getCanonicalName()); - } - } else if (loadGraphWith.equals(LoadGraphWith.GraphData.GRATEFUL)) { - if (RANDOM.nextBoolean()) { - config.remove(Constants.GREMLIN_HADOOP_INPUT_LOCATION); - config.put(Constants.GREMLIN_SPARK_GRAPH_INPUT_RDD, GratefulInputRDD.class.getCanonicalName()); - config.put(Constants.GREMLIN_HADOOP_GRAPH_INPUT_FORMAT, InputRDDFormat.class.getCanonicalName()); - } + if (RANDOM.nextBoolean()) { + config.remove(Constants.GREMLIN_HADOOP_INPUT_LOCATION); + config.put(Constants.GREMLIN_SPARK_GRAPH_INPUT_RDD, ToyGraphInputRDD.class.getCanonicalName()); + config.put(ToyGraphInputRDD.GREMLIN_SPARK_TOY_GRAPH, loadGraphWith.toString()); + config.put(Constants.GREMLIN_HADOOP_GRAPH_INPUT_FORMAT, InputRDDFormat.class.getCanonicalName()); } } /// spark configuration config.put("spark.master", "local[4]"); - //config.put("spark.serializer", "org.apache.spark.serializer.KryoSerializer"); - config.put("spark.serializer", "org.apache.tinkerpop.gremlin.spark.structure.io.gryo.GryoSerializer"); + config.put("spark.serializer", GryoSerializer.class.getCanonicalName()); config.put("spark.kryo.registrationRequired", true); return config; } diff --git a/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/structure/io/GratefulInputRDD.java 
b/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/structure/io/GratefulInputRDD.java deleted file mode 100644 index 396aa758028..00000000000 --- a/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/structure/io/GratefulInputRDD.java +++ /dev/null @@ -1,50 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.tinkerpop.gremlin.spark.structure.io; - -import org.apache.commons.configuration.Configuration; -import org.apache.spark.api.java.JavaPairRDD; -import org.apache.spark.api.java.JavaSparkContext; -import org.apache.tinkerpop.gremlin.hadoop.structure.io.VertexWritable; -import org.apache.tinkerpop.gremlin.structure.Graph; -import org.apache.tinkerpop.gremlin.structure.io.gryo.GryoIo; -import org.apache.tinkerpop.gremlin.structure.io.gryo.GryoResourceAccess; -import org.apache.tinkerpop.gremlin.tinkergraph.structure.TinkerGraph; -import org.apache.tinkerpop.gremlin.util.iterator.IteratorUtils; -import scala.Tuple2; - -import java.io.IOException; - -/** - * @author Marko A. 
Rodriguez (http://markorodriguez.com) - */ -public final class GratefulInputRDD implements InputRDD { - - @Override - public JavaPairRDD readGraphRDD(final Configuration configuration, final JavaSparkContext sparkContext) { - try { - final Graph graph = TinkerGraph.open(); - graph.io(GryoIo.build()).readGraph(GryoResourceAccess.class.getResource("grateful-dead.kryo").getFile()); - return sparkContext.parallelize(IteratorUtils.list(IteratorUtils.map(graph.vertices(), VertexWritable::new))).mapToPair(vertex -> new Tuple2<>(vertex.get().id(), vertex)); - } catch (final IOException e) { - throw new IllegalStateException(e.getMessage(), e); - } - } -} diff --git a/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/structure/io/InputOutputRDDTest.java b/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/structure/io/InputOutputRDDTest.java index ea621141ef5..50a43bc84e1 100644 --- a/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/structure/io/InputOutputRDDTest.java +++ b/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/structure/io/InputOutputRDDTest.java @@ -20,7 +20,6 @@ import org.apache.commons.configuration.BaseConfiguration; import org.apache.commons.configuration.Configuration; -import org.apache.spark.serializer.KryoSerializer; import org.apache.tinkerpop.gremlin.hadoop.Constants; import org.apache.tinkerpop.gremlin.hadoop.structure.HadoopGraph; import org.apache.tinkerpop.gremlin.process.computer.GraphComputer; diff --git a/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/structure/io/InputRDDTest.java b/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/structure/io/InputRDDTest.java index 2cbfd66ec70..b64139c72d1 100644 --- a/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/structure/io/InputRDDTest.java +++ b/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/structure/io/InputRDDTest.java @@ -20,7 +20,6 @@ import 
org.apache.commons.configuration.BaseConfiguration; import org.apache.commons.configuration.Configuration; -import org.apache.spark.serializer.KryoSerializer; import org.apache.tinkerpop.gremlin.hadoop.Constants; import org.apache.tinkerpop.gremlin.hadoop.structure.HadoopGraph; import org.apache.tinkerpop.gremlin.hadoop.structure.io.gryo.GryoOutputFormat; diff --git a/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/structure/io/ModernInputRDD.java b/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/structure/io/ModernInputRDD.java deleted file mode 100644 index 849e3e655d1..00000000000 --- a/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/structure/io/ModernInputRDD.java +++ /dev/null @@ -1,41 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ - -package org.apache.tinkerpop.gremlin.spark.structure.io; - -import org.apache.commons.configuration.Configuration; -import org.apache.spark.api.java.JavaPairRDD; -import org.apache.spark.api.java.JavaSparkContext; -import org.apache.tinkerpop.gremlin.hadoop.structure.io.VertexWritable; -import org.apache.tinkerpop.gremlin.tinkergraph.structure.TinkerFactory; -import org.apache.tinkerpop.gremlin.util.iterator.IteratorUtils; -import scala.Tuple2; - -/** - * @author Marko A. Rodriguez (http://markorodriguez.com) - */ -public final class ModernInputRDD implements InputRDD { - - @Override - public JavaPairRDD readGraphRDD(final Configuration configuration, final JavaSparkContext sparkContext) { - return sparkContext. - parallelize(IteratorUtils.list(IteratorUtils.map(TinkerFactory.createModern().vertices(), VertexWritable::new))). - mapToPair(vertex -> new Tuple2<>(vertex.get().id(), vertex)); - } -} diff --git a/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/structure/io/OutputRDDTest.java b/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/structure/io/OutputRDDTest.java index f9b6f393bb6..60790e73155 100644 --- a/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/structure/io/OutputRDDTest.java +++ b/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/structure/io/OutputRDDTest.java @@ -20,7 +20,6 @@ import org.apache.commons.configuration.BaseConfiguration; import org.apache.commons.configuration.Configuration; -import org.apache.spark.serializer.KryoSerializer; import org.apache.tinkerpop.gremlin.hadoop.Constants; import org.apache.tinkerpop.gremlin.hadoop.structure.HadoopGraph; import org.apache.tinkerpop.gremlin.hadoop.structure.io.gryo.GryoInputFormat; diff --git a/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/structure/io/PersistedInputOutputRDDTest.java b/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/structure/io/PersistedInputOutputRDDTest.java index 
6aeb864da50..1de2b478420 100644 --- a/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/structure/io/PersistedInputOutputRDDTest.java +++ b/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/structure/io/PersistedInputOutputRDDTest.java @@ -24,7 +24,6 @@ import org.apache.spark.SparkConf; import org.apache.spark.SparkContext; import org.apache.spark.api.java.JavaSparkContext; -import org.apache.spark.serializer.KryoSerializer; import org.apache.tinkerpop.gremlin.hadoop.Constants; import org.apache.tinkerpop.gremlin.hadoop.structure.HadoopGraph; import org.apache.tinkerpop.gremlin.hadoop.structure.io.gryo.GryoInputFormat; @@ -37,6 +36,7 @@ import org.apache.tinkerpop.gremlin.process.traversal.engine.ComputerTraversalEngine; import org.apache.tinkerpop.gremlin.spark.process.computer.SparkGraphComputer; import org.apache.tinkerpop.gremlin.spark.process.computer.SparkHadoopGraphProvider; +import org.apache.tinkerpop.gremlin.spark.structure.io.gryo.GryoSerializer; import org.apache.tinkerpop.gremlin.structure.Graph; import org.apache.tinkerpop.gremlin.structure.io.IoCore; import org.apache.tinkerpop.gremlin.structure.util.GraphFactory; @@ -58,7 +58,7 @@ public void shouldNotPersistRDDAcrossJobs() throws Exception { final String rddName = "target/test-output/" + UUID.randomUUID(); final Configuration configuration = new BaseConfiguration(); configuration.setProperty("spark.master", "local[4]"); - configuration.setProperty("spark.serializer", KryoSerializer.class.getCanonicalName()); + configuration.setProperty("spark.serializer", GryoSerializer.class.getCanonicalName()); configuration.setProperty(Graph.GRAPH, HadoopGraph.class.getName()); configuration.setProperty(Constants.GREMLIN_HADOOP_INPUT_LOCATION, SparkHadoopGraphProvider.PATHS.get("tinkerpop-modern.kryo")); configuration.setProperty(Constants.GREMLIN_HADOOP_GRAPH_INPUT_FORMAT, GryoInputFormat.class.getCanonicalName()); @@ -87,7 +87,7 @@ public void shouldPersistRDDAcrossJobs() throws 
Exception { final String rddName = "target/test-output/" + UUID.randomUUID(); final Configuration configuration = new BaseConfiguration(); configuration.setProperty("spark.master", "local[4]"); - configuration.setProperty("spark.serializer", KryoSerializer.class.getCanonicalName()); + configuration.setProperty("spark.serializer", GryoSerializer.class.getCanonicalName()); configuration.setProperty(Graph.GRAPH, HadoopGraph.class.getName()); configuration.setProperty(Constants.GREMLIN_HADOOP_INPUT_LOCATION, SparkHadoopGraphProvider.PATHS.get("tinkerpop-modern.kryo")); configuration.setProperty(Constants.GREMLIN_HADOOP_GRAPH_INPUT_FORMAT, GryoInputFormat.class.getCanonicalName()); @@ -129,7 +129,7 @@ public void testBulkLoaderVertexProgramChain() throws Exception { final String rddName = "target/test-output/" + UUID.randomUUID().toString(); final Configuration readConfiguration = new BaseConfiguration(); readConfiguration.setProperty("spark.master", "local[4]"); - readConfiguration.setProperty("spark.serializer", KryoSerializer.class.getCanonicalName()); + readConfiguration.setProperty("spark.serializer", GryoSerializer.class.getCanonicalName()); readConfiguration.setProperty(Graph.GRAPH, HadoopGraph.class.getName()); readConfiguration.setProperty(Constants.GREMLIN_HADOOP_GRAPH_INPUT_FORMAT, GryoInputFormat.class.getCanonicalName()); readConfiguration.setProperty(Constants.GREMLIN_HADOOP_INPUT_LOCATION, SparkHadoopGraphProvider.PATHS.get("tinkerpop-modern.kryo")); @@ -176,7 +176,7 @@ public void testBulkLoaderVertexProgramChainWithInputOutputHelperMapping() throw final String rddName = "target/test-output/" + UUID.randomUUID().toString(); final Configuration readConfiguration = new BaseConfiguration(); readConfiguration.setProperty("spark.master", "local[4]"); - readConfiguration.setProperty("spark.serializer", KryoSerializer.class.getCanonicalName()); + readConfiguration.setProperty("spark.serializer", GryoSerializer.class.getCanonicalName()); 
readConfiguration.setProperty(Graph.GRAPH, HadoopGraph.class.getName()); readConfiguration.setProperty(Constants.GREMLIN_HADOOP_GRAPH_INPUT_FORMAT, GryoInputFormat.class.getCanonicalName()); readConfiguration.setProperty(Constants.GREMLIN_HADOOP_INPUT_LOCATION, SparkHadoopGraphProvider.PATHS.get("tinkerpop-modern.kryo")); diff --git a/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/structure/io/TheCrewInputRDD.java b/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/structure/io/TheCrewInputRDD.java deleted file mode 100644 index ff5a274df89..00000000000 --- a/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/structure/io/TheCrewInputRDD.java +++ /dev/null @@ -1,39 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ - -package org.apache.tinkerpop.gremlin.spark.structure.io; - -import org.apache.commons.configuration.Configuration; -import org.apache.spark.api.java.JavaPairRDD; -import org.apache.spark.api.java.JavaSparkContext; -import org.apache.tinkerpop.gremlin.hadoop.structure.io.VertexWritable; -import org.apache.tinkerpop.gremlin.tinkergraph.structure.TinkerFactory; -import org.apache.tinkerpop.gremlin.util.iterator.IteratorUtils; -import scala.Tuple2; - -/** - * @author Marko A. Rodriguez (http://markorodriguez.com) - */ -public final class TheCrewInputRDD implements InputRDD { - - @Override - public JavaPairRDD readGraphRDD(final Configuration configuration, final JavaSparkContext sparkContext) { - return sparkContext.parallelize(IteratorUtils.list(IteratorUtils.map(TinkerFactory.createTheCrew().vertices(), VertexWritable::new))).mapToPair(vertex -> new Tuple2<>(vertex.get().id(), vertex)); - } -} diff --git a/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/structure/io/ToyGraphInputRDD.java b/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/structure/io/ToyGraphInputRDD.java new file mode 100644 index 00000000000..ed97c04f93b --- /dev/null +++ b/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/structure/io/ToyGraphInputRDD.java @@ -0,0 +1,69 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.tinkerpop.gremlin.spark.structure.io; + +import org.apache.commons.configuration.Configuration; +import org.apache.spark.api.java.JavaPairRDD; +import org.apache.spark.api.java.JavaSparkContext; +import org.apache.tinkerpop.gremlin.LoadGraphWith; +import org.apache.tinkerpop.gremlin.hadoop.structure.io.VertexWritable; +import org.apache.tinkerpop.gremlin.structure.Graph; +import org.apache.tinkerpop.gremlin.structure.Vertex; +import org.apache.tinkerpop.gremlin.structure.io.gryo.GryoIo; +import org.apache.tinkerpop.gremlin.structure.io.gryo.GryoResourceAccess; +import org.apache.tinkerpop.gremlin.tinkergraph.structure.TinkerFactory; +import org.apache.tinkerpop.gremlin.tinkergraph.structure.TinkerGraph; +import org.apache.tinkerpop.gremlin.util.iterator.IteratorUtils; +import scala.Tuple2; + +import java.io.IOException; +import java.util.List; +import java.util.stream.Collectors; + +/** + * @author Marko A. 
Rodriguez (http://markorodriguez.com) + */ +public final class ToyGraphInputRDD implements InputRDD { + + public static final String GREMLIN_SPARK_TOY_GRAPH = "gremlin.spark.toyGraph"; + + @Override + public JavaPairRDD readGraphRDD(final Configuration configuration, final JavaSparkContext sparkContext) { + final List vertices; + if (configuration.getProperty(GREMLIN_SPARK_TOY_GRAPH).equals(LoadGraphWith.GraphData.MODERN.toString())) + vertices = IteratorUtils.list(TinkerFactory.createModern().vertices()); + else if (configuration.getProperty(GREMLIN_SPARK_TOY_GRAPH).equals(LoadGraphWith.GraphData.CLASSIC.toString())) + vertices = IteratorUtils.list(TinkerFactory.createClassic().vertices()); + else if (configuration.getProperty(GREMLIN_SPARK_TOY_GRAPH).equals(LoadGraphWith.GraphData.CREW.toString())) + vertices = IteratorUtils.list(TinkerFactory.createTheCrew().vertices()); + else if (configuration.getProperty(GREMLIN_SPARK_TOY_GRAPH).equals(LoadGraphWith.GraphData.GRATEFUL.toString())) { + try { + final Graph graph = TinkerGraph.open(); + graph.io(GryoIo.build()).readGraph(GryoResourceAccess.class.getResource("grateful-dead.kryo").getFile()); + vertices = IteratorUtils.list(graph.vertices()); + } catch (final IOException e) { + throw new IllegalStateException(e.getMessage(), e); + } + } else + throw new IllegalArgumentException("No legal toy graph was provided to load: " + configuration.getProperty(GREMLIN_SPARK_TOY_GRAPH)); + + return sparkContext.parallelize(vertices.stream().map(VertexWritable::new).collect(Collectors.toList())).mapToPair(vertex -> new Tuple2<>(vertex.get().id(), vertex)); + } +} From 91efb28df23fdc0dc99c78bd72159f0478614df1 Mon Sep 17 00:00:00 2001 From: "Marko A. Rodriguez" Date: Wed, 2 Dec 2015 15:44:50 -0700 Subject: [PATCH 4/9] GroovyProcessComputerSuite was missing GroovyFlatMapTest. Added it. Added HadoopPool registration to ToyGraphInputRDD so it doesn't give a WARN message.
Also I tweaked BulkLoaderVertexProgramTest to use target/test-output/ for its intermediary data. --- .../process/GroovyProcessComputerSuite.java | 2 + .../BulkLoaderVertexProgramTest.java | 2 +- .../computer/SparkHadoopGraphProvider.java | 16 ++++---- .../spark/structure/io/ToyGraphInputRDD.java | 38 ++++++++++--------- 4 files changed, 33 insertions(+), 25 deletions(-) diff --git a/gremlin-groovy-test/src/main/java/org/apache/tinkerpop/gremlin/process/GroovyProcessComputerSuite.java b/gremlin-groovy-test/src/main/java/org/apache/tinkerpop/gremlin/process/GroovyProcessComputerSuite.java index 9df79026bb7..d07d52f0023 100644 --- a/gremlin-groovy-test/src/main/java/org/apache/tinkerpop/gremlin/process/GroovyProcessComputerSuite.java +++ b/gremlin-groovy-test/src/main/java/org/apache/tinkerpop/gremlin/process/GroovyProcessComputerSuite.java @@ -47,6 +47,7 @@ import org.apache.tinkerpop.gremlin.process.traversal.step.map.GroovyCoalesceTest; import org.apache.tinkerpop.gremlin.process.traversal.step.map.GroovyConstantTest; import org.apache.tinkerpop.gremlin.process.traversal.step.map.GroovyCountTest; +import org.apache.tinkerpop.gremlin.process.traversal.step.map.GroovyFlatMapTest; import org.apache.tinkerpop.gremlin.process.traversal.step.map.GroovyFoldTest; import org.apache.tinkerpop.gremlin.process.traversal.step.map.GroovyGraphTest; import org.apache.tinkerpop.gremlin.process.traversal.step.map.GroovyLoopsTest; @@ -128,6 +129,7 @@ public class GroovyProcessComputerSuite extends ProcessComputerSuite { GroovyCoalesceTest.Traversals.class, GroovyConstantTest.Traversals.class, GroovyCountTest.Traversals.class, + GroovyFlatMapTest.Traversals.class, GroovyFoldTest.Traversals.class, GroovyGraphTest.Traversals.class, GroovyLoopsTest.Traversals.class, diff --git a/gremlin-test/src/main/java/org/apache/tinkerpop/gremlin/process/computer/bulkloading/BulkLoaderVertexProgramTest.java 
b/gremlin-test/src/main/java/org/apache/tinkerpop/gremlin/process/computer/bulkloading/BulkLoaderVertexProgramTest.java index 61a267f40be..992bfa42188 100644 --- a/gremlin-test/src/main/java/org/apache/tinkerpop/gremlin/process/computer/bulkloading/BulkLoaderVertexProgramTest.java +++ b/gremlin-test/src/main/java/org/apache/tinkerpop/gremlin/process/computer/bulkloading/BulkLoaderVertexProgramTest.java @@ -52,7 +52,7 @@ */ public class BulkLoaderVertexProgramTest extends AbstractGremlinProcessTest { - final static String TINKERGRAPH_LOCATION = "/tmp/tinkertest.kryo"; + final static String TINKERGRAPH_LOCATION = "target/test-output/tinkertest.kryo"; private BulkLoader getBulkLoader(final BulkLoaderVertexProgram blvp) throws Exception { final Field field = BulkLoaderVertexProgram.class.getDeclaredField("bulkLoader"); diff --git a/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkHadoopGraphProvider.java b/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkHadoopGraphProvider.java index 2cfeea379bd..de5a60df447 100644 --- a/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkHadoopGraphProvider.java +++ b/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkHadoopGraphProvider.java @@ -22,6 +22,8 @@ import org.apache.tinkerpop.gremlin.LoadGraphWith; import org.apache.tinkerpop.gremlin.hadoop.Constants; import org.apache.tinkerpop.gremlin.hadoop.HadoopGraphProvider; +import org.apache.tinkerpop.gremlin.process.computer.bulkloading.BulkLoaderVertexProgramTest; +import org.apache.tinkerpop.gremlin.process.computer.ranking.PageRankVertexProgramTest; import org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.GraphTraversalSource; import org.apache.tinkerpop.gremlin.process.traversal.engine.ComputerTraversalEngine; import org.apache.tinkerpop.gremlin.spark.structure.io.InputRDDFormat; @@ -43,13 +45,13 @@ public final class 
SparkHadoopGraphProvider extends HadoopGraphProvider { @Override public Map getBaseConfiguration(final String graphName, final Class test, final String testMethodName, final LoadGraphWith.GraphData loadGraphWith) { final Map config = super.getBaseConfiguration(graphName, test, testMethodName, loadGraphWith); - if (null != loadGraphWith) { - if (RANDOM.nextBoolean()) { - config.remove(Constants.GREMLIN_HADOOP_INPUT_LOCATION); - config.put(Constants.GREMLIN_SPARK_GRAPH_INPUT_RDD, ToyGraphInputRDD.class.getCanonicalName()); - config.put(ToyGraphInputRDD.GREMLIN_SPARK_TOY_GRAPH, loadGraphWith.toString()); - config.put(Constants.GREMLIN_HADOOP_GRAPH_INPUT_FORMAT, InputRDDFormat.class.getCanonicalName()); - } + config.put(Constants.GREMLIN_SPARK_PERSIST_CONTEXT, true); // this makes the test suite go really fast + if (null != loadGraphWith && + !test.equals(BulkLoaderVertexProgramTest.class) && + !test.equals(PageRankVertexProgramTest.class) && + RANDOM.nextBoolean()) { + config.put(Constants.GREMLIN_SPARK_GRAPH_INPUT_RDD, ToyGraphInputRDD.class.getCanonicalName()); + config.put(Constants.GREMLIN_HADOOP_GRAPH_INPUT_FORMAT, InputRDDFormat.class.getCanonicalName()); } /// spark configuration config.put("spark.master", "local[4]"); diff --git a/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/structure/io/ToyGraphInputRDD.java b/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/structure/io/ToyGraphInputRDD.java index ed97c04f93b..ea3636f9620 100644 --- a/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/structure/io/ToyGraphInputRDD.java +++ b/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/structure/io/ToyGraphInputRDD.java @@ -22,11 +22,13 @@ import org.apache.commons.configuration.Configuration; import org.apache.spark.api.java.JavaPairRDD; import org.apache.spark.api.java.JavaSparkContext; -import org.apache.tinkerpop.gremlin.LoadGraphWith; +import org.apache.tinkerpop.gremlin.hadoop.Constants; +import 
org.apache.tinkerpop.gremlin.hadoop.structure.io.HadoopPools; import org.apache.tinkerpop.gremlin.hadoop.structure.io.VertexWritable; import org.apache.tinkerpop.gremlin.structure.Graph; -import org.apache.tinkerpop.gremlin.structure.Vertex; +import org.apache.tinkerpop.gremlin.structure.io.GraphReader; import org.apache.tinkerpop.gremlin.structure.io.gryo.GryoIo; +import org.apache.tinkerpop.gremlin.structure.io.gryo.GryoReader; import org.apache.tinkerpop.gremlin.structure.io.gryo.GryoResourceAccess; import org.apache.tinkerpop.gremlin.tinkergraph.structure.TinkerFactory; import org.apache.tinkerpop.gremlin.tinkergraph.structure.TinkerGraph; @@ -34,36 +36,38 @@ import scala.Tuple2; import java.io.IOException; +import java.io.InputStream; import java.util.List; -import java.util.stream.Collectors; /** * @author Marko A. Rodriguez (http://markorodriguez.com) */ public final class ToyGraphInputRDD implements InputRDD { - public static final String GREMLIN_SPARK_TOY_GRAPH = "gremlin.spark.toyGraph"; - @Override public JavaPairRDD readGraphRDD(final Configuration configuration, final JavaSparkContext sparkContext) { - final List vertices; - if (configuration.getProperty(GREMLIN_SPARK_TOY_GRAPH).equals(LoadGraphWith.GraphData.MODERN.toString())) - vertices = IteratorUtils.list(TinkerFactory.createModern().vertices()); - else if (configuration.getProperty(GREMLIN_SPARK_TOY_GRAPH).equals(LoadGraphWith.GraphData.CLASSIC.toString())) - vertices = IteratorUtils.list(TinkerFactory.createClassic().vertices()); - else if (configuration.getProperty(GREMLIN_SPARK_TOY_GRAPH).equals(LoadGraphWith.GraphData.CREW.toString())) - vertices = IteratorUtils.list(TinkerFactory.createTheCrew().vertices()); - else if (configuration.getProperty(GREMLIN_SPARK_TOY_GRAPH).equals(LoadGraphWith.GraphData.GRATEFUL.toString())) { + HadoopPools.initialize(TinkerGraph.open().configuration()); + final List vertices; + if 
(configuration.getString(Constants.GREMLIN_HADOOP_INPUT_LOCATION).contains("modern")) + vertices = IteratorUtils.list(IteratorUtils.map(TinkerFactory.createModern().vertices(), VertexWritable::new)); + else if (configuration.getString(Constants.GREMLIN_HADOOP_INPUT_LOCATION).contains("classic")) + vertices = IteratorUtils.list(IteratorUtils.map(TinkerFactory.createClassic().vertices(), VertexWritable::new)); + else if (configuration.getString(Constants.GREMLIN_HADOOP_INPUT_LOCATION).contains("crew")) + vertices = IteratorUtils.list(IteratorUtils.map(TinkerFactory.createTheCrew().vertices(), VertexWritable::new)); + else if (configuration.getString(Constants.GREMLIN_HADOOP_INPUT_LOCATION).contains("grateful")) { try { final Graph graph = TinkerGraph.open(); - graph.io(GryoIo.build()).readGraph(GryoResourceAccess.class.getResource("grateful-dead.kryo").getFile()); - vertices = IteratorUtils.list(graph.vertices()); + final GraphReader reader = GryoReader.build().mapper(graph.io(GryoIo.build()).mapper().create()).create(); + try (final InputStream stream = GryoResourceAccess.class.getResourceAsStream("grateful-dead.kryo")) { + reader.readGraph(stream, graph); + } + vertices = IteratorUtils.list(IteratorUtils.map(graph.vertices(), VertexWritable::new)); } catch (final IOException e) { throw new IllegalStateException(e.getMessage(), e); } } else - throw new IllegalArgumentException("No legal toy graph was provided to load: " + configuration.getProperty(GREMLIN_SPARK_TOY_GRAPH)); + throw new IllegalArgumentException("No legal toy graph was provided to load: " + configuration.getProperty(Constants.GREMLIN_HADOOP_INPUT_LOCATION)); - return sparkContext.parallelize(vertices.stream().map(VertexWritable::new).collect(Collectors.toList())).mapToPair(vertex -> new Tuple2<>(vertex.get().id(), vertex)); + return sparkContext.parallelize(vertices).mapToPair(vertex -> new Tuple2<>(vertex.get().id(), vertex)); } } From 796fe24ae5a4a858b6b301ffa86efc63a2d98758 Mon Sep 17 00:00:00 2001 
From: "Marko A. Rodriguez" Date: Wed, 2 Dec 2015 15:49:42 -0700 Subject: [PATCH 5/9] GroovyProcessStandardSuite was missing GroovyFlatMapTest. Added. --- .../tinkerpop/gremlin/process/GroovyProcessStandardSuite.java | 2 ++ 1 file changed, 2 insertions(+) diff --git a/gremlin-groovy-test/src/main/java/org/apache/tinkerpop/gremlin/process/GroovyProcessStandardSuite.java b/gremlin-groovy-test/src/main/java/org/apache/tinkerpop/gremlin/process/GroovyProcessStandardSuite.java index ae91b5fec3b..7024869f4a7 100644 --- a/gremlin-groovy-test/src/main/java/org/apache/tinkerpop/gremlin/process/GroovyProcessStandardSuite.java +++ b/gremlin-groovy-test/src/main/java/org/apache/tinkerpop/gremlin/process/GroovyProcessStandardSuite.java @@ -48,6 +48,7 @@ import org.apache.tinkerpop.gremlin.process.traversal.step.map.GroovyCoalesceTest; import org.apache.tinkerpop.gremlin.process.traversal.step.map.GroovyConstantTest; import org.apache.tinkerpop.gremlin.process.traversal.step.map.GroovyCountTest; +import org.apache.tinkerpop.gremlin.process.traversal.step.map.GroovyFlatMapTest; import org.apache.tinkerpop.gremlin.process.traversal.step.map.GroovyFoldTest; import org.apache.tinkerpop.gremlin.process.traversal.step.map.GroovyGraphTest; import org.apache.tinkerpop.gremlin.process.traversal.step.map.GroovyLoopsTest; @@ -128,6 +129,7 @@ public class GroovyProcessStandardSuite extends ProcessStandardSuite { GroovyConstantTest.Traversals.class, GroovyCountTest.Traversals.class, GroovyFoldTest.Traversals.class, + GroovyFlatMapTest.Traversals.class, GroovyGraphTest.Traversals.class, GroovyLoopsTest.Traversals.class, GroovyMapTest.Traversals.class, From 5bb4076c4530d59a253daa379769c3a02cddb0f3 Mon Sep 17 00:00:00 2001 From: "Marko A. Rodriguez" Date: Wed, 2 Dec 2015 18:35:58 -0700 Subject: [PATCH 6/9] Random exceptions started popping up. I realized that I should kill any residual SparkContext before and after every test.
Added AbstractSparkTest which has a Before/After annotated method which does the job. No random exceptions now. Thank god. --- .../gremlin/spark/AbstractSparkTest.java | 43 +++++++++++++++++++ .../process/computer/LocalPropertyTest.java | 4 +- .../structure/io/InputOutputRDDTest.java | 3 +- .../spark/structure/io/InputRDDTest.java | 3 +- .../spark/structure/io/OutputRDDTest.java | 3 +- .../io/PersistedInputOutputRDDTest.java | 4 +- 6 files changed, 53 insertions(+), 7 deletions(-) create mode 100644 spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/AbstractSparkTest.java diff --git a/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/AbstractSparkTest.java b/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/AbstractSparkTest.java new file mode 100644 index 00000000000..02b0e182781 --- /dev/null +++ b/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/AbstractSparkTest.java @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.tinkerpop.gremlin.spark; + +import org.apache.spark.SparkConf; +import org.apache.spark.SparkContext; +import org.apache.spark.api.java.JavaSparkContext; +import org.junit.After; +import org.junit.Before; + +/** + * @author Marko A. Rodriguez (http://markorodriguez.com) + */ +public abstract class AbstractSparkTest { + + @After + @Before + public void setupTest() { + SparkConf sparkConfiguration = new SparkConf(); + sparkConfiguration.setAppName(this.getClass().getCanonicalName() + "-setupTest"); + sparkConfiguration.set("spark.master", "local[4]"); + JavaSparkContext sparkContext = new JavaSparkContext(SparkContext.getOrCreate(sparkConfiguration)); + sparkContext.close(); + System.out.println("SparkContext has been closed for " + this.getClass().getCanonicalName() + "-setupTest"); + } +} diff --git a/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/process/computer/LocalPropertyTest.java b/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/process/computer/LocalPropertyTest.java index 671bee8904b..b27b7b9b21f 100644 --- a/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/process/computer/LocalPropertyTest.java +++ b/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/process/computer/LocalPropertyTest.java @@ -33,6 +33,7 @@ import org.apache.tinkerpop.gremlin.process.computer.traversal.TraversalVertexProgram; import org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.GraphTraversalSource; import org.apache.tinkerpop.gremlin.process.traversal.engine.ComputerTraversalEngine; +import org.apache.tinkerpop.gremlin.spark.AbstractSparkTest; import org.apache.tinkerpop.gremlin.spark.structure.io.PersistedInputRDD; import org.apache.tinkerpop.gremlin.spark.structure.io.PersistedOutputRDD; import org.apache.tinkerpop.gremlin.spark.structure.io.gryo.GryoSerializer; @@ -44,8 +45,7 @@ import static org.junit.Assert.assertTrue; -public class LocalPropertyTest { - +public class LocalPropertyTest 
extends AbstractSparkTest { @Test public void shouldSetThreadLocalProperties() throws Exception { diff --git a/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/structure/io/InputOutputRDDTest.java b/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/structure/io/InputOutputRDDTest.java index 50a43bc84e1..cd78d7dba34 100644 --- a/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/structure/io/InputOutputRDDTest.java +++ b/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/structure/io/InputOutputRDDTest.java @@ -26,6 +26,7 @@ import org.apache.tinkerpop.gremlin.process.computer.traversal.TraversalVertexProgram; import org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.GraphTraversalSource; import org.apache.tinkerpop.gremlin.process.traversal.engine.ComputerTraversalEngine; +import org.apache.tinkerpop.gremlin.spark.AbstractSparkTest; import org.apache.tinkerpop.gremlin.spark.process.computer.SparkGraphComputer; import org.apache.tinkerpop.gremlin.spark.structure.io.gryo.GryoSerializer; import org.apache.tinkerpop.gremlin.structure.Graph; @@ -35,7 +36,7 @@ /** * @author Marko A. 
Rodriguez (http://markorodriguez.com) */ -public class InputOutputRDDTest { +public class InputOutputRDDTest extends AbstractSparkTest { @Test public void shouldReadFromWriteToArbitraryRDD() throws Exception { diff --git a/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/structure/io/InputRDDTest.java b/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/structure/io/InputRDDTest.java index b64139c72d1..1a54af00600 100644 --- a/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/structure/io/InputRDDTest.java +++ b/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/structure/io/InputRDDTest.java @@ -25,6 +25,7 @@ import org.apache.tinkerpop.gremlin.hadoop.structure.io.gryo.GryoOutputFormat; import org.apache.tinkerpop.gremlin.process.traversal.P; import org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.GraphTraversalSource; +import org.apache.tinkerpop.gremlin.spark.AbstractSparkTest; import org.apache.tinkerpop.gremlin.spark.process.computer.SparkGraphComputer; import org.apache.tinkerpop.gremlin.spark.structure.io.gryo.GryoSerializer; import org.apache.tinkerpop.gremlin.structure.Graph; @@ -36,7 +37,7 @@ /** * @author Marko A. 
Rodriguez (http://markorodriguez.com) */ -public class InputRDDTest { +public class InputRDDTest extends AbstractSparkTest { @Test public void shouldReadFromArbitraryRDD() { diff --git a/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/structure/io/OutputRDDTest.java b/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/structure/io/OutputRDDTest.java index 60790e73155..2ccf0cae965 100644 --- a/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/structure/io/OutputRDDTest.java +++ b/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/structure/io/OutputRDDTest.java @@ -27,6 +27,7 @@ import org.apache.tinkerpop.gremlin.process.computer.traversal.TraversalVertexProgram; import org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.GraphTraversalSource; import org.apache.tinkerpop.gremlin.process.traversal.engine.ComputerTraversalEngine; +import org.apache.tinkerpop.gremlin.spark.AbstractSparkTest; import org.apache.tinkerpop.gremlin.spark.process.computer.SparkGraphComputer; import org.apache.tinkerpop.gremlin.spark.process.computer.SparkHadoopGraphProvider; import org.apache.tinkerpop.gremlin.spark.structure.io.gryo.GryoSerializer; @@ -37,7 +38,7 @@ /** * @author Marko A. 
Rodriguez (http://markorodriguez.com) */ -public class OutputRDDTest { +public class OutputRDDTest extends AbstractSparkTest { @Test public void shouldWriteToArbitraryRDD() throws Exception { diff --git a/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/structure/io/PersistedInputOutputRDDTest.java b/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/structure/io/PersistedInputOutputRDDTest.java index 1de2b478420..bc55c040f2d 100644 --- a/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/structure/io/PersistedInputOutputRDDTest.java +++ b/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/structure/io/PersistedInputOutputRDDTest.java @@ -34,6 +34,7 @@ import org.apache.tinkerpop.gremlin.process.computer.traversal.TraversalVertexProgram; import org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.GraphTraversalSource; import org.apache.tinkerpop.gremlin.process.traversal.engine.ComputerTraversalEngine; +import org.apache.tinkerpop.gremlin.spark.AbstractSparkTest; import org.apache.tinkerpop.gremlin.spark.process.computer.SparkGraphComputer; import org.apache.tinkerpop.gremlin.spark.process.computer.SparkHadoopGraphProvider; import org.apache.tinkerpop.gremlin.spark.structure.io.gryo.GryoSerializer; @@ -50,10 +51,9 @@ /** * @author Marko A. Rodriguez (http://markorodriguez.com) */ -public class PersistedInputOutputRDDTest { +public class PersistedInputOutputRDDTest extends AbstractSparkTest { @Test - public void shouldNotPersistRDDAcrossJobs() throws Exception { final String rddName = "target/test-output/" + UUID.randomUUID(); final Configuration configuration = new BaseConfiguration(); From 5f6c3029faf0acfbba4653d71b82a8d74ec0bb67 Mon Sep 17 00:00:00 2001 From: "Marko A. Rodriguez" Date: Thu, 3 Dec 2015 07:49:40 -0700 Subject: [PATCH 7/9] updated CHANGELOG. 
--- CHANGELOG.asciidoc | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/CHANGELOG.asciidoc b/CHANGELOG.asciidoc index 9f45a900b9b..90c9f27b92b 100644 --- a/CHANGELOG.asciidoc +++ b/CHANGELOG.asciidoc @@ -26,6 +26,10 @@ image::https://raw.githubusercontent.com/apache/incubator-tinkerpop/master/docs/ TinkerPop 3.1.1 (NOT OFFICIALLY RELEASED YET) ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ +* Added `AbstractSparkTest` which handles closing `SparkContext` instances between tests now that we support persisted contexts. +* Fixed a serialization bug in `GryoSerializer` that made it difficult for graph providers to yield `InputRDDs` for `SparkGraphComputer`. +* `SparkGraphComputer` is now tested against Gryo, GraphSON, and `InputRDD` data sources. +* `HadoopElementIterator` (for Hadoop-Gremlin OLTP) now works for any `InputFormat`, not just `FileInputFormats`. * Added `Traverser.Admin.getTags()` which are used to mark branches in a traversal (useful in `match()` and related future steps). * Fixed the `Future` model for `GiraphGraphComputer` and `SparkGraphComputer` so that class loaders are preserved. * Added support for arbitrary vertex ID types in `BulkLoaderVertexProgram`. From d25532a8632e600b1c6fcb65a43bbb20a4b0fa9e Mon Sep 17 00:00:00 2001 From: "Marko A. Rodriguez" Date: Thu, 3 Dec 2015 07:51:22 -0700 Subject: [PATCH 8/9] updated CHANGELOG again. --- CHANGELOG.asciidoc | 1 + 1 file changed, 1 insertion(+) diff --git a/CHANGELOG.asciidoc b/CHANGELOG.asciidoc index 90c9f27b92b..869b494bb47 100644 --- a/CHANGELOG.asciidoc +++ b/CHANGELOG.asciidoc @@ -26,6 +26,7 @@ image::https://raw.githubusercontent.com/apache/incubator-tinkerpop/master/docs/ TinkerPop 3.1.1 (NOT OFFICIALLY RELEASED YET) ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ +* Added `InputRDDFormat` which wraps an `InputRDD` to make it accessible to Hadoop and not just Spark. * Added `AbstractSparkTest` which handles closing `SparkContext` instances between tests now that we support persisted contexts. 
* Fixed a serialization bug in `GryoSerializer` that made it difficult for graph providers to yield `InputRDDs` for `SparkGraphComputer`. * `SparkGraphComputer` is now tested against Gryo, GraphSON, and `InputRDD` data sources. From 06c180ba72a6642185d1b178e1b5e577a5fed168 Mon Sep 17 00:00:00 2001 From: Stephen Mallette Date: Thu, 3 Dec 2015 13:26:07 -0500 Subject: [PATCH 9/9] Introduce the "ci" profile to Travis It builds locally with a sizeable reduction in logging. --- .travis.yml | 4 +--- pom.xml | 10 ++-------- 2 files changed, 3 insertions(+), 11 deletions(-) diff --git a/.travis.yml b/.travis.yml index 8619a81f7dd..4b7e490b718 100644 --- a/.travis.yml +++ b/.travis.yml @@ -9,6 +9,4 @@ addons: packages: - oracle-java8-installer script: - - "mvn clean install" - - "bin/process-docs.sh --dryRun" - - "mvn process-resources -Djavadoc" \ No newline at end of file + - "mvn clean install -Dci" \ No newline at end of file diff --git a/pom.xml b/pom.xml index c184a751883..0332cdcdc33 100644 --- a/pom.xml +++ b/pom.xml @@ -133,7 +133,7 @@ limitations under the License. the modules under the root directory, not the root directory itself. --> ${project.basedir}/../gremlin-server/conf/ - log4j-server.properties + log4j.properties 3.0.2 UTF-8 UTF-8 @@ -930,15 +930,9 @@ limitations under the License. maven-surefire-plugin 2.17 - -Dlog4j.configuration=${log4j.properties} - - false + true - - - ${log4j.properties.dir} - **/*IntegrateTest.java **/*PerformanceTest.java