
[SPARK-43421][SS] Implement Changelog based Checkpointing for RocksDB State Store Provider #41099

Closed
wants to merge 42 commits

Conversation

Contributor

@chaoqin-li1123 commented May 9, 2023

What changes were proposed in this pull request?

In order to reduce checkpoint duration and end-to-end latency, we propose Changelog Based Checkpointing for the RocksDB State Store Provider. The mechanism is as follows.

  1. Changelog checkpoint: Upon each put() or delete() call to the local RocksDB instance, log the operation to a changelog file. During the state change commit, sync the compressed changelog of the current batch to DFS as checkpointDir/{version}.delta.
  2. Version reconstruction: For version j, find the latest snapshot i.zip such that i <= j, load snapshot i, and replay i+1.delta ~ j.delta. This is used when loading the initial state as well as when creating the latest version snapshot. Note: if a query shuts down without an exception, there won't be changelog replay during query restart, because a maintenance task is executed before the state store instance is unloaded.
  3. Background snapshot: A maintenance thread in the executors launches maintenance tasks periodically. Inside the maintenance task, sync the latest RocksDB local snapshot to DFS as checkpointDir/{version}.zip. Snapshots enable faster failure recovery and allow old versions to be purged.
  4. Garbage collection: Inside the maintenance task, delete snapshot and delta files from DFS for versions that are outside the retention range (by default, 100 versions are retained).
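As a rough illustration of step 2, the file-selection logic can be sketched as follows (names are hypothetical, not the actual Spark implementation):

```scala
// Sketch of version reconstruction: given a target version j and the snapshot
// versions available in DFS, pick the latest snapshot i <= j and the delta
// files (i+1).delta .. j.delta to replay on top of it.
// Assumes at least one snapshot version <= targetVersion exists.
def filesToLoad(targetVersion: Long, snapshotVersions: Seq[Long]): (Long, Seq[Long]) = {
  val snapshot = snapshotVersions.filter(_ <= targetVersion).max
  val deltasToReplay = ((snapshot + 1) to targetVersion).toSeq
  (snapshot, deltasToReplay)
}
```

For example, loading version 7 with snapshots {1, 3, 5} present would load 5.zip and replay 6.delta and 7.delta on top of it.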

Why are the changes needed?

We have identified state checkpointing latency as one of the major performance bottlenecks for stateful streaming queries. Currently, the RocksDB state store pauses the RocksDB instances to upload a snapshot to the cloud when committing a batch, which is heavyweight and has unpredictable performance.
With changelog-based checkpointing, we allow the RocksDB instance to run uninterrupted, which improves RocksDB operation performance. This also dramatically reduces the commit time and batch duration because we upload a smaller amount of data during state commit. With this change, stateful queries with the RocksDB state store will have lower and more predictable latency.

How was this patch tested?

Add unit tests for the changelog checkpointing utility.
Add unit and integration tests that check backward compatibility with existing checkpoints.
Enable RocksDB state store unit tests and stateful streaming query integration tests to run with changelog checkpointing enabled.

@chaoqin-li1123 chaoqin-li1123 changed the title [SS] Implement Changelog based Checkpointing for RocksDB State Store Provider [SPARK-43421][SS] Implement Changelog based Checkpointing for RocksDB State Store Provider May 9, 2023
@chaoqin-li1123
Contributor Author

@HeartSaVioR

assert(changelogWriter.isDefined)
val newVersion = loadedVersion + 1
newVersion - fileManager.getLastUploadedSnapshotVersion() >= conf.minDeltasForSnapshot ||
changelogWriter.get.size > 1000
Contributor

1,000 entries feels relatively small to trigger a snapshot upload. Consider removing the limit, or making it very large, such as 1 million.

Contributor Author

This does not trigger a snapshot upload; it simply flushes and creates a local checkpoint. I can increase it to 10K, which guarantees that the changelog replay between every two snapshots is < 50K records.

@@ -56,6 +56,15 @@ class RocksDB(
hadoopConf: Configuration = new Configuration,
loggingId: String = "") extends Logging {

case class RocksDBCheckpoint(checkpointDir: File, version: Long, numKeys: Long) {
Contributor

Is a case class the preferred pattern for something like this? cc @HeartSaVioR

@anishshri-db
Contributor

@chaoqin-li1123 - is the test failure related to the change?

 - Handle unsupported input of message type *** FAILED *** (12 milliseconds)

@@ -279,7 +280,7 @@ class StateStoreSuite extends StateStoreSuiteBase[HDFSBackedStateStoreProvider]
val timeoutDuration = 1.minute

quietly {
withSpark(new SparkContext(conf)) { sc =>
withSpark(SparkContext.getOrCreate(conf)) { sc =>
Contributor

Why do we need this change?

Contributor Author

Because we now use a shared Spark session, creating a new Spark context would conflict with it.

Contributor

  1. SparkContext.getOrCreate() does not allow you to set the config if there is an active SparkContext. If there is no active SparkContext, the config takes effect. We don't seem to set any configs here, so hopefully nothing is messed up, but it's worth noting.
  2. If you stop the active SparkContext (withSpark), the SparkSession containing that SparkContext will also stop functioning until you set a SparkContext again.

That said, if you have a test which intends to spin up a new SparkContext, the test suite should not extend the shared SparkSession. There is another trait named LocalSparkContext, which spins up and tears down a new SparkContext per test, but that may also be overkill for this test suite, because only a few tests require a SparkContext instance to be set.

I'd say let's roll back unless your change requires SparkSession to be set.

Contributor

I feel like you had to do this because withSQLConf may require a SparkSession. But I suspect this test won't use the SQLConf initialized in the shared SparkSession, which makes withSQLConf a no-op.

Have you checked that all tests are running intentionally, testing both incremental checkpointing and changelog checkpointing?

Contributor Author

I don't mean to override the config here. The reason I need this is that I extend SharedSparkSession, which already has an active Spark context; if I don't modify the test, it throws an error when trying to create a new Spark context.

Contributor

@HeartSaVioR left a comment

Done first pass in code part. Looking through tests.

One major concern about tests is that we define a trait which runs tests for two different config values, leveraging withSQLConf, and blindly apply it to existing test suites. I suspect many tests may not even pick up the change, because they "intentionally" do not use the config in the shared SparkSession. The test suite didn't even use the shared SparkSession at all.

Can we double-check on that?

@@ -2389,6 +2394,12 @@ If you want to cap RocksDB memory usage in your Spark Structured Streaming deplo
You can also determine the max allowed memory for RocksDB instances by setting the `spark.sql.streaming.stateStore.rocksdb.maxMemoryUsageMB` value to a static number or as a fraction of the physical memory available on the node.
Limits for individual RocksDB instances can also be configured by setting `spark.sql.streaming.stateStore.rocksdb.writeBufferSizeMB` and `spark.sql.streaming.stateStore.rocksdb.maxWriteBufferNumber` to the required values. By default, RocksDB internal defaults are used for these settings.
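For illustration, the settings above could be combined in `spark-defaults.conf` like this (the values here are arbitrary examples for a sketch, not recommendations):

```
spark.sql.streaming.stateStore.rocksdb.maxMemoryUsageMB      500
spark.sql.streaming.stateStore.rocksdb.writeBufferSizeMB     64
spark.sql.streaming.stateStore.rocksdb.maxWriteBufferNumber  4
```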

##### RocksDB State Store Changelog Checkpointing
Contributor

Can we provide a higher-level description of how this works? We don't even explain what a changelog is.

I understand we have no explanation of changelog checkpointing for the HDFS-backed state store provider, which is unfortunate, but for the RocksDB state store provider, users have to decide whether to use the old approach (incremental checkpointing) or the new one, which requires them to understand the characteristics of the two options before choosing one.

Contributor Author

Drafted a newer version with ChatGPT, PTAL.

Contributor

Looks OK :)

docs/structured-streaming-programming-guide.md (outdated, resolved)
@@ -865,10 +1076,14 @@ class RocksDBSuite extends SparkFunSuite {
}
}

private def sqlConf = SQLConf.get
Contributor

I don't feel like this is a good idea in terms of isolation. The reason we do withSQLConf is simple: we don't want the modified config to remain applied for the remaining tests. Despite using a def, this gives the same config instance across multiple calls, because the SparkSession is shared among the tests in this suite, so any modification will affect all remaining tests.

Contributor Author

I feel it is fine in the RocksDB unit tests because we only modify a small number of configs here. I do it so that I can propagate the changelog checkpointing SQL conf conveniently.

Contributor

I'd rather clone the config and use the cloned one than modify the config in the session directly. If doing so breaks any tests, I'd consider them not properly isolated.
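The isolation concern can be illustrated with a toy sketch (hypothetical names, not Spark's actual SQLConf): a `def` re-evaluates its body on every call, but it still hands every caller the same shared instance, so modifications leak across tests, whereas a clone isolates them.

```scala
import scala.collection.mutable

// Toy stand-in for a session-scoped config object.
class Conf {
  val settings: mutable.Map[String, String] = mutable.Map.empty
  // Copy the current settings into a fresh, independent instance.
  def cloned: Conf = { val c = new Conf; c.settings ++= settings; c }
}

object SharedSession { val conf = new Conf }

// A def, but it returns the one shared instance on every call.
def sqlConf: Conf = SharedSession.conf
```

Mutating `sqlConf.settings` changes the shared config for every later caller; mutating a `cloned` copy does not.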

@@ -865,10 +1076,14 @@ class RocksDBSuite extends SparkFunSuite {
}
}

private def sqlConf = SQLConf.get

private def dbConf = RocksDBConf(StateStoreConf(sqlConf))
Contributor

Same here, doesn't seem to be good in terms of isolation.

Contributor Author

This should be fine because it creates a new RocksDBConf every time. Changed the above code to clone the sqlConf.


@chaoqin-li1123
Contributor Author

I can verify that all the streaming query integration tests we override inherit StateStoreMetricsTest, which uses a shared Spark session. They also use the SQL conf to override the state format version, so they should also respect the changelog checkpointing config we set.
RocksDBStateStoreIntegrationSuite extends StreamTest, which has a shared Spark session and uses the SQL conf.
I did find that the RocksDBStateStore suite ignores the SQL conf; I have fixed that.

Contributor

@HeartSaVioR left a comment

Mostly minors. Thanks for the patience!

SQLConf.get.getConfString(rocksdbChangelogCheckpointingConfKey) == "true"
}

class RocksDBSuite extends AlsoTestWithChangelogCheckpointingEnabled with SharedSparkSession {
Contributor

Shall we extract out the tests which do not need to be executed twice? Either a separate suite, or some tag/annotation on the tests. I'll comment "check" on the ones I spot, but it would be nice if you could revisit the tests and find more.

Contributor Author

Added testWithChangelogCheckpointingEnabled and testWithChangelogCheckpointingDisabled for tests that don't need to be duplicated.

}
}

test("RocksDBFileManager: read and write changelog") {
Contributor

check

@@ -588,28 +822,6 @@ class RocksDBSuite extends SparkFunSuite {
verifyMetrics(putCount = 2, getCount = 3, metrics = db.metrics)
}
}

// force compaction and check the compaction metrics
Contributor

Can we retain this part of the test and have it selectively executed for incremental checkpointing? Please try not to remove a test when the functionality is not removed.

Contributor

(The config is even applied to the snapshots of changelog checkpointing, although we do not take a snapshot in every microbatch, hence it's not easy to test. OK to skip the test for changelog checkpointing.)

Contributor Author

Added it back; it runs with changelog checkpointing disabled.


extends StreamingAggregationSuite with RocksDBStateStoreTest {
import testImplicits._

def snapshotVersionsPresent(dir: File): Seq[Long] = {
Contributor

Maybe better to have a helper object for RocksDB tests to deduplicate this and the methods below.

Contributor Author

Moved to a TestUtil trait.
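A helper like this presumably looks something like the sketch below (illustrative, not the PR's exact TestUtil code): it lists the versions for which a `<version>.zip` snapshot file exists in the checkpoint directory.

```scala
import java.io.File
import scala.util.Try

// List snapshot versions present in a checkpoint directory, sorted ascending.
// Non-snapshot files (e.g. "2.delta") and unparsable names are skipped.
def snapshotVersionsPresent(dir: File): Seq[Long] =
  Option(dir.listFiles()).getOrElse(Array.empty[File]).toSeq
    .map(_.getName)
    .filter(_.endsWith(".zip"))
    .flatMap(name => Try(name.stripSuffix(".zip").toLong).toOption)
    .sorted
```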

.sorted
}

test("Streaming aggregation RocksDB State Store backward compatibility.") {
Contributor

This can move to a RocksDB-side suite. Let's not put provider-specific tests in the operator test suite. RocksDBStateStoreIntegrationSuite?

Contributor Author

Moved to RocksDBStateStoreIntegrationSuite.

Contributor

@HeartSaVioR left a comment

+1 pending CI. Thanks for the work!

@HeartSaVioR
Contributor

@chaoqin-li1123
Could you please look at how long the [Run / Build modules: sql - other tests](https://github.com/chaoqin-li1123/spark/actions/runs/5135977794/jobs/9242562743#logs) module takes in the latest master branch?

I think the tests we are adding here should not increase the build time by something like an hour. If it has already been exceeding 5.5 hours or so, then we will probably have to look at how to deal with that (one probably easy way is moving the test suites to the sql - slow tests module), but if it doesn't take that long, I would doubt that the tests we touch are really functioning.

@HeartSaVioR
Contributor

@chaoqin-li1123
I see you've made another attempt and it passed. Thanks for doing that!
I'll merge to unblock, but let's file a JIRA ticket and investigate to ensure that this change is NOT an offending commit.

@HeartSaVioR
Contributor

Thanks! Merging to master.

@chaoqin-li1123
Contributor Author

I suspect the 5.5 hours is an accident on the test runner side (it has never happened before), but sure, we can do some investigation.

@HeartSaVioR
Contributor

Yeah... it would be awesome if we could do a manual comparison between attempt #1 and attempt #2 by looking into the log files and tracking the elapsed time per test suite. We actually saw the timeout in previous commits in this PR as well (what did we do to fix that, btw?), so we should take responsibility for looking into it, or at least give it a try.

@chaoqin-li1123
Contributor Author

I don't know why there was a timeout in the previous commit, but it was gone after I removed the new streaming join suite that uses the RocksDB state store.
I will try to see what happened in the 5-hour CI run; I suspect it is due to unavailability in the GitHub testing infra. @HeartSaVioR

czxm pushed a commit to czxm/spark that referenced this pull request Jun 12, 2023
… State Store Provider

(PR description omitted; identical to the description above.)

Closes apache#41099 from chaoqin-li1123/changelog.

Authored-by: Chaoqin Li <chaoqin.li@databricks.com>
Signed-off-by: Jungtaek Lim <kabhwan.opensource@gmail.com>