[SPARK-35988][SS] The implementation for RocksDBStateStoreProvider #33187

Closed
xuanyuanking wants to merge 7 commits into apache:master from xuanyuanking:SPARK-35988

@xuanyuanking
Member

What changes were proposed in this pull request?

Add the implementation of RocksDBStateStoreProvider. It is a subclass of StateStoreProvider that leverages the functionality already implemented in the RocksDB instance.

Why are the changes needed?

This provides the interface through which end users use the RocksDB state store.

Does this PR introduce any user-facing change?

Yes. Users can use the new RocksDBStateStore in their applications.
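
For readers wondering how an application would opt in: a minimal sketch, assuming the provider is selected through Spark's existing `spark.sql.streaming.stateStore.providerClass` setting and that the class lives in the `org.apache.spark.sql.execution.streaming.state` package. Neither detail is stated in this PR, and the `RocksDBProviderConf` helper is purely illustrative.

```scala
// Sketch only: collects the setting a streaming job might pass to Spark.
object RocksDBProviderConf {
  // Configuration key used to choose the state store provider (assumed to apply here).
  val ProviderClassKey: String = "spark.sql.streaming.stateStore.providerClass"

  // Fully qualified class name; the package is an assumption, not taken from this PR.
  val RocksDBProvider: String =
    "org.apache.spark.sql.execution.streaming.state.RocksDBStateStoreProvider"

  // Settings to apply before the streaming query starts.
  def settings: Map[String, String] = Map(ProviderClassKey -> RocksDBProvider)
}

// With a SparkSession named `spark` (not created in this sketch), each entry
// could be applied as:
//   RocksDBProviderConf.settings.foreach { case (k, v) => spark.conf.set(k, v) }
```

The user guide tracked as follow-up documentation is the authoritative place for the final instructions.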

How was this patch tested?

New unit tests added.

@xuanyuanking changed the title from "[SPARK-35988][SS] The implementation for RocksDBStateStoreProvider" to "[WIP][SPARK-35988][SS] The implementation for RocksDBStateStoreProvider" on Jul 2, 2021

SparkQA commented Jul 2, 2021

Test build #140577 has finished for PR 33187 at commit af24bd9.

  • This patch fails to build.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
      • class RocksDBStateStore(val lastVersion: Long) extends StateStore
      • class StateEncoder

SparkQA commented Jul 2, 2021

Kubernetes integration test unable to build dist.

exiting with code: 1
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/45088/

SparkQA commented Jul 2, 2021

Kubernetes integration test starting
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/45098/

SparkQA commented Jul 2, 2021

Kubernetes integration test status success
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/45098/

SparkQA commented Jul 2, 2021

Test build #140587 has finished for PR 33187 at commit f59a9e9.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

Contributor

@HeartSaVioR left a comment

Looks OK, but I'd like to make sure @viirya is also OK with the change, as I'm familiar with this change and might be biased.

Contributor

This can be referenced via version, so it doesn't need to be a val.

Member Author

Thanks, done in 9ddbf58

Contributor

I thought the characters (=,()) used in storeIdStr were odd, but this works on macOS and I think it would also work on Linux file systems, so OK.

Member Author

Yes, a directory name containing =() works on Linux file systems.
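
For anyone who wants to check this on another platform, a minimal sketch that creates a directory whose name contains `=`, `(`, `)` and `,`. The identifier format and the `-workdir` suffix are assumptions for illustration and may not match the real storeIdStr.

```scala
import java.nio.file.{Files, Path}

object StoreDirCheck {
  // Hypothetical identifier format; the real storeIdStr may differ.
  def storeIdStr(opId: Long, partId: Int, name: String): String =
    s"StateStoreId(opId=$opId,partId=$partId,name=$name)"

  // Creates a directory whose name embeds the identifier and returns its path.
  // Throws an exception if the file system rejects the name.
  def createStoreDir(parent: Path, id: String): Path =
    Files.createDirectory(parent.resolve(s"$id-workdir"))
}
```

Calling `StoreDirCheck.createStoreDir` under a temporary parent directory either returns the new path or throws if the platform rejects the name.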

Contributor

This simply works as there's only one version (version 0) - we'll need to check the version when we add a new state encoding version.

Member Author

Yea, agree. For the new encoding version, we should have branches here for different versions.
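
To make the branching idea concrete, a minimal sketch of a version-prefixed encoding on plain byte arrays. It is not the StateEncoder from this PR: `VersionedEncoding`, `NumVersionBytes` and `Version0` are hypothetical names, and the payload is an opaque byte array rather than an UnsafeRow.

```scala
object VersionedEncoding {
  // Number of leading bytes reserved for the version (hypothetical name).
  val NumVersionBytes: Int = 1
  // The only encoding version in this sketch.
  val Version0: Byte = 0

  // Prepends the version byte to the payload.
  def encode(payload: Array[Byte], version: Byte = Version0): Array[Byte] = {
    val out = new Array[Byte](NumVersionBytes + payload.length)
    out(0) = version
    System.arraycopy(payload, 0, out, NumVersionBytes, payload.length)
    out
  }

  // Reads the version byte and branches on it; unknown versions are rejected.
  def decode(bytes: Array[Byte]): Array[Byte] = {
    require(bytes != null && bytes.length >= NumVersionBytes, "missing version byte")
    bytes(0) match {
      case Version0 =>
        java.util.Arrays.copyOfRange(bytes, NumVersionBytes, bytes.length)
      case other =>
        throw new IllegalArgumentException(s"Unsupported state encoding version: $other")
    }
  }
}
```

A future encoding version would add another `case` to `decode`, which is the branching discussed above.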

Contributor

We can deduplicate here; I can deal with it in the prefix scan work for the RocksDB state store, as I'll bring a broader change to state encoding.

Member Author

Copy that.

Contributor

nit: remove one of two empty lines

Member Author

Thanks, done in 9ddbf58

@HeartSaVioR
Contributor

NOTE: This might receive some post-review comments from #32934.

@HeartSaVioR
Contributor

Finally we only have this one for RocksDB state store provider! Please rebase this so that we can continue.

@xuanyuanking changed the title from "[WIP][SPARK-35988][SS] The implementation for RocksDBStateStoreProvider" to "[SPARK-35988][SS] The implementation for RocksDBStateStoreProvider" on Jul 6, 2021
@xuanyuanking
Member Author

Yes, finally!
Addressed the comments and rebased the code.

SparkQA commented Jul 6, 2021

Kubernetes integration test unable to build dist.

exiting with code: 1
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/45200/

SparkQA commented Jul 6, 2021

Kubernetes integration test starting
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/45208/

SparkQA commented Jul 6, 2021

Test build #140685 has finished for PR 33187 at commit 9ddbf58.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
      • class RocksDBStateStore(lastVersion: Long) extends StateStore

SparkQA commented Jul 6, 2021

Kubernetes integration test status success
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/45208/

SparkQA commented Jul 6, 2021

Test build #140697 has finished for PR 33187 at commit a886567.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

SparkQA commented Jul 6, 2021

Kubernetes integration test starting
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/45218/

SparkQA commented Jul 6, 2021

Kubernetes integration test status success
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/45218/

SparkQA commented Jul 6, 2021

Test build #140707 has finished for PR 33187 at commit e63a380.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

Contributor

@HeartSaVioR left a comment

The code change looks good to me. Great!

One thing we haven't addressed yet is documentation. End users have no idea how to use this provider. We should probably introduce the RocksDB state store provider in the SS guide doc.

It's OK if you prefer to file a "blocker" JIRA issue to address documentation separately. I can approve this one then.

assert(store.hasCommitted)
val storeMetrics = store.metrics
assert(storeMetrics.numKeys === 1)
assert(getCustomMetric(storeMetrics, CUSTOM_METRIC_FILES_COPIED) == 1L)

Contributor

nit: I guess we can loosen the condition to > 0L. "The number of files is 1" doesn't seem to be something we need to check.

Member Author

Agree, will update in the next commit.

assert(getSizeOfStateForCurrentVersion(store.metrics) > noDataMemoryUsed)
}

test("maintenance") {

Contributor

Just a heads-up: "maintenance" and "snapshotting" are moved from StateStoreSuiteBase to StateStoreSuite, as these tests cannot be applied to the RocksDB state store provider.

Member Author

Yes, since these tests assume that the provider has a baseDir.

@xuanyuanking
Member Author

> One thing we haven't addressed yet is documentation. End users have no idea how to use this provider. We should probably introduce the RocksDB state store provider in the SS guide doc.
>
> It's OK if you prefer to file a "blocker" JIRA issue to address documentation separately. I can approve this one then.

Sure, I created the blocker issue SPARK-36041 to track the documentation.

Contributor

@HeartSaVioR left a comment

+1

I'll merge once the test passes.

My apologies for not waiting for reviews from others. We are far behind schedule (compared to the planned RC date) and this PR is something of a blocker for the session window work. The code has been running in production for years, so it is unlikely to cause problems, but we are open to handling post-review comments during the QA phase.

SparkQA commented Jul 8, 2021

Kubernetes integration test starting
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/45291/

SparkQA commented Jul 8, 2021

Kubernetes integration test status success
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/45291/


override def id: StateStoreId = RocksDBStateStoreProvider.this.stateStoreId

override def version: Long = lastVersion

Member

I recommend being consistent with HDFSBackedStateStore, which uses version and newVersion.

Member Author

Actually, for RocksDB we keep the name newVersion. The major difference here is that we have a loadedVersion concept, which is why we have a latestVersion on the provider side.
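
To illustrate the naming being discussed, a toy sketch of a store that reports the version it was opened at and produces the next version on commit. It assumes that commit yields `lastVersion + 1`; `SketchStore` and its members are hypothetical and use an in-memory map, not RocksDB.

```scala
object VersionedStoreSketch {
  sealed trait State
  case object Updating extends State
  case object Committed extends State
  case object Aborted extends State

  // `lastVersion` is the version the store was opened at.
  final class SketchStore(lastVersion: Long) {
    private var state: State = Updating
    private val data = scala.collection.mutable.Map.empty[String, String]

    // The version visible to callers before commit.
    def version: Long = lastVersion

    def put(key: String, value: String): Unit = {
      require(state == Updating, "Cannot put after already committed or aborted")
      data.put(key, value)
    }

    // Returns the new version (assumed to be lastVersion + 1 in this sketch).
    def commit(): Long = {
      require(state == Updating, "Cannot commit after already committed or aborted")
      state = Committed
      lastVersion + 1
    }

    def abort(): Unit = { state = Aborted }

    def hasCommitted: Boolean = state == Committed
  }
}
```

In this sketch `version` never changes for a given store object; only the value returned by `commit()` reflects the new version.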

verify(state == UPDATING, "Cannot put after already committed or aborted")
verify(key != null, "Key cannot be null")
require(value != null, "Cannot put a null value")
logDebug(s"Storing $key => $value")

Member

Not sure this debug log is necessary, especially since key/value are unsafe rows.

Member Author

Makes sense, let me delete this debug log.


/**
* Encodes/decodes UnsafeRows to versioned byte arrays.
* It uses the first byte of the generated byte array to store the version the describes how the

Member

I guess this is a typo. "the" -> "that".

Member Author

Ah yea, thanks!

Platform.BYTE_ARRAY_OFFSET + STATE_ENCODING_NUM_VERSION_BYTES,
keyBytes.length - STATE_ENCODING_NUM_VERSION_BYTES)
keyRow
} else null

Member

nit: I rarely see style like this in Spark.

Member Author

Makes sense, done in the next commit.

val CUSTOM_METRIC_CHECKPOINT_TIME = StateStoreCustomTimingMetric(
"rocksdbCommitCheckpointLatency", "RocksDB: commit - checkpoint time")
val CUSTOM_METRIC_FILESYNC_TIME = StateStoreCustomTimingMetric(
"rocksdbCommitPauseBgTime", "RocksDB: commit - file sync time")

Member

rocksdbCommitPauseBgTime? Or FileSyncTime?

Member Author

Yea, changed to rocksdbFileSyncTime

val rocksDBConfInTask: RocksDBConf = testRDD.mapPartitionsWithStateStore[RocksDBConf](
spark.sqlContext, testStateInfo, testSchema, testSchema, None) {
(store: StateStore, _: Iterator[String]) =>
// Use reflection to get RockDB instance

Member

nit: RockDB -> RocksDB

Member Author

Thanks, done in the next commit.

Member

@viirya left a comment

Looks good. Some minor comments. Feel free to address them in follow-up if you prefer to get this in first.

@HeartSaVioR
Contributor

I'm OK with waiting for these comments to be addressed. As long as we can still merge this within this week, that's fine with me.

I'm already working on rebasing #33038 on top of this PR. Looks like I need to bring another bunch of lines in #33038 to address RocksDB state store.

SparkQA commented Jul 8, 2021

Kubernetes integration test unable to build dist.

exiting with code: 1
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/45303/

SparkQA commented Jul 8, 2021

Test build #140779 has finished for PR 33187 at commit 1ac5835.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

Contributor

@HeartSaVioR left a comment

+1

Thanks! Merging to master/3.2!

HeartSaVioR pushed a commit that referenced this pull request Jul 8, 2021
### What changes were proposed in this pull request?
Add the implementation for the RocksDBStateStoreProvider. It's the subclass of StateStoreProvider that leverages all the functionalities implemented in the RocksDB instance.

### Why are the changes needed?
The interface for the end-user to use the RocksDB state store.

### Does this PR introduce _any_ user-facing change?
Yes. New RocksDBStateStore can be used in their applications.

### How was this patch tested?
New UT added.

Closes #33187 from xuanyuanking/SPARK-35988.

Authored-by: Yuanjian Li <yuanjian.li@databricks.com>
Signed-off-by: Jungtaek Lim <kabhwan.opensource@gmail.com>
(cherry picked from commit 0621e78)
Signed-off-by: Jungtaek Lim <kabhwan.opensource@gmail.com>
@HeartSaVioR
Contributor

Thanks @xuanyuanking for the contribution! Merged to master/3.2 branches.

@xuanyuanking deleted the SPARK-35988 branch on July 8, 2021 13:23
@xuanyuanking
Member Author

Thanks @HeartSaVioR and @viirya for all the help and review!
Finally, all the major implementations of the RocksDB state store have been merged!

SparkQA commented Jul 8, 2021

Test build #140790 has finished for PR 33187 at commit b6a9d0a.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.
