Merge branch 'master' into stratified
dorx committed Jul 8, 2014
2 parents 680b677 + e6f7bfc commit a2bf756
Showing 37 changed files with 512 additions and 173 deletions.
@@ -26,6 +26,7 @@ import scala.concurrent.Await
import scala.concurrent.duration._
import scala.language.postfixOps
import scala.reflect.ClassTag
import scala.util.control.NonFatal

import akka.actor._
import akka.actor.OneForOneStrategy
@@ -768,6 +769,10 @@ class DAGScheduler(
abortStage(stage, "Task not serializable: " + e.toString)
runningStages -= stage
return
case NonFatal(e) => // Other exceptions, such as IllegalArgumentException from Kryo.
abortStage(stage, s"Task serialization failed: $e\n${e.getStackTraceString}")
runningStages -= stage
return
}

logInfo("Submitting " + tasks.size + " missing tasks from " + stage + " (" + stage.rdd + ")")
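For readers unfamiliar with the new catch clause: scala.util.control.NonFatal is an extractor that matches any Throwable except JVM-fatal ones (OutOfMemoryError, StackOverflowError, and similar), so the added case reports serialization failures without swallowing fatal errors. A minimal standalone sketch of the same pattern follows; the serializeOrAbort helper and its messages are illustrative, not Spark's actual serializer code.

    import java.io.{ByteArrayOutputStream, NotSerializableException, ObjectOutputStream}
    import scala.util.control.NonFatal

    object NonFatalCatchSketch {
      // Mirror the two catch cases above: a specific exception first, then any other non-fatal one.
      def serializeOrAbort(task: AnyRef): Either[String, Array[Byte]] =
        try {
          val buffer = new ByteArrayOutputStream()
          val out = new ObjectOutputStream(buffer)
          out.writeObject(task)
          out.close()
          Right(buffer.toByteArray)
        } catch {
          case e: NotSerializableException =>
            Left("Task not serializable: " + e.toString)
          case NonFatal(e) =>
            // Any other non-fatal Throwable (e.g. IllegalArgumentException from Kryo);
            // OutOfMemoryError and other fatal errors still propagate.
            Left(s"Task serialization failed: $e")
        }

      def main(args: Array[String]): Unit = {
        println(serializeOrAbort("a serializable string")) // Right(...)
        println(serializeOrAbort(new Object))              // Left(Task not serializable: ...)
      }
    }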
6 changes: 3 additions & 3 deletions core/src/test/scala/org/apache/spark/AccumulatorSuite.scala
@@ -61,7 +61,7 @@ class AccumulatorSuite extends FunSuite with Matchers with LocalSparkContext {
val acc : Accumulator[Int] = sc.accumulator(0)

val d = sc.parallelize(1 to 20)
evaluating {d.foreach{x => acc.value = x}} should produce [Exception]
an [Exception] should be thrownBy {d.foreach{x => acc.value = x}}
}

test ("add value to collection accumulators") {
@@ -87,11 +87,11 @@ class AccumulatorSuite extends FunSuite with Matchers with LocalSparkContext {
sc = new SparkContext("local[" + nThreads + "]", "test")
val acc: Accumulable[mutable.Set[Any], Any] = sc.accumulable(new mutable.HashSet[Any]())
val d = sc.parallelize(1 to maxI)
evaluating {
an [SparkException] should be thrownBy {
d.foreach {
x => acc.value += x
}
} should produce [SparkException]
}
resetSparkContext()
}
}
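The changes in this file migrate from ScalaTest 1.x's `evaluating { ... } should produce [T]` to the `thrownBy` syntax that replaces it in ScalaTest 2.x Matchers. A minimal self-contained sketch of the new form (the arithmetic example is ours, not from the suite):

    import org.scalatest.{FunSuite, Matchers}

    class ThrownBySyntaxSketch extends FunSuite with Matchers {
      test("exceptions are asserted with thrownBy in ScalaTest 2.x") {
        // Equivalent to the removed form: evaluating { 1 / 0 } should produce [ArithmeticException]
        an [ArithmeticException] should be thrownBy {
          1 / 0
        }
      }
    }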
38 changes: 19 additions & 19 deletions core/src/test/scala/org/apache/spark/util/NextIteratorSuite.scala
@@ -27,45 +27,45 @@ import org.scalatest.Matchers
class NextIteratorSuite extends FunSuite with Matchers {
test("one iteration") {
val i = new StubIterator(Buffer(1))
i.hasNext should be === true
i.next should be === 1
i.hasNext should be === false
i.hasNext should be (true)
i.next should be (1)
i.hasNext should be (false)
intercept[NoSuchElementException] { i.next() }
}

test("two iterations") {
val i = new StubIterator(Buffer(1, 2))
i.hasNext should be === true
i.next should be === 1
i.hasNext should be === true
i.next should be === 2
i.hasNext should be === false
i.hasNext should be (true)
i.next should be (1)
i.hasNext should be (true)
i.next should be (2)
i.hasNext should be (false)
intercept[NoSuchElementException] { i.next() }
}

test("empty iteration") {
val i = new StubIterator(Buffer())
i.hasNext should be === false
i.hasNext should be (false)
intercept[NoSuchElementException] { i.next() }
}

test("close is called once for empty iterations") {
val i = new StubIterator(Buffer())
i.hasNext should be === false
i.hasNext should be === false
i.closeCalled should be === 1
i.hasNext should be (false)
i.hasNext should be (false)
i.closeCalled should be (1)
}

test("close is called once for non-empty iterations") {
val i = new StubIterator(Buffer(1, 2))
i.next should be === 1
i.next should be === 2
i.next should be (1)
i.next should be (2)
// close isn't called until we check for the next element
i.closeCalled should be === 0
i.hasNext should be === false
i.closeCalled should be === 1
i.hasNext should be === false
i.closeCalled should be === 1
i.closeCalled should be (0)
i.hasNext should be (false)
i.closeCalled should be (1)
i.hasNext should be (false)
i.closeCalled should be (1)
}

class StubIterator(ints: Buffer[Int]) extends NextIterator[Int] {
70 changes: 35 additions & 35 deletions core/src/test/scala/org/apache/spark/util/SizeEstimatorSuite.scala
@@ -63,53 +63,53 @@ class SizeEstimatorSuite
}

test("simple classes") {
expectResult(16)(SizeEstimator.estimate(new DummyClass1))
expectResult(16)(SizeEstimator.estimate(new DummyClass2))
expectResult(24)(SizeEstimator.estimate(new DummyClass3))
expectResult(24)(SizeEstimator.estimate(new DummyClass4(null)))
expectResult(48)(SizeEstimator.estimate(new DummyClass4(new DummyClass3)))
assertResult(16)(SizeEstimator.estimate(new DummyClass1))
assertResult(16)(SizeEstimator.estimate(new DummyClass2))
assertResult(24)(SizeEstimator.estimate(new DummyClass3))
assertResult(24)(SizeEstimator.estimate(new DummyClass4(null)))
assertResult(48)(SizeEstimator.estimate(new DummyClass4(new DummyClass3)))
}

// NOTE: The String class definition varies across JDK versions (1.6 vs. 1.7) and vendors
// (Sun vs IBM). Use a DummyString class to make tests deterministic.
test("strings") {
expectResult(40)(SizeEstimator.estimate(DummyString("")))
expectResult(48)(SizeEstimator.estimate(DummyString("a")))
expectResult(48)(SizeEstimator.estimate(DummyString("ab")))
expectResult(56)(SizeEstimator.estimate(DummyString("abcdefgh")))
assertResult(40)(SizeEstimator.estimate(DummyString("")))
assertResult(48)(SizeEstimator.estimate(DummyString("a")))
assertResult(48)(SizeEstimator.estimate(DummyString("ab")))
assertResult(56)(SizeEstimator.estimate(DummyString("abcdefgh")))
}

test("primitive arrays") {
expectResult(32)(SizeEstimator.estimate(new Array[Byte](10)))
expectResult(40)(SizeEstimator.estimate(new Array[Char](10)))
expectResult(40)(SizeEstimator.estimate(new Array[Short](10)))
expectResult(56)(SizeEstimator.estimate(new Array[Int](10)))
expectResult(96)(SizeEstimator.estimate(new Array[Long](10)))
expectResult(56)(SizeEstimator.estimate(new Array[Float](10)))
expectResult(96)(SizeEstimator.estimate(new Array[Double](10)))
expectResult(4016)(SizeEstimator.estimate(new Array[Int](1000)))
expectResult(8016)(SizeEstimator.estimate(new Array[Long](1000)))
assertResult(32)(SizeEstimator.estimate(new Array[Byte](10)))
assertResult(40)(SizeEstimator.estimate(new Array[Char](10)))
assertResult(40)(SizeEstimator.estimate(new Array[Short](10)))
assertResult(56)(SizeEstimator.estimate(new Array[Int](10)))
assertResult(96)(SizeEstimator.estimate(new Array[Long](10)))
assertResult(56)(SizeEstimator.estimate(new Array[Float](10)))
assertResult(96)(SizeEstimator.estimate(new Array[Double](10)))
assertResult(4016)(SizeEstimator.estimate(new Array[Int](1000)))
assertResult(8016)(SizeEstimator.estimate(new Array[Long](1000)))
}

test("object arrays") {
// Arrays containing nulls should just have one pointer per element
expectResult(56)(SizeEstimator.estimate(new Array[String](10)))
expectResult(56)(SizeEstimator.estimate(new Array[AnyRef](10)))
assertResult(56)(SizeEstimator.estimate(new Array[String](10)))
assertResult(56)(SizeEstimator.estimate(new Array[AnyRef](10)))
// For object arrays with non-null elements, each object should take one pointer plus
// however many bytes that class takes. (Note that Array.fill calls the code in its
// second parameter separately for each object, so we get distinct objects.)
expectResult(216)(SizeEstimator.estimate(Array.fill(10)(new DummyClass1)))
expectResult(216)(SizeEstimator.estimate(Array.fill(10)(new DummyClass2)))
expectResult(296)(SizeEstimator.estimate(Array.fill(10)(new DummyClass3)))
expectResult(56)(SizeEstimator.estimate(Array(new DummyClass1, new DummyClass2)))
assertResult(216)(SizeEstimator.estimate(Array.fill(10)(new DummyClass1)))
assertResult(216)(SizeEstimator.estimate(Array.fill(10)(new DummyClass2)))
assertResult(296)(SizeEstimator.estimate(Array.fill(10)(new DummyClass3)))
assertResult(56)(SizeEstimator.estimate(Array(new DummyClass1, new DummyClass2)))

// Past size 100, our samples 100 elements, but we should still get the right size.
expectResult(28016)(SizeEstimator.estimate(Array.fill(1000)(new DummyClass3)))
assertResult(28016)(SizeEstimator.estimate(Array.fill(1000)(new DummyClass3)))

// If an array contains the *same* element many times, we should only count it once.
val d1 = new DummyClass1
expectResult(72)(SizeEstimator.estimate(Array.fill(10)(d1))) // 10 pointers plus 8-byte object
expectResult(432)(SizeEstimator.estimate(Array.fill(100)(d1))) // 100 pointers plus 8-byte object
assertResult(72)(SizeEstimator.estimate(Array.fill(10)(d1))) // 10 pointers plus 8-byte object
assertResult(432)(SizeEstimator.estimate(Array.fill(100)(d1))) // 100 pointers plus 8-byte object

// Same thing with huge array containing the same element many times. Note that this won't
// return exactly 4032 because it can't tell that *all* the elements will equal the first
@@ -127,10 +127,10 @@ class SizeEstimatorSuite
val initialize = PrivateMethod[Unit]('initialize)
SizeEstimator invokePrivate initialize()

expectResult(40)(SizeEstimator.estimate(DummyString("")))
expectResult(48)(SizeEstimator.estimate(DummyString("a")))
expectResult(48)(SizeEstimator.estimate(DummyString("ab")))
expectResult(56)(SizeEstimator.estimate(DummyString("abcdefgh")))
assertResult(40)(SizeEstimator.estimate(DummyString("")))
assertResult(48)(SizeEstimator.estimate(DummyString("a")))
assertResult(48)(SizeEstimator.estimate(DummyString("ab")))
assertResult(56)(SizeEstimator.estimate(DummyString("abcdefgh")))
resetOrClear("os.arch", arch)
}

@@ -142,10 +142,10 @@ class SizeEstimatorSuite
val initialize = PrivateMethod[Unit]('initialize)
SizeEstimator invokePrivate initialize()

expectResult(56)(SizeEstimator.estimate(DummyString("")))
expectResult(64)(SizeEstimator.estimate(DummyString("a")))
expectResult(64)(SizeEstimator.estimate(DummyString("ab")))
expectResult(72)(SizeEstimator.estimate(DummyString("abcdefgh")))
assertResult(56)(SizeEstimator.estimate(DummyString("")))
assertResult(64)(SizeEstimator.estimate(DummyString("a")))
assertResult(64)(SizeEstimator.estimate(DummyString("ab")))
assertResult(72)(SizeEstimator.estimate(DummyString("abcdefgh")))

resetOrClear("os.arch", arch)
resetOrClear("spark.test.useCompressedOops", oops)
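The only change in this suite is mechanical: ScalaTest 2.0 renamed `expectResult` to `assertResult`, keeping the same expected-value-first calling convention. A tiny sketch with illustrative values:

    import org.scalatest.FunSuite

    class AssertResultSketch extends FunSuite {
      test("assertResult takes the expected value first, then the actual expression") {
        // Formerly written as: expectResult(4)(2 + 2)
        assertResult(4)(2 + 2)
      }
    }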
4 changes: 2 additions & 2 deletions docs/programming-guide.md
@@ -975,8 +975,8 @@ for details.
<td> Return an array with the first <i>n</i> elements of the dataset. Note that this is currently not executed in parallel. Instead, the driver program computes all the elements. </td>
</tr>
<tr>
<td> <b>takeSample</b>(<i>withReplacement</i>, <i>num</i>, <i>seed</i>) </td>
<td> Return an array with a random sample of <i>num</i> elements of the dataset, with or without replacement, using the given random number generator seed. </td>
<td> <b>takeSample</b>(<i>withReplacement</i>, <i>num</i>, [<i>seed</i>]) </td>
<td> Return an array with a random sample of <i>num</i> elements of the dataset, with or without replacement, optionally pre-specifying a random number generator seed.</td>
</tr>
<tr>
<td> <b>takeOrdered</b>(<i>n</i>, <i>[ordering]</i>) </td>
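The doc change reflects that the seed argument of takeSample is optional. A hedged usage sketch as it might look in a Spark shell, where `sc` is the ambient SparkContext and the RDD contents are illustrative:

    val data = sc.parallelize(1 to 100)

    // Seed omitted: Spark picks one internally, so repeated calls may return different samples.
    val sampleA = data.takeSample(withReplacement = false, num = 5)

    // Seed supplied: the sample is reproducible across runs.
    val sampleB = data.takeSample(withReplacement = false, num = 5, seed = 42L)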
2 changes: 1 addition & 1 deletion docs/running-on-yarn.md
@@ -55,7 +55,7 @@ Most of the configs are the same for Spark on YARN as for other deployment modes
</tr>
<tr>
<td><code>spark.yarn.max.executor.failures</code></td>
<td>2*numExecutors</td>
<td>numExecutors * 2, with minimum of 3</td>
<td>
The maximum number of executor failures before failing the application.
</td>
1 change: 1 addition & 0 deletions ec2/spark_ec2.py
@@ -279,6 +279,7 @@ def launch_cluster(conn, opts, cluster_name):
master_group.authorize(src_group=slave_group)
master_group.authorize('tcp', 22, 22, '0.0.0.0/0')
master_group.authorize('tcp', 8080, 8081, '0.0.0.0/0')
master_group.authorize('tcp', 18080, 18080, '0.0.0.0/0')
master_group.authorize('tcp', 19999, 19999, '0.0.0.0/0')
master_group.authorize('tcp', 50030, 50030, '0.0.0.0/0')
master_group.authorize('tcp', 50070, 50070, '0.0.0.0/0')
@@ -17,6 +17,8 @@

package org.apache.spark.examples.mllib

import scala.collection.mutable

import com.esotericsoftware.kryo.Kryo
import org.apache.log4j.{Level, Logger}
import scopt.OptionParser
@@ -41,6 +43,7 @@ object MovieLensALS {
class ALSRegistrator extends KryoRegistrator {
override def registerClasses(kryo: Kryo) {
kryo.register(classOf[Rating])
kryo.register(classOf[mutable.BitSet])
}
}

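The new registration tells Kryo how to handle `mutable.BitSet`, which the ALS example uses internally, rather than falling back to default serialization. A sketch of how such a registrator is typically defined and wired up; the class name and configuration snippet are illustrative, not part of this diff:

    import scala.collection.mutable
    import com.esotericsoftware.kryo.Kryo
    import org.apache.spark.SparkConf
    import org.apache.spark.serializer.KryoRegistrator

    class BitSetRegistrator extends KryoRegistrator {
      override def registerClasses(kryo: Kryo) {
        kryo.register(classOf[mutable.BitSet])
      }
    }

    // Enable Kryo and point Spark at the registrator via the standard config keys.
    val conf = new SparkConf()
      .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      .set("spark.kryo.registrator", "BitSetRegistrator")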
24 changes: 14 additions & 10 deletions python/pyspark/sql.py
@@ -152,10 +152,12 @@ def jsonFile(self, path):
>>> ofn.close()
>>> srdd = sqlCtx.jsonFile(jsonFile)
>>> sqlCtx.registerRDDAsTable(srdd, "table1")
>>> srdd2 = sqlCtx.sql("SELECT field1 AS f1, field2 as f2, field3 as f3 from table1")
>>> srdd2.collect() == [{"f1": 1, "f2": "row1", "f3":{"field4":11}},
... {"f1": 2, "f2": "row2", "f3":{"field4":22}},
... {"f1": 3, "f2": "row3", "f3":{"field4":33}}]
>>> srdd2 = sqlCtx.sql(
... "SELECT field1 AS f1, field2 as f2, field3 as f3, field6 as f4 from table1")
>>> srdd2.collect() == [
... {"f1":1, "f2":"row1", "f3":{"field4":11, "field5": None}, "f4":None},
... {"f1":2, "f2":None, "f3":{"field4":22, "field5": [10, 11]}, "f4":[{"field7": "row2"}]},
... {"f1":None, "f2":"row3", "f3":{"field4":33, "field5": []}, "f4":None}]
True
"""
jschema_rdd = self._ssql_ctx.jsonFile(path)
@@ -167,10 +169,12 @@ def jsonRDD(self, rdd):
>>> srdd = sqlCtx.jsonRDD(json)
>>> sqlCtx.registerRDDAsTable(srdd, "table1")
>>> srdd2 = sqlCtx.sql("SELECT field1 AS f1, field2 as f2, field3 as f3 from table1")
>>> srdd2.collect() == [{"f1": 1, "f2": "row1", "f3":{"field4":11}},
... {"f1": 2, "f2": "row2", "f3":{"field4":22}},
... {"f1": 3, "f2": "row3", "f3":{"field4":33}}]
>>> srdd2 = sqlCtx.sql(
... "SELECT field1 AS f1, field2 as f2, field3 as f3, field6 as f4 from table1")
>>> srdd2.collect() == [
... {"f1":1, "f2":"row1", "f3":{"field4":11, "field5": None}, "f4":None},
... {"f1":2, "f2":None, "f3":{"field4":22, "field5": [10, 11]}, "f4":[{"field7": "row2"}]},
... {"f1":None, "f2":"row3", "f3":{"field4":33, "field5": []}, "f4":None}]
True
"""
def func(split, iterator):
@@ -492,8 +496,8 @@ def _test():
globs['rdd'] = sc.parallelize([{"field1" : 1, "field2" : "row1"},
{"field1" : 2, "field2": "row2"}, {"field1" : 3, "field2": "row3"}])
jsonStrings = ['{"field1": 1, "field2": "row1", "field3":{"field4":11}}',
'{"field1" : 2, "field2": "row2", "field3":{"field4":22}}',
'{"field1" : 3, "field2": "row3", "field3":{"field4":33}}']
'{"field1" : 2, "field3":{"field4":22, "field5": [10, 11]}, "field6":[{"field7": "row2"}]}',
'{"field1" : null, "field2": "row3", "field3":{"field4":33, "field5": []}}']
globs['jsonStrings'] = jsonStrings
globs['json'] = sc.parallelize(jsonStrings)
globs['nestedRdd1'] = sc.parallelize([
@@ -118,6 +118,7 @@ class SqlParser extends StandardTokenParsers with PackratParsers {
protected val UNCACHE = Keyword("UNCACHE")
protected val UNION = Keyword("UNION")
protected val WHERE = Keyword("WHERE")
protected val INTERSECT = Keyword("INTERSECT")
protected val EXCEPT = Keyword("EXCEPT")


@@ -140,6 +141,7 @@ class SqlParser extends StandardTokenParsers with PackratParsers {
protected lazy val query: Parser[LogicalPlan] = (
select * (
UNION ~ ALL ^^^ { (q1: LogicalPlan, q2: LogicalPlan) => Union(q1, q2) } |
INTERSECT ^^^ { (q1: LogicalPlan, q2: LogicalPlan) => Intersect(q1, q2) } |
EXCEPT ^^^ { (q1: LogicalPlan, q2: LogicalPlan) => Except(q1, q2)} |
UNION ~ opt(DISTINCT) ^^^ { (q1: LogicalPlan, q2: LogicalPlan) => Distinct(Union(q1, q2)) }
)
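With the keyword and production added above, the parser can turn INTERSECT queries into an Intersect logical plan. A hedged usage sketch; the SQLContext setup and table names are assumptions for illustration, not part of this diff:

    val sqlContext = new org.apache.spark.sql.SQLContext(sc)

    // Rows common to both result sets; parsed by the new INTERSECT rule.
    val overlap = sqlContext.sql(
      "SELECT name FROM premiumUsers INTERSECT SELECT name FROM activeUsers")
    overlap.collect().foreach(println)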