From cd8de2d86018bba1742c6ec318bc205a01ba3ee4 Mon Sep 17 00:00:00 2001
From: Livia Zhu
Date: Tue, 11 Nov 2025 14:23:04 -0800
Subject: [PATCH] add error if state dir is empty

---
 .../resources/error/error-conditions.json         |  9 +++
 .../execution/streaming/StreamingErrors.scala     | 15 +++-
 .../runtime/IncrementalExecution.scala            | 14 +++-
 .../state/OperatorStateMetadataSuite.scala        | 73 +++++++++++++++++++
 4 files changed, 109 insertions(+), 2 deletions(-)

diff --git a/common/utils/src/main/resources/error/error-conditions.json b/common/utils/src/main/resources/error/error-conditions.json
index 57ed891087f2..264fd09a0da8 100644
--- a/common/utils/src/main/resources/error/error-conditions.json
+++ b/common/utils/src/main/resources/error/error-conditions.json
@@ -5718,6 +5718,15 @@
     },
     "sqlState" : "0A000"
   },
+  "STREAMING_STATEFUL_OPERATOR_MISSING_STATE_DIRECTORY" : {
+    "message" : [
+      "Cannot restart streaming query with stateful operators because the state directory is empty or missing.",
+      "Stateful operators in current batch: [<OpsInCurBatchSeq>].",
+      "This typically occurs when state files have been deleted or the streaming query was previously run without stateful operators but restarted with stateful operators.",
+      "Please remove the stateful operators, use a new checkpoint location, or restore the missing state files."
+    ],
+    "sqlState" : "42K03"
+  },
   "STREAMING_STATEFUL_OPERATOR_NOT_MATCH_IN_STATE_METADATA" : {
     "message" : [
       "Streaming stateful operator name does not match with the operator in state metadata. This likely to happen when user adds/removes/changes stateful operator of existing streaming query.",
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingErrors.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingErrors.scala
index 98b8832ee2a8..38361686d083 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingErrors.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingErrors.scala
@@ -16,7 +16,7 @@
  */
 package org.apache.spark.sql.execution.streaming
 
-import org.apache.spark.SparkException
+import org.apache.spark.{SparkException, SparkRuntimeException}
 
 /**
  * Object for grouping error messages from streaming query exceptions
@@ -39,4 +39,17 @@ object StreamingErrors {
       cause = err
     )
   }
+
+  def statefulOperatorMissingStateDirectory(
+      opsInCurBatch: Map[Long, String]): Throwable = {
+    def formatPairString(pair: (Long, String)): String =
+      s"(OperatorId: ${pair._1} -> OperatorName: ${pair._2})"
+
+    new SparkRuntimeException(
+      errorClass = "STREAMING_STATEFUL_OPERATOR_MISSING_STATE_DIRECTORY",
+      messageParameters = Map(
+        "OpsInCurBatchSeq" -> opsInCurBatch.map(formatPairString).mkString(", ")
+      )
+    )
+  }
 }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/runtime/IncrementalExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/runtime/IncrementalExecution.scala
index cf0c297efbf0..810823399017 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/runtime/IncrementalExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/runtime/IncrementalExecution.scala
@@ -38,7 +38,7 @@ import org.apache.spark.sql.execution.aggregate.{HashAggregateExec, MergingSessi
 import org.apache.spark.sql.execution.datasources.v2.state.metadata.StateMetadataPartitionReader
 import org.apache.spark.sql.execution.exchange.ShuffleExchangeLike
 import org.apache.spark.sql.execution.python.streaming.{FlatMapGroupsInPandasWithStateExec, TransformWithStateInPySparkExec}
-import org.apache.spark.sql.execution.streaming.StreamingQueryPlanTraverseHelper
+import org.apache.spark.sql.execution.streaming.{StreamingErrors, StreamingQueryPlanTraverseHelper}
 import org.apache.spark.sql.execution.streaming.checkpointing.{CheckpointFileManager, OffsetSeqMetadata}
 import org.apache.spark.sql.execution.streaming.operators.stateful.{SessionWindowStateStoreRestoreExec, SessionWindowStateStoreSaveExec, StatefulOperator, StatefulOperatorStateInfo, StateStoreRestoreExec, StateStoreSaveExec, StateStoreWriter, StreamingDeduplicateExec, StreamingDeduplicateWithinWatermarkExec, StreamingGlobalLimitExec, StreamingLocalLimitExec, UpdateEventTimeColumnExec}
 import org.apache.spark.sql.execution.streaming.operators.stateful.flatmapgroupswithstate.FlatMapGroupsWithStateExec
@@ -563,6 +563,18 @@ class IncrementalExecution(
         stateStoreWriter.getStateInfo.operatorId -> stateStoreWriter.shortName
       }.toMap
 
+      // Check if state directory is empty when we have stateful operators
+      if (opMapInPhysicalPlan.nonEmpty) {
+        val stateDirPath = new Path(new Path(checkpointLocation).getParent, "state")
+        val fileManager = CheckpointFileManager.create(stateDirPath, hadoopConf)
+
+        val stateDirectoryEmpty = !fileManager.exists(stateDirPath) ||
+          fileManager.list(stateDirPath).isEmpty
+        if (stateDirectoryEmpty) {
+          throw StreamingErrors.statefulOperatorMissingStateDirectory(opMapInPhysicalPlan)
+        }
+      }
+
       // A map of all (operatorId -> operatorName) in the state metadata
       val opMapInMetadata: Map[Long, String] = {
         var ret = Map.empty[Long, String]
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/OperatorStateMetadataSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/OperatorStateMetadataSuite.scala
index f34ae3d32888..01b9ae4dc82d 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/OperatorStateMetadataSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/OperatorStateMetadataSuite.scala
@@ -472,4 +472,77 @@ class OperatorStateMetadataSuite extends StreamTest with SharedSparkSession {
       }
     }
   }
+
+  test("Restart with stateful operator but empty state directory triggers error") {
+    withTempDir { checkpointDir =>
+      val inputData = MemoryStream[Int]
+      val stream = inputData.toDF()
+
+      // Run a streaming query with stateful operator
+      testStream(stream.dropDuplicates())(
+        StartStream(checkpointLocation = checkpointDir.toString),
+        AddData(inputData, 1, 2, 3),
+        ProcessAllAvailable(),
+        StopStream)
+
+      // Delete the state directory to simulate deleted state files
+      val stateDir = new Path(checkpointDir.toString, "state")
+      val fileManager = CheckpointFileManager.create(stateDir, hadoopConf)
+      fileManager.delete(stateDir)
+
+      // Restart the query - should fail with empty state directory error
+      testStream(stream.dropDuplicates())(
+        StartStream(checkpointLocation = checkpointDir.toString),
+        AddData(inputData, 4),
+        ExpectFailure[SparkRuntimeException] { t =>
+          def formatPairString(pair: (Long, String)): String =
+            s"(OperatorId: ${pair._1} -> OperatorName: ${pair._2})"
+
+          checkError(
+            t.asInstanceOf[SparkRuntimeException],
+            "STREAMING_STATEFUL_OPERATOR_MISSING_STATE_DIRECTORY",
+            "42K03",
+            Map("OpsInCurBatchSeq" -> formatPairString(0L -> "dedupe")))
+        }
+      )
+    }
+  }
+
+  test("Restart with stateful operator added to previously stateless query triggers error") {
+    withTempDir { checkpointDir =>
+      val inputData = MemoryStream[Int]
+
+      // Run a stateless streaming query first
+      testStream(inputData.toDF().select($"value" * 2 as "doubled"))(
+        StartStream(checkpointLocation = checkpointDir.toString),
+        AddData(inputData, 1, 2, 3),
+        ProcessAllAvailable(),
+        AddData(inputData, 1, 2, 3),
+        ProcessAllAvailable(),
+        StopStream)
+
+      // Delete the state directory if it exists (it shouldn't for stateless query)
+      val stateDir = new Path(checkpointDir.toString, "state")
+      val fileManager = CheckpointFileManager.create(stateDir, hadoopConf)
+      if (fileManager.exists(stateDir)) {
+        fileManager.delete(stateDir)
+      }
+
+      // Restart with a stateful operator added - should fail
+      testStream(inputData.toDF().dropDuplicates())(
+        StartStream(checkpointLocation = checkpointDir.toString),
+        AddData(inputData, 4),
+        ExpectFailure[SparkRuntimeException] { t =>
+          def formatPairString(pair: (Long, String)): String =
+            s"(OperatorId: ${pair._1} -> OperatorName: ${pair._2})"
+
+          checkError(
+            t.asInstanceOf[SparkRuntimeException],
+            "STREAMING_STATEFUL_OPERATOR_MISSING_STATE_DIRECTORY",
+            "42K03",
+            Map("OpsInCurBatchSeq" -> formatPairString(0L -> "dedupe")))
+        }
+      )
+    }
+  }
 }