@@ -172,9 +172,8 @@ private[spark] class SerializerManager(
outputStream: OutputStream,
values: Iterator[T]): Unit = {
val byteStream = new BufferedOutputStream(outputStream)
val autoPick = !blockId.isInstanceOf[StreamBlockId]
val ser = getSerializer(implicitly[ClassTag[T]], autoPick).newInstance()
ser.serializeStream(wrapForCompression(blockId, byteStream)).writeAll(values).close()
blockSerializationStream[T](blockId, byteStream)(implicitly[ClassTag[T]])
.writeAll(values).close()
}

/** Serializes into a chunked byte buffer. */
@@ -212,4 +211,14 @@ private[spark] class SerializerManager(
.deserializeStream(wrapForCompression(blockId, stream))
.asIterator.asInstanceOf[Iterator[T]]
}

/** Generate a `SerializationStream` for a block. */
private[spark] def blockSerializationStream[T](
blockId: BlockId,
outputStream: OutputStream)
(classTag: ClassTag[T]): SerializationStream = {
val autoPick = !blockId.isInstanceOf[StreamBlockId]
val ser = getSerializer(classTag, autoPick).newInstance()
ser.serializeStream(wrapForCompression(blockId, outputStream))
}
}
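
For reference, a minimal usage sketch of the new helper, mirroring the updated dataSerializeStream above. The wrapper object and method names are hypothetical; since blockSerializationStream is private[spark], callers must live under the org.apache.spark package.

package org.apache.spark.storage

import java.io.{BufferedOutputStream, OutputStream}

import scala.reflect.ClassTag

import org.apache.spark.serializer.SerializerManager

// Hypothetical helper illustrating the call pattern; not part of this change.
object BlockStreamExample {
  def writeBlock[T: ClassTag](
      serializerManager: SerializerManager,
      blockId: BlockId,
      out: OutputStream,
      values: Iterator[T]): Unit = {
    val buffered = new BufferedOutputStream(out)
    // The helper picks the serializer from the class tag (auto-pick is disabled for
    // StreamBlockIds) and wraps the stream with the block's compression codec.
    serializerManager.blockSerializationStream[T](blockId, buffered)(implicitly[ClassTag[T]])
      .writeAll(values)
      .close()
  }
}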
40 changes: 40 additions & 0 deletions core/src/main/scala/org/apache/spark/storage/BlockId.scala
@@ -23,6 +23,7 @@ import org.apache.spark.SparkException
import org.apache.spark.annotation.{DeveloperApi, Since}
import org.apache.spark.errors.SparkCoreErrors
import org.apache.spark.network.shuffle.RemoteBlockPushResolver
import org.apache.spark.storage.LogBlockType.LogBlockType

/**
* :: DeveloperApi ::
@@ -175,6 +176,42 @@ case class PythonStreamBlockId(streamId: Int, uniqueId: Long) extends BlockId {
override def name: String = "python-stream-" + streamId + "-" + uniqueId
}

object LogBlockType extends Enumeration {
type LogBlockType = Value
val TEST = Value
}

/**
* Identifies a block of log data.
*
* @param lastLogTime the timestamp of the last log entry in this block, used for filtering
* and log management.
* @param executorId the ID of the executor that produced this log block.
*/
abstract sealed class LogBlockId(
val lastLogTime: Long,
val executorId: String) extends BlockId {
def logBlockType: LogBlockType
}

object LogBlockId {
def empty(logBlockType: LogBlockType): LogBlockId = {
logBlockType match {
case LogBlockType.TEST => TestLogBlockId(0L, "")
case _ => throw new SparkException(s"Unsupported log block type: $logBlockType")
}
}
}

// Used for test purposes only.
case class TestLogBlockId(override val lastLogTime: Long, override val executorId: String)
extends LogBlockId(lastLogTime, executorId) {
override def name: String =
"test_log_" + lastLogTime + "_" + executorId

override def logBlockType: LogBlockType = LogBlockType.TEST
}
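
A quick sketch of the name round trip for the new id (values are illustrative); the TEST_LOG_BLOCK pattern below is matched before the more general TEST pattern in BlockId.apply, so the string parses back into a TestLogBlockId rather than a TestBlockId.

val id = TestLogBlockId(lastLogTime = 1700000000000L, executorId = "exec-1")
assert(id.name == "test_log_1700000000000_exec-1")
assert(BlockId(id.name) == id)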

/** Id associated with temporary local data managed as blocks. Not serializable. */
private[spark] case class TempLocalBlockId(id: UUID) extends BlockId {
override def name: String = "temp_local_" + id
@@ -222,6 +259,7 @@ object BlockId {
val TEMP_LOCAL = "temp_local_([-A-Fa-f0-9]+)".r
val TEMP_SHUFFLE = "temp_shuffle_([-A-Fa-f0-9]+)".r
val TEST = "test_(.*)".r
val TEST_LOG_BLOCK = "test_log_([0-9]+)_(.*)".r

def apply(name: String): BlockId = name match {
case RDD(rddId, splitIndex) =>
@@ -262,6 +300,8 @@ object BlockId {
TempLocalBlockId(UUID.fromString(uuid))
case TEMP_SHUFFLE(uuid) =>
TempShuffleBlockId(UUID.fromString(uuid))
case TEST_LOG_BLOCK(lastLogTime, executorId) =>
TestLogBlockId(lastLogTime.toLong, executorId)
case TEST(value) =>
TestBlockId(value)
case _ => throw SparkCoreErrors.unrecognizedBlockIdError(name)
32 changes: 29 additions & 3 deletions core/src/main/scala/org/apache/spark/storage/BlockManager.scala
@@ -58,6 +58,7 @@ import org.apache.spark.scheduler.ExecutorCacheTaskLocation
import org.apache.spark.serializer.{SerializerInstance, SerializerManager}
import org.apache.spark.shuffle.{IndexShuffleBlockResolver, MigratableResolver, ShuffleManager, ShuffleWriteMetricsReporter}
import org.apache.spark.storage.BlockManagerMessages.{DecommissionBlockManager, ReplicateBlock}
import org.apache.spark.storage.LogBlockType.LogBlockType
import org.apache.spark.storage.memory._
import org.apache.spark.unsafe.Platform
import org.apache.spark.util._
@@ -294,12 +295,17 @@ private[spark] class BlockManager(
decommissioner.isDefined
}

@inline final private def checkShouldStore(blockId: BlockId) = {
@inline final private def checkShouldStore(blockId: BlockId, level: StorageLevel) = {
// Don't reject broadcast blocks since they may be stored during task exec and
// don't need to be migrated.
if (isDecommissioning() && !blockId.isBroadcast) {
throw SparkCoreErrors.cannotSaveBlockOnDecommissionedExecutorError(blockId)
}
if (blockId.isInstanceOf[LogBlockId] && level != StorageLevel.DISK_ONLY) {
throw SparkException.internalError(
s"Cannot store log block $blockId with storage level $level. " +
"Log blocks must be stored with DISK_ONLY.")
}
}
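
A brief sketch of the new constraint, assuming a blockManager reference and reusing the TestLogBlockId added in BlockId.scala; the payload value is illustrative only.

// Rejected: log blocks may only be stored with DISK_ONLY, so this put throws an
// internal SparkException from checkShouldStore.
blockManager.putSingle(
  TestLogBlockId(System.currentTimeMillis(), "exec-1"),
  "log payload",
  StorageLevel.MEMORY_ONLY)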

// This is a lazy val so someone can migrate RDDs even if they don't have a MigratableResolver
@@ -763,7 +769,7 @@
level: StorageLevel,
classTag: ClassTag[_]): StreamCallbackWithID = {

checkShouldStore(blockId)
checkShouldStore(blockId, level)

if (blockId.isShuffle) {
logDebug(s"Putting shuffle block ${blockId}")
@@ -1483,6 +1489,26 @@
syncWrites, writeMetrics, blockId)
}

/**
* Get a log block writer that writes logs directly to a disk block. Either `save` or
* `close` must be called to finish writing and release the opened resources:
* `save` writes the block to the block manager, while `close` just closes the writer.
*/
def getLogBlockWriter(
logBlockType: LogBlockType): LogBlockWriter = {
new LogBlockWriter(this, logBlockType, conf)
}

/**
* Get a rolling log writer that writes logs to the block manager and splits them
* into multiple blocks when the log size exceeds the rolling threshold.
*/
def getRollingLogWriter(
blockIdGenerator: LogBlockIdGenerator,
rollingSize: Long = 33554432L): RollingLogWriter = {
new RollingLogWriter(this, blockIdGenerator, rollingSize)
}
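
A short sketch of obtaining the two writers, assuming a blockManager reference and a hypothetical TestLogBlockIdGenerator (sketched under LogBlockIdGenerator further below); the RollingLogWriter class itself is not part of this excerpt.

// Writer for a single log block of the TEST type.
val blockWriter = blockManager.getLogBlockWriter(LogBlockType.TEST)

// Rolling writer that starts a new block once roughly 16 MB have been written
// (the default rollingSize is 33554432 bytes, i.e. 32 MB).
val rollingWriter = blockManager.getRollingLogWriter(
  new TestLogBlockIdGenerator, rollingSize = 16L * 1024 * 1024)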

/**
* Put a new block of serialized bytes to the block manager.
*
@@ -1540,7 +1566,7 @@

require(blockId != null, "BlockId is null")
require(level != null && level.isValid, "StorageLevel is null or invalid")
checkShouldStore(blockId)
checkShouldStore(blockId, level)

val putBlockInfo = {
val newInfo = new BlockInfo(level, classTag, tellMaster)
@@ -0,0 +1,49 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.storage

import org.apache.spark.SparkException
import org.apache.spark.storage.LogBlockType.LogBlockType

/**
* LogBlockIdGenerator is responsible for generating unique LogBlockIds for log blocks.
*/
trait LogBlockIdGenerator {
// The log block type that this generator supports.
def logBlockType: LogBlockType

// Generates a unique LogBlockId based on the last log time and executor ID.
protected def genUniqueBlockId(lastLogTime: Long, executorId: String): LogBlockId

/**
* Generates a new LogBlockId based on the last log time and executor ID, and verifies that
* the generated LogBlockId has the same log block type as this generator.
*
* @param lastLogTime The timestamp of the last log entry.
* @param executorId The ID of the executor generating the log block.
*/
final def nextBlockId(lastLogTime: Long, executorId: String): LogBlockId = {
val blockId = genUniqueBlockId(lastLogTime, executorId)
if (blockId.logBlockType != this.logBlockType) {
throw SparkException.internalError(
"BlockId generated by LogBlockIdGenerator does not match " +
s"the expected log block type: ${blockId.logBlockType} != ${this.logBlockType}")
}
blockId
}
}
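
A minimal sketch of a concrete generator for the TEST log block type; the class name is hypothetical and not part of this change.

package org.apache.spark.storage

import org.apache.spark.storage.LogBlockType.LogBlockType

// Returns TestLogBlockIds, so nextBlockId's type check always passes.
class TestLogBlockIdGenerator extends LogBlockIdGenerator {
  override def logBlockType: LogBlockType = LogBlockType.TEST

  override protected def genUniqueBlockId(
      lastLogTime: Long,
      executorId: String): LogBlockId = {
    TestLogBlockId(lastLogTime, executorId)
  }
}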
182 changes: 182 additions & 0 deletions core/src/main/scala/org/apache/spark/storage/LogBlockWriter.scala
@@ -0,0 +1,182 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.storage

import java.io.BufferedOutputStream
import java.io.File
import java.io.FileOutputStream

import org.apache.commons.io.output.CountingOutputStream

import org.apache.spark.{SparkConf, SparkException}
import org.apache.spark.internal.Logging
import org.apache.spark.serializer.SerializationStream
import org.apache.spark.storage.LogBlockType.LogBlockType
import org.apache.spark.util.Utils

/**
* A class for writing logs directly to a file on disk and saving them as a block in the
* BlockManager if any logs were written.
* `save` or `close` must be called to ensure resources are released properly. `save` will add
* the log block to BlockManager, while `close` will just release the resources without saving
* the log block.
*
* Notes:
* - This class does not support concurrent writes.
* - The writer will be closed automatically when it fails to write logs or to save the
* log block.
* - Write operations after closing will throw exceptions.
*/
private[spark] class LogBlockWriter(
blockManager: BlockManager,
logBlockType: LogBlockType,
sparkConf: SparkConf,
bufferSize: Int = 32 * 1024) extends Logging {

private[storage] var tmpFile: File = null

private var cos: CountingOutputStream = null
private var objOut: SerializationStream = null
private var hasBeenClosed = false
private var recordsWritten = false
private var totalBytesWritten = 0

initialize()

private def initialize(): Unit = {
try {
val dir = new File(Utils.getLocalDir(sparkConf))
tmpFile = File.createTempFile(s"spark_log_$logBlockType", "", dir)
val fos = new FileOutputStream(tmpFile, false)
val bos = new BufferedOutputStream(fos, bufferSize)
cos = new CountingOutputStream(bos)
val emptyBlockId = LogBlockId.empty(logBlockType)
objOut = blockManager
.serializerManager
.blockSerializationStream(emptyBlockId, cos)(LogLine.getClassTag(logBlockType))
} catch {
case e: Exception =>
logError(log"Failed to initialize LogBlockWriter.", e)
close()
throw e
}
}

def bytesWritten(): Int = {
Option(cos)
.map(_.getCount)
.getOrElse(totalBytesWritten)
}

/**
* Write a log entry to the log block. An exception is thrown if the writer has been closed
* or if an error occurs during writing. The caller is responsible for handling the exception
* and should close the writer when one is thrown, as the block data could be corrupted and
* cause problems when the log block is read later.
*
* @param logEntry The log entry to write.
*/
def writeLog(logEntry: LogLine): Unit = {
if (hasBeenClosed) {
throw SparkException.internalError(
"Writer already closed. Cannot write more data.",
category = "STORAGE"
)
}

try {
objOut.writeObject(logEntry)
recordsWritten = true
} catch {
case e: Exception =>
logError(log"Failed to write log entry.", e)
throw e
}
}

def save(blockId: LogBlockId): Unit = {
if (hasBeenClosed) {
throw SparkException.internalError(
"Writer already closed. Cannot save.",
category = "STORAGE"
)
}

try {
if (blockId.logBlockType != logBlockType) {
throw SparkException.internalError(
s"LogBlockWriter is for $logBlockType, but got blockId $blockId")
}

objOut.flush()
objOut.close()
objOut = null

if (recordsWritten) {
totalBytesWritten = cos.getCount
// Save log block to BlockManager and delete the tmpFile.
val success = saveToBlockManager(blockId, totalBytesWritten)
if (!success) {
throw SparkException.internalError(s"Failed to save log block $blockId to BlockManager")
}
}
} finally {
close()
}
}

def close(): Unit = {
if (hasBeenClosed) {
return
}

try {
if (objOut != null) {
objOut.close()
}
if (tmpFile != null && tmpFile.exists()) {
tmpFile.delete()
}
} catch {
case e: Exception =>
logWarning(log"Failed to close resources of LogBlockWriter", e)
} finally {
objOut = null
cos = null
hasBeenClosed = true
}
}

// For test only.
private[storage] def flush(): Unit = {
if (objOut != null) {
objOut.flush()
}
}

private[storage] def saveToBlockManager(blockId: LogBlockId, blockSize: Long): Boolean = {
blockManager.TempFileBasedBlockStoreUpdater(
blockId,
StorageLevel.DISK_ONLY,
LogLine.getClassTag(logBlockType),
tmpFile,
blockSize)
.save()
}
}
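
For reference, a minimal sketch of the intended lifecycle, assuming a blockManager reference and a LogLine value named entry (LogLine is referenced by this file but not shown in this excerpt); the block id values are illustrative.

val writer = blockManager.getLogBlockWriter(LogBlockType.TEST)
try {
  writer.writeLog(entry)
  // save() flushes, publishes the block with DISK_ONLY, and closes the writer.
  writer.save(TestLogBlockId(System.currentTimeMillis(), "exec-1"))
} finally {
  // close() without save() just releases resources; it is a no-op once the writer is closed.
  writer.close()
}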