Skip to content

Commit

Permalink
Merge branch 'master' of github.com:apache/spark into profiler
Browse files Browse the repository at this point in the history
Conflicts:
	python/pyspark/worker.py
  • Loading branch information
davies committed Sep 17, 2014
2 parents 09d02c3 + 7d1a372 commit 116d52a
Show file tree
Hide file tree
Showing 81 changed files with 1,726 additions and 307 deletions.
10 changes: 4 additions & 6 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,10 @@ out/
third_party/libmesos.so
third_party/libmesos.dylib
conf/java-opts
conf/spark-env.sh
conf/streaming-env.sh
conf/log4j.properties
conf/spark-defaults.conf
conf/hive-site.xml
conf/*.sh
conf/*.properties
conf/*.conf
conf/*.xml
docs/_site
docs/api
target/
Expand Down Expand Up @@ -50,7 +49,6 @@ unit-tests.log
/lib/
rat-results.txt
scalastyle.txt
conf/*.conf
scalastyle-output.xml

# For Hive
Expand Down
12 changes: 12 additions & 0 deletions CONTRIBUTING.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
## Contributing to Spark

Contributions via GitHub pull requests are gladly accepted from their original
author. Along with any pull requests, please state that the contribution is
your original work and that you license the work to the project under the
project's open source license. Whether or not you state this explicitly, by
submitting any copyrighted material via pull request, email, or other means
you agree to license the material under the project's open source license and
warrant that you have the legal authority to do so.

Please see [Contributing to Spark wiki page](https://cwiki.apache.org/SPARK/Contributing+to+Spark)
for more information.
78 changes: 16 additions & 62 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,16 +13,19 @@ and Spark Streaming for stream processing.
## Online Documentation

You can find the latest Spark documentation, including a programming
guide, on the project webpage at <http://spark.apache.org/documentation.html>.
guide, on the [project web page](http://spark.apache.org/documentation.html).
This README file only contains basic setup instructions.

## Building Spark

Spark is built on Scala 2.10. To build Spark and its example programs, run:
Spark is built using [Apache Maven](http://maven.apache.org/).
To build Spark and its example programs, run:

./sbt/sbt assembly
mvn -DskipTests clean package

(You do not need to do this if you downloaded a pre-built package.)
More detailed documentation is available from the project site, at
["Building Spark"](http://spark.apache.org/docs/latest/building-spark.html).

## Interactive Scala Shell

Expand Down Expand Up @@ -71,73 +74,24 @@ can be run using:

./dev/run-tests

Please see the guidance on how to
[run all automated tests](https://cwiki.apache.org/confluence/display/SPARK/Contributing+to+Spark#ContributingtoSpark-AutomatedTesting).

## A Note About Hadoop Versions

Spark uses the Hadoop core library to talk to HDFS and other Hadoop-supported
storage systems. Because the protocols have changed in different versions of
Hadoop, you must build Spark against the same version that your cluster runs.
You can change the version by setting `-Dhadoop.version` when building Spark.

For Apache Hadoop versions 1.x, Cloudera CDH MRv1, and other Hadoop
versions without YARN, use:

# Apache Hadoop 1.2.1
$ sbt/sbt -Dhadoop.version=1.2.1 assembly

# Cloudera CDH 4.2.0 with MapReduce v1
$ sbt/sbt -Dhadoop.version=2.0.0-mr1-cdh4.2.0 assembly

For Apache Hadoop 2.2.X, 2.1.X, 2.0.X, 0.23.x, Cloudera CDH MRv2, and other Hadoop versions
with YARN, also set `-Pyarn`:

# Apache Hadoop 2.0.5-alpha
$ sbt/sbt -Dhadoop.version=2.0.5-alpha -Pyarn assembly

# Cloudera CDH 4.2.0 with MapReduce v2
$ sbt/sbt -Dhadoop.version=2.0.0-cdh4.2.0 -Pyarn assembly

# Apache Hadoop 2.2.X and newer
$ sbt/sbt -Dhadoop.version=2.2.0 -Pyarn assembly

When developing a Spark application, specify the Hadoop version by adding the
"hadoop-client" artifact to your project's dependencies. For example, if you're
using Hadoop 1.2.1 and build your application using SBT, add this entry to
`libraryDependencies`:

"org.apache.hadoop" % "hadoop-client" % "1.2.1"

If your project is built with Maven, add this to your POM file's `<dependencies>` section:

<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>1.2.1</version>
</dependency>


## A Note About Thrift JDBC server and CLI for Spark SQL

Spark SQL supports Thrift JDBC server and CLI.
See sql-programming-guide.md for more information about using the JDBC server and CLI.
You can use those features by setting `-Phive` when building Spark as follows.

$ sbt/sbt -Phive assembly
Please refer to the build documentation at
["Specifying the Hadoop Version"](http://spark.apache.org/docs/latest/building-spark.html#specifying-the-hadoop-version)
for detailed guidance on building for a particular distribution of Hadoop, including
building for particular Hive and Hive Thriftserver distributions. See also
["Third Party Hadoop Distributions"](http://spark.apache.org/docs/latest/hadoop-third-party-distributions.html)
for guidance on building a Spark application that works with a particular
distribution.

## Configuration

Please refer to the [Configuration guide](http://spark.apache.org/docs/latest/configuration.html)
in the online documentation for an overview on how to configure Spark.


## Contributing to Spark

Contributions via GitHub pull requests are gladly accepted from their original
author. Along with any pull requests, please state that the contribution is
your original work and that you license the work to the project under the
project's open source license. Whether or not you state this explicitly, by
submitting any copyrighted material via pull request, email, or other means
you agree to license the material under the project's open source license and
warrant that you have the legal authority to do so.

Please see [Contributing to Spark wiki page](https://cwiki.apache.org/SPARK/Contributing+to+Spark)
for more information.
14 changes: 14 additions & 0 deletions assembly/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,20 @@

<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-deploy-plugin</artifactId>
<configuration>
<skip>true</skip>
</configuration>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-install-plugin</artifactId>
<configuration>
<skip>true</skip>
</configuration>
</plugin>
<!-- Use the shade plugin to create a big JAR with all the dependencies -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
Expand Down
2 changes: 1 addition & 1 deletion bin/spark-class
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ else
exit 1
fi
fi
JAVA_VERSION=$("$RUNNER" -version 2>&1 | sed 's/java version "\(.*\)\.\(.*\)\..*"/\1\2/; 1q')
JAVA_VERSION=$("$RUNNER" -version 2>&1 | sed 's/.* version "\(.*\)\.\(.*\)\..*"/\1\2/; 1q')

# Set JAVA_OPTS to be able to load native libraries and to set heap size
if [ "$JAVA_VERSION" -ge 18 ]; then
Expand Down
3 changes: 0 additions & 3 deletions core/src/main/scala/org/apache/spark/SparkContext.scala
Original file line number Diff line number Diff line change
Expand Up @@ -1072,11 +1072,8 @@ class SparkContext(config: SparkConf) extends Logging {
val callSite = getCallSite
val cleanedFunc = clean(func)
logInfo("Starting job: " + callSite.shortForm)
val start = System.nanoTime
dagScheduler.runJob(rdd, cleanedFunc, partitions, callSite, allowLocal,
resultHandler, localProperties.get)
logInfo(
"Job finished: " + callSite.shortForm + ", took " + (System.nanoTime - start) / 1e9 + " s")
rdd.doCheckpoint()
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,10 @@ private[spark] class PythonRDD(
val total = finishTime - startTime
logInfo("Times: total = %s, boot = %s, init = %s, finish = %s".format(total, boot,
init, finish))
val memoryBytesSpilled = stream.readLong()
val diskBytesSpilled = stream.readLong()
context.taskMetrics.memoryBytesSpilled += memoryBytesSpilled
context.taskMetrics.diskBytesSpilled += diskBytesSpilled
read()
case SpecialLengths.PYTHON_EXCEPTION_THROWN =>
// Signals that an exception has been thrown in python
Expand Down Expand Up @@ -772,7 +776,7 @@ private[spark] object PythonRDD extends Logging {
}

/**
* Convert and RDD of Java objects to and RDD of serialized Python objects, that is usable by
* Convert an RDD of Java objects to an RDD of serialized Python objects, that is usable by
* PySpark.
*/
def javaToPython(jRDD: JavaRDD[Any]): JavaRDD[Array[Byte]] = {
Expand Down
51 changes: 51 additions & 0 deletions core/src/main/scala/org/apache/spark/api/python/SerDeUtil.scala
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@

package org.apache.spark.api.python

import java.nio.ByteOrder

import scala.collection.JavaConversions._
import scala.util.Failure
import scala.util.Try
Expand All @@ -28,6 +30,55 @@ import org.apache.spark.rdd.RDD

/** Utilities for serialization / deserialization between Python and Java, using Pickle. */
private[python] object SerDeUtil extends Logging {
// Unpickle array.array generated by Python 2.6
class ArrayConstructor extends net.razorvine.pickle.objects.ArrayConstructor {
// /* Description of types */
// static struct arraydescr descriptors[] = {
// {'c', sizeof(char), c_getitem, c_setitem},
// {'b', sizeof(char), b_getitem, b_setitem},
// {'B', sizeof(char), BB_getitem, BB_setitem},
// #ifdef Py_USING_UNICODE
// {'u', sizeof(Py_UNICODE), u_getitem, u_setitem},
// #endif
// {'h', sizeof(short), h_getitem, h_setitem},
// {'H', sizeof(short), HH_getitem, HH_setitem},
// {'i', sizeof(int), i_getitem, i_setitem},
// {'I', sizeof(int), II_getitem, II_setitem},
// {'l', sizeof(long), l_getitem, l_setitem},
// {'L', sizeof(long), LL_getitem, LL_setitem},
// {'f', sizeof(float), f_getitem, f_setitem},
// {'d', sizeof(double), d_getitem, d_setitem},
// {'\0', 0, 0, 0} /* Sentinel */
// };
// TODO: support Py_UNICODE with 2 bytes
// FIXME: unpickle array of float is wrong in Pyrolite, so we reverse the
// machine code for float/double here to workaround it.
// we should fix this after Pyrolite fix them
val machineCodes: Map[Char, Int] = if (ByteOrder.nativeOrder().equals(ByteOrder.BIG_ENDIAN)) {
Map('c' -> 1, 'B' -> 0, 'b' -> 1, 'H' -> 3, 'h' -> 5, 'I' -> 7, 'i' -> 9,
'L' -> 11, 'l' -> 13, 'f' -> 14, 'd' -> 16, 'u' -> 21
)
} else {
Map('c' -> 1, 'B' -> 0, 'b' -> 1, 'H' -> 2, 'h' -> 4, 'I' -> 6, 'i' -> 8,
'L' -> 10, 'l' -> 12, 'f' -> 15, 'd' -> 17, 'u' -> 20
)
}
override def construct(args: Array[Object]): Object = {
if (args.length == 1) {
construct(args ++ Array(""))
} else if (args.length == 2 && args(1).isInstanceOf[String]) {
val typecode = args(0).asInstanceOf[String].charAt(0)
val data: String = args(1).asInstanceOf[String]
construct(typecode, machineCodes(typecode), data.getBytes("ISO-8859-1"))
} else {
super.construct(args)
}
}
}

def initialize() = {
Unpickler.registerConstructor("array", "array", new ArrayConstructor())
}

private def checkPickle(t: (Any, Any)): (Boolean, Boolean) = {
val pickle = new Pickler
Expand Down
12 changes: 10 additions & 2 deletions core/src/main/scala/org/apache/spark/network/ManagedBuffer.scala
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package org.apache.spark.network

import java.io.{FileInputStream, RandomAccessFile, File, InputStream}
import java.nio.ByteBuffer
import java.nio.channels.FileChannel
import java.nio.channels.FileChannel.MapMode

import com.google.common.io.ByteStreams
Expand Down Expand Up @@ -66,8 +67,15 @@ final class FileSegmentManagedBuffer(val file: File, val offset: Long, val lengt
override def size: Long = length

override def nioByteBuffer(): ByteBuffer = {
val channel = new RandomAccessFile(file, "r").getChannel
channel.map(MapMode.READ_ONLY, offset, length)
var channel: FileChannel = null
try {
channel = new RandomAccessFile(file, "r").getChannel
channel.map(MapMode.READ_ONLY, offset, length)
} finally {
if (channel != null) {
channel.close()
}
}
}

override def inputStream(): InputStream = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -507,11 +507,16 @@ class DAGScheduler(
resultHandler: (Int, U) => Unit,
properties: Properties = null)
{
val start = System.nanoTime
val waiter = submitJob(rdd, func, partitions, callSite, allowLocal, resultHandler, properties)
waiter.awaitResult() match {
case JobSucceeded => {}
case JobSucceeded => {
logInfo("Job %d finished: %s, took %f s".format
(waiter.jobId, callSite.shortForm, (System.nanoTime - start) / 1e9))
}
case JobFailed(exception: Exception) =>
logInfo("Failed to run " + callSite.shortForm)
logInfo("Job %d failed: %s, took %f s".format
(waiter.jobId, callSite.shortForm, (System.nanoTime - start) / 1e9))
throw exception
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ import scala.collection.mutable.ArrayBuffer
import scala.collection.mutable.HashSet
import scala.collection.mutable.Queue

import org.apache.spark.{TaskContext, Logging, SparkException}
import org.apache.spark.{TaskContext, Logging}
import org.apache.spark.network.{ManagedBuffer, BlockFetchingListener, BlockTransferService}
import org.apache.spark.serializer.Serializer
import org.apache.spark.util.Utils
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -205,7 +205,6 @@ private[spark] object JsonProtocol {
}

def taskInfoToJson(taskInfo: TaskInfo): JValue = {
val accumUpdateMap = taskInfo.accumulables
("Task ID" -> taskInfo.taskId) ~
("Index" -> taskInfo.index) ~
("Attempt" -> taskInfo.attempt) ~
Expand Down
7 changes: 6 additions & 1 deletion core/src/main/scala/org/apache/spark/util/Utils.scala
Original file line number Diff line number Diff line change
Expand Up @@ -530,7 +530,12 @@ private[spark] object Utils extends Logging {
if (address.isLoopbackAddress) {
// Address resolves to something like 127.0.1.1, which happens on Debian; try to find
// a better address using the local network interfaces
for (ni <- NetworkInterface.getNetworkInterfaces) {
// getNetworkInterfaces returns ifs in reverse order compared to ifconfig output order
// on unix-like system. On windows, it returns in index order.
// It's more proper to pick ip address following system output order.
val activeNetworkIFs = NetworkInterface.getNetworkInterfaces.toList
val reOrderedNetworkIFs = if (isWindows) activeNetworkIFs else activeNetworkIFs.reverse
for (ni <- reOrderedNetworkIFs) {
for (addr <- ni.getInetAddresses if !addr.isLinkLocalAddress &&
!addr.isLoopbackAddress && addr.isInstanceOf[Inet4Address]) {
// We've found an address that looks reasonable!
Expand Down
14 changes: 5 additions & 9 deletions core/src/test/scala/org/apache/spark/ui/UISuite.scala
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ import javax.servlet.http.HttpServletRequest
import scala.io.Source
import scala.util.{Failure, Success, Try}

import org.eclipse.jetty.server.Server
import org.eclipse.jetty.servlet.ServletContextHandler
import org.scalatest.FunSuite
import org.scalatest.concurrent.Eventually._
Expand Down Expand Up @@ -108,14 +107,8 @@ class UISuite extends FunSuite {
}

test("jetty selects different port under contention") {
val startPort = 4040
val server = new Server(startPort)

Try { server.start() } match {
case Success(s) =>
case Failure(e) =>
// Either case server port is busy hence setup for test complete
}
val server = new ServerSocket(0)
val startPort = server.getLocalPort
val serverInfo1 = JettyUtils.startJettyServer(
"0.0.0.0", startPort, Seq[ServletContextHandler](), new SparkConf)
val serverInfo2 = JettyUtils.startJettyServer(
Expand All @@ -126,6 +119,9 @@ class UISuite extends FunSuite {
assert(boundPort1 != startPort)
assert(boundPort2 != startPort)
assert(boundPort1 != boundPort2)
serverInfo1.server.stop()
serverInfo2.server.stop()
server.close()
}

test("jetty binds to port 0 correctly") {
Expand Down
4 changes: 2 additions & 2 deletions dev/merge_spark_pr.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,9 +44,9 @@
# Remote name which points to Apache git
PUSH_REMOTE_NAME = os.environ.get("PUSH_REMOTE_NAME", "apache")
# ASF JIRA username
JIRA_USERNAME = os.environ.get("JIRA_USERNAME", "")
JIRA_USERNAME = os.environ.get("JIRA_USERNAME", "pwendell")
# ASF JIRA password
JIRA_PASSWORD = os.environ.get("JIRA_PASSWORD", "")
JIRA_PASSWORD = os.environ.get("JIRA_PASSWORD", "35500")

GITHUB_BASE = "https://github.com/apache/spark/pull"
GITHUB_API_BASE = "https://api.github.com/repos/apache/spark"
Expand Down
Loading

0 comments on commit 116d52a

Please sign in to comment.