From f4f839a3a3fc92df1573c0a25410f16dee61fdaa Mon Sep 17 00:00:00 2001
From: "Joseph E. Gonzalez"
Date: Mon, 5 May 2014 15:52:58 -0700
Subject: [PATCH 1/6] Creating a synthetic benchmark script.

---
 .../spark/graphx/lib/SynthBenchmark.scala     | 136 ++++++++++++++++++
 .../spark/graphx/util/GraphGenerators.scala   |  41 ++++--
 2 files changed, 168 insertions(+), 9 deletions(-)
 create mode 100644 graphx/src/main/scala/org/apache/spark/graphx/lib/SynthBenchmark.scala
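A quick way to see what this benchmark automates is to run the same steps by hand in spark-shell. The following sketch assumes a build that includes this patch and uses the shell's predefined `sc`; the sizes are illustrative only:

    import org.apache.spark.graphx.util.GraphGenerators

    // Build a log-normal graph (100000 vertices, 8 edge partitions, default
    // mu/sigma) and time ten iterations of static PageRank on it.
    val graph = GraphGenerators.logNormalGraph(sc, 100000, 8, 4.0, 1.3).cache()
    println(s"Edges: ${graph.edges.count()}")
    println(s"Total PageRank: ${graph.staticPageRank(10).vertices.map(_._2).sum()}")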
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/lib/SynthBenchmark.scala b/graphx/src/main/scala/org/apache/spark/graphx/lib/SynthBenchmark.scala
new file mode 100644
index 0000000000000..ff00a6151927c
--- /dev/null
+++ b/graphx/src/main/scala/org/apache/spark/graphx/lib/SynthBenchmark.scala
@@ -0,0 +1,136 @@
+package org.apache.spark.graphx.lib
+
+import org.apache.spark.SparkContext._
+import org.apache.spark.graphx.PartitionStrategy
+import org.apache.spark.graphx.PartitionStrategy.{CanonicalRandomVertexCut, EdgePartition2D, EdgePartition1D, RandomVertexCut}
+import org.apache.spark.{SparkContext, SparkConf}
+import org.apache.spark.graphx.util.GraphGenerators
+import java.io.{PrintWriter, FileOutputStream}
+
+/**
+ * The SynthBenchmark application can be used to run various GraphX algorithms on
+ * synthetic log-normal graphs. The intent of this code is to enable users to
+ * profile the GraphX system without access to large graph datasets.
+ */
+object SynthBenchmark {
+
+  def pickPartitioner(v: String): PartitionStrategy = {
+    // TODO: Use reflection rather than listing all the partitioning strategies here.
+    v match {
+      case "RandomVertexCut" => RandomVertexCut
+      case "EdgePartition1D" => EdgePartition1D
+      case "EdgePartition2D" => EdgePartition2D
+      case "CanonicalRandomVertexCut" => CanonicalRandomVertexCut
+      case _ => throw new IllegalArgumentException("Invalid PartitionStrategy: " + v)
+    }
+  }
+
+  /**
+   * To run this program use the following:
+   *
+   * bin/spark-class org.apache.spark.graphx.lib.SynthBenchmark -host="local[4]"
+   *
+   * Required Options:
+   *   -host The spark job scheduler
+   *
+   * Additional Options:
+   *   -app "pagerank" or "cc" for pagerank or connected components. (Default: pagerank)
+   *   -niter the number of iterations of pagerank to use (Default: 10)
+   *   -nverts the number of vertices in the graph (Default: 1000000)
+   *   -numEPart the number of edge partitions in the graph (Default: number of cores)
+   *   -partStrategy the graph partitioning strategy to use
+   *   -mu the mean parameter for the log-normal graph (Default: 4.0)
+   *   -sigma the stdev parameter for the log-normal graph (Default: 1.3)
+   *   -degFile the local file to save the degree information (Default: Empty)
+   */
+  def main(args: Array[String]): Unit = {
+    val options = args.map {
+      arg =>
+        arg.dropWhile(_ == '-').split('=') match {
+          case Array(opt, v) => (opt -> v)
+          case _ => throw new IllegalArgumentException("Invalid argument: " + arg)
+        }
+    }
+
+    var host: String = null
+    var app = "pagerank"
+    var niter = 10
+    var numVertices = 1000000
+    var numEPart: Option[Int] = None
+    var partitionStrategy: Option[PartitionStrategy] = None
+    var mu: Double = 4.0
+    var sigma: Double = 1.3
+    var degFile: String = ""
+
+    options.foreach {
+      case ("host", v) => host = v
+      case ("app", v) => app = v
+      case ("niter", v) => niter = v.toInt
+      case ("nverts", v) => numVertices = v.toInt
+      case ("numEPart", v) => numEPart = Some(v.toInt)
+      case ("partStrategy", v) => partitionStrategy = Some(pickPartitioner(v))
+      case ("mu", v) => mu = v.toDouble
+      case ("sigma", v) => sigma = v.toDouble
+      case ("degFile", v) => degFile = v
+      case (opt, _) => throw new IllegalArgumentException("Invalid option: " + opt)
+    }
+
+    if (host == null) {
+      println("No -host option specified!")
+      System.exit(1)
+    }
+
+    val conf = new SparkConf()
+      .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
+      .set("spark.kryo.registrator", "org.apache.spark.graphx.GraphKryoRegistrator")
+
+    val sc = new SparkContext(host, s"GraphX Synth Benchmark (nverts = $numVertices)", conf)
+
+    // Create the graph
+    var graph = GraphGenerators.logNormalGraph(sc, numVertices, numEPart.getOrElse(sc.defaultParallelism), mu, sigma)
+    // Repartition the graph
+    if (!partitionStrategy.isEmpty) {
+      graph = graph.partitionBy(partitionStrategy.get)
+    }
+    graph.cache
+
+    var startTime = System.currentTimeMillis()
+    val numEdges = graph.edges.count()
+    println(s"Num Vertices: $numVertices")
+    println(s"Num Edges: $numEdges}")
+    val loadTime = System.currentTimeMillis() - startTime
+
+    // Collect the degree distribution (if desired)
+    if (!degFile.isEmpty) {
+      val fos = new FileOutputStream(degFile)
+      val pos = new PrintWriter(fos)
+      val hist = graph.vertices.leftJoin(graph.degrees)((id, _, optDeg) => optDeg.getOrElse(0))
+        .map(p => p._2).countByValue()
+      hist.foreach {
+        case (deg, count) => pos.println(s"$deg \t $count")
+      }
+    }
+
+    // Run PageRank
+    startTime = System.currentTimeMillis()
+    if (app == "pagerank") {
+      println("Running PageRank")
+      val totalPR = graph.staticPageRank(niter).vertices.map(p => p._2).sum
+      println(s"Total pagerank = $totalPR")
+    } else if (app == "cc") {
+      println("Connected Components")
+      val maxCC = graph.staticPageRank(niter).vertices.map(v => v._2).reduce((a,b)=>math.max(a,b))
+      println(s"Max CC = $maxCC")
+    }
+    val runTime = System.currentTimeMillis() - startTime
+
+    sc.stop
+    println(s"Num Vertices: $numVertices")
+    println(s"Num Edges: $numEdges")
+    println(s"Load time: ${loadTime/1000.0} seconds")
+    println(s"Run time: ${runTime/1000.0} seconds")
+
+  }
+
+
+}
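A note on the option handling in main above: it is deliberately dependency-free, stripping leading dashes and splitting each argument on '='. A self-contained sketch of the same parsing (the object name and sample arguments are hypothetical):

    object ParseDemo {
      def main(args: Array[String]): Unit = {
        val sample = Array("-app=pagerank", "--mu=4.0", "-niter=10")
        val options = sample.map { arg =>
          arg.dropWhile(_ == '-').split('=') match {
            case Array(opt, v) => (opt, v)
            case _ => throw new IllegalArgumentException("Invalid argument: " + arg)
          }
        }
        // Prints: app = pagerank, mu = 4.0, niter = 10. Since split('=') splits
        // on every '=', values that themselves contain '=' are rejected.
        options.foreach { case (opt, v) => println(s"$opt = $v") }
      }
    }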
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/util/GraphGenerators.scala b/graphx/src/main/scala/org/apache/spark/graphx/util/GraphGenerators.scala
index a3c8de3f9068f..b070b11d1bbfb 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/util/GraphGenerators.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/util/GraphGenerators.scala
@@ -38,19 +38,42 @@ object GraphGenerators {
   val RMATa = 0.45
   val RMATb = 0.15
   val RMATd = 0.25
+
   /**
    * Generate a graph whose vertex out degree is log normal.
+   *
+   * The default values for mu and sigma are taken from the Pregel paper:
+   *
+   * Grzegorz Malewicz, Matthew H. Austern, Aart J.C Bik, James C. Dehnert,
+   * Ilan Horn, Naty Leiser, and Grzegorz Czajkowski. 2010.
+   * Pregel: a system for large-scale graph processing. SIGMOD '10.
+   *
+   * @param sc
+   * @param numVertices
+   * @param mu
+   * @param sigma
+   * @return
    */
-  def logNormalGraph(sc: SparkContext, numVertices: Int): Graph[Int, Int] = {
-    // based on Pregel settings
-    val mu = 4
-    val sigma = 1.3
-
-    val vertices: RDD[(VertexId, Int)] = sc.parallelize(0 until numVertices).map{
-      src => (src, sampleLogNormal(mu, sigma, numVertices))
+  def logNormalGraph(sc: SparkContext, numVertices: Int, numEParts: Int,
+      mu: Double = 4.0, sigma: Double = 1.3): Graph[Long, Int] = {
+    val vertices: RDD[(VertexId, Long)] = sc.parallelize(0 until numVertices, numEParts).map { src =>
+      // Initialize the random number generator with the source vertex id
+      val rand = new Random(src)
+      val degree: Long = math.min(numVertices.toLong, math.exp(rand.nextGaussian()*sigma + mu).toLong)
+      (src.toLong, degree)
     }
-    val edges = vertices.flatMap { v =>
-      generateRandomEdges(v._1.toInt, v._2, numVertices)
+    val edges: RDD[Edge[Int]] = vertices.flatMap { case (src, degree) =>
+      new Iterator[Edge[Int]] {
+        // Initialize the random number generator with the source vertex id
+        val rand = new Random(src)
+        var i = 0
+        override def hasNext(): Boolean = { i < degree }
+        override def next(): Edge[Int] = {
+          val nextEdge = Edge[Int](src, rand.nextInt(numVertices), i)
+          i += 1
+          nextEdge
+        }
+      }
     }
     Graph(vertices, edges, 0)
   }
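On the generator change above: with ln(degree) drawn from Normal(mu, sigma^2), the expected out-degree is exp(mu + sigma^2/2), so the defaults mu = 4.0 and sigma = 1.3 give about exp(4.845), roughly 127 edges per vertex. A plain-Scala sketch of the per-vertex sampling (no Spark needed; the object name is hypothetical), showing why seeding Random with the vertex id makes the sampled degrees deterministic across RDD recomputation:

    import scala.util.Random

    object DegreeSampleDemo {
      // Mirrors the sampling in logNormalGraph: one generator per vertex,
      // seeded with the vertex id, capped at the number of vertices.
      def sampleDegree(src: Long, mu: Double, sigma: Double, numVertices: Int): Long =
        math.min(numVertices.toLong, math.exp(new Random(src).nextGaussian() * sigma + mu).toLong)

      def main(args: Array[String]): Unit = {
        val (mu, sigma) = (4.0, 1.3)
        println(s"Expected degree: ${math.exp(mu + sigma * sigma / 2)}")  // ~127.1
        (0L to 4L).foreach { src =>
          println(s"vertex $src -> degree ${sampleDegree(src, mu, sigma, 1000000)}")
        }
      }
    }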
From d943972d7dd2df3c2026631410f61ffe0e536d63 Mon Sep 17 00:00:00 2001
From: "Joseph E. Gonzalez"
Date: Sat, 10 May 2014 15:57:31 -0700
Subject: [PATCH 2/6] moving the benchmark application into the examples
 folder.

---
 .../org/apache/spark/examples/graphx}/SynthBenchmark.scala | 4 +---
 1 file changed, 1 insertion(+), 3 deletions(-)
 rename {graphx/src/main/scala/org/apache/spark/graphx/lib => examples/src/main/scala/org/apache/spark/examples/graphx}/SynthBenchmark.scala (99%)

diff --git a/graphx/src/main/scala/org/apache/spark/graphx/lib/SynthBenchmark.scala b/examples/src/main/scala/org/apache/spark/examples/graphx/SynthBenchmark.scala
similarity index 99%
rename from graphx/src/main/scala/org/apache/spark/graphx/lib/SynthBenchmark.scala
rename to examples/src/main/scala/org/apache/spark/examples/graphx/SynthBenchmark.scala
index ff00a6151927c..79cad63fb2ebc 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/lib/SynthBenchmark.scala
+++ b/examples/src/main/scala/org/apache/spark/examples/graphx/SynthBenchmark.scala
@@ -1,4 +1,4 @@
-package org.apache.spark.graphx.lib
+package org.apache.spark.examples.graphx
 
 import org.apache.spark.SparkContext._
 import org.apache.spark.graphx.PartitionStrategy
@@ -131,6 +131,4 @@ object SynthBenchmark {
     println(s"Run time: ${runTime/1000.0} seconds")
 
   }
-
-
 }

From 1bdf39a396430237e1bb6ea10fe334929217416e Mon Sep 17 00:00:00 2001
From: "Joseph E. Gonzalez"
Date: Sun, 11 May 2014 16:55:22 -0700
Subject: [PATCH 3/6] updating options

---
 .../spark/examples/graphx/SynthBenchmark.scala | 17 ++++-------------
 1 file changed, 4 insertions(+), 13 deletions(-)

diff --git a/examples/src/main/scala/org/apache/spark/examples/graphx/SynthBenchmark.scala b/examples/src/main/scala/org/apache/spark/examples/graphx/SynthBenchmark.scala
index 79cad63fb2ebc..9029507bebdb6 100644
--- a/examples/src/main/scala/org/apache/spark/examples/graphx/SynthBenchmark.scala
+++ b/examples/src/main/scala/org/apache/spark/examples/graphx/SynthBenchmark.scala
@@ -28,12 +28,9 @@ object SynthBenchmark {
   /**
    * To run this program use the following:
    *
-   * bin/spark-class org.apache.spark.graphx.lib.SynthBenchmark -host="local[4]"
+   * MASTER=spark://foobar bin/run-example graphx.SynthBenchmark -app=pagerank
   *
-   * Required Options:
-   *   -host The spark job scheduler
-   *
-   * Additional Options:
+   * Options:
   *   -app "pagerank" or "cc" for pagerank or connected components. (Default: pagerank)
   *   -niter the number of iterations of pagerank to use (Default: 10)
   *   -nverts the number of vertices in the graph (Default: 1000000)
@@ -52,7 +49,6 @@ object SynthBenchmark {
       }
     }
 
-    var host: String = null
     var app = "pagerank"
     var niter = 10
     var numVertices = 1000000
@@ -63,7 +59,6 @@ object SynthBenchmark {
     var degFile: String = ""
 
     options.foreach {
-      case ("host", v) => host = v
       case ("app", v) => app = v
       case ("niter", v) => niter = v.toInt
       case ("nverts", v) => numVertices = v.toInt
@@ -75,16 +70,12 @@ object SynthBenchmark {
       case (opt, _) => throw new IllegalArgumentException("Invalid option: " + opt)
     }
 
-    if (host == null) {
-      println("No -host option specified!")
-      System.exit(1)
-    }
-
     val conf = new SparkConf()
+      .setAppName(s"GraphX Synth Benchmark (nverts = $numVertices, app = $app)")
       .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
       .set("spark.kryo.registrator", "org.apache.spark.graphx.GraphKryoRegistrator")
 
-    val sc = new SparkContext(host, s"GraphX Synth Benchmark (nverts = $numVertices)", conf)
+    val sc = new SparkContext(conf)
 
     // Create the graph
     var graph = GraphGenerators.logNormalGraph(sc, numVertices, numEPart.getOrElse(sc.defaultParallelism), mu, sigma)
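The change above from a -host flag to SparkConf.setAppName plus new SparkContext(conf) is the standard submission pattern: the master URL is now supplied by the launcher (run-example or spark-submit) rather than hard-coded, so the same jar runs locally or on a cluster. A minimal sketch of the pattern; the setIfMissing fallback to local[*] is an assumption for convenient local testing, not part of the patch:

    import org.apache.spark.{SparkConf, SparkContext}

    object SubmitDemo {
      def main(args: Array[String]): Unit = {
        val conf = new SparkConf()
          .setAppName("GraphX Synth Benchmark (demo)")
          .setIfMissing("spark.master", "local[*]")  // assumed fallback for local runs
        val sc = new SparkContext(conf)
        println(s"master = ${sc.master}, default parallelism = ${sc.defaultParallelism}")
        sc.stop()
      }
    }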
From 374678af436794225d1dda88e179e3a7b441013d Mon Sep 17 00:00:00 2001
From: Ankur Dave
Date: Tue, 3 Jun 2014 11:56:32 -0700
Subject: [PATCH 4/6] Bugfix and style changes

---
 .../examples/graphx/SynthBenchmark.scala      | 67 ++++++++++---------
 .../spark/graphx/PartitionStrategy.scala      |  9 +++
 2 files changed, 44 insertions(+), 32 deletions(-)

diff --git a/examples/src/main/scala/org/apache/spark/examples/graphx/SynthBenchmark.scala b/examples/src/main/scala/org/apache/spark/examples/graphx/SynthBenchmark.scala
index 9029507bebdb6..551c339b19523 100644
--- a/examples/src/main/scala/org/apache/spark/examples/graphx/SynthBenchmark.scala
+++ b/examples/src/main/scala/org/apache/spark/examples/graphx/SynthBenchmark.scala
@@ -1,8 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
 package org.apache.spark.examples.graphx
 
 import org.apache.spark.SparkContext._
 import org.apache.spark.graphx.PartitionStrategy
-import org.apache.spark.graphx.PartitionStrategy.{CanonicalRandomVertexCut, EdgePartition2D, EdgePartition1D, RandomVertexCut}
 import org.apache.spark.{SparkContext, SparkConf}
 import org.apache.spark.graphx.util.GraphGenerators
 import java.io.{PrintWriter, FileOutputStream}
@@ -14,17 +30,6 @@ import java.io.{PrintWriter, FileOutputStream}
  */
 object SynthBenchmark {
 
-  def pickPartitioner(v: String): PartitionStrategy = {
-    // TODO: Use reflection rather than listing all the partitioning strategies here.
-    v match {
-      case "RandomVertexCut" => RandomVertexCut
-      case "EdgePartition1D" => EdgePartition1D
-      case "EdgePartition2D" => EdgePartition2D
-      case "CanonicalRandomVertexCut" => CanonicalRandomVertexCut
-      case _ => throw new IllegalArgumentException("Invalid PartitionStrategy: " + v)
-    }
-  }
-
   /**
    * To run this program use the following:
    *
@@ -40,7 +45,7 @@ object SynthBenchmark {
   *   -sigma the stdev parameter for the log-normal graph (Default: 1.3)
   *   -degFile the local file to save the degree information (Default: Empty)
   */
-  def main(args: Array[String]): Unit = {
+  def main(args: Array[String]) {
     val options = args.map {
       arg =>
         arg.dropWhile(_ == '-').split('=') match {
@@ -51,7 +56,7 @@ object SynthBenchmark {
 
     var app = "pagerank"
     var niter = 10
-    var numVertices = 1000000
+    var numVertices = 100000
     var numEPart: Option[Int] = None
     var partitionStrategy: Option[PartitionStrategy] = None
     var mu: Double = 4.0
@@ -63,7 +68,7 @@ object SynthBenchmark {
       case ("niter", v) => niter = v.toInt
      case ("nverts", v) => numVertices = v.toInt
       case ("numEPart", v) => numEPart = Some(v.toInt)
-      case ("partStrategy", v) => partitionStrategy = Some(pickPartitioner(v))
+      case ("partStrategy", v) => partitionStrategy = Some(PartitionStrategy.fromString(v))
       case ("mu", v) => mu = v.toDouble
       case ("sigma", v) => sigma = v.toDouble
       case ("degFile", v) => degFile = v
@@ -78,17 +83,15 @@ object SynthBenchmark {
     val sc = new SparkContext(conf)
 
     // Create the graph
-    var graph = GraphGenerators.logNormalGraph(sc, numVertices, numEPart.getOrElse(sc.defaultParallelism), mu, sigma)
+    println(s"Creating graph...")
+    val unpartitionedGraph = GraphGenerators.logNormalGraph(sc, numVertices,
+      numEPart.getOrElse(sc.defaultParallelism), mu, sigma)
     // Repartition the graph
-    if (!partitionStrategy.isEmpty) {
-      graph = graph.partitionBy(partitionStrategy.get)
-    }
-    graph.cache
+    val graph = partitionStrategy.foldLeft(unpartitionedGraph)(_.partitionBy(_)).cache()
 
     var startTime = System.currentTimeMillis()
     val numEdges = graph.edges.count()
-    println(s"Num Vertices: $numVertices")
-    println(s"Num Edges: $numEdges}")
+    println(s"Done creating graph. Num Vertices = $numVertices, Num Edges = $numEdges")
     val loadTime = System.currentTimeMillis() - startTime
 
     // Collect the degree distribution (if desired)
@@ -106,20 +109,20 @@ object SynthBenchmark {
     startTime = System.currentTimeMillis()
     if (app == "pagerank") {
       println("Running PageRank")
-      val totalPR = graph.staticPageRank(niter).vertices.map(p => p._2).sum
-      println(s"Total pagerank = $totalPR")
+      val totalPR = graph.staticPageRank(niter).vertices.map(_._2).sum()
+      println(s"Total PageRank = $totalPR")
     } else if (app == "cc") {
-      println("Connected Components")
-      val maxCC = graph.staticPageRank(niter).vertices.map(v => v._2).reduce((a,b)=>math.max(a,b))
-      println(s"Max CC = $maxCC")
+      println("Running Connected Components")
+      val numComponents = graph.connectedComponents.vertices.map(_._2).distinct().count()
+      println(s"Number of components = $numComponents")
     }
     val runTime = System.currentTimeMillis() - startTime
 
-    sc.stop
-    println(s"Num Vertices: $numVertices")
-    println(s"Num Edges: $numEdges")
-    println(s"Load time: ${loadTime/1000.0} seconds")
-    println(s"Run time: ${runTime/1000.0} seconds")
+    println(s"Num Vertices = $numVertices")
+    println(s"Num Edges = $numEdges")
+    println(s"Creation time = ${loadTime/1000.0} seconds")
+    println(s"Run time = ${runTime/1000.0} seconds")
+    sc.stop()
 
   }
 }
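The one-liner partitionStrategy.foldLeft(unpartitionedGraph)(_.partitionBy(_)).cache() above replaces the earlier var/isEmpty/get sequence: folding over an Option applies the function exactly once when a value is present and returns the initial element untouched when it is None. A tiny sketch of the idiom in isolation (names and values are hypothetical):

    object FoldDemo {
      def main(args: Array[String]): Unit = {
        val maybeScale: Option[Int] = Some(3)
        // Some(3) applies the function once, giving List(3, 6, 9);
        // None would return the initial List(1, 2, 3) unchanged.
        println(maybeScale.foldLeft(List(1, 2, 3))((xs, k) => xs.map(_ * k)))
      }
    }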
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/PartitionStrategy.scala b/graphx/src/main/scala/org/apache/spark/graphx/PartitionStrategy.scala
index 1526ccef06fd4..ef412cfd4e6ea 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/PartitionStrategy.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/PartitionStrategy.scala
@@ -119,4 +119,13 @@ object PartitionStrategy {
       math.abs((lower, higher).hashCode()) % numParts
     }
   }
+
+  /** Returns the PartitionStrategy with the specified name. */
+  def fromString(s: String): PartitionStrategy = s match {
+    case "RandomVertexCut" => RandomVertexCut
+    case "EdgePartition1D" => EdgePartition1D
+    case "EdgePartition2D" => EdgePartition2D
+    case "CanonicalRandomVertexCut" => CanonicalRandomVertexCut
+    case _ => throw new IllegalArgumentException("Invalid PartitionStrategy: " + s)
+  }
 }

From bccccad6ae94aec8c9ec7b9e59f1e6782587a8f2 Mon Sep 17 00:00:00 2001
From: Ankur Dave
Date: Tue, 3 Jun 2014 12:04:21 -0700
Subject: [PATCH 5/6] Fix long lines

---
 .../org/apache/spark/graphx/util/GraphGenerators.scala | 6 +++---
 1 file changed, 3 insertions(+), 3 deletions(-)

diff --git a/graphx/src/main/scala/org/apache/spark/graphx/util/GraphGenerators.scala b/graphx/src/main/scala/org/apache/spark/graphx/util/GraphGenerators.scala
index b070b11d1bbfb..635514f09ece0 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/util/GraphGenerators.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/util/GraphGenerators.scala
@@ -56,13 +56,13 @@ object GraphGenerators {
   */
   def logNormalGraph(sc: SparkContext, numVertices: Int, numEParts: Int,
       mu: Double = 4.0, sigma: Double = 1.3): Graph[Long, Int] = {
-    val vertices: RDD[(VertexId, Long)] = sc.parallelize(0 until numVertices, numEParts).map { src =>
+    val vertices = sc.parallelize(0 until numVertices, numEParts).map { src =>
       // Initialize the random number generator with the source vertex id
       val rand = new Random(src)
-      val degree: Long = math.min(numVertices.toLong, math.exp(rand.nextGaussian()*sigma + mu).toLong)
+      val degree = math.min(numVertices.toLong, math.exp(rand.nextGaussian() * sigma + mu).toLong)
       (src.toLong, degree)
     }
-    val edges: RDD[Edge[Int]] = vertices.flatMap { case (src, degree) =>
+    val edges = vertices.flatMap { case (src, degree) =>
       new Iterator[Edge[Int]] {
         // Initialize the random number generator with the source vertex id
         val rand = new Random(src)

From e40812a8788c38655c0bcaf7ed29fff6a1c1afff Mon Sep 17 00:00:00 2001
From: Ankur Dave
Date: Tue, 3 Jun 2014 13:19:23 -0700
Subject: [PATCH 6/6] Exclude all of GraphX from compatibility checks vs. 1.0.0

---
 project/MimaExcludes.scala | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/project/MimaExcludes.scala b/project/MimaExcludes.scala
index ecb389de5558f..fc9cbeaec6473 100644
--- a/project/MimaExcludes.scala
+++ b/project/MimaExcludes.scala
@@ -35,7 +35,8 @@ object MimaExcludes {
     val excludes =
       SparkBuild.SPARK_VERSION match {
         case v if v.startsWith("1.1") =>
-          Seq()
+          Seq(
+            MimaBuild.excludeSparkPackage("graphx"))
         case v if v.startsWith("1.0") =>
           Seq(
             MimaBuild.excludeSparkPackage("api.java"),
@@ -58,4 +59,3 @@ object MimaExcludes {
       case _ => Seq()
     }
 }
-
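For completeness, the PartitionStrategy.fromString helper added in patch 4 can be exercised from a driver program or spark-shell session; a sketch (in the benchmark the strategy name comes from the -partStrategy option):

    import org.apache.spark.graphx.PartitionStrategy

    // Maps a strategy name to its case object, failing fast on unknown names;
    // the result feeds graph.partitionBy(...) exactly as in the benchmark.
    val strategy = PartitionStrategy.fromString("EdgePartition2D")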