@@ -23,9 +23,10 @@ import akka.actor.ActorRef
import com.google.common.base.Charsets
import com.google.common.io.Files

import org.apache.spark.Logging
import org.apache.spark.{SparkConf, Logging}
import org.apache.spark.deploy.{ApplicationDescription, Command, ExecutorState}
import org.apache.spark.deploy.DeployMessages.ExecutorStateChanged
import org.apache.spark.util.logging.FileAppender

/**
* Manages the execution of one executor process.
@@ -42,12 +43,15 @@ private[spark] class ExecutorRunner(
val sparkHome: File,
val workDir: File,
val workerUrl: String,
val conf: SparkConf,
var state: ExecutorState.Value)
extends Logging {

val fullId = appId + "/" + execId
var workerThread: Thread = null
var process: Process = null
var stdoutAppender: FileAppender = null
var stderrAppender: FileAppender = null

// NOTE: This is now redundant with the automated shut-down enforced by the Executor. It might
// make sense to remove this in the future.
@@ -76,6 +80,13 @@ private[spark] class ExecutorRunner(
if (process != null) {
logInfo("Killing process!")
process.destroy()
process.waitFor()
if (stdoutAppender != null) {
stdoutAppender.stop()
}
if (stderrAppender != null) {
stderrAppender.stop()
}
val exitCode = process.waitFor()
worker ! ExecutorStateChanged(appId, execId, state, message, Some(exitCode))
}
@@ -137,11 +148,11 @@ private[spark] class ExecutorRunner(

// Redirect its stdout and stderr to files
val stdout = new File(executorDir, "stdout")
CommandUtils.redirectStream(process.getInputStream, stdout)
stdoutAppender = FileAppender(process.getInputStream, stdout, conf)

val stderr = new File(executorDir, "stderr")
Files.write(header, stderr, Charsets.UTF_8)
CommandUtils.redirectStream(process.getErrorStream, stderr)
stderrAppender = FileAppender(process.getErrorStream, stderr, conf)

// Wait for it to exit; this is actually a bad thing if it happens, because we expect to run
// long-lived processes only. However, in the future, we might restart the executor a few
@@ -235,7 +235,7 @@ private[spark] class Worker(
val manager = new ExecutorRunner(appId, execId, appDesc, cores_, memory_,
self, workerId, host,
appDesc.sparkHome.map(userSparkHome => new File(userSparkHome)).getOrElse(sparkHome),
workDir, akkaUrl, ExecutorState.RUNNING)
workDir, akkaUrl, conf, ExecutorState.RUNNING)
executors(appId + "/" + execId) = manager
manager.start()
coresUsed += cores_
78 changes: 44 additions & 34 deletions core/src/main/scala/org/apache/spark/deploy/worker/ui/LogPage.scala
@@ -24,8 +24,10 @@ import scala.xml.Node

import org.apache.spark.ui.{WebUIPage, UIUtils}
import org.apache.spark.util.Utils
import org.apache.spark.Logging
import org.apache.spark.util.logging.{FileAppender, RollingFileAppender}

private[spark] class LogPage(parent: WorkerWebUI) extends WebUIPage("logPage") {
private[spark] class LogPage(parent: WorkerWebUI) extends WebUIPage("logPage") with Logging {
private val worker = parent.worker
private val workDir = parent.workDir

@@ -39,21 +41,18 @@ private[spark] class LogPage(parent: WorkerWebUI) extends WebUIPage("logPage") {
val offset = Option(request.getParameter("offset")).map(_.toLong)
val byteLength = Option(request.getParameter("byteLength")).map(_.toInt).getOrElse(defaultBytes)

val path = (appId, executorId, driverId) match {
val logDir = (appId, executorId, driverId) match {
case (Some(a), Some(e), None) =>
s"${workDir.getPath}/$appId/$executorId/$logType"
s"${workDir.getPath}/$appId/$executorId/"
case (None, None, Some(d)) =>
s"${workDir.getPath}/$driverId/$logType"
s"${workDir.getPath}/$driverId/"
case _ =>
throw new Exception("Request must specify either application or driver identifiers")
}

val (startByte, endByte) = getByteRange(path, offset, byteLength)
val file = new File(path)
val logLength = file.length

val pre = s"==== Bytes $startByte-$endByte of $logLength of $path ====\n"
pre + Utils.offsetBytes(path, startByte, endByte)
val (logText, startByte, endByte, logLength) = getLog(logDir, logType, offset, byteLength)
val pre = s"==== Bytes $startByte-$endByte of $logLength of $logDir$logType ====\n"
pre + logText
}

def render(request: HttpServletRequest): Seq[Node] = {
@@ -65,19 +64,16 @@ private[spark] class LogPage(parent: WorkerWebUI) extends WebUIPage("logPage") {
val offset = Option(request.getParameter("offset")).map(_.toLong)
val byteLength = Option(request.getParameter("byteLength")).map(_.toInt).getOrElse(defaultBytes)

val (path, params) = (appId, executorId, driverId) match {
val (logDir, params) = (appId, executorId, driverId) match {
case (Some(a), Some(e), None) =>
(s"${workDir.getPath}/$a/$e/$logType", s"appId=$a&executorId=$e")
(s"${workDir.getPath}/$a/$e/", s"appId=$a&executorId=$e")
case (None, None, Some(d)) =>
(s"${workDir.getPath}/$d/$logType", s"driverId=$d")
(s"${workDir.getPath}/$d/", s"driverId=$d")
case _ =>
throw new Exception("Request must specify either application or driver identifiers")
}

val (startByte, endByte) = getByteRange(path, offset, byteLength)
val file = new File(path)
val logLength = file.length
val logText = <node>{Utils.offsetBytes(path, startByte, endByte)}</node>
val (logText, startByte, endByte, logLength) = getLog(logDir, logType, offset, byteLength)
val linkToMaster = <p><a href={worker.activeMasterWebUiUrl}>Back to Master</a></p>
val range = <span>Bytes {startByte.toString} - {endByte.toString} of {logLength}</span>

@@ -127,23 +123,37 @@ private[spark] class LogPage(parent: WorkerWebUI) extends WebUIPage("logPage") {
UIUtils.basicSparkPage(content, logType + " log page for " + appId)
}

/** Determine the byte range for a log or log page. */
private def getByteRange(path: String, offset: Option[Long], byteLength: Int): (Long, Long) = {
val defaultBytes = 100 * 1024
val maxBytes = 1024 * 1024
val file = new File(path)
val logLength = file.length()
val getOffset = offset.getOrElse(logLength - defaultBytes)
val startByte =
if (getOffset < 0) {
0L
} else if (getOffset > logLength) {
logLength
} else {
getOffset
/** Get the part of the log files given the offset and desired length of bytes */
private def getLog(
logDirectory: String,
logType: String,
offsetOption: Option[Long],
byteLength: Int
): (String, Long, Long, Long) = {
try {
val files = RollingFileAppender.getSortedRolledOverFiles(logDirectory, logType)
logDebug(s"Sorted log files of type $logType in $logDirectory:\n${files.mkString("\n")}")

val totalLength = files.map { _.length }.sum
val offset = offsetOption.getOrElse(totalLength - byteLength)
val startIndex = {
if (offset < 0) {
0L
} else if (offset > totalLength) {
totalLength
} else {
offset
}
}
val logPageLength = math.min(byteLength, maxBytes)
val endByte = math.min(startByte + logPageLength, logLength)
(startByte, endByte)
val endIndex = math.min(startIndex + byteLength, totalLength)
logDebug(s"Getting log from $startIndex to $endIndex")
val logText = Utils.offsetBytes(files, startIndex, endIndex)
logDebug(s"Got log of length ${logText.length} bytes")
(logText, startIndex, endIndex, totalLength)
} catch {
case e: Exception =>
logError(s"Error getting $logType logs from directory $logDirectory", e)
("Error getting logs due to exception: " + e.getMessage, 0, 0, 0)
}
}
}
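As a concrete illustration of the byte-range handling in getLog (hypothetical sizes, not taken from the PR): if the rolled-over stdout files are 100, 100 and 50 bytes long, the total length is 250, and the new Utils.offsetBytes overload below, called as Utils.offsetBytes(files, 150, 230), returns the last 50 bytes of the second file followed by the first 30 bytes of the third.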
53 changes: 53 additions & 0 deletions core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -862,6 +862,59 @@ private[spark] object Utils extends Logging {
Source.fromBytes(buff).mkString
}

/**
* Return a string containing data across a set of files. The `startIndex`
* and `endIndex` are based on the cumulative size of all the files taken in
* the given order. See figure below for more details.
*/
def offsetBytes(files: Seq[File], start: Long, end: Long): String = {
val fileLengths = files.map { _.length }
val startIndex = math.max(start, 0)
val endIndex = math.min(end, fileLengths.sum)
val fileToLength = files.zip(fileLengths).toMap
logDebug("Log files: \n" + fileToLength.mkString("\n"))

val stringBuffer = new StringBuffer((endIndex - startIndex).toInt)
Reviewer comment (Contributor): It might be nice here to assert some basic things to throw better exceptions if they are wrong:
- startIndex >= 0
- endIndex > startIndex
- endIndex > length of all files
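A minimal sketch of such checks, hypothetical and not part of this PR (the last condition presumably means guarding against endIndex exceeding the combined length of the files):

    require(startIndex >= 0, s"startIndex ($startIndex) must be non-negative")
    require(endIndex > startIndex, s"endIndex ($endIndex) must be greater than startIndex ($startIndex)")
    require(endIndex <= fileLengths.sum, s"endIndex ($endIndex) must not exceed the combined file length (${fileLengths.sum})")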

var sum = 0L
for (file <- files) {
val startIndexOfFile = sum
val endIndexOfFile = sum + fileToLength(file)
logDebug(s"Processing file $file, " +
s"with start index = $startIndexOfFile, end index = $endIndex")

/*
____________
range 1: | |
| case A |

files: |==== file 1 ====|====== file 2 ======|===== file 3 =====|

| case B . case C . case D |
range 2: |___________.____________________.______________|
*/

if (startIndex <= startIndexOfFile && endIndex >= endIndexOfFile) {
// Case C: read the whole file
stringBuffer.append(offsetBytes(file.getAbsolutePath, 0, fileToLength(file)))
} else if (startIndex > startIndexOfFile && startIndex < endIndexOfFile) {
// Case A and B: read from [start of required range] to [end of file / end of range]
val effectiveStartIndex = startIndex - startIndexOfFile
val effectiveEndIndex = math.min(endIndex - startIndexOfFile, fileToLength(file))
stringBuffer.append(Utils.offsetBytes(
file.getAbsolutePath, effectiveStartIndex, effectiveEndIndex))
} else if (endIndex > startIndexOfFile && endIndex < endIndexOfFile) {
// Case D: read from [start of file] to [end of required range]
val effectiveStartIndex = math.max(startIndex - startIndexOfFile, 0)
val effectiveEndIndex = endIndex - startIndexOfFile
stringBuffer.append(Utils.offsetBytes(
file.getAbsolutePath, effectiveStartIndex, effectiveEndIndex))
}
sum += fileToLength(file)
logDebug(s"After processing file $file, string built is ${stringBuffer.toString}}")
}
stringBuffer.toString
}

/**
* Clone an object using a Spark serializer.
*/
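Taken together, the pieces in this diff compose roughly as follows. This is a hedged sketch, not code from the PR: it assumes a caller inside an org.apache.spark subpackage (these utilities are private[spark]), a made-up log directory, and a throwaway child process standing in for the executor; it uses only the APIs visible above (the FileAppender factory, RollingFileAppender.getSortedRolledOverFiles, and the new Utils.offsetBytes overload).

package org.apache.spark.examples  // any package under org.apache.spark, so private[spark] members are visible

import java.io.File

import org.apache.spark.SparkConf
import org.apache.spark.util.Utils
import org.apache.spark.util.logging.{FileAppender, RollingFileAppender}

object RollingLogSketch {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()                  // rolling behaviour is driven by SparkConf (keys not shown in this excerpt)
    val logDir = new File("/tmp/executor-0")    // hypothetical work directory for one executor
    logDir.mkdirs()

    // Writing: attach an appender to a process's stdout, as ExecutorRunner now does.
    val process = new ProcessBuilder("echo", "hello from the executor").start()
    val stdoutFile = new File(logDir, "stdout")
    val stdoutAppender = FileAppender(process.getInputStream, stdoutFile, conf)

    process.waitFor()
    stdoutAppender.stop()

    // Reading: list the (possibly rolled-over) stdout files in order and pull a
    // byte range across them, as LogPage.getLog now does.
    val files = RollingFileAppender.getSortedRolledOverFiles(logDir.getPath, "stdout")
    val totalLength = files.map(_.length).sum
    val lastKb = Utils.offsetBytes(files, math.max(totalLength - 1024, 0L), totalLength)
    println(s"==== last ${lastKb.length} bytes of $totalLength ====")
    println(lastKb)
  }
}

Whether the output stays in a single stdout file or is rolled over into several pieces depends on the configuration; either way, the read path above hands the web UI one contiguous byte range assembled across the sorted files.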