
Commit

Merge branch 'master' into zip
Conflicts:
	python/pyspark/tests.py
davies committed Aug 18, 2014
2 parents 813b1e4 + 4bf3de7 commit 6d05fc8
Showing 220 changed files with 5,364 additions and 2,161 deletions.
1 change: 1 addition & 0 deletions .rat-excludes
@@ -25,6 +25,7 @@ log4j-defaults.properties
bootstrap-tooltip.js
jquery-1.11.1.min.js
sorttable.js
.*avsc
.*txt
.*json
.*data
9 changes: 9 additions & 0 deletions README.md
@@ -115,6 +115,15 @@ If your project is built with Maven, add this to your POM file's `<dependencies>`
</dependency>


## A Note About Thrift JDBC server and CLI for Spark SQL

Spark SQL supports Thrift JDBC server and CLI.
See sql-programming-guide.md for more information about those features.
You can use those features by setting `-Phive-thriftserver` when building Spark as follows.

$ sbt/sbt -Phive-thriftserver assembly


## Configuration

Please refer to the [Configuration guide](http://spark.apache.org/docs/latest/configuration.html)
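A quick usage sketch of what the new README note above enables, assuming the launcher scripts shipped with this release (the `sbin/start-thriftserver.sh` name is my assumption; `bin/spark-sql` is the CLI wrapper modified later in this commit):

    # Build an assembly that includes the Thrift JDBC server and the SQL CLI
    $ sbt/sbt -Phive-thriftserver assembly

    # Start the JDBC/Thrift server (script name assumed; see sql-programming-guide.md)
    $ ./sbin/start-thriftserver.sh

    # Or run a query through the CLI wrapper shown further down in this diff
    $ ./bin/spark-sql -e "SHOW TABLES"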
2 changes: 1 addition & 1 deletion bin/spark-shell.cmd
@@ -19,4 +19,4 @@ rem

set SPARK_HOME=%~dp0..

cmd /V /E /C %SPARK_HOME%\bin\spark-submit.cmd spark-shell --class org.apache.spark.repl.Main %*
cmd /V /E /C %SPARK_HOME%\bin\spark-submit.cmd --class org.apache.spark.repl.Main %* spark-shell
18 changes: 9 additions & 9 deletions bin/spark-sql
@@ -65,30 +65,30 @@ while (($#)); do
case $1 in
-d | --define | --database | -f | -h | --hiveconf | --hivevar | -i | -p)
ensure_arg_number $# 2
CLI_ARGS+=($1); shift
CLI_ARGS+=($1); shift
CLI_ARGS+=("$1"); shift
CLI_ARGS+=("$1"); shift
;;

-e)
ensure_arg_number $# 2
CLI_ARGS+=($1); shift
CLI_ARGS+=(\"$1\"); shift
CLI_ARGS+=("$1"); shift
CLI_ARGS+=("$1"); shift
;;

-s | --silent)
CLI_ARGS+=($1); shift
CLI_ARGS+=("$1"); shift
;;

-v | --verbose)
# Both SparkSubmit and SparkSQLCLIDriver recognize -v | --verbose
CLI_ARGS+=($1)
SUBMISSION_ARGS+=($1); shift
CLI_ARGS+=("$1")
SUBMISSION_ARGS+=("$1"); shift
;;

*)
SUBMISSION_ARGS+=($1); shift
SUBMISSION_ARGS+=("$1"); shift
;;
esac
done

eval exec "$FWDIR"/bin/spark-submit --class $CLASS ${SUBMISSION_ARGS[*]} spark-internal ${CLI_ARGS[*]}
exec "$FWDIR"/bin/spark-submit --class $CLASS "${SUBMISSION_ARGS[@]}" spark-internal "${CLI_ARGS[@]}"
12 changes: 7 additions & 5 deletions core/src/main/scala/org/apache/spark/ContextCleaner.scala
@@ -66,10 +66,15 @@ private[spark] class ContextCleaner(sc: SparkContext) extends Logging {

/**
* Whether the cleaning thread will block on cleanup tasks.
* This is set to true only for tests.
*
* Due to SPARK-3015, this is set to true by default. This is intended to be only a temporary
* workaround for the issue, which is ultimately caused by the way the BlockManager actors
* issue inter-dependent blocking Akka messages to each other at high frequencies. This happens,
* for instance, when the driver performs a GC and cleans up all broadcast blocks that are no
* longer in scope.
*/
private val blockOnCleanupTasks = sc.conf.getBoolean(
"spark.cleaner.referenceTracking.blocking", false)
"spark.cleaner.referenceTracking.blocking", true)

@volatile private var stopped = false

@@ -174,9 +179,6 @@ private[spark] class ContextCleaner(sc: SparkContext) extends Logging {
private def blockManagerMaster = sc.env.blockManager.master
private def broadcastManager = sc.env.broadcastManager
private def mapOutputTrackerMaster = sc.env.mapOutputTracker.asInstanceOf[MapOutputTrackerMaster]

// Used for testing. These methods explicitly blocks until cleanup is completed
// to ensure that more reliable testing.
}

private object ContextCleaner {
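Because the new scaladoc above presents blocking cleanup as a temporary workaround, here is a minimal configuration sketch (my example, not from the patch) for switching `spark.cleaner.referenceTracking.blocking` back to non-blocking behaviour in an application that is not affected by SPARK-3015:

    import org.apache.spark.{SparkConf, SparkContext}

    // Example application setup: override the new default of "true" to restore
    // the previous non-blocking cleanup behaviour.
    val conf = new SparkConf()
      .setMaster("local[2]")               // local master purely for illustration
      .setAppName("cleaner-config-example")
      .set("spark.cleaner.referenceTracking.blocking", "false")
    val sc = new SparkContext(conf)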
7 changes: 5 additions & 2 deletions core/src/main/scala/org/apache/spark/HeartbeatReceiver.scala
@@ -21,6 +21,7 @@ import akka.actor.Actor
import org.apache.spark.executor.TaskMetrics
import org.apache.spark.storage.BlockManagerId
import org.apache.spark.scheduler.TaskScheduler
import org.apache.spark.util.ActorLogReceive

/**
* A heartbeat from executors to the driver. This is a shared message used by several internal
@@ -36,8 +37,10 @@ private[spark] case class HeartbeatResponse(reregisterBlockManager: Boolean)
/**
 * Lives in the driver to receive heartbeats from executors.
*/
private[spark] class HeartbeatReceiver(scheduler: TaskScheduler) extends Actor {
override def receive = {
private[spark] class HeartbeatReceiver(scheduler: TaskScheduler)
extends Actor with ActorLogReceive with Logging {

override def receiveWithLogging = {
case Heartbeat(executorId, taskMetrics, blockManagerId) =>
val response = HeartbeatResponse(
!scheduler.executorHeartbeatReceived(executorId, taskMetrics, blockManagerId))
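This actor and the MapOutputTrackerMasterActor below now implement `receiveWithLogging` and mix in the new `ActorLogReceive` trait. A rough sketch of that pattern, assuming (without the `org.apache.spark.util.ActorLogReceive` source in this excerpt) that the trait supplies `receive` by logging each message and delegating:

    import akka.actor.Actor
    import org.apache.spark.Logging

    // Sketch of the idea, not the real Spark trait: receive is provided here,
    // logs each incoming message, and forwards it to receiveWithLogging.
    trait ActorLogReceiveSketch extends Actor with Logging {
      def receiveWithLogging: Actor.Receive

      override def receive: Actor.Receive = new Actor.Receive {
        def isDefinedAt(msg: Any): Boolean = receiveWithLogging.isDefinedAt(msg)
        def apply(msg: Any): Unit = {
          logDebug(s"Received message: $msg")
          receiveWithLogging(msg)
        }
      }
    }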
core/src/main/scala/org/apache/spark/InterruptibleIterator.scala
@@ -33,7 +33,7 @@ class InterruptibleIterator[+T](val context: TaskContext, val delegate: Iterator
// is allowed. The assumption is that Thread.interrupted does not have a memory fence in read
// (just a volatile field in C), while context.interrupted is a volatile in the JVM, which
// introduces an expensive read fence.
if (context.interrupted) {
if (context.isInterrupted) {
throw new TaskKilledException
} else {
delegate.hasNext
4 changes: 2 additions & 2 deletions core/src/main/scala/org/apache/spark/MapOutputTracker.scala
@@ -38,10 +38,10 @@ private[spark] case object StopMapOutputTracker extends MapOutputTrackerMessage

/** Actor class for MapOutputTrackerMaster */
private[spark] class MapOutputTrackerMasterActor(tracker: MapOutputTrackerMaster, conf: SparkConf)
extends Actor with Logging {
extends Actor with ActorLogReceive with Logging {
val maxAkkaFrameSize = AkkaUtils.maxFrameSizeBytes(conf)

def receive = {
override def receiveWithLogging = {
case GetMapOutputStatuses(shuffleId: Int) =>
val hostPort = sender.path.address.hostPort
logInfo("Asked to send map output locations for shuffle " + shuffleId + " to " + hostPort)
22 changes: 11 additions & 11 deletions core/src/main/scala/org/apache/spark/SparkEnv.scala
@@ -210,12 +210,22 @@ object SparkEnv extends Logging {
"MapOutputTracker",
new MapOutputTrackerMasterActor(mapOutputTracker.asInstanceOf[MapOutputTrackerMaster], conf))

// Let the user specify short names for shuffle managers
val shortShuffleMgrNames = Map(
"hash" -> "org.apache.spark.shuffle.hash.HashShuffleManager",
"sort" -> "org.apache.spark.shuffle.sort.SortShuffleManager")
val shuffleMgrName = conf.get("spark.shuffle.manager", "hash")
val shuffleMgrClass = shortShuffleMgrNames.getOrElse(shuffleMgrName.toLowerCase, shuffleMgrName)
val shuffleManager = instantiateClass[ShuffleManager](shuffleMgrClass)

val shuffleMemoryManager = new ShuffleMemoryManager(conf)

val blockManagerMaster = new BlockManagerMaster(registerOrLookup(
"BlockManagerMaster",
new BlockManagerMasterActor(isLocal, conf, listenerBus)), conf)

val blockManager = new BlockManager(executorId, actorSystem, blockManagerMaster,
serializer, conf, securityManager, mapOutputTracker)
serializer, conf, securityManager, mapOutputTracker, shuffleManager)

val connectionManager = blockManager.connectionManager

@@ -250,16 +260,6 @@
"."
}

// Let the user specify short names for shuffle managers
val shortShuffleMgrNames = Map(
"hash" -> "org.apache.spark.shuffle.hash.HashShuffleManager",
"sort" -> "org.apache.spark.shuffle.sort.SortShuffleManager")
val shuffleMgrName = conf.get("spark.shuffle.manager", "hash")
val shuffleMgrClass = shortShuffleMgrNames.getOrElse(shuffleMgrName.toLowerCase, shuffleMgrName)
val shuffleManager = instantiateClass[ShuffleManager](shuffleMgrClass)

val shuffleMemoryManager = new ShuffleMemoryManager(conf)

// Warn about deprecated spark.cache.class property
if (conf.contains("spark.cache.class")) {
logWarning("The spark.cache.class property is no longer being used! Specify storage " +
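The relocated block above resolves the short names "hash" and "sort" to fully qualified ShuffleManager class names before the BlockManager is constructed. A small sketch (my example) of the `spark.shuffle.manager` setting it reads:

    import org.apache.spark.SparkConf

    // Either form selects the sort-based shuffle: the short name goes through
    // the shortShuffleMgrNames lookup shown above, the class name passes
    // through unchanged.
    val byShortName = new SparkConf().set("spark.shuffle.manager", "sort")
    val byClassName = new SparkConf()
      .set("spark.shuffle.manager", "org.apache.spark.shuffle.sort.SortShuffleManager")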
63 changes: 56 additions & 7 deletions core/src/main/scala/org/apache/spark/TaskContext.scala
@@ -21,10 +21,18 @@ import scala.collection.mutable.ArrayBuffer

import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.executor.TaskMetrics
import org.apache.spark.util.TaskCompletionListener


/**
* :: DeveloperApi ::
* Contextual information about a task which can be read or mutated during execution.
*
* @param stageId stage id
* @param partitionId index of the partition
* @param attemptId the number of attempts to execute this task
* @param runningLocally whether the task is running locally in the driver JVM
* @param taskMetrics performance metrics of the task
*/
@DeveloperApi
class TaskContext(
@@ -39,27 +47,68 @@ class TaskContext(
def splitId = partitionId

// List of callback functions to execute when the task completes.
@transient private val onCompleteCallbacks = new ArrayBuffer[() => Unit]
@transient private val onCompleteCallbacks = new ArrayBuffer[TaskCompletionListener]

// Whether the corresponding task has been killed.
@volatile var interrupted: Boolean = false
@volatile private var interrupted: Boolean = false

// Whether the task has completed.
@volatile private var completed: Boolean = false

/** Checks whether the task has completed. */
def isCompleted: Boolean = completed

// Whether the task has completed, before the onCompleteCallbacks are executed.
@volatile var completed: Boolean = false
/** Checks whether the task has been killed. */
def isInterrupted: Boolean = interrupted

// TODO: Also track whether the task has completed successfully or with exception.

/**
* Add a (Java friendly) listener to be executed on task completion.
 * This will be called in all situations - success, failure, or cancellation.
*
* An example use is for HadoopRDD to register a callback to close the input stream.
*/
def addTaskCompletionListener(listener: TaskCompletionListener): this.type = {
onCompleteCallbacks += listener
this
}

/**
* Add a listener in the form of a Scala closure to be executed on task completion.
 * This will be called in all situations - success, failure, or cancellation.
*
* An example use is for HadoopRDD to register a callback to close the input stream.
*/
def addTaskCompletionListener(f: TaskContext => Unit): this.type = {
onCompleteCallbacks += new TaskCompletionListener {
override def onTaskCompletion(context: TaskContext): Unit = f(context)
}
this
}

/**
* Add a callback function to be executed on task completion. An example use
* is for HadoopRDD to register a callback to close the input stream.
* Will be called in any situation - success, failure, or cancellation.
* @param f Callback function.
*/
@deprecated("use addTaskCompletionListener", "1.1.0")
def addOnCompleteCallback(f: () => Unit) {
onCompleteCallbacks += f
onCompleteCallbacks += new TaskCompletionListener {
override def onTaskCompletion(context: TaskContext): Unit = f()
}
}

def executeOnCompleteCallbacks() {
/** Marks the task as completed and triggers the listeners. */
private[spark] def markTaskCompleted(): Unit = {
completed = true
// Process complete callbacks in the reverse order of registration
onCompleteCallbacks.reverse.foreach { _() }
onCompleteCallbacks.reverse.foreach { _.onTaskCompletion(this) }
}

/** Marks the task for interruption, i.e. cancellation. */
private[spark] def markInterrupted(): Unit = {
interrupted = true
}
}
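A usage sketch of the new listener API (my own example, mirroring the HadoopRDD use case named in the scaladoc above): the stream is closed whether the task succeeds, fails, or is cancelled.

    import java.io.InputStream
    import org.apache.spark.TaskContext

    // Hypothetical helper running inside a task: read lines from a stream and
    // guarantee it is closed when the task finishes, however it finishes.
    def readLines(context: TaskContext, stream: InputStream): Iterator[String] = {
      // Scala-closure overload; addTaskCompletionListener(TaskCompletionListener)
      // is the Java-friendly equivalent.
      context.addTaskCompletionListener { ctx => stream.close() }
      scala.io.Source.fromInputStream(stream).getLines()
    }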
core/src/main/scala/org/apache/spark/api/python/PythonHadoopUtil.scala
@@ -19,6 +19,7 @@ package org.apache.spark.api.python

import org.apache.spark.broadcast.Broadcast
import org.apache.spark.rdd.RDD
import org.apache.spark.util.Utils
import org.apache.spark.{Logging, SerializableWritable, SparkException}
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.io._
@@ -42,7 +43,7 @@ private[python] object Converter extends Logging {
defaultConverter: Converter[Any, Any]): Converter[Any, Any] = {
converterClass.map { cc =>
Try {
val c = Class.forName(cc).newInstance().asInstanceOf[Converter[Any, Any]]
val c = Utils.classForName(cc).newInstance().asInstanceOf[Converter[Any, Any]]
logInfo(s"Loaded converter: $cc")
c
} match {
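The switch from `Class.forName` to `Utils.classForName` presumably lets converter classes be resolved through Spark's context classloader (for example when they arrive via --jars). A rough sketch of that assumed behaviour, not the actual `Utils` source:

    // Assumption about what Utils.classForName does: consult the thread's
    // context classloader first, falling back to this class's own loader.
    object ClassLoadingSketch {
      def classForNameSketch(className: String): Class[_] = {
        val loader = Option(Thread.currentThread().getContextClassLoader)
          .getOrElse(getClass.getClassLoader)
        Class.forName(className, true, loader)
      }
    }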