SPY-287 Merging Apache 0.8.2 changes #7

Merged
merged 19 commits into from about 1 month ago

9 participants

Mark Hamstra Jon Hartlaub Patrick Wendell Ewen Cheslack-Postava Reynold Xin Kay Ousterhout Matei Zaharia Shivaram Venkataraman Raymond Liu
Mark Hamstra

Should be all bug-fixes

and others added some commits December 10, 2013
Patrick Wendell [maven-release-plugin] prepare for next development iteration 8ce9bd8
Patrick Wendell Version updates not handled by maven release plug-in 8f56390
Ewen Cheslack-Postava Force pseudo-tty allocation in spark-ec2 script.
ssh commands need the -t argument repeated twice if there is no local
tty, e.g. if the process running spark-ec2 uses nohup and the parent
process exits.
2e2ead4
Patrick Wendell Merge pull request #271 from ewencp/really-force-ssh-pseudo-tty-0.8
Force pseudo-tty allocation in spark-ec2 script.

ssh commands need the -t argument repeated twice if there is no local
tty, e.g. if the process running spark-ec2 uses nohup and the parent
process exits.

Without this change, if you run the script this way (e.g. using nohup from a cron job), it will fail setting up the nodes because some of the ssh commands complain about missing ttys and then fail.

(This version is for the 0.8 branch. I've filed a separate request for master since changes to the script caused the patches to be different.)
f898238
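For context, a minimal sketch (not part of the PR; the wrapper and helper name are illustrative) of the command line this fix produces. With a single -t, ssh refuses to allocate a pseudo-terminal when stdin is not a terminal; repeating -t forces allocation, which is exactly the nohup/cron case described above.

import scala.sys.process._

// Builds the same ssh invocation spark_ec2.py runs; the doubled -t is the fix.
def sshCommand(identityFile: String, user: String, host: String, remoteCommand: String): Seq[String] =
  Seq("ssh", "-t", "-t",
      "-o", "StrictHostKeyChecking=no",
      "-i", identityFile,
      user + "@" + host, remoteCommand)

// Usage: sshCommand("key.pem", "root", "ec2-host", "uptime").! runs the command and returns its exit code.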
Reynold Xin Merge pull request #273 from rxin/top
Fixed a performance problem in RDD.top and BoundedPriorityQueue

BoundedPriorityQueue was actually traversing the entire queue to calculate the size, resulting in bad performance in insertion.

This should also cherry pick cleanly into branch-0.8.

(cherry picked from commit f4effb3)
Signed-off-by: Reynold Xin <rxin@apache.org>
df5fada
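For context, a condensed sketch of the problem and the one-line fix (a simplified stand-in class, not the actual org.apache.spark.util.BoundedPriorityQueue; only the size override mirrors the diff in this PR):

import java.util.{PriorityQueue => JPriorityQueue}
import scala.collection.JavaConverters._

class BoundedQueueSketch[A](maxSize: Int)(implicit ord: Ordering[A]) extends Iterable[A] {
  private val underlying = new JPriorityQueue[A](maxSize, ord)

  override def iterator: Iterator[A] = underlying.iterator.asScala

  // The fix: without this override, Iterable's default size walks the whole iterator,
  // so the size check below made every insertion O(n) rather than O(log n).
  override def size: Int = underlying.size

  def +=(elem: A): this.type = {
    if (size < maxSize) {
      underlying.offer(elem)
    } else {
      // Queue is full: keep elem only if it beats the current minimum.
      val head = underlying.peek()
      if (head != null && ord.gt(elem, head)) {
        underlying.poll()
        underlying.offer(elem)
      }
    }
    this
  }
}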
Kay Ousterhout Handle IndirectTaskResults in LocalScheduler 6183102
Kay Ousterhout Fixed test failure by adding exception to abortion msg d7bf08c
Matei Zaharia Merge pull request #281 from kayousterhout/local_indirect_fix
Handle IndirectTaskResults in LocalScheduler

This fixes a bug where large results aren't correctly handled when running in local mode.  Not doing this in master because expecting the Local/Cluster scheduler consolidation to go into 0.9, which will fix this issue (see #127)
88c565d
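For context, a rough sketch of the two result paths the local scheduler now has to handle (the types and the fetch function are simplified stand-ins, not Spark's actual classes): small task results come back inline, while large ones are stored in the block manager and only a block id is returned, so the scheduler must fetch and deserialize those as well.

sealed trait TaskResultSketch
case class DirectResultSketch(bytes: Array[Byte]) extends TaskResultSketch
case class IndirectResultSketch(blockId: String) extends TaskResultSketch

// fetchBlock stands in for a block-manager lookup; None models an evicted block,
// which is the case the patch routes to taskFailed() instead of throwing.
def resolveResult(result: TaskResultSketch,
                  fetchBlock: String => Option[Array[Byte]]): Option[Array[Byte]] =
  result match {
    case DirectResultSketch(bytes)     => Some(bytes)
    case IndirectResultSketch(blockId) => fetchBlock(blockId)
  }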
Reynold Xin Merge pull request #320 from kayousterhout/erroneous_failed_msg
Remove erroneous FAILED state for killed tasks.

Currently, when tasks are killed, the Executor first sends a
status update for the task with a "KILLED" state, and then
sends a second status update with a "FAILED" state saying that
the task failed due to an exception. The second FAILED state is
misleading/unnecessary, and occurs due to a NonLocalReturnControl
Exception that gets thrown due to the way we kill tasks. This
commit eliminates that problem.

I'm not at all sure that this is the best way to fix this problem,
so alternate suggestions welcome. @rxin guessing you're the right
person to look at this.

(cherry picked from commit 0475ca8)
Signed-off-by: Reynold Xin <rxin@apache.org>
5c443ad
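For context, a minimal sketch of the mechanism the patch works around (stand-in names; report takes the place of execBackend.statusUpdate). As the patch comment explains, the old `return` was compiled into a scala.runtime.NonLocalReturnControl that the broad catch block reported as a task failure, so the fix throws a dedicated marker exception and matches on it before the generic Throwable case:

object TaskKilledMarker extends Exception

def runSketch(killed: => Boolean)(report: String => Unit): Unit =
  try {
    if (killed) throw TaskKilledMarker // instead of `return`, which surfaced as FAILED
    report("FINISHED")
  } catch {
    case TaskKilledMarker => report("KILLED") // checked first, so no spurious FAILED update
    case t: Throwable     => report("FAILED: " + t.getClass.getName)
  }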
Shivaram Venkataraman Add collectPartition to JavaRDD interface.
Also remove takePartition from PythonRDD and use collectPartition in rdd.py.

Conflicts:
	core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
	python/pyspark/context.py
	python/pyspark/rdd.py
5092bae
Shivaram Venkataraman Make collectPartitions take an array of partitions
Change the implementation to use runJob instead of PartitionPruningRDD.
Also update the unit tests and the python take implementation
to use the new interface.
91e6e5b
Shivaram Venkataraman Add comment explaining collectPartitions's use 3ef68e4
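For reference, a hypothetical driver-side sketch of how a frontend's take() can be built on the new API. Only collectPartitions itself comes from this PR; the helper below and its names are illustrative.

import org.apache.spark.api.java.JavaRDD
import scala.collection.JavaConverters._
import scala.collection.mutable.ArrayBuffer

// Pull one partition at a time until `num` elements have been gathered,
// instead of collecting the whole RDD.
def takeViaCollectPartitions[T](rdd: JavaRDD[T], numPartitions: Int, num: Int): Seq[T] = {
  val collected = ArrayBuffer[T]()
  var p = 0
  while (collected.size < num && p < numPartitions) {
    val parts = rdd.collectPartitions(Array(p)) // Array[java.util.List[T]] with one entry
    collected ++= parts(0).asScala
    p += 1
  }
  collected.take(num)
}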
Shivaram Venkataraman Make broadcast id public for use in R frontend 691dfef
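For context, a hypothetical sketch of why a public Broadcast.id helps an external frontend (the registry below is illustrative, not part of SparkR or this PR): the driver can key its own bookkeeping by the JVM broadcast's id and ship only that id around.

import scala.collection.mutable

object BroadcastRegistrySketch {
  private val registry = mutable.Map[Long, Any]()

  // Record a value under the id now exposed by org.apache.spark.broadcast.Broadcast.
  def register(id: Long, value: Any): Unit = registry(id) = value

  def lookup(id: Long): Option[Any] = registry.get(id)
}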
Patrick Wendell Merge pull request #496 from pwendell/master
Fix bug in worker clean-up in UI

Introduced in d5a96fe (/cc @aarondav).

This should be picked into 0.8 and 0.9 as well. The bug causes old (zombie) workers on a node to not disappear immediately from the UI when a new one registers.
(cherry picked from commit a1cd185)

Signed-off-by: Patrick Wendell <pwendell@gmail.com>
f3cc3a7
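For context, a simplified sketch of the cleanup logic this fixes (stand-in types, not the Master's actual state): on registration, DEAD entries for the same node as the newly registering worker must be pruned; the buggy filter compared against the enclosing Master's own host/port fields instead, so stale entries were never removed.

case class WorkerSketch(id: String, host: String, port: Int, dead: Boolean)

def pruneDeadWorkers(workers: scala.collection.mutable.Set[WorkerSketch],
                     newWorker: WorkerSketch): Unit = {
  // Compare against the registering worker, mirroring the corrected filter in Master.scala.
  workers.filter(w => w.host == newWorker.host && w.port == newWorker.port && w.dead)
    .foreach(workers -= _)
}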
Shivaram Venkataraman Restore takePartition to PythonRDD, context.py
This is to avoid removing functions in minor releases.
38bf786
Reynold Xin Merge pull request #453 from shivaram/branch-0.8-SparkR
Backport changes used in SparkR to 0.8 branch

Backports two changes from master branch

1. Adding collectPartition to JavaRDD and using it from Python as well
2. Making broadcast id public.
c89b71a
Raymond Liu Merge pull request #583 from colorant/zookeeper.
Minor fix for ZooKeeperPersistenceEngine to use configured working dir

Author: Raymond Liu <raymond.liu@intel.com>

Closes #583 and squashes the following commits:

91b0609 [Raymond Liu] Minor fix for ZooKeeperPersistenceEngine to use configured working dir

(cherry picked from commit 68b2c0d)
Signed-off-by: Aaron Davidson <aaron@databricks.com>

Conflicts:
	core/src/main/scala/org/apache/spark/deploy/master/ZooKeeperPersistenceEngine.scala
62b3158
Mark Hamstra Merge branch 'branch-0.8' of https://github.com/apache/incubator-spark
into master-csd

Conflicts:
	assembly/pom.xml
	bagel/pom.xml
	core/pom.xml
	examples/pom.xml
	mllib/pom.xml
	pom.xml
	repl-bin/pom.xml
	repl/pom.xml
	streaming/pom.xml
	tools/pom.xml
	yarn/pom.xml
2042fa0
Mark Hamstra POM fixes a57cd14
Jon Hartlaub jhartlaub merged commit c588470 into from February 22, 2014
Jon Hartlaub jhartlaub closed this February 22, 2014

Showing 26 changed files with 120 additions and 44 deletions.

  1. 2  assembly/pom.xml
  2. 2  bagel/pom.xml
  3. 2  core/pom.xml
  4. 11  core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala
  5. 2  core/src/main/scala/org/apache/spark/broadcast/Broadcast.scala
  6. 2  core/src/main/scala/org/apache/spark/deploy/master/Master.scala
  7. 2  core/src/main/scala/org/apache/spark/deploy/master/ZooKeeperPersistenceEngine.scala
  8. 24  core/src/main/scala/org/apache/spark/executor/Executor.scala
  9. 3  core/src/main/scala/org/apache/spark/scheduler/cluster/ClusterScheduler.scala
  10. 42  core/src/main/scala/org/apache/spark/scheduler/local/LocalTaskSetManager.scala
  11. 2  core/src/main/scala/org/apache/spark/util/BoundedPriorityQueue.scala
  12. 33  core/src/test/scala/org/apache/spark/JavaAPISuite.java
  13. 4  docs/_config.yml
  14. 2  ec2/spark_ec2.py
  15. 2  examples/pom.xml
  16. 2  mllib/pom.xml
  17. 2  new-yarn/pom.xml
  18. 2  pom.xml
  19. 2  project/SparkBuild.scala
  20. 9  python/pyspark/rdd.py
  21. 2  python/pyspark/shell.py
  22. 2  repl/pom.xml
  23. 2  repl/src/main/scala/org/apache/spark/repl/SparkILoop.scala
  24. 2  streaming/pom.xml
  25. 2  tools/pom.xml
  26. 2  yarn/pom.xml
2  assembly/pom.xml
@@ -21,7 +21,7 @@
   <parent>
     <groupId>org.apache.spark</groupId>
     <artifactId>spark-parent</artifactId>
-    <version>0.8.1-csd-3-SNAPSHOT</version>
+    <version>0.8.2-candidate-csd-1-SNAPSHOT</version>
     <relativePath>../pom.xml</relativePath>
   </parent>
2  bagel/pom.xml
@@ -21,7 +21,7 @@
   <parent>
     <groupId>org.apache.spark</groupId>
     <artifactId>spark-parent</artifactId>
-    <version>0.8.1-csd-3-SNAPSHOT</version>
+    <version>0.8.2-candidate-csd-1-SNAPSHOT</version>
     <relativePath>../pom.xml</relativePath>
   </parent>
2  core/pom.xml
@@ -21,7 +21,7 @@
   <parent>
     <groupId>org.apache.spark</groupId>
     <artifactId>spark-parent</artifactId>
-    <version>0.8.1-csd-3-SNAPSHOT</version>
+    <version>0.8.2-candidate-csd-1-SNAPSHOT</version>
     <relativePath>../pom.xml</relativePath>
   </parent>
11  core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala
@@ -247,6 +247,17 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
   }

   /**
+   * Return an array that contains all of the elements in a specific partition of this RDD.
+   */
+  def collectPartitions(partitionIds: Array[Int]): Array[JList[T]] = {
+    // This is useful for implementing `take` from other language frontends
+    // like Python where the data is serialized.
+    import scala.collection.JavaConversions._
+    val res = context.runJob(rdd, (it: Iterator[T]) => it.toArray, partitionIds, true)
+    res.map(x => new java.util.ArrayList(x.toSeq)).toArray
+  }
+
+  /**
    * Reduces the elements of this RDD using the specified commutative and associative binary operator.
    */
   def reduce(f: JFunction2[T, T, T]): T = rdd.reduce(f)
2  core/src/main/scala/org/apache/spark/broadcast/Broadcast.scala
@@ -22,7 +22,7 @@ import java.util.concurrent.atomic.AtomicLong

 import org.apache.spark._

-abstract class Broadcast[T](private[spark] val id: Long) extends Serializable {
+abstract class Broadcast[T](val id: Long) extends Serializable {
   def value: T

   // We cannot have an abstract readObject here due to some weird issues with
2  core/src/main/scala/org/apache/spark/deploy/master/Master.scala
@@ -409,7 +409,7 @@ private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Act
     // There may be one or more refs to dead workers on this same node (w/ different ID's),
     // remove them.
     workers.filter { w =>
-      (w.host == host && w.port == port) && (w.state == WorkerState.DEAD)
+      (w.host == worker.host && w.port == worker.port) && (w.state == WorkerState.DEAD)
     }.foreach { w =>
       workers -= w
     }
2  core/src/main/scala/org/apache/spark/deploy/master/ZooKeeperPersistenceEngine.scala
@@ -77,7 +77,7 @@ class ZooKeeperPersistenceEngine(serialization: Serialization)
   }

   def deserializeFromFile[T <: Serializable](filename: String)(implicit m: Manifest[T]): T = {
-    val fileData = zk.getData("/spark/master_status/" + filename)
+    val fileData = zk.getData(WORKING_DIR + "/" + filename)
     val clazz = m.erasure.asInstanceOf[Class[T]]
     val serializer = serialization.serializerFor(clazz)
     serializer.fromBinary(fileData).asInstanceOf[T]
24  core/src/main/scala/org/apache/spark/executor/Executor.scala
@@ -142,11 +142,6 @@ private[spark] class Executor(
     val tr = runningTasks.get(taskId)
     if (tr != null) {
       tr.kill()
-      // We remove the task also in the finally block in TaskRunner.run.
-      // The reason we need to remove it here is because killTask might be called before the task
-      // is even launched, and never reaching that finally block. ConcurrentHashMap's remove is
-      // idempotent.
-      runningTasks.remove(taskId)
     }
   }

@@ -168,6 +163,8 @@ private[spark] class Executor(
   class TaskRunner(execBackend: ExecutorBackend, taskId: Long, serializedTask: ByteBuffer)
     extends Runnable {

+    object TaskKilledException extends Exception
+
     @volatile private var killed = false
     @volatile private var task: Task[Any] = _

@@ -201,9 +198,11 @@
         // If this task has been killed before we deserialized it, let's quit now. Otherwise,
         // continue executing the task.
         if (killed) {
-          logInfo("Executor killed task " + taskId)
-          execBackend.statusUpdate(taskId, TaskState.KILLED, ser.serialize(TaskKilled))
-          return
+          // Throw an exception rather than returning, because returning within a try{} block
+          // causes a NonLocalReturnControl exception to be thrown. The NonLocalReturnControl
+          // exception will be caught by the catch block, leading to an incorrect ExceptionFailure
+          // for the task.
+          throw TaskKilledException
         }

         attemptedTask = Some(task)
@@ -217,9 +216,7 @@

         // If the task has been killed, let's fail it.
         if (task.killed) {
-          logInfo("Executor killed task " + taskId)
-          execBackend.statusUpdate(taskId, TaskState.KILLED, ser.serialize(TaskKilled))
-          return
+          throw TaskKilledException
         }

         for (m <- task.metrics) {
@@ -257,6 +254,11 @@
           execBackend.statusUpdate(taskId, TaskState.FAILED, ser.serialize(reason))
         }

+        case TaskKilledException => {
+          logInfo("Executor killed task " + taskId)
+          execBackend.statusUpdate(taskId, TaskState.KILLED, ser.serialize(TaskKilled))
+        }
+
         case t: Throwable => {
           val serviceTime = (System.currentTimeMillis() - taskStart).toInt
           val metrics = attemptedTask.flatMap(t => t.metrics)
3  core/src/main/scala/org/apache/spark/scheduler/cluster/ClusterScheduler.scala
@@ -287,7 +287,8 @@ private[spark] class ClusterScheduler(val sc: SparkContext)
               }
             }
           case None =>
-            logInfo("Ignoring update from TID " + tid + " because its task set is gone")
+            logInfo("Ignoring update with state %s from TID %s because its task set is gone"
+              .format(state, tid))
         }
       } catch {
         case e: Exception => logError("Exception in statusUpdate", e)
42  core/src/main/scala/org/apache/spark/scheduler/local/LocalTaskSetManager.scala
@@ -21,7 +21,8 @@ import java.nio.ByteBuffer
 import scala.collection.mutable.ArrayBuffer
 import scala.collection.mutable.HashMap

-import org.apache.spark.{ExceptionFailure, Logging, SparkEnv, SparkException, Success, TaskState}
+import org.apache.spark.{ExceptionFailure, Logging, SparkEnv, SparkException, Success,
+  TaskEndReason, TaskResultLost, TaskState}
 import org.apache.spark.TaskState.TaskState
 import org.apache.spark.scheduler.{DirectTaskResult, IndirectTaskResult, Pool, Schedulable, Task,
   TaskDescription, TaskInfo, TaskLocality, TaskResult, TaskSet, TaskSetManager}
@@ -144,7 +145,18 @@ private[spark] class LocalTaskSetManager(sched: LocalScheduler, val taskSet: Tas
     val result = ser.deserialize[TaskResult[_]](serializedData, getClass.getClassLoader) match {
       case directResult: DirectTaskResult[_] => directResult
       case IndirectTaskResult(blockId) => {
-        throw new SparkException("Expect only DirectTaskResults when using LocalScheduler")
+        logDebug("Fetching indirect task result for TID %s".format(tid))
+        val serializedTaskResult = env.blockManager.getRemoteBytes(blockId)
+        if (!serializedTaskResult.isDefined) {
+          /* We won't be able to get the task result if the block manager had to flush the
+           * result. */
+          taskFailed(tid, state, serializedData)
+          return
+        }
+        val deserializedResult = ser.deserialize[DirectTaskResult[_]](
+          serializedTaskResult.get)
+        env.blockManager.master.removeBlock(blockId)
+        deserializedResult
       }
     }
     result.metrics.resultSize = serializedData.limit()
@@ -164,18 +176,28 @@ private[spark] class LocalTaskSetManager(sched: LocalScheduler, val taskSet: Tas
     val task = taskSet.tasks(index)
     info.markFailed()
     decreaseRunningTasks(1)
-    val reason: ExceptionFailure = ser.deserialize[ExceptionFailure](
-      serializedData, getClass.getClassLoader)
-    sched.dagScheduler.taskEnded(task, reason, null, null, info, reason.metrics.getOrElse(null))
+    var failureReason = "unknown"
+    ser.deserialize[TaskEndReason](serializedData, getClass.getClassLoader) match {
+      case ef: ExceptionFailure =>
+        failureReason = "Exception failure: %s".format(ef.description)
+        val locs = ef.stackTrace.map(loc => "\tat %s".format(loc.toString))
+        logInfo("Task loss due to %s\n%s\n%s".format(
+          ef.className, ef.description, locs.mkString("\n")))
+        sched.dagScheduler.taskEnded(task, ef, null, null, info, ef.metrics.getOrElse(null))
+
+      case TaskResultLost =>
+        failureReason = "Lost result for TID %s".format(tid)
+        logWarning(failureReason)
+        sched.dagScheduler.taskEnded(task, TaskResultLost, null, null, info, null)
+
+      case _ => {}
+    }
     if (!finished(index)) {
       copiesRunning(index) -= 1
       numFailures(index) += 1
-      val locs = reason.stackTrace.map(loc => "\tat %s".format(loc.toString))
-      logInfo("Loss was due to %s\n%s\n%s".format(
-        reason.className, reason.description, locs.mkString("\n")))
       if (numFailures(index) > MAX_TASK_FAILURES) {
-        val errorMessage = "Task %s:%d failed more than %d times; aborting job %s".format(
-          taskSet.id, index, MAX_TASK_FAILURES, reason.description)
+        val errorMessage = ("Task %s:%d failed more than %d times; aborting job" +
+          "(most recent failure: %s").format(taskSet.id, index, MAX_TASK_FAILURES, failureReason)
         decreaseRunningTasks(runningTasks)
         sched.dagScheduler.taskSetFailed(taskSet, errorMessage)
         // need to delete failed Taskset from schedule queue
2  core/src/main/scala/org/apache/spark/util/BoundedPriorityQueue.scala
@@ -34,6 +34,8 @@ class BoundedPriorityQueue[A](maxSize: Int)(implicit ord: Ordering[A])

   override def iterator: Iterator[A] = underlying.iterator.asScala

+  override def size: Int = underlying.size
+
   override def ++=(xs: TraversableOnce[A]): this.type = {
     xs.foreach { this += _ }
     this
33  core/src/test/scala/org/apache/spark/JavaAPISuite.java
@@ -883,4 +883,37 @@ public void mapOnPairRDD() {
         new Tuple2<Integer, Integer>(0, 4)), rdd3.collect());

   }
+
+  @Test
+  public void collectPartitions() {
+    JavaRDD<Integer> rdd1 = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5, 6, 7), 3);
+
+    JavaPairRDD<Integer, Integer> rdd2 = rdd1.map(new PairFunction<Integer, Integer, Integer>() {
+      @Override
+      public Tuple2<Integer, Integer> call(Integer i) throws Exception {
+        return new Tuple2<Integer, Integer>(i, i % 2);
+      }
+    });
+
+    List[] parts = rdd1.collectPartitions(new int[] {0});
+    Assert.assertEquals(Arrays.asList(1, 2), parts[0]);
+
+    parts = rdd1.collectPartitions(new int[] {1, 2});
+    Assert.assertEquals(Arrays.asList(3, 4), parts[0]);
+    Assert.assertEquals(Arrays.asList(5, 6, 7), parts[1]);
+
+    Assert.assertEquals(Arrays.asList(new Tuple2<Integer, Integer>(1, 1),
+                                      new Tuple2<Integer, Integer>(2, 0)),
+                        rdd2.collectPartitions(new int[] {0})[0]);
+
+    parts = rdd2.collectPartitions(new int[] {1, 2});
+    Assert.assertEquals(Arrays.asList(new Tuple2<Integer, Integer>(3, 1),
+                                      new Tuple2<Integer, Integer>(4, 0)),
+                        parts[0]);
+    Assert.assertEquals(Arrays.asList(new Tuple2<Integer, Integer>(5, 1),
+                                      new Tuple2<Integer, Integer>(6, 0),
+                                      new Tuple2<Integer, Integer>(7, 1)),
+                        parts[1]);
+  }
+
 }
4  docs/_config.yml
@@ -3,8 +3,8 @@ markdown: kramdown

 # These allow the documentation to be updated with nerw releases
 # of Spark, Scala, and Mesos.
-SPARK_VERSION: 0.8.1-incubating
-SPARK_VERSION_SHORT: 0.8.1
+SPARK_VERSION: 0.8.2-incubating-SNAPSHOT
+SPARK_VERSION_SHORT: 0.8.2-SNAPSHOT
 SCALA_VERSION: 2.9.3
 MESOS_VERSION: 0.13.0
 SPARK_ISSUE_TRACKER_URL: https://spark-project.atlassian.net
2  ec2/spark_ec2.py
@@ -565,7 +565,7 @@ def ssh(host, opts, command):
   while True:
     try:
       return subprocess.check_call(
-        "ssh -t -o StrictHostKeyChecking=no -i %s %s@%s '%s'" %
+        "ssh -t -t -o StrictHostKeyChecking=no -i %s %s@%s '%s'" %
         (opts.identity_file, opts.user, host, command), shell=True)
     except subprocess.CalledProcessError as e:
       if (tries > 2):
2  examples/pom.xml
@@ -21,7 +21,7 @@
   <parent>
     <groupId>org.apache.spark</groupId>
     <artifactId>spark-parent</artifactId>
-    <version>0.8.1-csd-3-SNAPSHOT</version>
+    <version>0.8.2-candidate-csd-1-SNAPSHOT</version>
     <relativePath>../pom.xml</relativePath>
   </parent>
2  mllib/pom.xml
@@ -21,7 +21,7 @@
   <parent>
     <groupId>org.apache.spark</groupId>
     <artifactId>spark-parent</artifactId>
-    <version>0.8.1-csd-3-SNAPSHOT</version>
+    <version>0.8.2-candidate-csd-1-SNAPSHOT</version>
     <relativePath>../pom.xml</relativePath>
   </parent>
2  new-yarn/pom.xml
@@ -20,7 +20,7 @@
   <parent>
     <groupId>org.apache.spark</groupId>
     <artifactId>spark-parent</artifactId>
-    <version>0.8.1-incubating</version>
+    <version>0.8.2-incubating-SNAPSHOT</version>
     <relativePath>../pom.xml</relativePath>
   </parent>
2  pom.xml
@@ -25,7 +25,7 @@
   </parent>
   <groupId>org.apache.spark</groupId>
   <artifactId>spark-parent</artifactId>
-  <version>0.8.1-csd-3-SNAPSHOT</version>
+  <version>0.8.2-candidate-csd-1-SNAPSHOT</version>
   <packaging>pom</packaging>
   <name>Spark Project Parent POM</name>
   <url>http://spark.incubator.apache.org/</url>
2  project/SparkBuild.scala
@@ -96,7 +96,7 @@ object SparkBuild extends Build {

   def sharedSettings = Defaults.defaultSettings ++ Seq(
     organization := "org.apache.spark",
-    version := "0.8.1-incubating",
+    version := "0.8.2-incubating-SNAPSHOT",
     scalaVersion := "2.9.3",
     scalacOptions := Seq("-Xmax-classfile-name", "120", "-unchecked", "-deprecation",
       "-target:" + SCALAC_JVM_VERSION),
9  python/pyspark/rdd.py
@@ -569,9 +569,14 @@ def takeUpToNum(iterator):
         # Take only up to num elements from each partition we try
         mapped = self.mapPartitions(takeUpToNum)
         items = []
+        # TODO(shivaram): Similar to the scala implementation, update the take
+        # method to scan multiple splits based on an estimate of how many elements
+        # we have per-split.
         for partition in range(mapped._jrdd.splits().size()):
-            iterator = self.ctx._takePartition(mapped._jrdd.rdd(), partition)
-            items.extend(self._collect_iterator_through_file(iterator))
+            partitionsToTake = self.ctx._gateway.new_array(self.ctx._jvm.int, 1)
+            partitionsToTake[0] = partition
+            iterator = mapped._jrdd.collectPartitions(partitionsToTake)[0].iterator()
+            items.extend(mapped._collect_iterator_through_file(iterator))
             if len(items) >= num:
                 break
         return items[:num]
2  python/pyspark/shell.py
@@ -35,7 +35,7 @@
       ____              __
      / __/__  ___ _____/ /__
     _\ \/ _ \/ _ `/ __/  '_/
-   /__ / .__/\_,_/_/ /_/\_\   version 0.8.1
+   /__ / .__/\_,_/_/ /_/\_\   version 0.8.2-SNAPSHOT
       /_/
 """
 print "Using Python version %s (%s, %s)" % (
2  repl/pom.xml
@@ -21,7 +21,7 @@
   <parent>
     <groupId>org.apache.spark</groupId>
     <artifactId>spark-parent</artifactId>
-    <version>0.8.1-csd-3-SNAPSHOT</version>
+    <version>0.8.2-candidate-csd-1-SNAPSHOT</version>
    <relativePath>../pom.xml</relativePath>
   </parent>
2  repl/src/main/scala/org/apache/spark/repl/SparkILoop.scala
@@ -200,7 +200,7 @@ class SparkILoop(in0: Option[BufferedReader], val out: PrintWriter, val master:
       ____              __
      / __/__  ___ _____/ /__
     _\ \/ _ \/ _ `/ __/  '_/
-   /___/ .__/\_,_/_/ /_/\_\   version 0.8.1
+   /___/ .__/\_,_/_/ /_/\_\   version 0.8.2-SNAPSHOT
       /_/
 """)
     import Properties._
2  streaming/pom.xml
@@ -21,7 +21,7 @@
   <parent>
     <groupId>org.apache.spark</groupId>
     <artifactId>spark-parent</artifactId>
-    <version>0.8.1-csd-3-SNAPSHOT</version>
+    <version>0.8.2-candidate-csd-1-SNAPSHOT</version>
     <relativePath>../pom.xml</relativePath>
   </parent>
2  tools/pom.xml
@@ -20,7 +20,7 @@
   <parent>
     <groupId>org.apache.spark</groupId>
     <artifactId>spark-parent</artifactId>
-    <version>0.8.1-csd-3-SNAPSHOT</version>
+    <version>0.8.2-candidate-csd-1-SNAPSHOT</version>
     <relativePath>../pom.xml</relativePath>
   </parent>
2  yarn/pom.xml
@@ -20,7 +20,7 @@
   <parent>
     <groupId>org.apache.spark</groupId>
     <artifactId>spark-parent</artifactId>
-    <version>0.8.1-candidate-csd-1-SNAPSHOT</version>
+    <version>0.8.2-candidate-csd-1-SNAPSHOT</version>
     <relativePath>../pom.xml</relativePath>
   </parent>