Splash, a flexible Spark shuffle manager that supports user-defined storage backends for shuffle data storage and exchange

Splash

A shuffle manager for Spark that supports different storage plugins.

The goal of this project is to provide a fast, flexible, and reliable shuffle manager that lets users plug in their preferred backend storage and network frameworks for holding and exchanging shuffle data.

In general, the current shuffle manager in Spark has some shortcomings.

  • Storing shuffle data locally limits reliability and performance.
    • Losing a single node can break the data integrity of the entire cluster.
    • It is difficult to containerize the application.
    • To improve shuffle read/write performance, you must upgrade every server in the cluster.
    • Under heavy shuffling, the overall performance of the shuffle stage is limited by local disk I/O.
  • There is no easy, general way to plug external storage into the shuffle service.

We want to address these issues in this shuffle manager.



License

Apache License Version 2.0

Deployment

By default, we support Spark 2.3.2 (Scala 2.11) with Hadoop 2.7.
If you want to build against a different Spark version, modify these version parameters in pom.xml:

  • spark.version
  • hadoop.version
  • scala.version

Check the Build section for how to generate your customized jar.

Spark

  • You need to include the Splash jar file in your Spark default configuration or task configuration. Make sure you choose the jar that matches your Spark and Scala versions. Typically, you only need to add two settings to your spark-defaults.conf:
spark.driver.extraClassPath /path/to/splash.jar
spark.executor.extraClassPath /path/to/splash.jar
  • You can include the plugin jar in the same way.
  • You can configure your Spark application to use the Splash shuffle manager by adding the following option:
spark.shuffle.manager org.apache.spark.shuffle.SplashShuffleManager
  • The storage plugin is tunable at the application level. The user can specify different storage implementations for different applications.
  • Splash supports both on-premises and cloud deployments.

Release

Upgrade

Although the basic functionality of the project has been verified, we still feel that the public API might be modified when more storage plugins are developed. Therefore:

  • The public API may change until we reach version 1.0.0.

Following semantic versioning 2.0.0, we do not promise backward compatibility when the major version number (the first number in the version) changes.

Service & Support

  • Please raise your question in the project's issue page and tag it with question.
  • Project documents are available in the doc folder.

Community

You can communicate with us in the following ways:

  • Start a new thread in GitHub issues (recommended).
  • Request to join the WeChat group by email. Make sure you include your WeChat ID in the message.

Contributing

Please check the Contributing document for details.

Build

  • Use mvn install to build the project. Optionally, add -DskipTests=true to skip the unit tests.

    When the build process completes:

    • A standard jar will be generated at: ./target/splash-<version>.jar. This jar is what you need to deploy to your Spark environment.
    • A fat jar will be generated at: ./target/splash-<version>-shaded.jar
    • You can find the unit test result in: ./target/surefire-reports
    • You can find the coverage report in: ./target/site/jacoco
  • Use mvn clean to clean the build output.

  • Use integration-test or mvn failsafe:integration-test -DskipIT=false to run the integration tests. These tests run against the actual file system. You can also modify the test source code to test your own storage plugin.

    • Once the tests complete, the results are available in: ./target/failsafe-reports
  • Use mvn pmd:pmd to run static code analysis.

    • Analysis report is available in: ./target/site/pmd.html

Options

  • spark.shuffle.splash.storageFactory specifies the class name of your storage factory. This class must implement StorageFactory.
  • spark.shuffle.splash.clearShuffleOutput is a boolean value telling the shuffle manager whether to clear the shuffle output when the shuffle stage completes.
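
As an illustration, both options could be set in spark-defaults.conf like this. The factory class is the shared file system plugin that ships with Splash. The value true for clearShuffleOutput is only an example and is not meant to indicate the default.

```
# class that implements StorageFactory
spark.shuffle.splash.storageFactory com.memverge.splash.shared.SharedFSFactory

# example value only: clear the shuffle output when the shuffle stage completes
spark.shuffle.splash.clearShuffleOutput true
```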

Plugin Development

Splash uses plugins to support different types of storage systems. Users can develop their own storage plugins for the shuffle manager, and a plugin can place files on different types of storage depending on how each file is used. For details, please check our design document.

The Splash project is currently released with a default plugin:

  • the plugin for shared file systems like NFS is implemented by: com.memverge.splash.shared.SharedFSFactory

This plugin serves as an example for developers to develop their own storage plugins.

Deploy Shared Folder Storage Plugin

Taking NFS as an example, here are the steps to configure Splash with the shared folder plugin.

  • Update the configurations in spark-defaults.conf:
# add the Splash jar to the classpath
spark.driver.extraClassPath /path/to/splash.jar
spark.executor.extraClassPath /path/to/splash.jar

# set shuffle manager and storage plugin
spark.shuffle.manager org.apache.spark.shuffle.SplashShuffleManager
spark.shuffle.splash.storageFactory com.memverge.splash.shared.SharedFSFactory

# set the location of your shared folder
spark.shuffle.splash.folder /your/share/folder
  • Make sure that all your Spark nodes can access the shared folder you specified in the configuration file.
  • Run a sample Spark application. You should see the application folder created in the shared folder.

Shuffle Performance Tool

Use this tool to verify the performance of a storage plugin. You can also use it to compare different storage plugin implementations or to find regressions in a storage plugin.

Note that this tool is built on the storage interface. It does not require a Spark environment.

It writes shuffle output and then reads it back, using the configured arguments. See the configuration details below:

  • -h or --help: display the usage
  • -f or --factory: specify the name of the storage factory
  • -i or --shuffleId: the test shuffle ID, defaults to 1
  • -t or --tasks: the number of concurrent tasks, defaults to 5
  • -m or --mappers: the number of mappers, defaults to 10
  • -r or --reducers: the number of reducers, defaults to 10
  • -d or --data: the number of data blocks, defaults to 1K
  • -b or --blockSize: the block/buffer size of each data block, defaults to 256K
  • -o or --overwrite: overwrite existing outputs

Sample command:

java -cp target/splash-shaded.jar com.memverge.splash.ShufflePerfTool -d 64 -m 200 -r 200 -t 8 -o

Sample output

overwrite, removing existing shuffle for shuffleTest-1                                        
==========================================                                                    
Writing 200 shuffle with 8 threads: 100% (200/200)                                            
Write shuffle data completed in 7440 milliseconds                                             
    Reading index file:  0 ms                                                                 
    storage factory:     com.memverge.splash.shared.SharedFSFactory                           
    shuffle folder:      \tmp\splash\shuffleTest-1\shuffle 
    number of mappers:   200                                                                  
    number of reducers:  200                                                                  
    total shuffle size:  3GB                                                                  
    bytes written:       3GB                                                                  
    bytes read:          0B                                                                   
    number of blocks:    64                                                                   
    blocks size:         256KB                                                                
    partition size:      81KB                                                                 
    concurrent tasks:    8                                                                    
    bandwidth:           430MB/s                                                              
                                                                                              
==========================================                                                    
Reading 40000 partitions with 8 threads   100% (40000/40000)                                   
Read shuffle data completed in 35525 milliseconds                                             
    Reading index file:  15907 ms                                                             
    storage factory:     com.memverge.splash.shared.SharedFSFactory                           
    shuffle folder:      \tmp\splash\shuffleTest-1\shuffle 
    number of mappers:   200                                                                  
    number of reducers:  200                                                                  
    total shuffle size:  3GB                                                                  
    bytes written:       3GB                                                                  
    bytes read:          3GB                                                                  
    number of blocks:    64                                                                   
    blocks size:         256KB                                                                
    partition size:      81KB                                                                 
    concurrent tasks:    8                                                                    
    bandwidth:           90MB/s                                                               
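
The figures in the sample output can be cross-checked with a little arithmetic. The sketch below is a hypothetical helper (ShufflePerfMath is not part of Splash) and rests on assumptions inferred from the output rather than from the tool's source: each mapper writes -d blocks of -b bytes, each mapper's output is split evenly across the reducers, and displayed values are truncated to whole units.

```java
// Hypothetical helper for illustration only; it is not part of Splash.
// Assumption: every mapper writes `blocks` blocks of `blockSize` bytes.
final class ShufflePerfMath {
    static final long KB = 1024L;
    static final long MB = 1024L * KB;
    static final long GB = 1024L * MB;

    // Bytes written by a single mapper.
    static long bytesPerMapper(long blocks, long blockSize) {
        return blocks * blockSize;
    }

    // Bytes written by all mappers together.
    static long totalBytes(long mappers, long blocks, long blockSize) {
        return mappers * bytesPerMapper(blocks, blockSize);
    }

    // One partition per (mapper, reducer) pair.
    static long partitionCount(long mappers, long reducers) {
        return mappers * reducers;
    }

    // Assumption: a mapper's output is split evenly across reducers.
    static long partitionBytes(long blocks, long blockSize, long reducers) {
        return bytesPerMapper(blocks, blockSize) / reducers;
    }

    // Throughput in whole MB/s (integer division truncates).
    static long bandwidthMBps(long bytes, long millis) {
        return bytes * 1000L / MB / millis;
    }

    public static void main(String[] args) {
        // Arguments from the sample command: -d 64 -m 200 -r 200, default 256K blocks.
        long mappers = 200, reducers = 200, blocks = 64, blockSize = 256 * KB;
        long total = totalBytes(mappers, blocks, blockSize);

        System.out.println("partitions:      " + partitionCount(mappers, reducers));
        System.out.println("total size:      " + total / GB + "GB");
        System.out.println("partition size:  "
                + partitionBytes(blocks, blockSize, reducers) / KB + "KB");
        // Elapsed times taken from the sample output above.
        System.out.println("write bandwidth: " + bandwidthMBps(total, 7440) + "MB/s");
        System.out.println("read bandwidth:  " + bandwidthMBps(total, 35525) + "MB/s");
    }
}
```

Under these assumptions the results line up with the sample run: 200 × 200 mappers and reducers give the 40000 partitions that are read, and 200 mappers × 64 blocks × 256KB gives the reported 3GB. Note that roughly 16 of the 35.5 seconds in the read phase are spent reading index files, which accounts for much of the gap between write and read bandwidth.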