diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala index 9e1e0d5d5f958..f474f0f0b4309 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala @@ -3327,24 +3327,15 @@ object SQLConf { .doc("State format version used by streaming join operations in a streaming query. " + "State between versions are tend to be incompatible, so state format version shouldn't " + "be modified after running. Version 3 uses a single state store with virtual column " + - "families instead of four stores and is only supported with RocksDB. NOTE: version " + - "1 is DEPRECATED and should not be explicitly set by users. " + - "Version 4 is under development and only available for testing.") + "families instead of four stores and is only supported with RocksDB. Version 4 optimizes " + + "watermark-based eviction, also using a single state store with virtual " + + "column families and only supported with RocksDB. NOTE: version 1 is DEPRECATED and " + + "should not be explicitly set by users.") .version("3.0.0") .intConf .checkValue(v => Set(1, 2, 3, 4).contains(v), "Valid versions are 1, 2, 3, and 4") .createWithDefault(2) - val STREAMING_JOIN_STATE_FORMAT_V4_ENABLED = - buildConf("spark.sql.streaming.join.stateFormatV4.enabled") - .internal() - .doc("When true, enables state format version 4 for stream-stream joins. " + - "This config will be removed once V4 is complete.") - .version("4.2.0") - .withBindingPolicy(ConfigBindingPolicy.SESSION) - .booleanConf - .createWithDefaultFunction(() => Utils.isTesting) - val STREAMING_SESSION_WINDOW_MERGE_SESSIONS_IN_LOCAL_PARTITION = buildConf("spark.sql.streaming.sessionWindow.merge.sessions.in.local.partition") .doc("When true, streaming session window sorts and merge sessions in local partition " + diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/operators/stateful/join/SymmetricHashJoinStateManager.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/operators/stateful/join/SymmetricHashJoinStateManager.scala index 02c9ef11df89c..ea8fc861b1044 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/operators/stateful/join/SymmetricHashJoinStateManager.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/operators/stateful/join/SymmetricHashJoinStateManager.scala @@ -2045,8 +2045,6 @@ object SymmetricHashJoinStateManager { joinStoreGenerator: JoinStateManagerStoreGenerator, joinKeyOrdinalForWatermark: Option[Int] = None): SymmetricHashJoinStateManager = { if (stateFormatVersion == 4) { - require(SQLConf.get.getConf(SQLConf.STREAMING_JOIN_STATE_FORMAT_V4_ENABLED), - "State format version 4 is under development.") new SymmetricHashJoinStateManagerV4( joinSide, inputValueAttributes, joinKeys, stateInfo, storeConf, hadoopConf, partitionId, keyToNumValuesStateStoreCkptId, keyWithIndexToValueStateStoreCkptId,