Commit

resolved merge issues
Ilya Ganelin committed Jan 16, 2015
2 parents e446287 + e8422c5 commit 39f3810
Showing 44 changed files with 854 additions and 241 deletions.
97 changes: 97 additions & 0 deletions core/src/main/java/org/apache/spark/JavaSparkListener.java
@@ -0,0 +1,97 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark;

import org.apache.spark.scheduler.SparkListener;
import org.apache.spark.scheduler.SparkListenerApplicationEnd;
import org.apache.spark.scheduler.SparkListenerApplicationStart;
import org.apache.spark.scheduler.SparkListenerBlockManagerAdded;
import org.apache.spark.scheduler.SparkListenerBlockManagerRemoved;
import org.apache.spark.scheduler.SparkListenerEnvironmentUpdate;
import org.apache.spark.scheduler.SparkListenerExecutorAdded;
import org.apache.spark.scheduler.SparkListenerExecutorMetricsUpdate;
import org.apache.spark.scheduler.SparkListenerExecutorRemoved;
import org.apache.spark.scheduler.SparkListenerJobEnd;
import org.apache.spark.scheduler.SparkListenerJobStart;
import org.apache.spark.scheduler.SparkListenerStageCompleted;
import org.apache.spark.scheduler.SparkListenerStageSubmitted;
import org.apache.spark.scheduler.SparkListenerTaskEnd;
import org.apache.spark.scheduler.SparkListenerTaskGettingResult;
import org.apache.spark.scheduler.SparkListenerTaskStart;
import org.apache.spark.scheduler.SparkListenerUnpersistRDD;

/**
* Java clients should extend this class instead of implementing
* SparkListener directly. This is to prevent Java clients
* from breaking when new events are added to the SparkListener
* trait.
*
* This is a concrete class rather than an abstract one, to ensure
* that new events get added to both the SparkListener trait and this
* adapter in lockstep.
*/
public class JavaSparkListener implements SparkListener {

@Override
public void onStageCompleted(SparkListenerStageCompleted stageCompleted) { }

@Override
public void onStageSubmitted(SparkListenerStageSubmitted stageSubmitted) { }

@Override
public void onTaskStart(SparkListenerTaskStart taskStart) { }

@Override
public void onTaskGettingResult(SparkListenerTaskGettingResult taskGettingResult) { }

@Override
public void onTaskEnd(SparkListenerTaskEnd taskEnd) { }

@Override
public void onJobStart(SparkListenerJobStart jobStart) { }

@Override
public void onJobEnd(SparkListenerJobEnd jobEnd) { }

@Override
public void onEnvironmentUpdate(SparkListenerEnvironmentUpdate environmentUpdate) { }

@Override
public void onBlockManagerAdded(SparkListenerBlockManagerAdded blockManagerAdded) { }

@Override
public void onBlockManagerRemoved(SparkListenerBlockManagerRemoved blockManagerRemoved) { }

@Override
public void onUnpersistRDD(SparkListenerUnpersistRDD unpersistRDD) { }

@Override
public void onApplicationStart(SparkListenerApplicationStart applicationStart) { }

@Override
public void onApplicationEnd(SparkListenerApplicationEnd applicationEnd) { }

@Override
public void onExecutorMetricsUpdate(SparkListenerExecutorMetricsUpdate executorMetricsUpdate) { }

@Override
public void onExecutorAdded(SparkListenerExecutorAdded executorAdded) { }

@Override
public void onExecutorRemoved(SparkListenerExecutorRemoved executorRemoved) { }
}
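A usage sketch (not part of this commit): a Java client that only cares about a few events extends the adapter and overrides just those callbacks, leaving the rest as inherited no-ops. The package and class names below are hypothetical, and the listener would typically be registered through SparkContext's addSparkListener method.

package com.example.monitoring;  // hypothetical package, for illustration only

import org.apache.spark.JavaSparkListener;
import org.apache.spark.scheduler.SparkListenerJobEnd;
import org.apache.spark.scheduler.SparkListenerTaskEnd;

// Overrides only the events of interest; every other callback keeps the
// empty default inherited from JavaSparkListener, so this class keeps
// compiling when new events are added to the SparkListener trait.
public class JobLoggingListener extends JavaSparkListener {

  @Override
  public void onTaskEnd(SparkListenerTaskEnd taskEnd) {
    System.out.println("Task finished in stage " + taskEnd.stageId());
  }

  @Override
  public void onJobEnd(SparkListenerJobEnd jobEnd) {
    System.out.println("Job " + jobEnd.jobId() + " ended");
  }
}
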
6 changes: 5 additions & 1 deletion core/src/main/scala/org/apache/spark/CacheManager.scala
@@ -44,7 +44,11 @@ private[spark] class CacheManager(blockManager: BlockManager) extends Logging {
blockManager.get(key) match {
case Some(blockResult) =>
// Partition is already materialized, so just return its values
context.taskMetrics.inputMetrics = Some(blockResult.inputMetrics)
val inputMetrics = blockResult.inputMetrics
val existingMetrics = context.taskMetrics
.getInputMetricsForReadMethod(inputMetrics.readMethod)
existingMetrics.addBytesRead(inputMetrics.bytesRead)

new InterruptibleIterator(context, blockResult.data.asInstanceOf[Iterator[T]])

case None =>
7 changes: 3 additions & 4 deletions core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -520,10 +520,9 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli

/** Distribute a local Scala collection to form an RDD.
*
* @note Parallelize acts lazily. If `seq` is a mutable collection and is
* altered after the call to parallelize and before the first action on the
* RDD, the resultant RDD will reflect the modified collection. Pass a copy of
* the argument to avoid this.
* @note Parallelize acts lazily. If `seq` is a mutable collection and is altered after the call
* to parallelize and before the first action on the RDD, the resultant RDD will reflect the
* modified collection. Pass a copy of the argument to avoid this.
*/
def parallelize[T: ClassTag](seq: Seq[T], numSlices: Int = defaultParallelism): RDD[T] = {
new ParallelCollectionRDD[T](this, seq, numSlices, Map[Int, Seq[String]]())
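To make the note on laziness concrete, here is a small hypothetical driver (class and app names are illustrative, not from this commit) written against the Java API, assuming the same caveat carries over to JavaSparkContext.parallelize:

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;

public class ParallelizeCopyExample {
  public static void main(String[] args) {
    SparkConf conf = new SparkConf().setAppName("parallelize-copy").setMaster("local[2]");
    JavaSparkContext sc = new JavaSparkContext(conf);

    List<Integer> data = new ArrayList<Integer>(Arrays.asList(1, 2, 3));

    // parallelize is lazy: no snapshot of 'data' is taken here.
    JavaRDD<Integer> risky = sc.parallelize(data);

    // Passing a copy decouples the RDD from later mutations of 'data'.
    JavaRDD<Integer> safe = sc.parallelize(new ArrayList<Integer>(data));

    data.add(4);  // mutation before the first action

    // Per the note above, 'risky' may now reflect the extra element,
    // while 'safe' still sees the original three values.
    System.out.println("risky: " + risky.collect());
    System.out.println("safe:  " + safe.collect());

    sc.stop();
  }
}
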
@@ -23,7 +23,7 @@ import scala.collection.mutable.ListBuffer

import org.apache.log4j.Level

import org.apache.spark.util.MemoryParam
import org.apache.spark.util.{IntParam, MemoryParam}

/**
* Command-line parser for the driver client.
@@ -51,8 +51,8 @@ private[spark] class ClientArguments(args: Array[String]) {
parse(args.toList)

def parse(args: List[String]): Unit = args match {
case ("--cores" | "-c") :: value :: tail =>
cores = value.toInt
case ("--cores" | "-c") :: IntParam(value) :: tail =>
cores = value
parse(tail)

case ("--memory" | "-m") :: MemoryParam(value) :: tail =>
@@ -200,6 +200,7 @@ object SparkSubmit {
// Yarn cluster only
OptionAssigner(args.name, YARN, CLUSTER, clOption = "--name"),
OptionAssigner(args.driverMemory, YARN, CLUSTER, clOption = "--driver-memory"),
OptionAssigner(args.driverCores, YARN, CLUSTER, clOption = "--driver-cores"),
OptionAssigner(args.queue, YARN, CLUSTER, clOption = "--queue"),
OptionAssigner(args.numExecutors, YARN, CLUSTER, clOption = "--num-executors"),
OptionAssigner(args.executorMemory, YARN, CLUSTER, clOption = "--executor-memory"),
@@ -108,6 +108,9 @@ private[spark] class SparkSubmitArguments(args: Seq[String], env: Map[String, St
.orElse(sparkProperties.get("spark.driver.memory"))
.orElse(env.get("SPARK_DRIVER_MEMORY"))
.orNull
driverCores = Option(driverCores)
.orElse(sparkProperties.get("spark.driver.cores"))
.orNull
executorMemory = Option(executorMemory)
.orElse(sparkProperties.get("spark.executor.memory"))
.orElse(env.get("SPARK_EXECUTOR_MEMORY"))
@@ -406,6 +409,8 @@ private[spark] class SparkSubmitArguments(args: Seq[String], env: Map[String, St
| --total-executor-cores NUM Total cores for all executors.
|
| YARN-only:
| --driver-cores NUM Number of cores used by the driver, only in cluster mode
| (Default: 1).
| --executor-cores NUM Number of cores per executor (Default: 1).
| --queue QUEUE_NAME The YARN queue to submit to (Default: "default").
| --num-executors NUM Number of executors to launch (Default: 2).
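As a usage illustration (the application class and jar names here are hypothetical), the new option is passed alongside the other YARN cluster-mode flags, e.g.:

spark-submit --master yarn --deploy-mode cluster --driver-cores 2 --driver-memory 4g --num-executors 4 --executor-cores 2 --executor-memory 2g --queue default --class com.example.MyApp my-app.jar
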
@@ -38,8 +38,8 @@ private[spark] class ApplicationInfo(
extends Serializable {

@transient var state: ApplicationState.Value = _
@transient var executors: mutable.HashMap[Int, ExecutorInfo] = _
@transient var removedExecutors: ArrayBuffer[ExecutorInfo] = _
@transient var executors: mutable.HashMap[Int, ExecutorDesc] = _
@transient var removedExecutors: ArrayBuffer[ExecutorDesc] = _
@transient var coresGranted: Int = _
@transient var endTime: Long = _
@transient var appSource: ApplicationSource = _
@@ -55,12 +55,12 @@

private def init() {
state = ApplicationState.WAITING
executors = new mutable.HashMap[Int, ExecutorInfo]
executors = new mutable.HashMap[Int, ExecutorDesc]
coresGranted = 0
endTime = -1L
appSource = new ApplicationSource(this)
nextExecutorId = 0
removedExecutors = new ArrayBuffer[ExecutorInfo]
removedExecutors = new ArrayBuffer[ExecutorDesc]
}

private def newExecutorId(useID: Option[Int] = None): Int = {
@@ -75,14 +75,14 @@
}
}

def addExecutor(worker: WorkerInfo, cores: Int, useID: Option[Int] = None): ExecutorInfo = {
val exec = new ExecutorInfo(newExecutorId(useID), this, worker, cores, desc.memoryPerSlave)
def addExecutor(worker: WorkerInfo, cores: Int, useID: Option[Int] = None): ExecutorDesc = {
val exec = new ExecutorDesc(newExecutorId(useID), this, worker, cores, desc.memoryPerSlave)
executors(exec.id) = exec
coresGranted += cores
exec
}

def removeExecutor(exec: ExecutorInfo) {
def removeExecutor(exec: ExecutorDesc) {
if (executors.contains(exec.id)) {
removedExecutors += executors(exec.id)
executors -= exec.id
@@ -19,7 +19,7 @@ package org.apache.spark.deploy.master

import org.apache.spark.deploy.{ExecutorDescription, ExecutorState}

private[spark] class ExecutorInfo(
private[spark] class ExecutorDesc(
val id: Int,
val application: ApplicationInfo,
val worker: WorkerInfo,
@@ -37,7 +37,7 @@ private[spark] class ExecutorInfo(

override def equals(other: Any): Boolean = {
other match {
case info: ExecutorInfo =>
case info: ExecutorDesc =>
fullId == info.fullId &&
worker.id == info.worker.id &&
cores == info.cores &&
@@ -581,7 +581,7 @@ private[spark] class Master(
}
}

def launchExecutor(worker: WorkerInfo, exec: ExecutorInfo) {
def launchExecutor(worker: WorkerInfo, exec: ExecutorDesc) {
logInfo("Launching executor " + exec.fullId + " on worker " + worker.id)
worker.addExecutor(exec)
worker.actor ! LaunchExecutor(masterUrl,
@@ -38,7 +38,7 @@ private[spark] class WorkerInfo(
Utils.checkHost(host, "Expected hostname")
assert (port > 0)

@transient var executors: mutable.HashMap[String, ExecutorInfo] = _ // executorId => info
@transient var executors: mutable.HashMap[String, ExecutorDesc] = _ // executorId => info
@transient var drivers: mutable.HashMap[String, DriverInfo] = _ // driverId => info
@transient var state: WorkerState.Value = _
@transient var coresUsed: Int = _
@@ -70,13 +70,13 @@ private[spark] class WorkerInfo(
host + ":" + port
}

def addExecutor(exec: ExecutorInfo) {
def addExecutor(exec: ExecutorDesc) {
executors(exec.fullId) = exec
coresUsed += exec.cores
memoryUsed += exec.memory
}

def removeExecutor(exec: ExecutorInfo) {
def removeExecutor(exec: ExecutorDesc) {
if (executors.contains(exec.fullId)) {
executors -= exec.fullId
coresUsed -= exec.cores
@@ -27,7 +27,7 @@ import org.json4s.JValue

import org.apache.spark.deploy.{ExecutorState, JsonProtocol}
import org.apache.spark.deploy.DeployMessages.{MasterStateResponse, RequestMasterState}
import org.apache.spark.deploy.master.ExecutorInfo
import org.apache.spark.deploy.master.ExecutorDesc
import org.apache.spark.ui.{UIUtils, WebUIPage}
import org.apache.spark.util.Utils

@@ -109,7 +109,7 @@ private[spark] class ApplicationPage(parent: MasterWebUI) extends WebUIPage("app
UIUtils.basicSparkPage(content, "Application: " + app.desc.name)
}

private def executorRow(executor: ExecutorInfo): Seq[Node] = {
private def executorRow(executor: ExecutorDesc): Seq[Node] = {
<tr>
<td>{executor.id}</td>
<td>
20 changes: 11 additions & 9 deletions core/src/main/scala/org/apache/spark/executor/Executor.scala
@@ -203,10 +203,10 @@ private[spark] class Executor(
val afterSerialization = System.currentTimeMillis()

for (m <- task.metrics) {
m.incExecutorDeserializeTime(taskStart - deserializeStartTime)
m.incExecutorRunTime(taskFinish - taskStart)
m.incJvmGCTime(gcTime - startGCTime)
m.incResultSerializationTime(afterSerialization - beforeSerialization)
m.setExecutorDeserializeTime(taskStart - deserializeStartTime)
m.setExecutorRunTime(taskFinish - taskStart)
m.setJvmGCTime(gcTime - startGCTime)
m.setResultSerializationTime(afterSerialization - beforeSerialization)
}

val accumUpdates = Accumulators.values
@@ -257,8 +257,8 @@ private[spark] class Executor(
val serviceTime = System.currentTimeMillis() - taskStart
val metrics = attemptedTask.flatMap(t => t.metrics)
for (m <- metrics) {
m.incExecutorRunTime(serviceTime)
m.incJvmGCTime(gcTime - startGCTime)
m.setExecutorRunTime(serviceTime)
m.setJvmGCTime(gcTime - startGCTime)
}
val reason = new ExceptionFailure(t, metrics)
execBackend.statusUpdate(taskId, TaskState.FAILED, ser.serialize(reason))
@@ -376,10 +376,12 @@ private[spark] class Executor(
val curGCTime = gcTime

for (taskRunner <- runningTasks.values()) {
if (!taskRunner.attemptedTask.isEmpty) {
if (taskRunner.attemptedTask.nonEmpty) {
Option(taskRunner.task).flatMap(_.metrics).foreach { metrics =>
metrics.updateShuffleReadMetrics
metrics.incJvmGCTime(curGCTime - taskRunner.startGCTime)
metrics.updateShuffleReadMetrics()
metrics.updateInputMetrics()
metrics.setJvmGCTime(curGCTime - taskRunner.startGCTime)

if (isLocal) {
// JobProgressListener will hold an reference of it during
// onExecutorMetricsUpdate(), then JobProgressListener can not see