@zifeif2 zifeif2 commented Nov 18, 2025

What changes were proposed in this pull request?

Introduces a new StatePartitionReader, StatePartitionReaderAllColumnFamilies, to support offline repartitioning.
StatePartitionReaderAllColumnFamilies is invoked when the user sets the option readAllColumnFamilies to true.

We have the StateDataSource Reader, which allows customers to read the rows in an operator state store using the DataFrame API, just like they read a normal table. But it currently only supports reading one column family in the state store at a time.

This change allows reading all the state rows in all the column families, so that we can repartition them at once. This lets us read the entire state store, repartition the rows, and then save the repartitioned state rows to the cloud. It also improves performance, since we don’t have to read each column family separately. The state is read based on the last committed batch version.

Since each column family can have a different schema, the DataFrame we return treats the key and value rows as bytes:

  • partition_key (string)
  • key_bytes (binary)
  • value_bytes (binary)
  • column_family_name (string)
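
As a rough illustration of the row shape listed above, the following self-contained Scala sketch models one output row and flattens several column families into a single collection. `RawStateRow` and `flattenColumnFamilies` are hypothetical names used only for this example; they are not part of Spark or of this PR, and the plain Scala types are stand-ins for the DataFrame column types.

```scala
// Hypothetical model of one row returned when all column families are read.
// Field names mirror the columns listed above; the types are plain Scala stand-ins.
final case class RawStateRow(
    partitionKey: String,
    keyBytes: Array[Byte],
    valueBytes: Array[Byte],
    columnFamilyName: String)

object RawStateRowExample {
  // Flattens per-column-family (key, value) byte pairs into one sequence of rows,
  // tagging each row with the column family it came from.
  // Column families are visited in name order so the output is deterministic.
  def flattenColumnFamilies(
      partitionKey: String,
      families: Map[String, Seq[(Array[Byte], Array[Byte])]]): Seq[RawStateRow] =
    families.toSeq.sortBy(_._1).flatMap { case (cfName, entries) =>
      entries.map { case (k, v) => RawStateRow(partitionKey, k, v, cfName) }
    }
}
```

Because every row carries its column family name, rows from column families with different schemas can share one DataFrame and be split apart again after repartitioning.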

Why are the changes needed?

See above

Does this PR introduce any user-facing change?

No

How was this patch tested?

See the unit tests. They not only verify the schema, but also validate that the data is serialized to bytes correctly by comparing it against the DataFrame returned by a normal query.
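
One detail worth keeping in mind for a byte-level comparison like this: on the JVM, `Array[Byte]` uses reference equality, so `==` on two arrays with the same contents is false. A minimal sketch of a content-based comparison follows. `sameRawRows` is a hypothetical helper written for illustration, not code from this PR's test suite.

```scala
object ByteComparison {
  // Compares two collections of (key, value) byte pairs by content, ignoring order.
  // Arrays are converted to Seq[Byte] so that equality is structural, and
  // duplicates are counted so that multiplicity is respected.
  def sameRawRows(
      actual: Seq[(Array[Byte], Array[Byte])],
      expected: Seq[(Array[Byte], Array[Byte])]): Boolean = {
    def normalize(
        rows: Seq[(Array[Byte], Array[Byte])]): Map[(Seq[Byte], Seq[Byte]), Int] =
      rows
        .groupBy { case (k, v) => (k.toSeq, v.toSeq) }
        .map { case (kv, group) => kv -> group.size }
    normalize(actual) == normalize(expected)
  }
}
```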

Was this patch authored or co-authored using generative AI tooling?

Yes. haiku, sonnet.

@zifeif2 zifeif2 changed the title from "[WIP] [SPARK-54388][SS] Introduce StatePartitionReader that scan raw bytes for Single ColFamily" to "[SPARK-54388][SS] Introduce StatePartitionReader that scan raw bytes for Single ColFamily" on Nov 18, 2025
Contributor

@micheal-o micheal-o left a comment

Did a quick pass. It is in the right direction. Just needs some changes.

@zifeif2 zifeif2 force-pushed the repartition-reader-single-cf branch from 42540b1 to 0a878e9 Compare November 19, 2025 02:54
@zifeif2 zifeif2 force-pushed the repartition-reader-single-cf branch from c10eed0 to 99e2412 Compare November 24, 2025 20:08
@micheal-o
Contributor

Also, CI is failing due to a linter error in your changes. PTAL.

@zifeif2 zifeif2 force-pushed the repartition-reader-single-cf branch from 194364d to b46e8d1 Compare November 26, 2025 08:18
Contributor

@micheal-o micheal-o left a comment

Did another round of review. It is almost there. Thanks

@micheal-o
Contributor

@zifeif2 please also fix the CI failure.

@zifeif2 zifeif2 force-pushed the repartition-reader-single-cf branch from 0f8f7d3 to 392d498 Compare December 1, 2025 17:07
@zifeif2 zifeif2 force-pushed the repartition-reader-single-cf branch from 392d498 to 48521c3 Compare December 1, 2025 22:33
Contributor

@micheal-o micheal-o left a comment

Stamped with some minor comments. Mostly looks good now. Thanks

val STATE_VAR_NAME = newOption("stateVarName")
val READ_REGISTERED_TIMERS = newOption("readRegisteredTimers")
val FLATTEN_COLLECTION_TYPES = newOption("flattenCollectionTypes")
val INTERNAL_ONLY_READ_ALL_COLUMN_FAMILIES = newOption("internalOnlyReadAllColumnFamilies")
Contributor

Why do we need to expose an option at the state data source? We only need to expose an interface at the partition reader level, right?

Contributor

@anishshri-db it allows us to reuse a lot of the existing state data source code for implementation and testing, so we don't have to duplicate code. It also makes DataFrame transformations on the data easier.

Author

Could you explain a bit more about exposing an interface at the partition reader level?
I am thinking of using StatePartitionAllColumnFamiliesReader through an interface like the following:

            spark.read
              .format("statestore")
              .option(StateSourceOptions.PATH, tempDir.getAbsolutePath)
              .option(StateSourceOptions.INTERNAL_ONLY_READ_ALL_COLUMN_FAMILIES, "true")
              .load()
              .collect()

Therefore, we need to expose this option in StateDataSource.

Seq(INTERNAL_ONLY_READ_ALL_COLUMN_FAMILIES, JOIN_SIDE))
}

if (internalOnlyReadAllColumnFamilies && readChangeFeed) {
Contributor

We probably don't need this if we don't expose an option?

"Boolean value is expected")
}

if (internalOnlyReadAllColumnFamilies && stateVarName.isDefined) {
Contributor

Let's also note here that this path should only be invoked by internal callers, e.g. repartitioning.

Author

Ok! Added a comment above it
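
The checks discussed in this thread reject combinations of the internal read-all-column-families option with other options. A simplified, self-contained sketch of that kind of validation is shown here. `ReaderOptions` and `validate` are hypothetical stand-ins, and the error messages are illustrative only; the actual checks in the state data source options parsing may be structured and worded differently.

```scala
// Hypothetical, simplified stand-in for the parsed state source options.
final case class ReaderOptions(
    internalOnlyReadAllColumnFamilies: Boolean = false,
    readChangeFeed: Boolean = false,
    stateVarName: Option[String] = None)

object ReaderOptionsValidation {
  // Returns Left(message) for a conflicting combination, Right(options) otherwise.
  // The read-all-column-families path is meant for internal callers only,
  // for example repartitioning, as noted in the review.
  def validate(opts: ReaderOptions): Either[String, ReaderOptions] =
    if (opts.internalOnlyReadAllColumnFamilies && opts.readChangeFeed) {
      Left("internalOnlyReadAllColumnFamilies cannot be combined with readChangeFeed")
    } else if (opts.internalOnlyReadAllColumnFamilies && opts.stateVarName.isDefined) {
      Left("internalOnlyReadAllColumnFamilies cannot be combined with stateVarName")
    } else {
      Right(opts)
    }
}
```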

@zifeif2 zifeif2 force-pushed the repartition-reader-single-cf branch from 582b0da to 2047eec Compare December 2, 2025 23:13
@zifeif2 zifeif2 force-pushed the repartition-reader-single-cf branch from 2047eec to f6c7b6c Compare December 2, 2025 23:14
Contributor

@anishshri-db anishshri-db left a comment

lgtm pending green CI
