Skip to content

Commit

Permalink
feedback
Browse files Browse the repository at this point in the history
  • Loading branch information
ericm-db committed Jul 29, 2024
1 parent bf3d9b5 commit 27e83d7
Show file tree
Hide file tree
Showing 4 changed files with 25 additions and 26 deletions.
4 changes: 2 additions & 2 deletions common/utils/src/main/resources/error/error-conditions.json
Original file line number Diff line number Diff line change
Expand Up @@ -3805,7 +3805,7 @@
},
"STATEFUL_PROCESSOR_DUPLICATE_STATE_VARIABLE_DEFINED" : {
"message" : [
"State variable with name <stateName> has already been defined in the StatefulProcessor."
"State variable with name <stateVarName> has already been defined in the StatefulProcessor."
],
"sqlState" : "42802"
},
Expand Down Expand Up @@ -3872,7 +3872,7 @@
},
"STATE_STORE_INVALID_VARIABLE_TYPE_CHANGE" : {
"message" : [
"Cannot change <stateName> to <newType> between query restarts. Please set <stateName> to <oldType>, or restart with a new checkpoint directory."
"Cannot change <stateVarName> to <newType> between query restarts. Please set <stateVarName> to <oldType>, or restart with a new checkpoint directory."
],
"sqlState" : "42K06"
},
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -443,25 +443,6 @@ case class TransformWithStateExec(
new Path(new Path(storeNamePath, "_metadata"), "schema")
}

private def checkStateVariableEquality(
oldStateVariableInfos: List[TransformWithStateVariableInfo]): Unit = {
val newStateVariableInfos = getStateVariableInfos()
oldStateVariableInfos.foreach { oldInfo =>
val newInfo = newStateVariableInfos.get(oldInfo.stateName)
newInfo match {
case Some(stateVarInfo) =>
if (oldInfo.stateVariableType != stateVarInfo.stateVariableType) {
throw StateStoreErrors.invalidVariableTypeChange(
stateVarInfo.stateName,
oldInfo.stateVariableType.toString,
stateVarInfo.stateVariableType.toString
)
}
case None =>
}
}
}

override def validateNewMetadata(
oldOperatorMetadata: OperatorStateMetadata,
newOperatorMetadata: OperatorStateMetadata): Unit = {
Expand All @@ -475,7 +456,6 @@ case class TransformWithStateExec(
newMetadataV2.operatorPropertiesJson)
TransformWithStateOperatorProperties.validateOperatorProperties(
oldOperatorProps, newOperatorProps)
checkStateVariableEquality(oldOperatorProps.stateVariables)
case (_, _) =>
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -117,5 +117,23 @@ object TransformWithStateOperatorProperties extends Logging {
throw StateStoreErrors.invalidConfigChangedAfterRestart(
"outputMode", oldOperatorProperties.outputMode, newOperatorProperties.outputMode)
}
val oldStateVariableInfos = oldOperatorProperties.stateVariables
val newStateVariableInfos = newOperatorProperties.stateVariables.map { stateVarInfo =>
stateVarInfo.stateName -> stateVarInfo
}.toMap
oldStateVariableInfos.foreach { oldInfo =>
val newInfo = newStateVariableInfos.get(oldInfo.stateName)
newInfo match {
case Some(stateVarInfo) =>
if (oldInfo.stateVariableType != stateVarInfo.stateVariableType) {
throw StateStoreErrors.invalidVariableTypeChange(
stateVarInfo.stateName,
oldInfo.stateVariableType.toString,
stateVarInfo.stateVariableType.toString
)
}
case None =>
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -191,11 +191,11 @@ object StateStoreErrors {
}
}

class StateStoreDuplicateStateVariableDefined(stateName: String)
class StateStoreDuplicateStateVariableDefined(stateVarName: String)
extends SparkRuntimeException(
errorClass = "STATEFUL_PROCESSOR_DUPLICATE_STATE_VARIABLE_DEFINED",
messageParameters = Map(
"stateName" -> stateName
"stateVarName" -> stateVarName
)
)

Expand All @@ -209,15 +209,16 @@ class StateStoreInvalidConfigAfterRestart(configName: String, oldConfig: String,
)
)

class StateStoreInvalidVariableTypeChange(stateName: String, oldType: String, newType: String)
class StateStoreInvalidVariableTypeChange(stateVarName: String, oldType: String, newType: String)
extends SparkUnsupportedOperationException(
errorClass = "STATE_STORE_INVALID_VARIABLE_TYPE_CHANGE",
messageParameters = Map(
"stateName" -> stateName,
"stateVarName" -> stateVarName,
"oldType" -> oldType,
"newType" -> newType
)
)

class StateStoreMultipleColumnFamiliesNotSupportedException(stateStoreProvider: String)
extends SparkUnsupportedOperationException(
errorClass = "UNSUPPORTED_FEATURE.STATE_STORE_MULTIPLE_COLUMN_FAMILIES",
Expand Down

0 comments on commit 27e83d7

Please sign in to comment.