9 changes: 9 additions & 0 deletions common/utils/src/main/resources/error/error-conditions.json
@@ -5718,6 +5718,15 @@
},
"sqlState" : "0A000"
},
"STREAMING_STATEFUL_OPERATOR_MISSING_STATE_DIRECTORY" : {
"message" : [
"Cannot restart streaming query with stateful operators because the state directory is empty or missing.",
"Stateful operators in current batch: [<OpsInCurBatchSeq>].",
"This typically occurs when state files have been deleted or the streaming query was previously run without stateful operators but restarted with stateful operators.",
"Please remove the stateful operators, use a new checkpoint location, or restore the missing state files."
],
"sqlState" : "42K03"
},
"STREAMING_STATEFUL_OPERATOR_NOT_MATCH_IN_STATE_METADATA" : {
"message" : [
"Streaming stateful operator name does not match with the operator in state metadata. This likely to happen when user adds/removes/changes stateful operator of existing streaming query.",
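The last remediation the new message offers (a fresh checkpoint location) amounts to restarting the same stateful query against a checkpoint that has never committed a batch. A minimal sketch; the source, sink, and paths here are illustrative, not part of this change:

// Hypothetical restart against a brand-new, empty checkpoint location.
// dropDuplicates compiles to the "dedupe" stateful operator that the
// tests below reference by name.
val restarted = spark.readStream
  .format("rate")
  .load()
  .dropDuplicates("value")
  .writeStream
  .format("console")
  .option("checkpointLocation", "/tmp/checkpoints/fresh-location")
  .start()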
StreamingErrors.scala
@@ -16,7 +16,7 @@
*/
package org.apache.spark.sql.execution.streaming

import org.apache.spark.SparkException
import org.apache.spark.{SparkException, SparkRuntimeException}

/**
* Object for grouping error messages from streaming query exceptions
@@ -39,4 +39,17 @@ object StreamingErrors {
cause = err
)
}

def statefulOperatorMissingStateDirectory(
opsInCurBatch: Map[Long, String]): Throwable = {
def formatPairString(pair: (Long, String)): String =
s"(OperatorId: ${pair._1} -> OperatorName: ${pair._2})"

new SparkRuntimeException(
errorClass = "STREAMING_STATEFUL_OPERATOR_MISSING_STATE_DIRECTORY",
messageParameters = Map(
"OpsInCurBatchSeq" -> opsInCurBatch.map(formatPairString).mkString(", ")
)
)
}
}
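As a quick illustration of the formatting above, a hypothetical two-operator batch; the operator names are chosen to mirror StateStoreWriter short names, and only "dedupe" is confirmed by the tests in this PR:

// Sketch: small Scala Maps preserve insertion order, so the
// OpsInCurBatchSeq parameter renders as:
//   (OperatorId: 0 -> OperatorName: dedupe), (OperatorId: 1 -> OperatorName: stateStoreSave)
val err = StreamingErrors.statefulOperatorMissingStateDirectory(
  Map(0L -> "dedupe", 1L -> "stateStoreSave"))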
IncrementalExecution.scala
Expand Up @@ -38,7 +38,7 @@ import org.apache.spark.sql.execution.aggregate.{HashAggregateExec, MergingSessi
import org.apache.spark.sql.execution.datasources.v2.state.metadata.StateMetadataPartitionReader
import org.apache.spark.sql.execution.exchange.ShuffleExchangeLike
import org.apache.spark.sql.execution.python.streaming.{FlatMapGroupsInPandasWithStateExec, TransformWithStateInPySparkExec}
import org.apache.spark.sql.execution.streaming.StreamingQueryPlanTraverseHelper
import org.apache.spark.sql.execution.streaming.{StreamingErrors, StreamingQueryPlanTraverseHelper}
import org.apache.spark.sql.execution.streaming.checkpointing.{CheckpointFileManager, OffsetSeqMetadata}
import org.apache.spark.sql.execution.streaming.operators.stateful.{SessionWindowStateStoreRestoreExec, SessionWindowStateStoreSaveExec, StatefulOperator, StatefulOperatorStateInfo, StateStoreRestoreExec, StateStoreSaveExec, StateStoreWriter, StreamingDeduplicateExec, StreamingDeduplicateWithinWatermarkExec, StreamingGlobalLimitExec, StreamingLocalLimitExec, UpdateEventTimeColumnExec}
import org.apache.spark.sql.execution.streaming.operators.stateful.flatmapgroupswithstate.FlatMapGroupsWithStateExec
@@ -563,6 +563,18 @@ class IncrementalExecution(
stateStoreWriter.getStateInfo.operatorId -> stateStoreWriter.shortName
}.toMap

// Check if state directory is empty when we have stateful operators
if (opMapInPhysicalPlan.nonEmpty) {
val stateDirPath = new Path(new Path(checkpointLocation).getParent, "state")
val fileManager = CheckpointFileManager.create(stateDirPath, hadoopConf)

val stateDirectoryEmpty = !fileManager.exists(stateDirPath) ||
fileManager.list(stateDirPath).isEmpty
if (stateDirectoryEmpty) {
throw StreamingErrors.statefulOperatorMissingStateDirectory(opMapInPhysicalPlan)
}
}

// A map of all (operatorId -> operatorName) in the state metadata
val opMapInMetadata: Map[Long, String] = {
var ret = Map.empty[Long, String]
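The new guard treats a missing state directory and an existing-but-empty one the same way. A standalone sketch of the same probe, assuming hypothetical paths, with the standard checkpoint layout it inspects noted in comments:

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.spark.sql.execution.streaming.checkpointing.CheckpointFileManager

// Once a stateful query commits a batch, the checkpoint root looks like:
//   <root>/offsets/0          - planned offsets for batch 0
//   <root>/commits/0          - completion marker for batch 0
//   <root>/state/0/0/1.delta  - operator 0, shuffle partition 0, version 1
// so list() on <root>/state returns at least one entry and the guard passes.
val hadoopConf = new Configuration()
val stateDirPath = new Path("/tmp/checkpoints/query-1/state")
val fm = CheckpointFileManager.create(stateDirPath, hadoopConf)
val stateDirectoryEmpty =
  !fm.exists(stateDirPath) || fm.list(stateDirPath).isEmpty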
OperatorStateMetadataSuite.scala
@@ -472,4 +472,77 @@ class OperatorStateMetadataSuite extends StreamTest with SharedSparkSession {
}
}
}

test("Restart with stateful operator but empty state directory triggers error") {
withTempDir { checkpointDir =>
val inputData = MemoryStream[Int]
val stream = inputData.toDF()

// Run a streaming query with stateful operator
testStream(stream.dropDuplicates())(
StartStream(checkpointLocation = checkpointDir.toString),
AddData(inputData, 1, 2, 3),
ProcessAllAvailable(),
StopStream)

// Delete the state directory to simulate deleted state files
val stateDir = new Path(checkpointDir.toString, "state")
val fileManager = CheckpointFileManager.create(stateDir, hadoopConf)
fileManager.delete(stateDir)

// Restart the query - should fail with empty state directory error
testStream(stream.dropDuplicates())(
StartStream(checkpointLocation = checkpointDir.toString),
AddData(inputData, 4),
ExpectFailure[SparkRuntimeException] { t =>
def formatPairString(pair: (Long, String)): String =
s"(OperatorId: ${pair._1} -> OperatorName: ${pair._2})"

checkError(
t.asInstanceOf[SparkRuntimeException],
"STREAMING_STATEFUL_OPERATOR_MISSING_STATE_DIRECTORY",
"42K03",
Map("OpsInCurBatchSeq" -> formatPairString(0L -> "dedupe")))
}
)
}
}

test("Restart with stateful operator added to previously stateless query triggers error") {
withTempDir { checkpointDir =>
val inputData = MemoryStream[Int]

// Run a stateless streaming query first
testStream(inputData.toDF().select($"value" * 2 as "doubled"))(
StartStream(checkpointLocation = checkpointDir.toString),
AddData(inputData, 1, 2, 3),
ProcessAllAvailable(),
AddData(inputData, 1, 2, 3),
ProcessAllAvailable(),
StopStream)

// Delete the state directory if it exists (it shouldn't for stateless query)
val stateDir = new Path(checkpointDir.toString, "state")
val fileManager = CheckpointFileManager.create(stateDir, hadoopConf)
if (fileManager.exists(stateDir)) {
fileManager.delete(stateDir)
}

// Restart with a stateful operator added - should fail
testStream(inputData.toDF().dropDuplicates())(
StartStream(checkpointLocation = checkpointDir.toString),
AddData(inputData, 4),
ExpectFailure[SparkRuntimeException] { t =>
def formatPairString(pair: (Long, String)): String =
s"(OperatorId: ${pair._1} -> OperatorName: ${pair._2})"

checkError(
t.asInstanceOf[SparkRuntimeException],
"STREAMING_STATEFUL_OPERATOR_MISSING_STATE_DIRECTORY",
"42K03",
Map("OpsInCurBatchSeq" -> formatPairString(0L -> "dedupe")))
}
)
}
}
}