diff --git a/core/src/main/resources/META-INF/services/org.apache.spark.deploy.history.EventFilterBuilder b/core/src/main/resources/META-INF/services/org.apache.spark.deploy.history.EventFilterBuilder new file mode 100644 index 0000000000000..784e58270ab42 --- /dev/null +++ b/core/src/main/resources/META-INF/services/org.apache.spark.deploy.history.EventFilterBuilder @@ -0,0 +1 @@ +org.apache.spark.deploy.history.BasicEventFilterBuilder \ No newline at end of file diff --git a/core/src/main/scala/org/apache/spark/deploy/history/BasicEventFilterBuilder.scala b/core/src/main/scala/org/apache/spark/deploy/history/BasicEventFilterBuilder.scala new file mode 100644 index 0000000000000..106da1675f71e --- /dev/null +++ b/core/src/main/scala/org/apache/spark/deploy/history/BasicEventFilterBuilder.scala @@ -0,0 +1,176 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.deploy.history + +import scala.collection.mutable + +import org.apache.spark.SparkContext +import org.apache.spark.deploy.history.EventFilter.FilterStatistics +import org.apache.spark.internal.Logging +import org.apache.spark.scheduler._ +import org.apache.spark.storage.BlockManagerId + +/** + * This class tracks both live jobs and live executors, and pass the list to the + * [[BasicEventFilter]] to help BasicEventFilter to reject finished jobs (+ stages/tasks/RDDs) + * and dead executors. 
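+ *
+ * A minimal usage sketch (illustrative only; in practice the builder is driven by
+ * [[org.apache.spark.scheduler.ReplayListenerBus]] rather than called directly, and
+ * `stageInfos` is a placeholder):
+ *
+ * {{{
+ *   val builder = new BasicEventFilterBuilder
+ *   builder.onJobStart(SparkListenerJobStart(1, 0L, stageInfos))
+ *   builder.onJobEnd(SparkListenerJobEnd(1, 1L, JobSucceeded))
+ *   // job 1 has finished, so it is no longer tracked as live
+ *   assert(!builder.liveJobs.contains(1))
+ *   val filter = builder.createFilter()
+ * }}}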
+ */ +private[spark] class BasicEventFilterBuilder extends SparkListener with EventFilterBuilder { + private val _liveJobToStages = new mutable.HashMap[Int, Set[Int]] + private val _stageToTasks = new mutable.HashMap[Int, mutable.Set[Long]] + private val _stageToRDDs = new mutable.HashMap[Int, Set[Int]] + private val _liveExecutors = new mutable.HashSet[String] + + private var totalJobs: Long = 0L + private var totalStages: Long = 0L + private var totalTasks: Long = 0L + + def liveJobs: Set[Int] = _liveJobToStages.keySet.toSet + def liveStages: Set[Int] = _stageToRDDs.keySet.toSet + def liveTasks: Set[Long] = _stageToTasks.values.flatten.toSet + def liveRDDs: Set[Int] = _stageToRDDs.values.flatten.toSet + def liveExecutors: Set[String] = _liveExecutors.toSet + + override def onJobStart(jobStart: SparkListenerJobStart): Unit = { + totalJobs += 1 + totalStages += jobStart.stageIds.length + _liveJobToStages += jobStart.jobId -> jobStart.stageIds.toSet + } + + override def onJobEnd(jobEnd: SparkListenerJobEnd): Unit = { + val stages = _liveJobToStages.getOrElse(jobEnd.jobId, Seq.empty[Int]) + _liveJobToStages -= jobEnd.jobId + _stageToTasks --= stages + _stageToRDDs --= stages + } + + override def onStageSubmitted(stageSubmitted: SparkListenerStageSubmitted): Unit = { + val stageId = stageSubmitted.stageInfo.stageId + _stageToRDDs.put(stageId, stageSubmitted.stageInfo.rddInfos.map(_.id).toSet) + _stageToTasks.getOrElseUpdate(stageId, new mutable.HashSet[Long]()) + } + + override def onTaskStart(taskStart: SparkListenerTaskStart): Unit = { + totalTasks += 1 + _stageToTasks.get(taskStart.stageId).foreach { tasks => + tasks += taskStart.taskInfo.taskId + } + } + + override def onExecutorAdded(executorAdded: SparkListenerExecutorAdded): Unit = { + _liveExecutors += executorAdded.executorId + } + + override def onExecutorRemoved(executorRemoved: SparkListenerExecutorRemoved): Unit = { + _liveExecutors -= executorRemoved.executorId + } + + override def createFilter(): EventFilter = { + val stats = FilterStatistics(totalJobs, liveJobs.size, totalStages, + liveStages.size, totalTasks, liveTasks.size) + + new BasicEventFilter(stats, liveJobs, liveStages, liveTasks, liveRDDs, liveExecutors) + } +} + +/** + * This class provides the functionality to reject events which are related to the finished + * jobs based on the given information. This class only deals with job related events, and provides + * a PartialFunction which returns false for rejected events for finished jobs, returns true + * otherwise. 
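+ *
+ * For a concrete subclass such as [[BasicEventFilter]], the lifted function behaves roughly as
+ * below (event arguments are illustrative):
+ *
+ * {{{
+ *   val acceptFn = filter.acceptFn().lift
+ *   acceptFn(SparkListenerJobEnd(finishedJobId, 0L, JobSucceeded))  // Some(false): rejected
+ *   acceptFn(SparkListenerJobEnd(liveJobId, 0L, JobSucceeded))      // Some(true): accepted
+ *   acceptFn(SparkListenerApplicationEnd(0L))                       // None: left to other filters
+ * }}}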
+ */ +private[spark] abstract class JobEventFilter( + stats: Option[FilterStatistics], + liveJobs: Set[Int], + liveStages: Set[Int], + liveTasks: Set[Long], + liveRDDs: Set[Int]) extends EventFilter with Logging { + + logDebug(s"jobs : $liveJobs") + logDebug(s"stages : $liveStages") + logDebug(s"tasks : $liveTasks") + logDebug(s"RDDs : $liveRDDs") + + override def statistics(): Option[FilterStatistics] = stats + + protected val acceptFnForJobEvents: PartialFunction[SparkListenerEvent, Boolean] = { + case e: SparkListenerStageCompleted => + liveStages.contains(e.stageInfo.stageId) + case e: SparkListenerStageSubmitted => + liveStages.contains(e.stageInfo.stageId) + case e: SparkListenerTaskStart => + liveTasks.contains(e.taskInfo.taskId) + case e: SparkListenerTaskGettingResult => + liveTasks.contains(e.taskInfo.taskId) + case e: SparkListenerTaskEnd => + liveTasks.contains(e.taskInfo.taskId) + case e: SparkListenerJobStart => + liveJobs.contains(e.jobId) + case e: SparkListenerJobEnd => + liveJobs.contains(e.jobId) + case e: SparkListenerUnpersistRDD => + liveRDDs.contains(e.rddId) + case e: SparkListenerExecutorMetricsUpdate => + e.accumUpdates.exists { case (taskId, stageId, _, _) => + liveTasks.contains(taskId) || liveStages.contains(stageId) + } + case e: SparkListenerSpeculativeTaskSubmitted => + liveStages.contains(e.stageId) + } +} + +/** + * This class rejects events which are related to the finished jobs or dead executors, + * based on the given information. The events which are not related to the job and executor + * will be considered as "Don't mind". + */ +private[spark] class BasicEventFilter( + _stats: FilterStatistics, + _liveJobs: Set[Int], + _liveStages: Set[Int], + _liveTasks: Set[Long], + _liveRDDs: Set[Int], + liveExecutors: Set[String]) + extends JobEventFilter( + Some(_stats), + _liveJobs, + _liveStages, + _liveTasks, + _liveRDDs) with Logging { + + logDebug(s"live executors : $liveExecutors") + + private val _acceptFn: PartialFunction[SparkListenerEvent, Boolean] = { + case e: SparkListenerExecutorAdded => liveExecutors.contains(e.executorId) + case e: SparkListenerExecutorRemoved => liveExecutors.contains(e.executorId) + case e: SparkListenerExecutorBlacklisted => liveExecutors.contains(e.executorId) + case e: SparkListenerExecutorUnblacklisted => liveExecutors.contains(e.executorId) + case e: SparkListenerStageExecutorMetrics => liveExecutors.contains(e.execId) + case e: SparkListenerBlockManagerAdded => acceptBlockManagerEvent(e.blockManagerId) + case e: SparkListenerBlockManagerRemoved => acceptBlockManagerEvent(e.blockManagerId) + case e: SparkListenerBlockUpdated => acceptBlockManagerEvent(e.blockUpdatedInfo.blockManagerId) + } + + private def acceptBlockManagerEvent(blockManagerId: BlockManagerId): Boolean = { + blockManagerId.isDriver || liveExecutors.contains(blockManagerId.executorId) + } + + override def acceptFn(): PartialFunction[SparkListenerEvent, Boolean] = { + _acceptFn.orElse(acceptFnForJobEvents) + } +} diff --git a/core/src/main/scala/org/apache/spark/deploy/history/EventFilter.scala b/core/src/main/scala/org/apache/spark/deploy/history/EventFilter.scala new file mode 100644 index 0000000000000..a5f2394960b70 --- /dev/null +++ b/core/src/main/scala/org/apache/spark/deploy/history/EventFilter.scala @@ -0,0 +1,109 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.deploy.history + +import scala.io.{Codec, Source} +import scala.util.control.NonFatal + +import org.apache.hadoop.fs.{FileSystem, Path} +import org.json4s.jackson.JsonMethods.parse + +import org.apache.spark.deploy.history.EventFilter.FilterStatistics +import org.apache.spark.internal.Logging +import org.apache.spark.scheduler._ +import org.apache.spark.util.{JsonProtocol, Utils} + +/** + * EventFilterBuilder provides the interface to gather the information from events being received + * by [[SparkListenerInterface]], and create a new [[EventFilter]] instance which leverages + * information gathered to decide whether the event should be accepted or not. + */ +private[spark] trait EventFilterBuilder extends SparkListenerInterface { + def createFilter(): EventFilter +} + +/** [[EventFilter]] decides whether the given event should be accepted or rejected. */ +private[spark] trait EventFilter { + /** + * Provide statistic information of event filter, which would be used for measuring the score + * of compaction. + * + * To simplify the condition, currently the fields of statistic are static, since major kinds of + * events compaction would filter out are job related event types. If the filter doesn't track + * with job related events, return None instead. + */ + def statistics(): Option[FilterStatistics] + + /** + * Classify whether the event is accepted or rejected by this filter. + * + * The method should return the partial function which matches the events where the filter can + * decide whether the event should be accepted or rejected. Otherwise it should leave the events + * be unmatched. 
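+ *
+ * A condensed view of how [[EventFilter.applyFilterToFile]] combines the partial functions of
+ * multiple filters: unmatched events contribute no vote, and an event is rejected only when
+ * at least one filter matched it and every matching filter voted false.
+ *
+ * {{{
+ *   val votes = filters.flatMap(_.acceptFn().lift.apply(event))
+ *   val rejected = votes.nonEmpty && votes.forall(_ == false)
+ * }}}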
+ */ + def acceptFn(): PartialFunction[SparkListenerEvent, Boolean] +} + +private[spark] object EventFilter extends Logging { + case class FilterStatistics( + totalJobs: Long, + liveJobs: Long, + totalStages: Long, + liveStages: Long, + totalTasks: Long, + liveTasks: Long) + + def applyFilterToFile( + fs: FileSystem, + filters: Seq[EventFilter], + path: Path, + onAccepted: (String, SparkListenerEvent) => Unit, + onRejected: (String, SparkListenerEvent) => Unit, + onUnidentified: String => Unit): Unit = { + Utils.tryWithResource(EventLogFileReader.openEventLog(path, fs)) { in => + val lines = Source.fromInputStream(in)(Codec.UTF8).getLines() + + lines.zipWithIndex.foreach { case (line, lineNum) => + try { + val event = try { + Some(JsonProtocol.sparkEventFromJson(parse(line))) + } catch { + // ignore any exception occurred from unidentified json + case NonFatal(_) => + onUnidentified(line) + None + } + + event.foreach { e => + val results = filters.flatMap(_.acceptFn().lift.apply(e)) + if (results.nonEmpty && results.forall(_ == false)) { + onRejected(line, e) + } else { + onAccepted(line, e) + } + } + } catch { + case e: Exception => + logError(s"Exception parsing Spark event log: ${path.getName}", e) + logError(s"Malformed line #$lineNum: $line\n") + throw e + } + } + } + } +} diff --git a/core/src/main/scala/org/apache/spark/deploy/history/EventLogFileCompactor.scala b/core/src/main/scala/org/apache/spark/deploy/history/EventLogFileCompactor.scala new file mode 100644 index 0000000000000..80a0a7067a4e4 --- /dev/null +++ b/core/src/main/scala/org/apache/spark/deploy/history/EventLogFileCompactor.scala @@ -0,0 +1,224 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.deploy.history + +import java.io.IOException +import java.net.URI +import java.util.ServiceLoader + +import scala.collection.JavaConverters._ + +import org.apache.hadoop.conf.Configuration +import org.apache.hadoop.fs.{FileStatus, FileSystem, Path} + +import org.apache.spark.SparkConf +import org.apache.spark.deploy.history.EventFilter.FilterStatistics +import org.apache.spark.internal.Logging +import org.apache.spark.internal.config.{EVENT_LOG_COMPACTION_SCORE_THRESHOLD, EVENT_LOG_ROLLING_MAX_FILES_TO_RETAIN} +import org.apache.spark.scheduler.ReplayListenerBus +import org.apache.spark.util.Utils + +/** + * This class compacts the old event log files into one compact file, via two phases reading: + * + * 1) Initialize available [[EventFilterBuilder]] instances, and replay the old event log files with + * builders, so that these builders can gather the information to create [[EventFilter]] instances. + * 2) Initialize [[EventFilter]] instances from [[EventFilterBuilder]] instances, and replay the + * old event log files with filters. 
Rewrite the events to the compact file which the filters decide + * to accept. + * + * This class will calculate the score based on statistic from [[EventFilter]] instances, which + * represents approximate rate of filtered-out events. Score is being calculated via applying + * heuristic; task events tend to take most size in event log. + */ +class EventLogFileCompactor( + sparkConf: SparkConf, + hadoopConf: Configuration, + fs: FileSystem) extends Logging { + private val maxFilesToRetain: Int = sparkConf.get(EVENT_LOG_ROLLING_MAX_FILES_TO_RETAIN) + private val compactionThresholdScore: Double = sparkConf.get(EVENT_LOG_COMPACTION_SCORE_THRESHOLD) + + /** + * Compacts the old event log files into one compact file, and clean old event log files being + * compacted away. + * + * This method assumes caller will provide the sorted list of files which are sorted by + * the index of event log file, with at most one compact file placed first if it exists. + * + * When compacting the files, the range of compaction for given file list is determined as: + * (first ~ the file where there're `maxFilesToRetain` files on the right side) + * + * This method skips compaction for some circumstances described below: + * - not enough files on the range of compaction + * - score is lower than the threshold of compaction (meaning compaction won't help much) + * + * If this method returns the compaction result as SUCCESS, caller needs to re-read the list + * of event log files, as new compact file is available as well as old event log files are + * removed. + */ + def compact(eventLogFiles: Seq[FileStatus]): CompactionResult = { + assertPrecondition(eventLogFiles) + + if (eventLogFiles.length < maxFilesToRetain) { + return CompactionResult(CompactionResultCode.NOT_ENOUGH_FILES, None) + } + + val filesToCompact = findFilesToCompact(eventLogFiles) + if (filesToCompact.isEmpty) { + CompactionResult(CompactionResultCode.NOT_ENOUGH_FILES, None) + } else { + val builders = initializeBuilders(fs, filesToCompact.map(_.getPath)) + + val filters = builders.map(_.createFilter()) + val minScore = filters.flatMap(_.statistics()).map(calculateScore).min + + if (minScore < compactionThresholdScore) { + CompactionResult(CompactionResultCode.LOW_SCORE_FOR_COMPACTION, None) + } else { + rewrite(filters, filesToCompact) + cleanupCompactedFiles(filesToCompact) + CompactionResult(CompactionResultCode.SUCCESS, Some( + RollingEventLogFilesWriter.getEventLogFileIndex(filesToCompact.last.getPath.getName))) + } + } + } + + private def assertPrecondition(eventLogFiles: Seq[FileStatus]): Unit = { + val idxCompactedFiles = eventLogFiles.zipWithIndex.filter { case (file, _) => + EventLogFileWriter.isCompacted(file.getPath) + } + require(idxCompactedFiles.size < 2 && idxCompactedFiles.headOption.forall(_._2 == 0), + "The number of compact files should be at most 1, and should be placed first if exists.") + } + + /** + * Loads all available EventFilterBuilders in classloader via ServiceLoader, and initializes + * them via replaying events in given files. 
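+ *
+ * Builders are discovered through the service file
+ * `META-INF/services/org.apache.spark.deploy.history.EventFilterBuilder`; core registers
+ * [[BasicEventFilterBuilder]] there. A hypothetical extra builder would be wired up the same
+ * way (sketch only, the names below are made up):
+ *
+ * {{{
+ *   class NoOpEventFilter extends EventFilter {
+ *     override def statistics(): Option[FilterStatistics] = None
+ *     override def acceptFn(): PartialFunction[SparkListenerEvent, Boolean] =
+ *       PartialFunction.empty
+ *   }
+ *
+ *   // listed by its fully-qualified class name in the service file above
+ *   class CustomEventFilterBuilder extends SparkListener with EventFilterBuilder {
+ *     override def createFilter(): EventFilter = new NoOpEventFilter
+ *   }
+ * }}}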
+ */ + private def initializeBuilders(fs: FileSystem, files: Seq[Path]): Seq[EventFilterBuilder] = { + val bus = new ReplayListenerBus() + + val builders = ServiceLoader.load(classOf[EventFilterBuilder], + Utils.getContextOrSparkClassLoader).asScala.toSeq + builders.foreach(bus.addListener) + + files.foreach { log => + Utils.tryWithResource(EventLogFileReader.openEventLog(log, fs)) { in => + bus.replay(in, log.getName) + } + } + + builders + } + + private def calculateScore(stats: FilterStatistics): Double = { + // For now it's simply measuring how many task events will be filtered out (rejected) + // but it can be sophisticated later once we get more heuristic information and found + // the case where this simple calculation doesn't work. + (stats.totalTasks - stats.liveTasks) * 1.0 / stats.totalTasks + } + + /** + * This method rewrites the event log files into one compact file: the compact file will only + * contain the events which pass the filters. Events will be dropped only when all filters + * decide to reject the event or don't mind about the event. Otherwise, the original line for + * the event is written to the compact file as it is. + */ + private[history] def rewrite( + filters: Seq[EventFilter], + eventLogFiles: Seq[FileStatus]): String = { + require(eventLogFiles.nonEmpty) + + val lastIndexEventLogPath = eventLogFiles.last.getPath + val logWriter = new CompactedEventLogFileWriter(lastIndexEventLogPath, "dummy", None, + lastIndexEventLogPath.getParent.toUri, sparkConf, hadoopConf) + + logWriter.start() + eventLogFiles.foreach { file => + EventFilter.applyFilterToFile(fs, filters, file.getPath, + onAccepted = (line, _) => logWriter.writeEvent(line, flushLogger = true), + onRejected = (_, _) => {}, + onUnidentified = line => logWriter.writeEvent(line, flushLogger = true) + ) + } + logWriter.stop() + + logWriter.logPath + } + + private def cleanupCompactedFiles(files: Seq[FileStatus]): Unit = { + files.foreach { file => + var deleted = false + try { + deleted = fs.delete(file.getPath, true) + } catch { + case _: IOException => + } + if (!deleted) { + logWarning(s"Failed to remove ${file.getPath} / skip removing.") + } + } + } + + private def findFilesToCompact(eventLogFiles: Seq[FileStatus]): Seq[FileStatus] = { + val numNormalEventLogFiles = { + if (EventLogFileWriter.isCompacted(eventLogFiles.head.getPath)) { + eventLogFiles.length - 1 + } else { + eventLogFiles.length + } + } + + // This avoids compacting only compact file. + if (numNormalEventLogFiles > maxFilesToRetain) { + eventLogFiles.dropRight(maxFilesToRetain) + } else { + Seq.empty + } + } +} + +/** + * Describes the result of compaction. + * + * @param code The result of compaction. + * @param compactIndex The index of compact file if the compaction is successful. + * Otherwise it will be None. + */ +case class CompactionResult(code: CompactionResultCode.Value, compactIndex: Option[Long]) + +object CompactionResultCode extends Enumeration { + val SUCCESS, NOT_ENOUGH_FILES, LOW_SCORE_FOR_COMPACTION = Value +} + +/** + * This class helps to write compact file; to avoid reimplementing everything, it extends + * [[SingleEventLogFileWriter]], but only `originalFilePath` is used to determine the + * path of compact file. 
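+ *
+ * For example (file name assumed from the rolling writer's naming scheme, not defined in this
+ * class), compacting up to an original file `events_3_<appId>` produces the sibling file
+ * `events_3_<appId>.compact`, since `logPath` simply appends [[EventLogFileWriter.COMPACTED]]
+ * to `originalFilePath`.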
+ */ +private class CompactedEventLogFileWriter( + originalFilePath: Path, + appId: String, + appAttemptId: Option[String], + logBaseDir: URI, + sparkConf: SparkConf, + hadoopConf: Configuration) + extends SingleEventLogFileWriter(appId, appAttemptId, logBaseDir, sparkConf, hadoopConf) { + + override val logPath: String = originalFilePath.toUri.toString + EventLogFileWriter.COMPACTED +} diff --git a/core/src/main/scala/org/apache/spark/deploy/history/EventLogFileReaders.scala b/core/src/main/scala/org/apache/spark/deploy/history/EventLogFileReaders.scala index c8956ed3d423d..9f63a6441a838 100644 --- a/core/src/main/scala/org/apache/spark/deploy/history/EventLogFileReaders.scala +++ b/core/src/main/scala/org/apache/spark/deploy/history/EventLogFileReaders.scala @@ -173,7 +173,8 @@ class SingleFileEventLogFileReader( override def fileSizeForLastIndex: Long = status.getLen - override def completed: Boolean = !rootPath.getName.endsWith(EventLogFileWriter.IN_PROGRESS) + override def completed: Boolean = !rootPath.getName.stripSuffix(EventLogFileWriter.COMPACTED) + .endsWith(EventLogFileWriter.IN_PROGRESS) override def fileSizeForLastIndexForDFS: Option[Long] = { if (completed) { @@ -218,15 +219,23 @@ class RollingEventLogFilesFileReader( private lazy val eventLogFiles: Seq[FileStatus] = { val eventLogFiles = files.filter(isEventLogFile).sortBy { status => - getIndex(status.getPath.getName) + val filePath = status.getPath + var idx = getEventLogFileIndex(filePath.getName).toDouble + // trick to place compacted file later than normal file if index is same. + if (EventLogFileWriter.isCompacted(filePath)) { + idx += 0.1 + } + idx } - val indices = eventLogFiles.map { file => getIndex(file.getPath.getName) }.sorted + val filesToRead = dropBeforeLastCompactFile(eventLogFiles) + val indices = filesToRead.map { file => getEventLogFileIndex(file.getPath.getName) } require((indices.head to indices.last) == indices, "Found missing event log file, expected" + - s" indices: ${(indices.head to indices.last)}, actual: ${indices}") - eventLogFiles + s" indices: ${indices.head to indices.last}, actual: ${indices}") + filesToRead } - override def lastIndex: Option[Long] = Some(getIndex(lastEventLogFile.getPath.getName)) + override def lastIndex: Option[Long] = Some( + getEventLogFileIndex(lastEventLogFile.getPath.getName)) override def fileSizeForLastIndex: Long = lastEventLogFile.getLen @@ -261,4 +270,11 @@ class RollingEventLogFilesFileReader( override def totalSize: Long = eventLogFiles.map(_.getLen).sum private def lastEventLogFile: FileStatus = eventLogFiles.last + + private def dropBeforeLastCompactFile(eventLogFiles: Seq[FileStatus]): Seq[FileStatus] = { + val lastCompactedFileIdx = eventLogFiles.lastIndexWhere { fs => + EventLogFileWriter.isCompacted(fs.getPath) + } + eventLogFiles.drop(lastCompactedFileIdx) + } } diff --git a/core/src/main/scala/org/apache/spark/deploy/history/EventLogFileWriters.scala b/core/src/main/scala/org/apache/spark/deploy/history/EventLogFileWriters.scala index 3fa5ef94892aa..1d58d054b7825 100644 --- a/core/src/main/scala/org/apache/spark/deploy/history/EventLogFileWriters.scala +++ b/core/src/main/scala/org/apache/spark/deploy/history/EventLogFileWriters.scala @@ -113,9 +113,9 @@ abstract class EventLogFileWriter( } } - protected def writeJson(json: String, flushLogger: Boolean = false): Unit = { + protected def writeLine(line: String, flushLogger: Boolean = false): Unit = { // scalastyle:off println - writer.foreach(_.println(json)) + writer.foreach(_.println(line)) // 
scalastyle:on println if (flushLogger) { writer.foreach(_.flush()) @@ -164,6 +164,7 @@ abstract class EventLogFileWriter( object EventLogFileWriter { // Suffix applied to the names of files still being written by applications. val IN_PROGRESS = ".inprogress" + val COMPACTED = ".compact" val LOG_FILE_PERMISSIONS = new FsPermission(Integer.parseInt("770", 8).toShort) @@ -192,9 +193,11 @@ object EventLogFileWriter { def codecName(log: Path): Option[String] = { // Compression codec is encoded as an extension, e.g. app_123.lzf // Since we sanitize the app ID to not include periods, it is safe to split on it - val logName = log.getName.stripSuffix(IN_PROGRESS) + val logName = log.getName.stripSuffix(COMPACTED).stripSuffix(IN_PROGRESS) logName.split("\\.").tail.lastOption } + + def isCompacted(log: Path): Boolean = log.getName.endsWith(COMPACTED) } /** @@ -211,7 +214,7 @@ class SingleEventLogFileWriter( override val logPath: String = SingleEventLogFileWriter.getLogPath(logBaseDir, appId, appAttemptId, compressionCodecName) - private val inProgressPath = logPath + EventLogFileWriter.IN_PROGRESS + protected def inProgressPath = logPath + EventLogFileWriter.IN_PROGRESS override def start(): Unit = { requireLogBaseDirAsDirectory() @@ -222,7 +225,7 @@ class SingleEventLogFileWriter( } override def writeEvent(eventJson: String, flushLogger: Boolean = false): Unit = { - writeJson(eventJson, flushLogger) + writeLine(eventJson, flushLogger) } /** @@ -327,10 +330,11 @@ class RollingEventLogFilesWriter( } } - writeJson(eventJson, flushLogger) + writeLine(eventJson, flushLogger) } - private def rollEventLogFile(): Unit = { + /** exposed for testing only */ + private[history] def rollEventLogFile(): Unit = { closeWriter() index += 1 @@ -399,16 +403,20 @@ object RollingEventLogFilesWriter { status.isDirectory && status.getPath.getName.startsWith(EVENT_LOG_DIR_NAME_PREFIX) } + def isEventLogFile(fileName: String): Boolean = { + fileName.startsWith(EVENT_LOG_FILE_NAME_PREFIX) + } + def isEventLogFile(status: FileStatus): Boolean = { - status.isFile && status.getPath.getName.startsWith(EVENT_LOG_FILE_NAME_PREFIX) + status.isFile && isEventLogFile(status.getPath.getName) } def isAppStatusFile(status: FileStatus): Boolean = { status.isFile && status.getPath.getName.startsWith(APPSTATUS_FILE_NAME_PREFIX) } - def getIndex(eventLogFileName: String): Long = { - require(eventLogFileName.startsWith(EVENT_LOG_FILE_NAME_PREFIX), "Not an event log file!") + def getEventLogFileIndex(eventLogFileName: String): Long = { + require(isEventLogFile(eventLogFileName), "Not an event log file!") val index = eventLogFileName.stripPrefix(EVENT_LOG_FILE_NAME_PREFIX).split("_")(0) index.toLong } diff --git a/core/src/main/scala/org/apache/spark/internal/config/package.scala b/core/src/main/scala/org/apache/spark/internal/config/package.scala index 40b05cf96d1e3..110198815c255 100644 --- a/core/src/main/scala/org/apache/spark/internal/config/package.scala +++ b/core/src/main/scala/org/apache/spark/internal/config/package.scala @@ -195,6 +195,24 @@ package object config { "configured to be at least 10 MiB.") .createWithDefaultString("128m") + private[spark] val EVENT_LOG_ROLLING_MAX_FILES_TO_RETAIN = + ConfigBuilder("spark.eventLog.rolling.maxFilesToRetain") + // TODO: remove this when integrating compactor with FsHistoryProvider + .internal() + .doc("The maximum number of event log files which will be retained as non-compacted. " + + "By default, all event log files will be retained. 
Please set the configuration " + + s"and ${EVENT_LOG_ROLLING_MAX_FILE_SIZE.key} accordingly if you want to control " + + "the overall size of event log files.") + .intConf + .checkValue(_ > 0, "Max event log files to retain should be higher than 0.") + .createWithDefault(Integer.MAX_VALUE) + + private[spark] val EVENT_LOG_COMPACTION_SCORE_THRESHOLD = + ConfigBuilder("spark.eventLog.rolling.compaction.score.threshold") + .internal() + .doubleConf + .createWithDefault(0.7d) + private[spark] val EXECUTOR_ID = ConfigBuilder("spark.executor.id").stringConf.createOptional diff --git a/core/src/test/scala/org/apache/spark/deploy/history/BasicEventFilterBuilderSuite.scala b/core/src/test/scala/org/apache/spark/deploy/history/BasicEventFilterBuilderSuite.scala new file mode 100644 index 0000000000000..86511ae08784a --- /dev/null +++ b/core/src/test/scala/org/apache/spark/deploy/history/BasicEventFilterBuilderSuite.scala @@ -0,0 +1,228 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.deploy.history + +import org.apache.spark.{SparkFunSuite, Success, TaskResultLost, TaskState} +import org.apache.spark.executor.{ExecutorMetrics, TaskMetrics} +import org.apache.spark.scheduler._ +import org.apache.spark.status.ListenerEventsTestHelper + +class BasicEventFilterBuilderSuite extends SparkFunSuite { + import ListenerEventsTestHelper._ + + override protected def beforeEach(): Unit = { + ListenerEventsTestHelper.reset() + } + + test("track live jobs") { + var time = 0L + + val listener = new BasicEventFilterBuilder + listener.onOtherEvent(SparkListenerLogStart("TestSparkVersion")) + + // Start the application. + time += 1 + listener.onApplicationStart(SparkListenerApplicationStart( + "name", + Some("id"), + time, + "user", + Some("attempt"), + None)) + + // Start a couple of executors. 
+ time += 1 + val execIds = Array("1", "2") + execIds.foreach { id => + listener.onExecutorAdded(createExecutorAddedEvent(id, time)) + } + + // Start a job with 2 stages / 4 tasks each + time += 1 + + val rddsForStage0 = createRdds(2) + val rddsForStage1 = createRdds(2) + + val stage0 = createStage(rddsForStage0, Nil) + val stage1 = createStage(rddsForStage1, Seq(stage0.stageId)) + val stages = Seq(stage0, stage1) + + val jobProps = createJobProps() + listener.onJobStart(SparkListenerJobStart(1, time, stages, jobProps)) + + // Submit stage 0 + time += 1 + stages.head.submissionTime = Some(time) + listener.onStageSubmitted(SparkListenerStageSubmitted(stages.head, jobProps)) + + // Start tasks from stage 0 + time += 1 + + val s0Tasks = ListenerEventsTestHelper.createTasks(4, execIds, time) + s0Tasks.foreach { task => + listener.onTaskStart(SparkListenerTaskStart(stages.head.stageId, + stages.head.attemptNumber(), task)) + } + + // Fail one of the tasks, re-start it. + time += 1 + s0Tasks.head.markFinished(TaskState.FAILED, time) + listener.onTaskEnd(SparkListenerTaskEnd(stages.head.stageId, stages.head.attemptNumber, + "taskType", TaskResultLost, s0Tasks.head, new ExecutorMetrics, null)) + + time += 1 + val reattempt = createTaskWithNewAttempt(s0Tasks.head, time) + listener.onTaskStart(SparkListenerTaskStart(stages.head.stageId, stages.head.attemptNumber, + reattempt)) + + // Succeed all tasks in stage 0. + val pending = s0Tasks.drop(1) ++ Seq(reattempt) + + time += 1 + pending.foreach { task => + task.markFinished(TaskState.FINISHED, time) + listener.onTaskEnd(SparkListenerTaskEnd(stages.head.stageId, stages.head.attemptNumber, + "taskType", Success, task, new ExecutorMetrics, TaskMetrics.empty)) + } + + // End stage 0. + time += 1 + stages.head.completionTime = Some(time) + listener.onStageCompleted(SparkListenerStageCompleted(stages.head)) + + assert(listener.liveJobs === Set(1)) + assert(listener.liveStages === Set(0)) + // stage 1 not yet submitted - RDDs for stage 1 is not available + assert(listener.liveRDDs === rddsForStage0.map(_.id).toSet) + assert(listener.liveTasks === (s0Tasks ++ Seq(reattempt)).map(_.taskId).toSet) + + // Submit stage 1. + time += 1 + stages.last.submissionTime = Some(time) + listener.onStageSubmitted(SparkListenerStageSubmitted(stages.last, jobProps)) + + // Start and fail all tasks of stage 1. + time += 1 + val s1Tasks = createTasks(4, execIds, time) + s1Tasks.foreach { task => + listener.onTaskStart(SparkListenerTaskStart(stages.last.stageId, + stages.last.attemptNumber, + task)) + } + + time += 1 + s1Tasks.foreach { task => + task.markFinished(TaskState.FAILED, time) + listener.onTaskEnd(SparkListenerTaskEnd(stages.last.stageId, stages.last.attemptNumber, + "taskType", TaskResultLost, task, new ExecutorMetrics, null)) + } + + // Fail stage 1. + time += 1 + stages.last.completionTime = Some(time) + stages.last.failureReason = Some("uh oh") + listener.onStageCompleted(SparkListenerStageCompleted(stages.last)) + + // - Re-submit stage 1, all tasks, and succeed them and the stage. 
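+    // Note: the re-submitted attempt keeps the same stageId (only attemptNumber changes), so
+    // the builder keeps tracking its RDDs/tasks under the existing stage entry until job end.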
+ val oldS1 = stages.last + val newS1 = new StageInfo(oldS1.stageId, oldS1.attemptNumber + 1, oldS1.name, oldS1.numTasks, + oldS1.rddInfos, oldS1.parentIds, oldS1.details, oldS1.taskMetrics) + + time += 1 + newS1.submissionTime = Some(time) + listener.onStageSubmitted(SparkListenerStageSubmitted(newS1, jobProps)) + + val newS1Tasks = createTasks(4, execIds, time) + + newS1Tasks.foreach { task => + listener.onTaskStart(SparkListenerTaskStart(newS1.stageId, newS1.attemptNumber, task)) + } + + time += 1 + newS1Tasks.foreach { task => + task.markFinished(TaskState.FINISHED, time) + listener.onTaskEnd(SparkListenerTaskEnd(newS1.stageId, newS1.attemptNumber, "taskType", + Success, task, new ExecutorMetrics, null)) + } + + time += 1 + newS1.completionTime = Some(time) + listener.onStageCompleted(SparkListenerStageCompleted(newS1)) + + assert(listener.liveJobs === Set(1)) + assert(listener.liveStages === Set(0, 1)) + // stage 0 and 1 are finished but it stores the information regarding stage + assert(listener.liveRDDs === (rddsForStage0.map(_.id) ++ rddsForStage1.map(_.id)).toSet) + assert(listener.liveTasks === + (s0Tasks ++ Seq(reattempt) ++ s1Tasks ++ newS1Tasks).map(_.taskId).toSet) + + // Start next job. + time += 1 + + val rddsForStage2 = createRdds(2) + val rddsForStage3 = createRdds(2) + + val stage3 = createStage(rddsForStage2, Nil) + val stage4 = createStage(rddsForStage3, Seq(stage3.stageId)) + val stagesForJob2 = Seq(stage3, stage4) + + listener.onJobStart(SparkListenerJobStart(2, time, stagesForJob2, jobProps)) + + // End job 1. + time += 1 + listener.onJobEnd(SparkListenerJobEnd(1, time, JobSucceeded)) + + // everything related to job 1 should be cleaned up, but not for job 2 + assert(listener.liveJobs === Set(2)) + assert(listener.liveStages.isEmpty) + // no RDD information available as these stages are not submitted yet + assert(listener.liveRDDs.isEmpty) + // stageToTasks has no information for job 2, as no task has been started + assert(listener.liveTasks.isEmpty) + } + + test("track live executors") { + var time = 0L + + val listener = new BasicEventFilterBuilder + listener.onOtherEvent(SparkListenerLogStart("TestSparkVersion")) + + // Start the application. + time += 1 + listener.onApplicationStart(SparkListenerApplicationStart( + "name", + Some("id"), + time, + "user", + Some("attempt"), + None)) + + // Start a couple of executors. + time += 1 + val execIds = (1 to 3).map(_.toString) + execIds.foreach { id => + listener.onExecutorAdded(createExecutorAddedEvent(id, time)) + } + + // End one of executors. + time += 1 + listener.onExecutorRemoved(createExecutorRemovedEvent(execIds.head, time)) + + assert(listener.liveExecutors === execIds.drop(1).toSet) + } +} diff --git a/core/src/test/scala/org/apache/spark/deploy/history/BasicEventFilterSuite.scala b/core/src/test/scala/org/apache/spark/deploy/history/BasicEventFilterSuite.scala new file mode 100644 index 0000000000000..2da40dccba53e --- /dev/null +++ b/core/src/test/scala/org/apache/spark/deploy/history/BasicEventFilterSuite.scala @@ -0,0 +1,208 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.deploy.history + +import org.apache.spark.{storage, SparkContext, SparkFunSuite, Success, TaskState} +import org.apache.spark.deploy.history.EventFilter.FilterStatistics +import org.apache.spark.executor.ExecutorMetrics +import org.apache.spark.scheduler._ +import org.apache.spark.status.ListenerEventsTestHelper._ +import org.apache.spark.storage.{BlockManagerId, RDDBlockId, StorageLevel} + +class BasicEventFilterSuite extends SparkFunSuite { + import BasicEventFilterSuite._ + + test("filter out events for finished jobs") { + // assume finished job 1 with stage 1, tasks (1, 2), rdds (1, 2) + // live job 2 with stage 2 with tasks (3, 4) & rdds (3, 4), + // and stage 3 with tasks (5, 6) & rdds (5, 6) + val liveJobs = Set(2) + val liveStages = Set(2, 3) + val liveTasks = Set(3L, 4L, 5L, 6L) + val liveRDDs = Set(3, 4, 5, 6) + val liveExecutors: Set[String] = Set("1", "2") + val filterStats = FilterStatistics( + // counts finished job 1 + liveJobs.size + 1, + liveJobs.size, + // counts finished stage 1 for job 1 + liveStages.size + 1, + liveStages.size, + // counts finished tasks (1, 2) for job 1 + liveTasks.size + 2, + liveTasks.size) + + val filter = new BasicEventFilter(filterStats, liveJobs, liveStages, liveTasks, liveRDDs, + liveExecutors) + val acceptFn = filter.acceptFn().lift + + // Verifying with finished job 1 + val rddsForStage1 = createRddsWithId(1 to 2) + val stage1 = createStage(1, rddsForStage1, Nil) + val tasksForStage1 = createTasks(Seq(1L, 2L), liveExecutors.toArray, 0) + tasksForStage1.foreach { task => task.markFinished(TaskState.FINISHED, 5) } + + val jobStartEventForJob1 = SparkListenerJobStart(1, 0, Seq(stage1)) + val jobEndEventForJob1 = SparkListenerJobEnd(1, 0, JobSucceeded) + val stageSubmittedEventsForJob1 = SparkListenerStageSubmitted(stage1) + val stageCompletedEventsForJob1 = SparkListenerStageCompleted(stage1) + val unpersistRDDEventsForJob1 = (1 to 2).map(SparkListenerUnpersistRDD) + + // job events for finished job should be rejected + assert(Some(false) === acceptFn(jobStartEventForJob1)) + assert(Some(false) === acceptFn(jobEndEventForJob1)) + + // stage events for finished job should be rejected + // NOTE: it doesn't filter out stage events which are also related to the executor + assertFilterStageEvents( + acceptFn, + stageSubmittedEventsForJob1, + stageCompletedEventsForJob1, + unpersistRDDEventsForJob1, + SparkListenerSpeculativeTaskSubmitted(stage1.stageId, stageAttemptId = 1), + Some(false)) + + // task events for finished job should be rejected + assertFilterTaskEvents(acceptFn, tasksForStage1, stage1, Some(false)) + + // Verifying with live job 2 + val rddsForStage2 = createRddsWithId(3 to 4) + val stage2 = createStage(2, rddsForStage2, Nil) + val tasksForStage2 = createTasks(Seq(3L, 4L), liveExecutors.toArray, 0) + tasksForStage1.foreach { task => task.markFinished(TaskState.FINISHED, 5) } + + val jobStartEventForJob2 = SparkListenerJobStart(2, 0, Seq(stage2)) + val stageSubmittedEventsForJob2 = SparkListenerStageSubmitted(stage2) + val stageCompletedEventsForJob2 = SparkListenerStageCompleted(stage2) + 
val unpersistRDDEventsForJob2 = rddsForStage2.map { rdd => SparkListenerUnpersistRDD(rdd.id) } + + // job events for live job should be accepted + assert(Some(true) === acceptFn(jobStartEventForJob2)) + + // stage events for live job should be accepted + assertFilterStageEvents( + acceptFn, + stageSubmittedEventsForJob2, + stageCompletedEventsForJob2, + unpersistRDDEventsForJob2, + SparkListenerSpeculativeTaskSubmitted(stage2.stageId, stageAttemptId = 1), + Some(true)) + + // task events for live job should be accepted + assertFilterTaskEvents(acceptFn, tasksForStage2, stage2, Some(true)) + } + + test("accept all events for block manager addition/removal on driver") { + val filter = new BasicEventFilter(EMPTY_STATS, Set.empty, Set.empty, Set.empty, Set.empty, + Set.empty) + val acceptFn = filter.acceptFn().lift + + val bmId = BlockManagerId(SparkContext.DRIVER_IDENTIFIER, "host1", 1) + assert(Some(true) === acceptFn(SparkListenerBlockManagerAdded(0, bmId, 1))) + assert(Some(true) === acceptFn(SparkListenerBlockManagerRemoved(1, bmId))) + assert(Some(true) === acceptFn(SparkListenerBlockUpdated( + storage.BlockUpdatedInfo(bmId, RDDBlockId(1, 1), StorageLevel.DISK_ONLY, 0, 10)))) + } + + test("filter out events for dead executors") { + // assume executor 1 was dead, and live executor 2 is available + val liveExecutors: Set[String] = Set("2") + + val filter = new BasicEventFilter(EMPTY_STATS, Set.empty, Set.empty, Set.empty, Set.empty, + liveExecutors) + val acceptFn = filter.acceptFn().lift + + // events for dead executor should be rejected + assert(Some(false) === acceptFn(createExecutorAddedEvent(1))) + // though the name of event is stage executor metrics, AppStatusListener only deals with + // live executors + assert(Some(false) === acceptFn( + SparkListenerStageExecutorMetrics(1.toString, 0, 0, new ExecutorMetrics))) + assert(Some(false) === acceptFn(SparkListenerExecutorBlacklisted(0, 1.toString, 1))) + assert(Some(false) === acceptFn(SparkListenerExecutorUnblacklisted(0, 1.toString))) + assert(Some(false) === acceptFn(createExecutorRemovedEvent(1))) + val bmId = BlockManagerId(1.toString, "host1", 1) + assert(Some(false) === acceptFn(SparkListenerBlockManagerAdded(0, bmId, 1))) + assert(Some(false) === acceptFn(SparkListenerBlockManagerRemoved(1, bmId))) + assert(Some(false) === acceptFn(SparkListenerBlockUpdated( + storage.BlockUpdatedInfo(bmId, RDDBlockId(1, 1), StorageLevel.DISK_ONLY, 0, 10)))) + + // events for live executor should be accepted + assert(Some(true) === acceptFn(createExecutorAddedEvent(2))) + assert(Some(true) === acceptFn( + SparkListenerStageExecutorMetrics(2.toString, 0, 0, new ExecutorMetrics))) + assert(Some(true) === acceptFn(SparkListenerExecutorBlacklisted(0, 2.toString, 1))) + assert(Some(true) === acceptFn(SparkListenerExecutorUnblacklisted(0, 2.toString))) + assert(Some(true) === acceptFn(createExecutorRemovedEvent(2))) + val bmId2 = BlockManagerId(2.toString, "host1", 1) + assert(Some(true) === acceptFn(SparkListenerBlockManagerAdded(0, bmId2, 1))) + assert(Some(true) === acceptFn(SparkListenerBlockManagerRemoved(1, bmId2))) + assert(Some(true) === acceptFn(SparkListenerBlockUpdated( + storage.BlockUpdatedInfo(bmId2, RDDBlockId(1, 1), StorageLevel.DISK_ONLY, 0, 10)))) + } + + test("other events should be left to other filters") { + val filter = new BasicEventFilter(EMPTY_STATS, Set.empty, Set.empty, Set.empty, Set.empty, + Set.empty) + val acceptFn = filter.acceptFn().lift + + assert(None === acceptFn(SparkListenerEnvironmentUpdate(Map.empty))) + 
assert(None === acceptFn(SparkListenerApplicationStart("1", Some("1"), 0, "user", None))) + assert(None === acceptFn(SparkListenerApplicationEnd(1))) + assert(None === acceptFn(SparkListenerNodeBlacklisted(0, "host1", 1))) + assert(None === acceptFn(SparkListenerNodeUnblacklisted(0, "host1"))) + assert(None === acceptFn(SparkListenerLogStart("testVersion"))) + } + + private def assertFilterStageEvents( + acceptFn: SparkListenerEvent => Option[Boolean], + stageSubmitted: SparkListenerStageSubmitted, + stageCompleted: SparkListenerStageCompleted, + unpersistRDDs: Seq[SparkListenerUnpersistRDD], + taskSpeculativeSubmitted: SparkListenerSpeculativeTaskSubmitted, + expectedVal: Option[Boolean]): Unit = { + assert(acceptFn(stageSubmitted) === expectedVal) + assert(acceptFn(stageCompleted) === expectedVal) + unpersistRDDs.foreach { event => + assert(acceptFn(event) === expectedVal) + } + assert(acceptFn(taskSpeculativeSubmitted) === expectedVal) + } + + private def assertFilterTaskEvents( + acceptFn: SparkListenerEvent => Option[Boolean], + taskInfos: Seq[TaskInfo], + stageInfo: StageInfo, + expectedVal: Option[Boolean]): Unit = { + taskInfos.foreach { task => + val taskStartEvent = SparkListenerTaskStart(stageInfo.stageId, 0, task) + assert(acceptFn(taskStartEvent) === expectedVal) + + val taskGettingResultEvent = SparkListenerTaskGettingResult(task) + assert(acceptFn(taskGettingResultEvent) === expectedVal) + + val taskEndEvent = SparkListenerTaskEnd(stageInfo.stageId, 0, "taskType", + Success, task, new ExecutorMetrics, null) + assert(acceptFn(taskEndEvent) === expectedVal) + } + } +} + +object BasicEventFilterSuite { + val EMPTY_STATS = FilterStatistics(0, 0, 0, 0, 0, 0) +} diff --git a/core/src/test/scala/org/apache/spark/deploy/history/EventLogFileCompactorSuite.scala b/core/src/test/scala/org/apache/spark/deploy/history/EventLogFileCompactorSuite.scala new file mode 100644 index 0000000000000..866e610aab980 --- /dev/null +++ b/core/src/test/scala/org/apache/spark/deploy/history/EventLogFileCompactorSuite.scala @@ -0,0 +1,326 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.deploy.history + +import scala.collection.mutable +import scala.io.{Codec, Source} + +import org.apache.hadoop.fs.{FileStatus, FileSystem, Path} +import org.json4s.jackson.JsonMethods.parse + +import org.apache.spark.{SparkConf, SparkFunSuite, Success} +import org.apache.spark.deploy.SparkHadoopUtil +import org.apache.spark.deploy.history.EventLogTestHelper.writeEventsToRollingWriter +import org.apache.spark.executor.ExecutorMetrics +import org.apache.spark.internal.config.{EVENT_LOG_COMPACTION_SCORE_THRESHOLD, EVENT_LOG_ROLLING_MAX_FILES_TO_RETAIN} +import org.apache.spark.scheduler._ +import org.apache.spark.scheduler.cluster.ExecutorInfo +import org.apache.spark.status.ListenerEventsTestHelper._ +import org.apache.spark.storage.BlockManagerId +import org.apache.spark.util.{JsonProtocol, Utils} + +class EventLogFileCompactorSuite extends SparkFunSuite { + private val sparkConf = testSparkConf() + private val hadoopConf = SparkHadoopUtil.newConfiguration(sparkConf) + + test("No event log files") { + withTempDir { dir => + val fs = new Path(dir.getAbsolutePath).getFileSystem(hadoopConf) + val compactor = new EventLogFileCompactor(sparkConf, hadoopConf, fs) + + assertNoCompaction(fs, Seq.empty, compactor.compact(Seq.empty), + CompactionResultCode.NOT_ENOUGH_FILES) + } + } + + test("No compact file, less origin files available than max files to retain") { + withTempDir { dir => + val fs = new Path(dir.getAbsolutePath).getFileSystem(hadoopConf) + + val fileStatuses = writeEventsToRollingWriter(fs, "app", dir, sparkConf, hadoopConf, + (1 to 2).map(_ => testEvent): _*) + val compactor = new EventLogFileCompactor(sparkConf, hadoopConf, fs) + assertNoCompaction(fs, fileStatuses, compactor.compact(fileStatuses), + CompactionResultCode.NOT_ENOUGH_FILES) + } + } + + test("No compact file, more origin files available than max files to retain") { + withTempDir { dir => + val fs = new Path(dir.getAbsolutePath).getFileSystem(hadoopConf) + + val fileStatuses = writeEventsToRollingWriter(fs, "app", dir, sparkConf, hadoopConf, + (1 to 5).map(_ => testEvent): _*) + val compactor = new EventLogFileCompactor(sparkConf, hadoopConf, fs) + assertCompaction(fs, fileStatuses, compactor.compact(fileStatuses), + expectedNumOfFilesCompacted = 2) + } + } + + test("compact file exists, less origin files available than max files to retain") { + withTempDir { dir => + val fs = new Path(dir.getAbsolutePath).getFileSystem(hadoopConf) + + val fileStatuses = writeEventsToRollingWriter(fs, "app", dir, sparkConf, hadoopConf, + (1 to 2).map(_ => testEvent): _*) + + val fileToCompact = fileStatuses.head.getPath + val compactedPath = new Path(fileToCompact.getParent, + fileToCompact.getName + EventLogFileWriter.COMPACTED) + assert(fs.rename(fileToCompact, compactedPath)) + + val newFileStatuses = Seq(fs.getFileStatus(compactedPath)) ++ fileStatuses.drop(1) + val compactor = new EventLogFileCompactor(sparkConf, hadoopConf, fs) + assertNoCompaction(fs, newFileStatuses, compactor.compact(newFileStatuses), + CompactionResultCode.NOT_ENOUGH_FILES) + } + } + + test("compact file exists, number of origin files are same as max files to retain") { + withTempDir { dir => + val fs = new Path(dir.getAbsolutePath).getFileSystem(hadoopConf) + + val fileStatuses = writeEventsToRollingWriter(fs, "app", dir, sparkConf, hadoopConf, + (1 to 4).map(_ => testEvent): _*) + + val fileToCompact = fileStatuses.head.getPath + val compactedPath = new Path(fileToCompact.getParent, + fileToCompact.getName + 
EventLogFileWriter.COMPACTED) + assert(fs.rename(fileToCompact, compactedPath)) + + val newFileStatuses = Seq(fs.getFileStatus(compactedPath)) ++ fileStatuses.drop(1) + val compactor = new EventLogFileCompactor(sparkConf, hadoopConf, fs) + assertNoCompaction(fs, newFileStatuses, compactor.compact(newFileStatuses), + CompactionResultCode.NOT_ENOUGH_FILES) + } + } + + test("compact file exists, more origin files available than max files to retain") { + withTempDir { dir => + val fs = new Path(dir.getAbsolutePath).getFileSystem(hadoopConf) + + val fileStatuses = writeEventsToRollingWriter(fs, "app", dir, sparkConf, hadoopConf, + (1 to 10).map(_ => testEvent): _*) + + val fileToCompact = fileStatuses.head.getPath + val compactedPath = new Path(fileToCompact.getParent, + fileToCompact.getName + EventLogFileWriter.COMPACTED) + assert(fs.rename(fileToCompact, compactedPath)) + + val newFileStatuses = Seq(fs.getFileStatus(compactedPath)) ++ fileStatuses.drop(1) + val compactor = new EventLogFileCompactor(sparkConf, hadoopConf, fs) + assertCompaction(fs, newFileStatuses, compactor.compact(newFileStatuses), + expectedNumOfFilesCompacted = 7) + } + } + + test("events for finished job are dropped in new compact file") { + withTempDir { dir => + val fs = new Path(dir.getAbsolutePath).getFileSystem(hadoopConf) + + // 1, 2 will be compacted into one file, 3~5 are dummies to ensure max files to retain + val fileStatuses = writeEventsToRollingWriter(fs, "app", dir, sparkConf, hadoopConf, + Seq( + SparkListenerExecutorAdded(0, "exec1", new ExecutorInfo("host1", 1, Map.empty)), + SparkListenerJobStart(1, 0, Seq.empty)), + Seq( + SparkListenerJobEnd(1, 1, JobSucceeded), + SparkListenerExecutorAdded(2, "exec2", new ExecutorInfo("host2", 1, Map.empty))), + testEvent, + testEvent, + testEvent) + + val compactor = new EventLogFileCompactor(sparkConf, hadoopConf, fs) + assertCompaction(fs, fileStatuses, compactor.compact(fileStatuses), + expectedNumOfFilesCompacted = 2) + + val expectCompactFileBasePath = fileStatuses.take(2).last.getPath + val compactFilePath = getCompactFilePath(expectCompactFileBasePath) + Utils.tryWithResource(EventLogFileReader.openEventLog(compactFilePath, fs)) { is => + val lines = Source.fromInputStream(is)(Codec.UTF8).getLines().toList + assert(lines.length === 2, "Compacted file should have only two events being accepted") + lines.foreach { line => + val event = JsonProtocol.sparkEventFromJson(parse(line)) + assert(!event.isInstanceOf[SparkListenerJobStart] && + !event.isInstanceOf[SparkListenerJobEnd]) + } + } + } + } + + test("Don't compact file if score is lower than threshold") { + withTempDir { dir => + val fs = new Path(dir.getAbsolutePath).getFileSystem(hadoopConf) + val newConf = sparkConf.set(EVENT_LOG_COMPACTION_SCORE_THRESHOLD, 0.7d) + + // job 1 having 4 tasks + val rddsForStage1 = createRddsWithId(1 to 2) + val stage1 = createStage(1, rddsForStage1, Nil) + val tasks = createTasks(4, Array("exec1"), 0L).map(createTaskStartEvent(_, 1, 0)) + + // job 2 having 4 tasks + val rddsForStage2 = createRddsWithId(3 to 4) + val stage2 = createStage(2, rddsForStage2, Nil) + val tasks2 = createTasks(4, Array("exec1"), 0L).map(createTaskStartEvent(_, 2, 0)) + + // here job 1 is finished and job 2 is still live, hence half of total tasks are considered + // as live + val fileStatuses = writeEventsToRollingWriter(fs, "app", dir, newConf, hadoopConf, + Seq(SparkListenerJobStart(1, 0, Seq(stage1)), SparkListenerStageSubmitted(stage1)), + tasks, + Seq(SparkListenerJobStart(2, 0, Seq(stage2)), 
SparkListenerStageSubmitted(stage2)), + tasks2, + Seq(SparkListenerJobEnd(1, 0, JobSucceeded)), + testEvent, + testEvent, + testEvent) + + val compactor = new EventLogFileCompactor(newConf, hadoopConf, fs) + assertNoCompaction(fs, fileStatuses, compactor.compact(fileStatuses), + CompactionResultCode.LOW_SCORE_FOR_COMPACTION) + } + } + + test("rewrite files with test filters") { + class TestEventFilter1 extends EventFilter { + override def acceptFn(): PartialFunction[SparkListenerEvent, Boolean] = { + case _: SparkListenerApplicationEnd => true + case _: SparkListenerBlockManagerAdded => true + case _: SparkListenerApplicationStart => false + } + + override def statistics(): Option[EventFilter.FilterStatistics] = None + } + + class TestEventFilter2 extends EventFilter { + override def acceptFn(): PartialFunction[SparkListenerEvent, Boolean] = { + case _: SparkListenerApplicationEnd => true + case _: SparkListenerEnvironmentUpdate => true + case _: SparkListenerNodeBlacklisted => true + case _: SparkListenerBlockManagerAdded => false + case _: SparkListenerApplicationStart => false + case _: SparkListenerNodeUnblacklisted => false + } + + override def statistics(): Option[EventFilter.FilterStatistics] = None + } + + def writeEventToWriter(writer: EventLogFileWriter, event: SparkListenerEvent): String = { + val line = EventLogTestHelper.convertEvent(event) + writer.writeEvent(line, flushLogger = true) + line + } + + withTempDir { tempDir => + val sparkConf = new SparkConf + val hadoopConf = SparkHadoopUtil.newConfiguration(sparkConf) + val fs = new Path(tempDir.getAbsolutePath).getFileSystem(hadoopConf) + + val writer = new SingleEventLogFileWriter("app", None, tempDir.toURI, sparkConf, hadoopConf) + writer.start() + + val expectedLines = new mutable.ArrayBuffer[String] + + // filterApplicationEnd: Some(true) & Some(true) => filter in + expectedLines += writeEventToWriter(writer, SparkListenerApplicationEnd(0)) + + // filterBlockManagerAdded: Some(true) & Some(false) => filter in + expectedLines += writeEventToWriter(writer, SparkListenerBlockManagerAdded( + 0, BlockManagerId("1", "host1", 1), 10)) + + // filterApplicationStart: Some(false) & Some(false) => filter out + writeEventToWriter(writer, SparkListenerApplicationStart("app", None, 0, "user", None)) + + // filterNodeBlacklisted: None & Some(true) => filter in + expectedLines += writeEventToWriter(writer, SparkListenerNodeBlacklisted(0, "host1", 1)) + + // filterNodeUnblacklisted: None & Some(false) => filter out + writeEventToWriter(writer, SparkListenerNodeUnblacklisted(0, "host1")) + + // other events: None & None => filter in + expectedLines += writeEventToWriter(writer, SparkListenerUnpersistRDD(0)) + + writer.stop() + + val filters = Seq(new TestEventFilter1, new TestEventFilter2) + + val logPath = new Path(writer.logPath) + val compactor = new EventLogFileCompactor(sparkConf, hadoopConf, fs) + val newPath = compactor.rewrite(filters, Seq(fs.getFileStatus(logPath))) + assert(new Path(newPath).getName === logPath.getName + EventLogFileWriter.COMPACTED) + + Utils.tryWithResource(EventLogFileReader.openEventLog(new Path(newPath), fs)) { is => + val lines = Source.fromInputStream(is)(Codec.UTF8).getLines() + var linesLength = 0 + lines.foreach { line => + linesLength += 1 + assert(expectedLines.contains(line)) + } + assert(linesLength === expectedLines.length) + } + } + } + + private def assertCompaction( + fs: FileSystem, + originalFiles: Seq[FileStatus], + compactRet: CompactionResult, + expectedNumOfFilesCompacted: Int): Unit = { + 
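+    // Expectation: the compacted originals are deleted, the remaining originals are kept, and
+    // a ".compact" file named after the last compacted original exists with a matching index.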
assert(CompactionResultCode.SUCCESS === compactRet.code) + + val expectRetainedFiles = originalFiles.drop(expectedNumOfFilesCompacted) + expectRetainedFiles.foreach { status => assert(fs.exists(status.getPath)) } + + val expectRemovedFiles = originalFiles.take(expectedNumOfFilesCompacted) + expectRemovedFiles.foreach { status => assert(!fs.exists(status.getPath)) } + + val expectCompactFileBasePath = originalFiles.take(expectedNumOfFilesCompacted).last.getPath + val expectCompactFileIndex = RollingEventLogFilesWriter.getEventLogFileIndex( + expectCompactFileBasePath.getName) + assert(Some(expectCompactFileIndex) === compactRet.compactIndex) + + val expectCompactFilePath = getCompactFilePath(expectCompactFileBasePath) + assert(fs.exists(expectCompactFilePath)) + } + + private def getCompactFilePath(expectCompactFileBasePath: Path): Path = { + new Path(expectCompactFileBasePath.getParent, + expectCompactFileBasePath.getName + EventLogFileWriter.COMPACTED) + } + + private def assertNoCompaction( + fs: FileSystem, + originalFiles: Seq[FileStatus], + compactRet: CompactionResult, + expectedCompactRet: CompactionResultCode.Value): Unit = { + assert(expectedCompactRet === compactRet.code) + assert(None === compactRet.compactIndex) + originalFiles.foreach { status => assert(fs.exists(status.getPath)) } + } + + private def testEvent: Seq[SparkListenerEvent] = + Seq(SparkListenerApplicationStart("app", Some("app"), 0, "user", None)) + + private def testSparkConf(): SparkConf = { + new SparkConf() + .set(EVENT_LOG_ROLLING_MAX_FILES_TO_RETAIN, 3) + // to simplify the tests, we set the score threshold to 0.0d + // individual tests can override the value to verify the functionality + .set(EVENT_LOG_COMPACTION_SCORE_THRESHOLD, 0.0d) + } +} diff --git a/core/src/test/scala/org/apache/spark/deploy/history/EventLogFileReadersSuite.scala b/core/src/test/scala/org/apache/spark/deploy/history/EventLogFileReadersSuite.scala index a2ce4acdaaf37..8eab2da1a37b7 100644 --- a/core/src/test/scala/org/apache/spark/deploy/history/EventLogFileReadersSuite.scala +++ b/core/src/test/scala/org/apache/spark/deploy/history/EventLogFileReadersSuite.scala @@ -288,13 +288,15 @@ class RollingEventLogFilesReaderSuite extends EventLogFileReadersSuite { assert(status.isDirectory) val statusInDir = fileSystem.listStatus(logPath) - val eventFiles = statusInDir.filter(isEventLogFile).sortBy { s => getIndex(s.getPath.getName) } + val eventFiles = statusInDir.filter(isEventLogFile).sortBy { s => + getEventLogFileIndex(s.getPath.getName) + } assert(eventFiles.nonEmpty) val lastEventFile = eventFiles.last val allLen = eventFiles.map(_.getLen).sum assert(reader.rootPath === fileSystem.makeQualified(logPath)) - assert(reader.lastIndex === Some(getIndex(lastEventFile.getPath.getName))) + assert(reader.lastIndex === Some(getEventLogFileIndex(lastEventFile.getPath.getName))) assert(reader.fileSizeForLastIndex === lastEventFile.getLen) assert(reader.completed === isCompleted) assert(reader.modificationTime === lastEventFile.getModificationTime) diff --git a/core/src/test/scala/org/apache/spark/deploy/history/EventLogFileWritersSuite.scala b/core/src/test/scala/org/apache/spark/deploy/history/EventLogFileWritersSuite.scala index c4b40884eebf5..060b878fb8ef2 100644 --- a/core/src/test/scala/org/apache/spark/deploy/history/EventLogFileWritersSuite.scala +++ b/core/src/test/scala/org/apache/spark/deploy/history/EventLogFileWritersSuite.scala @@ -291,7 +291,7 @@ class RollingEventLogFilesWriterSuite extends EventLogFileWritersSuite { 
expectedMaxSizeBytes: Long): Unit = { assert(eventLogFiles.forall(f => f.getLen <= expectedMaxSizeBytes)) assert((1 to expectedLastIndex) === - eventLogFiles.map(f => getIndex(f.getPath.getName))) + eventLogFiles.map(f => getEventLogFileIndex(f.getPath.getName))) } val appId = getUniqueApplicationId @@ -373,6 +373,6 @@ class RollingEventLogFilesWriterSuite extends EventLogFileWritersSuite { private def listEventLogFiles(logDirPath: Path): Seq[FileStatus] = { fileSystem.listStatus(logDirPath).filter(isEventLogFile) - .sortBy { fs => getIndex(fs.getPath.getName) } + .sortBy { fs => getEventLogFileIndex(fs.getPath.getName) } } } diff --git a/core/src/test/scala/org/apache/spark/deploy/history/EventLogTestHelper.scala b/core/src/test/scala/org/apache/spark/deploy/history/EventLogTestHelper.scala index 55eddce3968c2..298fd65f293cb 100644 --- a/core/src/test/scala/org/apache/spark/deploy/history/EventLogTestHelper.scala +++ b/core/src/test/scala/org/apache/spark/deploy/history/EventLogTestHelper.scala @@ -17,12 +17,17 @@ package org.apache.spark.deploy.history +import java.io.File import java.nio.charset.StandardCharsets -import org.apache.hadoop.fs.Path +import org.apache.hadoop.conf.Configuration +import org.apache.hadoop.fs.{FileStatus, FileSystem, Path} +import org.json4s.jackson.JsonMethods.{compact, render} import org.apache.spark.SparkConf import org.apache.spark.internal.config._ +import org.apache.spark.scheduler._ +import org.apache.spark.util.JsonProtocol object EventLogTestHelper { def getUniqueApplicationId: String = "test-" + System.currentTimeMillis @@ -56,4 +61,52 @@ object EventLogTestHelper { eventStr } } + + def writeEventLogFile( + sparkConf: SparkConf, + hadoopConf: Configuration, + dir: File, + idx: Int, + events: Seq[SparkListenerEvent]): String = { + // to simplify the code, we don't require the file name to match the naming rule + // of event log files + val writer = new SingleEventLogFileWriter(s"app$idx", None, dir.toURI, sparkConf, hadoopConf) + writer.start() + events.foreach { event => writer.writeEvent(convertEvent(event), flushLogger = true) } + writer.stop() + writer.logPath + } + + def writeEventsToRollingWriter( + fs: FileSystem, + appId: String, + dir: File, + sparkConf: SparkConf, + hadoopConf: Configuration, + eventsFiles: Seq[SparkListenerEvent]*): Seq[FileStatus] = { + val writer = new RollingEventLogFilesWriter(appId, None, dir.toURI, sparkConf, hadoopConf) + writer.start() + + eventsFiles.dropRight(1).foreach { events => + writeEventsToRollingWriter(writer, events, rollFile = true) + } + eventsFiles.lastOption.foreach { events => + writeEventsToRollingWriter(writer, events, rollFile = false) + } + + writer.stop() + EventLogFileReader(fs, new Path(writer.logPath)).get.listEventLogFiles + } + + def writeEventsToRollingWriter( + writer: RollingEventLogFilesWriter, + events: Seq[SparkListenerEvent], + rollFile: Boolean): Unit = { + events.foreach { event => writer.writeEvent(convertEvent(event), flushLogger = true) } + if (rollFile) writer.rollEventLogFile() + } + + def convertEvent(event: SparkListenerEvent): String = { + compact(render(JsonProtocol.sparkEventToJson(event))) + } } diff --git a/core/src/test/scala/org/apache/spark/status/AppStatusListenerSuite.scala b/core/src/test/scala/org/apache/spark/status/AppStatusListenerSuite.scala index a289dddbdc9e6..e7eed7bf4c879 100644 --- a/core/src/test/scala/org/apache/spark/status/AppStatusListenerSuite.scala +++ b/core/src/test/scala/org/apache/spark/status/AppStatusListenerSuite.scala @@ -32,12 
+32,12 @@ import org.apache.spark.internal.config.Status._ import org.apache.spark.metrics.ExecutorMetricType import org.apache.spark.scheduler._ import org.apache.spark.scheduler.cluster._ +import org.apache.spark.status.ListenerEventsTestHelper._ import org.apache.spark.status.api.v1 import org.apache.spark.storage._ import org.apache.spark.util.Utils class AppStatusListenerSuite extends SparkFunSuite with BeforeAndAfter { - private val conf = new SparkConf() .set(LIVE_ENTITY_UPDATE_PERIOD, 0L) .set(ASYNC_TRACKING_ENABLED, false) @@ -1694,40 +1694,4 @@ class AppStatusListenerSuite extends SparkFunSuite with BeforeAndAfter { def blockId: BlockId = RDDBlockId(rddId, partId) } - - /** Create a stage submitted event for the specified stage Id. */ - private def createStageSubmittedEvent(stageId: Int) = { - SparkListenerStageSubmitted(new StageInfo(stageId, 0, stageId.toString, 0, - Seq.empty, Seq.empty, "details")) - } - - /** Create a stage completed event for the specified stage Id. */ - private def createStageCompletedEvent(stageId: Int) = { - SparkListenerStageCompleted(new StageInfo(stageId, 0, stageId.toString, 0, - Seq.empty, Seq.empty, "details")) - } - - /** Create an executor added event for the specified executor Id. */ - private def createExecutorAddedEvent(executorId: Int) = { - SparkListenerExecutorAdded(0L, executorId.toString, - new ExecutorInfo("host1", 1, Map.empty, Map.empty)) - } - - /** Create an executor added event for the specified executor Id. */ - private def createExecutorRemovedEvent(executorId: Int) = { - SparkListenerExecutorRemoved(10L, executorId.toString, "test") - } - - /** Create an executor metrics update event, with the specified executor metrics values. */ - private def createExecutorMetricsUpdateEvent( - stageId: Int, - executorId: Int, - executorMetrics: Array[Long]): SparkListenerExecutorMetricsUpdate = { - val taskMetrics = TaskMetrics.empty - taskMetrics.incDiskBytesSpilled(111) - taskMetrics.incMemoryBytesSpilled(222) - val accum = Array((333L, 1, 1, taskMetrics.accumulators().map(AccumulatorSuite.makeInfo))) - val executorUpdates = Map((stageId, 0) -> new ExecutorMetrics(executorMetrics)) - SparkListenerExecutorMetricsUpdate(executorId.toString, accum, executorUpdates) - } } diff --git a/core/src/test/scala/org/apache/spark/status/ListenerEventsTestHelper.scala b/core/src/test/scala/org/apache/spark/status/ListenerEventsTestHelper.scala new file mode 100644 index 0000000000000..585c8cc2ae6d4 --- /dev/null +++ b/core/src/test/scala/org/apache/spark/status/ListenerEventsTestHelper.scala @@ -0,0 +1,154 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.status + +import java.util.Properties + +import scala.collection.immutable.Map + +import org.apache.spark.{AccumulatorSuite, SparkContext, Success, TaskState} +import org.apache.spark.executor.{ExecutorMetrics, TaskMetrics} +import org.apache.spark.scheduler.{SparkListener, SparkListenerExecutorAdded, SparkListenerExecutorMetricsUpdate, SparkListenerExecutorRemoved, SparkListenerJobStart, SparkListenerStageCompleted, SparkListenerStageSubmitted, SparkListenerTaskEnd, SparkListenerTaskStart, StageInfo, TaskInfo, TaskLocality} +import org.apache.spark.scheduler.cluster.ExecutorInfo +import org.apache.spark.storage.{RDDInfo, StorageLevel} + +object ListenerEventsTestHelper { + + private var taskIdTracker = -1L + private var rddIdTracker = -1 + private var stageIdTracker = -1 + + def reset(): Unit = { + taskIdTracker = -1L + rddIdTracker = -1 + stageIdTracker = -1 + } + + def createJobProps(): Properties = { + val jobProps = new Properties() + jobProps.setProperty(SparkContext.SPARK_JOB_DESCRIPTION, "jobDescription") + jobProps.setProperty(SparkContext.SPARK_JOB_GROUP_ID, "jobGroup") + jobProps.setProperty(SparkContext.SPARK_SCHEDULER_POOL, "schedPool") + jobProps + } + + def createRddsWithId(ids: Seq[Int]): Seq[RDDInfo] = { + ids.map { rddId => + new RDDInfo(rddId, s"rdd${rddId}", 2, StorageLevel.NONE, false, Nil) + } + } + + def createRdds(count: Int): Seq[RDDInfo] = { + (1 to count).map { _ => + val rddId = nextRddId() + new RDDInfo(rddId, s"rdd${rddId}", 2, StorageLevel.NONE, false, Nil) + } + } + + def createStage(id: Int, rdds: Seq[RDDInfo], parentIds: Seq[Int]): StageInfo = { + new StageInfo(id, 0, s"stage${id}", 4, rdds, parentIds, s"details${id}") + } + + def createStage(rdds: Seq[RDDInfo], parentIds: Seq[Int]): StageInfo = { + createStage(nextStageId(), rdds, parentIds) + } + + def createTasks(ids: Seq[Long], execs: Array[String], time: Long): Seq[TaskInfo] = { + ids.zipWithIndex.map { case (id, idx) => + val exec = execs(idx % execs.length) + new TaskInfo(id, idx, 1, time, exec, s"$exec.example.com", + TaskLocality.PROCESS_LOCAL, idx % 2 == 0) + } + } + + def createTasks(count: Int, execs: Array[String], time: Long): Seq[TaskInfo] = { + createTasks((1 to count).map { _ => nextTaskId() }, execs, time) + } + + def createTaskWithNewAttempt(orig: TaskInfo, time: Long): TaskInfo = { + // Task reattempts have a different ID, but the same index as the original. + new TaskInfo(nextTaskId(), orig.index, orig.attemptNumber + 1, time, orig.executorId, + s"${orig.executorId}.example.com", TaskLocality.PROCESS_LOCAL, orig.speculative) + } + + def createTaskStartEvent( + taskInfo: TaskInfo, + stageId: Int, + attemptId: Int): SparkListenerTaskStart = { + SparkListenerTaskStart(stageId, attemptId, taskInfo) + } + + /** Create a stage submitted event for the specified stage Id. */ + def createStageSubmittedEvent(stageId: Int): SparkListenerStageSubmitted = { + SparkListenerStageSubmitted(new StageInfo(stageId, 0, stageId.toString, 0, + Seq.empty, Seq.empty, "details")) + } + + /** Create a stage completed event for the specified stage Id. */ + def createStageCompletedEvent(stageId: Int): SparkListenerStageCompleted = { + SparkListenerStageCompleted(new StageInfo(stageId, 0, stageId.toString, 0, + Seq.empty, Seq.empty, "details")) + } + + def createExecutorAddedEvent(executorId: Int): SparkListenerExecutorAdded = { + createExecutorAddedEvent(executorId.toString, 0) + } + + /** Create an executor added event for the specified executor Id. 
*/ + def createExecutorAddedEvent(executorId: String, time: Long): SparkListenerExecutorAdded = { + SparkListenerExecutorAdded(time, executorId, + new ExecutorInfo("host1", 1, Map.empty, Map.empty)) + } + + def createExecutorRemovedEvent(executorId: Int): SparkListenerExecutorRemoved = { + createExecutorRemovedEvent(executorId.toString, 10L) + } + + /** Create an executor removed event for the specified executor Id. */ + def createExecutorRemovedEvent(executorId: String, time: Long): SparkListenerExecutorRemoved = { + SparkListenerExecutorRemoved(time, executorId, "test") + } + + /** Create an executor metrics update event, with the specified executor metrics values. */ + def createExecutorMetricsUpdateEvent( + stageId: Int, + executorId: Int, + executorMetrics: Array[Long]): SparkListenerExecutorMetricsUpdate = { + val taskMetrics = TaskMetrics.empty + taskMetrics.incDiskBytesSpilled(111) + taskMetrics.incMemoryBytesSpilled(222) + val accum = Array((333L, 1, 1, taskMetrics.accumulators().map(AccumulatorSuite.makeInfo))) + val executorUpdates = Map((stageId, 0) -> new ExecutorMetrics(executorMetrics)) + SparkListenerExecutorMetricsUpdate(executorId.toString, accum, executorUpdates) + } + + private def nextTaskId(): Long = { + taskIdTracker += 1 + taskIdTracker + } + + private def nextRddId(): Int = { + rddIdTracker += 1 + rddIdTracker + } + + private def nextStageId(): Int = { + stageIdTracker += 1 + stageIdTracker + } +}
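
Note on the "rewrite files with test filters" test: its inline comments pin down how verdicts from multiple EventFilters are expected to combine. An event is kept when any filter votes to accept it, or when no filter has an opinion; it is dropped only when the only votes cast are rejections. The following is a minimal, standalone sketch of that voting rule, not part of this patch; the FilterVotingSketch, Verdict, and accept names are illustrative, and the real combination logic lives in the compactor/rewriter code.

object FilterVotingSketch {
  // Some(true) = accept, Some(false) = reject, None = the filter has no opinion on this event.
  type Verdict = Option[Boolean]

  // Keep the event if any filter accepts it, or if no filter cares about it;
  // drop it only when every vote that was cast is a rejection.
  def accept(verdicts: Seq[Verdict]): Boolean = {
    val votes = verdicts.flatten
    votes.contains(true) || votes.isEmpty
  }

  def main(args: Array[String]): Unit = {
    assert(accept(Seq(Some(true), Some(true))))     // ApplicationEnd: filter in
    assert(accept(Seq(Some(true), Some(false))))    // BlockManagerAdded: filter in
    assert(!accept(Seq(Some(false), Some(false))))  // ApplicationStart: filter out
    assert(accept(Seq(None, Some(true))))           // NodeBlacklisted: filter in
    assert(!accept(Seq(None, Some(false))))         // NodeUnblacklisted: filter out
    assert(accept(Seq(None, None)))                 // other events: filter in
  }
}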
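Note on assertCompaction and getCompactFilePath: they encode the expected on-disk result of compacting the first N rolled event log files. Those N files disappear, the remaining files stay, and a single compact file named after the last compacted file plus EventLogFileWriter.COMPACTED appears. The sketch below restates that layout rule under the assumption that COMPACTED is the ".compact" suffix used elsewhere in this patch; the object name and the sample file names are placeholders, not the real rolled-file naming scheme.

object CompactionLayoutSketch {
  // Given the rolled file names in index order and how many of them were compacted, return
  // (files expected to be removed, files expected to remain, expected compact file name).
  def expectedLayout(
      rolledFileNames: Seq[String],
      numFilesCompacted: Int): (Seq[String], Seq[String], String) = {
    val removed = rolledFileNames.take(numFilesCompacted)  // folded into the compact file, then deleted
    val retained = rolledFileNames.drop(numFilesCompacted) // untouched by compaction
    val compactFile = removed.last + ".compact"            // assumption: EventLogFileWriter.COMPACTED == ".compact"
    (removed, retained, compactFile)
  }

  def main(args: Array[String]): Unit = {
    val (removed, retained, compactFile) = expectedLayout(Seq("file1", "file2", "file3"), 2)
    assert(removed == Seq("file1", "file2"))
    assert(retained == Seq("file3"))
    assert(compactFile == "file2.compact")
  }
}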
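Note on ListenerEventsTestHelper: it hands out RDD, stage, and task ids from object-level counters, so suites that depend on deterministic ids should call reset() before generating events. The following is a hypothetical usage sketch; the suite name is made up and not part of this patch, and the concrete ids asserted below simply follow from the counters starting at -1.

package org.apache.spark.status

import org.apache.spark.SparkFunSuite

class ListenerEventsTestHelperUsageSuite extends SparkFunSuite {
  import ListenerEventsTestHelper._

  test("helpers hand out deterministic ids after reset()") {
    reset() // counters are shared across tests, so start from a known state

    val rdds = createRdds(2)                          // RDD ids 0 and 1
    val stage = createStage(rdds, Nil)                // stage id 0, backed by both RDDs
    val tasks = createTasks(4, Array("1", "2"), 0L)   // task ids 0..3 across two executors

    assert(rdds.map(_.id) === Seq(0, 1))
    assert(stage.stageId === 0)
    assert(tasks.map(_.taskId) === Seq(0L, 1L, 2L, 3L))
  }
}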