Merge branch 'master' of github.com:apache/spark into streaming
davies committed Oct 2, 2014
2 parents 54bd92b + 6e27cb6 commit 6bb9d91
Showing 29 changed files with 317 additions and 49 deletions.
10 changes: 10 additions & 0 deletions assembly/pom.xml
@@ -349,5 +349,15 @@
</plugins>
</build>
</profile>
<profile>
<id>kinesis-asl</id>
<dependencies>
<dependency>
<groupId>org.apache.httpcomponents</groupId>
<artifactId>httpclient</artifactId>
<version>${commons.httpclient.version}</version>
</dependency>
</dependencies>
</profile>
</profiles>
</project>
19 changes: 17 additions & 2 deletions core/src/main/scala/org/apache/spark/FutureAction.scala
@@ -83,6 +83,15 @@ trait FutureAction[T] extends Future[T] {
*/
@throws(classOf[Exception])
def get(): T = Await.result(this, Duration.Inf)

/**
* Returns the job IDs run by the underlying async operation.
*
* This returns the current snapshot of the job list. Certain operations may run multiple
* jobs, so multiple calls to this method may return different lists.
*/
def jobIds: Seq[Int]

}


@@ -150,8 +159,7 @@ class SimpleFutureAction[T] private[spark](jobWaiter: JobWaiter[_], resultFunc:
}
}

/** Get the corresponding job id for this action. */
def jobId = jobWaiter.jobId
def jobIds = Seq(jobWaiter.jobId)
}


@@ -171,6 +179,8 @@ class ComplexFutureAction[T] extends FutureAction[T] {
// is cancelled before the action was even run (and thus we have no thread to interrupt).
@volatile private var _cancelled: Boolean = false

@volatile private var jobs: Seq[Int] = Nil

// A promise used to signal the future.
private val p = promise[T]()

@@ -219,6 +229,8 @@ class ComplexFutureAction[T] extends FutureAction[T] {
}
}

this.jobs = jobs ++ job.jobIds

// Wait for the job to complete. If the action is cancelled (with an interrupt),
// cancel the job and stop the execution. This is not in a synchronized block because
// Await.ready eventually waits on the monitor in FutureJob.jobWaiter.
@@ -255,4 +267,7 @@ class ComplexFutureAction[T] extends FutureAction[T] {
override def isCompleted: Boolean = p.isCompleted

override def value: Option[Try[T]] = p.future.value

def jobIds = jobs

}
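
For context, a minimal usage sketch of the new jobIds accessor (assumes an existing SparkContext named sc; illustration only, not part of this diff):

import scala.concurrent.Await
import scala.concurrent.duration.Duration
import org.apache.spark.SparkContext._  // brings countAsync into scope via AsyncRDDActions

// Kick off an asynchronous count, then inspect which Spark jobs it has launched so far.
val action = sc.parallelize(1 to 1000, 4).countAsync()
val idsSoFar: Seq[Int] = action.jobIds          // snapshot; may grow for multi-job actions
val total = Await.result(action, Duration.Inf)
println(s"count = $total, ran Spark job(s) ${action.jobIds.mkString(", ")}")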
60 changes: 56 additions & 4 deletions core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
@@ -23,6 +23,7 @@ import java.io.EOFException

import scala.collection.immutable.Map
import scala.reflect.ClassTag
import scala.collection.mutable.ListBuffer

import org.apache.hadoop.conf.{Configurable, Configuration}
import org.apache.hadoop.mapred.FileSplit
@@ -43,6 +44,7 @@ import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.executor.{DataReadMethod, InputMetrics}
import org.apache.spark.rdd.HadoopRDD.HadoopMapPartitionsWithSplitRDD
import org.apache.spark.util.{NextIterator, Utils}
import org.apache.spark.scheduler.{HostTaskLocation, HDFSCacheTaskLocation}


/**
@@ -249,9 +251,21 @@ class HadoopRDD[K, V](
}

override def getPreferredLocations(split: Partition): Seq[String] = {
// TODO: Filtering out "localhost" in case of file:// URLs
val hadoopSplit = split.asInstanceOf[HadoopPartition]
hadoopSplit.inputSplit.value.getLocations.filter(_ != "localhost")
val hsplit = split.asInstanceOf[HadoopPartition].inputSplit.value
val locs: Option[Seq[String]] = HadoopRDD.SPLIT_INFO_REFLECTIONS match {
case Some(c) =>
try {
val lsplit = c.inputSplitWithLocationInfo.cast(hsplit)
val infos = c.getLocationInfo.invoke(lsplit).asInstanceOf[Array[AnyRef]]
Some(HadoopRDD.convertSplitLocationInfo(infos))
} catch {
case e: Exception =>
logDebug("Failed to use InputSplitWithLocations.", e)
None
}
case None => None
}
locs.getOrElse(hsplit.getLocations.filter(_ != "localhost"))
}

override def checkpoint() {
@@ -261,7 +275,7 @@
def getConf: Configuration = getJobConf()
}

private[spark] object HadoopRDD {
private[spark] object HadoopRDD extends Logging {
/** Constructing Configuration objects is not threadsafe, use this lock to serialize. */
val CONFIGURATION_INSTANTIATION_LOCK = new Object()

@@ -309,4 +323,42 @@ private[spark] object HadoopRDD {
f(inputSplit, firstParent[T].iterator(split, context))
}
}

private[spark] class SplitInfoReflections {
val inputSplitWithLocationInfo =
Class.forName("org.apache.hadoop.mapred.InputSplitWithLocationInfo")
val getLocationInfo = inputSplitWithLocationInfo.getMethod("getLocationInfo")
val newInputSplit = Class.forName("org.apache.hadoop.mapreduce.InputSplit")
val newGetLocationInfo = newInputSplit.getMethod("getLocationInfo")
val splitLocationInfo = Class.forName("org.apache.hadoop.mapred.SplitLocationInfo")
val isInMemory = splitLocationInfo.getMethod("isInMemory")
val getLocation = splitLocationInfo.getMethod("getLocation")
}

private[spark] val SPLIT_INFO_REFLECTIONS: Option[SplitInfoReflections] = try {
Some(new SplitInfoReflections)
} catch {
case e: Exception =>
logDebug("SplitLocationInfo and other new Hadoop classes are " +
"unavailable. Using the older Hadoop location info code.", e)
None
}

private[spark] def convertSplitLocationInfo(infos: Array[AnyRef]): Seq[String] = {
val out = ListBuffer[String]()
infos.foreach { loc =>
val reflections = HadoopRDD.SPLIT_INFO_REFLECTIONS.get
val locationStr = reflections.getLocation.invoke(loc).asInstanceOf[String]
if (locationStr != "localhost") {
if (reflections.isInMemory.invoke(loc).asInstanceOf[Boolean]) {
logDebug("Partition " + locationStr + " is cached by Hadoop.")
out += new HDFSCacheTaskLocation(locationStr).toString
} else {
out += new HostTaskLocation(locationStr).toString
}
}
}
out.seq
}
}
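
The SplitInfoReflections helper above is needed because InputSplitWithLocationInfo and SplitLocationInfo only exist in newer Hadoop releases, so they are looked up reflectively once and the code degrades to the plain getLocations path when they are absent. A stripped-down sketch of that optional-reflection pattern (standalone illustration; the object name is hypothetical):

// Resolve an optional Hadoop API once via reflection; callers fall back when it is missing.
object OptionalLocationInfo {
  private val getLocationInfo: Option[java.lang.reflect.Method] =
    try {
      val cls = Class.forName("org.apache.hadoop.mapred.InputSplitWithLocationInfo")
      Some(cls.getMethod("getLocationInfo"))
    } catch {
      case _: ClassNotFoundException => None  // older Hadoop: the class is not on the classpath
      case _: NoSuchMethodException => None
    }

  // Returns None when the running Hadoop version cannot report split location info.
  def locationInfo(split: AnyRef): Option[Array[AnyRef]] =
    getLocationInfo.map(_.invoke(split).asInstanceOf[Array[AnyRef]])
}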
18 changes: 15 additions & 3 deletions core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala
@@ -173,9 +173,21 @@ class NewHadoopRDD[K, V](
new NewHadoopMapPartitionsWithSplitRDD(this, f, preservesPartitioning)
}

override def getPreferredLocations(split: Partition): Seq[String] = {
val theSplit = split.asInstanceOf[NewHadoopPartition]
theSplit.serializableHadoopSplit.value.getLocations.filter(_ != "localhost")
override def getPreferredLocations(hsplit: Partition): Seq[String] = {
val split = hsplit.asInstanceOf[NewHadoopPartition].serializableHadoopSplit.value
val locs = HadoopRDD.SPLIT_INFO_REFLECTIONS match {
case Some(c) =>
try {
val infos = c.newGetLocationInfo.invoke(split).asInstanceOf[Array[AnyRef]]
Some(HadoopRDD.convertSplitLocationInfo(infos))
} catch {
case e: Exception =>
logDebug("Failed to use InputSplit#getLocationInfo.", e)
None
}
case None => None
}
locs.getOrElse(split.getLocations.filter(_ != "localhost"))
}

def getConf: Configuration = confBroadcast.value.value
2 changes: 1 addition & 1 deletion core/src/main/scala/org/apache/spark/rdd/RDD.scala
@@ -208,7 +208,7 @@ abstract class RDD[T: ClassTag](
}

/**
* Get the preferred locations of a partition (as hostnames), taking into account whether the
* Get the preferred locations of a partition, taking into account whether the
* RDD is checkpointed.
*/
final def preferredLocations(split: Partition): Seq[String] = {
core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -1303,7 +1303,7 @@ class DAGScheduler(
// If the RDD has some placement preferences (as is the case for input RDDs), get those
val rddPrefs = rdd.preferredLocations(rdd.partitions(partition)).toList
if (!rddPrefs.isEmpty) {
return rddPrefs.map(host => TaskLocation(host))
return rddPrefs.map(TaskLocation(_))
}
// If the RDD has narrow dependencies, pick the first partition of the first narrow dep
// that has any placement preferences. Ideally we would choose based on transfer sizes,
48 changes: 43 additions & 5 deletions core/src/main/scala/org/apache/spark/scheduler/TaskLocation.scala
@@ -22,13 +22,51 @@ package org.apache.spark.scheduler
* In the latter case, we will prefer to launch the task on that executorID, but our next level
* of preference will be executors on the same host if this is not possible.
*/
private[spark]
class TaskLocation private (val host: String, val executorId: Option[String]) extends Serializable {
override def toString: String = "TaskLocation(" + host + ", " + executorId + ")"
private[spark] sealed trait TaskLocation {
def host: String
}

/**
* A location that includes both a host and an executor id on that host.
*/
private[spark] case class ExecutorCacheTaskLocation(override val host: String, executorId: String)
extends TaskLocation

/**
* A location on a host.
*/
private[spark] case class HostTaskLocation(override val host: String) extends TaskLocation {
override def toString = host
}

/**
* A location on a host that is cached by HDFS.
*/
private[spark] case class HDFSCacheTaskLocation(override val host: String) extends TaskLocation {
override def toString = TaskLocation.inMemoryLocationTag + host
}

private[spark] object TaskLocation {
def apply(host: String, executorId: String) = new TaskLocation(host, Some(executorId))
// We identify hosts on which the block is cached with this prefix. Because this prefix contains
// underscores, which are not legal characters in hostnames, there should be no potential for
// confusion. See RFC 952 and RFC 1123 for information about the format of hostnames.
val inMemoryLocationTag = "hdfs_cache_"

def apply(host: String, executorId: String) = new ExecutorCacheTaskLocation(host, executorId)

def apply(host: String) = new TaskLocation(host, None)
/**
* Create a TaskLocation from a string returned by getPreferredLocations.
* These strings have the form [hostname] or hdfs_cache_[hostname], depending on whether the
* location is cached.
*/
def apply(str: String) = {
val hstr = str.stripPrefix(inMemoryLocationTag)
if (hstr.equals(str)) {
new HostTaskLocation(str)
} else {
new HDFSCacheTaskLocation(hstr)
}
}
}
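
A short sketch of the intended round-trip between preferred-location strings and TaskLocation values (illustration only; host and executor names are made up):

val plain = TaskLocation("host1")               // host-only preference: HostTaskLocation("host1")
val cached = TaskLocation("hdfs_cache_host1")   // parsed from the string emitted by HDFSCacheTaskLocation.toString
val onExec = TaskLocation("host1", "exec-7")    // executor-level preference: ExecutorCacheTaskLocation
assert(plain.host == "host1" && cached.host == "host1" && onExec.host == "host1")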
core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
@@ -181,8 +181,24 @@ private[spark] class TaskSetManager(
}

for (loc <- tasks(index).preferredLocations) {
for (execId <- loc.executorId) {
addTo(pendingTasksForExecutor.getOrElseUpdate(execId, new ArrayBuffer))
loc match {
case e: ExecutorCacheTaskLocation =>
addTo(pendingTasksForExecutor.getOrElseUpdate(e.executorId, new ArrayBuffer))
case e: HDFSCacheTaskLocation =>
sched.getExecutorsAliveOnHost(e.host) match {
case Some(set) =>
for (execId <- set) {
addTo(pendingTasksForExecutor.getOrElseUpdate(execId, new ArrayBuffer))
}
logInfo(s"Pending task $index has a cached location at ${e.host}, " +
s"where there are executors ${set.mkString(",")}")
case None =>
logDebug(s"Pending task $index has a cached location at ${e.host}, " +
"but there are no executors alive there.")
}
case _ =>
}
addTo(pendingTasksForHost.getOrElseUpdate(loc.host, new ArrayBuffer))
for (rack <- sched.getRackForHost(loc.host)) {
@@ -283,7 +299,10 @@ private[spark] class TaskSetManager(
// on multiple nodes when we replicate cached blocks, as in Spark Streaming
for (index <- speculatableTasks if canRunOnHost(index)) {
val prefs = tasks(index).preferredLocations
val executors = prefs.flatMap(_.executorId)
val executors = prefs.flatMap {
case e: ExecutorCacheTaskLocation => Some(e.executorId)
case _ => None
}
if (executors.contains(execId)) {
speculatableTasks -= index
return Some((index, TaskLocality.PROCESS_LOCAL))
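
The pattern match added above is the heart of the scheduling change: executor-cached blocks and HDFS-cached blocks are expanded into executor-level (PROCESS_LOCAL) candidates, while plain hosts only feed the host- and rack-level lists. A condensed sketch of that dispatch (the aliveExecutorsOnHost parameter stands in for sched.getExecutorsAliveOnHost and is an assumption of this sketch):

// Which executors should treat `loc` as process-local?
def executorLevelPrefs(
    loc: TaskLocation,
    aliveExecutorsOnHost: String => Option[Set[String]]): Seq[String] = loc match {
  case e: ExecutorCacheTaskLocation => Seq(e.executorId)        // block cached by one specific executor
  case _: HDFSCacheTaskLocation =>
    aliveExecutorsOnHost(loc.host).toSeq.flatten                // any live executor on the HDFS-cached host
  case _ => Nil                                                 // host-level preference only
}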
2 changes: 1 addition & 1 deletion core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -1439,7 +1439,7 @@ private[spark] object Utils extends Logging {
val serviceString = if (serviceName.isEmpty) "" else s" '$serviceName'"
for (offset <- 0 to maxRetries) {
// Do not increment port if startPort is 0, which is treated as a special port
val tryPort = if (startPort == 0) startPort else (startPort + offset) % (65536 - 1024) + 1024
val tryPort = if (startPort == 0) startPort else (startPort + offset) % 65536
try {
val (service, port) = startService(tryPort)
logInfo(s"Successfully started service$serviceString on port $port.")
49 changes: 49 additions & 0 deletions core/src/test/scala/org/apache/spark/FutureActionSuite.scala
@@ -0,0 +1,49 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark

import scala.concurrent.Await
import scala.concurrent.duration.Duration

import org.scalatest.{BeforeAndAfter, FunSuite, Matchers}

import org.apache.spark.SparkContext._

class FutureActionSuite extends FunSuite with BeforeAndAfter with Matchers with LocalSparkContext {

before {
sc = new SparkContext("local", "FutureActionSuite")
}

test("simple async action") {
val rdd = sc.parallelize(1 to 10, 2)
val job = rdd.countAsync()
val res = Await.result(job, Duration.Inf)
res should be (10)
job.jobIds.size should be (1)
}

test("complex async action") {
val rdd = sc.parallelize(1 to 15, 3)
val job = rdd.takeAsync(10)
val res = Await.result(job, Duration.Inf)
res should be (1 to 10)
job.jobIds.size should be (2)
}

}
core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala
@@ -642,6 +642,28 @@ class TaskSetManagerSuite extends FunSuite with LocalSparkContext with Logging {
assert(manager.resourceOffer("execC", "host3", ANY) !== None)
}

test("Test that locations with HDFSCacheTaskLocation are treated as PROCESS_LOCAL.") {
// Regression test for SPARK-2931
sc = new SparkContext("local", "test")
val sched = new FakeTaskScheduler(sc,
("execA", "host1"), ("execB", "host2"), ("execC", "host3"))
val taskSet = FakeTask.createTaskSet(3,
Seq(HostTaskLocation("host1")),
Seq(HostTaskLocation("host2")),
Seq(HDFSCacheTaskLocation("host3")))
val clock = new FakeClock
val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES, clock)
assert(manager.myLocalityLevels.sameElements(Array(PROCESS_LOCAL, NODE_LOCAL, ANY)))
sched.removeExecutor("execA")
manager.executorAdded()
assert(manager.myLocalityLevels.sameElements(Array(PROCESS_LOCAL, NODE_LOCAL, ANY)))
sched.removeExecutor("execB")
manager.executorAdded()
assert(manager.myLocalityLevels.sameElements(Array(PROCESS_LOCAL, NODE_LOCAL, ANY)))
sched.removeExecutor("execC")
manager.executorAdded()
assert(manager.myLocalityLevels.sameElements(Array(ANY)))
}

def createTaskResult(id: Int): DirectTaskResult[Int] = {
val valueSer = SparkEnv.get.serializer.newInstance()
5 changes: 5 additions & 0 deletions examples/pom.xml
@@ -43,6 +43,11 @@
<artifactId>spark-streaming-kinesis-asl_${scala.binary.version}</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.httpcomponents</groupId>
<artifactId>httpclient</artifactId>
<version>${commons.httpclient.version}</version>
</dependency>
</dependencies>
</profile>
</profiles>
