
Commit f40e72f

Merge branch 'meta-cache' into partition
ericl committed Oct 21, 2016
2 parents 2ee5665 + 2a96537 commit f40e72f
Showing 91 changed files with 1,378 additions and 832 deletions.
2 changes: 1 addition & 1 deletion LICENSE
@@ -263,7 +263,7 @@ The text of each license is also included at licenses/LICENSE-[project].txt.
(New BSD license) Protocol Buffer Java API (org.spark-project.protobuf:protobuf-java:2.4.1-shaded - http://code.google.com/p/protobuf)
(The BSD License) Fortran to Java ARPACK (net.sourceforge.f2j:arpack_combined_all:0.1 - http://f2j.sourceforge.net)
(The BSD License) xmlenc Library (xmlenc:xmlenc:0.52 - http://xmlenc.sourceforge.net)
(The New BSD License) Py4J (net.sf.py4j:py4j:0.10.3 - http://py4j.sourceforge.net/)
(The New BSD License) Py4J (net.sf.py4j:py4j:0.10.4 - http://py4j.sourceforge.net/)
(Two-clause BSD-style license) JUnit-Interface (com.novocode:junit-interface:0.10 - http://github.com/szeiger/junit-interface/)
(BSD licence) sbt and sbt-launch-lib.bash
(BSD 3 Clause) d3.min.js (https://github.com/mbostock/d3/blob/master/LICENSE)
2 changes: 1 addition & 1 deletion R/pkg/NAMESPACE
@@ -3,7 +3,7 @@
importFrom("methods", "setGeneric", "setMethod", "setOldClass")
importFrom("methods", "is", "new", "signature", "show")
importFrom("stats", "gaussian", "setNames")
importFrom("utils", "download.file", "packageVersion", "untar")
importFrom("utils", "download.file", "object.size", "packageVersion", "untar")

# Disable native libraries till we figure out how to package it
# See SPARKR-7839
2 changes: 1 addition & 1 deletion R/pkg/inst/tests/testthat/test_sparkSQL.R
@@ -212,7 +212,7 @@ test_that("createDataFrame uses files for large objects", {
# To simulate a large file scenario, we set spark.r.maxAllocationLimit to a smaller value
conf <- callJMethod(sparkSession, "conf")
callJMethod(conf, "set", "spark.r.maxAllocationLimit", "100")
df <- createDataFrame(iris)
df <- suppressWarnings(createDataFrame(iris))

# Resetting the conf back to default value
callJMethod(conf, "set", "spark.r.maxAllocationLimit", toString(.Machine$integer.max / 10))
2 changes: 1 addition & 1 deletion bin/pyspark
@@ -57,7 +57,7 @@ export PYSPARK_PYTHON

# Add the PySpark classes to the Python path:
export PYTHONPATH="${SPARK_HOME}/python/:$PYTHONPATH"
export PYTHONPATH="${SPARK_HOME}/python/lib/py4j-0.10.3-src.zip:$PYTHONPATH"
export PYTHONPATH="${SPARK_HOME}/python/lib/py4j-0.10.4-src.zip:$PYTHONPATH"

# Load the PySpark shell.py script when ./pyspark is used interactively:
export OLD_PYTHONSTARTUP="$PYTHONSTARTUP"
2 changes: 1 addition & 1 deletion bin/pyspark2.cmd
@@ -30,7 +30,7 @@ if "x%PYSPARK_DRIVER_PYTHON%"=="x" (
)

set PYTHONPATH=%SPARK_HOME%\python;%PYTHONPATH%
set PYTHONPATH=%SPARK_HOME%\python\lib\py4j-0.10.3-src.zip;%PYTHONPATH%
set PYTHONPATH=%SPARK_HOME%\python\lib\py4j-0.10.4-src.zip;%PYTHONPATH%

set OLD_PYTHONSTARTUP=%PYTHONSTARTUP%
set PYTHONSTARTUP=%SPARK_HOME%\python\pyspark\shell.py
1 change: 1 addition & 0 deletions conf/spark-env.sh.template
@@ -63,3 +63,4 @@
# - SPARK_PID_DIR Where the pid file is stored. (Default: /tmp)
# - SPARK_IDENT_STRING A string representing this instance of spark. (Default: $USER)
# - SPARK_NICENESS The scheduling priority for daemons. (Default: 0)
# - SPARK_NO_DAEMONIZE Run the proposed command in the foreground. It will not output a PID file.
2 changes: 1 addition & 1 deletion core/pom.xml
@@ -331,7 +331,7 @@
<dependency>
<groupId>net.sf.py4j</groupId>
<artifactId>py4j</artifactId>
<version>0.10.3</version>
<version>0.10.4</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
@@ -24,6 +24,7 @@ function drawApplicationTimeline(groupArray, eventObjArray, startTime, offset) {
return a.value - b.value
},
editable: false,
align: 'left',
showCurrentTime: false,
min: startTime,
zoomable: false,
@@ -99,6 +100,7 @@ function drawJobTimeline(groupArray, eventObjArray, startTime, offset) {
return a.value - b.value;
},
editable: false,
align: 'left',
showCurrentTime: false,
min: startTime,
zoomable: false,
@@ -32,7 +32,7 @@ private[spark] object PythonUtils {
val pythonPath = new ArrayBuffer[String]
for (sparkHome <- sys.env.get("SPARK_HOME")) {
pythonPath += Seq(sparkHome, "python", "lib", "pyspark.zip").mkString(File.separator)
pythonPath += Seq(sparkHome, "python", "lib", "py4j-0.10.3-src.zip").mkString(File.separator)
pythonPath += Seq(sparkHome, "python", "lib", "py4j-0.10.4-src.zip").mkString(File.separator)
}
pythonPath ++= SparkContext.jarOfObject(this)
pythonPath.mkString(File.pathSeparator)
2 changes: 1 addition & 1 deletion core/src/main/scala/org/apache/spark/rdd/RDD.scala
@@ -1278,7 +1278,7 @@ abstract class RDD[T: ClassTag](
def zipWithUniqueId(): RDD[(T, Long)] = withScope {
val n = this.partitions.length.toLong
this.mapPartitionsWithIndex { case (k, iter) =>
iter.zipWithIndex.map { case (item, i) =>
Utils.getIteratorZipWithIndex(iter, 0L).map { case (item, i) =>
(item, i * n + k)
}
}
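For context on the hunk above: zipWithUniqueId gives the element at position i of partition k the id i * n + k, where n is the number of partitions, so ids never collide across partitions without a global scan; the change only swaps Scala's Int-indexed iter.zipWithIndex for the Long-based helper added below in Utils.scala. A minimal sketch of the id scheme on plain Scala collections (the data and the object name are illustrative, not from the Spark sources):

    // Sketch of the zipWithUniqueId id scheme using plain Scala collections.
    // `partitions` stands in for an RDD's partitions; everything here is illustrative.
    object ZipWithUniqueIdSketch {
      def main(args: Array[String]): Unit = {
        val partitions = Seq(Seq("a", "b"), Seq("c"), Seq("d", "e", "f"))
        val n = partitions.length.toLong
        val withIds = partitions.zipWithIndex.flatMap { case (part, k) =>
          // The element at index i of partition k gets id i * n + k (a Long).
          part.iterator.zipWithIndex.map { case (item, i) => (item, i * n + k) }
        }
        println(withIds) // List((a,0), (b,3), (c,1), (d,2), (e,5), (f,8))
      }
    }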
@@ -64,8 +64,7 @@ class ZippedWithIndexRDD[T: ClassTag](prev: RDD[T]) extends RDD[(T, Long)](prev)

override def compute(splitIn: Partition, context: TaskContext): Iterator[(T, Long)] = {
val split = splitIn.asInstanceOf[ZippedWithIndexRDDPartition]
firstParent[T].iterator(split.prev, context).zipWithIndex.map { x =>
(x._1, split.startIndex + x._2)
}
val parentIter = firstParent[T].iterator(split.prev, context)
Utils.getIteratorZipWithIndex(parentIter, split.startIndex)
}
}
15 changes: 15 additions & 0 deletions core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -1759,6 +1759,21 @@ private[spark] object Utils extends Logging {
count
}

/**
* Generate a zipWithIndex iterator, avoiding the index overflow problem
* of Scala's zipWithIndex (which uses an Int index)
*/
def getIteratorZipWithIndex[T](iterator: Iterator[T], startIndex: Long): Iterator[(T, Long)] = {
new Iterator[(T, Long)] {
var index: Long = startIndex - 1L
def hasNext: Boolean = iterator.hasNext
def next(): (T, Long) = {
index += 1L
(iterator.next(), index)
}
}
}

/**
* Creates a symlink.
*
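The getIteratorZipWithIndex helper added above exists because Scala's Iterator.zipWithIndex tracks its index as an Int, so it always starts at 0 and cannot count past Int.MaxValue; the new method keeps a Long counter and accepts a start index. Below is a self-contained sketch that re-implements the same idea so it runs without Spark on the classpath (the object and method names here are invented for the example):

    // Standalone sketch of the Long-based zip-with-index helper added in Utils.scala.
    // Re-implemented inline only so the snippet compiles without Spark; the real
    // method is org.apache.spark.util.Utils.getIteratorZipWithIndex.
    object ZipWithIndexSketch {
      def zipWithIndex[T](iterator: Iterator[T], startIndex: Long): Iterator[(T, Long)] =
        new Iterator[(T, Long)] {
          private var index: Long = startIndex - 1L
          def hasNext: Boolean = iterator.hasNext
          def next(): (T, Long) = { index += 1L; (iterator.next(), index) }
        }

      def main(args: Array[String]): Unit = {
        // Scala's built-in zipWithIndex would start at 0 and hand out Int indices;
        // here the index starts just below Int.MaxValue and keeps growing as a Long.
        zipWithIndex(Iterator("a", "b", "c"), Int.MaxValue.toLong - 1L).foreach(println)
        // (a,2147483646)
        // (b,2147483647)
        // (c,2147483648)
      }
    }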
2 changes: 1 addition & 1 deletion core/src/test/scala/org/apache/spark/CheckpointSuite.scala
@@ -500,7 +500,7 @@ class CheckpointSuite extends SparkFunSuite with RDDCheckpointTester with LocalS
}

runTest("CheckpointRDD with zero partitions") { reliableCheckpoint: Boolean =>
val rdd = new BlockRDD[Int](sc, Array[BlockId]())
val rdd = new BlockRDD[Int](sc, Array.empty[BlockId])
assert(rdd.partitions.size === 0)
assert(rdd.isCheckpointed === false)
assert(rdd.isCheckpointedAndMaterialized === false)
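The hunk above, and several test hunks later in this commit, make the same mechanical substitution: Array[T]() becomes Array.empty[T]. The two expressions build identical zero-length arrays; the latter simply states the intent more directly. A trivial sketch (the object name is illustrative):

    object EmptyArraySketch {
      def main(args: Array[String]): Unit = {
        val a = Array[Int]()      // spelling replaced throughout this commit
        val b = Array.empty[Int]  // spelling the commit standardizes on
        // Both are zero-length Int arrays of the same runtime class.
        assert(a.isEmpty && b.isEmpty && a.getClass == b.getClass)
      }
    }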
@@ -65,7 +65,7 @@ class JsonProtocolSuite extends SparkFunSuite with JsonTestUtils {
test("writeMasterState") {
val workers = Array(createWorkerInfo(), createWorkerInfo())
val activeApps = Array(createAppInfo())
val completedApps = Array[ApplicationInfo]()
val completedApps = Array.empty[ApplicationInfo]
val activeDrivers = Array(createDriverInfo())
val completedDrivers = Array(createDriverInfo())
val stateResponse = new MasterStateResponse(
@@ -91,7 +91,7 @@ class SparkSubmitSuite

// scalastyle:off println
test("prints usage on empty input") {
testPrematureExit(Array[String](), "Usage: spark-submit")
testPrematureExit(Array.empty[String], "Usage: spark-submit")
}

test("prints usage with only --help") {
@@ -33,7 +33,7 @@ class HistoryServerArgumentsSuite extends SparkFunSuite {
.set("spark.testing", "true")

test("No Arguments Parsing") {
val argStrings = Array[String]()
val argStrings = Array.empty[String]
val hsa = new HistoryServerArguments(conf, argStrings)
assert(conf.get("spark.history.fs.logDirectory") === logDir.getAbsolutePath)
assert(conf.get("spark.history.fs.updateInterval") === "1")
@@ -57,7 +57,7 @@ class ChunkedByteBufferSuite extends SparkFunSuite {
}

test("toArray()") {
val empty = ByteBuffer.wrap(Array[Byte]())
val empty = ByteBuffer.wrap(Array.empty[Byte])
val bytes = ByteBuffer.wrap(Array.tabulate(8)(_.toByte))
val chunkedByteBuffer = new ChunkedByteBuffer(Array(bytes, bytes, empty))
assert(chunkedByteBuffer.toArray === bytes.array() ++ bytes.array())
@@ -74,7 +74,7 @@ }
}

test("toInputStream()") {
val empty = ByteBuffer.wrap(Array[Byte]())
val empty = ByteBuffer.wrap(Array.empty[Byte])
val bytes1 = ByteBuffer.wrap(Array.tabulate(256)(_.toByte))
val bytes2 = ByteBuffer.wrap(Array.tabulate(128)(_.toByte))
val chunkedByteBuffer = new ChunkedByteBuffer(Array(empty, bytes1, bytes2))
@@ -100,7 +100,7 @@ class KryoSerializerSuite extends SparkFunSuite with SharedSparkContext {
check(Array("aaa", "bbb", null))
check(Array(true, false, true))
check(Array('a', 'b', 'c'))
check(Array[Int]())
check(Array.empty[Int])
check(Array(Array("1", "2"), Array("1", "2", "3", "4")))
}

7 changes: 7 additions & 0 deletions core/src/test/scala/org/apache/spark/util/UtilsSuite.scala
@@ -396,6 +396,13 @@ class UtilsSuite extends SparkFunSuite with ResetSystemProperties with Logging {
assert(Utils.getIteratorSize(iterator) === 5L)
}

test("getIteratorZipWithIndex") {
val iterator = Utils.getIteratorZipWithIndex(Iterator(0, 1, 2), -1L + Int.MaxValue)
assert(iterator.toArray === Array(
(0, -1L + Int.MaxValue), (1, 0L + Int.MaxValue), (2, 1L + Int.MaxValue)
))
}

test("doesDirectoryContainFilesNewerThan") {
// create some temporary directories and files
val parent: File = Utils.createTempDir()
2 changes: 1 addition & 1 deletion dev/deps/spark-deps-hadoop-2.2
@@ -140,7 +140,7 @@ parquet-jackson-1.8.1.jar
pmml-model-1.2.15.jar
pmml-schema-1.2.15.jar
protobuf-java-2.5.0.jar
py4j-0.10.3.jar
py4j-0.10.4.jar
pyrolite-4.13.jar
scala-compiler-2.11.8.jar
scala-library-2.11.8.jar
2 changes: 1 addition & 1 deletion dev/deps/spark-deps-hadoop-2.3
@@ -147,7 +147,7 @@ parquet-jackson-1.8.1.jar
pmml-model-1.2.15.jar
pmml-schema-1.2.15.jar
protobuf-java-2.5.0.jar
py4j-0.10.3.jar
py4j-0.10.4.jar
pyrolite-4.13.jar
scala-compiler-2.11.8.jar
scala-library-2.11.8.jar
2 changes: 1 addition & 1 deletion dev/deps/spark-deps-hadoop-2.4
@@ -147,7 +147,7 @@ parquet-jackson-1.8.1.jar
pmml-model-1.2.15.jar
pmml-schema-1.2.15.jar
protobuf-java-2.5.0.jar
py4j-0.10.3.jar
py4j-0.10.4.jar
pyrolite-4.13.jar
scala-compiler-2.11.8.jar
scala-library-2.11.8.jar
2 changes: 1 addition & 1 deletion dev/deps/spark-deps-hadoop-2.6
@@ -155,7 +155,7 @@ parquet-jackson-1.8.1.jar
pmml-model-1.2.15.jar
pmml-schema-1.2.15.jar
protobuf-java-2.5.0.jar
py4j-0.10.3.jar
py4j-0.10.4.jar
pyrolite-4.13.jar
scala-compiler-2.11.8.jar
scala-library-2.11.8.jar
2 changes: 1 addition & 1 deletion dev/deps/spark-deps-hadoop-2.7
@@ -156,7 +156,7 @@ parquet-jackson-1.8.1.jar
pmml-model-1.2.15.jar
pmml-schema-1.2.15.jar
protobuf-java-2.5.0.jar
py4j-0.10.3.jar
py4j-0.10.4.jar
pyrolite-4.13.jar
scala-compiler-2.11.8.jar
scala-library-2.11.8.jar
6 changes: 2 additions & 4 deletions docs/building-spark.md
@@ -217,9 +217,8 @@ For help in setting up IntelliJ IDEA or Eclipse for Spark development, and troub
Tests are run by default via the [ScalaTest Maven plugin](http://www.scalatest.org/user_guide/using_the_scalatest_maven_plugin).
Note that tests should not be run as root or an admin user.

Some of the tests require Spark to be packaged first, so always run `mvn package` with `-DskipTests` the first time. The following is an example of a correct (build, test) sequence:
The following is an example of a command to run the tests:

./build/mvn -Pyarn -Phadoop-2.3 -DskipTests -Phive -Phive-thriftserver clean package
./build/mvn -Pyarn -Phadoop-2.3 -Phive -Phive-thriftserver test

The ScalaTest plugin also supports running only a specific Scala test suite as follows:
@@ -233,9 +232,8 @@ or a Java test:

## Testing with SBT

Some of the tests require Spark to be packaged first, so always run `build/sbt package` the first time. The following is an example of a correct (build, test) sequence:
The following is an example of a command to run the tests:

./build/sbt -Pyarn -Phadoop-2.3 -Phive -Phive-thriftserver package
./build/sbt -Pyarn -Phadoop-2.3 -Phive -Phive-thriftserver test

To run only a specific test suite as follows:
@@ -112,6 +112,11 @@ private[kafka010] class KafkaSourceRDD(
buf.toArray
}

override def getPreferredLocations(split: Partition): Seq[String] = {
val part = split.asInstanceOf[KafkaSourceRDDPartition]
part.offsetRange.preferredLoc.map(Seq(_)).getOrElse(Seq.empty)
}

override def compute(
thePart: Partition,
context: TaskContext): Iterator[ConsumerRecord[Array[Byte], Array[Byte]]] = {
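The getPreferredLocations override added above reports each partition's preferred host, when the offset range carries one, as a locality hint for the scheduler; a partition with no preference returns an empty Seq. The Option-to-Seq conversion it relies on is plain Scala; a small sketch under illustrative names:

    // Sketch of the Option[String] -> Seq[String] conversion used by the override above.
    // `preferredLoc` and the host name are illustrative only.
    object PreferredLocSketch {
      def preferredLocations(preferredLoc: Option[String]): Seq[String] =
        preferredLoc.map(Seq(_)).getOrElse(Seq.empty)

      def main(args: Array[String]): Unit = {
        println(preferredLocations(Some("executor-host-1")))  // List(executor-host-1)
        println(preferredLocations(None))                     // List()
      }
    }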
@@ -287,7 +287,7 @@ class MatricesSuite extends SparkMLFunSuite {
val spHorz2 = Matrices.horzcat(Array(spMat1, deMat2))
val spHorz3 = Matrices.horzcat(Array(deMat1, spMat2))
val deHorz1 = Matrices.horzcat(Array(deMat1, deMat2))
val deHorz2 = Matrices.horzcat(Array[Matrix]())
val deHorz2 = Matrices.horzcat(Array.empty[Matrix])

assert(deHorz1.numRows === 3)
assert(spHorz2.numRows === 3)
@@ -341,7 +341,7 @@ class MatricesSuite extends SparkMLFunSuite {
val deVert1 = Matrices.vertcat(Array(deMat1, deMat3))
val spVert2 = Matrices.vertcat(Array(spMat1, deMat3))
val spVert3 = Matrices.vertcat(Array(deMat1, spMat3))
val deVert2 = Matrices.vertcat(Array[Matrix]())
val deVert2 = Matrices.vertcat(Array.empty[Matrix])

assert(deVert1.numRows === 5)
assert(spVert2.numRows === 5)
@@ -110,9 +110,9 @@ class TestingUtilsSuite extends SparkMLFunSuite {
assert(!(Vectors.dense(Array(3.1, 3.5)) !~= Vectors.dense(Array(3.130, 3.534)) relTol 0.01))
assert(!(Vectors.dense(Array(3.1, 3.5)) ~= Vectors.dense(Array(3.135, 3.534)) relTol 0.01))
assert(Vectors.dense(Array(3.1)) !~= Vectors.dense(Array(3.130, 3.534)) relTol 0.01)
assert(Vectors.dense(Array[Double]()) !~= Vectors.dense(Array(3.130, 3.534)) relTol 0.01)
assert(Vectors.dense(Array.empty[Double]) !~= Vectors.dense(Array(3.130, 3.534)) relTol 0.01)
assert(Vectors.dense(Array(3.1)) !~== Vectors.dense(Array(3.130, 3.534)) relTol 0.01)
assert(Vectors.dense(Array[Double]()) !~== Vectors.dense(Array(3.130, 3.534)) relTol 0.01)
assert(Vectors.dense(Array.empty[Double]) !~== Vectors.dense(Array(3.130, 3.534)) relTol 0.01)

// Should throw exception with message when test fails.
intercept[TestFailedException](
@@ -125,7 +125,7 @@ class TestingUtilsSuite extends SparkMLFunSuite {
Vectors.dense(Array(3.1)) ~== Vectors.dense(Array(3.535, 3.534)) relTol 0.01)

intercept[TestFailedException](
Vectors.dense(Array[Double]()) ~== Vectors.dense(Array(3.135)) relTol 0.01)
Vectors.dense(Array.empty[Double]) ~== Vectors.dense(Array(3.135)) relTol 0.01)

// Comparing against zero should fail the test and throw exception with message
// saying that the relative error is meaningless in this situation.
@@ -145,7 +145,7 @@ class TestingUtilsSuite extends SparkMLFunSuite {
assert(Vectors.dense(Array(3.1)) !~==
Vectors.sparse(2, Array(0, 1), Array(3.130, 3.534)) relTol 0.01)

assert(Vectors.dense(Array[Double]()) !~==
assert(Vectors.dense(Array.empty[Double]) !~==
Vectors.sparse(2, Array(0, 1), Array(3.130, 3.534)) relTol 0.01)
}

@@ -176,14 +176,14 @@ class TestingUtilsSuite extends SparkMLFunSuite {
assert(!(Vectors.dense(Array(3.1)) ~=
Vectors.dense(Array(3.1 + 1E-6, 3.5 + 2E-7)) absTol 1E-5))

assert(Vectors.dense(Array[Double]()) !~=
assert(Vectors.dense(Array.empty[Double]) !~=
Vectors.dense(Array(3.1 + 1E-6, 3.5 + 2E-7)) absTol 1E-5)

assert(!(Vectors.dense(Array[Double]()) ~=
assert(!(Vectors.dense(Array.empty[Double]) ~=
Vectors.dense(Array(3.1 + 1E-6, 3.5 + 2E-7)) absTol 1E-5))

assert(Vectors.dense(Array[Double]()) ~=
Vectors.dense(Array[Double]()) absTol 1E-5)
assert(Vectors.dense(Array.empty[Double]) ~=
Vectors.dense(Array.empty[Double]) absTol 1E-5)

// Should throw exception with message when test fails.
intercept[TestFailedException](Vectors.dense(Array(3.1, 3.5, 0.0)) !~==
@@ -195,7 +195,7 @@ class TestingUtilsSuite extends SparkMLFunSuite {
intercept[TestFailedException](Vectors.dense(Array(3.1)) ~==
Vectors.dense(Array(3.1 + 1E-5, 3.5 + 2E-7)) absTol 1E-6)

intercept[TestFailedException](Vectors.dense(Array[Double]()) ~==
intercept[TestFailedException](Vectors.dense(Array.empty[Double]) ~==
Vectors.dense(Array(3.1 + 1E-5, 3.5 + 2E-7)) absTol 1E-6)

// Comparisons of two sparse vectors
@@ -214,7 +214,7 @@ assert(Vectors.sparse(3, Array(0, 2), Array(3.1 + 1E-6, 2.4)) !~==
assert(Vectors.sparse(3, Array(0, 2), Array(3.1 + 1E-6, 2.4)) !~==
Vectors.sparse(1, Array(0), Array(3.1)) absTol 1E-3)

assert(Vectors.sparse(0, Array[Int](), Array[Double]()) !~==
assert(Vectors.sparse(0, Array.empty[Int], Array.empty[Double]) !~==
Vectors.sparse(1, Array(0), Array(3.1)) absTol 1E-3)

// Comparisons of a dense vector and a sparse vector
@@ -230,14 +230,14 @@ assert(Vectors.sparse(3, Array(0, 2), Array(3.1, 2.4)) !~==
assert(Vectors.sparse(3, Array(0, 2), Array(3.1, 2.4)) !~==
Vectors.dense(Array(3.1)) absTol 1E-6)

assert(Vectors.dense(Array[Double]()) !~==
assert(Vectors.dense(Array.empty[Double]) !~==
Vectors.sparse(3, Array(0, 2), Array(0, 2.4)) absTol 1E-6)

assert(Vectors.sparse(1, Array(0), Array(3.1)) !~==
Vectors.dense(Array(3.1, 3.2)) absTol 1E-6)

assert(Vectors.dense(Array(3.1)) !~==
Vectors.sparse(0, Array[Int](), Array[Double]()) absTol 1E-6)
Vectors.sparse(0, Array.empty[Int], Array.empty[Double]) absTol 1E-6)
}

test("Comparing Matrices using absolute error.") {
@@ -622,7 +622,7 @@ class LogisticRegression @Since("1.2.0") (
rawCoefficients(coefIndex)
}
} else {
Array[Double]()
Array.empty[Double]
}
val interceptVector = if (interceptsArray.nonEmpty && isMultinomial) {
// The intercepts are never regularized, so we always center the mean.