Commit 7e0cc36

Merge branch 'master' of https://github.com/apache/spark
zhzhan committed Sep 25, 2014
2 parents 68deb11 + b848771 commit 7e0cc36
Showing 77 changed files with 2,175 additions and 829 deletions.
4 changes: 3 additions & 1 deletion assembly/pom.xml
@@ -141,7 +141,9 @@
         <include>com.google.common.**</include>
       </includes>
       <excludes>
-        <exclude>com.google.common.base.Optional**</exclude>
+        <exclude>com/google/common/base/Absent*</exclude>
+        <exclude>com/google/common/base/Optional*</exclude>
+        <exclude>com/google/common/base/Present*</exclude>
       </excludes>
     </relocation>
   </relocations>
2 changes: 2 additions & 0 deletions core/pom.xml
@@ -343,7 +343,9 @@
           <filter>
             <artifact>com.google.guava:guava</artifact>
             <includes>
+              <include>com/google/common/base/Absent*</include>
               <include>com/google/common/base/Optional*</include>
+              <include>com/google/common/base/Present*</include>
             </includes>
           </filter>
         </filters>
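Note on the two Guava changes above (SPARK-3647, exercised by the testGuavaOptional test added in JavaAPISuite below): Optional stays at its original coordinates because it appears in Spark's public Java API, but it is an abstract class whose concrete subclasses are Guava's package-private Absent and Present, so those two must also be kept un-relocated. A minimal sketch of why (the class names are Guava's; the snippet itself is illustrative):

    import com.google.common.base.Optional

    val present = Optional.of("x")                    // runtime class: com.google.common.base.Present
    val absent  = Optional.fromNullable(null: String) // runtime class: com.google.common.base.Absent
    // If Absent/Present were relocated while Optional was not, Optional's
    // references to them would dangle and fail at runtime with NoClassDefFoundError.
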
32 changes: 22 additions & 10 deletions core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -1030,28 +1030,40 @@ class SparkContext(config: SparkConf) extends Logging {
   }
 
   /**
-   * Support function for API backtraces.
+   * Set the thread-local property for overriding the call sites
+   * of actions and RDDs.
    */
-  def setCallSite(site: String) {
-    setLocalProperty("externalCallSite", site)
+  def setCallSite(shortCallSite: String) {
+    setLocalProperty(CallSite.SHORT_FORM, shortCallSite)
   }
 
   /**
-   * Support function for API backtraces.
+   * Set the thread-local property for overriding the call sites
+   * of actions and RDDs.
+   */
+  private[spark] def setCallSite(callSite: CallSite) {
+    setLocalProperty(CallSite.SHORT_FORM, callSite.shortForm)
+    setLocalProperty(CallSite.LONG_FORM, callSite.longForm)
+  }
+
+  /**
+   * Clear the thread-local property for overriding the call sites
+   * of actions and RDDs.
    */
   def clearCallSite() {
-    setLocalProperty("externalCallSite", null)
+    setLocalProperty(CallSite.SHORT_FORM, null)
+    setLocalProperty(CallSite.LONG_FORM, null)
   }
 
   /**
    * Capture the current user callsite and return a formatted version for printing. If the user
-   * has overridden the call site, this will return the user's version.
+   * has overridden the call site using `setCallSite()`, this will return the user's version.
    */
   private[spark] def getCallSite(): CallSite = {
-    Option(getLocalProperty("externalCallSite")) match {
-      case Some(callSite) => CallSite(callSite, longForm = "")
-      case None => Utils.getCallSite
-    }
+    Option(getLocalProperty(CallSite.SHORT_FORM)).map { case shortCallSite =>
+      val longCallSite = Option(getLocalProperty(CallSite.LONG_FORM)).getOrElse("")
+      CallSite(shortCallSite, longCallSite)
+    }.getOrElse(Utils.getCallSite())
   }
 
   /**
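A minimal usage sketch of the reworked thread-local call-site override (assumes a live SparkContext `sc`; the label and path are illustrative):

    sc.setCallSite("myApp: load events")      // stored under CallSite.SHORT_FORM for this thread
    val events = sc.textFile("/data/events")  // this RDD reports "myApp: load events" as its origin
    sc.clearCallSite()                        // later RDDs fall back to the automatic stack walk
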
48 changes: 48 additions & 0 deletions core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala
@@ -469,6 +469,22 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)])
     fromRDD(joinResult.mapValues{case (v, w) => (JavaUtils.optionToOptional(v), w)})
   }
 
+  /**
+   * Perform a full outer join of `this` and `other`. For each element (k, v) in `this`, the
+   * resulting RDD will either contain all pairs (k, (Some(v), Some(w))) for w in `other`, or
+   * the pair (k, (Some(v), None)) if no elements in `other` have key k. Similarly, for each
+   * element (k, w) in `other`, the resulting RDD will either contain all pairs
+   * (k, (Some(v), Some(w))) for v in `this`, or the pair (k, (None, Some(w))) if no elements
+   * in `this` have key k. Uses the given Partitioner to partition the output RDD.
+   */
+  def fullOuterJoin[W](other: JavaPairRDD[K, W], partitioner: Partitioner)
+    : JavaPairRDD[K, (Optional[V], Optional[W])] = {
+    val joinResult = rdd.fullOuterJoin(other, partitioner)
+    fromRDD(joinResult.mapValues{ case (v, w) =>
+      (JavaUtils.optionToOptional(v), JavaUtils.optionToOptional(w))
+    })
+  }
+
   /**
    * Simplified version of combineByKey that hash-partitions the resulting RDD using the existing
    * partitioner/parallelism level.
@@ -563,6 +579,38 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)])
     fromRDD(joinResult.mapValues{case (v, w) => (JavaUtils.optionToOptional(v), w)})
   }
 
+  /**
+   * Perform a full outer join of `this` and `other`. For each element (k, v) in `this`, the
+   * resulting RDD will either contain all pairs (k, (Some(v), Some(w))) for w in `other`, or
+   * the pair (k, (Some(v), None)) if no elements in `other` have key k. Similarly, for each
+   * element (k, w) in `other`, the resulting RDD will either contain all pairs
+   * (k, (Some(v), Some(w))) for v in `this`, or the pair (k, (None, Some(w))) if no elements
+   * in `this` have key k. Hash-partitions the resulting RDD using the existing partitioner/
+   * parallelism level.
+   */
+  def fullOuterJoin[W](other: JavaPairRDD[K, W]): JavaPairRDD[K, (Optional[V], Optional[W])] = {
+    val joinResult = rdd.fullOuterJoin(other)
+    fromRDD(joinResult.mapValues{ case (v, w) =>
+      (JavaUtils.optionToOptional(v), JavaUtils.optionToOptional(w))
+    })
+  }
+
+  /**
+   * Perform a full outer join of `this` and `other`. For each element (k, v) in `this`, the
+   * resulting RDD will either contain all pairs (k, (Some(v), Some(w))) for w in `other`, or
+   * the pair (k, (Some(v), None)) if no elements in `other` have key k. Similarly, for each
+   * element (k, w) in `other`, the resulting RDD will either contain all pairs
+   * (k, (Some(v), Some(w))) for v in `this`, or the pair (k, (None, Some(w))) if no elements
+   * in `this` have key k. Hash-partitions the resulting RDD into the given number of partitions.
+   */
+  def fullOuterJoin[W](other: JavaPairRDD[K, W], numPartitions: Int)
+    : JavaPairRDD[K, (Optional[V], Optional[W])] = {
+    val joinResult = rdd.fullOuterJoin(other, numPartitions)
+    fromRDD(joinResult.mapValues{ case (v, w) =>
+      (JavaUtils.optionToOptional(v), JavaUtils.optionToOptional(w))
+    })
+  }
+
   /**
    * Return the key-value pairs in this RDD to the master as a Map.
    */
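The three Java API overloads above mirror the Scala fullOuterJoin added in PairRDDFunctions below, but wrap both sides of each result pair in Guava Optionals, since Scala's Option is awkward to consume from Java. A hedged sketch, written in Scala against the Java wrapper (assumes a live SparkContext `sc`; values illustrative):

    import com.google.common.base.Optional
    import org.apache.spark.api.java.JavaPairRDD

    val left  = JavaPairRDD.fromRDD(sc.parallelize(Seq((1, "a"), (2, "b"))))
    val right = JavaPairRDD.fromRDD(sc.parallelize(Seq((2, "y"), (3, "z"))))
    // keys present on only one side get Optional.absent() on the other:
    val joined: JavaPairRDD[Int, (Optional[String], Optional[String])] =
      left.fullOuterJoin(right)
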
core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala
@@ -75,6 +75,10 @@ private[spark] class SparkSubmitArguments(args: Seq[String]) {
     defaultProperties
   }
 
+  // Respect SPARK_*_MEMORY for cluster mode
+  driverMemory = sys.env.get("SPARK_DRIVER_MEMORY").orNull
+  executorMemory = sys.env.get("SPARK_EXECUTOR_MEMORY").orNull
+
   parseOpts(args.toList)
   mergeSparkProperties()
   checkRequiredArguments()
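A sketch of the resulting precedence, as read from the surrounding code: the environment seeds the defaults before parseOpts() runs, so an explicit flag still wins, and later merging can only fill values left null (values illustrative):

    // SPARK_DRIVER_MEMORY=2g  spark-submit app.jar                     => driverMemory == "2g"
    // SPARK_DRIVER_MEMORY=2g  spark-submit --driver-memory 4g app.jar  => driverMemory == "4g"
    // neither set => driverMemory stays null here, left for mergeSparkProperties() to resolve
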
16 changes: 11 additions & 5 deletions core/src/main/scala/org/apache/spark/executor/Executor.scala
@@ -24,6 +24,7 @@ import java.util.concurrent._
 
 import scala.collection.JavaConversions._
 import scala.collection.mutable.{ArrayBuffer, HashMap}
+import scala.util.control.NonFatal
 
 import org.apache.spark._
 import org.apache.spark.deploy.SparkHadoopUtil
@@ -375,12 +376,17 @@ private[spark] class Executor(
       }
 
      val message = Heartbeat(executorId, tasksMetrics.toArray, env.blockManager.blockManagerId)
-      val response = AkkaUtils.askWithReply[HeartbeatResponse](message, heartbeatReceiverRef,
-        retryAttempts, retryIntervalMs, timeout)
-      if (response.reregisterBlockManager) {
-        logWarning("Told to re-register on heartbeat")
-        env.blockManager.reregister()
+      try {
+        val response = AkkaUtils.askWithReply[HeartbeatResponse](message, heartbeatReceiverRef,
+          retryAttempts, retryIntervalMs, timeout)
+        if (response.reregisterBlockManager) {
+          logWarning("Told to re-register on heartbeat")
+          env.blockManager.reregister()
+        }
+      } catch {
+        case NonFatal(t) => logWarning("Issue communicating with driver in heartbeater", t)
       }
 
       Thread.sleep(interval)
     }
   }
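For readers unfamiliar with it, scala.util.control.NonFatal (imported in the first hunk) matches ordinary exceptions but not fatal JVM errors, so the heartbeat loop now survives transient driver communication failures without masking things like OutOfMemoryError. A self-contained sketch (names illustrative):

    import scala.util.control.NonFatal

    def pingDriver(): Unit = throw new RuntimeException("driver unreachable") // stand-in failure

    try pingDriver() catch {
      case NonFatal(t) => println(s"Issue communicating with driver in heartbeater: $t") // loop continues
    }
    // a fatal error such as OutOfMemoryError would NOT match NonFatal and would propagate
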
42 changes: 42 additions & 0 deletions core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
@@ -506,6 +506,23 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
     }
   }
 
+  /**
+   * Perform a full outer join of `this` and `other`. For each element (k, v) in `this`, the
+   * resulting RDD will either contain all pairs (k, (Some(v), Some(w))) for w in `other`, or
+   * the pair (k, (Some(v), None)) if no elements in `other` have key k. Similarly, for each
+   * element (k, w) in `other`, the resulting RDD will either contain all pairs
+   * (k, (Some(v), Some(w))) for v in `this`, or the pair (k, (None, Some(w))) if no elements
+   * in `this` have key k. Uses the given Partitioner to partition the output RDD.
+   */
+  def fullOuterJoin[W](other: RDD[(K, W)], partitioner: Partitioner)
+    : RDD[(K, (Option[V], Option[W]))] = {
+    this.cogroup(other, partitioner).flatMapValues {
+      case (vs, Seq()) => vs.map(v => (Some(v), None))
+      case (Seq(), ws) => ws.map(w => (None, Some(w)))
+      case (vs, ws) => for (v <- vs; w <- ws) yield (Some(v), Some(w))
+    }
+  }
+
   /**
    * Simplified version of combineByKey that hash-partitions the resulting RDD using the
    * existing partitioner/parallelism level.
@@ -585,6 +602,31 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
     rightOuterJoin(other, new HashPartitioner(numPartitions))
   }
 
+  /**
+   * Perform a full outer join of `this` and `other`. For each element (k, v) in `this`, the
+   * resulting RDD will either contain all pairs (k, (Some(v), Some(w))) for w in `other`, or
+   * the pair (k, (Some(v), None)) if no elements in `other` have key k. Similarly, for each
+   * element (k, w) in `other`, the resulting RDD will either contain all pairs
+   * (k, (Some(v), Some(w))) for v in `this`, or the pair (k, (None, Some(w))) if no elements
+   * in `this` have key k. Hash-partitions the resulting RDD using the existing partitioner/
+   * parallelism level.
+   */
+  def fullOuterJoin[W](other: RDD[(K, W)]): RDD[(K, (Option[V], Option[W]))] = {
+    fullOuterJoin(other, defaultPartitioner(self, other))
+  }
+
+  /**
+   * Perform a full outer join of `this` and `other`. For each element (k, v) in `this`, the
+   * resulting RDD will either contain all pairs (k, (Some(v), Some(w))) for w in `other`, or
+   * the pair (k, (Some(v), None)) if no elements in `other` have key k. Similarly, for each
+   * element (k, w) in `other`, the resulting RDD will either contain all pairs
+   * (k, (Some(v), Some(w))) for v in `this`, or the pair (k, (None, Some(w))) if no elements
+   * in `this` have key k. Hash-partitions the resulting RDD into the given number of partitions.
+   */
+  def fullOuterJoin[W](other: RDD[(K, W)], numPartitions: Int): RDD[(K, (Option[V], Option[W]))] = {
+    fullOuterJoin(other, new HashPartitioner(numPartitions))
+  }
+
   /**
    * Return the key-value pairs in this RDD to the master as a Map.
    *
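A worked example of the new operator, consistent with the fullOuterJoin test added in PairRDDFunctionsSuite below (assumes a live SparkContext `sc`; collect() order is not guaranteed, results shown sorted by key):

    val left  = sc.parallelize(Seq((1, "a"), (2, "b")))
    val right = sc.parallelize(Seq((2, "y"), (3, "z")))
    left.fullOuterJoin(right).collect()
    // contains exactly:
    //   (1, (Some("a"), None))       key only on the left
    //   (2, (Some("b"), Some("y")))  key on both sides
    //   (3, (None, Some("z")))       key only on the right
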
7 changes: 4 additions & 3 deletions core/src/main/scala/org/apache/spark/rdd/RDD.scala
@@ -17,7 +17,7 @@
 
 package org.apache.spark.rdd
 
-import java.util.Random
+import java.util.{Properties, Random}
 
 import scala.collection.{mutable, Map}
 import scala.collection.mutable.ArrayBuffer
@@ -41,7 +41,7 @@ import org.apache.spark.partial.CountEvaluator
 import org.apache.spark.partial.GroupedCountEvaluator
 import org.apache.spark.partial.PartialResult
 import org.apache.spark.storage.StorageLevel
-import org.apache.spark.util.{BoundedPriorityQueue, Utils}
+import org.apache.spark.util.{BoundedPriorityQueue, Utils, CallSite}
 import org.apache.spark.util.collection.OpenHashMap
 import org.apache.spark.util.random.{BernoulliSampler, PoissonSampler, SamplingUtils}
 
@@ -1224,7 +1224,8 @@ abstract class RDD[T: ClassTag](
   private var storageLevel: StorageLevel = StorageLevel.NONE
 
   /** User code that created this RDD (e.g. `textFile`, `parallelize`). */
-  @transient private[spark] val creationSite = Utils.getCallSite
+  @transient private[spark] val creationSite = sc.getCallSite()
+
   private[spark] def getCreationSite: String = Option(creationSite).map(_.shortForm).getOrElse("")
 
   private[spark] def elementClassTag: ClassTag[T] = classTag[T]
27 changes: 20 additions & 7 deletions core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -49,6 +49,11 @@ import org.apache.spark.serializer.{DeserializationStream, SerializationStream,
 /** CallSite represents a place in user code. It can have a short and a long form. */
 private[spark] case class CallSite(shortForm: String, longForm: String)
 
+private[spark] object CallSite {
+  val SHORT_FORM = "callSite.short"
+  val LONG_FORM = "callSite.long"
+}
+
 /**
  * Various utility methods used by Spark.
  */
@@ -859,18 +864,26 @@ private[spark] object Utils extends Logging {
     }
   }
 
-  /**
-   * A regular expression to match classes of the "core" Spark API that we want to skip when
-   * finding the call site of a method.
-   */
-  private val SPARK_CLASS_REGEX = """^org\.apache\.spark(\.api\.java)?(\.util)?(\.rdd)?\.[A-Z]""".r
+  /** Default filtering function for finding call sites using `getCallSite`. */
+  private def coreExclusionFunction(className: String): Boolean = {
+    // A regular expression to match classes of the "core" Spark API that we want to skip when
+    // finding the call site of a method.
+    val SPARK_CORE_CLASS_REGEX = """^org\.apache\.spark(\.api\.java)?(\.util)?(\.rdd)?\.[A-Z]""".r
+    val SCALA_CLASS_REGEX = """^scala""".r
+    val isSparkCoreClass = SPARK_CORE_CLASS_REGEX.findFirstIn(className).isDefined
+    val isScalaClass = SCALA_CLASS_REGEX.findFirstIn(className).isDefined
+    // If the class is a Spark internal class or a Scala class, then exclude.
+    isSparkCoreClass || isScalaClass
+  }
 
   /**
    * When called inside a class in the spark package, returns the name of the user code class
    * (outside the spark package) that called into Spark, as well as which Spark method they called.
    * This is used, for example, to tell users where in their code each RDD got created.
+   *
+   * @param skipClass Function that is used to exclude non-user-code classes.
    */
-  def getCallSite: CallSite = {
+  def getCallSite(skipClass: String => Boolean = coreExclusionFunction): CallSite = {
     val trace = Thread.currentThread.getStackTrace()
       .filterNot { ste:StackTraceElement =>
         // When running under some profilers, the current stack trace might contain some bogus
@@ -891,7 +904,7 @@
 
     for (el <- trace) {
       if (insideSpark) {
-        if (SPARK_CLASS_REGEX.findFirstIn(el.getClassName).isDefined) {
+        if (skipClass(el.getClassName)) {
           lastSparkMethod = if (el.getMethodName == "<init>") {
             // Spark method is a constructor; get its class name
             el.getClassName.substring(el.getClassName.lastIndexOf('.') + 1)
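The new skipClass parameter makes the call-site stack walk pluggable, with coreExclusionFunction as the default. Since Utils is private[spark], only Spark-internal code can pass a custom predicate; a hedged sketch with a hypothetical extra library to skip (the startsWith checks merely approximate the default regexes):

    import org.apache.spark.util.Utils

    val site = Utils.getCallSite(className =>
      className.startsWith("org.apache.spark") ||   // approximates SPARK_CORE_CLASS_REGEX
      className.startsWith("scala") ||              // approximates SCALA_CLASS_REGEX
      className.startsWith("com.example.pipeline")) // hypothetical wrapper frames to skip
    println(site.shortForm)
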
26 changes: 26 additions & 0 deletions core/src/test/java/org/apache/spark/JavaAPISuite.java
@@ -1307,4 +1307,30 @@ public void collectUnderlyingScalaRDD() {
     SomeCustomClass[] collected = (SomeCustomClass[]) rdd.rdd().retag(SomeCustomClass.class).collect();
     Assert.assertEquals(data.size(), collected.length);
   }
+
+  /**
+   * Test for SPARK-3647. This test needs to use the maven-built assembly to trigger the issue,
+   * since that's the only artifact where Guava classes have been relocated.
+   */
+  @Test
+  public void testGuavaOptional() {
+    // Stop the context created in setUp() and start a local-cluster one, to force usage of the
+    // assembly.
+    sc.stop();
+    JavaSparkContext localCluster = new JavaSparkContext("local-cluster[1,1,512]", "JavaAPISuite");
+    try {
+      JavaRDD<Integer> rdd1 = localCluster.parallelize(Arrays.asList(1, 2, null), 3);
+      JavaRDD<Optional<Integer>> rdd2 = rdd1.map(
+        new Function<Integer, Optional<Integer>>() {
+          @Override
+          public Optional<Integer> call(Integer i) {
+            return Optional.fromNullable(i);
+          }
+        });
+      rdd2.collect();
+    } finally {
+      localCluster.stop();
+    }
+  }
+
 }
3 changes: 3 additions & 0 deletions core/src/test/scala/org/apache/spark/PartitioningSuite.scala
@@ -193,11 +193,13 @@ class PartitioningSuite extends FunSuite with SharedSparkContext with PrivateMet
     assert(grouped2.join(grouped4).partitioner === grouped4.partitioner)
     assert(grouped2.leftOuterJoin(grouped4).partitioner === grouped4.partitioner)
     assert(grouped2.rightOuterJoin(grouped4).partitioner === grouped4.partitioner)
+    assert(grouped2.fullOuterJoin(grouped4).partitioner === grouped4.partitioner)
     assert(grouped2.cogroup(grouped4).partitioner === grouped4.partitioner)
 
     assert(grouped2.join(reduced2).partitioner === grouped2.partitioner)
     assert(grouped2.leftOuterJoin(reduced2).partitioner === grouped2.partitioner)
     assert(grouped2.rightOuterJoin(reduced2).partitioner === grouped2.partitioner)
+    assert(grouped2.fullOuterJoin(reduced2).partitioner === grouped2.partitioner)
     assert(grouped2.cogroup(reduced2).partitioner === grouped2.partitioner)
 
     assert(grouped2.map(_ => 1).partitioner === None)
@@ -218,6 +220,7 @@ class PartitioningSuite extends FunSuite with SharedSparkContext with PrivateMet
     assert(intercept[SparkException]{ arrPairs.join(arrPairs) }.getMessage.contains("array"))
     assert(intercept[SparkException]{ arrPairs.leftOuterJoin(arrPairs) }.getMessage.contains("array"))
     assert(intercept[SparkException]{ arrPairs.rightOuterJoin(arrPairs) }.getMessage.contains("array"))
+    assert(intercept[SparkException]{ arrPairs.fullOuterJoin(arrPairs) }.getMessage.contains("array"))
     assert(intercept[SparkException]{ arrPairs.groupByKey() }.getMessage.contains("array"))
     assert(intercept[SparkException]{ arrPairs.countByKey() }.getMessage.contains("array"))
     assert(intercept[SparkException]{ arrPairs.countByKeyApprox(1) }.getMessage.contains("array"))
core/src/test/scala/org/apache/spark/rdd/PairRDDFunctionsSuite.scala
@@ -298,6 +298,21 @@ class PairRDDFunctionsSuite extends FunSuite with SharedSparkContext {
     ))
   }
 
+  test("fullOuterJoin") {
+    val rdd1 = sc.parallelize(Array((1, 1), (1, 2), (2, 1), (3, 1)))
+    val rdd2 = sc.parallelize(Array((1, 'x'), (2, 'y'), (2, 'z'), (4, 'w')))
+    val joined = rdd1.fullOuterJoin(rdd2).collect()
+    assert(joined.size === 6)
+    assert(joined.toSet === Set(
+      (1, (Some(1), Some('x'))),
+      (1, (Some(2), Some('x'))),
+      (2, (Some(1), Some('y'))),
+      (2, (Some(1), Some('z'))),
+      (3, (Some(1), None)),
+      (4, (None, Some('w')))
+    ))
+  }
+
   test("join with no matches") {
     val rdd1 = sc.parallelize(Array((1, 1), (1, 2), (2, 1), (3, 1)))
     val rdd2 = sc.parallelize(Array((4, 'x'), (5, 'y'), (5, 'z'), (6, 'w')))
1 change: 1 addition & 0 deletions core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala
@@ -193,6 +193,7 @@ class RDDSuite extends FunSuite with SharedSparkContext {
     assert(rdd.join(emptyKv).collect().size === 0)
     assert(rdd.rightOuterJoin(emptyKv).collect().size === 0)
     assert(rdd.leftOuterJoin(emptyKv).collect().size === 2)
+    assert(rdd.fullOuterJoin(emptyKv).collect().size === 2)
     assert(rdd.cogroup(emptyKv).collect().size === 2)
     assert(rdd.union(emptyKv).collect().size === 2)
   }
}
Expand Down
Loading

0 comments on commit 7e0cc36

Please sign in to comment.