From b2009bc04ed750b2ef03e76b5fdae2bffd958967 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E7=8E=84=E7=95=85?= Date: Fri, 10 Oct 2014 15:19:37 +0800 Subject: [PATCH] format example code and explicit val instead of var --- .../apache/spark/examples/BroadcastTest.scala | 4 +- .../spark/examples/CassandraCQLTest.scala | 64 +++++++++---------- .../apache/spark/examples/CassandraTest.scala | 39 ++++++----- .../spark/examples/DriverSubmissionTest.scala | 6 +- .../apache/spark/examples/GroupByTest.scala | 16 ++--- .../org/apache/spark/examples/HBaseTest.scala | 3 +- .../org/apache/spark/examples/HdfsTest.scala | 4 +- .../org/apache/spark/examples/LocalALS.scala | 22 +++---- .../apache/spark/examples/LocalFileLR.scala | 7 +- .../apache/spark/examples/LocalKMeans.scala | 33 +++++----- .../org/apache/spark/examples/LocalLR.scala | 18 +++--- .../org/apache/spark/examples/LocalPi.scala | 5 +- .../org/apache/spark/examples/LogQuery.scala | 10 +-- .../spark/examples/MultiBroadcastTest.scala | 4 +- .../examples/SimpleSkewedGroupByTest.scala | 24 +++---- .../spark/examples/SkewedGroupByTest.scala | 16 ++--- .../org/apache/spark/examples/SparkALS.scala | 30 ++++----- .../apache/spark/examples/SparkHdfsLR.scala | 19 +++--- .../apache/spark/examples/SparkKMeans.scala | 18 +++--- .../org/apache/spark/examples/SparkLR.scala | 22 ++++--- .../apache/spark/examples/SparkPageRank.scala | 4 +- .../org/apache/spark/examples/SparkPi.scala | 6 +- .../org/apache/spark/examples/SparkTC.scala | 8 +-- .../spark/examples/SparkTachyonHdfsLR.scala | 23 +++---- .../spark/examples/SparkTachyonPi.scala | 8 +-- .../spark/examples/bagel/PageRankUtils.scala | 30 ++++----- .../examples/bagel/WikipediaPageRank.scala | 25 ++++---- .../bagel/WikipediaPageRankStandalone.scala | 56 ++++++++-------- .../spark/examples/graphx/Analytics.scala | 21 +++--- .../examples/graphx/LiveJournalPageRank.scala | 6 -- .../examples/graphx/SynthBenchmark.scala | 15 +++-- .../examples/mllib/BinaryClassification.scala | 13 ++-- .../spark/examples/mllib/Correlations.scala | 15 ++--- .../examples/mllib/CosineSimilarity.scala | 8 +-- .../spark/examples/mllib/DenseKMeans.scala | 5 +- .../examples/mllib/LinearRegression.scala | 19 +++--- .../spark/examples/mllib/MovieLensALS.scala | 13 ++-- .../mllib/MultivariateSummarizer.scala | 15 ++--- .../examples/mllib/RandomRDDGeneration.scala | 5 +- .../spark/examples/mllib/SampledRDDs.scala | 15 ++--- .../examples/mllib/SparseNaiveBayes.scala | 5 +- .../mllib/StreamingLinearRegression.scala | 6 +- .../spark/examples/mllib/TallSkinnySVD.scala | 4 +- .../pythonconverters/AvroConverters.scala | 41 ++++++------ .../CassandraConverters.scala | 6 +- .../pythonconverters/HBaseConverters.scala | 6 +- .../spark/examples/sql/RDDRelation.scala | 2 +- .../examples/sql/hive/HiveFromSpark.scala | 3 +- .../examples/streaming/ActorWordCount.scala | 42 ++++++------ .../examples/streaming/CustomReceiver.scala | 62 +++++++++--------- .../examples/streaming/FlumeEventCount.scala | 22 +++---- .../streaming/FlumePollingEventCount.scala | 20 +++--- .../examples/streaming/HdfsWordCount.scala | 8 +-- .../examples/streaming/KafkaWordCount.scala | 25 ++++---- .../examples/streaming/MQTTWordCount.scala | 25 ++++---- .../examples/streaming/NetworkWordCount.scala | 8 +-- .../examples/streaming/QueueStream.scala | 8 +-- .../examples/streaming/RawNetworkGrep.scala | 10 +-- .../RecoverableNetworkWordCount.scala | 23 ++++--- .../streaming/StatefulNetworkWordCount.scala | 12 ++-- .../streaming/StreamingExamples.scala | 3 +- 
.../streaming/TwitterAlgebirdCMS.scala | 5 +- .../streaming/TwitterAlgebirdHLL.scala | 7 +- .../streaming/TwitterPopularTags.scala | 18 +++--- .../examples/streaming/ZeroMQWordCount.scala | 23 +++---- .../clickstream/PageViewGenerator.scala | 37 +++++------ .../clickstream/PageViewStream.scala | 41 ++++++------ 67 files changed, 562 insertions(+), 584 deletions(-) diff --git a/examples/src/main/scala/org/apache/spark/examples/BroadcastTest.scala b/examples/src/main/scala/org/apache/spark/examples/BroadcastTest.scala index 973049b95a7bd..19f452272285e 100644 --- a/examples/src/main/scala/org/apache/spark/examples/BroadcastTest.scala +++ b/examples/src/main/scala/org/apache/spark/examples/BroadcastTest.scala @@ -20,8 +20,8 @@ package org.apache.spark.examples import org.apache.spark.{SparkConf, SparkContext} /** - * Usage: BroadcastTest [slices] [numElem] [broadcastAlgo] [blockSize] - */ + * Usage: BroadcastTest [slices] [numElem] [broadcastAlgo] [blockSize] + */ object BroadcastTest { def main(args: Array[String]) { diff --git a/examples/src/main/scala/org/apache/spark/examples/CassandraCQLTest.scala b/examples/src/main/scala/org/apache/spark/examples/CassandraCQLTest.scala index 11d5c92c5952d..2fcc223970295 100644 --- a/examples/src/main/scala/org/apache/spark/examples/CassandraCQLTest.scala +++ b/examples/src/main/scala/org/apache/spark/examples/CassandraCQLTest.scala @@ -19,19 +19,17 @@ package org.apache.spark.examples import java.nio.ByteBuffer -import scala.collection.JavaConversions._ -import scala.collection.mutable.ListBuffer -import scala.collection.immutable.Map - import org.apache.cassandra.hadoop.ConfigHelper -import org.apache.cassandra.hadoop.cql3.CqlPagingInputFormat -import org.apache.cassandra.hadoop.cql3.CqlConfigHelper -import org.apache.cassandra.hadoop.cql3.CqlOutputFormat +import org.apache.cassandra.hadoop.cql3.{CqlConfigHelper, CqlOutputFormat, CqlPagingInputFormat} import org.apache.cassandra.utils.ByteBufferUtil +import org.apache.hadoop.conf.Configuration import org.apache.hadoop.mapreduce.Job - -import org.apache.spark.{SparkConf, SparkContext} import org.apache.spark.SparkContext._ +import org.apache.spark.{SparkConf, SparkContext} + +import scala.collection.JavaConversions._ +import scala.collection.immutable.Map +import scala.collection.mutable.ListBuffer /* @@ -68,7 +66,7 @@ import org.apache.spark.SparkContext._ * using Spark. 
* Parameters : * Usage: ./bin/spark-submit examples.jar \ - * --class org.apache.spark.examples.CassandraCQLTest localhost 9160 + * --class org.apache.spark.examples.CassandraCQLTest localhost 9160 */ object CassandraCQLTest { @@ -84,28 +82,30 @@ object CassandraCQLTest { val job = new Job() job.setInputFormatClass(classOf[CqlPagingInputFormat]) - ConfigHelper.setInputInitialAddress(job.getConfiguration(), cHost) - ConfigHelper.setInputRpcPort(job.getConfiguration(), cPort) - ConfigHelper.setInputColumnFamily(job.getConfiguration(), KeySpace, InputColumnFamily) - ConfigHelper.setInputPartitioner(job.getConfiguration(), "Murmur3Partitioner") - CqlConfigHelper.setInputCQLPageRowSize(job.getConfiguration(), "3") + ConfigHelper.setInputInitialAddress(job.getConfiguration, cHost) + val config: Configuration = job.getConfiguration + + ConfigHelper.setInputRpcPort(config, cPort) + ConfigHelper.setInputColumnFamily(config, KeySpace, InputColumnFamily) + ConfigHelper.setInputPartitioner(config, "Murmur3Partitioner") + CqlConfigHelper.setInputCQLPageRowSize(config, "3") - /** CqlConfigHelper.setInputWhereClauses(job.getConfiguration(), "user_id='bob'") */ + /** CqlConfigHelper.setInputWhereClauses(config, "user_id='bob'") */ /** An UPDATE writes one or more columns to a record in a Cassandra column family */ val query = "UPDATE " + KeySpace + "." + OutputColumnFamily + " SET sale_count = ? " - CqlConfigHelper.setOutputCql(job.getConfiguration(), query) + CqlConfigHelper.setOutputCql(config, query) job.setOutputFormatClass(classOf[CqlOutputFormat]) - ConfigHelper.setOutputColumnFamily(job.getConfiguration(), KeySpace, OutputColumnFamily) - ConfigHelper.setOutputInitialAddress(job.getConfiguration(), cHost) - ConfigHelper.setOutputRpcPort(job.getConfiguration(), cPort) - ConfigHelper.setOutputPartitioner(job.getConfiguration(), "Murmur3Partitioner") + ConfigHelper.setOutputColumnFamily(config, KeySpace, OutputColumnFamily) + ConfigHelper.setOutputInitialAddress(config, cHost) + ConfigHelper.setOutputRpcPort(config, cPort) + ConfigHelper.setOutputPartitioner(config, "Murmur3Partitioner") - val casRdd = sc.newAPIHadoopRDD(job.getConfiguration(), + val casRdd = sc.newAPIHadoopRDD(config, classOf[CqlPagingInputFormat], - classOf[java.util.Map[String,ByteBuffer]], - classOf[java.util.Map[String,ByteBuffer]]) + classOf[java.util.Map[String, ByteBuffer]], + classOf[java.util.Map[String, ByteBuffer]]) println("Count: " + casRdd.count) val productSaleRDD = casRdd.map { @@ -118,24 +118,24 @@ object CassandraCQLTest { case (productId, saleCount) => println(productId + ":" + saleCount) } - val casoutputCF = aggregatedRDD.map { + val casoutputCF = aggregatedRDD.map { case (productId, saleCount) => { val outColFamKey = Map("prod_id" -> ByteBufferUtil.bytes(productId)) val outKey: java.util.Map[String, ByteBuffer] = outColFamKey var outColFamVal = new ListBuffer[ByteBuffer] outColFamVal += ByteBufferUtil.bytes(saleCount) val outVal: java.util.List[ByteBuffer] = outColFamVal - (outKey, outVal) + (outKey, outVal) } } casoutputCF.saveAsNewAPIHadoopFile( - KeySpace, - classOf[java.util.Map[String, ByteBuffer]], - classOf[java.util.List[ByteBuffer]], - classOf[CqlOutputFormat], - job.getConfiguration() - ) + KeySpace, + classOf[java.util.Map[String, ByteBuffer]], + classOf[java.util.List[ByteBuffer]], + classOf[CqlOutputFormat], + config + ) sc.stop() } diff --git a/examples/src/main/scala/org/apache/spark/examples/CassandraTest.scala b/examples/src/main/scala/org/apache/spark/examples/CassandraTest.scala index 
ec689474aecb0..91c9b9ad2648b 100644 --- a/examples/src/main/scala/org/apache/spark/examples/CassandraTest.scala +++ b/examples/src/main/scala/org/apache/spark/examples/CassandraTest.scala @@ -18,20 +18,17 @@ package org.apache.spark.examples import java.nio.ByteBuffer -import java.util.SortedMap - -import scala.collection.JavaConversions._ +import java.util import org.apache.cassandra.db.IColumn -import org.apache.cassandra.hadoop.ColumnFamilyOutputFormat -import org.apache.cassandra.hadoop.ConfigHelper -import org.apache.cassandra.hadoop.ColumnFamilyInputFormat +import org.apache.cassandra.hadoop.{ColumnFamilyInputFormat, ColumnFamilyOutputFormat, ConfigHelper} import org.apache.cassandra.thrift._ import org.apache.cassandra.utils.ByteBufferUtil import org.apache.hadoop.mapreduce.Job - -import org.apache.spark.{SparkConf, SparkContext} import org.apache.spark.SparkContext._ +import org.apache.spark.{SparkConf, SparkContext} + +import scala.collection.JavaConversions._ /* * This example demonstrates using Spark with Cassandra with the New Hadoop API and Cassandra @@ -65,29 +62,31 @@ object CassandraTest { val host: String = args(1) val port: String = args(2) - ConfigHelper.setInputInitialAddress(job.getConfiguration(), host) - ConfigHelper.setInputRpcPort(job.getConfiguration(), port) - ConfigHelper.setOutputInitialAddress(job.getConfiguration(), host) - ConfigHelper.setOutputRpcPort(job.getConfiguration(), port) - ConfigHelper.setInputColumnFamily(job.getConfiguration(), "casDemo", "Words") - ConfigHelper.setOutputColumnFamily(job.getConfiguration(), "casDemo", "WordCount") + val config = job.getConfiguration + + ConfigHelper.setInputInitialAddress(config, host) + ConfigHelper.setInputRpcPort(config, port) + ConfigHelper.setOutputInitialAddress(config, host) + ConfigHelper.setOutputRpcPort(config, port) + ConfigHelper.setInputColumnFamily(config, "casDemo", "Words") + ConfigHelper.setOutputColumnFamily(config, "casDemo", "WordCount") val predicate = new SlicePredicate() val sliceRange = new SliceRange() sliceRange.setStart(Array.empty[Byte]) sliceRange.setFinish(Array.empty[Byte]) predicate.setSlice_range(sliceRange) - ConfigHelper.setInputSlicePredicate(job.getConfiguration(), predicate) + ConfigHelper.setInputSlicePredicate(config, predicate) - ConfigHelper.setInputPartitioner(job.getConfiguration(), "Murmur3Partitioner") - ConfigHelper.setOutputPartitioner(job.getConfiguration(), "Murmur3Partitioner") + ConfigHelper.setInputPartitioner(config, "Murmur3Partitioner") + ConfigHelper.setOutputPartitioner(config, "Murmur3Partitioner") // Make a new Hadoop RDD val casRdd = sc.newAPIHadoopRDD( - job.getConfiguration(), + config, classOf[ColumnFamilyInputFormat], classOf[ByteBuffer], - classOf[SortedMap[ByteBuffer, IColumn]]) + classOf[util.SortedMap[ByteBuffer, IColumn]]) // Let us first get all the paragraphs from the retrieved rows val paraRdd = casRdd.map { @@ -125,7 +124,7 @@ object CassandraTest { (outputkey, mutations) } }.saveAsNewAPIHadoopFile("casDemo", classOf[ByteBuffer], classOf[List[Mutation]], - classOf[ColumnFamilyOutputFormat], job.getConfiguration) + classOf[ColumnFamilyOutputFormat], job.getConfiguration) sc.stop() } diff --git a/examples/src/main/scala/org/apache/spark/examples/DriverSubmissionTest.scala b/examples/src/main/scala/org/apache/spark/examples/DriverSubmissionTest.scala index 65251e93190f0..886f35e4ac78b 100644 --- a/examples/src/main/scala/org/apache/spark/examples/DriverSubmissionTest.scala +++ 
b/examples/src/main/scala/org/apache/spark/examples/DriverSubmissionTest.scala @@ -30,13 +30,13 @@ object DriverSubmissionTest { val numSecondsToSleep = args(0).toInt val env = System.getenv() - val properties = System.getProperties() + val properties = System.getProperties println("Environment variables containing SPARK_TEST:") - env.filter{case (k, v) => k.contains("SPARK_TEST")}.foreach(println) + env.filter { case (k, v) => k.contains("SPARK_TEST")}.foreach(println) println("System properties containing spark.test:") - properties.filter{case (k, v) => k.toString.contains("spark.test")}.foreach(println) + properties.filter { case (k, v) => k.toString.contains("spark.test")}.foreach(println) for (i <- 1 until numSecondsToSleep) { println(s"Alive for $i out of $numSecondsToSleep seconds") diff --git a/examples/src/main/scala/org/apache/spark/examples/GroupByTest.scala b/examples/src/main/scala/org/apache/spark/examples/GroupByTest.scala index 15f6678648b29..0e3f46fc9a1c8 100644 --- a/examples/src/main/scala/org/apache/spark/examples/GroupByTest.scala +++ b/examples/src/main/scala/org/apache/spark/examples/GroupByTest.scala @@ -19,25 +19,25 @@ package org.apache.spark.examples import java.util.Random -import org.apache.spark.{SparkConf, SparkContext} import org.apache.spark.SparkContext._ +import org.apache.spark.{SparkConf, SparkContext} /** - * Usage: GroupByTest [numMappers] [numKVPairs] [KeySize] [numReducers] - */ + * Usage: GroupByTest [numMappers] [numKVPairs] [KeySize] [numReducers] + */ object GroupByTest { def main(args: Array[String]) { val sparkConf = new SparkConf().setAppName("GroupBy Test") - var numMappers = if (args.length > 0) args(0).toInt else 2 - var numKVPairs = if (args.length > 1) args(1).toInt else 1000 - var valSize = if (args.length > 2) args(2).toInt else 1000 - var numReducers = if (args.length > 3) args(3).toInt else numMappers + val numMappers = if (args.length > 0) args(0).toInt else 2 + val numKVPairs = if (args.length > 1) args(1).toInt else 1000 + val valSize = if (args.length > 2) args(2).toInt else 1000 + val numReducers = if (args.length > 3) args(3).toInt else numMappers val sc = new SparkContext(sparkConf) val pairs1 = sc.parallelize(0 until numMappers, numMappers).flatMap { p => val ranGen = new Random - var arr1 = new Array[(Int, Array[Byte])](numKVPairs) + val arr1 = new Array[(Int, Array[Byte])](numKVPairs) for (i <- 0 until numKVPairs) { val byteArr = new Array[Byte](valSize) ranGen.nextBytes(byteArr) diff --git a/examples/src/main/scala/org/apache/spark/examples/HBaseTest.scala b/examples/src/main/scala/org/apache/spark/examples/HBaseTest.scala index 822673347bdce..d148190b4187c 100644 --- a/examples/src/main/scala/org/apache/spark/examples/HBaseTest.scala +++ b/examples/src/main/scala/org/apache/spark/examples/HBaseTest.scala @@ -18,9 +18,8 @@ package org.apache.spark.examples import org.apache.hadoop.hbase.client.HBaseAdmin -import org.apache.hadoop.hbase.{HBaseConfiguration, HTableDescriptor} import org.apache.hadoop.hbase.mapreduce.TableInputFormat - +import org.apache.hadoop.hbase.{HBaseConfiguration, HTableDescriptor} import org.apache.spark._ diff --git a/examples/src/main/scala/org/apache/spark/examples/HdfsTest.scala b/examples/src/main/scala/org/apache/spark/examples/HdfsTest.scala index ed2b38e2ca6f8..ac3d1e56a1908 100644 --- a/examples/src/main/scala/org/apache/spark/examples/HdfsTest.scala +++ b/examples/src/main/scala/org/apache/spark/examples/HdfsTest.scala @@ -34,9 +34,9 @@ object HdfsTest { val mapped = file.map(s => 
s.length).cache() for (iter <- 1 to 10) { val start = System.currentTimeMillis() - for (x <- mapped) { x + 2 } + for (x <- mapped) {x + 2} val end = System.currentTimeMillis() - println("Iteration " + iter + " took " + (end-start) + " ms") + println("Iteration " + iter + " took " + (end - start) + " ms") } sc.stop() } diff --git a/examples/src/main/scala/org/apache/spark/examples/LocalALS.scala b/examples/src/main/scala/org/apache/spark/examples/LocalALS.scala index 1f576319b3ca8..a9d504d343a35 100644 --- a/examples/src/main/scala/org/apache/spark/examples/LocalALS.scala +++ b/examples/src/main/scala/org/apache/spark/examples/LocalALS.scala @@ -17,12 +17,12 @@ package org.apache.spark.examples -import scala.math.sqrt - import cern.colt.matrix._ import cern.colt.matrix.linalg._ import cern.jet.math._ +import scala.math.sqrt + /** * Alternating least squares matrix factorization. * @@ -31,9 +31,12 @@ import cern.jet.math._ */ object LocalALS { // Parameters set through command line arguments - var M = 0 // Number of movies - var U = 0 // Number of users - var F = 0 // Number of features + var M = 0 + // Number of movies + var U = 0 + // Number of users + var F = 0 + // Number of features var ITERATIONS = 0 val LAMBDA = 0.01 // Regularization coefficient @@ -51,8 +54,7 @@ object LocalALS { } def rmse(targetR: DoubleMatrix2D, ms: Array[DoubleMatrix1D], - us: Array[DoubleMatrix1D]): Double = - { + us: Array[DoubleMatrix1D]): Double = { val r = factory2D.make(M, U) for (i <- 0 until M; j <- 0 until U) { r.set(i, j, blas.ddot(ms(i), us(j))) @@ -63,8 +65,7 @@ object LocalALS { } def updateMovie(i: Int, m: DoubleMatrix1D, us: Array[DoubleMatrix1D], - R: DoubleMatrix2D) : DoubleMatrix1D = - { + R: DoubleMatrix2D): DoubleMatrix1D = { val XtX = factory2D.make(F, F) val Xty = factory1D.make(F) // For each user that rated the movie @@ -87,8 +88,7 @@ object LocalALS { } def updateUser(j: Int, u: DoubleMatrix1D, ms: Array[DoubleMatrix1D], - R: DoubleMatrix2D) : DoubleMatrix1D = - { + R: DoubleMatrix2D): DoubleMatrix1D = { val XtX = factory2D.make(F, F) val Xty = factory1D.make(F) // For each movie that the user rated diff --git a/examples/src/main/scala/org/apache/spark/examples/LocalFileLR.scala b/examples/src/main/scala/org/apache/spark/examples/LocalFileLR.scala index 931faac5463c4..dce974f7bd120 100644 --- a/examples/src/main/scala/org/apache/spark/examples/LocalFileLR.scala +++ b/examples/src/main/scala/org/apache/spark/examples/LocalFileLR.scala @@ -19,7 +19,7 @@ package org.apache.spark.examples import java.util.Random -import breeze.linalg.{Vector, DenseVector} +import breeze.linalg.{DenseVector, Vector} /** * Logistic regression based classification. 
@@ -28,7 +28,8 @@ import breeze.linalg.{Vector, DenseVector} * please refer to org.apache.spark.mllib.classification.LogisticRegression */ object LocalFileLR { - val D = 10 // Numer of dimensions + val D = 10 + // Numer of dimensions val rand = new Random(42) case class DataPoint(x: Vector[Double], y: Double) @@ -55,7 +56,7 @@ object LocalFileLR { val ITERATIONS = args(1).toInt // Initialize w to a random value - var w = DenseVector.fill(D){2 * rand.nextDouble - 1} + var w = DenseVector.fill(D) {2 * rand.nextDouble - 1} println("Initial w: " + w) for (i <- 1 to ITERATIONS) { diff --git a/examples/src/main/scala/org/apache/spark/examples/LocalKMeans.scala b/examples/src/main/scala/org/apache/spark/examples/LocalKMeans.scala index 17624c20cff3d..1099e60db4e84 100644 --- a/examples/src/main/scala/org/apache/spark/examples/LocalKMeans.scala +++ b/examples/src/main/scala/org/apache/spark/examples/LocalKMeans.scala @@ -19,12 +19,9 @@ package org.apache.spark.examples import java.util.Random -import scala.collection.mutable.HashMap -import scala.collection.mutable.HashSet +import breeze.linalg.{DenseVector, Vector, squaredDistance} -import breeze.linalg.{Vector, DenseVector, squaredDistance} - -import org.apache.spark.SparkContext._ +import scala.collection.mutable /** * K-means clustering. @@ -34,7 +31,8 @@ import org.apache.spark.SparkContext._ */ object LocalKMeans { val N = 1000 - val R = 1000 // Scaling factor + val R = 1000 + // Scaling factor val D = 10 val K = 10 val convergeDist = 0.001 @@ -42,12 +40,12 @@ object LocalKMeans { def generateData = { def generatePoint(i: Int) = { - DenseVector.fill(D){rand.nextDouble * R} + DenseVector.fill(D) {rand.nextDouble * R} } Array.tabulate(N)(generatePoint) } - def closestPoint(p: Vector[Double], centers: HashMap[Int, Vector[Double]]): Int = { + def closestPoint(p: Vector[Double], centers: mutable.HashMap[Int, Vector[Double]]): Int = { var index = 0 var bestIndex = 0 var closest = Double.PositiveInfinity @@ -77,8 +75,8 @@ object LocalKMeans { showWarning() val data = generateData - var points = new HashSet[Vector[Double]] - var kPoints = new HashMap[Int, Vector[Double]] + val points = new mutable.HashSet[Vector[Double]] + val kPoints = new mutable.HashMap[Int, Vector[Double]] var tempDist = 1.0 while (points.size < K) { @@ -92,19 +90,20 @@ object LocalKMeans { println("Initial centers: " + kPoints) - while(tempDist > convergeDist) { - var closest = data.map (p => (closestPoint(p, kPoints), (p, 1))) + while (tempDist > convergeDist) { + val closest = data.map(p => (closestPoint(p, kPoints), (p, 1))) - var mappings = closest.groupBy[Int] (x => x._1) + val mappings = closest.groupBy[Int](x => x._1) - var pointStats = mappings.map { pair => - pair._2.reduceLeft [(Int, (Vector[Double], Int))] { + val pointStats = mappings.map { pair => + pair._2.reduceLeft[(Int, (Vector[Double], Int))] { case ((id1, (x1, y1)), (id2, (x2, y2))) => (id1, (x1 + x2, y1 + y2)) } } - var newPoints = pointStats.map {mapping => - (mapping._1, mapping._2._1 * (1.0 / mapping._2._2))} + val newPoints = pointStats.map { mapping => + (mapping._1, mapping._2._1 * (1.0 / mapping._2._2)) + } tempDist = 0.0 for (mapping <- newPoints) { diff --git a/examples/src/main/scala/org/apache/spark/examples/LocalLR.scala b/examples/src/main/scala/org/apache/spark/examples/LocalLR.scala index 2d75b9d2590f8..300bd9c226fcb 100644 --- a/examples/src/main/scala/org/apache/spark/examples/LocalLR.scala +++ b/examples/src/main/scala/org/apache/spark/examples/LocalLR.scala @@ -19,7 +19,7 @@ package 
org.apache.spark.examples import java.util.Random -import breeze.linalg.{Vector, DenseVector} +import breeze.linalg.{DenseVector, Vector} /** * Logistic regression based classification. @@ -28,9 +28,9 @@ import breeze.linalg.{Vector, DenseVector} * please refer to org.apache.spark.mllib.classification.LogisticRegression */ object LocalLR { - val N = 10000 // Number of data points - val D = 10 // Number of dimensions - val R = 0.7 // Scaling factor + val N = 10000 // Number of data points + val D = 10 // Number of dimensions + val R = 0.7 // Scaling factor val ITERATIONS = 5 val rand = new Random(42) @@ -38,8 +38,8 @@ object LocalLR { def generateData = { def generatePoint(i: Int) = { - val y = if(i % 2 == 0) -1 else 1 - val x = DenseVector.fill(D){rand.nextGaussian + y * R} + val y = if (i % 2 == 0) -1 else 1 + val x = DenseVector.fill(D) {rand.nextGaussian + y * R} DataPoint(x, y) } Array.tabulate(N)(generatePoint) @@ -59,15 +59,15 @@ object LocalLR { val data = generateData // Initialize w to a random value - var w = DenseVector.fill(D){2 * rand.nextDouble - 1} + var w = DenseVector.fill(D) {2 * rand.nextDouble - 1} println("Initial w: " + w) for (i <- 1 to ITERATIONS) { println("On iteration " + i) var gradient = DenseVector.zeros[Double](D) for (p <- data) { - val scale = (1 / (1 + math.exp(-p.y * (w.dot(p.x)))) - 1) * p.y - gradient += p.x * scale + val scale = (1 / (1 + math.exp(-p.y * w.dot(p.x))) - 1) * p.y + gradient += p.x * scale } w -= gradient } diff --git a/examples/src/main/scala/org/apache/spark/examples/LocalPi.scala b/examples/src/main/scala/org/apache/spark/examples/LocalPi.scala index ee6b3ee34aeb2..afd3d25968a81 100644 --- a/examples/src/main/scala/org/apache/spark/examples/LocalPi.scala +++ b/examples/src/main/scala/org/apache/spark/examples/LocalPi.scala @@ -19,16 +19,13 @@ package org.apache.spark.examples import scala.math.random -import org.apache.spark._ -import org.apache.spark.SparkContext._ - object LocalPi { def main(args: Array[String]) { var count = 0 for (i <- 1 to 100000) { val x = random * 2 - 1 val y = random * 2 - 1 - if (x*x + y*y < 1) count += 1 + if (x * x + y * y < 1) count += 1 } println("Pi is roughly " + 4 * count / 100000.0) } diff --git a/examples/src/main/scala/org/apache/spark/examples/LogQuery.scala b/examples/src/main/scala/org/apache/spark/examples/LogQuery.scala index 74620ad007d83..58319cb90e875 100644 --- a/examples/src/main/scala/org/apache/spark/examples/LogQuery.scala +++ b/examples/src/main/scala/org/apache/spark/examples/LogQuery.scala @@ -17,12 +17,12 @@ package org.apache.spark.examples -import org.apache.spark.{SparkConf, SparkContext} import org.apache.spark.SparkContext._ +import org.apache.spark.{SparkConf, SparkContext} /** * Executes a roll up-style query against Apache logs. - * + * * Usage: LogQuery [logFile] */ object LogQuery { @@ -55,6 +55,7 @@ object LogQuery { /** Tracks the total query count and number of aggregate bytes for a particular group. 
*/ class Stats(val count: Int, val numBytes: Int) extends Serializable { def merge(other: Stats) = new Stats(count + other.count, numBytes + other.numBytes) + override def toString = "bytes=%s\tn=%s".format(numBytes, count) } @@ -77,8 +78,9 @@ object LogQuery { dataSet.map(line => (extractKey(line), extractStats(line))) .reduceByKey((a, b) => a.merge(b)) - .collect().foreach{ - case (user, query) => println("%s\t%s".format(user, query))} + .collect().foreach { + case (user, query) => println("%s\t%s".format(user, query)) + } sc.stop() } diff --git a/examples/src/main/scala/org/apache/spark/examples/MultiBroadcastTest.scala b/examples/src/main/scala/org/apache/spark/examples/MultiBroadcastTest.scala index 2a5c0c0defe13..9a0c51b09b5d2 100644 --- a/examples/src/main/scala/org/apache/spark/examples/MultiBroadcastTest.scala +++ b/examples/src/main/scala/org/apache/spark/examples/MultiBroadcastTest.scala @@ -21,8 +21,8 @@ import org.apache.spark.rdd.RDD import org.apache.spark.{SparkConf, SparkContext} /** - * Usage: MultiBroadcastTest [slices] [numElem] - */ + * Usage: MultiBroadcastTest [slices] [numElem] + */ object MultiBroadcastTest { def main(args: Array[String]) { diff --git a/examples/src/main/scala/org/apache/spark/examples/SimpleSkewedGroupByTest.scala b/examples/src/main/scala/org/apache/spark/examples/SimpleSkewedGroupByTest.scala index 5291ab81f459e..73919a83a1bdf 100644 --- a/examples/src/main/scala/org/apache/spark/examples/SimpleSkewedGroupByTest.scala +++ b/examples/src/main/scala/org/apache/spark/examples/SimpleSkewedGroupByTest.scala @@ -19,27 +19,27 @@ package org.apache.spark.examples import java.util.Random -import org.apache.spark.{SparkConf, SparkContext} import org.apache.spark.SparkContext._ +import org.apache.spark.{SparkConf, SparkContext} /** - * Usage: SimpleSkewedGroupByTest [numMappers] [numKVPairs] [valSize] [numReducers] [ratio] - */ + * Usage: SimpleSkewedGroupByTest [numMappers] [numKVPairs] [valSize] [numReducers] [ratio] + */ object SimpleSkewedGroupByTest { def main(args: Array[String]) { val sparkConf = new SparkConf().setAppName("SimpleSkewedGroupByTest") - var numMappers = if (args.length > 0) args(0).toInt else 2 - var numKVPairs = if (args.length > 1) args(1).toInt else 1000 - var valSize = if (args.length > 2) args(2).toInt else 1000 - var numReducers = if (args.length > 3) args(3).toInt else numMappers - var ratio = if (args.length > 4) args(4).toInt else 5.0 + val numMappers = if (args.length > 0) args(0).toInt else 2 + val numKVPairs = if (args.length > 1) args(1).toInt else 1000 + val valSize = if (args.length > 2) args(2).toInt else 1000 + val numReducers = if (args.length > 3) args(3).toInt else numMappers + val ratio = if (args.length > 4) args(4).toInt else 5.0 val sc = new SparkContext(sparkConf) val pairs1 = sc.parallelize(0 until numMappers, numMappers).flatMap { p => val ranGen = new Random - var result = new Array[(Int, Array[Byte])](numKVPairs) + val result = new Array[(Int, Array[Byte])](numKVPairs) for (i <- 0 until numKVPairs) { val byteArr = new Array[Byte](valSize) ranGen.nextBytes(byteArr) @@ -49,14 +49,14 @@ object SimpleSkewedGroupByTest { result(i) = (offset, byteArr) } else { // generate a key for one of the other reducers - val key = 1 + ranGen.nextInt(numReducers-1) + offset + val key = 1 + ranGen.nextInt(numReducers - 1) + offset result(i) = (key, byteArr) } } result - }.cache + }.cache() // Enforce that everything has been calculated and in cache - pairs1.count + pairs1.count() println("RESULT: " + 
pairs1.groupByKey(numReducers).count) // Print how many keys each reducer got (for debugging) diff --git a/examples/src/main/scala/org/apache/spark/examples/SkewedGroupByTest.scala b/examples/src/main/scala/org/apache/spark/examples/SkewedGroupByTest.scala index 017d4e1e5ce13..d47c9bc859fda 100644 --- a/examples/src/main/scala/org/apache/spark/examples/SkewedGroupByTest.scala +++ b/examples/src/main/scala/org/apache/spark/examples/SkewedGroupByTest.scala @@ -19,29 +19,29 @@ package org.apache.spark.examples import java.util.Random -import org.apache.spark.{SparkConf, SparkContext} import org.apache.spark.SparkContext._ +import org.apache.spark.SparkContext._ +import org.apache.spark.{SparkConf, SparkContext} /** - * Usage: GroupByTest [numMappers] [numKVPairs] [KeySize] [numReducers] - */ + * Usage: GroupByTest [numMappers] [numKVPairs] [KeySize] [numReducers] + */ object SkewedGroupByTest { def main(args: Array[String]) { val sparkConf = new SparkConf().setAppName("GroupBy Test") - var numMappers = if (args.length > 0) args(0).toInt else 2 + val numMappers = if (args.length > 0) args(0).toInt else 2 var numKVPairs = if (args.length > 1) args(1).toInt else 1000 - var valSize = if (args.length > 2) args(2).toInt else 1000 - var numReducers = if (args.length > 3) args(3).toInt else numMappers + val valSize = if (args.length > 2) args(2).toInt else 1000 + val numReducers = if (args.length > 3) args(3).toInt else numMappers val sc = new SparkContext(sparkConf) val pairs1 = sc.parallelize(0 until numMappers, numMappers).flatMap { p => val ranGen = new Random - // map output sizes lineraly increase from the 1st to the last + // map output sizes linearly increase from the 1st to the last numKVPairs = (1.0 * (p + 1) / numMappers * numKVPairs).toInt - var arr1 = new Array[(Int, Array[Byte])](numKVPairs) + val arr1 = new Array[(Int, Array[Byte])](numKVPairs) for (i <- 0 until numKVPairs) { val byteArr = new Array[Byte](valSize) ranGen.nextBytes(byteArr) diff --git a/examples/src/main/scala/org/apache/spark/examples/SparkALS.scala b/examples/src/main/scala/org/apache/spark/examples/SparkALS.scala index fde8ffeedf8b4..94d3e2c15de4d 100644 --- a/examples/src/main/scala/org/apache/spark/examples/SparkALS.scala +++ b/examples/src/main/scala/org/apache/spark/examples/SparkALS.scala @@ -17,14 +17,13 @@ package org.apache.spark.examples -import scala.math.sqrt - import cern.colt.matrix._ import cern.colt.matrix.linalg._ import cern.jet.math._ - import org.apache.spark._ +import scala.math.sqrt + /** * Alternating least squares matrix factorization.
* @@ -33,9 +32,12 @@ import org.apache.spark._ */ object SparkALS { // Parameters set through command line arguments - var M = 0 // Number of movies - var U = 0 // Number of users - var F = 0 // Number of features + var M = 0 + // Number of movies + var U = 0 + // Number of users + var F = 0 + // Number of features var ITERATIONS = 0 val LAMBDA = 0.01 // Regularization coefficient @@ -53,8 +55,7 @@ object SparkALS { } def rmse(targetR: DoubleMatrix2D, ms: Array[DoubleMatrix1D], - us: Array[DoubleMatrix1D]): Double = - { + us: Array[DoubleMatrix1D]): Double = { val r = factory2D.make(M, U) for (i <- 0 until M; j <- 0 until U) { r.set(i, j, blas.ddot(ms(i), us(j))) @@ -65,8 +66,7 @@ object SparkALS { } def update(i: Int, m: DoubleMatrix1D, us: Array[DoubleMatrix1D], - R: DoubleMatrix2D) : DoubleMatrix1D = - { + R: DoubleMatrix2D): DoubleMatrix1D = { val U = us.size val F = us(0).size val XtX = factory2D.make(F, F) @@ -130,18 +130,18 @@ object SparkALS { var us = Array.fill(U)(factory1D.random(F)) // Iteratively update movies then users - val Rc = sc.broadcast(R) + val Rc = sc.broadcast(R) var msb = sc.broadcast(ms) var usb = sc.broadcast(us) for (iter <- 1 to ITERATIONS) { println("Iteration " + iter + ":") ms = sc.parallelize(0 until M, slices) - .map(i => update(i, msb.value(i), usb.value, Rc.value)) - .collect() + .map(i => update(i, msb.value(i), usb.value, Rc.value)) + .collect() msb = sc.broadcast(ms) // Re-broadcast ms because it was updated us = sc.parallelize(0 until U, slices) - .map(i => update(i, usb.value(i), msb.value, algebra.transpose(Rc.value))) - .collect() + .map(i => update(i, usb.value(i), msb.value, algebra.transpose(Rc.value))) + .collect() usb = sc.broadcast(us) // Re-broadcast us because it was updated println("RMSE = " + rmse(R, ms, us)) println() diff --git a/examples/src/main/scala/org/apache/spark/examples/SparkHdfsLR.scala b/examples/src/main/scala/org/apache/spark/examples/SparkHdfsLR.scala index 3258510894372..dde81014c5dcb 100644 --- a/examples/src/main/scala/org/apache/spark/examples/SparkHdfsLR.scala +++ b/examples/src/main/scala/org/apache/spark/examples/SparkHdfsLR.scala @@ -19,14 +19,13 @@ package org.apache.spark.examples import java.util.Random -import scala.math.exp - -import breeze.linalg.{Vector, DenseVector} +import breeze.linalg.{DenseVector, Vector} import org.apache.hadoop.conf.Configuration - import org.apache.spark._ import org.apache.spark.scheduler.InputFormatInfo +import scala.math.exp + /** * Logistic regression based classification. 
@@ -35,7 +34,8 @@ import org.apache.spark.scheduler.InputFormatInfo * please refer to org.apache.spark.mllib.classification.LogisticRegression */ object SparkHdfsLR { - val D = 10 // Numer of dimensions + val D = 10 + // Numer of dimensions val rand = new Random(42) case class DataPoint(x: Vector[Double], y: Double) @@ -46,7 +46,8 @@ object SparkHdfsLR { var x = new Array[Double](D) var i = 0 while (i < D) { - x(i) = tok.nextToken.toDouble; i += 1 + x(i) = tok.nextToken.toDouble; + i += 1 } DataPoint(new DenseVector(x), y) } @@ -76,17 +77,17 @@ object SparkHdfsLR { Seq(new InputFormatInfo(conf, classOf[org.apache.hadoop.mapred.TextInputFormat], inputPath)) )) val lines = sc.textFile(inputPath) - val points = lines.map(parsePoint _).cache() + val points = lines.map(parsePoint).cache() val ITERATIONS = args(1).toInt // Initialize w to a random value - var w = DenseVector.fill(D){2 * rand.nextDouble - 1} + var w = DenseVector.fill(D) {2 * rand.nextDouble - 1} println("Initial w: " + w) for (i <- 1 to ITERATIONS) { println("On iteration " + i) val gradient = points.map { p => - p.x * (1 / (1 + exp(-p.y * (w.dot(p.x)))) - 1) * p.y + p.x * (1 / (1 + exp(-p.y * w.dot(p.x))) - 1) * p.y }.reduce(_ + _) w -= gradient } diff --git a/examples/src/main/scala/org/apache/spark/examples/SparkKMeans.scala b/examples/src/main/scala/org/apache/spark/examples/SparkKMeans.scala index 48e8d11cdf95b..d57842efe8dbc 100644 --- a/examples/src/main/scala/org/apache/spark/examples/SparkKMeans.scala +++ b/examples/src/main/scala/org/apache/spark/examples/SparkKMeans.scala @@ -17,10 +17,9 @@ package org.apache.spark.examples -import breeze.linalg.{Vector, DenseVector, squaredDistance} - -import org.apache.spark.{SparkConf, SparkContext} +import breeze.linalg.{DenseVector, Vector, squaredDistance} import org.apache.spark.SparkContext._ +import org.apache.spark.{SparkConf, SparkContext} /** * K-means clustering. 
@@ -69,20 +68,21 @@ object SparkKMeans { val sparkConf = new SparkConf().setAppName("SparkKMeans") val sc = new SparkContext(sparkConf) val lines = sc.textFile(args(0)) - val data = lines.map(parseVector _).cache() + val data = lines.map(parseVector).cache() val K = args(1).toInt val convergeDist = args(2).toDouble val kPoints = data.takeSample(withReplacement = false, K, 42).toArray var tempDist = 1.0 - while(tempDist > convergeDist) { - val closest = data.map (p => (closestPoint(p, kPoints), (p, 1))) + while (tempDist > convergeDist) { + val closest = data.map(p => (closestPoint(p, kPoints), (p, 1))) - val pointStats = closest.reduceByKey{case ((x1, y1), (x2, y2)) => (x1 + x2, y1 + y2)} + val pointStats = closest.reduceByKey { case ((x1, y1), (x2, y2)) => (x1 + x2, y1 + y2)} - val newPoints = pointStats.map {pair => - (pair._1, pair._2._1 * (1.0 / pair._2._2))}.collectAsMap() + val newPoints = pointStats.map { pair => + (pair._1, pair._2._1 * (1.0 / pair._2._2)) + }.collectAsMap() tempDist = 0.0 for (i <- 0 until K) { diff --git a/examples/src/main/scala/org/apache/spark/examples/SparkLR.scala b/examples/src/main/scala/org/apache/spark/examples/SparkLR.scala index fc23308fc4adf..8224404afd87a 100644 --- a/examples/src/main/scala/org/apache/spark/examples/SparkLR.scala +++ b/examples/src/main/scala/org/apache/spark/examples/SparkLR.scala @@ -19,12 +19,11 @@ package org.apache.spark.examples import java.util.Random -import scala.math.exp - -import breeze.linalg.{Vector, DenseVector} - +import breeze.linalg.{DenseVector, Vector} import org.apache.spark._ +import scala.math.exp + /** * Logistic regression based classification. * Usage: SparkLR [slices] @@ -33,9 +32,12 @@ import org.apache.spark._ * please refer to org.apache.spark.mllib.classification.LogisticRegression */ object SparkLR { - val N = 10000 // Number of data points - val D = 10 // Numer of dimensions - val R = 0.7 // Scaling factor + val N = 10000 + // Number of data points + val D = 10 + // Numer of dimensions + val R = 0.7 + // Scaling factor val ITERATIONS = 5 val rand = new Random(42) @@ -43,8 +45,8 @@ object SparkLR { def generateData = { def generatePoint(i: Int) = { - val y = if(i % 2 == 0) -1 else 1 - val x = DenseVector.fill(D){rand.nextGaussian + y * R} + val y = if (i % 2 == 0) -1 else 1 + val x = DenseVector.fill(D) {rand.nextGaussian + y * R} DataPoint(x, y) } Array.tabulate(N)(generatePoint) @@ -68,7 +70,7 @@ object SparkLR { val points = sc.parallelize(generateData, numSlices).cache() // Initialize w to a random value - var w = DenseVector.fill(D){2 * rand.nextDouble - 1} + var w = DenseVector.fill(D) {2 * rand.nextDouble - 1} println("Initial w: " + w) for (i <- 1 to ITERATIONS) { diff --git a/examples/src/main/scala/org/apache/spark/examples/SparkPageRank.scala b/examples/src/main/scala/org/apache/spark/examples/SparkPageRank.scala index 4c7e006da0618..90e1061623f98 100644 --- a/examples/src/main/scala/org/apache/spark/examples/SparkPageRank.scala +++ b/examples/src/main/scala/org/apache/spark/examples/SparkPageRank.scala @@ -39,14 +39,14 @@ object SparkPageRank { val iters = if (args.length > 0) args(1).toInt else 10 val ctx = new SparkContext(sparkConf) val lines = ctx.textFile(args(0), 1) - val links = lines.map{ s => + val links = lines.map { s => val parts = s.split("\\s+") (parts(0), parts(1)) }.distinct().groupByKey().cache() var ranks = links.mapValues(v => 1.0) for (i <- 1 to iters) { - val contribs = links.join(ranks).values.flatMap{ case (urls, rank) => + val contribs = 
links.join(ranks).values.flatMap { case (urls, rank) => val size = urls.size urls.map(url => (url, rank / size)) } diff --git a/examples/src/main/scala/org/apache/spark/examples/SparkPi.scala b/examples/src/main/scala/org/apache/spark/examples/SparkPi.scala index 9fbb0a800d735..d1a7dba98cb15 100644 --- a/examples/src/main/scala/org/apache/spark/examples/SparkPi.scala +++ b/examples/src/main/scala/org/apache/spark/examples/SparkPi.scala @@ -17,10 +17,10 @@ package org.apache.spark.examples -import scala.math.random - import org.apache.spark._ +import scala.math.random + /** Computes an approximation to pi */ object SparkPi { def main(args: Array[String]) { @@ -31,7 +31,7 @@ object SparkPi { val count = spark.parallelize(1 to n, slices).map { i => val x = random * 2 - 1 val y = random * 2 - 1 - if (x*x + y*y < 1) 1 else 0 + if (x * x + y * y < 1) 1 else 0 }.reduce(_ + _) println("Pi is roughly " + 4.0 * count / n) spark.stop() diff --git a/examples/src/main/scala/org/apache/spark/examples/SparkTC.scala b/examples/src/main/scala/org/apache/spark/examples/SparkTC.scala index f7f83086df3db..82bb3da6b52f4 100644 --- a/examples/src/main/scala/org/apache/spark/examples/SparkTC.scala +++ b/examples/src/main/scala/org/apache/spark/examples/SparkTC.scala @@ -17,11 +17,11 @@ package org.apache.spark.examples -import scala.util.Random -import scala.collection.mutable - -import org.apache.spark.{SparkConf, SparkContext} import org.apache.spark.SparkContext._ +import org.apache.spark.{SparkConf, SparkContext} + +import scala.collection.mutable +import scala.util.Random /** * Transitive closure on a graph. diff --git a/examples/src/main/scala/org/apache/spark/examples/SparkTachyonHdfsLR.scala b/examples/src/main/scala/org/apache/spark/examples/SparkTachyonHdfsLR.scala index 96d13612e46dd..2d47de82feb1e 100644 --- a/examples/src/main/scala/org/apache/spark/examples/SparkTachyonHdfsLR.scala +++ b/examples/src/main/scala/org/apache/spark/examples/SparkTachyonHdfsLR.scala @@ -19,33 +19,34 @@ package org.apache.spark.examples import java.util.Random -import scala.math.exp - -import breeze.linalg.{Vector, DenseVector} +import breeze.linalg.{DenseVector, Vector} import org.apache.hadoop.conf.Configuration - import org.apache.spark._ import org.apache.spark.scheduler.InputFormatInfo import org.apache.spark.storage.StorageLevel +import scala.math.exp + /** * Logistic regression based classification. * This example uses Tachyon to persist rdds during computation. 
*/ object SparkTachyonHdfsLR { - val D = 10 // Numer of dimensions + val D = 10 + // Numer of dimensions val rand = new Random(42) case class DataPoint(x: Vector[Double], y: Double) def parsePoint(line: String): DataPoint = { val tok = new java.util.StringTokenizer(line, " ") - var y = tok.nextToken.toDouble - var x = new Array[Double](D) + val y = tok.nextToken.toDouble + val x = new Array[Double](D) var i = 0 while (i < D) { - x(i) = tok.nextToken.toDouble; i += 1 + x(i) = tok.nextToken.toDouble; + i += 1 } DataPoint(new DenseVector(x), y) } @@ -59,17 +60,17 @@ object SparkTachyonHdfsLR { Seq(new InputFormatInfo(conf, classOf[org.apache.hadoop.mapred.TextInputFormat], inputPath)) )) val lines = sc.textFile(inputPath) - val points = lines.map(parsePoint _).persist(StorageLevel.OFF_HEAP) + val points = lines.map(parsePoint).persist(StorageLevel.OFF_HEAP) val ITERATIONS = args(1).toInt // Initialize w to a random value - var w = DenseVector.fill(D){2 * rand.nextDouble - 1} + var w = DenseVector.fill(D) {2 * rand.nextDouble - 1} println("Initial w: " + w) for (i <- 1 to ITERATIONS) { println("On iteration " + i) val gradient = points.map { p => - p.x * (1 / (1 + exp(-p.y * (w.dot(p.x)))) - 1) * p.y + p.x * (1 / (1 + exp(-p.y * w.dot(p.x))) - 1) * p.y }.reduce(_ + _) w -= gradient } diff --git a/examples/src/main/scala/org/apache/spark/examples/SparkTachyonPi.scala b/examples/src/main/scala/org/apache/spark/examples/SparkTachyonPi.scala index 7743f7968b100..519d7bc70aeb7 100644 --- a/examples/src/main/scala/org/apache/spark/examples/SparkTachyonPi.scala +++ b/examples/src/main/scala/org/apache/spark/examples/SparkTachyonPi.scala @@ -17,14 +17,14 @@ package org.apache.spark.examples -import scala.math.random - import org.apache.spark._ import org.apache.spark.storage.StorageLevel +import scala.math.random + /** - * Computes an approximation to pi - * This example uses Tachyon to persist rdds during computation. + * Computes an approximation to pi + * This example uses Tachyon to persist rdds during computation. 
*/ object SparkTachyonPi { def main(args: Array[String]) { diff --git a/examples/src/main/scala/org/apache/spark/examples/bagel/PageRankUtils.scala b/examples/src/main/scala/org/apache/spark/examples/bagel/PageRankUtils.scala index e06f4dcd54442..6ca3909119b64 100644 --- a/examples/src/main/scala/org/apache/spark/examples/bagel/PageRankUtils.scala +++ b/examples/src/main/scala/org/apache/spark/examples/bagel/PageRankUtils.scala @@ -17,23 +17,15 @@ package org.apache.spark.examples.bagel +import com.esotericsoftware.kryo._ import org.apache.spark._ -import org.apache.spark.SparkContext._ -import org.apache.spark.serializer.KryoRegistrator - import org.apache.spark.bagel._ -import org.apache.spark.bagel.Bagel._ - -import scala.collection.mutable.ArrayBuffer - -import java.io.{InputStream, OutputStream, DataInputStream, DataOutputStream} - -import com.esotericsoftware.kryo._ +import org.apache.spark.serializer.KryoRegistrator class PageRankUtils extends Serializable { - def computeWithCombiner(numVertices: Long, epsilon: Double)( - self: PRVertex, messageSum: Option[Double], superstep: Int - ): (PRVertex, Array[PRMessage]) = { + def computeWithCombiner(numVertices: Long, epsilon: Double) + (self: PRVertex, messageSum: Option[Double], superstep: Int) + : (PRVertex, Array[PRMessage]) = { val newValue = messageSum match { case Some(msgSum) if msgSum != 0 => 0.15 / numVertices + 0.85 * msgSum @@ -53,7 +45,7 @@ class PageRankUtils extends Serializable { } def computeNoCombiner(numVertices: Long, epsilon: Double) - (self: PRVertex, messages: Option[Array[PRMessage]], superstep: Int) + (self: PRVertex, messages: Option[Array[PRMessage]], superstep: Int) : (PRVertex, Array[PRMessage]) = computeWithCombiner(numVertices, epsilon)(self, messages match { case Some(msgs) => Some(msgs.map(_.value).sum) @@ -62,10 +54,10 @@ class PageRankUtils extends Serializable { } class PRCombiner extends Combiner[PRMessage, Double] with Serializable { - def createCombiner(msg: PRMessage): Double = - msg.value - def mergeMsg(combiner: Double, msg: PRMessage): Double = - combiner + msg.value + def createCombiner(msg: PRMessage): Double = msg.value + + def mergeMsg(combiner: Double, msg: PRMessage): Double = combiner + msg.value + def mergeCombiners(a: Double, b: Double): Double = a + b } @@ -82,7 +74,7 @@ class PRVertex() extends Vertex with Serializable { this.active = active } - override def toString(): String = { + override def toString: String = { "PRVertex(value=%f, outEdges.length=%d, active=%s)" .format(value, outEdges.length, active.toString) } diff --git a/examples/src/main/scala/org/apache/spark/examples/bagel/WikipediaPageRank.scala b/examples/src/main/scala/org/apache/spark/examples/bagel/WikipediaPageRank.scala index e4db3ec51313d..d48a4bd68ac51 100644 --- a/examples/src/main/scala/org/apache/spark/examples/bagel/WikipediaPageRank.scala +++ b/examples/src/main/scala/org/apache/spark/examples/bagel/WikipediaPageRank.scala @@ -17,12 +17,11 @@ package org.apache.spark.examples.bagel -import org.apache.spark._ import org.apache.spark.SparkContext._ - +import org.apache.spark._ import org.apache.spark.bagel._ -import scala.xml.{XML,NodeSeq} +import scala.xml.{NodeSeq, XML} /** * Run PageRank on XML Wikipedia dumps from http://wiki.freebase.com/wiki/WEX. 
Uses the "articles" @@ -36,10 +35,11 @@ object WikipediaPageRank { "Usage: WikipediaPageRank ") System.exit(-1) } + val sparkConf = new SparkConf() sparkConf.setAppName("WikipediaPageRank") sparkConf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer") - sparkConf.set("spark.kryo.registrator", classOf[PRKryoRegistrator].getName) + sparkConf.set("spark.kryo.registrator", classOf[PRKryoRegistrator].getName) val inputFile = args(0) val threshold = args(1).toDouble @@ -69,7 +69,7 @@ object WikipediaPageRank { } catch { case e: org.xml.sax.SAXParseException => System.err.println("Article \"" + title + "\" has malformed XML in body:\n" + body) - NodeSeq.Empty + NodeSeq.Empty } } val outEdges = links.map(link => new String(link.text)).toArray @@ -88,18 +88,17 @@ object WikipediaPageRank { val messages = sc.parallelize(Array[(String, PRMessage)]()) val utils = new PageRankUtils val result = - Bagel.run( - sc, vertices, messages, combiner = new PRCombiner(), - numPartitions = numPartitions)( + Bagel.run( + sc, vertices, messages, combiner = new PRCombiner(), + numPartitions = numPartitions)( utils.computeWithCombiner(numVertices, epsilon)) // Print the result System.err.println("Articles with PageRank >= " + threshold + ":") - val top = - (result - .filter { case (id, vertex) => vertex.value >= threshold } - .map { case (id, vertex) => "%s\t%s\n".format(id, vertex.value) } - .collect().mkString) + val top = result.filter { case (id, vertex) => vertex.value >= threshold} + .map { case (id, vertex) => "%s\t%s\n".format(id, vertex.value)} + .collect().mkString + println(top) sc.stop() diff --git a/examples/src/main/scala/org/apache/spark/examples/bagel/WikipediaPageRankStandalone.scala b/examples/src/main/scala/org/apache/spark/examples/bagel/WikipediaPageRankStandalone.scala index 576a3e371b993..3bbb8400afdce 100644 --- a/examples/src/main/scala/org/apache/spark/examples/bagel/WikipediaPageRankStandalone.scala +++ b/examples/src/main/scala/org/apache/spark/examples/bagel/WikipediaPageRankStandalone.scala @@ -17,18 +17,17 @@ package org.apache.spark.examples.bagel -import java.io.{InputStream, OutputStream, DataInputStream, DataOutputStream} +import java.io.{DataInputStream, DataOutputStream, InputStream, OutputStream} import java.nio.ByteBuffer -import scala.collection.mutable.ArrayBuffer -import scala.xml.{XML, NodeSeq} - -import org.apache.spark._ -import org.apache.spark.serializer.{DeserializationStream, SerializationStream, SerializerInstance} import org.apache.spark.SparkContext._ +import org.apache.spark._ import org.apache.spark.rdd.RDD +import org.apache.spark.serializer.{DeserializationStream, SerializationStream, SerializerInstance} +import scala.collection.mutable.ArrayBuffer import scala.reflect.ClassTag +import scala.xml.{NodeSeq, XML} object WikipediaPageRankStandalone { def main(args: Array[String]) { @@ -53,9 +52,9 @@ object WikipediaPageRankStandalone { val partitioner = new HashPartitioner(sc.defaultParallelism) val links = if (usePartitioner) { - input.map(parseArticle _).partitionBy(partitioner).cache() + input.map(parseArticle).partitionBy(partitioner).cache() } else { - input.map(parseArticle _).cache() + input.map(parseArticle).cache() } val n = links.count() val defaultRank = 1.0 / n @@ -69,11 +68,9 @@ object WikipediaPageRankStandalone { // Print the result System.err.println("Articles with PageRank >= " + threshold + ":") - val top = - (ranks - .filter { case (id, rank) => rank >= threshold } - .map { case (id, rank) => "%s\t%s\n".format(id, rank) } - 
.collect().mkString) + val top = ranks.filter { case (id, rank) => rank >= threshold} + .map { case (id, rank) => "%s\t%s\n".format(id, rank)}.collect().mkString + println(top) val time = (System.currentTimeMillis - startTime) / 1000.0 @@ -95,7 +92,7 @@ object WikipediaPageRankStandalone { } catch { case e: org.xml.sax.SAXParseException => System.err.println("Article \"" + title + "\" has malformed XML in body:\n" + body) - NodeSeq.Empty + NodeSeq.Empty } } val outEdges = links.map(link => new String(link.text)).toArray @@ -112,16 +109,16 @@ object WikipediaPageRankStandalone { usePartitioner: Boolean, numPartitions: Int ): RDD[(String, Double)] = { - var ranks = links.mapValues { edges => defaultRank } + var ranks = links.mapValues { edges => defaultRank} for (i <- 1 to numIterations) { val contribs = links.groupWith(ranks).flatMap { case (id, (linksWrapperIterable, rankWrapperIterable)) => val linksWrapper = linksWrapperIterable.iterator val rankWrapper = rankWrapperIterable.iterator if (linksWrapper.hasNext) { - val linksWrapperHead = linksWrapper.next + val linksWrapperHead = linksWrapper.next() if (rankWrapper.hasNext) { - val rankWrapperHead = rankWrapper.next + val rankWrapperHead = rankWrapper.next() linksWrapperHead.map(dest => (dest, rankWrapperHead / linksWrapperHead.size)) } else { linksWrapperHead.map(dest => (dest, defaultRank / linksWrapperHead.size)) @@ -130,11 +127,11 @@ object WikipediaPageRankStandalone { Array[(String, Double)]() } } - ranks = (contribs.combineByKey((x: Double) => x, - (x: Double, y: Double) => x + y, - (x: Double, y: Double) => x + y, - partitioner) - .mapValues(sum => a/n + (1-a)*sum)) + ranks = contribs.combineByKey((x: Double) => x, + (x: Double, y: Double) => x + y, + (x: Double, y: Double) => x + y, + partitioner) + .mapValues(sum => a / n + (1 - a) * sum) } ranks } @@ -195,8 +192,13 @@ class WPRSerializationStream(os: OutputStream) extends SerializationStream { } } - def flush() { dos.flush() } - def close() { dos.close() } + def flush() { + dos.flush() + } + + def close() { + dos.close() + } } class WPRDeserializationStream(is: InputStream) extends DeserializationStream { @@ -224,9 +226,11 @@ class WPRDeserializationStream(is: InputStream) extends DeserializationStream { val id = dis.readUTF() val rank = dis.readDouble() (id, rank).asInstanceOf[T] - } + } } } - def close() { dis.close() } + def close() { + dis.close() + } } diff --git a/examples/src/main/scala/org/apache/spark/examples/graphx/Analytics.scala b/examples/src/main/scala/org/apache/spark/examples/graphx/Analytics.scala index c4317a6aec798..025f0eb56e0b3 100644 --- a/examples/src/main/scala/org/apache/spark/examples/graphx/Analytics.scala +++ b/examples/src/main/scala/org/apache/spark/examples/graphx/Analytics.scala @@ -17,12 +17,13 @@ package org.apache.spark.examples.graphx -import scala.collection.mutable import org.apache.spark._ -import org.apache.spark.storage.StorageLevel +import org.apache.spark.graphx.PartitionStrategy._ import org.apache.spark.graphx._ import org.apache.spark.graphx.lib._ -import org.apache.spark.graphx.PartitionStrategy._ +import org.apache.spark.storage.StorageLevel + +import scala.collection.mutable /** * Driver program for running graph algorithms. 
@@ -40,7 +41,7 @@ object Analytics extends Logging { val fname = args(1) val optionsList = args.drop(2).map { arg => arg.dropWhile(_ == '-').split('=') match { - case Array(opt, v) => (opt -> v) + case Array(opt, v) => opt -> v case _ => throw new IllegalArgumentException("Invalid argument: " + arg) } } @@ -67,11 +68,11 @@ object Analytics extends Logging { sys.exit(1) } val partitionStrategy: Option[PartitionStrategy] = options.remove("partStrategy") - .map(pickPartitioner(_)) + .map(pickPartitioner) val edgeStorageLevel = options.remove("edgeStorageLevel") - .map(StorageLevel.fromString(_)).getOrElse(StorageLevel.MEMORY_ONLY) + .map(StorageLevel.fromString).getOrElse(StorageLevel.MEMORY_ONLY) val vertexStorageLevel = options.remove("vertexStorageLevel") - .map(StorageLevel.fromString(_)).getOrElse(StorageLevel.MEMORY_ONLY) + .map(StorageLevel.fromString).getOrElse(StorageLevel.MEMORY_ONLY) taskType match { case "pagerank" => @@ -107,7 +108,7 @@ object Analytics extends Logging { if (!outFname.isEmpty) { logWarning("Saving pageranks of pages to " + outFname) - pr.map{case (id, r) => id + "\t" + r}.saveAsTextFile(outFname) + pr.map { case (id, r) => id + "\t" + r}.saveAsTextFile(outFname) } sc.stop() @@ -129,7 +130,7 @@ object Analytics extends Logging { val graph = partitionStrategy.foldLeft(unpartitionedGraph)(_.partitionBy(_)) val cc = ConnectedComponents.run(graph) - println("Components: " + cc.vertices.map{ case (vid,data) => data}.distinct()) + println("Components: " + cc.vertices.map { case (vid, data) => data}.distinct()) sc.stop() case "triangles" => @@ -147,7 +148,7 @@ object Analytics extends Logging { minEdgePartitions = numEPart, edgeStorageLevel = edgeStorageLevel, vertexStorageLevel = vertexStorageLevel) - // TriangleCount requires the graph to be partitioned + // TriangleCount requires the graph to be partitioned .partitionBy(partitionStrategy.getOrElse(RandomVertexCut)).cache() val triangles = TriangleCount.run(graph) println("Triangles: " + triangles.vertices.map { diff --git a/examples/src/main/scala/org/apache/spark/examples/graphx/LiveJournalPageRank.scala b/examples/src/main/scala/org/apache/spark/examples/graphx/LiveJournalPageRank.scala index e809a65b79975..4425bbe6343f0 100644 --- a/examples/src/main/scala/org/apache/spark/examples/graphx/LiveJournalPageRank.scala +++ b/examples/src/main/scala/org/apache/spark/examples/graphx/LiveJournalPageRank.scala @@ -14,14 +14,8 @@ * See the License for the specific language governing permissions and * limitations under the License. */ - package org.apache.spark.examples.graphx -import org.apache.spark.SparkContext._ -import org.apache.spark._ -import org.apache.spark.graphx._ - - /** * Uses GraphX to run PageRank on a LiveJournal social network graph. Download the dataset from * http://snap.stanford.edu/data/soc-LiveJournal1.html. 
diff --git a/examples/src/main/scala/org/apache/spark/examples/graphx/SynthBenchmark.scala b/examples/src/main/scala/org/apache/spark/examples/graphx/SynthBenchmark.scala index 5f35a5836462e..6744a9321bbef 100644 --- a/examples/src/main/scala/org/apache/spark/examples/graphx/SynthBenchmark.scala +++ b/examples/src/main/scala/org/apache/spark/examples/graphx/SynthBenchmark.scala @@ -17,11 +17,12 @@ package org.apache.spark.examples.graphx +import java.io.{FileOutputStream, PrintWriter} + import org.apache.spark.SparkContext._ import org.apache.spark.graphx.PartitionStrategy -import org.apache.spark.{SparkContext, SparkConf} import org.apache.spark.graphx.util.GraphGenerators -import java.io.{PrintWriter, FileOutputStream} +import org.apache.spark.{SparkConf, SparkContext} /** * The SynthBenchmark application can be used to run various GraphX algorithms on @@ -50,7 +51,7 @@ object SynthBenchmark { val options = args.map { arg => arg.dropWhile(_ == '-').split('=') match { - case Array(opt, v) => (opt -> v) + case Array(opt, v) => opt -> v case _ => throw new IllegalArgumentException("Invalid argument: " + arg) } } @@ -116,15 +117,15 @@ object SynthBenchmark { println(s"Total PageRank = $totalPR") } else if (app == "cc") { println("Running Connected Components") - val numComponents = graph.connectedComponents.vertices.map(_._2).distinct().count() + val numComponents = graph.connectedComponents().vertices.map(_._2).distinct().count() println(s"Number of components = $numComponents") } - val runTime = System.currentTimeMillis() - startTime + val runTime = System.currentTimeMillis() - startTime println(s"Num Vertices = $numVertices") println(s"Num Edges = $numEdges") - println(s"Creation time = ${loadTime/1000.0} seconds") - println(s"Run time = ${runTime/1000.0} seconds") + println(s"Creation time = ${loadTime / 1000.0} seconds") + println(s"Run time = ${runTime / 1000.0} seconds") sc.stop() } diff --git a/examples/src/main/scala/org/apache/spark/examples/mllib/BinaryClassification.scala b/examples/src/main/scala/org/apache/spark/examples/mllib/BinaryClassification.scala index 1edd2432a0352..62382f89e23a9 100644 --- a/examples/src/main/scala/org/apache/spark/examples/mllib/BinaryClassification.scala +++ b/examples/src/main/scala/org/apache/spark/examples/mllib/BinaryClassification.scala @@ -18,13 +18,12 @@ package org.apache.spark.examples.mllib import org.apache.log4j.{Level, Logger} -import scopt.OptionParser - -import org.apache.spark.{SparkConf, SparkContext} import org.apache.spark.mllib.classification.{LogisticRegressionWithLBFGS, SVMWithSGD} import org.apache.spark.mllib.evaluation.BinaryClassificationMetrics +import org.apache.spark.mllib.optimization.{L1Updater, SquaredL2Updater} import org.apache.spark.mllib.util.MLUtils -import org.apache.spark.mllib.optimization.{SquaredL2Updater, L1Updater} +import org.apache.spark.{SparkConf, SparkContext} +import scopt.OptionParser /** * An example app for binary classification. 
Run with @@ -46,8 +45,8 @@ object BinaryClassification { val L1, L2 = Value } - import Algorithm._ - import RegType._ + import org.apache.spark.examples.mllib.BinaryClassification.Algorithm._ + import org.apache.spark.examples.mllib.BinaryClassification.RegType._ case class Params( input: String = null, @@ -67,7 +66,7 @@ object BinaryClassification { .action((x, c) => c.copy(numIterations = x)) opt[Double]("stepSize") .text("initial step size (ignored by logistic regression), " + - s"default: ${defaultParams.stepSize}") + s"default: ${defaultParams.stepSize}") .action((x, c) => c.copy(stepSize = x)) opt[String]("algorithm") .text(s"algorithm (${Algorithm.values.mkString(",")}), " + diff --git a/examples/src/main/scala/org/apache/spark/examples/mllib/Correlations.scala b/examples/src/main/scala/org/apache/spark/examples/mllib/Correlations.scala index e49129c4e7844..3c6b2d123b947 100644 --- a/examples/src/main/scala/org/apache/spark/examples/mllib/Correlations.scala +++ b/examples/src/main/scala/org/apache/spark/examples/mllib/Correlations.scala @@ -17,11 +17,10 @@ package org.apache.spark.examples.mllib -import scopt.OptionParser - import org.apache.spark.mllib.stat.Statistics import org.apache.spark.mllib.util.MLUtils import org.apache.spark.{SparkConf, SparkContext} +import scopt.OptionParser /** @@ -48,18 +47,18 @@ object Correlations { .action((x, c) => c.copy(input = x)) note( """ - |For example, the following command runs this app on a synthetic dataset: - | - | bin/spark-submit --class org.apache.spark.examples.mllib.Correlations \ - | examples/target/scala-*/spark-examples-*.jar \ - | --input data/mllib/sample_linear_regression_data.txt + |For example, the following command runs this app on a synthetic dataset: + | + | bin/spark-submit --class org.apache.spark.examples.mllib.Correlations \ + | examples/target/scala-*/spark-examples-*.jar \ + | --input data/mllib/sample_linear_regression_data.txt """.stripMargin) } parser.parse(args, defaultParams).map { params => run(params) } getOrElse { - sys.exit(1) + sys.exit(1) } } diff --git a/examples/src/main/scala/org/apache/spark/examples/mllib/CosineSimilarity.scala b/examples/src/main/scala/org/apache/spark/examples/mllib/CosineSimilarity.scala index cb1abbd18fd4d..df69842172f68 100644 --- a/examples/src/main/scala/org/apache/spark/examples/mllib/CosineSimilarity.scala +++ b/examples/src/main/scala/org/apache/spark/examples/mllib/CosineSimilarity.scala @@ -17,12 +17,11 @@ package org.apache.spark.examples.mllib -import scopt.OptionParser - import org.apache.spark.SparkContext._ import org.apache.spark.mllib.linalg.Vectors import org.apache.spark.mllib.linalg.distributed.{MatrixEntry, RowMatrix} import org.apache.spark.{SparkConf, SparkContext} +import scopt.OptionParser /** * Compute the similar columns of a matrix, using cosine similarity. 
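BinaryClassification and Correlations above, and CosineSimilarity below, all share one command-line pattern: a scopt OptionParser builds an immutable Params case class via copy, and parse either yields the params or the program exits. A condensed sketch of that pattern, assuming scopt 3.x on the classpath; the Params fields and object name here are illustrative.

import scopt.OptionParser

object ParamsParsingSketch {

  case class Params(input: String = null, numIterations: Int = 100, stepSize: Double = 1.0)

  def main(args: Array[String]) {
    val defaultParams = Params()

    val parser = new OptionParser[Params]("ParamsParsingSketch") {
      head("ParamsParsingSketch: a condensed example of the option-parsing pattern.")
      opt[Int]("numIterations")
        .text("number of iterations")
        .action((x, c) => c.copy(numIterations = x))
      opt[Double]("stepSize")
        .text(s"initial step size, default: ${defaultParams.stepSize}")
        .action((x, c) => c.copy(stepSize = x))
      arg[String]("<input>")
        .required()
        .text("input path to labeled examples")
        .action((x, c) => c.copy(input = x))
    }

    parser.parse(args, defaultParams).map { params =>
      run(params)
    } getOrElse {
      sys.exit(1)
    }
  }

  def run(params: Params) {
    println(s"Running with $params")
  }
}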
@@ -42,6 +41,7 @@ import org.apache.spark.{SparkConf, SparkContext} * --threshold 0.1 data/mllib/sample_svm_data.txt */ object CosineSimilarity { + case class Params(inputFile: String = null, threshold: Double = 0.1) extends AbstractParams[Params] @@ -92,8 +92,8 @@ object CosineSimilarity { // Compute similar columns with estimation using DIMSUM val approx = mat.columnSimilarities(params.threshold) - val exactEntries = exact.entries.map { case MatrixEntry(i, j, u) => ((i, j), u) } - val approxEntries = approx.entries.map { case MatrixEntry(i, j, v) => ((i, j), v) } + val exactEntries = exact.entries.map { case MatrixEntry(i, j, u) => ((i, j), u)} + val approxEntries = approx.entries.map { case MatrixEntry(i, j, v) => ((i, j), v)} val MAE = exactEntries.leftOuterJoin(approxEntries).values.map { case (u, Some(v)) => math.abs(u - v) diff --git a/examples/src/main/scala/org/apache/spark/examples/mllib/DenseKMeans.scala b/examples/src/main/scala/org/apache/spark/examples/mllib/DenseKMeans.scala index 11e35598baf50..1666c87b13119 100644 --- a/examples/src/main/scala/org/apache/spark/examples/mllib/DenseKMeans.scala +++ b/examples/src/main/scala/org/apache/spark/examples/mllib/DenseKMeans.scala @@ -18,11 +18,10 @@ package org.apache.spark.examples.mllib import org.apache.log4j.{Level, Logger} -import scopt.OptionParser - -import org.apache.spark.{SparkConf, SparkContext} import org.apache.spark.mllib.clustering.KMeans import org.apache.spark.mllib.linalg.Vectors +import org.apache.spark.{SparkConf, SparkContext} +import scopt.OptionParser /** * An example k-means app. Run with diff --git a/examples/src/main/scala/org/apache/spark/examples/mllib/LinearRegression.scala b/examples/src/main/scala/org/apache/spark/examples/mllib/LinearRegression.scala index e1f9622350135..eca6e63cf89dd 100644 --- a/examples/src/main/scala/org/apache/spark/examples/mllib/LinearRegression.scala +++ b/examples/src/main/scala/org/apache/spark/examples/mllib/LinearRegression.scala @@ -18,12 +18,11 @@ package org.apache.spark.examples.mllib import org.apache.log4j.{Level, Logger} -import scopt.OptionParser - -import org.apache.spark.{SparkConf, SparkContext} +import org.apache.spark.mllib.optimization.{L1Updater, SimpleUpdater, SquaredL2Updater} import org.apache.spark.mllib.regression.LinearRegressionWithSGD import org.apache.spark.mllib.util.MLUtils -import org.apache.spark.mllib.optimization.{SimpleUpdater, SquaredL2Updater, L1Updater} +import org.apache.spark.{SparkConf, SparkContext} +import scopt.OptionParser /** * An example app for linear regression. 
Run with @@ -40,14 +39,14 @@ object LinearRegression extends App { val NONE, L1, L2 = Value } - import RegType._ + import org.apache.spark.examples.mllib.LinearRegression.RegType._ case class Params( - input: String = null, - numIterations: Int = 100, - stepSize: Double = 1.0, - regType: RegType = L2, - regParam: Double = 0.1) extends AbstractParams[Params] + input: String = null, + numIterations: Int = 100, + stepSize: Double = 1.0, + regType: RegType = L2, + regParam: Double = 0.1) extends AbstractParams[Params] val defaultParams = Params() diff --git a/examples/src/main/scala/org/apache/spark/examples/mllib/MovieLensALS.scala b/examples/src/main/scala/org/apache/spark/examples/mllib/MovieLensALS.scala index fc6678013b932..f192882b2acdd 100644 --- a/examples/src/main/scala/org/apache/spark/examples/mllib/MovieLensALS.scala +++ b/examples/src/main/scala/org/apache/spark/examples/mllib/MovieLensALS.scala @@ -17,17 +17,16 @@ package org.apache.spark.examples.mllib -import scala.collection.mutable - import com.esotericsoftware.kryo.Kryo import org.apache.log4j.{Level, Logger} -import scopt.OptionParser - -import org.apache.spark.{SparkConf, SparkContext} import org.apache.spark.SparkContext._ import org.apache.spark.mllib.recommendation.{ALS, MatrixFactorizationModel, Rating} import org.apache.spark.rdd.RDD -import org.apache.spark.serializer.{KryoSerializer, KryoRegistrator} +import org.apache.spark.serializer.{KryoRegistrator, KryoSerializer} +import org.apache.spark.{SparkConf, SparkContext} +import scopt.OptionParser + +import scala.collection.mutable /** * An example app for ALS on MovieLens data (http://grouplens.org/datasets/movielens/). @@ -188,7 +187,7 @@ object MovieLensALS { def mapPredictedRating(r: Double) = if (implicitPrefs) math.max(math.min(r, 1.0), 0.0) else r val predictions: RDD[Rating] = model.predict(data.map(x => (x.user, x.product))) - val predictionsAndRatings = predictions.map{ x => + val predictionsAndRatings = predictions.map { x => ((x.user, x.product), mapPredictedRating(x.rating)) }.join(data.map(x => ((x.user, x.product), x.rating))).values math.sqrt(predictionsAndRatings.map(x => (x._1 - x._2) * (x._1 - x._2)).mean()) diff --git a/examples/src/main/scala/org/apache/spark/examples/mllib/MultivariateSummarizer.scala b/examples/src/main/scala/org/apache/spark/examples/mllib/MultivariateSummarizer.scala index 6e4e2d07f284b..e8d8930f52834 100644 --- a/examples/src/main/scala/org/apache/spark/examples/mllib/MultivariateSummarizer.scala +++ b/examples/src/main/scala/org/apache/spark/examples/mllib/MultivariateSummarizer.scala @@ -17,12 +17,11 @@ package org.apache.spark.examples.mllib -import scopt.OptionParser - import org.apache.spark.mllib.linalg.Vectors import org.apache.spark.mllib.stat.MultivariateOnlineSummarizer import org.apache.spark.mllib.util.MLUtils import org.apache.spark.{SparkConf, SparkContext} +import scopt.OptionParser /** @@ -49,18 +48,18 @@ object MultivariateSummarizer { .action((x, c) => c.copy(input = x)) note( """ - |For example, the following command runs this app on a synthetic dataset: - | - | bin/spark-submit --class org.apache.spark.examples.mllib.MultivariateSummarizer \ - | examples/target/scala-*/spark-examples-*.jar \ - | --input data/mllib/sample_linear_regression_data.txt + |For example, the following command runs this app on a synthetic dataset: + | + | bin/spark-submit --class org.apache.spark.examples.mllib.MultivariateSummarizer \ + | examples/target/scala-*/spark-examples-*.jar \ + | --input 
data/mllib/sample_linear_regression_data.txt """.stripMargin) } parser.parse(args, defaultParams).map { params => run(params) } getOrElse { - sys.exit(1) + sys.exit(1) } } diff --git a/examples/src/main/scala/org/apache/spark/examples/mllib/RandomRDDGeneration.scala b/examples/src/main/scala/org/apache/spark/examples/mllib/RandomRDDGeneration.scala index 924b586e3af99..cb4127fc5ab90 100644 --- a/examples/src/main/scala/org/apache/spark/examples/mllib/RandomRDDGeneration.scala +++ b/examples/src/main/scala/org/apache/spark/examples/mllib/RandomRDDGeneration.scala @@ -19,7 +19,6 @@ package org.apache.spark.examples.mllib import org.apache.spark.mllib.random.RandomRDDs import org.apache.spark.rdd.RDD - import org.apache.spark.{SparkConf, SparkContext} /** @@ -44,13 +43,13 @@ object RandomRDDGeneration { println(s"Generated RDD of ${normalRDD.count()}" + " examples sampled from the standard normal distribution") println(" First 5 samples:") - normalRDD.take(5).foreach( x => println(s" $x") ) + normalRDD.take(5).foreach(x => println(s" $x")) // Example: RandomRDDs.normalVectorRDD val normalVectorRDD = RandomRDDs.normalVectorRDD(sc, numRows = numExamples, numCols = 2) println(s"Generated RDD of ${normalVectorRDD.count()} examples of length-2 vectors.") println(" First 5 samples:") - normalVectorRDD.take(5).foreach( x => println(s" $x") ) + normalVectorRDD.take(5).foreach(x => println(s" $x")) println() diff --git a/examples/src/main/scala/org/apache/spark/examples/mllib/SampledRDDs.scala b/examples/src/main/scala/org/apache/spark/examples/mllib/SampledRDDs.scala index 663c12734af68..e6dda329a9c42 100644 --- a/examples/src/main/scala/org/apache/spark/examples/mllib/SampledRDDs.scala +++ b/examples/src/main/scala/org/apache/spark/examples/mllib/SampledRDDs.scala @@ -17,11 +17,10 @@ package org.apache.spark.examples.mllib +import org.apache.spark.SparkContext._ import org.apache.spark.mllib.util.MLUtils -import scopt.OptionParser - import org.apache.spark.{SparkConf, SparkContext} -import org.apache.spark.SparkContext._ +import scopt.OptionParser /** * An example app for randomly generated and sampled RDDs. Run with @@ -45,10 +44,10 @@ object SampledRDDs { .action((x, c) => c.copy(input = x)) note( """ - |For example, the following command runs this app: - | - | bin/spark-submit --class org.apache.spark.examples.mllib.SampledRDDs \ - | examples/target/scala-*/spark-examples-*.jar + |For example, the following command runs this app: + | + | bin/spark-submit --class org.apache.spark.examples.mllib.SampledRDDs \ + | examples/target/scala-*/spark-examples-*.jar """.stripMargin) } @@ -83,7 +82,7 @@ object SampledRDDs { println() // Example: RDD.sampleByKey() and RDD.sampleByKeyExact() - val keyedRDD = examples.map { lp => (lp.label.toInt, lp.features) } + val keyedRDD = examples.map { lp => (lp.label.toInt, lp.features)} println(s" Keyed data using label (Int) as key ==> Orig") // Count examples per label in original data. 
val keyCounts = keyedRDD.countByKey() diff --git a/examples/src/main/scala/org/apache/spark/examples/mllib/SparseNaiveBayes.scala b/examples/src/main/scala/org/apache/spark/examples/mllib/SparseNaiveBayes.scala index f1ff4e6911f5e..d0638e1407151 100644 --- a/examples/src/main/scala/org/apache/spark/examples/mllib/SparseNaiveBayes.scala +++ b/examples/src/main/scala/org/apache/spark/examples/mllib/SparseNaiveBayes.scala @@ -18,11 +18,10 @@ package org.apache.spark.examples.mllib import org.apache.log4j.{Level, Logger} -import scopt.OptionParser - -import org.apache.spark.{SparkConf, SparkContext} import org.apache.spark.mllib.classification.NaiveBayes import org.apache.spark.mllib.util.MLUtils +import org.apache.spark.{SparkConf, SparkContext} +import scopt.OptionParser /** * An example naive Bayes app. Run with diff --git a/examples/src/main/scala/org/apache/spark/examples/mllib/StreamingLinearRegression.scala b/examples/src/main/scala/org/apache/spark/examples/mllib/StreamingLinearRegression.scala index c5bd5b0b178d9..0218ac2d6ffcc 100644 --- a/examples/src/main/scala/org/apache/spark/examples/mllib/StreamingLinearRegression.scala +++ b/examples/src/main/scala/org/apache/spark/examples/mllib/StreamingLinearRegression.scala @@ -17,9 +17,9 @@ package org.apache.spark.examples.mllib +import org.apache.spark.SparkConf import org.apache.spark.mllib.linalg.Vectors import org.apache.spark.mllib.regression.{LabeledPoint, StreamingLinearRegressionWithSGD} -import org.apache.spark.SparkConf import org.apache.spark.streaming.{Seconds, StreamingContext} /** @@ -35,8 +35,8 @@ import org.apache.spark.streaming.{Seconds, StreamingContext} * * To run on your local machine using the two directories `trainingDir` and `testDir`, * with updates every 5 seconds, and 2 features per data point, call: - * $ bin/run-example \ - * org.apache.spark.examples.mllib.StreamingLinearRegression trainingDir testDir 5 2 + * $ bin/run-example \ + * org.apache.spark.examples.mllib.StreamingLinearRegression trainingDir testDir 5 2 * * As you add text files to `trainingDir` the model will continuously update. * Anytime you add text files to `testDir`, you'll see predictions from the current model. diff --git a/examples/src/main/scala/org/apache/spark/examples/mllib/TallSkinnySVD.scala b/examples/src/main/scala/org/apache/spark/examples/mllib/TallSkinnySVD.scala index 4d6690318615a..e0e26e4d0bced 100644 --- a/examples/src/main/scala/org/apache/spark/examples/mllib/TallSkinnySVD.scala +++ b/examples/src/main/scala/org/apache/spark/examples/mllib/TallSkinnySVD.scala @@ -17,9 +17,9 @@ package org.apache.spark.examples.mllib -import org.apache.spark.{SparkConf, SparkContext} -import org.apache.spark.mllib.linalg.distributed.RowMatrix import org.apache.spark.mllib.linalg.Vectors +import org.apache.spark.mllib.linalg.distributed.RowMatrix +import org.apache.spark.{SparkConf, SparkContext} /** * Compute the singular value decomposition (SVD) of a tall-and-skinny matrix. 
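TallSkinnySVD, whose imports are reordered above, is built around RowMatrix. As a rough sketch of the kind of call it wraps, assuming the MLlib 1.x RowMatrix.computeSVD API and a few hard-coded dense rows (the data and object name are made up for illustration, not taken from the patch):

import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.linalg.distributed.RowMatrix
import org.apache.spark.{SparkConf, SparkContext}

object TallSkinnySVDSketch {
  def main(args: Array[String]) {
    val sc = new SparkContext(new SparkConf().setAppName("TallSkinnySVDSketch").setMaster("local[2]"))

    // A tall-and-skinny matrix: many rows, few columns.
    val rows = sc.parallelize(Seq(
      Vectors.dense(1.0, 2.0, 3.0),
      Vectors.dense(4.0, 5.0, 6.0),
      Vectors.dense(7.0, 8.0, 9.0),
      Vectors.dense(10.0, 11.0, 12.0)))
    val mat = new RowMatrix(rows)

    // Keep the top 2 singular values; computeU = true also materializes the left singular vectors.
    val svd = mat.computeSVD(2, computeU = true)
    println("Singular values: " + svd.s)
    println("V:\n" + svd.V)

    sc.stop()
  }
}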
diff --git a/examples/src/main/scala/org/apache/spark/examples/pythonconverters/AvroConverters.scala b/examples/src/main/scala/org/apache/spark/examples/pythonconverters/AvroConverters.scala index a11890d6f2b1c..1594a1ff6c90b 100644 --- a/examples/src/main/scala/org/apache/spark/examples/pythonconverters/AvroConverters.scala +++ b/examples/src/main/scala/org/apache/spark/examples/pythonconverters/AvroConverters.scala @@ -19,15 +19,14 @@ package org.apache.spark.examples.pythonconverters import java.util.{Collection => JCollection, Map => JMap} -import scala.collection.JavaConversions._ - -import org.apache.avro.generic.{GenericFixed, IndexedRecord} -import org.apache.avro.mapred.AvroWrapper import org.apache.avro.Schema import org.apache.avro.Schema.Type._ - -import org.apache.spark.api.python.Converter +import org.apache.avro.generic.{GenericFixed, IndexedRecord} +import org.apache.avro.mapred.AvroWrapper import org.apache.spark.SparkException +import org.apache.spark.api.python.Converter + +import scala.collection.JavaConversions._ object AvroConversionUtil extends Serializable { @@ -36,21 +35,21 @@ object AvroConversionUtil extends Serializable { return null } schema.getType match { - case UNION => unpackUnion(obj, schema) - case ARRAY => unpackArray(obj, schema) - case FIXED => unpackFixed(obj, schema) - case MAP => unpackMap(obj, schema) - case BYTES => unpackBytes(obj) - case RECORD => unpackRecord(obj) - case STRING => obj.toString - case ENUM => obj.toString - case NULL => obj + case UNION => unpackUnion(obj, schema) + case ARRAY => unpackArray(obj, schema) + case FIXED => unpackFixed(obj, schema) + case MAP => unpackMap(obj, schema) + case BYTES => unpackBytes(obj) + case RECORD => unpackRecord(obj) + case STRING => obj.toString + case ENUM => obj.toString + case NULL => obj case BOOLEAN => obj - case DOUBLE => obj - case FLOAT => obj - case INT => obj - case LONG => obj - case other => throw new SparkException( + case DOUBLE => obj + case FLOAT => obj + case INT => obj + case LONG => obj + case other => throw new SparkException( s"Unknown Avro schema type ${other.getName}") } } @@ -116,7 +115,7 @@ object AvroConversionUtil extends Serializable { * Implementation of [[org.apache.spark.api.python.Converter]] that converts * an Avro IndexedRecord (e.g., derived from AvroParquetInputFormat) to a Java Map. 
*/ -class IndexedRecordToJavaConverter extends Converter[IndexedRecord, JMap[String, Any]]{ +class IndexedRecordToJavaConverter extends Converter[IndexedRecord, JMap[String, Any]] { override def convert(record: IndexedRecord): JMap[String, Any] = { if (record == null) { return null diff --git a/examples/src/main/scala/org/apache/spark/examples/pythonconverters/CassandraConverters.scala b/examples/src/main/scala/org/apache/spark/examples/pythonconverters/CassandraConverters.scala index 83feb5703b908..273e5f5f0423a 100644 --- a/examples/src/main/scala/org/apache/spark/examples/pythonconverters/CassandraConverters.scala +++ b/examples/src/main/scala/org/apache/spark/examples/pythonconverters/CassandraConverters.scala @@ -17,10 +17,12 @@ package org.apache.spark.examples.pythonconverters -import org.apache.spark.api.python.Converter import java.nio.ByteBuffer + import org.apache.cassandra.utils.ByteBufferUtil -import collection.JavaConversions._ +import org.apache.spark.api.python.Converter + +import scala.collection.JavaConversions._ /** diff --git a/examples/src/main/scala/org/apache/spark/examples/pythonconverters/HBaseConverters.scala b/examples/src/main/scala/org/apache/spark/examples/pythonconverters/HBaseConverters.scala index 273bee0a8b30f..f0270389a4ede 100644 --- a/examples/src/main/scala/org/apache/spark/examples/pythonconverters/HBaseConverters.scala +++ b/examples/src/main/scala/org/apache/spark/examples/pythonconverters/HBaseConverters.scala @@ -17,12 +17,12 @@ package org.apache.spark.examples.pythonconverters -import scala.collection.JavaConversions._ - -import org.apache.spark.api.python.Converter import org.apache.hadoop.hbase.client.{Put, Result} import org.apache.hadoop.hbase.io.ImmutableBytesWritable import org.apache.hadoop.hbase.util.Bytes +import org.apache.spark.api.python.Converter + +import scala.collection.JavaConversions._ /** * Implementation of [[org.apache.spark.api.python.Converter]] that converts an diff --git a/examples/src/main/scala/org/apache/spark/examples/sql/RDDRelation.scala b/examples/src/main/scala/org/apache/spark/examples/sql/RDDRelation.scala index 2e98b2dc30b80..c374612d521a3 100644 --- a/examples/src/main/scala/org/apache/spark/examples/sql/RDDRelation.scala +++ b/examples/src/main/scala/org/apache/spark/examples/sql/RDDRelation.scala @@ -17,8 +17,8 @@ package org.apache.spark.examples.sql -import org.apache.spark.{SparkConf, SparkContext} import org.apache.spark.sql.SQLContext +import org.apache.spark.{SparkConf, SparkContext} // One method for defining the schema of an RDD is to make a case class with the desired column // names and types. 
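The comment closing the RDDRelation hunk above is the core idea of that example: a case class doubles as the row schema. A minimal sketch of that approach, assuming the Spark 1.1-era SQL API (the SQLContext.createSchemaRDD implicit and registerTempTable); the record and table names are illustrative.

import org.apache.spark.sql.SQLContext
import org.apache.spark.{SparkConf, SparkContext}

object CaseClassSchemaSketch {

  case class Record(key: Int, value: String)

  def main(args: Array[String]) {
    val sc = new SparkContext(new SparkConf().setAppName("CaseClassSchemaSketch").setMaster("local[2]"))
    val sqlContext = new SQLContext(sc)

    // Implicitly converts an RDD of case classes to a SchemaRDD.
    import sqlContext.createSchemaRDD

    val records = sc.parallelize((1 to 100).map(i => Record(i, s"val_$i")))
    records.registerTempTable("records")

    sqlContext.sql("SELECT key, value FROM records WHERE key < 10").collect().foreach(println)

    sc.stop()
  }
}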
diff --git a/examples/src/main/scala/org/apache/spark/examples/sql/hive/HiveFromSpark.scala b/examples/src/main/scala/org/apache/spark/examples/sql/hive/HiveFromSpark.scala index e26f213e8afa8..d14195ffb89a7 100644 --- a/examples/src/main/scala/org/apache/spark/examples/sql/hive/HiveFromSpark.scala +++ b/examples/src/main/scala/org/apache/spark/examples/sql/hive/HiveFromSpark.scala @@ -17,11 +17,12 @@ package org.apache.spark.examples.sql.hive -import org.apache.spark.{SparkConf, SparkContext} import org.apache.spark.sql._ import org.apache.spark.sql.hive.HiveContext +import org.apache.spark.{SparkConf, SparkContext} object HiveFromSpark { + case class Record(key: Int, value: String) def main(args: Array[String]) { diff --git a/examples/src/main/scala/org/apache/spark/examples/streaming/ActorWordCount.scala b/examples/src/main/scala/org/apache/spark/examples/streaming/ActorWordCount.scala index b433082dce1a2..b1d509a03cafe 100644 --- a/examples/src/main/scala/org/apache/spark/examples/streaming/ActorWordCount.scala +++ b/examples/src/main/scala/org/apache/spark/examples/streaming/ActorWordCount.scala @@ -17,29 +17,29 @@ package org.apache.spark.examples.streaming -import scala.collection.mutable.LinkedList -import scala.reflect.ClassTag -import scala.util.Random - import akka.actor.{Actor, ActorRef, Props, actorRef2Scala} - -import org.apache.spark.{SparkConf, SecurityManager} -import org.apache.spark.streaming.{Seconds, StreamingContext} import org.apache.spark.streaming.StreamingContext.toPairDStreamFunctions -import org.apache.spark.util.AkkaUtils import org.apache.spark.streaming.receiver.ActorHelper +import org.apache.spark.streaming.{Seconds, StreamingContext} +import org.apache.spark.util.AkkaUtils +import org.apache.spark.{SecurityManager, SparkConf} + +import scala.collection.mutable +import scala.reflect.ClassTag +import scala.util.Random case class SubscribeReceiver(receiverActor: ActorRef) + case class UnsubscribeReceiver(receiverActor: ActorRef) /** * Sends the random content to every receiver subscribed with 1/2 - * second delay. + * second delay. */ class FeederActor extends Actor { val rand = new Random() - var receivers: LinkedList[ActorRef] = new LinkedList[ActorRef]() + var receivers: mutable.LinkedList[ActorRef] = new mutable.LinkedList[ActorRef]() val strings: Array[String] = Array("words ", "may ", "count ") @@ -63,12 +63,12 @@ class FeederActor extends Actor { def receive: Receive = { case SubscribeReceiver(receiverActor: ActorRef) => - println("received subscribe from %s".format(receiverActor.toString)) - receivers = LinkedList(receiverActor) ++ receivers + println("received subscribe from %s".format(receiverActor.toString())) + receivers = mutable.LinkedList(receiverActor) ++ receivers case UnsubscribeReceiver(receiverActor: ActorRef) => - println("received unsubscribe from %s".format(receiverActor.toString)) - receivers = receivers.dropWhile(x => x eq receiverActor) + println("received unsubscribe from %s".format(receiverActor.toString())) + receivers = receivers.dropWhile(x => x eq receiverActor) } } @@ -81,11 +81,11 @@ class FeederActor extends Actor { * @see [[org.apache.spark.examples.streaming.FeederActor]] */ class SampleActorReceiver[T: ClassTag](urlOfPublisher: String) -extends Actor with ActorHelper { + extends Actor with ActorHelper { lazy private val remotePublisher = context.actorSelection(urlOfPublisher) - override def preStart = remotePublisher ! SubscribeReceiver(context.self) + override def preStart() = remotePublisher ! 
SubscribeReceiver(context.self) def receive = { case msg => store(msg.asInstanceOf[T]) @@ -99,12 +99,12 @@ extends Actor with ActorHelper { * A sample feeder actor * * Usage: FeederActor - * and describe the AkkaSystem that Spark Sample feeder would start on. + * and describe the AkkaSystem that Spark Sample feeder would start on. */ object FeederActor { def main(args: Array[String]) { - if(args.length < 2){ + if (args.length < 2) { System.err.println( "Usage: FeederActor \n" ) @@ -127,12 +127,12 @@ object FeederActor { * A sample word count program demonstrating the use of plugging in * Actor as Receiver * Usage: ActorWordCount - * and describe the AkkaSystem that Spark Sample feeder is running on. + * and describe the AkkaSystem that Spark Sample feeder is running on. * * To run this example locally, you may run Feeder Actor as - * `$ bin/run-example org.apache.spark.examples.streaming.FeederActor 127.0.1.1 9999` + * `$ bin/run-example org.apache.spark.examples.streaming.FeederActor 127.0.1.1 9999` * and then run the example - * `$ bin/run-example org.apache.spark.examples.streaming.ActorWordCount 127.0.1.1 9999` + * `$ bin/run-example org.apache.spark.examples.streaming.ActorWordCount 127.0.1.1 9999` */ object ActorWordCount { def main(args: Array[String]) { diff --git a/examples/src/main/scala/org/apache/spark/examples/streaming/CustomReceiver.scala b/examples/src/main/scala/org/apache/spark/examples/streaming/CustomReceiver.scala index 6bb659fbd8be8..66d70aa2dbcf1 100644 --- a/examples/src/main/scala/org/apache/spark/examples/streaming/CustomReceiver.scala +++ b/examples/src/main/scala/org/apache/spark/examples/streaming/CustomReceiver.scala @@ -17,23 +17,23 @@ package org.apache.spark.examples.streaming -import java.io.{InputStreamReader, BufferedReader, InputStream} +import java.io.{BufferedReader, InputStreamReader} import java.net.Socket -import org.apache.spark.{SparkConf, Logging} import org.apache.spark.storage.StorageLevel -import org.apache.spark.streaming.{Seconds, StreamingContext} import org.apache.spark.streaming.StreamingContext._ import org.apache.spark.streaming.receiver.Receiver +import org.apache.spark.streaming.{Seconds, StreamingContext} +import org.apache.spark.{Logging, SparkConf} /** * Custom Receiver that receives data over a socket. Received bytes is interpreted as * text and \n delimited lines are considered as records. They are then counted and printed. 
* * To run this on your local machine, you need to first run a Netcat server - * `$ nc -lk 9999` + * `$ nc -lk 9999` * and then run the example - * `$ bin/run-example org.apache.spark.examples.streaming.CustomReceiver localhost 9999` + * `$ bin/run-example org.apache.spark.examples.streaming.CustomReceiver localhost 9999` */ object CustomReceiver { def main(args: Array[String]) { @@ -66,38 +66,40 @@ class CustomReceiver(host: String, port: Int) def onStart() { // Start the thread that receives data over a connection new Thread("Socket Receiver") { - override def run() { receive() } + override def run() { + receive() + } }.start() } def onStop() { - // There is nothing much to do as the thread calling receive() - // is designed to stop by itself isStopped() returns false + // There is nothing much to do as the thread calling receive() + // is designed to stop by itself isStopped() returns false } /** Create a socket connection and receive data until receiver is stopped */ private def receive() { - var socket: Socket = null - var userInput: String = null - try { - logInfo("Connecting to " + host + ":" + port) - socket = new Socket(host, port) - logInfo("Connected to " + host + ":" + port) - val reader = new BufferedReader(new InputStreamReader(socket.getInputStream(), "UTF-8")) - userInput = reader.readLine() - while(!isStopped && userInput != null) { - store(userInput) - userInput = reader.readLine() - } - reader.close() - socket.close() - logInfo("Stopped receiving") - restart("Trying to connect again") - } catch { - case e: java.net.ConnectException => - restart("Error connecting to " + host + ":" + port, e) - case t: Throwable => - restart("Error receiving data", t) - } + var socket: Socket = null + var userInput: String = null + try { + logInfo("Connecting to " + host + ":" + port) + socket = new Socket(host, port) + logInfo("Connected to " + host + ":" + port) + val reader = new BufferedReader(new InputStreamReader(socket.getInputStream, "UTF-8")) + userInput = reader.readLine() + while (!isStopped && userInput != null) { + store(userInput) + userInput = reader.readLine() + } + reader.close() + socket.close() + logInfo("Stopped receiving") + restart("Trying to connect again") + } catch { + case e: java.net.ConnectException => + restart("Error connecting to " + host + ":" + port, e) + case t: Throwable => + restart("Error receiving data", t) + } } } diff --git a/examples/src/main/scala/org/apache/spark/examples/streaming/FlumeEventCount.scala b/examples/src/main/scala/org/apache/spark/examples/streaming/FlumeEventCount.scala index 20e7df7c45b1b..9e35a97470d76 100644 --- a/examples/src/main/scala/org/apache/spark/examples/streaming/FlumeEventCount.scala +++ b/examples/src/main/scala/org/apache/spark/examples/streaming/FlumeEventCount.scala @@ -24,19 +24,19 @@ import org.apache.spark.streaming.flume._ import org.apache.spark.util.IntParam /** - * Produces a count of events received from Flume. + * Produces a count of events received from Flume. * - * This should be used in conjunction with an AvroSink in Flume. It will start - * an Avro server on at the request host:port address and listen for requests. - * Your Flume AvroSink should be pointed to this address. + * This should be used in conjunction with an AvroSink in Flume. It will start + * an Avro server on at the request host:port address and listen for requests. + * Your Flume AvroSink should be pointed to this address. 
* - * Usage: FlumeEventCount - * is the host the Flume receiver will be started on - a receiver - * creates a server and listens for flume events. - * is the port the Flume receiver will listen on. + * Usage: FlumeEventCount + * is the host the Flume receiver will be started on - a receiver + * creates a server and listens for flume events. + * is the port the Flume receiver will listen on. * - * To run this example: - * `$ bin/run-example org.apache.spark.examples.streaming.FlumeEventCount ` + * To run this example: + * `$ bin/run-example org.apache.spark.examples.streaming.FlumeEventCount ` */ object FlumeEventCount { def main(args: Array[String]) { @@ -60,7 +60,7 @@ object FlumeEventCount { val stream = FlumeUtils.createStream(ssc, host, port, StorageLevel.MEMORY_ONLY_SER_2) // Print out the count of events received from this server in each batch - stream.count().map(cnt => "Received " + cnt + " flume events." ).print() + stream.count().map(cnt => "Received " + cnt + " flume events.").print() ssc.start() ssc.awaitTermination() diff --git a/examples/src/main/scala/org/apache/spark/examples/streaming/FlumePollingEventCount.scala b/examples/src/main/scala/org/apache/spark/examples/streaming/FlumePollingEventCount.scala index 1cc8c8d5c23b6..8170ddbc8797e 100644 --- a/examples/src/main/scala/org/apache/spark/examples/streaming/FlumePollingEventCount.scala +++ b/examples/src/main/scala/org/apache/spark/examples/streaming/FlumePollingEventCount.scala @@ -18,24 +18,22 @@ package org.apache.spark.examples.streaming import org.apache.spark.SparkConf -import org.apache.spark.storage.StorageLevel import org.apache.spark.streaming._ import org.apache.spark.streaming.flume._ import org.apache.spark.util.IntParam -import java.net.InetSocketAddress /** - * Produces a count of events received from Flume. + * Produces a count of events received from Flume. * - * This should be used in conjunction with the Spark Sink running in a Flume agent. See - * the Spark Streaming programming guide for more details. + * This should be used in conjunction with the Spark Sink running in a Flume agent. See + * the Spark Streaming programming guide for more details. * - * Usage: FlumePollingEventCount - * `host` is the host on which the Spark Sink is running. - * `port` is the port at which the Spark Sink is listening. + * Usage: FlumePollingEventCount + * `host` is the host on which the Spark Sink is running. + * `port` is the port at which the Spark Sink is listening. * - * To run this example: - * `$ bin/run-example org.apache.spark.examples.streaming.FlumePollingEventCount [host] [port] ` + * To run this example: + * `$ bin/run-example org.apache.spark.examples.streaming.FlumePollingEventCount [host] [port] ` */ object FlumePollingEventCount { def main(args: Array[String]) { @@ -59,7 +57,7 @@ object FlumePollingEventCount { val stream = FlumeUtils.createPollingStream(ssc, host, port) // Print out the count of events received from this server in each batch - stream.count().map(cnt => "Received " + cnt + " flume events." 
).print() + stream.count().map(cnt => "Received " + cnt + " flume events.").print() ssc.start() ssc.awaitTermination() diff --git a/examples/src/main/scala/org/apache/spark/examples/streaming/HdfsWordCount.scala b/examples/src/main/scala/org/apache/spark/examples/streaming/HdfsWordCount.scala index 6c24bc3ad09e0..f6c3687ccb4bf 100644 --- a/examples/src/main/scala/org/apache/spark/examples/streaming/HdfsWordCount.scala +++ b/examples/src/main/scala/org/apache/spark/examples/streaming/HdfsWordCount.scala @@ -18,17 +18,17 @@ package org.apache.spark.examples.streaming import org.apache.spark.SparkConf -import org.apache.spark.streaming.{Seconds, StreamingContext} import org.apache.spark.streaming.StreamingContext._ +import org.apache.spark.streaming.{Seconds, StreamingContext} /** * Counts words in new text files created in the given directory * Usage: HdfsWordCount - * is the directory that Spark Streaming will use to find and read new text files. + * is the directory that Spark Streaming will use to find and read new text files. * * To run this on your local machine on directory `localdir`, run this example - * $ bin/run-example \ - * org.apache.spark.examples.streaming.HdfsWordCount localdir + * $ bin/run-example \ + * org.apache.spark.examples.streaming.HdfsWordCount localdir * * Then create a text file in `localdir` and the words in the file will get counted. */ diff --git a/examples/src/main/scala/org/apache/spark/examples/streaming/KafkaWordCount.scala b/examples/src/main/scala/org/apache/spark/examples/streaming/KafkaWordCount.scala index c9e1511278ede..e91723bbbf6e3 100644 --- a/examples/src/main/scala/org/apache/spark/examples/streaming/KafkaWordCount.scala +++ b/examples/src/main/scala/org/apache/spark/examples/streaming/KafkaWordCount.scala @@ -20,24 +20,23 @@ package org.apache.spark.examples.streaming import java.util.Properties import kafka.producer._ - -import org.apache.spark.streaming._ +import org.apache.spark.SparkConf import org.apache.spark.streaming.StreamingContext._ +import org.apache.spark.streaming._ import org.apache.spark.streaming.kafka._ -import org.apache.spark.SparkConf /** * Consumes messages from one or more topics in Kafka and does wordcount. 
* Usage: KafkaWordCount - * is a list of one or more zookeeper servers that make quorum - * is the name of kafka consumer group - * is a list of one or more kafka topics to consume from - * is the number of threads the kafka consumer should use + * is a list of one or more zookeeper servers that make quorum + * is the name of kafka consumer group + * is a list of one or more kafka topics to consume from + * is the number of threads the kafka consumer should use * * Example: - * `$ bin/run-example \ - * org.apache.spark.examples.streaming.KafkaWordCount zoo01,zoo02,zoo03 \ - * my-consumer-group topic1,topic2 1` + * `$ bin/run-example \ + * org.apache.spark.examples.streaming.KafkaWordCount zoo01,zoo02,zoo03 \ + * my-consumer-group topic1,topic2 1` */ object KafkaWordCount { def main(args: Array[String]) { @@ -50,10 +49,10 @@ object KafkaWordCount { val Array(zkQuorum, group, topics, numThreads) = args val sparkConf = new SparkConf().setAppName("KafkaWordCount") - val ssc = new StreamingContext(sparkConf, Seconds(2)) + val ssc = new StreamingContext(sparkConf, Seconds(2)) ssc.checkpoint("checkpoint") - val topicMap = topics.split(",").map((_,numThreads.toInt)).toMap + val topicMap = topics.split(",").map((_, numThreads.toInt)).toMap val lines = KafkaUtils.createStream(ssc, zkQuorum, group, topicMap).map(_._2) val words = lines.flatMap(_.split(" ")) val wordCounts = words.map(x => (x, 1L)) @@ -86,7 +85,7 @@ object KafkaWordCountProducer { val producer = new Producer[String, String](config) // Send some messages - while(true) { + while (true) { val messages = (1 to messagesPerSec.toInt).map { messageNum => val str = (1 to wordsPerMessage.toInt).map(x => scala.util.Random.nextInt(10).toString) .mkString(" ") diff --git a/examples/src/main/scala/org/apache/spark/examples/streaming/MQTTWordCount.scala b/examples/src/main/scala/org/apache/spark/examples/streaming/MQTTWordCount.scala index e4283e04a1b11..b578fe3c516b6 100644 --- a/examples/src/main/scala/org/apache/spark/examples/streaming/MQTTWordCount.scala +++ b/examples/src/main/scala/org/apache/spark/examples/streaming/MQTTWordCount.scala @@ -17,14 +17,13 @@ package org.apache.spark.examples.streaming -import org.eclipse.paho.client.mqttv3.{MqttClient, MqttClientPersistence, MqttException, MqttMessage, MqttTopic} -import org.eclipse.paho.client.mqttv3.persist.MqttDefaultFilePersistence - +import org.apache.spark.SparkConf import org.apache.spark.storage.StorageLevel -import org.apache.spark.streaming.{Seconds, StreamingContext} import org.apache.spark.streaming.StreamingContext._ import org.apache.spark.streaming.mqtt._ -import org.apache.spark.SparkConf +import org.apache.spark.streaming.{Seconds, StreamingContext} +import org.eclipse.paho.client.mqttv3.persist.MqttDefaultFilePersistence +import org.eclipse.paho.client.mqttv3.{MqttClient, MqttClientPersistence, MqttException, MqttMessage, MqttTopic} /** * A simple Mqtt publisher for demonstration purposes, repeatedly publishes @@ -45,7 +44,7 @@ object MQTTPublisher { val Seq(brokerUrl, topic) = args.toSeq try { - var peristance:MqttClientPersistence =new MqttDefaultFilePersistence("/tmp") + val peristance: MqttClientPersistence = new MqttDefaultFilePersistence("/tmp") client = new MqttClient(brokerUrl, MqttClient.generateClientId(), peristance) } catch { case e: MqttException => println("Exception Caught: " + e) @@ -59,9 +58,9 @@ object MQTTPublisher { while (true) { val message: MqttMessage = new MqttMessage(String.valueOf(msg).getBytes("utf-8")) msgtopic.publish(message) - 
println("Published data. topic: " + msgtopic.getName() + " Message: " + message) + println("Published data. topic: " + msgtopic.getName + " Message: " + message) } - client.disconnect() + client.disconnect() } } @@ -75,14 +74,14 @@ object MQTTPublisher { * Example Java code for Mqtt Publisher and Subscriber can be found here * https://bitbucket.org/mkjinesh/mqttclient * Usage: MQTTWordCount - * and describe where Mqtt publisher is running. + * and describe where Mqtt publisher is running. * * To run this example locally, you may run publisher as - * `$ bin/run-example \ - * org.apache.spark.examples.streaming.MQTTPublisher tcp://localhost:1883 foo` + * `$ bin/run-example \ + * org.apache.spark.examples.streaming.MQTTPublisher tcp://localhost:1883 foo` * and run the example as - * `$ bin/run-example \ - * org.apache.spark.examples.streaming.MQTTWordCount tcp://localhost:1883 foo` + * `$ bin/run-example \ + * org.apache.spark.examples.streaming.MQTTWordCount tcp://localhost:1883 foo` */ object MQTTWordCount { diff --git a/examples/src/main/scala/org/apache/spark/examples/streaming/NetworkWordCount.scala b/examples/src/main/scala/org/apache/spark/examples/streaming/NetworkWordCount.scala index ae0a08c6cdb1a..496202e0db278 100644 --- a/examples/src/main/scala/org/apache/spark/examples/streaming/NetworkWordCount.scala +++ b/examples/src/main/scala/org/apache/spark/examples/streaming/NetworkWordCount.scala @@ -18,9 +18,9 @@ package org.apache.spark.examples.streaming import org.apache.spark.SparkConf -import org.apache.spark.streaming.{Seconds, StreamingContext} -import org.apache.spark.streaming.StreamingContext._ import org.apache.spark.storage.StorageLevel +import org.apache.spark.streaming.StreamingContext._ +import org.apache.spark.streaming.{Seconds, StreamingContext} /** * Counts words in UTF8 encoded, '\n' delimited text received from the network every second. @@ -29,9 +29,9 @@ import org.apache.spark.storage.StorageLevel * and describe the TCP server that Spark Streaming would connect to receive data. 
* * To run this on your local machine, you need to first run a Netcat server - * `$ nc -lk 9999` + * `$ nc -lk 9999` * and then run the example - * `$ bin/run-example org.apache.spark.examples.streaming.NetworkWordCount localhost 9999` + * `$ bin/run-example org.apache.spark.examples.streaming.NetworkWordCount localhost 9999` */ object NetworkWordCount { def main(args: Array[String]) { diff --git a/examples/src/main/scala/org/apache/spark/examples/streaming/QueueStream.scala b/examples/src/main/scala/org/apache/spark/examples/streaming/QueueStream.scala index 4caa90659111a..5fe8fd7df6b32 100644 --- a/examples/src/main/scala/org/apache/spark/examples/streaming/QueueStream.scala +++ b/examples/src/main/scala/org/apache/spark/examples/streaming/QueueStream.scala @@ -17,12 +17,12 @@ package org.apache.spark.examples.streaming -import scala.collection.mutable.SynchronizedQueue - import org.apache.spark.SparkConf import org.apache.spark.rdd.RDD -import org.apache.spark.streaming.{Seconds, StreamingContext} import org.apache.spark.streaming.StreamingContext._ +import org.apache.spark.streaming.{Seconds, StreamingContext} + +import scala.collection.mutable object QueueStream { @@ -35,7 +35,7 @@ object QueueStream { // Create the queue through which RDDs can be pushed to // a QueueInputDStream - val rddQueue = new SynchronizedQueue[RDD[Int]]() + val rddQueue = new mutable.SynchronizedQueue[RDD[Int]]() // Create the QueueInputDStream and use it do some processing val inputStream = ssc.queueStream(rddQueue) diff --git a/examples/src/main/scala/org/apache/spark/examples/streaming/RawNetworkGrep.scala b/examples/src/main/scala/org/apache/spark/examples/streaming/RawNetworkGrep.scala index a9aaa445bccb6..2ead4fb907ecf 100644 --- a/examples/src/main/scala/org/apache/spark/examples/streaming/RawNetworkGrep.scala +++ b/examples/src/main/scala/org/apache/spark/examples/streaming/RawNetworkGrep.scala @@ -29,11 +29,11 @@ import org.apache.spark.util.IntParam * and with Spark using Kryo serialization (set Java property "spark.serializer" to * "org.apache.spark.serializer.KryoSerializer"). * Usage: RawNetworkGrep - * is the number rawNetworkStreams, which should be same as number - * of work nodes in the cluster - * is "localhost". - * is the port on which RawTextSender is running in the worker nodes. - * is the Spark Streaming batch duration in milliseconds. + * is the number rawNetworkStreams, which should be same as number + * of work nodes in the cluster + * is "localhost". + * is the port on which RawTextSender is running in the worker nodes. + * is the Spark Streaming batch duration in milliseconds. 
*/ object RawNetworkGrep { def main(args: Array[String]) { diff --git a/examples/src/main/scala/org/apache/spark/examples/streaming/RecoverableNetworkWordCount.scala b/examples/src/main/scala/org/apache/spark/examples/streaming/RecoverableNetworkWordCount.scala index 6af3a0f33efc2..92862515a539a 100644 --- a/examples/src/main/scala/org/apache/spark/examples/streaming/RecoverableNetworkWordCount.scala +++ b/examples/src/main/scala/org/apache/spark/examples/streaming/RecoverableNetworkWordCount.scala @@ -21,20 +21,19 @@ import java.io.File import java.nio.charset.Charset import com.google.common.io.Files - import org.apache.spark.SparkConf import org.apache.spark.rdd.RDD -import org.apache.spark.streaming.{Time, Seconds, StreamingContext} import org.apache.spark.streaming.StreamingContext._ +import org.apache.spark.streaming.{Seconds, StreamingContext, Time} import org.apache.spark.util.IntParam /** * Counts words in text encoded with UTF8 received from the network every second. * * Usage: NetworkWordCount - * and describe the TCP server that Spark Streaming would connect to receive - * data. directory to HDFS-compatible file system which checkpoint data - * file to which the word counts will be appended + * and describe the TCP server that Spark Streaming would connect to receive + * data. directory to HDFS-compatible file system which checkpoint data + * file to which the word counts will be appended * * In local mode, should be 'local[n]' with n > 1 * and must be absolute paths @@ -42,12 +41,12 @@ import org.apache.spark.util.IntParam * * To run this on your local machine, you need to first run a Netcat server * - * `$ nc -lk 9999` + * `$ nc -lk 9999` * * and run the example as * - * `$ ./bin/run-example org.apache.spark.examples.streaming.RecoverableNetworkWordCount \ - * localhost 9999 ~/checkpoint/ ~/out` + * `$ ./bin/run-example org.apache.spark.examples.streaming.RecoverableNetworkWordCount \ + * localhost 9999 ~/checkpoint/ ~/out` * * If the directory ~/checkpoint/ does not exist (e.g. running for the first time), it will create * a new StreamingContext (will print "Creating new context" to the console). Otherwise, if @@ -56,10 +55,10 @@ import org.apache.spark.util.IntParam * * To run this example in a local standalone cluster with automatic driver recovery, * - * `$ bin/spark-class org.apache.spark.deploy.Client -s launch \ - * \ - * org.apache.spark.examples.streaming.RecoverableNetworkWordCount \ - * localhost 9999 ~/checkpoint ~/out` + * `$ bin/spark-class org.apache.spark.deploy.Client -s launch \ + * \ + * org.apache.spark.examples.streaming.RecoverableNetworkWordCount \ + * localhost 9999 ~/checkpoint ~/out` * * would typically be * /examples/target/scala-XX/spark-examples....jar diff --git a/examples/src/main/scala/org/apache/spark/examples/streaming/StatefulNetworkWordCount.scala b/examples/src/main/scala/org/apache/spark/examples/streaming/StatefulNetworkWordCount.scala index a4d159bf38377..20212acfec6a5 100644 --- a/examples/src/main/scala/org/apache/spark/examples/streaming/StatefulNetworkWordCount.scala +++ b/examples/src/main/scala/org/apache/spark/examples/streaming/StatefulNetworkWordCount.scala @@ -18,21 +18,21 @@ package org.apache.spark.examples.streaming import org.apache.spark.SparkConf -import org.apache.spark.streaming._ import org.apache.spark.streaming.StreamingContext._ +import org.apache.spark.streaming._ /** * Counts words cumulatively in UTF8 encoded, '\n' delimited text received from the network every * second. 
* Usage: StatefulNetworkWordCount - * and describe the TCP server that Spark Streaming would connect to receive - * data. + * and describe the TCP server that Spark Streaming would connect to receive + * data. * * To run this on your local machine, you need to first run a Netcat server - * `$ nc -lk 9999` + * `$ nc -lk 9999` * and then run the example - * `$ bin/run-example - * org.apache.spark.examples.streaming.StatefulNetworkWordCount localhost 9999` + * `$ bin/run-example + * org.apache.spark.examples.streaming.StatefulNetworkWordCount localhost 9999` */ object StatefulNetworkWordCount { def main(args: Array[String]) { diff --git a/examples/src/main/scala/org/apache/spark/examples/streaming/StreamingExamples.scala b/examples/src/main/scala/org/apache/spark/examples/streaming/StreamingExamples.scala index 8396e65d0d588..ff9dfc28e2b8c 100644 --- a/examples/src/main/scala/org/apache/spark/examples/streaming/StreamingExamples.scala +++ b/examples/src/main/scala/org/apache/spark/examples/streaming/StreamingExamples.scala @@ -17,9 +17,8 @@ package org.apache.spark.examples.streaming -import org.apache.spark.Logging - import org.apache.log4j.{Level, Logger} +import org.apache.spark.Logging /** Utility functions for Spark Streaming examples. */ object StreamingExamples extends Logging { diff --git a/examples/src/main/scala/org/apache/spark/examples/streaming/TwitterAlgebirdCMS.scala b/examples/src/main/scala/org/apache/spark/examples/streaming/TwitterAlgebirdCMS.scala index 683752ac96241..fa0efa9f0fb8c 100644 --- a/examples/src/main/scala/org/apache/spark/examples/streaming/TwitterAlgebirdCMS.scala +++ b/examples/src/main/scala/org/apache/spark/examples/streaming/TwitterAlgebirdCMS.scala @@ -18,13 +18,12 @@ package org.apache.spark.examples.streaming import com.twitter.algebird._ - import org.apache.spark.SparkConf import org.apache.spark.SparkContext._ import org.apache.spark.storage.StorageLevel -import org.apache.spark.streaming.{Seconds, StreamingContext} import org.apache.spark.streaming.StreamingContext._ import org.apache.spark.streaming.twitter._ +import org.apache.spark.streaming.{Seconds, StreamingContext} // scalastyle:off /** @@ -99,7 +98,7 @@ object TwitterAlgebirdCMS { if (rdd.count() != 0) { val partialMap = rdd.collect().toMap val partialTopK = rdd.map( - {case (id, count) => (count, id)}) + { case (id, count) => (count, id)}) .sortByKey(ascending = false).take(TOPK) globalExact = mm.plus(globalExact.toMap, partialMap) val globalTopK = globalExact.toSeq.sortBy(_._2).reverse.slice(0, TOPK) diff --git a/examples/src/main/scala/org/apache/spark/examples/streaming/TwitterAlgebirdHLL.scala b/examples/src/main/scala/org/apache/spark/examples/streaming/TwitterAlgebirdHLL.scala index 62db5e663b8af..f20236af62dcb 100644 --- a/examples/src/main/scala/org/apache/spark/examples/streaming/TwitterAlgebirdHLL.scala +++ b/examples/src/main/scala/org/apache/spark/examples/streaming/TwitterAlgebirdHLL.scala @@ -17,13 +17,12 @@ package org.apache.spark.examples.streaming -import com.twitter.algebird.HyperLogLogMonoid import com.twitter.algebird.HyperLogLog._ - +import com.twitter.algebird.HyperLogLogMonoid +import org.apache.spark.SparkConf import org.apache.spark.storage.StorageLevel -import org.apache.spark.streaming.{Seconds, StreamingContext} import org.apache.spark.streaming.twitter._ -import org.apache.spark.SparkConf +import org.apache.spark.streaming.{Seconds, StreamingContext} // scalastyle:off /** diff --git 
a/examples/src/main/scala/org/apache/spark/examples/streaming/TwitterPopularTags.scala b/examples/src/main/scala/org/apache/spark/examples/streaming/TwitterPopularTags.scala index f55d23ab3924b..de9c455f1c1bf 100644 --- a/examples/src/main/scala/org/apache/spark/examples/streaming/TwitterPopularTags.scala +++ b/examples/src/main/scala/org/apache/spark/examples/streaming/TwitterPopularTags.scala @@ -17,11 +17,11 @@ package org.apache.spark.examples.streaming -import org.apache.spark.streaming.{Seconds, StreamingContext} -import StreamingContext._ +import org.apache.spark.SparkConf import org.apache.spark.SparkContext._ +import org.apache.spark.streaming.StreamingContext._ import org.apache.spark.streaming.twitter._ -import org.apache.spark.SparkConf +import org.apache.spark.streaming.{Seconds, StreamingContext} /** * Calculates popular hashtags (topics) over sliding 10 and 60 second windows from a Twitter @@ -58,25 +58,25 @@ object TwitterPopularTags { val hashTags = stream.flatMap(status => status.getText.split(" ").filter(_.startsWith("#"))) val topCounts60 = hashTags.map((_, 1)).reduceByKeyAndWindow(_ + _, Seconds(60)) - .map{case (topic, count) => (count, topic)} - .transform(_.sortByKey(false)) + .map { case (topic, count) => (count, topic)} + .transform(_.sortByKey(ascending = false)) val topCounts10 = hashTags.map((_, 1)).reduceByKeyAndWindow(_ + _, Seconds(10)) - .map{case (topic, count) => (count, topic)} - .transform(_.sortByKey(false)) + .map { case (topic, count) => (count, topic)} + .transform(_.sortByKey(ascending = false)) // Print popular hashtags topCounts60.foreachRDD(rdd => { val topList = rdd.take(10) println("\nPopular topics in last 60 seconds (%s total):".format(rdd.count())) - topList.foreach{case (count, tag) => println("%s (%s tweets)".format(tag, count))} + topList.foreach { case (count, tag) => println("%s (%s tweets)".format(tag, count))} }) topCounts10.foreachRDD(rdd => { val topList = rdd.take(10) println("\nPopular topics in last 10 seconds (%s total):".format(rdd.count())) - topList.foreach{case (count, tag) => println("%s (%s tweets)".format(tag, count))} + topList.foreach { case (count, tag) => println("%s (%s tweets)".format(tag, count))} }) ssc.start() diff --git a/examples/src/main/scala/org/apache/spark/examples/streaming/ZeroMQWordCount.scala b/examples/src/main/scala/org/apache/spark/examples/streaming/ZeroMQWordCount.scala index 79905af381a12..edd1c79e2e762 100644 --- a/examples/src/main/scala/org/apache/spark/examples/streaming/ZeroMQWordCount.scala +++ b/examples/src/main/scala/org/apache/spark/examples/streaming/ZeroMQWordCount.scala @@ -17,18 +17,15 @@ package org.apache.spark.examples.streaming -import akka.actor.ActorSystem -import akka.actor.actorRef2Scala -import akka.zeromq._ -import akka.zeromq.Subscribe +import akka.actor.{ActorSystem, actorRef2Scala} import akka.util.ByteString - -import org.apache.spark.streaming.{Seconds, StreamingContext} +import akka.zeromq._ +import org.apache.spark.SparkConf import org.apache.spark.streaming.StreamingContext._ import org.apache.spark.streaming.zeromq._ +import org.apache.spark.streaming.{Seconds, StreamingContext} import scala.language.implicitConversions -import org.apache.spark.SparkConf /** * A simple publisher for demonstration purposes, repeatedly publishes random Messages @@ -65,14 +62,14 @@ object SimpleZeroMQPublisher { * (http://www.zeromq.org/intro:get-the-software) * * Usage: ZeroMQWordCount - * and describe where zeroMq publisher is running. 
+ * and describe where zeroMq publisher is running. * * To run this example locally, you may run publisher as - * `$ bin/run-example \ - * org.apache.spark.examples.streaming.SimpleZeroMQPublisher tcp://127.0.1.1:1234 foo.bar` + * `$ bin/run-example \ + * org.apache.spark.examples.streaming.SimpleZeroMQPublisher tcp://127.0.1.1:1234 foo.bar` * and run the example as - * `$ bin/run-example \ - * org.apache.spark.examples.streaming.ZeroMQWordCount tcp://127.0.1.1:1234 foo` + * `$ bin/run-example \ + * org.apache.spark.examples.streaming.ZeroMQWordCount tcp://127.0.1.1:1234 foo` */ // scalastyle:on object ZeroMQWordCount { @@ -87,7 +84,7 @@ object ZeroMQWordCount { // Create the context and set the batch size val ssc = new StreamingContext(sparkConf, Seconds(2)) - def bytesToStringIterator(x: Seq[ByteString]) = (x.map(_.utf8String)).iterator + def bytesToStringIterator(x: Seq[ByteString]) = x.map(_.utf8String).iterator // For this stream, a zeroMQ publisher should be running. val lines = ZeroMQUtils.createStream(ssc, url, Subscribe(topic), bytesToStringIterator _) diff --git a/examples/src/main/scala/org/apache/spark/examples/streaming/clickstream/PageViewGenerator.scala b/examples/src/main/scala/org/apache/spark/examples/streaming/clickstream/PageViewGenerator.scala index 8402491b62671..7dad3ddeabee5 100644 --- a/examples/src/main/scala/org/apache/spark/examples/streaming/clickstream/PageViewGenerator.scala +++ b/examples/src/main/scala/org/apache/spark/examples/streaming/clickstream/PageViewGenerator.scala @@ -17,20 +17,21 @@ package org.apache.spark.examples.streaming.clickstream -import java.net.ServerSocket import java.io.PrintWriter -import util.Random +import java.net.ServerSocket + +import scala.util.Random /** Represents a page view on a website with associated dimension data. 
diff --git a/examples/src/main/scala/org/apache/spark/examples/streaming/clickstream/PageViewGenerator.scala b/examples/src/main/scala/org/apache/spark/examples/streaming/clickstream/PageViewGenerator.scala
index 8402491b62671..7dad3ddeabee5 100644
--- a/examples/src/main/scala/org/apache/spark/examples/streaming/clickstream/PageViewGenerator.scala
+++ b/examples/src/main/scala/org/apache/spark/examples/streaming/clickstream/PageViewGenerator.scala
@@ -17,20 +17,21 @@
 package org.apache.spark.examples.streaming.clickstream

-import java.net.ServerSocket
 import java.io.PrintWriter
-import util.Random
+import java.net.ServerSocket
+
+import scala.util.Random

 /** Represents a page view on a website with associated dimension data. */
-class PageView(val url : String, val status : Int, val zipCode : Int, val userID : Int)
-  extends Serializable {
-  override def toString() : String = {
+class PageView(val url: String, val status: Int, val zipCode: Int, val userID: Int)
+  extends Serializable {
+  override def toString: String = {
     "%s\t%s\t%s\t%s\n".format(url, status, zipCode, userID)
   }
 }

 object PageView extends Serializable {
-  def fromString(in : String) : PageView = {
+  def fromString(in: String): PageView = {
     val parts = in.split("\t")
     new PageView(parts(0), parts(1).toInt, parts(2).toInt, parts(3).toInt)
   }
@@ -45,22 +46,22 @@ object PageView extends Serializable {
  * `$ bin/run-example org.apache.spark.examples.streaming.clickstream.PageViewGenerator 44444 10`
  * To process the generated stream
  * `$ bin/run-example \
- *   org.apache.spark.examples.streaming.clickstream.PageViewStream errorRatePerZipCode localhost 44444`
+ *   org.apache.spark.examples.streaming.clickstream.PageViewStream errorRatePerZipCode localhost 44444`
  *
  */
 // scalastyle:on
 object PageViewGenerator {
-  val pages = Map("http://foo.com/" -> .7,
-    "http://foo.com/news" -> 0.2,
-    "http://foo.com/contact" -> .1)
+  val pages = Map("http://foo.com/" -> .7,
+    "http://foo.com/news" -> 0.2,
+    "http://foo.com/contact" -> .1)
   val httpStatus = Map(200 -> .95,
-    404 -> .05)
+    404 -> .05)
   val userZipCode = Map(94709 -> .5,
-    94117 -> .5)
-  val userID = Map((1 to 100).map(_ -> .01):_*)
+    94117 -> .5)
+  val userID = Map((1 to 100).map(_ -> .01): _*)

-  def pickFromDistribution[T](inputMap : Map[T, Double]) : T = {
+  def pickFromDistribution[T](inputMap: Map[T, Double]): T = {
     val rand = new Random().nextDouble()
     var total = 0.0
     for ((item, prob) <- inputMap) {
@@ -72,7 +73,7 @@ object PageViewGenerator {
     inputMap.take(1).head._1 // Shouldn't get here if probabilities add up to 1.0
   }

-  def getNextClickEvent() : String = {
+  def getNextClickEvent: String = {
     val id = pickFromDistribution(userID)
     val page = pickFromDistribution(pages)
     val status = pickFromDistribution(httpStatus)
@@ -80,7 +81,7 @@ object PageViewGenerator {
     new PageView(page, status, zipCode, id).toString()
   }

-  def main(args : Array[String]) {
+  def main(args: Array[String]) {
     if (args.length != 2) {
       System.err.println("Usage: PageViewGenerator <port> <viewsPerSecond>")
       System.exit(1)
@@ -94,9 +95,9 @@ object PageViewGenerator {
     while (true) {
       val socket = listener.accept()
       new Thread() {
-        override def run = {
+        override def run() = {
           println("Got client connected from: " + socket.getInetAddress)
-          val out = new PrintWriter(socket.getOutputStream(), true)
+          val out = new PrintWriter(socket.getOutputStream, true)

           while (true) {
             Thread.sleep(sleepDelayMs)
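pickFromDistribution above walks a running total of probabilities until it passes a uniform random draw, which is how the generator weights URLs, status codes and users. It can be exercised on its own; a standalone sketch (the object name and the frequency check in main are illustrative, and it assumes the weights sum to roughly 1.0):

import scala.util.Random

object PickFromDistributionSketch {
  // Weighted sampling over an item -> probability map, mirroring the generator's helper.
  def pickFromDistribution[T](inputMap: Map[T, Double], rand: Random): T = {
    val draw = rand.nextDouble()
    var total = 0.0
    for ((item, prob) <- inputMap) {
      total = total + prob
      if (total > draw) {
        return item
      }
    }
    inputMap.head._1 // Fallback if the probabilities fall slightly short of 1.0
  }

  def main(args: Array[String]): Unit = {
    val pages = Map(
      "http://foo.com/" -> 0.7,
      "http://foo.com/news" -> 0.2,
      "http://foo.com/contact" -> 0.1)
    val rand = new Random()
    val samples = Seq.fill(10000)(pickFromDistribution(pages, rand))
    // Frequencies should land near the declared weights (~70/20/10).
    samples.groupBy(identity).foreach { case (url, hits) =>
      println("%s sampled %d times".format(url, hits.size))
    }
  }
}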
diff --git a/examples/src/main/scala/org/apache/spark/examples/streaming/clickstream/PageViewStream.scala b/examples/src/main/scala/org/apache/spark/examples/streaming/clickstream/PageViewStream.scala
index d9b886eff77cc..a2424f79a92f0 100644
--- a/examples/src/main/scala/org/apache/spark/examples/streaming/clickstream/PageViewStream.scala
+++ b/examples/src/main/scala/org/apache/spark/examples/streaming/clickstream/PageViewStream.scala
@@ -18,9 +18,10 @@
 package org.apache.spark.examples.streaming.clickstream

 import org.apache.spark.SparkContext._
-import org.apache.spark.streaming.{Seconds, StreamingContext}
-import org.apache.spark.streaming.StreamingContext._
 import org.apache.spark.examples.streaming.StreamingExamples
+import org.apache.spark.streaming.StreamingContext._
+import org.apache.spark.streaming.{Seconds, StreamingContext}
+
 // scalastyle:off
 /** Analyses a streaming dataset of web page views. This class demonstrates several types of
   * operators available in Spark streaming.
@@ -30,7 +31,7 @@
  * `$ bin/run-example org.apache.spark.examples.streaming.clickstream.PageViewGenerator 44444 10`
  * To process the generated stream
  * `$ bin/run-example \
- *   org.apache.spark.examples.streaming.clickstream.PageViewStream errorRatePerZipCode localhost 44444`
+ *   org.apache.spark.examples.streaming.clickstream.PageViewStream errorRatePerZipCode localhost 44444`
  */
 // scalastyle:on
 object PageViewStream {
@@ -38,7 +39,7 @@ object PageViewStream {
     if (args.length != 3) {
       System.err.println("Usage: PageViewStream <metric> <host> <port>")
       System.err.println("<metric> must be one of pageCounts, slidingPageCounts," +
-        " errorRatePerZipCode, activeUserCount, popularUsersSeen")
+        " errorRatePerZipCode, activeUserCount, popularUsersSeen")
       System.exit(1)
     }
     StreamingExamples.setStreamingLogLevels()
@@ -52,24 +53,24 @@ object PageViewStream {
     // Create a NetworkInputDStream on target host:port and convert each line to a PageView
     val pageViews = ssc.socketTextStream(host, port)
-      .flatMap(_.split("\n"))
-      .map(PageView.fromString(_))
+      .flatMap(_.split("\n"))
+      .map(PageView.fromString)

     // Return a count of views per URL seen in each batch
     val pageCounts = pageViews.map(view => view.url).countByValue()

     // Return a sliding window of page views per URL in the last ten seconds
     val slidingPageCounts = pageViews.map(view => view.url)
-      .countByValueAndWindow(Seconds(10), Seconds(2))
+      .countByValueAndWindow(Seconds(10), Seconds(2))

     // Return the rate of error pages (a non 200 status) in each zip code over the last 30 seconds
     val statusesPerZipCode = pageViews.window(Seconds(30), Seconds(2))
-      .map(view => ((view.zipCode, view.status)))
-      .groupByKey()
-    val errorRatePerZipCode = statusesPerZipCode.map{
-      case(zip, statuses) =>
-        val normalCount = statuses.filter(_ == 200).size
+      .map(view => (view.zipCode, view.status))
+      .groupByKey()
+    val errorRatePerZipCode = statusesPerZipCode.map {
+      case (zip, statuses) =>
+        val normalCount = statuses.count(_ == 200)
         val errorCount = statuses.size - normalCount
         val errorRatio = errorCount.toFloat / statuses.size
         if (errorRatio > 0.05) {
@@ -81,14 +82,14 @@ object PageViewStream {
     // Return the number unique users in last 15 seconds
     val activeUserCount = pageViews.window(Seconds(15), Seconds(2))
-      .map(view => (view.userID, 1))
-      .groupByKey()
-      .count()
-      .map("Unique active users: " + _)
+      .map(view => (view.userID, 1))
+      .groupByKey()
+      .count()
+      .map("Unique active users: " + _)

     // An external dataset we want to join to this stream
     val userList = ssc.sparkContext.parallelize(
-      Map(1 -> "Patrick Wendell", 2->"Reynold Xin", 3->"Matei Zaharia").toSeq)
+      Map(1 -> "Patrick Wendell", 2 -> "Reynold Xin", 3 -> "Matei Zaharia").toSeq)

     metric match {
       case "pageCounts" => pageCounts.print()
@@ -99,9 +100,9 @@
         // Look for users in our existing dataset and print it out if we have a match
         pageViews.map(view => (view.userID, 1))
           .foreachRDD((rdd, time) => rdd.join(userList)
-            .map(_._2._2)
-            .take(10)
-            .foreach(u => println("Saw user %s at time %s".format(u, time))))
+            .map(_._2._2)
+            .take(10)
+            .foreach(u => println("Saw user %s at time %s".format(u, time))))
       case _ => println("Invalid metric entered: " + metric)
     }
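Finally, the errorRatePerZipCode hunk replaces statuses.filter(_ == 200).size with the equivalent but more direct statuses.count(_ == 200). Stripped of the DStream windowing, the per-zip computation is a group-and-aggregate; a plain-Scala sketch under assumed names (the case class, sample window and output format below are illustrative, not taken from the patch):

object ErrorRatePerZipCodeSketch {
  // One page view: which zip code it came from and the HTTP status it got.
  case class View(zipCode: Int, status: Int)

  def main(args: Array[String]): Unit = {
    // Stand-in for a 30-second window of page views.
    val window = Seq(
      View(94709, 200), View(94709, 200), View(94709, 404),
      View(94117, 200), View(94117, 200), View(94117, 200))

    val report = window.groupBy(_.zipCode).map { case (zip, views) =>
      val statuses = views.map(_.status)
      val normalCount = statuses.count(_ == 200) // non-error responses
      val errorCount = statuses.size - normalCount
      val errorRatio = errorCount.toFloat / statuses.size
      "%d: %.2f error rate".format(zip, errorRatio)
    }

    report.foreach(println)
  }
}

With this sample window, 94709 reports a 0.33 error rate and 94117 a 0.00 rate, which is the kind of per-zip summary the streaming job emits every two seconds.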