
Commit 616d111

Merge remote-tracking branch 'upstream/master'
str-janus committed Dec 4, 2014
2 parents 35c1884 + 7e758d7 commit 616d111
Showing 69 changed files with 2,945 additions and 588 deletions.
3 changes: 3 additions & 0 deletions .gitignore
@@ -5,6 +5,7 @@
*.ipr
*.iml
*.iws
*.pyc
.idea/
.idea_modules/
sbt/*.jar
@@ -49,6 +50,8 @@ dependency-reduced-pom.xml
checkpoint
derby.log
dist/
dev/create-release/*txt
dev/create-release/*new
spark-*-bin-*.tgz
unit-tests.log
/lib/

@@ -134,6 +134,7 @@ private[spark] class AppClient(
val fullId = appId + "/" + id
logInfo("Executor added: %s on %s (%s) with %d cores".format(fullId, workerId, hostPort,
cores))
master ! ExecutorStateChanged(appId, id, ExecutorState.RUNNING, None, None)
listener.executorAdded(fullId, workerId, hostPort, cores, memory)

case ExecutorUpdated(id, state, message, exitStatus) =>

@@ -144,8 +144,6 @@ private[spark] class ExecutorRunner(
Files.write(header, stderr, UTF_8)
stderrAppender = FileAppender(process.getErrorStream, stderr, conf)

state = ExecutorState.RUNNING
worker ! ExecutorStateChanged(appId, execId, state, None, None)
// Wait for it to exit; executor may exit with code 0 (when driver instructs it to shutdown)
// or with nonzero exit code
val exitCode = process.waitFor()
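Taken together, the two hunks above move the RUNNING notification from the worker side to the driver side: ExecutorRunner no longer reports ExecutorState.RUNNING to its worker after forking the executor process, and AppClient now tells the master that the executor is RUNNING as soon as it receives ExecutorAdded. The sketch below is not the actual Spark deploy-message source; it only shows the shape of the message both hunks refer to, with field names inferred from the call sites above and a minimal stand-in for the ExecutorState enumeration.

```scala
// Minimal sketch (not the real Spark sources): the state enumeration and the
// ExecutorStateChanged message referenced by the two hunks above. Field names are
// inferred from the call site ExecutorStateChanged(appId, id, ExecutorState.RUNNING, None, None).
object ExecutorState extends Enumeration {
  val LAUNCHING, RUNNING, KILLED, FAILED, LOST = Value
}

case class ExecutorStateChanged(
    appId: String,
    execId: Int,
    state: ExecutorState.Value,
    message: Option[String],
    exitStatus: Option[Int])
```

The net effect visible in the diff is that the RUNNING transition is now reported by the driver-side client rather than by the worker-side runner that launched the process.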
12 changes: 6 additions & 6 deletions core/src/main/scala/org/apache/spark/rdd/BinaryFileRDD.scala
@@ -24,12 +24,12 @@ import org.apache.spark.input.StreamFileInputFormat
import org.apache.spark.{ Partition, SparkContext }

private[spark] class BinaryFileRDD[T](
sc: SparkContext,
inputFormatClass: Class[_ <: StreamFileInputFormat[T]],
keyClass: Class[String],
valueClass: Class[T],
@transient conf: Configuration,
minPartitions: Int)
sc: SparkContext,
inputFormatClass: Class[_ <: StreamFileInputFormat[T]],
keyClass: Class[String],
valueClass: Class[T],
@transient conf: Configuration,
minPartitions: Int)
extends NewHadoopRDD[String, T](sc, inputFormatClass, keyClass, valueClass, conf) {

override def getPartitions: Array[Partition] = {
35 changes: 0 additions & 35 deletions core/src/main/scala/org/apache/spark/rdd/FilteredRDD.scala

This file was deleted.

34 changes: 0 additions & 34 deletions core/src/main/scala/org/apache/spark/rdd/FlatMappedRDD.scala

This file was deleted.

35 changes: 0 additions & 35 deletions core/src/main/scala/org/apache/spark/rdd/FlatMappedValuesRDD.scala

This file was deleted.

31 changes: 0 additions & 31 deletions core/src/main/scala/org/apache/spark/rdd/GlommedRDD.scala

This file was deleted.

32 changes: 0 additions & 32 deletions core/src/main/scala/org/apache/spark/rdd/MappedRDD.scala

This file was deleted.

33 changes: 0 additions & 33 deletions core/src/main/scala/org/apache/spark/rdd/MappedValuesRDD.scala

This file was deleted.
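The six deleted files above each held a tiny RDD subclass whose only purpose was to apply one function to a parent RDD (FilteredRDD, FlatMappedRDD, FlatMappedValuesRDD, GlommedRDD, MappedRDD, MappedValuesRDD). The hunks below replace all of them with the existing MapPartitionsRDD, whose constructor takes a (TaskContext, partition index, iterator) => iterator function plus a preservesPartitioning flag, as the new call sites show. As a rough illustration of why one class is enough, here is a simplified sketch of such an RDD; it is a paraphrase for exposition, not the actual MapPartitionsRDD source.

```scala
// Simplified sketch of a MapPartitionsRDD-style class (illustrative only; the real
// Spark class may differ in detail). Each deleted subclass corresponds to one choice
// of the iterator-transforming function f.
import scala.reflect.ClassTag

import org.apache.spark.{Partition, TaskContext}
import org.apache.spark.rdd.RDD

class SketchMapPartitionsRDD[U: ClassTag, T: ClassTag](
    prev: RDD[T],
    f: (TaskContext, Int, Iterator[T]) => Iterator[U], // (context, partition index, input iterator)
    preservesPartitioning: Boolean = false)
  extends RDD[U](prev) {

  // Keep the parent's partitioner only when the function cannot move records between keys.
  override val partitioner =
    if (preservesPartitioning) prev.partitioner else None

  override def getPartitions: Array[Partition] = prev.partitions

  override def compute(split: Partition, context: TaskContext): Iterator[U] =
    f(context, split.index, prev.iterator(split, context))
}
```

For example, the old MappedRDD corresponds to f = (_, _, iter) => iter.map(cleanF), and FilteredRDD to f = (_, _, iter) => iter.filter(cleanF) with preservesPartitioning = true, exactly the lambdas that appear in the new code below.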

10 changes: 8 additions & 2 deletions core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
@@ -660,7 +660,9 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
*/
def mapValues[U](f: V => U): RDD[(K, U)] = {
val cleanF = self.context.clean(f)
new MappedValuesRDD(self, cleanF)
new MapPartitionsRDD[(K, U), (K, V)](self,
(context, pid, iter) => iter.map { case (k, v) => (k, cleanF(v)) },
preservesPartitioning = true)
}

/**
@@ -669,7 +671,11 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
*/
def flatMapValues[U](f: V => TraversableOnce[U]): RDD[(K, U)] = {
val cleanF = self.context.clean(f)
new FlatMappedValuesRDD(self, cleanF)
new MapPartitionsRDD[(K, U), (K, V)](self,
(context, pid, iter) => iter.flatMap { case (k, v) =>
cleanF(v).map(x => (k, x))
},
preservesPartitioning = true)
}

/**
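Because the rewritten mapValues and flatMapValues pass preservesPartitioning = true, a pair RDD keeps its partitioner across these calls, so a later reduceByKey or join over the same keys does not need an extra shuffle. A minimal, hypothetical usage sketch (local mode, names invented for illustration):

```scala
// Hypothetical demo, not part of the patch: shows that mapValues/flatMapValues
// retain the parent's partitioner after this change.
import org.apache.spark.{HashPartitioner, SparkConf, SparkContext}
import org.apache.spark.SparkContext._ // pair-RDD implicits for Spark releases of this era

object MapValuesPartitioningDemo {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("demo").setMaster("local[2]"))
    val pairs = sc.parallelize(Seq(("a", 1), ("b", 2), ("a", 3)))
      .partitionBy(new HashPartitioner(4))

    val mapped = pairs.mapValues(_ * 10)               // values transformed, keys untouched
    val flat = pairs.flatMapValues(v => Seq(v, v + 1)) // zero or more values per key

    // Both print true: the HashPartitioner(4) survives the value-only transformations.
    println(mapped.partitioner == pairs.partitioner)
    println(flat.partitioner == pairs.partitioner)

    sc.stop()
  }
}
```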
28 changes: 20 additions & 8 deletions core/src/main/scala/org/apache/spark/rdd/RDD.scala
@@ -17,7 +17,7 @@

package org.apache.spark.rdd

import java.util.{Properties, Random}
import java.util.Random

import scala.collection.{mutable, Map}
import scala.collection.mutable.ArrayBuffer
@@ -36,13 +36,12 @@ import org.apache.spark._
import org.apache.spark.Partitioner._
import org.apache.spark.annotation.{DeveloperApi, Experimental}
import org.apache.spark.api.java.JavaRDD
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.partial.BoundedDouble
import org.apache.spark.partial.CountEvaluator
import org.apache.spark.partial.GroupedCountEvaluator
import org.apache.spark.partial.PartialResult
import org.apache.spark.storage.StorageLevel
import org.apache.spark.util.{BoundedPriorityQueue, Utils, CallSite}
import org.apache.spark.util.{BoundedPriorityQueue, Utils}
import org.apache.spark.util.collection.OpenHashMap
import org.apache.spark.util.random.{BernoulliSampler, PoissonSampler, BernoulliCellSampler,
SamplingUtils}
@@ -270,19 +269,30 @@ abstract class RDD[T: ClassTag](
/**
* Return a new RDD by applying a function to all elements of this RDD.
*/
def map[U: ClassTag](f: T => U): RDD[U] = new MappedRDD(this, sc.clean(f))
def map[U: ClassTag](f: T => U): RDD[U] = {
val cleanF = sc.clean(f)
new MapPartitionsRDD[U, T](this, (context, pid, iter) => iter.map(cleanF))
}

/**
* Return a new RDD by first applying a function to all elements of this
* RDD, and then flattening the results.
*/
def flatMap[U: ClassTag](f: T => TraversableOnce[U]): RDD[U] =
new FlatMappedRDD(this, sc.clean(f))
def flatMap[U: ClassTag](f: T => TraversableOnce[U]): RDD[U] = {
val cleanF = sc.clean(f)
new MapPartitionsRDD[U, T](this, (context, pid, iter) => iter.flatMap(cleanF))
}

/**
* Return a new RDD containing only the elements that satisfy a predicate.
*/
def filter(f: T => Boolean): RDD[T] = new FilteredRDD(this, sc.clean(f))
def filter(f: T => Boolean): RDD[T] = {
val cleanF = sc.clean(f)
new MapPartitionsRDD[T, T](
this,
(context, pid, iter) => iter.filter(cleanF),
preservesPartitioning = true)
}

/**
* Return a new RDD containing the distinct elements in this RDD.
@@ -503,7 +513,9 @@ abstract class RDD[T: ClassTag](
/**
* Return an RDD created by coalescing all elements within each partition into an array.
*/
def glom(): RDD[Array[T]] = new GlommedRDD(this)
def glom(): RDD[Array[T]] = {
new MapPartitionsRDD[Array[T], T](this, (context, pid, iter) => Iterator(iter.toArray))
}

/**
* Return the Cartesian product of this RDD and another one, that is, the RDD of all pairs of
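From the caller's point of view, map, flatMap, filter, and glom behave exactly as before; only their implementation now routes through MapPartitionsRDD. Note that filter additionally passes preservesPartitioning = true, since dropping elements cannot move any record to a different partition. A short, hypothetical local-mode sketch of the four rewritten transformations:

```scala
// Hypothetical demo, not part of the patch: exercises the four transformations
// that were rewritten on top of MapPartitionsRDD.
import org.apache.spark.{SparkConf, SparkContext}

object RewrittenTransformationsDemo {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("rdd-demo").setMaster("local[2]"))
    val nums = sc.parallelize(1 to 10, numSlices = 3)

    println(nums.map(_ * 2).collect().toSeq)               // each element doubled
    println(nums.flatMap(n => Seq(n, -n)).collect().toSeq) // each element and its negation
    println(nums.filter(_ % 2 == 0).collect().toSeq)       // even elements only
    println(nums.glom().collect().map(_.toSeq).toSeq)      // one array per partition

    sc.stop()
  }
}
```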
@@ -66,8 +66,9 @@ private[spark] class ShuffleMemoryManager(maxMemory: Long) extends Logging {
val curMem = threadMemory(threadId)
val freeMemory = maxMemory - threadMemory.values.sum

// How much we can grant this thread; don't let it grow to more than 1 / numActiveThreads
val maxToGrant = math.min(numBytes, (maxMemory / numActiveThreads) - curMem)
// How much we can grant this thread; don't let it grow to more than 1 / numActiveThreads;
// don't let it be negative
val maxToGrant = math.min(numBytes, math.max(0, (maxMemory / numActiveThreads) - curMem))

if (curMem < maxMemory / (2 * numActiveThreads)) {
// We want to let each thread get at least 1 / (2 * numActiveThreads) before blocking;
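The added math.max(0, ...) guards against a negative grant: a thread that acquired memory while few threads were active can end up holding more than maxMemory / numActiveThreads once more threads register, and the old formula then returned a negative maxToGrant. A worked example with assumed numbers (values invented for illustration; paste into a Scala REPL to check):

```scala
// Illustrative numbers only, not Spark defaults.
val maxMemory = 1000L        // total memory this manager can hand out
val numActiveThreads = 4     // threads currently registered
val curMem = 300L            // this thread grabbed 300 back when only 2 threads were active
val numBytes = 100L          // size of the new request

val fairShare = maxMemory / numActiveThreads                             // 250
val oldMaxToGrant = math.min(numBytes, fairShare - curMem)               // min(100, -50) = -50
val newMaxToGrant = math.min(numBytes, math.max(0L, fairShare - curMem)) // min(100, 0) = 0

println(s"old: $oldMaxToGrant, new: $newMaxToGrant") // old: -50, new: 0
```

With the clamp, a thread already over its fair share is simply granted nothing until other threads release memory, instead of receiving a negative amount.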
(Diffs for the remaining changed files are not shown here.)
