Commit c3fdbab

Merge pull request #3 from markhamstra/master-csd

SPY-138 DAGScheduler job <--> stage accounting

vnivargi committed Dec 13, 2013
2 parents 499fa12 + e5684e5 commit c3fdbab
Showing 84 changed files with 5,576 additions and 580 deletions.
559 changes: 559 additions & 0 deletions CHANGES.txt

Large diffs are not rendered by default.

6 changes: 3 additions & 3 deletions README.md
@@ -55,7 +55,7 @@ versions without YARN, use:
# Cloudera CDH 4.2.0 with MapReduce v1
$ SPARK_HADOOP_VERSION=2.0.0-mr1-cdh4.2.0 sbt/sbt assembly

-For Apache Hadoop 2.x, 0.23.x, Cloudera CDH MRv2, and other Hadoop versions
+For Apache Hadoop 2.0.X, 2.1.X, 2.2.X, 0.23.x, Cloudera CDH MRv2, and other Hadoop versions
with YARN, also set `SPARK_YARN=true`:

# Apache Hadoop 2.0.5-alpha
@@ -64,8 +64,8 @@ with YARN, also set `SPARK_YARN=true`:
# Cloudera CDH 4.2.0 with MapReduce v2
$ SPARK_HADOOP_VERSION=2.0.0-cdh4.2.0 SPARK_YARN=true sbt/sbt assembly

-For convenience, these variables may also be set through the `conf/spark-env.sh` file
-described below.
+    # Apache Hadoop 2.2.0 with YARN
+    $ SPARK_HADOOP_VERSION=2.2.0 SPARK_YARN=true sbt/sbt assembly

When developing a Spark application, specify the Hadoop version by adding the
"hadoop-client" artifact to your project's dependencies. For example, if you're
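The README paragraph above is cut off in this view. As a hedged illustration of the kind of dependency declaration it describes, a minimal sbt sketch (the Hadoop version shown is an assumption for illustration, not a value from this commit):

    // build.sbt sketch: match hadoop-client to the cluster's Hadoop version.
    // "1.2.1" is only an example version, not taken from this commit.
    libraryDependencies += "org.apache.hadoop" % "hadoop-client" % "1.2.1"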
28 changes: 28 additions & 0 deletions assembly/pom.xml
@@ -46,6 +46,23 @@
</repositories>

<dependencies>
+    <dependency>
+      <groupId>${akka.group}</groupId>
+      <artifactId>akka-actor</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>${akka.group}</groupId>
+      <artifactId>akka-remote</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>${akka.group}</groupId>
+      <artifactId>akka-slf4j</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>${akka.group}</groupId>
+      <artifactId>akka-zeromq</artifactId>
+    </dependency>
+
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.9.3</artifactId>
@@ -140,6 +157,17 @@
</dependency>
</dependencies>
</profile>
+    <profile>
+      <id>new-yarn</id>
+      <dependencies>
+        <dependency>
+          <groupId>org.apache.spark</groupId>
+          <artifactId>spark-yarn_2.9.3</artifactId>
+          <version>${project.version}</version>
+        </dependency>
+      </dependencies>
+    </profile>
+
<profile>
<id>bigtop-dist</id>
<!-- This profile uses the assembly plugin to create a special "dist" package for BigTop
12 changes: 12 additions & 0 deletions bagel/pom.xml
@@ -32,6 +32,18 @@
<url>http://spark.incubator.apache.org/</url>

<dependencies>
+    <dependency>
+      <groupId>${akka.group}</groupId>
+      <artifactId>akka-actor</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>${akka.group}</groupId>
+      <artifactId>akka-remote</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>${akka.group}</groupId>
+      <artifactId>akka-slf4j</artifactId>
+    </dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.9.3</artifactId>
10 changes: 3 additions & 7 deletions core/pom.xml
@@ -95,15 +95,15 @@
<version>0.3.1</version>
</dependency>
<dependency>
-      <groupId>com.typesafe.akka</groupId>
+      <groupId>${akka.group}</groupId>
<artifactId>akka-actor</artifactId>
</dependency>
<dependency>
-      <groupId>com.typesafe.akka</groupId>
+      <groupId>${akka.group}</groupId>
<artifactId>akka-remote</artifactId>
</dependency>
<dependency>
-      <groupId>com.typesafe.akka</groupId>
+      <groupId>${akka.group}</groupId>
<artifactId>akka-slf4j</artifactId>
</dependency>
<dependency>
@@ -126,10 +126,6 @@
<groupId>colt</groupId>
<artifactId>colt</artifactId>
</dependency>
-    <dependency>
-      <groupId>com.github.scala-incubator.io</groupId>
-      <artifactId>scala-io-file_2.9.2</artifactId>
-    </dependency>
<dependency>
<groupId>org.apache.mesos</groupId>
<artifactId>mesos</artifactId>
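Several of the pom changes above replace the hard-coded `com.typesafe.akka` group with a `${akka.group}` property, so a forked or shaded Akka build can be substituted by overriding a single value. A hedged sbt-flavored sketch of the same idea; the default group and version below are assumptions for illustration, not values from this commit:

    // Keep the Akka organization in one variable so a different groupId
    // (e.g. a shaded fork) can be swapped in from one place.
    val akkaGroup   = "com.typesafe.akka"  // assumed default
    val akkaVersion = "2.0.5"              // assumed version

    libraryDependencies ++= Seq(
      akkaGroup % "akka-actor"  % akkaVersion,
      akkaGroup % "akka-remote" % akkaVersion,
      akkaGroup % "akka-slf4j"  % akkaVersion
    )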
8 changes: 6 additions & 2 deletions core/src/main/scala/org/apache/spark/MapOutputTracker.scala
@@ -247,7 +247,7 @@ private[spark] class MapOutputTracker extends Logging {
case Some(bytes) =>
return bytes
case None =>
-          statuses = mapStatuses(shuffleId)
+          statuses = mapStatuses.getOrElse(shuffleId, Array[MapStatus]())
epochGotten = epoch
}
}
@@ -261,9 +261,13 @@
cachedSerializedStatuses(shuffleId) = bytes
}
}
-    return bytes
+    bytes
}

+  def has(shuffleId: Int): Boolean = {
+    cachedSerializedStatuses.get(shuffleId).isDefined || mapStatuses.contains(shuffleId)
+  }

// Serialize an array of map output locations into an efficient byte format so that we can send
// it to reduce tasks. We do this by compressing the serialized bytes using GZIP. They will
// generally be pretty compressible because many map outputs will be on the same hostname.
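The switch from `mapStatuses(shuffleId)` to `getOrElse` turns a potential `NoSuchElementException` into an empty result. A minimal standalone Scala sketch of that pattern, using simplified types rather than Spark's `MapStatus`:

    import scala.collection.mutable.HashMap

    object GetOrElseSketch {
      def main(args: Array[String]) {
        val mapStatuses = HashMap[Int, Array[String]](0 -> Array("status-a"))

        // Old style: mapStatuses(1) throws NoSuchElementException for an
        // unregistered shuffleId. getOrElse degrades to an empty array instead.
        val statuses = mapStatuses.getOrElse(1, Array[String]())
        println(statuses.length) // prints 0

        // The new has() helper is just a containment check over the map(s).
        def has(shuffleId: Int): Boolean = mapStatuses.contains(shuffleId)
        println(has(0)) // prints true
      }
    }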
28 changes: 26 additions & 2 deletions core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -24,7 +24,6 @@ import java.util.concurrent.atomic.AtomicInteger

import scala.collection.Map
import scala.collection.generic.Growable
-import scala.collection.JavaConverters._
import scala.collection.mutable.ArrayBuffer
import scala.collection.mutable.HashMap

@@ -82,7 +81,7 @@
val sparkHome: String = null,
val jars: Seq[String] = Nil,
val environment: Map[String, String] = Map(),
-    // This is used only by yarn for now, but should be relevant to other cluster types (mesos, etc)
+    // This is used only by YARN for now, but should be relevant to other cluster types (Mesos, etc)
// too. This is typically generated from InputFormatInfo.computePreferredLocations .. host, set
// of data-local splits on host
val preferredNodeLocationData: scala.collection.Map[String, scala.collection.Set[SplitInfo]] =
@@ -227,6 +226,31 @@
scheduler.initialize(backend)
scheduler

case "yarn-client" =>
val scheduler = try {
val clazz = Class.forName("org.apache.spark.scheduler.cluster.YarnClientClusterScheduler")
val cons = clazz.getConstructor(classOf[SparkContext])
cons.newInstance(this).asInstanceOf[ClusterScheduler]

} catch {
case th: Throwable => {
throw new SparkException("YARN mode not available ?", th)
}
}

val backend = try {
val clazz = Class.forName("org.apache.spark.scheduler.cluster.YarnClientSchedulerBackend")
val cons = clazz.getConstructor(classOf[ClusterScheduler], classOf[SparkContext])
cons.newInstance(scheduler, this).asInstanceOf[CoarseGrainedSchedulerBackend]
} catch {
case th: Throwable => {
throw new SparkException("YARN mode not available ?", th)
}
}

scheduler.initialize(backend)
scheduler

case _ =>
if (MESOS_REGEX.findFirstIn(master).isEmpty) {
logWarning("Master %s does not match expected format, parsing as Mesos URL".format(master))
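The `yarn-client` case above loads its scheduler classes reflectively, so spark-core compiles and runs without the YARN module on the classpath. A minimal, self-contained sketch of that reflection pattern; `ExampleScheduler` is a hypothetical stand-in, not a Spark class:

    // Instantiate a class by name so the dependency stays optional at compile time.
    class ExampleScheduler(val mode: String)

    object ReflectiveLoadSketch {
      def main(args: Array[String]) {
        val scheduler = try {
          val clazz = Class.forName("ExampleScheduler")
          val cons = clazz.getConstructor(classOf[String])
          cons.newInstance("yarn-client").asInstanceOf[ExampleScheduler]
        } catch {
          // If the class is missing, fail with a clearer, mode-specific error,
          // as SparkContext does for the YARN-only scheduler classes.
          case th: Throwable => throw new RuntimeException("mode not available", th)
        }
        println(scheduler.mode) // prints yarn-client
      }
    }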
@@ -1,19 +1,19 @@
/*
- *
- * * Licensed to the Apache Software Foundation (ASF) under one or more
- * * contributor license agreements. See the NOTICE file distributed with
- * * this work for additional information regarding copyright ownership.
- * * The ASF licenses this file to You under the Apache License, Version 2.0
- * * (the "License"); you may not use this file except in compliance with
- * * the License. You may obtain a copy of the License at
- * *
- * *    http://www.apache.org/licenses/LICENSE-2.0
- * *
- * * Unless required by applicable law or agreed to in writing, software
- * * distributed under the License is distributed on an "AS IS" BASIS,
- * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * * See the License for the specific language governing permissions and
- * * limitations under the License.
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
*
*/

@@ -17,12 +17,12 @@

package org.apache.spark.deploy

-import akka.actor.{ActorRef, Props, Actor, ActorSystem, Terminated}
+import akka.actor.ActorSystem

import org.apache.spark.deploy.worker.Worker
import org.apache.spark.deploy.master.Master
-import org.apache.spark.util.{Utils, AkkaUtils}
-import org.apache.spark.{Logging}
+import org.apache.spark.util.Utils
+import org.apache.spark.Logging

import scala.collection.mutable.ArrayBuffer

@@ -22,7 +22,7 @@ import java.nio.ByteBuffer
import akka.actor.{ActorRef, Actor, Props, Terminated}
import akka.remote.{RemoteClientLifeCycleEvent, RemoteClientShutdown, RemoteClientDisconnected}

-import org.apache.spark.{Logging, SparkEnv}
+import org.apache.spark.Logging
import org.apache.spark.TaskState.TaskState
import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages._
import org.apache.spark.util.{Utils, AkkaUtils}
@@ -20,8 +20,6 @@ package org.apache.spark.executor
import com.codahale.metrics.{Gauge, MetricRegistry}

import org.apache.hadoop.fs.FileSystem
-import org.apache.hadoop.hdfs.DistributedFileSystem
-import org.apache.hadoop.fs.LocalFileSystem

import scala.collection.JavaConversions._

@@ -76,7 +76,7 @@ private[spark] object ShuffleCopier extends Logging {
extends FileClientHandler with Logging {

override def handle(ctx: ChannelHandlerContext, in: ByteBuf, header: FileHeader) {
logDebug("Received Block: " + header.blockId + " (" + header.fileLen + "B)");
logDebug("Received Block: " + header.blockId + " (" + header.fileLen + "B)")
resultCollectCallBack(header.blockId, header.fileLen.toLong, in.readBytes(header.fileLen))
}

@@ -70,7 +70,7 @@ class CartesianRDD[T: ClassManifest, U:ClassManifest](
override def compute(split: Partition, context: TaskContext) = {
val currSplit = split.asInstanceOf[CartesianPartition]
for (x <- rdd1.iterator(currSplit.s1, context);
-      y <- rdd2.iterator(currSplit.s2, context)) yield (x, y)
+         y <- rdd2.iterator(currSplit.s2, context)) yield (x, y)
}

override def getDependencies: Seq[Dependency[_]] = List(
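The `compute` body above builds the cartesian product of two partitions with a for-comprehension. A minimal plain-Scala sketch of the same desugaring (flatMap over the first source, with a fresh iterator of the second produced per element; the names here are illustrative, not Spark's):

    object CartesianSketch {
      def main(args: Array[String]) {
        // defs, so each reference yields a fresh iterator, mirroring why
        // compute() calls rdd2.iterator(...) inside the comprehension.
        def left: Iterator[Int] = Iterator(1, 2)
        def right: Iterator[Char] = Iterator('a', 'b')

        // Desugars to left.flatMap(x => right.map(y => (x, y))).
        val pairs = for (x <- left; y <- right) yield (x, y)
        println(pairs.toList) // List((1,a), (1,b), (2,a), (2,b))
      }
    }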