Skip to content

Commit

Permalink
Merge remote-tracking branch 'upstream/master' into dt-python-consist…
Browse files Browse the repository at this point in the history
…ency
  • Loading branch information
jkbradley committed Aug 6, 2014
2 parents fe6dbfa + 63bdb1f commit 00f820e
Show file tree
Hide file tree
Showing 32 changed files with 642 additions and 360 deletions.
2 changes: 1 addition & 1 deletion core/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,7 @@
<dependency>
<groupId>org.json4s</groupId>
<artifactId>json4s-jackson_${scala.binary.version}</artifactId>
<version>3.2.6</version>
<version>3.2.10</version>
</dependency>
<dependency>
<groupId>colt</groupId>
Expand Down
13 changes: 10 additions & 3 deletions core/src/main/scala/org/apache/spark/rdd/JdbcRDD.scala
Original file line number Diff line number Diff line change
Expand Up @@ -96,17 +96,23 @@ class JdbcRDD[T: ClassTag](

override def close() {
try {
if (null != rs && ! rs.isClosed()) rs.close()
if (null != rs && ! rs.isClosed()) {
rs.close()
}
} catch {
case e: Exception => logWarning("Exception closing resultset", e)
}
try {
if (null != stmt && ! stmt.isClosed()) stmt.close()
if (null != stmt && ! stmt.isClosed()) {
stmt.close()
}
} catch {
case e: Exception => logWarning("Exception closing statement", e)
}
try {
if (null != conn && ! stmt.isClosed()) conn.close()
if (null != conn && ! conn.isClosed()) {
conn.close()
}
logInfo("closed connection")
} catch {
case e: Exception => logWarning("Exception closing connection", e)
Expand All @@ -120,3 +126,4 @@ object JdbcRDD {
Array.tabulate[Object](rs.getMetaData.getColumnCount)(i => rs.getObject(i + 1))
}
}

Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ import org.apache.spark.annotation.DeveloperApi
@DeveloperApi
object TaskLocality extends Enumeration {
// Process local is expected to be used ONLY within TaskSetManager for now.
val PROCESS_LOCAL, NODE_LOCAL, RACK_LOCAL, ANY = Value
val PROCESS_LOCAL, NODE_LOCAL, NO_PREF, RACK_LOCAL, ANY = Value

type TaskLocality = Value

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,11 +89,11 @@ private[spark] class TaskSchedulerImpl(

// The set of executors we have on each host; this is used to compute hostsAlive, which
// in turn is used to decide when we can attain data locality on a given host
private val executorsByHost = new HashMap[String, HashSet[String]]
protected val executorsByHost = new HashMap[String, HashSet[String]]

protected val hostsByRack = new HashMap[String, HashSet[String]]

private val executorIdToHost = new HashMap[String, String]
protected val executorIdToHost = new HashMap[String, String]

// Listener object to pass upcalls into
var dagScheduler: DAGScheduler = null
Expand Down Expand Up @@ -249,6 +249,7 @@ private[spark] class TaskSchedulerImpl(

// Take each TaskSet in our scheduling order, and then offer it each node in increasing order
// of locality levels so that it gets a chance to launch local tasks on all of them.
// NOTE: the preferredLocality order: PROCESS_LOCAL, NODE_LOCAL, NO_PREF, RACK_LOCAL, ANY
var launchedTask = false
for (taskSet <- sortedTaskSets; maxLocality <- taskSet.myLocalityLevels) {
do {
Expand All @@ -265,7 +266,7 @@ private[spark] class TaskSchedulerImpl(
activeExecutorIds += execId
executorsByHost(host) += execId
availableCpus(i) -= CPUS_PER_TASK
assert (availableCpus(i) >= 0)
assert(availableCpus(i) >= 0)
launchedTask = true
}
}
Expand Down
109 changes: 60 additions & 49 deletions core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@ private[spark] class TaskSetManager(
private val numFailures = new Array[Int](numTasks)
// key is taskId, value is a Map of executor id to when it failed
private val failedExecutors = new HashMap[Int, HashMap[String, Long]]()

val taskAttempts = Array.fill[List[TaskInfo]](numTasks)(Nil)
var tasksSuccessful = 0

Expand Down Expand Up @@ -179,26 +180,17 @@ private[spark] class TaskSetManager(
}
}

var hadAliveLocations = false
for (loc <- tasks(index).preferredLocations) {
for (execId <- loc.executorId) {
addTo(pendingTasksForExecutor.getOrElseUpdate(execId, new ArrayBuffer))
}
if (sched.hasExecutorsAliveOnHost(loc.host)) {
hadAliveLocations = true
}
addTo(pendingTasksForHost.getOrElseUpdate(loc.host, new ArrayBuffer))
for (rack <- sched.getRackForHost(loc.host)) {
addTo(pendingTasksForRack.getOrElseUpdate(rack, new ArrayBuffer))
if(sched.hasHostAliveOnRack(rack)){
hadAliveLocations = true
}
}
}

if (!hadAliveLocations) {
// Even though the task might've had preferred locations, all of those hosts or executors
// are dead; put it in the no-prefs list so we can schedule it elsewhere right away.
if (tasks(index).preferredLocations == Nil) {
addTo(pendingTasksWithNoPrefs)
}

Expand Down Expand Up @@ -239,7 +231,6 @@ private[spark] class TaskSetManager(
*/
private def findTaskFromList(execId: String, list: ArrayBuffer[Int]): Option[Int] = {
var indexOffset = list.size

while (indexOffset > 0) {
indexOffset -= 1
val index = list(indexOffset)
Expand Down Expand Up @@ -288,12 +279,12 @@ private[spark] class TaskSetManager(
!hasAttemptOnHost(index, host) && !executorIsBlacklisted(execId, index)

if (!speculatableTasks.isEmpty) {
// Check for process-local or preference-less tasks; note that tasks can be process-local
// Check for process-local tasks; note that tasks can be process-local
// on multiple nodes when we replicate cached blocks, as in Spark Streaming
for (index <- speculatableTasks if canRunOnHost(index)) {
val prefs = tasks(index).preferredLocations
val executors = prefs.flatMap(_.executorId)
if (prefs.size == 0 || executors.contains(execId)) {
if (executors.contains(execId)) {
speculatableTasks -= index
return Some((index, TaskLocality.PROCESS_LOCAL))
}
Expand All @@ -310,6 +301,17 @@ private[spark] class TaskSetManager(
}
}

// Check for no-preference tasks
if (TaskLocality.isAllowed(locality, TaskLocality.NO_PREF)) {
for (index <- speculatableTasks if canRunOnHost(index)) {
val locations = tasks(index).preferredLocations
if (locations.size == 0) {
speculatableTasks -= index
return Some((index, TaskLocality.PROCESS_LOCAL))
}
}
}

// Check for rack-local tasks
if (TaskLocality.isAllowed(locality, TaskLocality.RACK_LOCAL)) {
for (rack <- sched.getRackForHost(host)) {
Expand Down Expand Up @@ -341,20 +343,27 @@ private[spark] class TaskSetManager(
*
* @return An option containing (task index within the task set, locality, is speculative?)
*/
private def findTask(execId: String, host: String, locality: TaskLocality.Value)
private def findTask(execId: String, host: String, maxLocality: TaskLocality.Value)
: Option[(Int, TaskLocality.Value, Boolean)] =
{
for (index <- findTaskFromList(execId, getPendingTasksForExecutor(execId))) {
return Some((index, TaskLocality.PROCESS_LOCAL, false))
}

if (TaskLocality.isAllowed(locality, TaskLocality.NODE_LOCAL)) {
if (TaskLocality.isAllowed(maxLocality, TaskLocality.NODE_LOCAL)) {
for (index <- findTaskFromList(execId, getPendingTasksForHost(host))) {
return Some((index, TaskLocality.NODE_LOCAL, false))
}
}

if (TaskLocality.isAllowed(locality, TaskLocality.RACK_LOCAL)) {
if (TaskLocality.isAllowed(maxLocality, TaskLocality.NO_PREF)) {
// Look for noPref tasks after NODE_LOCAL for minimize cross-rack traffic
for (index <- findTaskFromList(execId, pendingTasksWithNoPrefs)) {
return Some((index, TaskLocality.PROCESS_LOCAL, false))
}
}

if (TaskLocality.isAllowed(maxLocality, TaskLocality.RACK_LOCAL)) {
for {
rack <- sched.getRackForHost(host)
index <- findTaskFromList(execId, getPendingTasksForRack(rack))
Expand All @@ -363,25 +372,27 @@ private[spark] class TaskSetManager(
}
}

// Look for no-pref tasks after rack-local tasks since they can run anywhere.
for (index <- findTaskFromList(execId, pendingTasksWithNoPrefs)) {
return Some((index, TaskLocality.PROCESS_LOCAL, false))
}

if (TaskLocality.isAllowed(locality, TaskLocality.ANY)) {
if (TaskLocality.isAllowed(maxLocality, TaskLocality.ANY)) {
for (index <- findTaskFromList(execId, allPendingTasks)) {
return Some((index, TaskLocality.ANY, false))
}
}

// Finally, if all else has failed, find a speculative task
findSpeculativeTask(execId, host, locality).map { case (taskIndex, allowedLocality) =>
(taskIndex, allowedLocality, true)
}
// find a speculative task if all others tasks have been scheduled
findSpeculativeTask(execId, host, maxLocality).map {
case (taskIndex, allowedLocality) => (taskIndex, allowedLocality, true)}
}

/**
* Respond to an offer of a single executor from the scheduler by finding a task
*
* NOTE: this function is either called with a maxLocality which
* would be adjusted by delay scheduling algorithm or it will be with a special
* NO_PREF locality which will be not modified
*
* @param execId the executor Id of the offered resource
* @param host the host Id of the offered resource
* @param maxLocality the maximum locality we want to schedule the tasks at
*/
def resourceOffer(
execId: String,
Expand All @@ -392,9 +403,14 @@ private[spark] class TaskSetManager(
if (!isZombie) {
val curTime = clock.getTime()

var allowedLocality = getAllowedLocalityLevel(curTime)
if (allowedLocality > maxLocality) {
allowedLocality = maxLocality // We're not allowed to search for farther-away tasks
var allowedLocality = maxLocality

if (maxLocality != TaskLocality.NO_PREF) {
allowedLocality = getAllowedLocalityLevel(curTime)
if (allowedLocality > maxLocality) {
// We're not allowed to search for farther-away tasks
allowedLocality = maxLocality
}
}

findTask(execId, host, allowedLocality) match {
Expand All @@ -410,8 +426,11 @@ private[spark] class TaskSetManager(
taskInfos(taskId) = info
taskAttempts(index) = info :: taskAttempts(index)
// Update our locality level for delay scheduling
currentLocalityIndex = getLocalityIndex(taskLocality)
lastLaunchTime = curTime
// NO_PREF will not affect the variables related to delay scheduling
if (maxLocality != TaskLocality.NO_PREF) {
currentLocalityIndex = getLocalityIndex(taskLocality)
lastLaunchTime = curTime
}
// Serialize and return the task
val startTime = clock.getTime()
// We rely on the DAGScheduler to catch non-serializable closures and RDDs, so in here
Expand Down Expand Up @@ -639,8 +658,7 @@ private[spark] class TaskSetManager(
override def executorLost(execId: String, host: String) {
logInfo("Re-queueing tasks for " + execId + " from TaskSet " + taskSet.id)

// Re-enqueue pending tasks for this host based on the status of the cluster -- for example, a
// task that used to have locations on only this host might now go to the no-prefs list. Note
// Re-enqueue pending tasks for this host based on the status of the cluster. Note
// that it's okay if we add a task to the same queue twice (if it had multiple preferred
// locations), because findTaskFromList will skip already-running tasks.
for (index <- getPendingTasksForExecutor(execId)) {
Expand Down Expand Up @@ -671,6 +689,9 @@ private[spark] class TaskSetManager(
for ((tid, info) <- taskInfos if info.running && info.executorId == execId) {
handleFailedTask(tid, TaskState.FAILED, ExecutorLostFailure)
}
// recalculate valid locality levels and waits when executor is lost
myLocalityLevels = computeValidLocalityLevels()
localityWaits = myLocalityLevels.map(getLocalityWait)
}

/**
Expand Down Expand Up @@ -722,17 +743,17 @@ private[spark] class TaskSetManager(
conf.get("spark.locality.wait.node", defaultWait).toLong
case TaskLocality.RACK_LOCAL =>
conf.get("spark.locality.wait.rack", defaultWait).toLong
case TaskLocality.ANY =>
0L
case _ => 0L
}
}

/**
* Compute the locality levels used in this TaskSet. Assumes that all tasks have already been
* added to queues using addPendingTask.
*
*/
private def computeValidLocalityLevels(): Array[TaskLocality.TaskLocality] = {
import TaskLocality.{PROCESS_LOCAL, NODE_LOCAL, RACK_LOCAL, ANY}
import TaskLocality.{PROCESS_LOCAL, NODE_LOCAL, NO_PREF, RACK_LOCAL, ANY}
val levels = new ArrayBuffer[TaskLocality.TaskLocality]
if (!pendingTasksForExecutor.isEmpty && getLocalityWait(PROCESS_LOCAL) != 0 &&
pendingTasksForExecutor.keySet.exists(sched.isExecutorAlive(_))) {
Expand All @@ -742,6 +763,9 @@ private[spark] class TaskSetManager(
pendingTasksForHost.keySet.exists(sched.hasExecutorsAliveOnHost(_))) {
levels += NODE_LOCAL
}
if (!pendingTasksWithNoPrefs.isEmpty) {
levels += NO_PREF
}
if (!pendingTasksForRack.isEmpty && getLocalityWait(RACK_LOCAL) != 0 &&
pendingTasksForRack.keySet.exists(sched.hasHostAliveOnRack(_))) {
levels += RACK_LOCAL
Expand All @@ -751,20 +775,7 @@ private[spark] class TaskSetManager(
levels.toArray
}

// Re-compute pendingTasksWithNoPrefs since new preferred locations may become available
def executorAdded() {
def newLocAvail(index: Int): Boolean = {
for (loc <- tasks(index).preferredLocations) {
if (sched.hasExecutorsAliveOnHost(loc.host) ||
(sched.getRackForHost(loc.host).isDefined &&
sched.hasHostAliveOnRack(sched.getRackForHost(loc.host).get))) {
return true
}
}
false
}
logInfo("Re-computing pending task lists.")
pendingTasksWithNoPrefs = pendingTasksWithNoPrefs.filter(!newLocAvail(_))
myLocalityLevels = computeValidLocalityLevels()
localityWaits = myLocalityLevels.map(getLocalityWait)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ private[spark] class SortShuffleWriter[K, V, C](
private val ser = Serializer.getSerializer(dep.serializer.orNull)

private val conf = SparkEnv.get.conf
private val fileBufferSize = conf.getInt("spark.shuffle.file.buffer.kb", 100) * 1024
private val fileBufferSize = conf.getInt("spark.shuffle.file.buffer.kb", 32) * 1024

private var sorter: ExternalSorter[K, V, _] = null
private var outputFile: File = null
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ class ShuffleBlockManager(blockManager: BlockManager) extends Logging {
val sortBasedShuffle =
conf.get("spark.shuffle.manager", "") == classOf[SortShuffleManager].getName

private val bufferSize = conf.getInt("spark.shuffle.file.buffer.kb", 100) * 1024
private val bufferSize = conf.getInt("spark.shuffle.file.buffer.kb", 32) * 1024

/**
* Contains all the state related to a particular shuffle. This includes a pool of unused
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ class ExternalAppendOnlyMap[K, V, C](
private var _memoryBytesSpilled = 0L
private var _diskBytesSpilled = 0L

private val fileBufferSize = sparkConf.getInt("spark.shuffle.file.buffer.kb", 100) * 1024
private val fileBufferSize = sparkConf.getInt("spark.shuffle.file.buffer.kb", 32) * 1024
private val keyComparator = new HashComparator[K]
private val ser = serializer.newInstance()

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ private[spark] class ExternalSorter[K, V, C](

private val conf = SparkEnv.get.conf
private val spillingEnabled = conf.getBoolean("spark.shuffle.spill", true)
private val fileBufferSize = conf.getInt("spark.shuffle.file.buffer.kb", 100) * 1024
private val fileBufferSize = conf.getInt("spark.shuffle.file.buffer.kb", 32) * 1024

// Size of object batches when reading/writing from serializers.
//
Expand Down
Loading

0 comments on commit 00f820e

Please sign in to comment.