Merge branch 'master' into nomocks
Conflicts:
	core/src/test/scala/spark/scheduler/DAGSchedulerSuite.scala
Stephen Haberman committed Feb 26, 2013
2 parents 921be76 + d6e6abe commit a4adeb2
Showing 176 changed files with 5,166 additions and 2,212 deletions.
2 changes: 2 additions & 0 deletions .gitignore
@@ -34,3 +34,5 @@ log/
spark-tests.log
streaming-tests.log
dependency-reduced-pom.xml
.ensime
.ensime_lucene
42 changes: 19 additions & 23 deletions bagel/src/main/scala/spark/bagel/Bagel.scala
@@ -6,19 +6,19 @@ import spark.SparkContext._
import scala.collection.mutable.ArrayBuffer

object Bagel extends Logging {
def run[K : Manifest, V <: Vertex : Manifest, M <: Message[K] : Manifest,
C : Manifest, A : Manifest](
def run[K: Manifest, V <: Vertex : Manifest, M <: Message[K] : Manifest,
C: Manifest, A: Manifest](
sc: SparkContext,
vertices: RDD[(K, V)],
messages: RDD[(K, M)],
combiner: Combiner[M, C],
aggregator: Option[Aggregator[V, A]],
partitioner: Partitioner,
numSplits: Int
numPartitions: Int
)(
compute: (V, Option[C], Option[A], Int) => (V, Array[M])
): RDD[(K, V)] = {
val splits = if (numSplits != 0) numSplits else sc.defaultParallelism
val splits = if (numPartitions != 0) numPartitions else sc.defaultParallelism

var superstep = 0
var verts = vertices
@@ -50,57 +50,55 @@ object Bagel extends Logging {
verts
}

def run[K : Manifest, V <: Vertex : Manifest, M <: Message[K] : Manifest,
C : Manifest](
def run[K: Manifest, V <: Vertex : Manifest, M <: Message[K] : Manifest, C: Manifest](
sc: SparkContext,
vertices: RDD[(K, V)],
messages: RDD[(K, M)],
combiner: Combiner[M, C],
partitioner: Partitioner,
numSplits: Int
numPartitions: Int
)(
compute: (V, Option[C], Int) => (V, Array[M])
): RDD[(K, V)] = {
run[K, V, M, C, Nothing](
sc, vertices, messages, combiner, None, partitioner, numSplits)(
sc, vertices, messages, combiner, None, partitioner, numPartitions)(
addAggregatorArg[K, V, M, C](compute))
}

def run[K : Manifest, V <: Vertex : Manifest, M <: Message[K] : Manifest,
C : Manifest](
def run[K: Manifest, V <: Vertex : Manifest, M <: Message[K] : Manifest, C: Manifest](
sc: SparkContext,
vertices: RDD[(K, V)],
messages: RDD[(K, M)],
combiner: Combiner[M, C],
numSplits: Int
numPartitions: Int
)(
compute: (V, Option[C], Int) => (V, Array[M])
): RDD[(K, V)] = {
val part = new HashPartitioner(numSplits)
val part = new HashPartitioner(numPartitions)
run[K, V, M, C, Nothing](
sc, vertices, messages, combiner, None, part, numSplits)(
sc, vertices, messages, combiner, None, part, numPartitions)(
addAggregatorArg[K, V, M, C](compute))
}

def run[K : Manifest, V <: Vertex : Manifest, M <: Message[K] : Manifest](
def run[K: Manifest, V <: Vertex : Manifest, M <: Message[K] : Manifest](
sc: SparkContext,
vertices: RDD[(K, V)],
messages: RDD[(K, M)],
numSplits: Int
numPartitions: Int
)(
compute: (V, Option[Array[M]], Int) => (V, Array[M])
): RDD[(K, V)] = {
val part = new HashPartitioner(numSplits)
val part = new HashPartitioner(numPartitions)
run[K, V, M, Array[M], Nothing](
sc, vertices, messages, new DefaultCombiner(), None, part, numSplits)(
sc, vertices, messages, new DefaultCombiner(), None, part, numPartitions)(
addAggregatorArg[K, V, M, Array[M]](compute))
}

/**
* Aggregates the given vertices using the given aggregator, if it
* is specified.
*/
private def agg[K, V <: Vertex, A : Manifest](
private def agg[K, V <: Vertex, A: Manifest](
verts: RDD[(K, V)],
aggregator: Option[Aggregator[V, A]]
): Option[A] = aggregator match {
@@ -116,7 +114,7 @@
* function. Returns the processed RDD, the number of messages
* created, and the number of active vertices.
*/
private def comp[K : Manifest, V <: Vertex, M <: Message[K], C](
private def comp[K: Manifest, V <: Vertex, M <: Message[K], C](
sc: SparkContext,
grouped: RDD[(K, (Seq[C], Seq[V]))],
compute: (V, Option[C]) => (V, Array[M])
@@ -149,9 +147,7 @@
* Converts a compute function that doesn't take an aggregator to
* one that does, so it can be passed to Bagel.run.
*/
private def addAggregatorArg[
K : Manifest, V <: Vertex : Manifest, M <: Message[K] : Manifest, C
](
private def addAggregatorArg[K: Manifest, V <: Vertex : Manifest, M <: Message[K] : Manifest, C](
compute: (V, Option[C], Int) => (V, Array[M])
): (V, Option[C], Option[Nothing], Int) => (V, Array[M]) = {
(vert: V, msgs: Option[C], aggregated: Option[Nothing], superstep: Int) =>
@@ -170,7 +166,7 @@ trait Aggregator[V, A] {
def mergeAggregators(a: A, b: A): A
}

class DefaultCombiner[M : Manifest] extends Combiner[M, Array[M]] with Serializable {
class DefaultCombiner[M: Manifest] extends Combiner[M, Array[M]] with Serializable {
def createCombiner(msg: M): Array[M] =
Array(msg)
def mergeMsg(combiner: Array[M], msg: M): Array[M] =
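The user-visible change in Bagel.scala is the rename of the numSplits parameter to numPartitions across every run overload, so callers that pass the argument by name must be updated. Below is a minimal, self-contained sketch written against the new signature; it is not part of this commit, and the vertex and message classes, object name, and superstep limit are invented for illustration.

import spark.SparkContext
import spark.bagel.{Bagel, Message, Vertex}

object NumPartitionsSketch {
  // Throwaway vertex/message types, mirroring the ones used in BagelSuite further down.
  class SketchVertex(val active: Boolean, val age: Int) extends Vertex with Serializable
  class SketchMessage(val targetId: String) extends Message[String] with Serializable

  def main(args: Array[String]) {
    val sc = new SparkContext("local", "numPartitions-sketch")
    val verts = sc.parallelize(Seq(("a", new SketchVertex(true, 0)), ("b", new SketchVertex(true, 0))))
    val msgs = sc.parallelize(Seq.empty[(String, SketchMessage)])

    // Before this commit the named argument was numSplits; it is now numPartitions.
    val result = Bagel.run(sc, verts, msgs, numPartitions = 2) {
      (self: SketchVertex, in: Option[Array[SketchMessage]], superstep: Int) =>
        // Halt by voting after a fixed number of supersteps, as the tests below do.
        (new SketchVertex(superstep < 4, self.age + 1), Array[SketchMessage]())
    }
    result.collect.foreach { case (id, v) => println(id + " ran " + v.age + " supersteps") }
    sc.stop()
  }
}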
@@ -16,7 +16,7 @@ import scala.xml.{XML,NodeSeq}
object WikipediaPageRank {
def main(args: Array[String]) {
if (args.length < 5) {
System.err.println("Usage: WikipediaPageRank <inputFile> <threshold> <numSplits> <host> <usePartitioner>")
System.err.println("Usage: WikipediaPageRank <inputFile> <threshold> <numPartitions> <host> <usePartitioner>")
System.exit(-1)
}

@@ -25,7 +25,7 @@ object WikipediaPageRank {

val inputFile = args(0)
val threshold = args(1).toDouble
val numSplits = args(2).toInt
val numPartitions = args(2).toInt
val host = args(3)
val usePartitioner = args(4).toBoolean
val sc = new SparkContext(host, "WikipediaPageRank")
@@ -69,7 +69,7 @@ object WikipediaPageRank {
val result =
Bagel.run(
sc, vertices, messages, combiner = new PRCombiner(),
numSplits = numSplits)(
numPartitions = numPartitions)(
utils.computeWithCombiner(numVertices, epsilon))

// Print the result
@@ -88,7 +88,7 @@ object WikipediaPageRankStandalone {
n: Long,
partitioner: Partitioner,
usePartitioner: Boolean,
numSplits: Int
numPartitions: Int
): RDD[(String, Double)] = {
var ranks = links.mapValues { edges => defaultRank }
for (i <- 1 to numIterations) {
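The WikipediaPageRankStandalone hunk above only renames a numSplits parameter to numPartitions; the body of the rank-update loop is elided. For orientation, here is a hedged sketch of the textbook Spark PageRank update that a loop of this shape performs. It is not the commit's exact code: alpha is an assumed damping constant, links is assumed to be RDD[(String, Array[String])], and ranks, n, and numIterations are the values visible in the signature and body above.

// Sketch only: assumes links: RDD[(String, Array[String])] and ranks: RDD[(String, Double)],
// with the pair-RDD implicits from `import spark.SparkContext._` in scope.
val alpha = 0.15 // assumed damping term; the real implementation derives it from its arguments
for (i <- 1 to numIterations) {
  // Each page sends its rank, split evenly, along its outgoing links.
  val contribs = links.join(ranks).flatMap { case (_, (edges, rank)) =>
    edges.map(dest => (dest, rank / edges.size))
  }
  // Recombine contributions and apply the damping term; `ranks` is a var in the code above.
  ranks = contribs.reduceByKey(_ + _).mapValues(sum => alpha / n + (1 - alpha) * sum)
}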
35 changes: 27 additions & 8 deletions bagel/src/test/scala/bagel/BagelSuite.scala
@@ -1,10 +1,8 @@
package spark.bagel

import org.scalatest.{FunSuite, Assertions, BeforeAndAfter}
import org.scalatest.prop.Checkers
import org.scalacheck.Arbitrary._
import org.scalacheck.Gen
import org.scalacheck.Prop._
import org.scalatest.concurrent.Timeouts
import org.scalatest.time.SpanSugar._

import scala.collection.mutable.ArrayBuffer

@@ -13,7 +11,7 @@ import spark._
class TestVertex(val active: Boolean, val age: Int) extends Vertex with Serializable
class TestMessage(val targetId: String) extends Message[String] with Serializable

class BagelSuite extends FunSuite with Assertions with BeforeAndAfter {
class BagelSuite extends FunSuite with Assertions with BeforeAndAfter with Timeouts {

var sc: SparkContext = _

@@ -25,7 +23,7 @@ class BagelSuite extends FunSuite with Assertions with BeforeAndAfter {
// To avoid Akka rebinding to the same port, since it doesn't unbind immediately on shutdown
System.clearProperty("spark.driver.port")
}

test("halting by voting") {
sc = new SparkContext("local", "test")
val verts = sc.parallelize(Array("a", "b", "c", "d").map(id => (id, new TestVertex(true, 0))))
@@ -36,8 +34,9 @@ class BagelSuite extends FunSuite with Assertions with BeforeAndAfter {
(self: TestVertex, msgs: Option[Array[TestMessage]], superstep: Int) =>
(new TestVertex(superstep < numSupersteps - 1, self.age + 1), Array[TestMessage]())
}
for ((id, vert) <- result.collect)
for ((id, vert) <- result.collect) {
assert(vert.age === numSupersteps)
}
}

test("halting by message silence") {
@@ -57,7 +56,27 @@ class BagelSuite extends FunSuite with Assertions with BeforeAndAfter {
}
(new TestVertex(self.active, self.age + 1), msgsOut)
}
for ((id, vert) <- result.collect)
for ((id, vert) <- result.collect) {
assert(vert.age === numSupersteps)
}
}

test("large number of iterations") {
// This tests whether jobs with a large number of iterations finish in a reasonable time,
// because non-memoized recursion in RDD or DAGScheduler used to cause them to hang
failAfter(10 seconds) {
sc = new SparkContext("local", "test")
val verts = sc.parallelize((1 to 4).map(id => (id.toString, new TestVertex(true, 0))))
val msgs = sc.parallelize(Array[(String, TestMessage)]())
val numSupersteps = 50
val result =
Bagel.run(sc, verts, msgs, sc.defaultParallelism) {
(self: TestVertex, msgs: Option[Array[TestMessage]], superstep: Int) =>
(new TestVertex(superstep < numSupersteps - 1, self.age + 1), Array[TestMessage]())
}
for ((id, vert) <- result.collect) {
assert(vert.age === numSupersteps)
}
}
}
}
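The new "large number of iterations" test swaps the suite's ScalaCheck imports for ScalaTest's Timeouts trait and SpanSugar, which the diff adds at the top of the file. As a quick illustration of that pattern, here is a minimal self-contained sketch; the suite and test names are invented.

import org.scalatest.FunSuite
import org.scalatest.concurrent.Timeouts
import org.scalatest.time.SpanSugar._

class TimeoutSketchSuite extends FunSuite with Timeouts {
  test("completes within the limit") {
    // failAfter comes from Timeouts; the `10 seconds` literal comes from SpanSugar.
    // The test fails (rather than hanging) if the block runs past the limit.
    failAfter(10 seconds) {
      (1 to 1000).sum
    }
  }
}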
4 changes: 2 additions & 2 deletions core/src/main/scala/spark/CacheManager.scala
@@ -11,13 +11,13 @@ private[spark] class CacheManager(blockManager: BlockManager) extends Logging {
private val loading = new HashSet[String]

/** Gets or computes an RDD split. Used by RDD.iterator() when an RDD is cached. */
def getOrCompute[T](rdd: RDD[T], split: Split, context: TaskContext, storageLevel: StorageLevel)
def getOrCompute[T](rdd: RDD[T], split: Partition, context: TaskContext, storageLevel: StorageLevel)
: Iterator[T] = {
val key = "rdd_%d_%d".format(rdd.id, split.index)
logInfo("Cache key is " + key)
blockManager.get(key) match {
case Some(cachedValues) =>
// Split is in cache, so just return its values
// Partition is in cache, so just return its values
logInfo("Found partition in cache!")
return cachedValues.asInstanceOf[Iterator[T]]

4 changes: 2 additions & 2 deletions core/src/main/scala/spark/DoubleRDDFunctions.scala
@@ -42,14 +42,14 @@ class DoubleRDDFunctions(self: RDD[Double]) extends Logging with Serializable {
/** (Experimental) Approximate operation to return the mean within a timeout. */
def meanApprox(timeout: Long, confidence: Double = 0.95): PartialResult[BoundedDouble] = {
val processPartition = (ctx: TaskContext, ns: Iterator[Double]) => StatCounter(ns)
val evaluator = new MeanEvaluator(self.splits.size, confidence)
val evaluator = new MeanEvaluator(self.partitions.size, confidence)
self.context.runApproximateJob(self, processPartition, evaluator, timeout)
}

/** (Experimental) Approximate operation to return the sum within a timeout. */
def sumApprox(timeout: Long, confidence: Double = 0.95): PartialResult[BoundedDouble] = {
val processPartition = (ctx: TaskContext, ns: Iterator[Double]) => StatCounter(ns)
val evaluator = new SumEvaluator(self.splits.size, confidence)
val evaluator = new SumEvaluator(self.partitions.size, confidence)
self.context.runApproximateJob(self, processPartition, evaluator, timeout)
}
}
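The hunk above only switches these evaluators from self.splits.size to self.partitions.size as part of the Split-to-Partition rename. For context, a hedged usage sketch of the approximate operations themselves; the object name and input values are invented, and it assumes the implicit conversion to DoubleRDDFunctions from `import spark.SparkContext._` plus a blocking getFinalValue on PartialResult, as in this era of the API.

import spark.SparkContext
import spark.SparkContext._ // brings in the implicit conversion to DoubleRDDFunctions

object ApproxSketch {
  def main(args: Array[String]) {
    val sc = new SparkContext("local", "approx-sketch")
    val xs = sc.parallelize(1 to 1000000).map(_.toDouble)

    // Ask for the mean, but give the job at most 500 ms before settling for an estimate.
    val approxMean = xs.meanApprox(timeout = 500, confidence = 0.95)
    println(approxMean.getFinalValue()) // a BoundedDouble: the mean with a confidence interval

    sc.stop()
  }
}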