Skip to content

Commit

Permalink
refactor test suite
Browse files Browse the repository at this point in the history
  • Loading branch information
jingz-db committed Mar 12, 2024
1 parent da1ff6d commit cd8b827
Showing 1 changed file with 28 additions and 20 deletions.
Expand Up @@ -19,7 +19,7 @@ package org.apache.spark.sql.streaming

import org.apache.spark.sql.{Encoders, KeyValueGroupedDataset}
import org.apache.spark.sql.execution.streaming.MemoryStream
import org.apache.spark.sql.execution.streaming.state.{RocksDBStateStoreProvider, StateStoreMultipleColumnFamiliesNotSupportedException}
import org.apache.spark.sql.execution.streaming.state.{AlsoTestWithChangelogCheckpointingEnabled, RocksDBStateStoreProvider, StateStoreMultipleColumnFamiliesNotSupportedException}
import org.apache.spark.sql.internal.SQLConf

/**
 * Input record fed into the streaming queries of this test file.
 *
 * @param key    grouping key; the tests group incoming rows by this field
 * @param action name of the operation carried by the row (the tests pass e.g. "update")
 * @param value  numeric payload that accompanies the action
 */
case class InitInputRow(
    key: String,
    action: String,
    value: Double)
Expand Down Expand Up @@ -95,7 +95,8 @@ class AccumulateStatefulProcessorWithInitState extends StatefulProcessorWithInit
* Test suite for the transformWithState stateful streaming operator
* when a user-defined initial state is supplied.
*/
class TransformWithStateInitialStateSuite extends StateStoreMetricsTest {
class TransformWithStateInitialStateSuite extends StateStoreMetricsTest
with AlsoTestWithChangelogCheckpointingEnabled {

import testImplicits._

Expand All @@ -105,24 +106,6 @@ class TransformWithStateInitialStateSuite extends StateStoreMetricsTest {
.mapValues(x => x)
}

// Negative test: this block configures no state store provider of its own, and it passes only
// when the stream fails with StateStoreMultipleColumnFamiliesNotSupportedException whose
// message contains "not supported".
test("transformWithStateWithInitialState - streaming with hdfsStateStoreProvider should fail") {
  val rows = MemoryStream[InitInputRow]

  // Group the incoming rows by key and attach the stateful processor with initial state.
  val keyedRows = rows.toDS().groupByKey(_.key)
  val query = keyedRows.transformWithState(
    new StatefulProcessorWithInitialStateTestClass(),
    TimeoutMode.NoTimeouts(),
    OutputMode.Append(),
    createInitialDfForTest)

  testStream(query, OutputMode.Update())(
    AddData(rows, InitInputRow("a", "update", -1.0)),
    ExpectFailure[StateStoreMultipleColumnFamiliesNotSupportedException] { (err: Throwable) =>
      assert(err.getMessage.contains("not supported"))
    }
  )
}

test ("transformWithStateWithInitialState - " +
"correctness test, run with multiple state variables") {
withSQLConf(SQLConf.STATE_STORE_PROVIDER_CLASS.key ->
Expand Down Expand Up @@ -219,3 +202,28 @@ class TransformWithStateInitialStateSuite extends StateStoreMetricsTest {
checkAnswer(df, Seq(("k1", "getOption", 37.0)).toDF())
}
}

/**
 * Validation suite for transformWithState with a user-supplied initial state.
 *
 * It extends only StateStoreMetricsTest and holds a single negative test that expects the
 * streaming query to fail with StateStoreMultipleColumnFamiliesNotSupportedException.
 */
class TransformWithStateInitialStateValidationSuite extends StateStoreMetricsTest {
  import testImplicits._

  test("transformWithStateWithInitialState - streaming with hdfsStateStoreProvider should fail") {
    val rows = MemoryStream[InitInputRow]

    // Initial state: (key, value) pairs grouped by their first element.
    val initialState = Seq(("init_1", 40.0), ("init_2", 100.0))
      .toDS()
      .groupByKey(_._1)
      .mapValues(pair => pair)

    // No state store provider is configured in this test before the query is built.
    val query = rows.toDS()
      .groupByKey(_.key)
      .transformWithState(
        new StatefulProcessorWithInitialStateTestClass(),
        TimeoutMode.NoTimeouts(),
        OutputMode.Append(),
        initialState)

    testStream(query, OutputMode.Update())(
      AddData(rows, InitInputRow("a", "update", -1.0)),
      // The failure must be of the expected type and carry a "not supported" message.
      ExpectFailure[StateStoreMultipleColumnFamiliesNotSupportedException] { (err: Throwable) =>
        assert(err.getMessage.contains("not supported"))
      }
    )
  }
}

0 comments on commit cd8b827

Please sign in to comment.