Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -3327,24 +3327,15 @@ object SQLConf {
.doc("State format version used by streaming join operations in a streaming query. " +
"State between versions are tend to be incompatible, so state format version shouldn't " +
"be modified after running. Version 3 uses a single state store with virtual column " +
"families instead of four stores and is only supported with RocksDB. NOTE: version " +
"1 is DEPRECATED and should not be explicitly set by users. " +
"Version 4 is under development and only available for testing.")
"families instead of four stores and is only supported with RocksDB. Version 4 optimizes " +
"watermark-based eviction, also using a single state store with virtual " +
"column families and only supported with RocksDB. NOTE: version 1 is DEPRECATED and " +
"should not be explicitly set by users.")
.version("3.0.0")
.intConf
.checkValue(v => Set(1, 2, 3, 4).contains(v), "Valid versions are 1, 2, 3, and 4")
.createWithDefault(2)

val STREAMING_JOIN_STATE_FORMAT_V4_ENABLED =
buildConf("spark.sql.streaming.join.stateFormatV4.enabled")
.internal()
.doc("When true, enables state format version 4 for stream-stream joins. " +
"This config will be removed once V4 is complete.")
.version("4.2.0")
.withBindingPolicy(ConfigBindingPolicy.SESSION)
.booleanConf
.createWithDefaultFunction(() => Utils.isTesting)

val STREAMING_SESSION_WINDOW_MERGE_SESSIONS_IN_LOCAL_PARTITION =
buildConf("spark.sql.streaming.sessionWindow.merge.sessions.in.local.partition")
.doc("When true, streaming session window sorts and merge sessions in local partition " +
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2045,8 +2045,6 @@ object SymmetricHashJoinStateManager {
joinStoreGenerator: JoinStateManagerStoreGenerator,
joinKeyOrdinalForWatermark: Option[Int] = None): SymmetricHashJoinStateManager = {
if (stateFormatVersion == 4) {
require(SQLConf.get.getConf(SQLConf.STREAMING_JOIN_STATE_FORMAT_V4_ENABLED),
"State format version 4 is under development.")
new SymmetricHashJoinStateManagerV4(
joinSide, inputValueAttributes, joinKeys, stateInfo, storeConf, hadoopConf,
partitionId, keyToNumValuesStateStoreCkptId, keyWithIndexToValueStateStoreCkptId,
Expand Down