
Commit

Merge remote-tracking branch 'origin/master' into hash-benchmark
cloud-fan committed Feb 7, 2016
2 parents 315af8c + bc8890b commit 9a1f8ff
Showing 18 changed files with 101 additions and 26 deletions.
@@ -636,6 +636,9 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)])

/**
* Return the key-value pairs in this RDD to the master as a Map.
*
* @note this method should only be used if the resulting data is expected to be small, as
* all the data is loaded into the driver's memory.
*/
def collectAsMap(): java.util.Map[K, V] = mapAsSerializableJavaMap(rdd.collectAsMap())
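The same caveat is propagated across the Java wrappers in the following file. As a rough illustration of why it matters, a small driver-side sketch (not part of this commit; written in Scala against the Java API and assuming an existing SparkContext named sc):

    import java.util.Arrays
    import org.apache.spark.api.java.JavaSparkContext

    val jsc = new JavaSparkContext(sc)                        // wrap the assumed SparkContext
    val pairs = jsc.parallelizePairs(Arrays.asList(("a", 1), ("b", 2)))
    val m: java.util.Map[String, Int] = pairs.collectAsMap()  // the entire result is materialized in the driver JVM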

24 changes: 24 additions & 0 deletions core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala
@@ -327,6 +327,9 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {

/**
* Return an array that contains all of the elements in this RDD.
*
* @note this method should only be used if the resulting array is expected to be small, as
* all the data is loaded into the driver's memory.
*/
def collect(): JList[T] =
rdd.collect().toSeq.asJava
@@ -465,6 +468,9 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
* Take the first num elements of the RDD. This currently scans the partitions *one by one*, so
* it will be slow if a lot of partitions are required. In that case, use collect() to get the
* whole RDD instead.
*
* @note this method should only be used if the resulting array is expected to be small, as
* all the data is loaded into the driver's memory.
*/
def take(num: Int): JList[T] =
rdd.take(num).toSeq.asJava
@@ -548,6 +554,9 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
/**
* Returns the top k (largest) elements from this RDD as defined by
* the specified Comparator[T] and maintains the order.
*
* @note this method should only be used if the resulting array is expected to be small, as
* all the data is loaded into the driver's memory.
* @param num k, the number of top elements to return
* @param comp the comparator that defines the order
* @return an array of top elements
@@ -559,6 +568,9 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
/**
* Returns the top k (largest) elements from this RDD using the
* natural ordering for T and maintains the order.
*
* @note this method should only be used if the resulting array is expected to be small, as
* all the data is loaded into the driver's memory.
* @param num k, the number of top elements to return
* @return an array of top elements
*/
@@ -570,6 +582,9 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
/**
* Returns the first k (smallest) elements from this RDD as defined by
* the specified Comparator[T] and maintains the order.
*
* @note this method should only be used if the resulting array is expected to be small, as
* all the data is loaded into the driver's memory.
* @param num k, the number of elements to return
* @param comp the comparator that defines the order
* @return an array of top elements
@@ -601,6 +616,9 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
/**
* Returns the first k (smallest) elements from this RDD using the
* natural ordering for T while maintaining the order.
*
* @note this method should only be used if the resulting array is expected to be small, as
* all the data is loaded into the driver's memory.
* @param num k, the number of top elements to return
* @return an array of top elements
*/
@@ -634,6 +652,9 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
/**
* The asynchronous version of `collect`, which returns a future for
* retrieving an array containing all of the elements in this RDD.
*
* @note this method should only be used if the resulting array is expected to be small, as
* all the data is loaded into the driver's memory.
*/
def collectAsync(): JavaFutureAction[JList[T]] = {
new JavaFutureActionWrapper(rdd.collectAsync(), (x: Seq[T]) => x.asJava)
@@ -642,6 +663,9 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
/**
* The asynchronous version of the `take` action, which returns a
* future for retrieving the first `num` elements of this RDD.
*
* @note this method should only be used if the resulting array is expected to be small, as
* all the data is loaded into the driver's memory.
*/
def takeAsync(num: Int): JavaFutureAction[JList[T]] = {
new JavaFutureActionWrapper(rdd.takeAsync(num), (x: Seq[T]) => x.asJava)
@@ -22,7 +22,7 @@ import java.net.URL
import java.util.concurrent.TimeoutException

import scala.collection.mutable.ListBuffer
import scala.concurrent.{future, promise, Await}
import scala.concurrent.{Await, Future, Promise}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._
import scala.language.postfixOps
@@ -249,7 +249,7 @@ private object FaultToleranceTest extends App with Logging {

/** This includes Client retry logic, so it may take a while if the cluster is recovering. */
private def assertUsable() = {
val f = future {
val f = Future {
try {
val res = sc.parallelize(0 until 10).collect()
assertTrue(res.toList == (0 until 10))
@@ -283,7 +283,7 @@ private object FaultToleranceTest extends App with Logging {
numAlive == 1 && numStandby == masters.size - 1 && numLiveApps >= 1
}

val f = future {
val f = Future {
try {
while (!stateValid()) {
Thread.sleep(1000)
@@ -405,7 +405,7 @@ private object SparkDocker {
}

private def startNode(dockerCmd: ProcessBuilder) : (String, DockerId, File) = {
val ipPromise = promise[String]()
val ipPromise = Promise[String]()
val outFile = File.createTempFile("fault-tolerance-test", "", Utils.createTempDir())
val outStream: FileWriter = new FileWriter(outFile)
def findIpAndLog(line: String): Unit = {
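The import change above tracks the deprecation of the lowercase scala.concurrent.future and promise helpers; Future.apply and Promise.apply are the supported spellings. A minimal sketch of the new style (assuming, as in this file, the implicit global ExecutionContext):

    import scala.concurrent.{Await, Future, Promise}
    import scala.concurrent.ExecutionContext.Implicits.global
    import scala.concurrent.duration._

    val p = Promise[String]()
    val f = Future { "done" }            // Future.apply replaces future { ... }
    f.foreach(p.trySuccess)              // complete the promise when the future finishes
    Await.result(p.future, 10.seconds)   // block until the value is available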
@@ -394,7 +394,7 @@ private[deploy] class Worker(
// rpcEndpoint.
// Copy ids so that it can be used in the cleanup thread.
val appIds = executors.values.map(_.appId).toSet
val cleanupFuture = concurrent.future {
val cleanupFuture = concurrent.Future {
val appDirs = workDir.listFiles()
if (appDirs == null) {
throw new IOException("ERROR: Failed to list files in " + appDirs)
@@ -726,6 +726,9 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
*
* Warning: this doesn't return a multimap (so if you have multiple values to the same key, only
* one value per key is preserved in the map returned)
*
* @note this method should only be used if the resulting data is expected to be small, as
* all the data is loaded into the driver's memory.
*/
def collectAsMap(): Map[K, V] = self.withScope {
val data = self.collect()
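A brief sketch of the multimap caveat documented above (assuming a SparkContext named sc): duplicate keys collapse to a single entry, and which value survives depends on evaluation order.

    val pairs = sc.parallelize(Seq(("a", 1), ("a", 2), ("b", 3)))
    val m = pairs.collectAsMap()   // e.g. Map("a" -> 2, "b" -> 3); only one value for "a" is kept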
15 changes: 15 additions & 0 deletions core/src/main/scala/org/apache/spark/rdd/RDD.scala
@@ -481,6 +481,9 @@ abstract class RDD[T: ClassTag](
/**
* Return a fixed-size sampled subset of this RDD in an array
*
* @note this method should only be used if the resulting array is expected to be small, as
* all the data is loaded into the driver's memory.
*
* @param withReplacement whether sampling is done with replacement
* @param num size of the returned sample
* @param seed seed for the random number generator
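For reference, a short usage sketch of takeSample with the parameters documented above (assuming a SparkContext named sc); the whole sample is returned as a local array on the driver.

    val rdd = sc.parallelize(1 to 100)
    val sample = rdd.takeSample(withReplacement = false, num = 5, seed = 42L)
    // sample is an Array[Int] of 5 elements held in driver memory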
@@ -836,6 +839,9 @@ abstract class RDD[T: ClassTag](

/**
* Return an array that contains all of the elements in this RDD.
*
* @note this method should only be used if the resulting array is expected to be small, as
* all the data is loaded into the driver's memory.
*/
def collect(): Array[T] = withScope {
val results = sc.runJob(this, (iter: Iterator[T]) => iter.toArray)
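To make the new note concrete, a minimal sketch (assuming a SparkContext named sc): collect materializes the full dataset on the driver, while take or toLocalIterator bound how much is resident at once.

    val rdd = sc.parallelize(1 to 1000000)
    val first = rdd.take(10)                       // bounded result, safe on the driver
    val everything = rdd.collect()                 // all one million elements in driver memory
    rdd.toLocalIterator.take(10).foreach(println)  // streams roughly one partition at a time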
@@ -1202,6 +1208,9 @@ abstract class RDD[T: ClassTag](
* results from that partition to estimate the number of additional partitions needed to satisfy
* the limit.
*
* @note this method should only be used if the resulting array is expected to be small, as
* all the data is loaded into the driver's memory.
*
* @note due to complications in the internal implementation, this method will raise
* an exception if called on an RDD of `Nothing` or `Null`.
*/
@@ -1263,6 +1272,9 @@ abstract class RDD[T: ClassTag](
* // returns Array(6, 5)
* }}}
*
* @note this method should only be used if the resulting array is expected to be small, as
* all the data is loaded into the driver's memory.
*
* @param num k, the number of top elements to return
* @param ord the implicit ordering for T
* @return an array of top elements
@@ -1283,6 +1295,9 @@ abstract class RDD[T: ClassTag](
* // returns Array(2, 3)
* }}}
*
* @note this method should only be used if the resulting array is expected to be small, as
* all the data is loaded into the driver's memory.
*
* @param num k, the number of elements to return
* @param ord the implicit ordering for T
* @return an array of top elements
2 changes: 1 addition & 1 deletion core/src/main/scala/org/apache/spark/util/Benchmark.scala
@@ -124,7 +124,7 @@ private[spark] object Benchmark {
}
val best = runTimes.min
val avg = runTimes.sum / iters
Result(avg / 1000000, num.toDouble / (best / 1000), best / 1000000)
Result(avg / 1000000.0, num / (best / 1000.0), best / 1000000.0)
}
}
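The changed line above switches from integer to floating-point division. With Long run times in nanoseconds, the old expression truncated sub-millisecond precision, and the throughput term divided by an already-truncated Long. A quick illustration:

    val best = 1234567L        // nanoseconds
    val ms1 = best / 1000000   // 1: Long division drops the fraction
    val ms2 = best / 1000000.0 // 1.234567: the fix keeps sub-millisecond precision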

18 changes: 9 additions & 9 deletions core/src/test/scala/org/apache/spark/JobCancellationSuite.scala
@@ -22,7 +22,7 @@ import java.util.concurrent.Semaphore
import scala.concurrent.Await
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._
import scala.concurrent.future
import scala.concurrent.Future

import org.scalatest.BeforeAndAfter
import org.scalatest.Matchers
@@ -103,7 +103,7 @@ class JobCancellationSuite extends SparkFunSuite with Matchers with BeforeAndAft

val rdd1 = rdd.map(x => x)

future {
Future {
taskStartedSemaphore.acquire()
sc.cancelAllJobs()
taskCancelledSemaphore.release(100000)
@@ -126,7 +126,7 @@ class JobCancellationSuite extends SparkFunSuite with Matchers with BeforeAndAft
})

// jobA is the one to be cancelled.
val jobA = future {
val jobA = Future {
sc.setJobGroup("jobA", "this is a job to be cancelled")
sc.parallelize(1 to 10000, 2).map { i => Thread.sleep(10); i }.count()
}
@@ -191,7 +191,7 @@ class JobCancellationSuite extends SparkFunSuite with Matchers with BeforeAndAft
})

// jobA is the one to be cancelled.
val jobA = future {
val jobA = Future {
sc.setJobGroup("jobA", "this is a job to be cancelled", interruptOnCancel = true)
sc.parallelize(1 to 10000, 2).map { i => Thread.sleep(100000); i }.count()
}
@@ -231,7 +231,7 @@ class JobCancellationSuite extends SparkFunSuite with Matchers with BeforeAndAft
val f2 = rdd.countAsync()

// Kill one of the action.
future {
Future {
sem1.acquire()
f1.cancel()
JobCancellationSuite.twoJobsSharingStageSemaphore.release(10)
@@ -247,7 +247,7 @@ class JobCancellationSuite extends SparkFunSuite with Matchers with BeforeAndAft
// Cancel before launching any tasks
{
val f = sc.parallelize(1 to 10000, 2).map { i => Thread.sleep(10); i }.countAsync()
future { f.cancel() }
Future { f.cancel() }
val e = intercept[SparkException] { f.get() }
assert(e.getMessage.contains("cancelled") || e.getMessage.contains("killed"))
}
@@ -263,7 +263,7 @@ class JobCancellationSuite extends SparkFunSuite with Matchers with BeforeAndAft
})

val f = sc.parallelize(1 to 10000, 2).map { i => Thread.sleep(10); i }.countAsync()
future {
Future {
// Wait until some tasks were launched before we cancel the job.
sem.acquire()
f.cancel()
@@ -277,7 +277,7 @@ class JobCancellationSuite extends SparkFunSuite with Matchers with BeforeAndAft
// Cancel before launching any tasks
{
val f = sc.parallelize(1 to 10000, 2).map { i => Thread.sleep(10); i }.takeAsync(5000)
future { f.cancel() }
Future { f.cancel() }
val e = intercept[SparkException] { f.get() }
assert(e.getMessage.contains("cancelled") || e.getMessage.contains("killed"))
}
@@ -292,7 +292,7 @@ class JobCancellationSuite extends SparkFunSuite with Matchers with BeforeAndAft
}
})
val f = sc.parallelize(1 to 10000, 2).map { i => Thread.sleep(10); i }.takeAsync(5000)
future {
Future {
sem.acquire()
f.cancel()
}
@@ -21,7 +21,7 @@ import java.io.InputStream
import java.util.concurrent.Semaphore

import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.future
import scala.concurrent.Future

import org.mockito.Matchers.{any, eq => meq}
import org.mockito.Mockito._
@@ -149,7 +149,7 @@ class ShuffleBlockFetcherIteratorSuite extends SparkFunSuite with PrivateMethodT
when(transfer.fetchBlocks(any(), any(), any(), any(), any())).thenAnswer(new Answer[Unit] {
override def answer(invocation: InvocationOnMock): Unit = {
val listener = invocation.getArguments()(4).asInstanceOf[BlockFetchingListener]
future {
Future {
// Return the first two blocks, and wait till task completion before returning the 3rd one
listener.onBlockFetchSuccess(
ShuffleBlockId(0, 0, 0).toString, blocks(ShuffleBlockId(0, 0, 0)))
@@ -211,7 +211,7 @@ class ShuffleBlockFetcherIteratorSuite extends SparkFunSuite with PrivateMethodT
when(transfer.fetchBlocks(any(), any(), any(), any(), any())).thenAnswer(new Answer[Unit] {
override def answer(invocation: InvocationOnMock): Unit = {
val listener = invocation.getArguments()(4).asInstanceOf[BlockFetchingListener]
future {
Future {
// Return the first block, and then fail.
listener.onBlockFetchSuccess(
ShuffleBlockId(0, 0, 0).toString, blocks(ShuffleBlockId(0, 0, 0)))
@@ -332,12 +332,13 @@ class LogisticRegression @Since("1.2.0") (
val optimizer = if ($(elasticNetParam) == 0.0 || $(regParam) == 0.0) {
new BreezeLBFGS[BDV[Double]]($(maxIter), 10, $(tol))
} else {
val standardizationParam = $(standardization)
def regParamL1Fun = (index: Int) => {
// Remove the L1 penalization on the intercept
if (index == numFeatures) {
0.0
} else {
if ($(standardization)) {
if (standardizationParam) {
regParamL1
} else {
// If `standardization` is false, we still standardize the data
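The hoisted standardizationParam above copies $(standardization) into a local val before regParamL1Fun is defined, presumably so the closure captures a plain Boolean rather than the enclosing estimator (reading $(...) inside the closure would pull in the whole LogisticRegression instance and re-read the param map on every call). A generic, self-contained sketch of the pattern, with hypothetical names:

    class Model(val threshold: Double) {
      def scorer: Int => Double = {
        // Reading `threshold` inside the lambda would capture `this` (the whole Model);
        // copying it into a local val first means only a Double is captured.
        val t = threshold
        (x: Int) => if (x > t) 1.0 else 0.0
      }
    }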
4 changes: 3 additions & 1 deletion mllib/src/main/scala/org/apache/spark/ml/param/params.scala
@@ -117,7 +117,9 @@ class Param[T](val parent: String, val name: String, val doc: String, val isVali
}
}

override final def toString: String = s"${parent}__$name"
private[this] val stringRepresentation = s"${parent}__$name"

override final def toString: String = stringRepresentation

override final def hashCode: Int = toString.##
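Since hashCode delegates to toString, precomputing the interpolated string above avoids rebuilding it on every hash or equality check of a Param. A self-contained sketch of the same pattern (illustrative names, not the actual class):

    class Key(prefix: String, name: String) {
      private[this] val str = s"${prefix}__$name"   // built once, at construction time
      override def toString: String = str
      override def hashCode: Int = toString.##      // no per-call string interpolation
    }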

17 changes: 17 additions & 0 deletions python/pyspark/rdd.py
@@ -426,6 +426,9 @@ def takeSample(self, withReplacement, num, seed=None):
"""
Return a fixed-size sampled subset of this RDD.
Note that this method should only be used if the resulting array is expected
to be small, as all the data is loaded into the driver's memory.
>>> rdd = sc.parallelize(range(0, 10))
>>> len(rdd.takeSample(True, 20, 1))
20
@@ -766,6 +769,8 @@ def func(it):
def collect(self):
"""
Return a list that contains all of the elements in this RDD.
Note that this method should only be used if the resulting array is expected
to be small, as all the data is loaded into the driver's memory.
"""
with SCCallSiteSync(self.context) as css:
port = self.ctx._jvm.PythonRDD.collectAndServe(self._jrdd.rdd())
@@ -1213,6 +1218,9 @@ def top(self, num, key=None):
"""
Get the top N elements from a RDD.
Note that this method should only be used if the resulting array is expected
to be small, as all the data is loaded into the driver's memory.
Note: It returns the list sorted in descending order.
>>> sc.parallelize([10, 4, 2, 12, 3]).top(1)
@@ -1235,6 +1243,9 @@ def takeOrdered(self, num, key=None):
Get the N elements from a RDD ordered in ascending order or as
specified by the optional key function.
Note that this method should only be used if the resulting array is expected
to be small, as all the data is loaded into the driver's memory.
>>> sc.parallelize([10, 1, 2, 9, 3, 4, 5, 6, 7]).takeOrdered(6)
[1, 2, 3, 4, 5, 6]
>>> sc.parallelize([10, 1, 2, 9, 3, 4, 5, 6, 7], 2).takeOrdered(6, key=lambda x: -x)
@@ -1254,6 +1265,9 @@ def take(self, num):
that partition to estimate the number of additional partitions needed
to satisfy the limit.
Note that this method should only be used if the resulting array is expected
to be small, as all the data is loaded into the driver's memory.
Translated from the Scala implementation in RDD#take().
>>> sc.parallelize([2, 3, 4, 5, 6]).cache().take(2)
@@ -1511,6 +1525,9 @@ def collectAsMap(self):
"""
Return the key-value pairs in this RDD to the master as a dictionary.
Note that this method should only be used if the resulting data is expected
to be small, as all the data is loaded into the driver's memory.
>>> m = sc.parallelize([(1, 2), (3, 4)]).collectAsMap()
>>> m[1]
2