Merge branch 'mesos'

commit 7c129388fbdc90cb6abb99470545dba8a2e90adf (2 parents: 8587844 + 7151e1e)
Authored by Haitao Yao (haitaoyao)

Showing 88 changed files with 836 additions and 749 deletions.

  1. +10 10 bagel/src/main/scala/spark/bagel/Bagel.scala
  2. +3 3 bagel/src/main/scala/spark/bagel/examples/WikipediaPageRank.scala
  3. +1 1  bagel/src/main/scala/spark/bagel/examples/WikipediaPageRankStandalone.scala
  4. +2 2 core/src/main/scala/spark/CacheManager.scala
  5. +2 2 core/src/main/scala/spark/DoubleRDDFunctions.scala
  6. +32 30 core/src/main/scala/spark/PairRDDFunctions.scala
  7. +1 1  core/src/main/scala/spark/{Split.scala → Partition.scala}
  8. +43 33 core/src/main/scala/spark/RDD.scala
  9. +6 6 core/src/main/scala/spark/RDDCheckpointData.scala
  10. +14 14 core/src/main/scala/spark/SparkContext.scala
  11. +3 3 core/src/main/scala/spark/api/java/JavaDoubleRDD.scala
  12. +22 22 core/src/main/scala/spark/api/java/JavaPairRDD.scala
  13. +3 3 core/src/main/scala/spark/api/java/JavaRDD.scala
  14. +6 6 core/src/main/scala/spark/api/java/JavaRDDLike.scala
  15. +11 11 core/src/main/scala/spark/api/java/JavaSparkContext.scala
  16. +1 1  core/src/main/scala/spark/api/python/PythonPartitioner.scala
  17. +5 5 core/src/main/scala/spark/api/python/PythonRDD.scala
  18. +2 2 core/src/main/scala/spark/deploy/{JobDescription.scala → ApplicationDescription.scala}
  19. +10 9 core/src/main/scala/spark/deploy/DeployMessage.scala
  20. +9 9 core/src/main/scala/spark/deploy/JsonProtocol.scala
  21. +11 11 core/src/main/scala/spark/deploy/client/Client.scala
  22. +1 1  core/src/main/scala/spark/deploy/client/ClientListener.scala
  23. +3 3 core/src/main/scala/spark/deploy/client/TestClient.scala
  24. +5 5 core/src/main/scala/spark/deploy/master/{JobInfo.scala → ApplicationInfo.scala}
  25. +11 0 core/src/main/scala/spark/deploy/master/ApplicationState.scala
  26. +2 2 core/src/main/scala/spark/deploy/master/ExecutorInfo.scala
  27. +0 9 core/src/main/scala/spark/deploy/master/JobState.scala
  28. +87 87 core/src/main/scala/spark/deploy/master/Master.scala
  29. +11 11 core/src/main/scala/spark/deploy/master/MasterWebUI.scala
  30. +2 2 core/src/main/scala/spark/deploy/master/WorkerInfo.scala
  31. +13 13 core/src/main/scala/spark/deploy/worker/ExecutorRunner.scala
  32. +10 10 core/src/main/scala/spark/deploy/worker/Worker.scala
  33. +1 1  core/src/main/scala/spark/deploy/worker/WorkerArguments.scala
  34. +2 2 core/src/main/scala/spark/deploy/worker/WorkerWebUI.scala
  35. +3 2 core/src/main/scala/spark/executor/StandaloneExecutorBackend.scala
  36. +1 1  core/src/main/scala/spark/partial/ApproximateActionListener.scala
  37. +8 8 core/src/main/scala/spark/rdd/BlockRDD.scala
  38. +18 18 core/src/main/scala/spark/rdd/CartesianRDD.scala
  39. +10 10 core/src/main/scala/spark/rdd/CheckpointRDD.scala
  40. +13 13 core/src/main/scala/spark/rdd/CoGroupedRDD.scala
  41. +12 12 core/src/main/scala/spark/rdd/CoalescedRDD.scala
  42. +3 3 core/src/main/scala/spark/rdd/FilteredRDD.scala
  43. +3 3 core/src/main/scala/spark/rdd/FlatMappedRDD.scala
  44. +3 3 core/src/main/scala/spark/rdd/GlommedRDD.scala
  45. +10 10 core/src/main/scala/spark/rdd/HadoopRDD.scala
  46. +4 4 core/src/main/scala/spark/rdd/MapPartitionsRDD.scala
  47. +6 6 core/src/main/scala/spark/rdd/{MapPartitionsWithSplitRDD.scala → MapPartitionsWithIndexRDD.scala}
  48. +3 3 core/src/main/scala/spark/rdd/MappedRDD.scala
  49. +10 10 core/src/main/scala/spark/rdd/NewHadoopRDD.scala
  50. +13 18 core/src/main/scala/spark/{ParallelCollection.scala → rdd/ParallelCollectionRDD.scala}
  51. +8 8 core/src/main/scala/spark/rdd/PartitionPruningRDD.scala
  52. +3 3 core/src/main/scala/spark/rdd/PipedRDD.scala
  53. +8 8 core/src/main/scala/spark/rdd/SampledRDD.scala
  54. +5 5 core/src/main/scala/spark/rdd/ShuffledRDD.scala
  55. +15 15 core/src/main/scala/spark/rdd/UnionRDD.scala
  56. +16 16 core/src/main/scala/spark/rdd/ZippedRDD.scala
  57. +7 7 core/src/main/scala/spark/scheduler/DAGScheduler.scala
  58. +3 3 core/src/main/scala/spark/scheduler/ResultTask.scala
  59. +3 3 core/src/main/scala/spark/scheduler/ShuffleMapTask.scala
  60. +1 1  core/src/main/scala/spark/scheduler/Stage.scala
  61. +8 7 core/src/main/scala/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala
  62. +2 2 core/src/main/scala/spark/scheduler/mesos/CoarseMesosSchedulerBackend.scala
  63. +2 2 core/src/main/scala/spark/scheduler/mesos/MesosSchedulerBackend.scala
  64. +1 1  core/src/main/scala/spark/storage/BlockManager.scala
  65. +1 1  core/src/main/scala/spark/storage/StorageUtils.scala
  66. +40 0 core/src/main/twirl/spark/deploy/master/app_details.scala.html
  67. +20 0 core/src/main/twirl/spark/deploy/master/app_row.scala.html
  68. +4 4 core/src/main/twirl/spark/deploy/master/{job_table.scala.html → app_table.scala.html}
  69. +3 3 core/src/main/twirl/spark/deploy/master/executor_row.scala.html
  70. +8 8 core/src/main/twirl/spark/deploy/master/index.scala.html
  71. +0 40 core/src/main/twirl/spark/deploy/master/job_details.scala.html
  72. +0 20 core/src/main/twirl/spark/deploy/master/job_row.scala.html
  73. +5 5 core/src/main/twirl/spark/deploy/worker/executor_row.scala.html
  74. +56 56 core/src/test/scala/spark/CheckpointSuite.scala
  75. +9 4 core/src/test/scala/spark/RDDSuite.scala
  76. +24 1 core/src/test/scala/spark/ShuffleSuite.scala
  77. +5 5 core/src/test/scala/spark/SortingSuite.scala
  78. +20 20 core/src/test/scala/spark/{ → rdd}/ParallelCollectionSplitSuite.scala
  79. +11 11 core/src/test/scala/spark/scheduler/DAGSchedulerSuite.scala
  80. +5 5 core/src/test/scala/spark/scheduler/TaskContextSuite.scala
  81. +11 0 ec2/deploy.generic/root/spark-ec2/ec2-variables.sh
  82. +55 19 ec2/spark_ec2.py
  83. +1 1  streaming/src/main/scala/spark/streaming/Checkpoint.scala
  84. +1 1  streaming/src/main/scala/spark/streaming/PairDStreamFunctions.scala
  85. +5 5 streaming/src/main/scala/spark/streaming/StreamingContext.scala
  86. +3 3 streaming/src/main/scala/spark/streaming/api/java/JavaStreamingContext.scala
  87. +1 1  streaming/src/main/scala/spark/streaming/dstream/CoGroupedDStream.scala
  88. +1 1  streaming/src/main/scala/spark/streaming/dstream/ReducedWindowedDStream.scala
20 bagel/src/main/scala/spark/bagel/Bagel.scala
@@ -14,11 +14,11 @@ object Bagel extends Logging {
14 14 combiner: Combiner[M, C],
15 15 aggregator: Option[Aggregator[V, A]],
16 16 partitioner: Partitioner,
17   - numSplits: Int
  17 + numPartitions: Int
18 18 )(
19 19 compute: (V, Option[C], Option[A], Int) => (V, Array[M])
20 20 ): RDD[(K, V)] = {
21   - val splits = if (numSplits != 0) numSplits else sc.defaultParallelism
  21 + val splits = if (numPartitions != 0) numPartitions else sc.defaultParallelism
22 22
23 23 var superstep = 0
24 24 var verts = vertices
@@ -56,12 +56,12 @@ object Bagel extends Logging {
56 56 messages: RDD[(K, M)],
57 57 combiner: Combiner[M, C],
58 58 partitioner: Partitioner,
59   - numSplits: Int
  59 + numPartitions: Int
60 60 )(
61 61 compute: (V, Option[C], Int) => (V, Array[M])
62 62 ): RDD[(K, V)] = {
63 63 run[K, V, M, C, Nothing](
64   - sc, vertices, messages, combiner, None, partitioner, numSplits)(
  64 + sc, vertices, messages, combiner, None, partitioner, numPartitions)(
65 65 addAggregatorArg[K, V, M, C](compute))
66 66 }
67 67
@@ -70,13 +70,13 @@ object Bagel extends Logging {
70 70 vertices: RDD[(K, V)],
71 71 messages: RDD[(K, M)],
72 72 combiner: Combiner[M, C],
73   - numSplits: Int
  73 + numPartitions: Int
74 74 )(
75 75 compute: (V, Option[C], Int) => (V, Array[M])
76 76 ): RDD[(K, V)] = {
77   - val part = new HashPartitioner(numSplits)
  77 + val part = new HashPartitioner(numPartitions)
78 78 run[K, V, M, C, Nothing](
79   - sc, vertices, messages, combiner, None, part, numSplits)(
  79 + sc, vertices, messages, combiner, None, part, numPartitions)(
80 80 addAggregatorArg[K, V, M, C](compute))
81 81 }
82 82
@@ -84,13 +84,13 @@ object Bagel extends Logging {
84 84 sc: SparkContext,
85 85 vertices: RDD[(K, V)],
86 86 messages: RDD[(K, M)],
87   - numSplits: Int
  87 + numPartitions: Int
88 88 )(
89 89 compute: (V, Option[Array[M]], Int) => (V, Array[M])
90 90 ): RDD[(K, V)] = {
91   - val part = new HashPartitioner(numSplits)
  91 + val part = new HashPartitioner(numPartitions)
92 92 run[K, V, M, Array[M], Nothing](
93   - sc, vertices, messages, new DefaultCombiner(), None, part, numSplits)(
  93 + sc, vertices, messages, new DefaultCombiner(), None, part, numPartitions)(
94 94 addAggregatorArg[K, V, M, Array[M]](compute))
95 95 }
96 96
6 bagel/src/main/scala/spark/bagel/examples/WikipediaPageRank.scala
@@ -16,7 +16,7 @@ import scala.xml.{XML,NodeSeq}
16 16 object WikipediaPageRank {
17 17 def main(args: Array[String]) {
18 18 if (args.length < 5) {
19   - System.err.println("Usage: WikipediaPageRank <inputFile> <threshold> <numSplits> <host> <usePartitioner>")
  19 + System.err.println("Usage: WikipediaPageRank <inputFile> <threshold> <numPartitions> <host> <usePartitioner>")
20 20 System.exit(-1)
21 21 }
22 22
@@ -25,7 +25,7 @@ object WikipediaPageRank {
25 25
26 26 val inputFile = args(0)
27 27 val threshold = args(1).toDouble
28   - val numSplits = args(2).toInt
  28 + val numPartitions = args(2).toInt
29 29 val host = args(3)
30 30 val usePartitioner = args(4).toBoolean
31 31 val sc = new SparkContext(host, "WikipediaPageRank")
@@ -69,7 +69,7 @@ object WikipediaPageRank {
69 69 val result =
70 70 Bagel.run(
71 71 sc, vertices, messages, combiner = new PRCombiner(),
72   - numSplits = numSplits)(
  72 + numPartitions = numPartitions)(
73 73 utils.computeWithCombiner(numVertices, epsilon))
74 74
75 75 // Print the result
2  bagel/src/main/scala/spark/bagel/examples/WikipediaPageRankStandalone.scala
@@ -88,7 +88,7 @@ object WikipediaPageRankStandalone {
88 88 n: Long,
89 89 partitioner: Partitioner,
90 90 usePartitioner: Boolean,
91   - numSplits: Int
  91 + numPartitions: Int
92 92 ): RDD[(String, Double)] = {
93 93 var ranks = links.mapValues { edges => defaultRank }
94 94 for (i <- 1 to numIterations) {
4 core/src/main/scala/spark/CacheManager.scala
@@ -11,13 +11,13 @@ private[spark] class CacheManager(blockManager: BlockManager) extends Logging {
11 11 private val loading = new HashSet[String]
12 12
13 13 /** Gets or computes an RDD split. Used by RDD.iterator() when an RDD is cached. */
14   - def getOrCompute[T](rdd: RDD[T], split: Split, context: TaskContext, storageLevel: StorageLevel)
  14 + def getOrCompute[T](rdd: RDD[T], split: Partition, context: TaskContext, storageLevel: StorageLevel)
15 15 : Iterator[T] = {
16 16 val key = "rdd_%d_%d".format(rdd.id, split.index)
17 17 logInfo("Cache key is " + key)
18 18 blockManager.get(key) match {
19 19 case Some(cachedValues) =>
20   - // Split is in cache, so just return its values
  20 + // Partition is in cache, so just return its values
21 21 logInfo("Found partition in cache!")
22 22 return cachedValues.asInstanceOf[Iterator[T]]
23 23
4 core/src/main/scala/spark/DoubleRDDFunctions.scala
@@ -42,14 +42,14 @@ class DoubleRDDFunctions(self: RDD[Double]) extends Logging with Serializable {
42 42 /** (Experimental) Approximate operation to return the mean within a timeout. */
43 43 def meanApprox(timeout: Long, confidence: Double = 0.95): PartialResult[BoundedDouble] = {
44 44 val processPartition = (ctx: TaskContext, ns: Iterator[Double]) => StatCounter(ns)
45   - val evaluator = new MeanEvaluator(self.splits.size, confidence)
  45 + val evaluator = new MeanEvaluator(self.partitions.size, confidence)
46 46 self.context.runApproximateJob(self, processPartition, evaluator, timeout)
47 47 }
48 48
49 49 /** (Experimental) Approximate operation to return the sum within a timeout. */
50 50 def sumApprox(timeout: Long, confidence: Double = 0.95): PartialResult[BoundedDouble] = {
51 51 val processPartition = (ctx: TaskContext, ns: Iterator[Double]) => StatCounter(ns)
52   - val evaluator = new SumEvaluator(self.splits.size, confidence)
  52 + val evaluator = new SumEvaluator(self.partitions.size, confidence)
53 53 self.context.runApproximateJob(self, processPartition, evaluator, timeout)
54 54 }
55 55 }
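
The approximate actions above now size their evaluators from partitions.size rather than splits.size. A minimal usage sketch (local master and data are hypothetical; only the renamed accessor comes from this commit):

    import spark.SparkContext
    import spark.SparkContext._

    val sc = new SparkContext("local[2]", "ApproxExample")
    val nums = sc.parallelize((1 to 1000000).map(_.toDouble), 8)

    // The evaluator is sized by nums.partitions.size (formerly nums.splits.size).
    val approxMean = nums.meanApprox(timeout = 2000L, confidence = 0.95)
    println(approxMean)  // a PartialResult, updated as partitions finish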
62 core/src/main/scala/spark/PairRDDFunctions.scala
@@ -62,7 +62,9 @@ class PairRDDFunctions[K: ClassManifest, V: ClassManifest](
62 62 }
63 63 val aggregator =
64 64 new Aggregator[K, V, C](createCombiner, mergeValue, mergeCombiners)
65   - if (mapSideCombine) {
  65 + if (self.partitioner == Some(partitioner)) {
  66 + self.mapPartitions(aggregator.combineValuesByKey(_), true)
  67 + } else if (mapSideCombine) {
66 68 val mapSideCombined = self.mapPartitions(aggregator.combineValuesByKey(_), true)
67 69 val partitioned = new ShuffledRDD[K, C](mapSideCombined, partitioner)
68 70 partitioned.mapPartitions(aggregator.combineCombinersByKey(_), true)
@@ -81,8 +83,8 @@ class PairRDDFunctions[K: ClassManifest, V: ClassManifest](
81 83 def combineByKey[C](createCombiner: V => C,
82 84 mergeValue: (C, V) => C,
83 85 mergeCombiners: (C, C) => C,
84   - numSplits: Int): RDD[(K, C)] = {
85   - combineByKey(createCombiner, mergeValue, mergeCombiners, new HashPartitioner(numSplits))
  86 + numPartitions: Int): RDD[(K, C)] = {
  87 + combineByKey(createCombiner, mergeValue, mergeCombiners, new HashPartitioner(numPartitions))
86 88 }
87 89
88 90 /**
@@ -143,10 +145,10 @@ class PairRDDFunctions[K: ClassManifest, V: ClassManifest](
143 145 /**
144 146 * Merge the values for each key using an associative reduce function. This will also perform
145 147 * the merging locally on each mapper before sending results to a reducer, similarly to a
146   - * "combiner" in MapReduce. Output will be hash-partitioned with numSplits splits.
  148 + * "combiner" in MapReduce. Output will be hash-partitioned with numPartitions partitions.
147 149 */
148   - def reduceByKey(func: (V, V) => V, numSplits: Int): RDD[(K, V)] = {
149   - reduceByKey(new HashPartitioner(numSplits), func)
  150 + def reduceByKey(func: (V, V) => V, numPartitions: Int): RDD[(K, V)] = {
  151 + reduceByKey(new HashPartitioner(numPartitions), func)
150 152 }
151 153
152 154 /**
@@ -164,10 +166,10 @@ class PairRDDFunctions[K: ClassManifest, V: ClassManifest](
164 166
165 167 /**
166 168 * Group the values for each key in the RDD into a single sequence. Hash-partitions the
167   - * resulting RDD with into `numSplits` partitions.
  169 + * resulting RDD with into `numPartitions` partitions.
168 170 */
169   - def groupByKey(numSplits: Int): RDD[(K, Seq[V])] = {
170   - groupByKey(new HashPartitioner(numSplits))
  171 + def groupByKey(numPartitions: Int): RDD[(K, Seq[V])] = {
  172 + groupByKey(new HashPartitioner(numPartitions))
171 173 }
172 174
173 175 /**
@@ -285,8 +287,8 @@ class PairRDDFunctions[K: ClassManifest, V: ClassManifest](
285 287 * pair of elements will be returned as a (k, (v1, v2)) tuple, where (k, v1) is in `this` and
286 288 * (k, v2) is in `other`. Performs a hash join across the cluster.
287 289 */
288   - def join[W](other: RDD[(K, W)], numSplits: Int): RDD[(K, (V, W))] = {
289   - join(other, new HashPartitioner(numSplits))
  290 + def join[W](other: RDD[(K, W)], numPartitions: Int): RDD[(K, (V, W))] = {
  291 + join(other, new HashPartitioner(numPartitions))
290 292 }
291 293
292 294 /**
@@ -303,10 +305,10 @@ class PairRDDFunctions[K: ClassManifest, V: ClassManifest](
303 305 * Perform a left outer join of `this` and `other`. For each element (k, v) in `this`, the
304 306 * resulting RDD will either contain all pairs (k, (v, Some(w))) for w in `other`, or the
305 307 * pair (k, (v, None)) if no elements in `other` have key k. Hash-partitions the output
306   - * into `numSplits` partitions.
  308 + * into `numPartitions` partitions.
307 309 */
308   - def leftOuterJoin[W](other: RDD[(K, W)], numSplits: Int): RDD[(K, (V, Option[W]))] = {
309   - leftOuterJoin(other, new HashPartitioner(numSplits))
  310 + def leftOuterJoin[W](other: RDD[(K, W)], numPartitions: Int): RDD[(K, (V, Option[W]))] = {
  311 + leftOuterJoin(other, new HashPartitioner(numPartitions))
310 312 }
311 313
312 314 /**
@@ -325,8 +327,8 @@ class PairRDDFunctions[K: ClassManifest, V: ClassManifest](
325 327 * pair (k, (None, w)) if no elements in `this` have key k. Hash-partitions the resulting
326 328 * RDD into the given number of partitions.
327 329 */
328   - def rightOuterJoin[W](other: RDD[(K, W)], numSplits: Int): RDD[(K, (Option[V], W))] = {
329   - rightOuterJoin(other, new HashPartitioner(numSplits))
  330 + def rightOuterJoin[W](other: RDD[(K, W)], numPartitions: Int): RDD[(K, (Option[V], W))] = {
  331 + rightOuterJoin(other, new HashPartitioner(numPartitions))
330 332 }
331 333
332 334 /**
@@ -361,7 +363,7 @@ class PairRDDFunctions[K: ClassManifest, V: ClassManifest](
361 363 throw new SparkException("Default partitioner cannot partition array keys.")
362 364 }
363 365 val cg = new CoGroupedRDD[K](
364   - Seq(self.asInstanceOf[RDD[(_, _)]], other.asInstanceOf[RDD[(_, _)]]),
  366 + Seq(self.asInstanceOf[RDD[(K, _)]], other.asInstanceOf[RDD[(K, _)]]),
365 367 partitioner)
366 368 val prfs = new PairRDDFunctions[K, Seq[Seq[_]]](cg)(classManifest[K], Manifests.seqSeqManifest)
367 369 prfs.mapValues {
@@ -380,9 +382,9 @@ class PairRDDFunctions[K: ClassManifest, V: ClassManifest](
380 382 throw new SparkException("Default partitioner cannot partition array keys.")
381 383 }
382 384 val cg = new CoGroupedRDD[K](
383   - Seq(self.asInstanceOf[RDD[(_, _)]],
384   - other1.asInstanceOf[RDD[(_, _)]],
385   - other2.asInstanceOf[RDD[(_, _)]]),
  385 + Seq(self.asInstanceOf[RDD[(K, _)]],
  386 + other1.asInstanceOf[RDD[(K, _)]],
  387 + other2.asInstanceOf[RDD[(K, _)]]),
386 388 partitioner)
387 389 val prfs = new PairRDDFunctions[K, Seq[Seq[_]]](cg)(classManifest[K], Manifests.seqSeqManifest)
388 390 prfs.mapValues {
@@ -412,17 +414,17 @@ class PairRDDFunctions[K: ClassManifest, V: ClassManifest](
412 414 * For each key k in `this` or `other`, return a resulting RDD that contains a tuple with the
413 415 * list of values for that key in `this` as well as `other`.
414 416 */
415   - def cogroup[W](other: RDD[(K, W)], numSplits: Int): RDD[(K, (Seq[V], Seq[W]))] = {
416   - cogroup(other, new HashPartitioner(numSplits))
  417 + def cogroup[W](other: RDD[(K, W)], numPartitions: Int): RDD[(K, (Seq[V], Seq[W]))] = {
  418 + cogroup(other, new HashPartitioner(numPartitions))
417 419 }
418 420
419 421 /**
420 422 * For each key k in `this` or `other1` or `other2`, return a resulting RDD that contains a
421 423 * tuple with the list of values for that key in `this`, `other1` and `other2`.
422 424 */
423   - def cogroup[W1, W2](other1: RDD[(K, W1)], other2: RDD[(K, W2)], numSplits: Int)
  425 + def cogroup[W1, W2](other1: RDD[(K, W1)], other2: RDD[(K, W2)], numPartitions: Int)
424 426 : RDD[(K, (Seq[V], Seq[W1], Seq[W2]))] = {
425   - cogroup(other1, other2, new HashPartitioner(numSplits))
  427 + cogroup(other1, other2, new HashPartitioner(numPartitions))
426 428 }
427 429
428 430 /** Alias for cogroup. */
@@ -634,9 +636,9 @@ class OrderedRDDFunctions[K <% Ordered[K]: ClassManifest, V: ClassManifest](
634 636 * (in the `save` case, they will be written to multiple `part-X` files in the filesystem, in
635 637 * order of the keys).
636 638 */
637   - def sortByKey(ascending: Boolean = true, numSplits: Int = self.splits.size): RDD[(K,V)] = {
  639 + def sortByKey(ascending: Boolean = true, numPartitions: Int = self.partitions.size): RDD[(K,V)] = {
638 640 val shuffled =
639   - new ShuffledRDD[K, V](self, new RangePartitioner(numSplits, self, ascending))
  641 + new ShuffledRDD[K, V](self, new RangePartitioner(numPartitions, self, ascending))
640 642 shuffled.mapPartitions(iter => {
641 643 val buf = iter.toArray
642 644 if (ascending) {
@@ -650,9 +652,9 @@ class OrderedRDDFunctions[K <% Ordered[K]: ClassManifest, V: ClassManifest](
650 652
651 653 private[spark]
652 654 class MappedValuesRDD[K, V, U](prev: RDD[(K, V)], f: V => U) extends RDD[(K, U)](prev) {
653   - override def getSplits = firstParent[(K, V)].splits
  655 + override def getPartitions = firstParent[(K, V)].partitions
654 656 override val partitioner = firstParent[(K, V)].partitioner
655   - override def compute(split: Split, context: TaskContext) =
  657 + override def compute(split: Partition, context: TaskContext) =
656 658 firstParent[(K, V)].iterator(split, context).map{ case (k, v) => (k, f(v)) }
657 659 }
658 660
@@ -660,9 +662,9 @@ private[spark]
660 662 class FlatMappedValuesRDD[K, V, U](prev: RDD[(K, V)], f: V => TraversableOnce[U])
661 663 extends RDD[(K, U)](prev) {
662 664
663   - override def getSplits = firstParent[(K, V)].splits
  665 + override def getPartitions = firstParent[(K, V)].partitions
664 666 override val partitioner = firstParent[(K, V)].partitioner
665   - override def compute(split: Split, context: TaskContext) = {
  667 + override def compute(split: Partition, context: TaskContext) = {
666 668 firstParent[(K, V)].iterator(split, context).flatMap { case (k, v) => f(v).map(x => (k, x)) }
667 669 }
668 670 }
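
Beyond the numSplits → numPartitions renames, the new first branch in combineByKey skips the shuffle entirely when the input RDD is already partitioned by the requested partitioner. A sketch under assumed local data (only the renamed argument and the pre-partitioned shortcut come from this commit):

    import spark.{SparkContext, HashPartitioner}
    import spark.SparkContext._

    val sc = new SparkContext("local[4]", "PairExample")
    val pairs = sc.parallelize(Seq(("a", 1), ("b", 2), ("a", 3)))

    // Renamed argument: hash-partitions the result into 4 partitions.
    val counts = pairs.reduceByKey(_ + _, numPartitions = 4)

    // New combineByKey branch: an RDD already partitioned by the same partitioner
    // is combined in place (self.partitioner == Some(partitioner)), with no extra shuffle.
    val part = new HashPartitioner(4)
    val prePartitioned = pairs.partitionBy(part).cache()
    val countsNoShuffle = prePartitioned.reduceByKey(part, _ + _)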
2  core/src/main/scala/spark/Split.scala → core/src/main/scala/spark/Partition.scala
@@ -3,7 +3,7 @@ package spark
3 3 /**
4 4 * A partition of an RDD.
5 5 */
6   -trait Split extends Serializable {
  6 +trait Partition extends Serializable {
7 7 /**
8 8 * Get the split's index within its parent RDD
9 9 */
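
Custom RDDs now implement the renamed trait. A minimal, hypothetical subclass illustrating the Partition / getPartitions / compute shapes used throughout this commit (ConstantRDD and ConstantPartition are illustrative names, not part of the change):

    import spark.{Partition, RDD, SparkContext, TaskContext}

    // One Partition object per output partition; only an index is needed here.
    class ConstantPartition(val index: Int) extends Partition

    // An RDD that yields a single constant value per partition.
    class ConstantRDD(sc: SparkContext, value: Int, numPartitions: Int)
      extends RDD[Int](sc, Nil) {

      override def getPartitions: Array[Partition] =
        Array.tabulate[Partition](numPartitions)(i => new ConstantPartition(i))

      override def compute(split: Partition, context: TaskContext): Iterator[Int] =
        Iterator.single(value)
    }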
76 core/src/main/scala/spark/RDD.scala
@@ -27,7 +27,7 @@ import spark.rdd.FlatMappedRDD
27 27 import spark.rdd.GlommedRDD
28 28 import spark.rdd.MappedRDD
29 29 import spark.rdd.MapPartitionsRDD
30   -import spark.rdd.MapPartitionsWithSplitRDD
  30 +import spark.rdd.MapPartitionsWithIndexRDD
31 31 import spark.rdd.PipedRDD
32 32 import spark.rdd.SampledRDD
33 33 import spark.rdd.UnionRDD
@@ -49,7 +49,7 @@ import SparkContext._
49 49 *
50 50 * Internally, each RDD is characterized by five main properties:
51 51 *
52   - * - A list of splits (partitions)
  52 + * - A list of partitions
53 53 * - A function for computing each split
54 54 * - A list of dependencies on other RDDs
55 55 * - Optionally, a Partitioner for key-value RDDs (e.g. to say that the RDD is hash-partitioned)
@@ -76,13 +76,13 @@ abstract class RDD[T: ClassManifest](
76 76 // =======================================================================
77 77
78 78 /** Implemented by subclasses to compute a given partition. */
79   - def compute(split: Split, context: TaskContext): Iterator[T]
  79 + def compute(split: Partition, context: TaskContext): Iterator[T]
80 80
81 81 /**
82 82 * Implemented by subclasses to return the set of partitions in this RDD. This method will only
83 83 * be called once, so it is safe to implement a time-consuming computation in it.
84 84 */
85   - protected def getSplits: Array[Split]
  85 + protected def getPartitions: Array[Partition]
86 86
87 87 /**
88 88 * Implemented by subclasses to return how this RDD depends on parent RDDs. This method will only
@@ -91,7 +91,7 @@ abstract class RDD[T: ClassManifest](
91 91 protected def getDependencies: Seq[Dependency[_]] = deps
92 92
93 93 /** Optionally overridden by subclasses to specify placement preferences. */
94   - protected def getPreferredLocations(split: Split): Seq[String] = Nil
  94 + protected def getPreferredLocations(split: Partition): Seq[String] = Nil
95 95
96 96 /** Optionally overridden by subclasses to specify how they are partitioned. */
97 97 val partitioner: Option[Partitioner] = None
@@ -137,10 +137,10 @@ abstract class RDD[T: ClassManifest](
137 137 /** Get the RDD's current storage level, or StorageLevel.NONE if none is set. */
138 138 def getStorageLevel = storageLevel
139 139
140   - // Our dependencies and splits will be gotten by calling subclass's methods below, and will
  140 + // Our dependencies and partitions will be gotten by calling subclass's methods below, and will
141 141 // be overwritten when we're checkpointed
142 142 private var dependencies_ : Seq[Dependency[_]] = null
143   - @transient private var splits_ : Array[Split] = null
  143 + @transient private var partitions_ : Array[Partition] = null
144 144
145 145 /** An Option holding our checkpoint RDD, if we are checkpointed */
146 146 private def checkpointRDD: Option[RDD[T]] = checkpointData.flatMap(_.checkpointRDD)
@@ -159,15 +159,15 @@ abstract class RDD[T: ClassManifest](
159 159 }
160 160
161 161 /**
162   - * Get the array of splits of this RDD, taking into account whether the
  162 + * Get the array of partitions of this RDD, taking into account whether the
163 163 * RDD is checkpointed or not.
164 164 */
165   - final def splits: Array[Split] = {
166   - checkpointRDD.map(_.splits).getOrElse {
167   - if (splits_ == null) {
168   - splits_ = getSplits
  165 + final def partitions: Array[Partition] = {
  166 + checkpointRDD.map(_.partitions).getOrElse {
  167 + if (partitions_ == null) {
  168 + partitions_ = getPartitions
169 169 }
170   - splits_
  170 + partitions_
171 171 }
172 172 }
173 173
@@ -175,7 +175,7 @@ abstract class RDD[T: ClassManifest](
175 175 * Get the preferred location of a split, taking into account whether the
176 176 * RDD is checkpointed or not.
177 177 */
178   - final def preferredLocations(split: Split): Seq[String] = {
  178 + final def preferredLocations(split: Partition): Seq[String] = {
179 179 checkpointRDD.map(_.getPreferredLocations(split)).getOrElse {
180 180 getPreferredLocations(split)
181 181 }
@@ -186,7 +186,7 @@ abstract class RDD[T: ClassManifest](
186 186 * This should ''not'' be called by users directly, but is available for implementors of custom
187 187 * subclasses of RDD.
188 188 */
189   - final def iterator(split: Split, context: TaskContext): Iterator[T] = {
  189 + final def iterator(split: Partition, context: TaskContext): Iterator[T] = {
190 190 if (storageLevel != StorageLevel.NONE) {
191 191 SparkEnv.get.cacheManager.getOrCompute(this, split, context, storageLevel)
192 192 } else {
@@ -197,7 +197,7 @@ abstract class RDD[T: ClassManifest](
197 197 /**
198 198 * Compute an RDD partition or read it from a checkpoint if the RDD is checkpointing.
199 199 */
200   - private[spark] def computeOrReadCheckpoint(split: Split, context: TaskContext): Iterator[T] = {
  200 + private[spark] def computeOrReadCheckpoint(split: Partition, context: TaskContext): Iterator[T] = {
201 201 if (isCheckpointed) {
202 202 firstParent[T].iterator(split, context)
203 203 } else {
@@ -227,15 +227,15 @@ abstract class RDD[T: ClassManifest](
227 227 /**
228 228 * Return a new RDD containing the distinct elements in this RDD.
229 229 */
230   - def distinct(numSplits: Int): RDD[T] =
231   - map(x => (x, null)).reduceByKey((x, y) => x, numSplits).map(_._1)
  230 + def distinct(numPartitions: Int): RDD[T] =
  231 + map(x => (x, null)).reduceByKey((x, y) => x, numPartitions).map(_._1)
232 232
233   - def distinct(): RDD[T] = distinct(splits.size)
  233 + def distinct(): RDD[T] = distinct(partitions.size)
234 234
235 235 /**
236   - * Return a new RDD that is reduced into `numSplits` partitions.
  236 + * Return a new RDD that is reduced into `numPartitions` partitions.
237 237 */
238   - def coalesce(numSplits: Int): RDD[T] = new CoalescedRDD(this, numSplits)
  238 + def coalesce(numPartitions: Int): RDD[T] = new CoalescedRDD(this, numPartitions)
239 239
240 240 /**
241 241 * Return a sampled subset of this RDD.
@@ -303,9 +303,9 @@ abstract class RDD[T: ClassManifest](
303 303 * Return an RDD of grouped elements. Each group consists of a key and a sequence of elements
304 304 * mapping to that key.
305 305 */
306   - def groupBy[K: ClassManifest](f: T => K, numSplits: Int): RDD[(K, Seq[T])] = {
  306 + def groupBy[K: ClassManifest](f: T => K, numPartitions: Int): RDD[(K, Seq[T])] = {
307 307 val cleanF = sc.clean(f)
308   - this.map(t => (cleanF(t), t)).groupByKey(numSplits)
  308 + this.map(t => (cleanF(t), t)).groupByKey(numPartitions)
309 309 }
310 310
311 311 /**
@@ -336,14 +336,24 @@ abstract class RDD[T: ClassManifest](
336 336 preservesPartitioning: Boolean = false): RDD[U] =
337 337 new MapPartitionsRDD(this, sc.clean(f), preservesPartitioning)
338 338
339   - /**
  339 + /**
  340 + * Return a new RDD by applying a function to each partition of this RDD, while tracking the index
  341 + * of the original partition.
  342 + */
  343 + def mapPartitionsWithIndex[U: ClassManifest](
  344 + f: (Int, Iterator[T]) => Iterator[U],
  345 + preservesPartitioning: Boolean = false): RDD[U] =
  346 + new MapPartitionsWithIndexRDD(this, sc.clean(f), preservesPartitioning)
  347 +
  348 + /**
340 349 * Return a new RDD by applying a function to each partition of this RDD, while tracking the index
341 350 * of the original partition.
342 351 */
  352 + @deprecated("use mapPartitionsWithIndex")
343 353 def mapPartitionsWithSplit[U: ClassManifest](
344 354 f: (Int, Iterator[T]) => Iterator[U],
345 355 preservesPartitioning: Boolean = false): RDD[U] =
346   - new MapPartitionsWithSplitRDD(this, sc.clean(f), preservesPartitioning)
  356 + new MapPartitionsWithIndexRDD(this, sc.clean(f), preservesPartitioning)
347 357
348 358 /**
349 359 * Zips this RDD with another one, returning key-value pairs with the first element in each RDD,
@@ -471,7 +481,7 @@ abstract class RDD[T: ClassManifest](
471 481 }
472 482 result
473 483 }
474   - val evaluator = new CountEvaluator(splits.size, confidence)
  484 + val evaluator = new CountEvaluator(partitions.size, confidence)
475 485 sc.runApproximateJob(this, countElements, evaluator, timeout)
476 486 }
477 487
@@ -522,7 +532,7 @@ abstract class RDD[T: ClassManifest](
522 532 }
523 533 map
524 534 }
525   - val evaluator = new GroupedCountEvaluator[T](splits.size, confidence)
  535 + val evaluator = new GroupedCountEvaluator[T](partitions.size, confidence)
526 536 sc.runApproximateJob(this, countPartition, evaluator, timeout)
527 537 }
528 538
@@ -537,7 +547,7 @@ abstract class RDD[T: ClassManifest](
537 547 }
538 548 val buf = new ArrayBuffer[T]
539 549 var p = 0
540   - while (buf.size < num && p < splits.size) {
  550 + while (buf.size < num && p < partitions.size) {
541 551 val left = num - buf.size
542 552 val res = sc.runJob(this, (it: Iterator[T]) => it.take(left).toArray, Array(p), true)
543 553 buf ++= res(0)
@@ -657,11 +667,11 @@ abstract class RDD[T: ClassManifest](
657 667
658 668 /**
659 669 * Changes the dependencies of this RDD from its original parents to a new RDD (`newRDD`)
660   - * created from the checkpoint file, and forget its old dependencies and splits.
  670 + * created from the checkpoint file, and forget its old dependencies and partitions.
661 671 */
662 672 private[spark] def markCheckpointed(checkpointRDD: RDD[_]) {
663 673 clearDependencies()
664   - splits_ = null
  674 + partitions_ = null
665 675 deps = null // Forget the constructor argument for dependencies too
666 676 }
667 677
@@ -676,15 +686,15 @@ abstract class RDD[T: ClassManifest](
676 686 }
677 687
678 688 /** A description of this RDD and its recursive dependencies for debugging. */
679   - def toDebugString(): String = {
  689 + def toDebugString: String = {
680 690 def debugString(rdd: RDD[_], prefix: String = ""): Seq[String] = {
681   - Seq(prefix + rdd + " (" + rdd.splits.size + " splits)") ++
  691 + Seq(prefix + rdd + " (" + rdd.partitions.size + " partitions)") ++
682 692 rdd.dependencies.flatMap(d => debugString(d.rdd, prefix + " "))
683 693 }
684 694 debugString(this).mkString("\n")
685 695 }
686 696
687   - override def toString(): String = "%s%s[%d] at %s".format(
  697 + override def toString: String = "%s%s[%d] at %s".format(
688 698 Option(name).map(_ + " ").getOrElse(""),
689 699 getClass.getSimpleName,
690 700 id,
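
Taken together, the RDD changes rename the partition accessors and add mapPartitionsWithIndex while deprecating mapPartitionsWithSplit. A small sketch of the post-rename API (local data is hypothetical):

    import spark.SparkContext

    val sc = new SparkContext("local[2]", "IndexExample")
    val data = sc.parallelize(1 to 10, 4)

    // New name from this commit; mapPartitionsWithSplit still compiles but is deprecated.
    val tagged = data.mapPartitionsWithIndex { (idx, iter) =>
      iter.map(x => (idx, x))
    }

    tagged.collect().foreach(println)
    println(data.partitions.size)   // formerly data.splits.size
    println(tagged.toDebugString)   // now a parameterless method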
12 core/src/main/scala/spark/RDDCheckpointData.scala
@@ -16,7 +16,7 @@ private[spark] object CheckpointState extends Enumeration {
16 16 /**
17 17 * This class contains all the information related to RDD checkpointing. Each instance of this class
18 18 * is associated with a RDD. It manages process of checkpointing of the associated RDD, as well as,
19   - * manages the post-checkpoint state by providing the updated splits, iterator and preferred locations
  19 + * manages the post-checkpoint state by providing the updated partitions, iterator and preferred locations
20 20 * of the checkpointed RDD.
21 21 */
22 22 private[spark] class RDDCheckpointData[T: ClassManifest](rdd: RDD[T])
@@ -67,11 +67,11 @@ private[spark] class RDDCheckpointData[T: ClassManifest](rdd: RDD[T])
67 67 rdd.context.runJob(rdd, CheckpointRDD.writeToFile(path) _)
68 68 val newRDD = new CheckpointRDD[T](rdd.context, path)
69 69
70   - // Change the dependencies and splits of the RDD
  70 + // Change the dependencies and partitions of the RDD
71 71 RDDCheckpointData.synchronized {
72 72 cpFile = Some(path)
73 73 cpRDD = Some(newRDD)
74   - rdd.markCheckpointed(newRDD) // Update the RDD's dependencies and splits
  74 + rdd.markCheckpointed(newRDD) // Update the RDD's dependencies and partitions
75 75 cpState = Checkpointed
76 76 RDDCheckpointData.clearTaskCaches()
77 77 logInfo("Done checkpointing RDD " + rdd.id + ", new parent is RDD " + newRDD.id)
@@ -79,15 +79,15 @@ private[spark] class RDDCheckpointData[T: ClassManifest](rdd: RDD[T])
79 79 }
80 80
81 81 // Get preferred location of a split after checkpointing
82   - def getPreferredLocations(split: Split): Seq[String] = {
  82 + def getPreferredLocations(split: Partition): Seq[String] = {
83 83 RDDCheckpointData.synchronized {
84 84 cpRDD.get.preferredLocations(split)
85 85 }
86 86 }
87 87
88   - def getSplits: Array[Split] = {
  88 + def getPartitions: Array[Partition] = {
89 89 RDDCheckpointData.synchronized {
90   - cpRDD.get.splits
  90 + cpRDD.get.partitions
91 91 }
92 92 }
93 93
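
For reference, the checkpointing path this class manages is driven from user code roughly as follows (a sketch with a hypothetical checkpoint directory; after the job runs, the RDD's dependencies and partitions are replaced by those of the CheckpointRDD):

    import spark.SparkContext

    val sc = new SparkContext("local[2]", "CheckpointExample")
    sc.setCheckpointDir("/tmp/spark-checkpoints")   // hypothetical path

    val rdd = sc.parallelize(1 to 100, 4).map(_ * 2)
    rdd.checkpoint()            // marked; written out when the RDD is first computed
    rdd.count()                 // materializes the RDD and triggers the checkpoint
    println(rdd.toDebugString)  // lineage now rooted at a CheckpointRDD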
28 core/src/main/scala/spark/SparkContext.scala
@@ -39,7 +39,7 @@ import spark.broadcast._
39 39 import spark.deploy.LocalSparkCluster
40 40 import spark.partial.ApproximateEvaluator
41 41 import spark.partial.PartialResult
42   -import rdd.{CheckpointRDD, HadoopRDD, NewHadoopRDD, UnionRDD}
  42 +import rdd.{CheckpointRDD, HadoopRDD, NewHadoopRDD, UnionRDD, ParallelCollectionRDD}
43 43 import scheduler.{ResultTask, ShuffleMapTask, DAGScheduler, TaskScheduler}
44 44 import spark.scheduler.local.LocalScheduler
45 45 import spark.scheduler.cluster.{SparkDeploySchedulerBackend, SchedulerBackend, ClusterScheduler}
@@ -53,7 +53,7 @@ import storage.{StorageStatus, StorageUtils, RDDInfo}
53 53 * cluster, and can be used to create RDDs, accumulators and broadcast variables on that cluster.
54 54 *
55 55 * @param master Cluster URL to connect to (e.g. mesos://host:port, spark://host:port, local[4]).
56   - * @param jobName A name for your job, to display on the cluster web UI.
  56 + * @param appName A name for your application, to display on the cluster web UI.
57 57 * @param sparkHome Location where Spark is installed on cluster nodes.
58 58 * @param jars Collection of JARs to send to the cluster. These can be paths on the local file
59 59 * system or HDFS, HTTP, HTTPS, or FTP URLs.
@@ -61,7 +61,7 @@ import storage.{StorageStatus, StorageUtils, RDDInfo}
61 61 */
62 62 class SparkContext(
63 63 val master: String,
64   - val jobName: String,
  64 + val appName: String,
65 65 val sparkHome: String = null,
66 66 val jars: Seq[String] = Nil,
67 67 environment: Map[String, String] = Map())
@@ -143,7 +143,7 @@ class SparkContext(
143 143
144 144 case SPARK_REGEX(sparkUrl) =>
145 145 val scheduler = new ClusterScheduler(this)
146   - val backend = new SparkDeploySchedulerBackend(scheduler, this, sparkUrl, jobName)
  146 + val backend = new SparkDeploySchedulerBackend(scheduler, this, sparkUrl, appName)
147 147 scheduler.initialize(backend)
148 148 scheduler
149 149
@@ -162,7 +162,7 @@ class SparkContext(
162 162 val localCluster = new LocalSparkCluster(
163 163 numSlaves.toInt, coresPerSlave.toInt, memoryPerSlaveInt)
164 164 val sparkUrl = localCluster.start()
165   - val backend = new SparkDeploySchedulerBackend(scheduler, this, sparkUrl, jobName)
  165 + val backend = new SparkDeploySchedulerBackend(scheduler, this, sparkUrl, appName)
166 166 scheduler.initialize(backend)
167 167 backend.shutdownCallback = (backend: SparkDeploySchedulerBackend) => {
168 168 localCluster.stop()
@@ -178,9 +178,9 @@ class SparkContext(
178 178 val coarseGrained = System.getProperty("spark.mesos.coarse", "false").toBoolean
179 179 val masterWithoutProtocol = master.replaceFirst("^mesos://", "") // Strip initial mesos://
180 180 val backend = if (coarseGrained) {
181   - new CoarseMesosSchedulerBackend(scheduler, this, masterWithoutProtocol, jobName)
  181 + new CoarseMesosSchedulerBackend(scheduler, this, masterWithoutProtocol, appName)
182 182 } else {
183   - new MesosSchedulerBackend(scheduler, this, masterWithoutProtocol, jobName)
  183 + new MesosSchedulerBackend(scheduler, this, masterWithoutProtocol, appName)
184 184 }
185 185 scheduler.initialize(backend)
186 186 scheduler
@@ -216,7 +216,7 @@ class SparkContext(
216 216
217 217 /** Distribute a local Scala collection to form an RDD. */
218 218 def parallelize[T: ClassManifest](seq: Seq[T], numSlices: Int = defaultParallelism): RDD[T] = {
219   - new ParallelCollection[T](this, seq, numSlices, Map[Int, Seq[String]]())
  219 + new ParallelCollectionRDD[T](this, seq, numSlices, Map[Int, Seq[String]]())
220 220 }
221 221
222 222 /** Distribute a local Scala collection to form an RDD. */
@@ -229,7 +229,7 @@ class SparkContext(
229 229 * Create a new partition for each collection item. */
230 230 def makeRDD[T: ClassManifest](seq: Seq[(T, Seq[String])]): RDD[T] = {
231 231 val indexToPrefs = seq.zipWithIndex.map(t => (t._2, t._1._2)).toMap
232   - new ParallelCollection[T](this, seq.map(_._1), seq.size, indexToPrefs)
  232 + new ParallelCollectionRDD[T](this, seq.map(_._1), seq.size, indexToPrefs)
233 233 }
234 234
235 235 /**
@@ -614,14 +614,14 @@ class SparkContext(
614 614 * Run a job on all partitions in an RDD and return the results in an array.
615 615 */
616 616 def runJob[T, U: ClassManifest](rdd: RDD[T], func: (TaskContext, Iterator[T]) => U): Array[U] = {
617   - runJob(rdd, func, 0 until rdd.splits.size, false)
  617 + runJob(rdd, func, 0 until rdd.partitions.size, false)
618 618 }
619 619
620 620 /**
621 621 * Run a job on all partitions in an RDD and return the results in an array.
622 622 */
623 623 def runJob[T, U: ClassManifest](rdd: RDD[T], func: Iterator[T] => U): Array[U] = {
624   - runJob(rdd, func, 0 until rdd.splits.size, false)
  624 + runJob(rdd, func, 0 until rdd.partitions.size, false)
625 625 }
626 626
627 627 /**
@@ -632,7 +632,7 @@ class SparkContext(
632 632 processPartition: (TaskContext, Iterator[T]) => U,
633 633 resultHandler: (Int, U) => Unit)
634 634 {
635   - runJob[T, U](rdd, processPartition, 0 until rdd.splits.size, false, resultHandler)
  635 + runJob[T, U](rdd, processPartition, 0 until rdd.partitions.size, false, resultHandler)
636 636 }
637 637
638 638 /**
@@ -644,7 +644,7 @@ class SparkContext(
644 644 resultHandler: (Int, U) => Unit)
645 645 {
646 646 val processFunc = (context: TaskContext, iter: Iterator[T]) => processPartition(iter)
647   - runJob[T, U](rdd, processFunc, 0 until rdd.splits.size, false, resultHandler)
  647 + runJob[T, U](rdd, processFunc, 0 until rdd.partitions.size, false, resultHandler)
648 648 }
649 649
650 650 /**
@@ -696,7 +696,7 @@ class SparkContext(
696 696 /** Default level of parallelism to use when not given by user (e.g. for reduce tasks) */
697 697 def defaultParallelism: Int = taskScheduler.defaultParallelism
698 698
699   - /** Default min number of splits for Hadoop RDDs when not given by user */
  699 + /** Default min number of partitions for Hadoop RDDs when not given by user */
700 700 def defaultMinSplits: Int = math.min(defaultParallelism, 2)
701 701
702 702 private var nextShuffleId = new AtomicInteger(0)
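
With jobName renamed to appName, a driver constructs its context like this (the cluster URL, Spark home, and jar path below are hypothetical):

    import spark.SparkContext

    val sc = new SparkContext(
      "spark://master-host:7077",   // cluster URL
      "MyApplication",              // appName, shown on the standalone master's web UI
      "/opt/spark",                 // sparkHome on the worker nodes
      Seq("target/my-app.jar"))     // jars shipped to the cluster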
6 core/src/main/scala/spark/api/java/JavaDoubleRDD.scala
@@ -44,7 +44,7 @@ class JavaDoubleRDD(val srdd: RDD[scala.Double]) extends JavaRDDLike[Double, Jav
44 44 /**
45 45 * Return a new RDD containing the distinct elements in this RDD.
46 46 */
47   - def distinct(numSplits: Int): JavaDoubleRDD = fromRDD(srdd.distinct(numSplits))
  47 + def distinct(numPartitions: Int): JavaDoubleRDD = fromRDD(srdd.distinct(numPartitions))
48 48
49 49 /**
50 50 * Return a new RDD containing only the elements that satisfy a predicate.
@@ -53,9 +53,9 @@ class JavaDoubleRDD(val srdd: RDD[scala.Double]) extends JavaRDDLike[Double, Jav
53 53 fromRDD(srdd.filter(x => f(x).booleanValue()))
54 54
55 55 /**
56   - * Return a new RDD that is reduced into `numSplits` partitions.
  56 + * Return a new RDD that is reduced into `numPartitions` partitions.
57 57 */
58   - def coalesce(numSplits: Int): JavaDoubleRDD = fromRDD(srdd.coalesce(numSplits))
  58 + def coalesce(numPartitions: Int): JavaDoubleRDD = fromRDD(srdd.coalesce(numPartitions))
59 59
60 60 /**
61 61 * Return a sampled subset of this RDD.
44 core/src/main/scala/spark/api/java/JavaPairRDD.scala
@@ -54,7 +54,7 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)])(implicit val kManifest: ClassManif
54 54 /**
55 55 * Return a new RDD containing the distinct elements in this RDD.
56 56 */
57   - def distinct(numSplits: Int): JavaPairRDD[K, V] = new JavaPairRDD[K, V](rdd.distinct(numSplits))
  57 + def distinct(numPartitions: Int): JavaPairRDD[K, V] = new JavaPairRDD[K, V](rdd.distinct(numPartitions))
58 58
59 59 /**
60 60 * Return a new RDD containing only the elements that satisfy a predicate.
@@ -63,9 +63,9 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)])(implicit val kManifest: ClassManif
63 63 new JavaPairRDD[K, V](rdd.filter(x => f(x).booleanValue()))
64 64
65 65 /**
66   - * Return a new RDD that is reduced into `numSplits` partitions.
  66 + * Return a new RDD that is reduced into `numPartitions` partitions.
67 67 */
68   - def coalesce(numSplits: Int): JavaPairRDD[K, V] = new JavaPairRDD[K, V](rdd.coalesce(numSplits))
  68 + def coalesce(numPartitions: Int): JavaPairRDD[K, V] = new JavaPairRDD[K, V](rdd.coalesce(numPartitions))
69 69
70 70 /**
71 71 * Return a sampled subset of this RDD.
@@ -122,8 +122,8 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)])(implicit val kManifest: ClassManif
122 122 def combineByKey[C](createCombiner: JFunction[V, C],
123 123 mergeValue: JFunction2[C, V, C],
124 124 mergeCombiners: JFunction2[C, C, C],
125   - numSplits: Int): JavaPairRDD[K, C] =
126   - combineByKey(createCombiner, mergeValue, mergeCombiners, new HashPartitioner(numSplits))
  125 + numPartitions: Int): JavaPairRDD[K, C] =
  126 + combineByKey(createCombiner, mergeValue, mergeCombiners, new HashPartitioner(numPartitions))
127 127
128 128 /**
129 129 * Merge the values for each key using an associative reduce function. This will also perform
@@ -162,10 +162,10 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)])(implicit val kManifest: ClassManif
162 162 /**
163 163 * Merge the values for each key using an associative reduce function. This will also perform
164 164 * the merging locally on each mapper before sending results to a reducer, similarly to a
165   - * "combiner" in MapReduce. Output will be hash-partitioned with numSplits splits.
  165 + * "combiner" in MapReduce. Output will be hash-partitioned with numPartitions partitions.
166 166 */
167   - def reduceByKey(func: JFunction2[V, V, V], numSplits: Int): JavaPairRDD[K, V] =
168   - fromRDD(rdd.reduceByKey(func, numSplits))
  167 + def reduceByKey(func: JFunction2[V, V, V], numPartitions: Int): JavaPairRDD[K, V] =
  168 + fromRDD(rdd.reduceByKey(func, numPartitions))
169 169
170 170 /**
171 171 * Group the values for each key in the RDD into a single sequence. Allows controlling the
@@ -176,10 +176,10 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)])(implicit val kManifest: ClassManif
176 176
177 177 /**
178 178 * Group the values for each key in the RDD into a single sequence. Hash-partitions the
179   - * resulting RDD with into `numSplits` partitions.
  179 + * resulting RDD with into `numPartitions` partitions.
180 180 */
181   - def groupByKey(numSplits: Int): JavaPairRDD[K, JList[V]] =
182   - fromRDD(groupByResultToJava(rdd.groupByKey(numSplits)))
  181 + def groupByKey(numPartitions: Int): JavaPairRDD[K, JList[V]] =
  182 + fromRDD(groupByResultToJava(rdd.groupByKey(numPartitions)))
183 183
184 184 /**
185 185 * Return a copy of the RDD partitioned using the specified partitioner. If `mapSideCombine`
@@ -261,8 +261,8 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)])(implicit val kManifest: ClassManif
261 261 * pair of elements will be returned as a (k, (v1, v2)) tuple, where (k, v1) is in `this` and
262 262 * (k, v2) is in `other`. Performs a hash join across the cluster.
263 263 */
264   - def join[W](other: JavaPairRDD[K, W], numSplits: Int): JavaPairRDD[K, (V, W)] =
265   - fromRDD(rdd.join(other, numSplits))
  264 + def join[W](other: JavaPairRDD[K, W], numPartitions: Int): JavaPairRDD[K, (V, W)] =
  265 + fromRDD(rdd.join(other, numPartitions))
266 266
267 267 /**
268 268 * Perform a left outer join of `this` and `other`. For each element (k, v) in `this`, the
@@ -277,10 +277,10 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)])(implicit val kManifest: ClassManif
277 277 * Perform a left outer join of `this` and `other`. For each element (k, v) in `this`, the
278 278 * resulting RDD will either contain all pairs (k, (v, Some(w))) for w in `other`, or the
279 279 * pair (k, (v, None)) if no elements in `other` have key k. Hash-partitions the output
280   - * into `numSplits` partitions.
  280 + * into `numPartitions` partitions.
281 281 */
282   - def leftOuterJoin[W](other: JavaPairRDD[K, W], numSplits: Int): JavaPairRDD[K, (V, Option[W])] =
283   - fromRDD(rdd.leftOuterJoin(other, numSplits))
  282 + def leftOuterJoin[W](other: JavaPairRDD[K, W], numPartitions: Int): JavaPairRDD[K, (V, Option[W])] =
  283 + fromRDD(rdd.leftOuterJoin(other, numPartitions))
284 284
285 285 /**
286 286 * Perform a right outer join of `this` and `other`. For each element (k, w) in `other`, the
@@ -297,8 +297,8 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)])(implicit val kManifest: ClassManif
297 297 * pair (k, (None, w)) if no elements in `this` have key k. Hash-partitions the resulting
298 298 * RDD into the given number of partitions.
299 299 */
300   - def rightOuterJoin[W](other: JavaPairRDD[K, W], numSplits: Int): JavaPairRDD[K, (Option[V], W)] =
301   - fromRDD(rdd.rightOuterJoin(other, numSplits))
  300 + def rightOuterJoin[W](other: JavaPairRDD[K, W], numPartitions: Int): JavaPairRDD[K, (Option[V], W)] =
  301 + fromRDD(rdd.rightOuterJoin(other, numPartitions))
302 302
303 303 /**
304 304 * Return the key-value pairs in this RDD to the master as a Map.
@@ -362,16 +362,16 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)])(implicit val kManifest: ClassManif
362 362 * For each key k in `this` or `other`, return a resulting RDD that contains a tuple with the
363 363 * list of values for that key in `this` as well as `other`.
364 364 */
365   - def cogroup[W](other: JavaPairRDD[K, W], numSplits: Int): JavaPairRDD[K, (JList[V], JList[W])]
366   - = fromRDD(cogroupResultToJava(rdd.cogroup(other, numSplits)))
  365 + def cogroup[W](other: JavaPairRDD[K, W], numPartitions: Int): JavaPairRDD[K, (JList[V], JList[W])]
  366 + = fromRDD(cogroupResultToJava(rdd.cogroup(other, numPartitions)))
367 367
368 368 /**
369 369 * For each key k in `this` or `other1` or `other2`, return a resulting RDD that contains a
370 370 * tuple with the list of values for that key in `this`, `other1` and `other2`.
371 371 */
372   - def cogroup[W1, W2](other1: JavaPairRDD[K, W1], other2: JavaPairRDD[K, W2], numSplits: Int)
  372 + def cogroup[W1, W2](other1: JavaPairRDD[K, W1], other2: JavaPairRDD[K, W2], numPartitions: Int)
373 373 : JavaPairRDD[K, (JList[V], JList[W1], JList[W2])] =
374   - fromRDD(cogroupResult2ToJava(rdd.cogroup(other1, other2, numSplits)))
  374 + fromRDD(cogroupResult2ToJava(rdd.cogroup(other1, other2, numPartitions)))
375 375
376 376 /** Alias for cogroup. */
377 377 def groupWith[W](other: JavaPairRDD[K, W]): JavaPairRDD[K, (JList[V], JList[W])] =
6 core/src/main/scala/spark/api/java/JavaRDD.scala
@@ -30,7 +30,7 @@ JavaRDDLike[T, JavaRDD[T]] {
30 30 /**
31 31 * Return a new RDD containing the distinct elements in this RDD.
32 32 */
33   - def distinct(numSplits: Int): JavaRDD[T] = wrapRDD(rdd.distinct(numSplits))
  33 + def distinct(numPartitions: Int): JavaRDD[T] = wrapRDD(rdd.distinct(numPartitions))
34 34
35 35 /**
36 36 * Return a new RDD containing only the elements that satisfy a predicate.
@@ -39,9 +39,9 @@ JavaRDDLike[T, JavaRDD[T]] {
39 39 wrapRDD(rdd.filter((x => f(x).booleanValue())))
40 40
41 41 /**
42   - * Return a new RDD that is reduced into `numSplits` partitions.
  42 + * Return a new RDD that is reduced into `numPartitions` partitions.
43 43 */
44   - def coalesce(numSplits: Int): JavaRDD[T] = rdd.coalesce(numSplits)
  44 + def coalesce(numPartitions: Int): JavaRDD[T] = rdd.coalesce(numPartitions)
45 45
46 46 /**
47 47 * Return a sampled subset of this RDD.
12 core/src/main/scala/spark/api/java/JavaRDDLike.scala
@@ -4,7 +4,7 @@ import java.util.{List => JList}
4 4 import scala.Tuple2
5 5 import scala.collection.JavaConversions._
6 6
7   -import spark.{SparkContext, Split, RDD, TaskContext}
  7 +import spark.{SparkContext, Partition, RDD, TaskContext}
8 8 import spark.api.java.JavaPairRDD._
9 9 import spark.api.java.function.{Function2 => JFunction2, Function => JFunction, _}
10 10 import spark.partial.{PartialResult, BoundedDouble}
@@ -20,7 +20,7 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends PairFlatMapWorkaround
20 20 def rdd: RDD[T]
21 21
22 22 /** Set of partitions in this RDD. */
23   - def splits: JList[Split] = new java.util.ArrayList(rdd.splits.toSeq)
  23 + def splits: JList[Partition] = new java.util.ArrayList(rdd.partitions.toSeq)
24 24
25 25 /** The [[spark.SparkContext]] that this RDD was created on. */
26 26 def context: SparkContext = rdd.context
@@ -36,7 +36,7 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends PairFlatMapWorkaround
36 36 * This should ''not'' be called by users directly, but is available for implementors of custom
37 37 * subclasses of RDD.
38 38 */
39   - def iterator(split: Split, taskContext: TaskContext): java.util.Iterator[T] =
  39 + def iterator(split: Partition, taskContext: TaskContext): java.util.Iterator[T] =
40 40 asJavaIterator(rdd.iterator(split, taskContext))
41 41
42 42 // Transformations (return a new RDD)
@@ -146,12 +146,12 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends PairFlatMapWorkaround
146 146 * Return an RDD of grouped elements. Each group consists of a key and a sequence of elements
147 147 * mapping to that key.
148 148 */
149   - def groupBy[K](f: JFunction[T, K], numSplits: Int): JavaPairRDD[K, JList[T]] = {
  149 + def groupBy[K](f: JFunction[T, K], numPartitions: Int): JavaPairRDD[K, JList[T]] = {
150 150 implicit val kcm: ClassManifest[K] =
151 151 implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[K]]
152 152 implicit val vcm: ClassManifest[JList[T]] =
153 153 implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[JList[T]]]
154   - JavaPairRDD.fromRDD(groupByResultToJava(rdd.groupBy(f, numSplits)(f.returnType)))(kcm, vcm)
  154 + JavaPairRDD.fromRDD(groupByResultToJava(rdd.groupBy(f, numPartitions)(f.returnType)))(kcm, vcm)
155 155 }
156 156
157 157 /**
@@ -333,6 +333,6 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends PairFlatMapWorkaround
333 333
334 334 /** A description of this RDD and its recursive dependencies for debugging. */
335 335 def toDebugString(): String = {
336   - rdd.toDebugString()
  336 + rdd.toDebugString
337 337 }
338 338 }
22 core/src/main/scala/spark/api/java/JavaSparkContext.scala
@@ -23,41 +23,41 @@ class JavaSparkContext(val sc: SparkContext) extends JavaSparkContextVarargsWork
23 23
24 24 /**
25 25 * @param master Cluster URL to connect to (e.g. mesos://host:port, spark://host:port, local[4]).
26   - * @param jobName A name for your job, to display on the cluster web UI
  26 + * @param appName A name for your application, to display on the cluster web UI
27 27 */
28   - def this(master: String, jobName: String) = this(new SparkContext(master, jobName))
  28 + def this(master: String, appName: String) = this(new SparkContext(master, appName))
29 29
30 30 /**
31 31 * @param master Cluster URL to connect to (e.g. mesos://host:port, spark://host:port, local[4]).
32   - * @param jobName A name for your job, to display on the cluster web UI
  32 + * @param appName A name for your application, to display on the cluster web UI
33 33 * @param sparkHome The SPARK_HOME directory on the slave nodes
34 34 * @param jars Collection of JARs to send to the cluster. These can be paths on the local file
35 35 * system or HDFS, HTTP, HTTPS, or FTP URLs.
36 36 */
37   - def this(master: String, jobName: String, sparkHome: String, jarFile: String) =
38   - this(new SparkContext(master, jobName, sparkHome, Seq(jarFile)))
  37 + def this(master: String, appName: String, sparkHome: String, jarFile: String) =
  38 + this(new SparkContext(master, appName, sparkHome, Seq(jarFile)))
39 39
40 40 /**
41 41 * @param master Cluster URL to connect to (e.g. mesos://host:port, spark://host:port, local[4]).
42   - * @param jobName A name for your job, to display on the cluster web UI
  42 + * @param appName A name for your application, to display on the cluster web UI
43 43 * @param sparkHome The SPARK_HOME directory on the slave nodes
44 44 * @param jars Collection of JARs to send to the cluster. These can be paths on the local file
45 45 * system or HDFS, HTTP, HTTPS, or FTP URLs.
46 46 */
47   - def this(master: String, jobName: String, sparkHome: String, jars: Array[String]) =
48   - this(new SparkContext(master, jobName, sparkHome, jars.toSeq))
  47 + def this(master: String, appName: String, sparkHome: String, jars: Array[String]) =
  48 + this(new SparkContext(master, appName, sparkHome, jars.toSeq))
49 49
50 50 /**
51 51 * @param master Cluster URL to connect to (e.g. mesos://host:port, spark://host:port, local[4]).
52   - * @param jobName A name for your job, to display on the cluster web UI
  52 + * @param appName A name for your application, to display on the cluster web UI
53 53 * @param sparkHome The SPARK_HOME directory on the slave nodes
54 54 * @param jars Collection of JARs to send to the cluster. These can be paths on the local file
55 55 * system or HDFS, HTTP, HTTPS, or FTP URLs.
56 56 * @param environment Environment variables to set on worker nodes
57 57 */
58   - def this(master: String, jobName: String, sparkHome: String, jars: Array[String],
  58 + def this(master: String, appName: String, sparkHome: String, jars: Array[String],
59 59 environment: JMap[String, String]) =
60   - this(new SparkContext(master, jobName, sparkHome, jars.toSeq, environment))
  60 + this(new SparkContext(master, appName, sparkHome, jars.toSeq, environment))
61 61
62 62 private[spark] val env = sc.env
63 63
2  core/src/main/scala/spark/api/python/PythonPartitioner.scala
@@ -9,7 +9,7 @@ import java.util.Arrays
9 9 *
10 10 * Stores the unique id() of the Python-side partitioning function so that it is incorporated into
11 11 * equality comparisons. Correctness requires that the id is a unique identifier for the
12   - * lifetime of the job (i.e. that it is not re-used as the id of a different partitioning
  12 + * lifetime of the program (i.e. that it is not re-used as the id of a different partitioning
13 13 * function). This can be ensured by using the Python id() function and maintaining a reference
14 14 * to the Python partitioning function so that its id() is not reused.
15 15 */
10 core/src/main/scala/spark/api/python/PythonRDD.scala
@@ -32,11 +32,11 @@ private[spark] class PythonRDD[T: ClassManifest](
32 32 this(parent, PipedRDD.tokenize(command), envVars, preservePartitoning, pythonExec,
33 33 broadcastVars, accumulator)
34 34
35   - override def getSplits = parent.splits
  35 + override def getPartitions = parent.partitions
36 36
37 37 override val partitioner = if (preservePartitoning) parent.partitioner else None
38 38
39   - override def compute(split: Split, context: TaskContext): Iterator[Array[Byte]] = {
  39 + override def compute(split: Partition, context: TaskContext): Iterator[Array[Byte]] = {
40 40 val SPARK_HOME = new ProcessBuilder().environment().get("SPARK_HOME")
41 41
42 42 val pb = new ProcessBuilder(Seq(pythonExec, SPARK_HOME + "/python/pyspark/worker.py"))
@@ -65,7 +65,7 @@ private[spark] class PythonRDD[T: ClassManifest](
65 65 SparkEnv.set(env)
66 66 val out = new PrintWriter(proc.getOutputStream)
67 67 val dOut = new DataOutputStream(proc.getOutputStream)
68   - // Split index
  68 + // Partition index
69 69 dOut.writeInt(split.index)
70 70 // sparkFilesDir
71 71 PythonRDD.writeAsPickle(SparkFiles.getRootDirectory, dOut)
@@ -155,8 +155,8 @@ private class PythonException(msg: String) extends Exception(msg)
155 155 */
156 156 private class PairwiseRDD(prev: RDD[Array[Byte]]) extends
157 157 RDD[(Array[Byte], Array[Byte])](prev) {
158   - override def getSplits = prev.splits
159   - override def compute(split: Split, context: TaskContext) =
  158 + override def getPartitions = prev.partitions
  159 + override def compute(split: Partition, context: TaskContext) =