From 6a673f2b7ff70c14e7ef1ea6ca53f1fda42e8edf Mon Sep 17 00:00:00 2001 From: Tengfei Huang Date: Wed, 15 Oct 2025 04:00:18 +0000 Subject: [PATCH 1/3] Add logging support for BlockManager --- .../spark/serializer/SerializerManager.scala | 15 +- .../org/apache/spark/storage/BlockId.scala | 40 ++++ .../apache/spark/storage/BlockManager.scala | 32 ++- .../spark/storage/LogBlockIdGenerator.scala | 49 +++++ .../apache/spark/storage/LogBlockWriter.scala | 184 ++++++++++++++++++ .../org/apache/spark/storage/LogLine.scala | 30 +++ .../spark/storage/RollingLogWriter.scala | 115 +++++++++++ .../spark/storage/BlockManagerSuite.scala | 77 ++++++++ .../spark/storage/LogBlockWriterSuite.scala | 137 +++++++++++++ 9 files changed, 673 insertions(+), 6 deletions(-) create mode 100644 core/src/main/scala/org/apache/spark/storage/LogBlockIdGenerator.scala create mode 100644 core/src/main/scala/org/apache/spark/storage/LogBlockWriter.scala create mode 100644 core/src/main/scala/org/apache/spark/storage/LogLine.scala create mode 100644 core/src/main/scala/org/apache/spark/storage/RollingLogWriter.scala create mode 100644 core/src/test/scala/org/apache/spark/storage/LogBlockWriterSuite.scala diff --git a/core/src/main/scala/org/apache/spark/serializer/SerializerManager.scala b/core/src/main/scala/org/apache/spark/serializer/SerializerManager.scala index 640396a69526..d53a4d549782 100644 --- a/core/src/main/scala/org/apache/spark/serializer/SerializerManager.scala +++ b/core/src/main/scala/org/apache/spark/serializer/SerializerManager.scala @@ -172,9 +172,8 @@ private[spark] class SerializerManager( outputStream: OutputStream, values: Iterator[T]): Unit = { val byteStream = new BufferedOutputStream(outputStream) - val autoPick = !blockId.isInstanceOf[StreamBlockId] - val ser = getSerializer(implicitly[ClassTag[T]], autoPick).newInstance() - ser.serializeStream(wrapForCompression(blockId, byteStream)).writeAll(values).close() + blockSerializationStream[T](blockId, byteStream)(implicitly[ClassTag[T]]) + .writeAll(values).close() } /** Serializes into a chunked byte buffer. */ @@ -212,4 +211,14 @@ private[spark] class SerializerManager( .deserializeStream(wrapForCompression(blockId, stream)) .asIterator.asInstanceOf[Iterator[T]] } + + /** Generate a `SerializationStream` for a block. 
*/ + private[spark] def blockSerializationStream[T]( + blockId: BlockId, + outputStream: OutputStream) + (classTag: ClassTag[T]): SerializationStream = { + val autoPick = !blockId.isInstanceOf[StreamBlockId] + val ser = getSerializer(classTag, autoPick).newInstance() + ser.serializeStream(wrapForCompression(blockId, outputStream)) + } } diff --git a/core/src/main/scala/org/apache/spark/storage/BlockId.scala b/core/src/main/scala/org/apache/spark/storage/BlockId.scala index 6eb015d56b2c..1d22999b94b0 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockId.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockId.scala @@ -23,6 +23,7 @@ import org.apache.spark.SparkException import org.apache.spark.annotation.{DeveloperApi, Since} import org.apache.spark.errors.SparkCoreErrors import org.apache.spark.network.shuffle.RemoteBlockPushResolver +import org.apache.spark.storage.LogBlockType.LogBlockType /** * :: DeveloperApi :: @@ -175,6 +176,42 @@ case class PythonStreamBlockId(streamId: Int, uniqueId: Long) extends BlockId { override def name: String = "python-stream-" + streamId + "-" + uniqueId } +object LogBlockType extends Enumeration { + type LogBlockType = Value + val TEST = Value +} + +/** + * Identifies a block of log data. + * + * @param lastLogTime the timestamp of the last log entry in this block, used for filtering + * and log management. + * @param executorId the ID of the executor that produced this log block. + */ +abstract sealed class LogBlockId( + val lastLogTime: Long, + val executorId: String) extends BlockId { + def logBlockType: LogBlockType +} + +object LogBlockId { + def empty(logBlockType: LogBlockType): LogBlockId = { + logBlockType match { + case LogBlockType.TEST => TestLogBlockId(0L, "") + case _ => throw new SparkException(s"Unsupported log block type: $logBlockType") + } + } +} + +// Used for test purpose only. +case class TestLogBlockId(override val lastLogTime: Long, override val executorId: String) + extends LogBlockId(lastLogTime, executorId) { + override def name: String = + "test_log_" + lastLogTime + "_" + executorId + + override def logBlockType: LogBlockType = LogBlockType.TEST +} + /** Id associated with temporary local data managed as blocks. Not serializable. 
*/ private[spark] case class TempLocalBlockId(id: UUID) extends BlockId { override def name: String = "temp_local_" + id @@ -222,6 +259,7 @@ object BlockId { val TEMP_LOCAL = "temp_local_([-A-Fa-f0-9]+)".r val TEMP_SHUFFLE = "temp_shuffle_([-A-Fa-f0-9]+)".r val TEST = "test_(.*)".r + val TEST_LOG_BLOCK = "test_log_([0-9]+)_(.*)".r def apply(name: String): BlockId = name match { case RDD(rddId, splitIndex) => @@ -264,6 +302,8 @@ object BlockId { TempShuffleBlockId(UUID.fromString(uuid)) case TEST(value) => TestBlockId(value) + case TEST_LOG_BLOCK(lastLogTime, executorId) => + TestLogBlockId(lastLogTime.toLong, executorId) case _ => throw SparkCoreErrors.unrecognizedBlockIdError(name) } } diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala index 1e1cb8bf9fd5..602e8d068fb6 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala @@ -58,6 +58,7 @@ import org.apache.spark.scheduler.ExecutorCacheTaskLocation import org.apache.spark.serializer.{SerializerInstance, SerializerManager} import org.apache.spark.shuffle.{IndexShuffleBlockResolver, MigratableResolver, ShuffleManager, ShuffleWriteMetricsReporter} import org.apache.spark.storage.BlockManagerMessages.{DecommissionBlockManager, ReplicateBlock} +import org.apache.spark.storage.LogBlockType.LogBlockType import org.apache.spark.storage.memory._ import org.apache.spark.unsafe.Platform import org.apache.spark.util._ @@ -294,12 +295,17 @@ private[spark] class BlockManager( decommissioner.isDefined } - @inline final private def checkShouldStore(blockId: BlockId) = { + @inline final private def checkShouldStore(blockId: BlockId, level: StorageLevel) = { // Don't reject broadcast blocks since they may be stored during task exec and // don't need to be migrated. if (isDecommissioning() && !blockId.isBroadcast) { throw SparkCoreErrors.cannotSaveBlockOnDecommissionedExecutorError(blockId) } + if (blockId.isInstanceOf[LogBlockId] && level != StorageLevel.DISK_ONLY) { + throw SparkException.internalError( + s"Cannot store log block $blockId with storage level $level. " + + "Log blocks must be stored with DISK_ONLY.") + } } // This is a lazy val so someone can migrating RDDs even if they don't have a MigratableResolver @@ -763,7 +769,7 @@ private[spark] class BlockManager( level: StorageLevel, classTag: ClassTag[_]): StreamCallbackWithID = { - checkShouldStore(blockId) + checkShouldStore(blockId, level) if (blockId.isShuffle) { logDebug(s"Putting shuffle block ${blockId}") @@ -1483,6 +1489,26 @@ private[spark] class BlockManager( syncWrites, writeMetrics, blockId) } + /** + * To get a log block writer that can write logs directly to a disk block. Either `save` or + * `close` should be called to finish the writing and release opened resources. + * `save` would write the block to the block manager, while `close` would just close the writer. + */ + def getLogBlockWriter( + logBlockType: LogBlockType): LogBlockWriter = { + new LogBlockWriter(this, logBlockType, conf) + } + + /** + * To get a rolling log writer that can write logs to block manager and split the logs + * to multiple blocks if the log size exceeds the threshold. + */ + def getRollingLogWriter( + blockIdGenerator: LogBlockIdGenerator, + rollingSize: Long = 33554432L): RollingLogWriter = { + new RollingLogWriter(this, blockIdGenerator, rollingSize) + } + /** * Put a new block of serialized bytes to the block manager. 
* @@ -1540,7 +1566,7 @@ private[spark] class BlockManager( require(blockId != null, "BlockId is null") require(level != null && level.isValid, "StorageLevel is null or invalid") - checkShouldStore(blockId) + checkShouldStore(blockId, level) val putBlockInfo = { val newInfo = new BlockInfo(level, classTag, tellMaster) diff --git a/core/src/main/scala/org/apache/spark/storage/LogBlockIdGenerator.scala b/core/src/main/scala/org/apache/spark/storage/LogBlockIdGenerator.scala new file mode 100644 index 000000000000..4a2b90677ba3 --- /dev/null +++ b/core/src/main/scala/org/apache/spark/storage/LogBlockIdGenerator.scala @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.storage + +import org.apache.spark.SparkException +import org.apache.spark.storage.LogBlockType.LogBlockType + +/** + * LogBlockIdGenerator is responsible for generating unique LogBlockIds for log blocks. + */ +trait LogBlockIdGenerator { + // The log block type that this generator supports. + def logBlockType: LogBlockType + + // Generates a unique LogBlockId based on the last log time and executor ID. + protected def genUniqueBlockId(lastLogTime: Long, executorId: String): LogBlockId + + /** + * Generates a new LogBlockId based on the last log time and executor ID. Make sure that + * the generated LogBlockId has the same log block type as this generator. + * + * @param lastLogTime The timestamp of the last log entry. + * @param executorId The ID of the executor generating the log block. + */ + final def nextBlockId(lastLogTime: Long, executorId: String): LogBlockId = { + val blockId = genUniqueBlockId(lastLogTime, executorId) + if (blockId.logBlockType != this.logBlockType) { + throw SparkException.internalError( + "BlockId generated by LogBlockIdGenerator does not match " + + s"the expected log block type: ${blockId.logBlockType} != ${this.logBlockType}") + } + blockId + } +} diff --git a/core/src/main/scala/org/apache/spark/storage/LogBlockWriter.scala b/core/src/main/scala/org/apache/spark/storage/LogBlockWriter.scala new file mode 100644 index 000000000000..c406447fca7e --- /dev/null +++ b/core/src/main/scala/org/apache/spark/storage/LogBlockWriter.scala @@ -0,0 +1,184 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.storage + +import java.io.BufferedOutputStream +import java.io.File +import java.io.FileOutputStream + +import scala.reflect.ClassTag + +import org.apache.commons.io.output.CountingOutputStream + +import org.apache.spark.{SparkConf, SparkException} +import org.apache.spark.internal.Logging +import org.apache.spark.serializer.SerializationStream +import org.apache.spark.storage.LogBlockType.LogBlockType +import org.apache.spark.util.Utils + +/** + * A class for writing logs directly to a file on disk and save as a block in BlockManager if + * there are any logs written. + * `save` or `close` must be called to ensure resources are released properly. `save` will add + * the log block to BlockManager, while `close` will just release the resources without saving + * the log block. + * + * Notes: + * - This class does not support concurrent writes. + * - The writer will be automatically closed when failed to write logs or failed to save the + * log block. + * - Write operations after closing will throw exceptions. + */ +private[spark] class LogBlockWriter( + blockManager: BlockManager, + logBlockType: LogBlockType, + sparkConf: SparkConf, + bufferSize: Int = 32 * 1024) extends Logging { + + private[storage] var tmpFile: File = null + + private var cos: CountingOutputStream = null + private var objOut: SerializationStream = null + private var hasBeenClosed = false + private var recordsWritten = false + private var totalBytesWritten = 0 + + initialize() + + private def initialize(): Unit = { + try { + val dir = new File(Utils.getLocalDir(sparkConf)) + tmpFile = File.createTempFile(s"spark_log_$logBlockType", "", dir) + val fos = new FileOutputStream(tmpFile, false) + val bos = new BufferedOutputStream(fos, bufferSize) + cos = new CountingOutputStream(bos) + val emptyBlockId = LogBlockId.empty(logBlockType) + objOut = blockManager + .serializerManager + .blockSerializationStream(emptyBlockId, cos)(implicitly[ClassTag[LogLine]]) + } catch { + case e: Exception => + logError(log"Failed to initialize LogBlockWriter.", e) + close() + throw e + } + } + + def bytesWritten(): Int = { + Option(cos) + .map(_.getCount) + .getOrElse(totalBytesWritten) + } + + /** + * Write a log entry to the log block. Exception will be thrown if the writer has been closed + * or if there is an error during writing. Caller needs to deal with the exception. Suggest to + * close the writer when exception is thrown as block data could be corrupted which would lead + * to issues when reading the log block later. + * + * @param logEntry The log entry to write. + */ + def writeLog(logEntry: LogLine): Unit = { + if (hasBeenClosed) { + throw SparkException.internalError( + "Writer already closed. Cannot write more data.", + category = "STORAGE" + ) + } + + try { + objOut.writeObject(logEntry) + recordsWritten = true + } catch { + case e: Exception => + logError(log"Failed to write log entry.", e) + throw e + } + } + + def save(blockId: LogBlockId): Unit = { + if (hasBeenClosed) { + throw SparkException.internalError( + "Writer already closed. 
Cannot save.", + category = "STORAGE" + ) + } + + try { + if (blockId.logBlockType != logBlockType) { + throw SparkException.internalError( + s"LogBlockWriter is for $logBlockType, but got blockId $blockId") + } + + objOut.flush() + objOut.close() + objOut = null + + if(recordsWritten) { + totalBytesWritten = cos.getCount + // Save log block to BlockManager and delete the tmpFile. + val success = saveToBlockManager(blockId, totalBytesWritten) + if (!success) { + throw SparkException.internalError(s"Failed to save log block $blockId to BlockManager") + } + } + } finally { + close() + } + } + + def close(): Unit = { + if (hasBeenClosed) { + return + } + + try { + if (objOut != null) { + objOut.close() + } + if (tmpFile != null && tmpFile.exists()) { + tmpFile.delete() + } + } catch { + case e: Exception => + logWarning(log"Failed to close resources of LogBlockWriter", e) + } finally { + objOut = null + cos = null + hasBeenClosed = true + } + } + + // For test only. + private[storage] def flush(): Unit = { + if (objOut != null) { + objOut.flush() + } + } + + private[storage] def saveToBlockManager(blockId: LogBlockId, blockSize: Long): Boolean = { + blockManager. + TempFileBasedBlockStoreUpdater( + blockId, + StorageLevel.DISK_ONLY, + implicitly[ClassTag[LogLine]], + tmpFile, + blockSize) + .save() + } +} diff --git a/core/src/main/scala/org/apache/spark/storage/LogLine.scala b/core/src/main/scala/org/apache/spark/storage/LogLine.scala new file mode 100644 index 000000000000..7ba66209feaf --- /dev/null +++ b/core/src/main/scala/org/apache/spark/storage/LogLine.scala @@ -0,0 +1,30 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.storage + +/** + * Base class representing a log line. + * + * @param eventTime timestamp in milliseconds when the log is written + * @param sequenceId sequence ID of the log line + * @param message log message + */ +case class LogLine( + eventTime: Long, + sequenceId: Long, + message: String) diff --git a/core/src/main/scala/org/apache/spark/storage/RollingLogWriter.scala b/core/src/main/scala/org/apache/spark/storage/RollingLogWriter.scala new file mode 100644 index 000000000000..d925d7b79f46 --- /dev/null +++ b/core/src/main/scala/org/apache/spark/storage/RollingLogWriter.scala @@ -0,0 +1,115 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.storage + +import org.apache.spark.internal.Logging +import org.apache.spark.internal.LogKeys.{BLOCK_ID, BYTE_SIZE} +import org.apache.spark.storage.LogBlockType.LogBlockType + +/** + * Rolling log writer that writes log entries to blocks in a rolling manner. Here we split + * log blocks based on size limit. + * + * @param blockManager BlockManager to manage log blocks. + * @param blockIdGenerator BlockId generator to generate unique block IDs for log blocks. + * @param rollingSize Size limit for each log block. Default is 32MB (33554432 bytes). + */ +private[spark] class RollingLogWriter( + blockManager: BlockManager, + blockIdGenerator: LogBlockIdGenerator, + rollingSize: Long = 33554432L) extends Logging { + private var currentBlockWriter: Option[LogBlockWriter] = None + private var lastLogTime: Long = 0L + private val logBlockType: LogBlockType = blockIdGenerator.logBlockType + + private def shouldRollOver: Boolean = { + currentBlockWriter match { + case Some(writer) => writer.bytesWritten() >= rollingSize + case None => false + } + } + + /** + * Write a log entry. If the current block writer is empty, it will create a new one. + * If the current block exceeds the rolling size, it will roll over to a new block for + * the next log entry. + * + * @param logEntry log entry to write. + * @param removeBlockOnException if true, current log block will be deleted without saving to + * BlockManager. Otherwise, not action will be taken on current + * block which might be corrupted. + */ + def writeLog(logEntry: LogLine, removeBlockOnException: Boolean = false): Unit = { + // Create a new log writer if it's empty + if (currentBlockWriter.isEmpty) { + currentBlockWriter = Some(blockManager.getLogBlockWriter(logBlockType)) + } + + try { + currentBlockWriter.foreach { writer => + writer.writeLog(logEntry) + lastLogTime = logEntry.eventTime + } + } catch { + case e: Exception => + if (removeBlockOnException) { + logError(log"Failed to write log, closing block without saving.", e) + currentBlockWriter.foreach(_.close()) + currentBlockWriter = None + } + + throw e + } + + if (shouldRollOver) { + rollOver() + } + } + + def rollOver(): Unit = { + // Save current block and reset the writer + try { + saveCurrentBlock() + } finally { + currentBlockWriter = None + } + } + + def close(): Unit = { + try { + saveCurrentBlock() + } finally { + currentBlockWriter = None + lastLogTime = 0L + } + } + + // For test purpose. 
+ private[storage] def flush(): Unit = { + currentBlockWriter.foreach(_.flush()) + } + + private def saveCurrentBlock(): Unit = { + currentBlockWriter.foreach { writer => + val blockId = blockIdGenerator.nextBlockId(lastLogTime, blockManager.executorId) + logInfo(log"Saving log block ${MDC(BLOCK_ID, blockId)} with " + + log"approximate size: ${MDC(BYTE_SIZE, writer.bytesWritten())} bytes.") + writer.save(blockId) + } + } +} diff --git a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala index 5b86345dd5f9..5046ed98edaa 100644 --- a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala +++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala @@ -64,6 +64,8 @@ import org.apache.spark.serializer.{DeserializationStream, JavaSerializer, KryoD import org.apache.spark.shuffle.{MigratableResolver, ShuffleBlockInfo, ShuffleBlockResolver, ShuffleManager} import org.apache.spark.shuffle.sort.SortShuffleManager import org.apache.spark.storage.BlockManagerMessages._ +import org.apache.spark.storage.LogBlockType.LogBlockType +import org.apache.spark.storage.StorageLevel._ import org.apache.spark.util._ import org.apache.spark.util.ArrayImplicits._ import org.apache.spark.util.collection.Utils.createArray @@ -2504,6 +2506,81 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with PrivateMethodTe assert(acc.value === 6) } + test("SPARK-53755: LogBlock should be DISK_ONLY") { + val store = makeBlockManager(8000, "executor1") + val data = Seq("log line 1", "log line 2") + val logBlockId = TestLogBlockId(1234L, store.executorId) + + Seq(DISK_ONLY_2, DISK_ONLY_3, + MEMORY_ONLY, MEMORY_ONLY_2, MEMORY_ONLY_SER, + MEMORY_ONLY_SER_2, MEMORY_AND_DISK, MEMORY_AND_DISK_2, + MEMORY_AND_DISK_SER, MEMORY_AND_DISK_SER_2, OFF_HEAP).foreach { level => + val exception = intercept[SparkException] { + store.putIterator[String](logBlockId, data.iterator, level, tellMaster = true) + } + assert(exception.getMessage.contains("Log blocks must be stored with DISK_ONLY.")) + } + + assert(store.putIterator[String](logBlockId, data.iterator, DISK_ONLY, tellMaster = true)) + } + + test("SPARK-53755: Log block write/read") { + val store = makeBlockManager(8000, "executor1") + val logBlockWriter = store.getLogBlockWriter(LogBlockType.TEST) + val logBlockId = TestLogBlockId(1L, store.executorId) + val log1 = LogLine(0L, 1, "Log message 1") + val log2 = LogLine(1L, 2, "Log message 2") + + logBlockWriter.writeLog(log1) + logBlockWriter.writeLog(log2) + logBlockWriter.save(logBlockId) + + val status = store.getStatus(logBlockId) + assert(status.isDefined) + status.foreach { s => + assert(s.storageLevel === DISK_ONLY) + assert(s.memSize === 0) + assert(s.diskSize > 0) + } + + val data = store.get[LogLine](logBlockId).get.data.toSeq + assert(data === Seq(log1, log2)) + } + + test("SPARK-53755: rolling log block write/read") { + val store = makeBlockManager(8000, "executor1") + + val logBlockIdGenerator = new LogBlockIdGenerator { + override def logBlockType: LogBlockType = LogBlockType.TEST + + override protected def genUniqueBlockId( + lastLogTime: Long, executorId: String): LogBlockId = { + TestLogBlockId(lastLogTime, executorId) + } + } + + val logBlockWriter = store.getRollingLogWriter(logBlockIdGenerator, 100) + val log1 = LogLine(0L, 1, "Log message 1") + val log2 = LogLine(1L, 2, "Log message 2") + val log3 = LogLine(2L, 3, "Log message 3") + val log4 = LogLine(3L, 4, "Log message 4") + + // 65 bytes for 
each log line, 2 log lines for each block + logBlockWriter.writeLog(log1) + logBlockWriter.writeLog(log2) + // Flush and update bytes written, so that the next write will go to a new block. + logBlockWriter.flush() + logBlockWriter.writeLog(log3) + logBlockWriter.writeLog(log4) + logBlockWriter.close() + + val logBlockId1 = TestLogBlockId(2L, store.executorId) + val logBlockId2 = TestLogBlockId(3L, store.executorId) + val logBlockIds = store.getMatchingBlockIds(_.isInstanceOf[TestLogBlockId]) + assert(logBlockIds.size === 2) + assert(logBlockIds.contains(logBlockId1) && logBlockIds.contains(logBlockId2)) + } + private def createKryoSerializerWithDiskCorruptedInputStream(): KryoSerializer = { class TestDiskCorruptedInputStream extends InputStream { override def read(): Int = throw new IOException("Input/output error") diff --git a/core/src/test/scala/org/apache/spark/storage/LogBlockWriterSuite.scala b/core/src/test/scala/org/apache/spark/storage/LogBlockWriterSuite.scala new file mode 100644 index 000000000000..02b627f66a1a --- /dev/null +++ b/core/src/test/scala/org/apache/spark/storage/LogBlockWriterSuite.scala @@ -0,0 +1,137 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.spark.storage + +import java.io.File +import java.nio.file.Files + +import org.mockito.ArgumentMatchers.{any, anyLong} +import org.mockito.Mockito.{doAnswer, doThrow, mock, spy, times, verify} + +import org.apache.spark.{SparkConf, SparkException, SparkFunSuite} +import org.apache.spark.serializer.{JavaSerializer, SerializerManager} +import org.apache.spark.util.Utils + +class LogBlockWriterSuite extends SparkFunSuite { + var tempDir: File = _ + var sparkConf: SparkConf = _ + + override def beforeEach(): Unit = { + super.beforeEach() + tempDir = Utils.createTempDir() + sparkConf = new SparkConf(false) + .set("spark.local.dir", tempDir.getAbsolutePath) + } + + override def afterEach(): Unit = { + try { + Utils.deleteRecursively(tempDir) + } finally { + Utils.clearLocalRootDirs() + super.afterEach() + } + } + + test("SPARK-53755: close resources when failed to initialize") { + val blockManager = mock(classOf[BlockManager]) + val serializerManager = + spy(new SerializerManager(new JavaSerializer(sparkConf), sparkConf, None)) + doAnswer(_ => serializerManager).when(blockManager).serializerManager + doThrow(new RuntimeException("Initialization failed")) + .when(serializerManager).blockSerializationStream(any, any)(any) + + intercept[RuntimeException] { + new LogBlockWriter( + blockManager, LogBlockType.TEST, sparkConf) + } + verify(serializerManager, times(1)).blockSerializationStream(any, any)(any) + val leafFiles = Files.walk(tempDir.toPath) + .filter(Files.isRegularFile(_)) + .toArray + assert(leafFiles.isEmpty, "Temporary file should be deleted.") + } + + test("SPARK-53755: bytes written stats") { + val logBlockWriter = makeLogBlockWriter() + + val log1 = LogLine(0L, 1, "Log message 1") + val log2 = LogLine(1L, 2, "Log message 2") + try { + logBlockWriter.writeLog(log1) + logBlockWriter.writeLog(log2) + logBlockWriter.flush() + assert(logBlockWriter.bytesWritten() === logBlockWriter.tmpFile.length()) + } finally { + logBlockWriter.close() + } + } + + test("SPARK-53755: writeLog/save operations should fail on closed LogBlockWriter") { + val logBlockWriter = makeLogBlockWriter() + val log = LogLine(0L, 1, "Log message 1") + + logBlockWriter.writeLog(log) + logBlockWriter.close() + + val exception1 = intercept[SparkException] { + logBlockWriter.writeLog(log) + } + assert(exception1.getMessage.contains("Writer already closed. Cannot write more data.")) + + val exception2 = intercept[SparkException] { + logBlockWriter.save(TestLogBlockId(0L, "1")) + } + assert(exception2.getMessage.contains("Writer already closed. 
Cannot save.")) + } + + test("SPARK-53755: close writer after saving to block manager") { + val log = LogLine(0L, 1, "Log message 1") + + Seq(true, false).foreach { success => + val logBlockWriter = spy(makeLogBlockWriter()) + doAnswer(_ => success).when(logBlockWriter) + .saveToBlockManager(any[LogBlockId], anyLong) + + logBlockWriter.writeLog(log) + if (success) { + logBlockWriter.save(TestLogBlockId(0L, "1")) + } else { + val exception = intercept[SparkException] { + logBlockWriter.save(TestLogBlockId(0L, "1")) + } + assert(exception.getMessage.contains("Failed to save log block")) + } + + verify(logBlockWriter, times(1)).close() + } + } + + test("SPARK-53755: skip saving to block manager if no logs written") { + val logBlockWriter = spy(makeLogBlockWriter()) + logBlockWriter.save(TestLogBlockId(0L, "1")) + assert(logBlockWriter.bytesWritten() === 0L) + verify(logBlockWriter, times(0)).saveToBlockManager(any[LogBlockId], anyLong) + } + + private def makeLogBlockWriter(): LogBlockWriter = { + val serializerManager = new SerializerManager(new JavaSerializer(sparkConf), sparkConf, None) + val blockManager = mock(classOf[BlockManager]) + doAnswer(_ => serializerManager).when(blockManager).serializerManager + new LogBlockWriter( + blockManager, LogBlockType.TEST, sparkConf) + } +} From 3f5a7d302a98a44a9ef9b94594c863d8522660e6 Mon Sep 17 00:00:00 2001 From: Tengfei Huang Date: Fri, 17 Oct 2025 02:29:11 +0000 Subject: [PATCH 2/3] code refactor --- .../apache/spark/storage/LogBlockWriter.scala | 6 ++--- .../org/apache/spark/storage/LogLine.scala | 26 ++++++++++++++++--- .../spark/storage/BlockManagerSuite.scala | 14 +++++----- .../spark/storage/LogBlockWriterSuite.scala | 8 +++--- 4 files changed, 35 insertions(+), 19 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/storage/LogBlockWriter.scala b/core/src/main/scala/org/apache/spark/storage/LogBlockWriter.scala index c406447fca7e..5645f59d383d 100644 --- a/core/src/main/scala/org/apache/spark/storage/LogBlockWriter.scala +++ b/core/src/main/scala/org/apache/spark/storage/LogBlockWriter.scala @@ -21,8 +21,6 @@ import java.io.BufferedOutputStream import java.io.File import java.io.FileOutputStream -import scala.reflect.ClassTag - import org.apache.commons.io.output.CountingOutputStream import org.apache.spark.{SparkConf, SparkException} @@ -70,7 +68,7 @@ private[spark] class LogBlockWriter( val emptyBlockId = LogBlockId.empty(logBlockType) objOut = blockManager .serializerManager - .blockSerializationStream(emptyBlockId, cos)(implicitly[ClassTag[LogLine]]) + .blockSerializationStream(emptyBlockId, cos)(LogLine.getClassTag(logBlockType)) } catch { case e: Exception => logError(log"Failed to initialize LogBlockWriter.", e) @@ -176,7 +174,7 @@ private[spark] class LogBlockWriter( TempFileBasedBlockStoreUpdater( blockId, StorageLevel.DISK_ONLY, - implicitly[ClassTag[LogLine]], + LogLine.getClassTag(logBlockType), tmpFile, blockSize) .save() diff --git a/core/src/main/scala/org/apache/spark/storage/LogLine.scala b/core/src/main/scala/org/apache/spark/storage/LogLine.scala index 7ba66209feaf..dc646f289e37 100644 --- a/core/src/main/scala/org/apache/spark/storage/LogLine.scala +++ b/core/src/main/scala/org/apache/spark/storage/LogLine.scala @@ -17,6 +17,9 @@ package org.apache.spark.storage +import scala.reflect.ClassTag +import scala.reflect.classTag + /** * Base class representing a log line. 
* @@ -24,7 +27,22 @@ package org.apache.spark.storage * @param sequenceId sequence ID of the log line * @param message log message */ -case class LogLine( - eventTime: Long, - sequenceId: Long, - message: String) +trait LogLine { + val eventTime: Long + val sequenceId: Long + val message: String +} + +object LogLine { + def getClassTag(logBlockType: LogBlockType.LogBlockType): ClassTag[_<:LogLine] = + logBlockType match { + case LogBlockType.TEST => + classTag[TestLogLine] + case unsupportedLogBlockType => + throw new RuntimeException("Not supported log type " + unsupportedLogBlockType) + } +} + +case class TestLogLine(eventTime: Long, sequenceId: Long, message: String) + extends LogLine { +} diff --git a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala index 5046ed98edaa..102845d7f05d 100644 --- a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala +++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala @@ -2528,8 +2528,8 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with PrivateMethodTe val store = makeBlockManager(8000, "executor1") val logBlockWriter = store.getLogBlockWriter(LogBlockType.TEST) val logBlockId = TestLogBlockId(1L, store.executorId) - val log1 = LogLine(0L, 1, "Log message 1") - val log2 = LogLine(1L, 2, "Log message 2") + val log1 = TestLogLine(0L, 1, "Log message 1") + val log2 = TestLogLine(1L, 2, "Log message 2") logBlockWriter.writeLog(log1) logBlockWriter.writeLog(log2) @@ -2543,7 +2543,7 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with PrivateMethodTe assert(s.diskSize > 0) } - val data = store.get[LogLine](logBlockId).get.data.toSeq + val data = store.get[TestLogLine](logBlockId).get.data.toSeq assert(data === Seq(log1, log2)) } @@ -2560,10 +2560,10 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with PrivateMethodTe } val logBlockWriter = store.getRollingLogWriter(logBlockIdGenerator, 100) - val log1 = LogLine(0L, 1, "Log message 1") - val log2 = LogLine(1L, 2, "Log message 2") - val log3 = LogLine(2L, 3, "Log message 3") - val log4 = LogLine(3L, 4, "Log message 4") + val log1 = TestLogLine(0L, 1, "Log message 1") + val log2 = TestLogLine(1L, 2, "Log message 2") + val log3 = TestLogLine(2L, 3, "Log message 3") + val log4 = TestLogLine(3L, 4, "Log message 4") // 65 bytes for each log line, 2 log lines for each block logBlockWriter.writeLog(log1) diff --git a/core/src/test/scala/org/apache/spark/storage/LogBlockWriterSuite.scala b/core/src/test/scala/org/apache/spark/storage/LogBlockWriterSuite.scala index 02b627f66a1a..2e06e5d150d5 100644 --- a/core/src/test/scala/org/apache/spark/storage/LogBlockWriterSuite.scala +++ b/core/src/test/scala/org/apache/spark/storage/LogBlockWriterSuite.scala @@ -68,8 +68,8 @@ class LogBlockWriterSuite extends SparkFunSuite { test("SPARK-53755: bytes written stats") { val logBlockWriter = makeLogBlockWriter() - val log1 = LogLine(0L, 1, "Log message 1") - val log2 = LogLine(1L, 2, "Log message 2") + val log1 = TestLogLine(0L, 1, "Log message 1") + val log2 = TestLogLine(1L, 2, "Log message 2") try { logBlockWriter.writeLog(log1) logBlockWriter.writeLog(log2) @@ -82,7 +82,7 @@ class LogBlockWriterSuite extends SparkFunSuite { test("SPARK-53755: writeLog/save operations should fail on closed LogBlockWriter") { val logBlockWriter = makeLogBlockWriter() - val log = LogLine(0L, 1, "Log message 1") + val log = TestLogLine(0L, 1, "Log message 1") 
logBlockWriter.writeLog(log) logBlockWriter.close() @@ -99,7 +99,7 @@ class LogBlockWriterSuite extends SparkFunSuite { } test("SPARK-53755: close writer after saving to block manager") { - val log = LogLine(0L, 1, "Log message 1") + val log = TestLogLine(0L, 1, "Log message 1") Seq(true, false).foreach { success => val logBlockWriter = spy(makeLogBlockWriter()) From cd369cf84a4cfe629b5f6a7cc65a6c42ecdbb9f0 Mon Sep 17 00:00:00 2001 From: Tengfei Huang Date: Fri, 17 Oct 2025 09:40:16 +0000 Subject: [PATCH 3/3] fix log block id while apply --- core/src/main/scala/org/apache/spark/storage/BlockId.scala | 4 ++-- .../scala/org/apache/spark/storage/BlockManagerSuite.scala | 4 +++- 2 files changed, 5 insertions(+), 3 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/storage/BlockId.scala b/core/src/main/scala/org/apache/spark/storage/BlockId.scala index 1d22999b94b0..a15426783ebe 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockId.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockId.scala @@ -300,10 +300,10 @@ object BlockId { TempLocalBlockId(UUID.fromString(uuid)) case TEMP_SHUFFLE(uuid) => TempShuffleBlockId(UUID.fromString(uuid)) - case TEST(value) => - TestBlockId(value) case TEST_LOG_BLOCK(lastLogTime, executorId) => TestLogBlockId(lastLogTime.toLong, executorId) + case TEST(value) => + TestBlockId(value) case _ => throw SparkCoreErrors.unrecognizedBlockIdError(name) } } diff --git a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala index 102845d7f05d..95a315a486ff 100644 --- a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala +++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala @@ -2576,7 +2576,9 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with PrivateMethodTe val logBlockId1 = TestLogBlockId(2L, store.executorId) val logBlockId2 = TestLogBlockId(3L, store.executorId) - val logBlockIds = store.getMatchingBlockIds(_.isInstanceOf[TestLogBlockId]) + val logBlockIds = store + .getMatchingBlockIds(_.isInstanceOf[TestLogBlockId]) + .distinct assert(logBlockIds.size === 2) assert(logBlockIds.contains(logBlockId1) && logBlockIds.contains(logBlockId2)) }
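
Editor's note, not part of the patch series: a minimal usage sketch of the API these patches introduce, assembled from the tests added in BlockManagerSuite. It assumes an already-initialized BlockManager and reuses the test-only TestLogLine / TestLogBlockId / LogBlockType.TEST stand-ins; production callers would define their own LogLine and LogBlockId subtypes plus a matching LogBlockType value.

package org.apache.spark.storage

import org.apache.spark.storage.LogBlockType.LogBlockType

object RollingLogWriterExample {
  // `blockManager` is assumed to be the executor's running BlockManager.
  def writeAndReadBack(blockManager: BlockManager): Unit = {
    // Generator for the test-only block id type; nextBlockId verifies that the
    // returned id's logBlockType matches the generator's declared type.
    val idGenerator = new LogBlockIdGenerator {
      override def logBlockType: LogBlockType = LogBlockType.TEST
      override protected def genUniqueBlockId(
          lastLogTime: Long, executorId: String): LogBlockId =
        TestLogBlockId(lastLogTime, executorId)
    }

    // Log entries are serialized to a local temp file and rolled into a new
    // DISK_ONLY block once the written size passes rollingSize (default 32 MB).
    val writer = blockManager.getRollingLogWriter(idGenerator, rollingSize = 33554432L)
    try {
      writer.writeLog(TestLogLine(System.currentTimeMillis(), sequenceId = 1L, "task started"))
      writer.writeLog(TestLogLine(System.currentTimeMillis(), sequenceId = 2L, "task finished"))
    } finally {
      // close() saves the in-progress block (if any records were written)
      // and releases the underlying temp file.
      writer.close()
    }

    // Saved log blocks are visible through the regular BlockManager APIs.
    val logBlockIds = blockManager.getMatchingBlockIds(_.isInstanceOf[TestLogBlockId])
    logBlockIds.foreach { blockId =>
      // `lines` holds the deserialized TestLogLine records written above.
      val lines = blockManager.get[TestLogLine](blockId).map(_.data.toSeq).getOrElse(Seq.empty)
      println(s"$blockId -> ${lines.size} log lines")
    }
  }
}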