
[SPARK-28120][SS] Rocksdb state storage implementation #24922

Open. Wants to merge 24 commits into base: master.
Conversation

@itsvikramagr commented Jun 20, 2019

What changes were proposed in this pull request?

SPARK-13809 introduced a framework for state management for computing streaming aggregates. The default implementation is an in-memory HashMap which is backed up to an HDFS-compliant file system at the end of every micro-batch.

The current implementation suffers from performance and latency issues. It uses executor JVM memory to store the states, so the state store size is limited by the size of the executor memory. Moreover, the executor JVM memory is shared between state storage and other task operations, so the state storage size impacts the performance of task execution.

In addition, GC pauses, executor failures, and OOM issues become common as the size of the state storage grows, which increases the overall latency of a micro-batch.

RocksDB is a storage engine with a key/value interface, based on LevelDB. New writes are inserted into a memtable; when the memtable fills up, it is flushed to local storage. RocksDB supports both point lookups and range scans, provides different types of ACID guarantees, and is optimized for flash storage.

In this PR I have implemented RocksDB-based state storage for Structured Streaming, which provides major performance improvements for stateful stream processing.

Implementation details

Creation of new state (for batch x and partition Pi)

  • If Node(Pi, x) = Node(Pi, x-1): the state is already loaded in RocksDB.
  • Else if Node(Pi, x) = Node(Pi, x-2): update the RocksDB state using the downloaded Delta(Pi, x-1).
  • Otherwise, create a new RocksDB store using the checkpointed data (snapshot + delta). (A sketch of this decision logic follows the list.)
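
A minimal, hypothetical sketch of that decision logic in Scala; the StateLoader class and the helper names (applyDelta, restoreFromCheckpoint) are illustrative and are not identifiers from this PR.

```scala
// Hypothetical sketch of the per-partition state-load decision described above.
class StateLoader(var lastLoadedVersion: Long = -1L) {

  def loadState(partition: Int, version: Long): Unit = {
    if (lastLoadedVersion == version - 1) {
      // Same node executed the previous batch: RocksDB already holds the state.
    } else if (lastLoadedVersion == version - 2) {
      // One batch behind: replay the checkpointed delta file for version - 1.
      applyDelta(partition, version - 1)
    } else {
      // Otherwise rebuild the local store from the latest snapshot plus delta files.
      restoreFromCheckpoint(partition, version)
    }
    lastLoadedVersion = version
  }

  // Placeholders for the checkpoint download/replay steps.
  private def applyDelta(partition: Int, version: Long): Unit = ???
  private def restoreFromCheckpoint(partition: Int, version: Long): Unit = ???
}
```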

During batch execution

  • Open RocksDB in transactional mode.
  • On successful completion of the batch: commit the transaction, upload the delta file into the checkpoint folder, and create a backup of the current DB state in local storage.
  • Abort the transaction on any error. (A sketch of this flow follows the list.)
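
Below is a minimal, hypothetical sketch of that commit/abort flow using the RocksDB Java (JNI) API; the checkpoint-upload and local-backup steps are placeholders, not the helpers used in this PR.

```scala
import org.rocksdb.{Options, RocksDB, Transaction, TransactionDB, TransactionDBOptions, WriteOptions}

object BatchExecutionSketch {
  RocksDB.loadLibrary()

  def runBatch(dbPath: String)(writeUpdates: Transaction => Unit): Unit = {
    val options = new Options().setCreateIfMissing(true)
    val db = TransactionDB.open(options, new TransactionDBOptions(), dbPath)
    val txn = db.beginTransaction(new WriteOptions())
    try {
      writeUpdates(txn)          // put/delete the state rows for this micro-batch
      txn.commit()               // make the batch's updates durable locally
      uploadDeltaToCheckpoint()  // placeholder: copy the delta file to the checkpoint folder
      backupLocalDbState()       // placeholder: back up the current DB state in local storage
    } catch {
      case e: Throwable =>
        txn.rollback()           // abort the transaction on any error
        throw e
    } finally {
      txn.close()
      db.close()
    }
  }

  private def uploadDeltaToCheckpoint(): Unit = ()
  private def backupLocalDbState(): Unit = ()
}
```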

Snapshot creation (maintenance task)

  • Create a tarball of the last backed-up DB state and upload it to the checkpoint folder. (A sketch of the archiving step follows.)
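
A minimal sketch of the archiving step, assuming Apache commons-compress is available on the classpath (not necessarily what this PR uses) and covering only a flat directory of backed-up files:

```scala
import java.io.{BufferedOutputStream, File, FileOutputStream}
import java.nio.file.Files
import org.apache.commons.compress.archivers.tar.{TarArchiveEntry, TarArchiveOutputStream}
import org.apache.commons.compress.compressors.gzip.GzipCompressorOutputStream

// Package the backed-up DB directory into a single .tar.gz for upload to the checkpoint folder.
def tarDbBackup(backupDir: File, tarball: File): Unit = {
  val out = new TarArchiveOutputStream(
    new GzipCompressorOutputStream(new BufferedOutputStream(new FileOutputStream(tarball))))
  try {
    backupDir.listFiles().filter(_.isFile).foreach { f =>
      out.putArchiveEntry(new TarArchiveEntry(f, f.getName))
      Files.copy(f.toPath, out)   // stream the file contents into the archive entry
      out.closeArchiveEntry()
    }
  } finally {
    out.close()
  }
}
```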

A detailed presentation on the implementation can be found here

How was this patch tested?

Added new unit tests which extend the existing abstract class StateStoreSuiteBase. (There is one UT failure related to metrics reporting in the UT framework; looking into it.)

sql/core/pom.xml (resolved review thread)

@itsvikramagr changed the title from "[Spark 28120][SS] Rocksdb state storage implementation" to "[SPARK-28120][SS] Rocksdb state storage implementation" on Jun 21, 2019


@dongjoon-hyun (Member) commented Jun 22, 2019

ok to test

@dongjoon-hyun (Member) commented Jun 22, 2019

Thank you for your first contribution, @itsvikramagr !

@SparkQA commented Jun 22, 2019

Test build #106795 has finished for PR 24922 at commit 292befe.

  • This patch fails Scala style tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@gaborgsomogyi (Contributor) commented Jun 24, 2019

@itsvikramagr thanks for the hard work! The presentation is great and helps in understanding the design, but it will take some time to analyze things properly. I think it's a good feature, so I will review it thoroughly.
Initially I have these suggestions/questions:

  • Please resolve the style problems (not just the ones which made the build fail; please see the contribution guide for details. As an example, the number of tabs is consistently incorrect when parameters are broken into multiple lines).
  • Do we have some numbers about the performance impact?
  • Regarding the testing part, do I understand correctly that you've not tested it on a cluster with heavy load (and maybe with artificial exceptions)?

@itsvikramagr (Author) commented Jun 24, 2019

Thanks @gaborgsomogyi

  • Will fix the style problems asap and update the PR.
  • In my test setup, I was able to scale to more than 250 million keys using just two i3.xlarge executor nodes by running a group-by aggregation query on a campaign data source generated using the rate source. I stopped my experiment after 5 hours. GC time was about 1.5% of the total task time (see attached). In the same setup, the default implementation crashed after creating 35 million new state keys.
  • I ran my experiments with varying load and under different stress conditions. Please recommend more scenarios which you think I should be testing.

(screenshot: executor memory usage)

(screenshot: state store rows)

@felixcheung (Member) commented Jun 25, 2019

@itsvikramagr could you fix the Scala style error?

/home/jenkins/workspace/SparkPullRequestBuilder/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDbInstance.scala:208:0: Whitespace at end of line

@SparkQA commented Jun 25, 2019

Test build #106864 has finished for PR 24922 at commit 3f5f6b2.

  • This patch fails build dependency tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA commented Jun 25, 2019

Test build #106865 has finished for PR 24922 at commit f0f2f8d.

  • This patch fails build dependency tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@HeartSaVioR (Contributor) commented Jun 25, 2019

Thanks for the hard work, @itsvikramagr!

I agree keeping state in memory is not scalable, and the result looks promising. It might be better to have another kind of benchmark here, like a stress test, to see the performance on stateful operations and let end users judge whether they're mostly encouraged to use this implementation, or to use it selectively.

What I did for my patch was the following:
https://issues.apache.org/jira/browse/SPARK-21271
#21733 (comment)

Btw, it will take some time to review your patch as the diff is 2000+ lines, and I also have some work on my plate. You might want to spend time getting familiar with the style guide if you haven't; there are some rules which are not checked by the Scala style check but which reviewers will point out.

@HeartSaVioR (Contributor) commented Jun 25, 2019

And please take a deep look at the build result if it fails, and try to fix the failure if it is related to your patch. In some cases the build output log has a guide message for fixing the issue, as in this case.

Spark's published dependencies DO NOT MATCH the manifest file (dev/spark-deps).
To update the manifest file, run './dev/test-dependencies.sh --replace-manifest'.
diff --git a/dev/deps/spark-deps-hadoop-2.7 b/dev/pr-deps/spark-deps-hadoop-2.7
index 62b00f3..7e33e82 100644
--- a/dev/deps/spark-deps-hadoop-2.7
+++ b/dev/pr-deps/spark-deps-hadoop-2.7
@@ -171,6 +171,7 @@ parquet-jackson-1.10.1.jar
 protobuf-java-2.5.0.jar
 py4j-0.10.8.1.jar
 pyrolite-4.23.jar
+rocksdbjni-6.0.1.jar
 scala-compiler-2.12.8.jar
 scala-library-2.12.8.jar
 scala-parser-combinators_2.12-1.1.0.jar

@itsvikramagr (Author) commented Jun 25, 2019

Thanks, @HeartSaVioR. I understand it is a very big change. As suggested, let me create a stress test suite and paste some benchmark numbers.

@HeartSaVioR (Contributor) commented Jun 25, 2019

FYI: just thinking out loud since I'm being cc-ed first: I'm just one of the contributors, not a committer or PMC member of Apache Spark. In case you get confused by the "Member" badge, it only means I'm a committer in one (any) of the ASF projects.

@SparkQA commented Jun 25, 2019

Test build #106869 has finished for PR 24922 at commit 827ace4.

  • This patch fails due to an unknown error code, -9.
  • This patch merges cleanly.
  • This patch adds no public classes.

@itsvikramagr (Author) commented Jun 25, 2019

Looking at the unit test failures. They're related to the rocksDbPath folder name. Will make it configurable and update the PR.

@SparkQA commented Jul 31, 2019

Test build #108461 has finished for PR 24922 at commit 818f716.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@itsvikramagr (Author) commented Aug 2, 2019

I agree keeping state in memory is not scalable, and the result looks promising. It might be better to have another kind of benchmark here, like stress test, to see the performance on stateful operations and let end users guide whether they're mostly encouraged to use this implementation, or use this selectively.

What I did for my patch was following:
https://issues.apache.org/jira/browse/SPARK-21271
#21733 (comment)

I have created the following repo along similar lines to what @HeartSaVioR has done for his patch.

Setup

  • Used Qubole's distribution of Apache Spark 2.4.0 for my tests.
  • Master Instance Type = i3.xlarge
  • Driver Memory = 2g
  • num-executors = 1
  • max-executors = 1
  • spark.sql.shuffle.partitions = 8
  • Run time = 30 mins
  • Source = Rate Source
  • executor Memory = 7g
  • spark.executor.memoryOverhead=3g
  • Processing Time = 30 sec

Executor Instance type = i3.xlarge
cores per executor = 4
ratePerSec = 20k

| State Storage Type | Mode | Total Trigger Execution Time | Records Processed | Total State Rows | Comments |
|---|---|---|---|---|---|
| HDFS | Append | ~7 mins | 8.6 million | 2 million | Application failed before 30 mins |
| RocksDB | Append | ~30 minutes | 34.6 million | 7 million | |

Executor Instance type = C5d.2xlarge
cores per executor = 8
ratePerSec = 30k

| State Storage Type | Mode | Total Trigger Execution Time | Records Processed | Total State Rows | Comments |
|---|---|---|---|---|---|
| HDFS | Append | 8 mins | 12.6 million | 3.1 million | Application was stuck because of GC |
| RocksDB | Complete | ~30 minutes | 47.34 million | 12.5 million | |

(screenshot: executor info when HDFS state storage is used)

cc @gaborgsomogyi @felixcheung @skonto @HeartSaVioR

I will try to add more stress and longevity tests

@SparkQA commented Aug 7, 2019

Test build #108768 has finished for PR 24922 at commit 603958b.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@itsvikramagr (Author) commented Aug 7, 2019

Longevity run results

Setup
Same as above

Executor Instance type = C5d.2xlarge
cores per executor = 8
ratePerSec = 20k

| State Storage Type | Mode | Total Trigger Execution Time | Records Processed | Total State Rows | Number of Micro-batches | Comments |
|---|---|---|---|---|---|---|
| RocksDB | Append | ~1.5 hrs | 104.3 million | 10.5 million | 114 | |

(screenshot: streaming metrics)

(screenshot: executor info)

@felixcheung (Member) commented Aug 14, 2019

cool. is the issue here #24922 (comment) resolved?

@itsvikramagr (Author) commented Aug 14, 2019

cool. is the issue here #24922 (comment) resolved?

Yes, this is now resolved. I have made the following changes to resolve it:

  • There was a typo in setting dataBlockSize: instead of KB, I was setting it in bytes.
  • For range scans, I was creating a RocksIterator which was not being closed cleanly.
  • I fixed some of the RocksDB configs, taking help from here. In particular: set max open files, strictly limit the caches (no pinning of metadata in the cache), and disable fillCache for range-scan operations. (A sketch of these settings follows below.)

Overall the memory usage is now contained, and it's around 1-3 GB depending upon the data.
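
A minimal, hypothetical sketch of that kind of tuning with the RocksDB Java (JNI) API; the specific values are illustrative, not the ones used in this PR.

```scala
import org.rocksdb.{BlockBasedTableConfig, LRUCache, Options, ReadOptions, RocksDB}

object RocksDbTuningSketch {
  RocksDB.loadLibrary()

  def tunedOptions(): (Options, ReadOptions) = {
    val tableConfig = new BlockBasedTableConfig()
      .setBlockSize(32 * 1024)                     // data block size in bytes (the KB-vs-bytes typo above)
      .setBlockCache(new LRUCache(64 * 1024 * 1024, 6, /* strictCapacityLimit = */ true))
      .setCacheIndexAndFilterBlocks(true)          // account for metadata in the block cache
      .setPinL0FilterAndIndexBlocksInCache(false)  // no pinning of metadata in the cache

    val options = new Options()
      .setCreateIfMissing(true)
      .setMaxOpenFiles(1024)                       // bound the number of open SST files
      .setTableFormatConfig(tableConfig)

    // Use these read options for range scans so they don't pollute the block cache.
    val rangeScanReadOptions = new ReadOptions().setFillCache(false)

    (options, rangeScanReadOptions)
  }
}
```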

@felixcheung (Member) commented Aug 14, 2019

great! @HeartSaVioR could you review?

@gaborgsomogyi (Contributor) commented Aug 14, 2019

@felixcheung there are a couple of big items on my plate, but I'm going to come back and continue...

@HeartSaVioR (Contributor) commented Aug 16, 2019

Recently I also started working on some other tasks, so it might take time for me to get back to reviewing this. I'll try to review (even partially) when I get some time.

@gaborgsomogyi (Contributor) left a comment

Another round.

pom.xml Outdated
@@ -190,6 +190,7 @@
<jpam.version>1.1</jpam.version>
<selenium.version>2.52.0</selenium.version>
<htmlunit.version>2.22</htmlunit.version>
<rocksdb.version>6.0.1</rocksdb.version>

@gaborgsomogyi (Contributor) commented Aug 21, 2019

6.2.2 is the latest. Is there a reason for not using it?

@itsvikramagr (Author) commented Aug 28, 2019

No particular reason. I used 6.0.1 because that was the latest when I started the project. I looked at the release notes for 6.2.2; I couldn't find anything which would be immediately useful for this project, but it makes sense to keep the latest version here.

sql/core/pom.xml (resolved review thread)
}
this.localDirectory = this.rocksDbConf
.getOrElse(
"spark.sql.streaming.stateStore.rocksDb.localDirectory".toLowerCase(Locale.ROOT),

@gaborgsomogyi (Contributor) commented Aug 21, 2019

Since RocksDB would be a built-in store provider, shouldn't we use ConfigBuilder? That way the parameters will be documented implicitly.

@itsvikramagr (Author) commented Aug 28, 2019

All the confs have defaults which should not need to be changed. These are extra confs in case someone would like to tune things further.

If ConfigBuilder is required, can it be done as a separate PR so that we do not increase the scope of this PR?

@gaborgsomogyi (Contributor) commented Sep 2, 2019

I don't think it's a scope increase. Instead of these 3-4 lines, a different 3-4 lines have to be added. ConfigBuilder has an internal flag, so the entry can be marked as developer-only.
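
For illustration, a hedged sketch of what such a definition could look like with Spark's internal ConfigBuilder; the config key is the one from the PR's code excerpt above, while the default value and placement are assumptions, not what the PR currently does.

```scala
import org.apache.spark.internal.config.ConfigBuilder

// Hypothetical ConfigBuilder-based definition of the RocksDB local-directory conf.
val STATE_STORE_ROCKSDB_LOCAL_DIRECTORY =
  ConfigBuilder("spark.sql.streaming.stateStore.rocksDb.localDirectory")
    .internal()  // developer-only, as suggested above
    .doc("Local directory where the RocksDB state store keeps its working files.")
    .stringConf
    .createWithDefault(System.getProperty("java.io.tmpdir"))
```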

@SparkQA commented Aug 28, 2019

Test build #109874 has finished for PR 24922 at commit 562f755.

  • This patch fails build dependency tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA commented Aug 28, 2019

Test build #109875 has finished for PR 24922 at commit 4f42068.

  • This patch fails build dependency tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
  • public class FileUtilitySuite

@SparkQA commented Aug 28, 2019

Test build #109876 has finished for PR 24922 at commit 7d4d5d1.

  • This patch fails to build.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA commented Aug 28, 2019

Test build #109877 has finished for PR 24922 at commit 0b129f3.

  • This patch fails build dependency tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@itsvikramagr (Author) commented Aug 28, 2019

@gaborgsomogyi - I have addressed some of your comments and replied to the remaining ones. Thanks again for reviewing the PR

@SparkQA commented Aug 28, 2019

Test build #109878 has finished for PR 24922 at commit c38bd6c.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA commented Aug 29, 2019

Test build #109897 has finished for PR 24922 at commit fb86f0d.

  • This patch fails due to an unknown error code, -9.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA commented Aug 29, 2019

Test build #109904 has finished for PR 24922 at commit 4544abc.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@itsvikramagr (Author) commented Sep 13, 2019

ping @gaborgsomogyi @dongjoon-hyun @HeartSaVioR

I have one comment from @gaborgsomogyi left to resolve. Is there anything else I should be doing to get this patch into the 3.0 release?

@AmplabJenkins commented Sep 16, 2019

Can one of the admins verify this patch?
