Conversation

@zifeif2
Contributor

@zifeif2 zifeif2 commented Dec 12, 2025

What changes were proposed in this pull request?

Integrate the PartitionKeyExtractor introduced in this PR (https://github.com/apache/spark/pull/53355/files) into StatePartitionAllColumnFamiliesReader. Before this change, StatePartitionAllColumnFamiliesReader returned the entire key value in the partition_key field, and SchemaUtil returned keySchema as the partitionKey schema. After this change, StatePartitionAllColumnFamiliesReader returns the actual partition key, and SchemaUtil returns the actual partitionKey schema.

Why are the changes needed?

When creating a StatePartitionAllColumnFamiliesReader, we need to pass along the stateFormatVersion and operator name.
In SchemaUtil, we add a getExtractor helper function. It is used when getSourceSchema is called (for the default column family) and when StatePartitionAllColumnFamiliesReader is initialized, since the reader uses the extractor in iterator to get the partitionKey for each column family.
We also added partitionKey checks to the reader suite.
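
To illustrate the idea of deriving a partition key from a full state key, here is a minimal sketch in plain Scala. It does not use Spark's actual PartitionKeyExtractor API: the `ExtractorSketch` trait, the column-family names, the `getExtractor` signature, and the sequence-based key model are all hypothetical, chosen only to show that different column families may need different extraction logic.

```scala
// Hypothetical sketch: this is NOT Spark's PartitionKeyExtractor API.
object PartitionKeySketch {
  // A state key is modeled as an ordered sequence of field values (assumption).
  type Key = Seq[Any]

  trait ExtractorSketch {
    def partitionKey(fullKey: Key): Key
  }

  // Case 1: the full key already is the partition key.
  object IdentityExtractor extends ExtractorSketch {
    def partitionKey(fullKey: Key): Key = fullKey
  }

  // Case 2: the partition key is the first `n` fields of a composite key.
  final case class PrefixExtractor(n: Int) extends ExtractorSketch {
    def partitionKey(fullKey: Key): Key = fullKey.take(n)
  }

  // Column-family names below are made up for illustration.
  def getExtractor(colFamily: String): ExtractorSketch = colFamily match {
    case "default"        => IdentityExtractor
    case "compositeKeyCF" => PrefixExtractor(1)
    case other =>
      throw new IllegalArgumentException(s"unknown column family: $other")
  }

  def main(args: Array[String]): Unit = {
    println(getExtractor("default").partitionKey(Seq("user-1")))
    println(getExtractor("compositeKeyCF").partitionKey(Seq("user-1", 42L)))
  }
}
```

In this toy model both calls yield the same partition key, even though the second column family stores an extra field in its key.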

Does this PR introduce any user-facing change?

No

How was this patch tested?

See the updated StatePartitionAllColumnFamiliesSuite. We have hard-coded functions that extract the partition key for different column families from normalDF; we then compare the extracted partition key against the partition key read from bytesDF.

Was this patch authored or co-authored using generative AI tooling?

Yes. claude-4.5-opus

@zifeif2 zifeif2 force-pushed the integrate-key-extraction branch from 996e27b to d9a88b3 Compare December 16, 2025 00:20
@zifeif2 zifeif2 marked this pull request as ready for review December 16, 2025 00:28
@zifeif2 zifeif2 force-pushed the integrate-key-extraction branch from d9a88b3 to d234a97 Compare December 17, 2025 04:56
val stateVarInfo = stateVarInfoList.head
transformWithStateVariableInfoOpt = Some(stateVarInfo)
}
var stateVarInfoList = operatorProperties.stateVariables
Contributor Author

This is the same as the previous version except for indentation. We can now assign transformWithStateVariableInfoOpt because stateVarName will always be a "valid" value after the change at line 323.

storeMetadata: Array[StateMetadataTableEntry]): Option[Int] = {
if (storeMetadata.nonEmpty &&
storeMetadata.head.operatorName == StatefulOperatorsUtils.SYMMETRIC_HASH_JOIN_EXEC_OP_NAME) {
Some(session.conf.get(SQLConf.STREAMING_JOIN_STATE_FORMAT_VERSION))
Contributor

We should read this from the current batch's offset seq conf instead; buildStateStoreConf does something similar.

The session here doesn't include the confs written in the checkpoint, so this can return the wrong value.
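
The concern can be sketched with plain maps standing in for the two configuration sources. This is a toy model, not Spark code: `checkpointConf`, `sessionConf`, the key string, and `resolve` are hypothetical names, and the fallback to the session value when the checkpoint has no entry is an assumption made for the example.

```scala
// Toy model: plain maps stand in for the checkpoint's recorded confs
// and the live session's confs. None of these names come from Spark.
object ConfSourceSketch {
  val Key = "join.stateFormatVersion" // hypothetical key name

  // Prefer the value recorded with the checkpoint. Falling back to the
  // session value when the checkpoint has none is an assumption.
  def resolve(
      checkpointConf: Map[String, String],
      sessionConf: Map[String, String]): Option[Int] =
    checkpointConf.get(Key).orElse(sessionConf.get(Key)).map(_.toInt)

  def main(args: Array[String]): Unit = {
    val checkpointConf = Map(Key -> "1") // written when the query first ran
    val sessionConf = Map(Key -> "2")    // whatever the reader's session has now
    // Reading only the session would report 2, but the state was written with 1.
    println(sessionConf.get(Key).map(_.toInt))
    println(resolve(checkpointConf, sessionConf))
  }
}
```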

}
var stateVarInfoList = operatorProperties.stateVariables
.filter(stateVar => stateVar.stateName == stateVarName)
if (stateVarInfoList.isEmpty &&
Contributor

We don't need this anymore, right? It won't be empty.

Contributor Author

It will be empty when it's a non-timer internal column. I updated the logic in the new version to make this more explicit.

@zifeif2 zifeif2 force-pushed the integrate-key-extraction branch 2 times, most recently from 1738128 to e500018 Compare January 6, 2026 20:40
reduce duplicate code

all tests pass
Contributor

@micheal-o micheal-o left a comment

Stamped, but please address the comments. Thanks.

// infos instead of validating a specific stateVarName. This skips the normal validation
// logic because we're not reading a specific state variable - we're reading all of them.
if (sourceOptions.internalOnlyReadAllColumnFamilies) {
} else if (sourceOptions.internalOnlyReadAllColumnFamilies) {
Contributor

Why not a separate if condition? Right now you are doing:

if {}
else if {}

Contributor Author

Hmm, either will work, since we don't allow setting both readRegisteredTimers and internalOnlyReadAllColumnFamilies at the same time. I can change it in the next version.
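
A small standalone sketch of why the two shapes are interchangeable here. The function names and the string results are hypothetical stand-ins for the real branches; the only point is that a chained `if / else if` and two separate `if` statements agree on every input where at most one flag is set.

```scala
// Hypothetical stand-ins for the two branching styles under discussion.
object BranchSketch {
  // Chained form: the second condition is only checked if the first is false.
  def chained(readTimers: Boolean, readAll: Boolean): String =
    if (readTimers) "timers"
    else if (readAll) "all"
    else "single"

  // Separate form: both conditions are always checked, and the later one wins.
  def separate(readTimers: Boolean, readAll: Boolean): String = {
    var result = "single"
    if (readTimers) result = "timers"
    if (readAll) result = "all"
    result
  }

  def main(args: Array[String]): Unit = {
    for (t <- Seq(false, true); a <- Seq(false, true)) {
      println(s"timers=$t all=$a chained=${chained(t, a)} separate=${separate(t, a)}")
    }
  }
}
```

The two forms differ only when both flags are true, which is the combination the options validation is said to reject.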


var stateVarInfoList = operatorProperties.stateVariables
.filter(stateVar => stateVar.stateName == stateVarName)
if (!TimerStateUtils.isTimerCFName(stateVarName) &&
Contributor

Are you sure this doesn't apply to timer CFs?

Contributor Author

Yeah, when readRegisteredTimers is set, stateVarName is set to the timer column. From lines 333-334 above, we would already have the correct stateVarInfoList, so we don't need to assign it a dummy one like below.

          var stateVarInfoList = operatorProperties.stateVariables
            .filter(stateVar => stateVar.stateName == stateVarName)

val stateFormatVersion = getStateFormatVersion(storeMetadata, sourceOptions.resolvedCpLocation)
val allColFamilyReaderInfoOpt: Option[AllColumnFamiliesReaderInfo] =
if (sourceOptions.internalOnlyReadAllColumnFamilies) {
Option(AllColumnFamiliesReaderInfo(
Contributor

nit: Some

Contributor Author

Curious: when do we prefer one over the other? I only know from the style guide that we use Option to guard against null, but I don't think that applies here.
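
For reference, the two constructors differ in how they treat null: `Option(x)` maps null to `None`, while `Some(x)` always wraps its argument. A small standalone illustration follows; the `viaOption` and `viaSome` helpers are hypothetical and not part of the PR.

```scala
// Standalone illustration of Option(...) versus Some(...) in the Scala standard library.
object OptionVsSome {
  def viaOption(s: String): Option[String] = Option(s) // null becomes None
  def viaSome(s: String): Option[String] = Some(s)     // null is wrapped as-is

  def main(args: Array[String]): Unit = {
    val missing: String = null
    println(viaOption(missing)) // None
    println(viaSome(missing))   // Some(null)
    println(viaOption("x") == viaSome("x")) // true: identical for non-null values
  }
}
```

When the wrapped value is a freshly constructed object that cannot be null, the two behave identically, so `Some` states the intent more directly.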

@zifeif2 zifeif2 force-pushed the integrate-key-extraction branch from e500018 to 4a0e7cd Compare January 8, 2026 18:36
Contributor

@anishshri-db anishshri-db left a comment

LGTM pending a couple of nits

Yicong-Huang pushed a commit to Yicong-Huang/spark that referenced this pull request Jan 9, 2026
### What changes were proposed in this pull request?

Integrate the PartitionKeyExtractor introduced in [this PR](https://github.com/apache/spark/pull/53355/files) to StatePartitionAllColumnFamiliesReader. Before this change, StatePartitionAllColumnFamiliesReader returns the entire key value in partition_key field, and SchemaUtil will return `keySchema` as the partitionKey schema. After this change, StatePartitionAllColumnFamiliesReader will return the actual partition key, and SchemaUtil returns the actual partitionKey schema

### Why are the changes needed?

When creating a StatePartitionAllColumnFamiliesReader, we need to pass along the stateFormatVersion and operator name.
In SchemaUtil, we will create a `getExtractor` helper function. It's used when getSourceSchema is called (for default column family), as well as when StatePartitionAllColumnFamiliesReader is initialized, as the reader will use the extractor to get partitionKey for different column families in `iterator`
We also added checks of partitionKey in reader suite

### Does this PR introduce _any_ user-facing change?

No

### How was this patch tested?

See updated StatePartitionAllColumnFamiliesSuite. We have hard-coded function that extract partition key for different column families from normalDF, then we'll compare the extracted partition key against the partition key read from bytesDF

### Was this patch authored or co-authored using generative AI tooling?

Yes. claude-4.5-opus

Closes apache#53459 from zifeif2/integrate-key-extraction.

Authored-by: zifeif2 <zifeifeng11@gmail.com>
Signed-off-by: Anish Shrigondekar <anish.shrigondekar@databricks.com>