
[SYSTEMML-2420] Initial version of distributed spark ps #805

Closed
Wanted to merge 15 commits

Conversation

EdgarLGB
Member

Hi @mboehm7,

Here is the PR implementing the initial version of the spark paramserv function. Just for information, the spark ps test contains some FIXMEs indicating that an error is reproduced when the number of epochs is set to 10. I'm hoping you could have a look at them, because I currently have little idea how to approach the problem.

Thanks for the review,
Guobao

Contributor
mboehm7 left a comment

Thanks @EdgarLGB. Sure - I'm happy to help debug this. However, the tests are currently not running because paramserv-test.dml is not included in the PR - could you please fix this? In addition, below you'll find some additional suggestions from a glance over the PR.

DMLScript.RUNTIME_PLATFORM oldRtPlatform = DMLScript.rtplatform;
DMLScript.rtplatform = DMLScript.RUNTIME_PLATFORM.SINGLE_NODE;
Recompiler.recompileProgramBlockHierarchy2Forced(program.getProgramBlocks(), 0, new HashSet<>(), LopProperties.ExecType.CP);
DMLScript.rtplatform = oldRtPlatform;

Contributor

This seems to be some outdated code. During the merge of a previous PR, I fixed the recompilation to CP so that it does not require modifying the global static flags (which could conflict with concurrent recompilation if paramserv is used inside parfor) - instead, we can simply call Recompiler.recompileProgramBlockHierarchy2Forced with a given ExecType et to force all instructions into this et.
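
A minimal sketch of the suggested approach, derived from the snippet above (the save/restore of DMLScript.rtplatform is simply dropped):

// Force all instructions into CP by passing the target ExecType directly;
// no modification of the global execution-mode flag is needed.
Recompiler.recompileProgramBlockHierarchy2Forced(
    program.getProgramBlocks(), 0, new HashSet<>(), LopProperties.ExecType.CP);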

Member Author
EdgarLGB commented Jul 20, 2018

In fact, I have a question about how to launch the script in spark mode. I found that we can execute a script either by creating an MLContext or by running java SystemML.jar -exec spark, so what is the difference between the two? The reason I did it this way is that when I used java -exec spark to launch scripts on spark, it was not able to correctly convert all the instructions into CP, whereas creating an MLContext handles this conversion well.

private static final String TEST_NAME5 = "paramserv-nn-bsp-batch-drr";
private static final String TEST_NAME6 = "paramserv-nn-bsp-batch-dr";
private static final String TEST_NAME7 = "paramserv-nn-bsp-batch-or";
private static final String TEST_NAME = "paramserv-test";

Contributor

Not included in the PR - please add.

@@ -73,5 +88,18 @@ private void configureWorker(Tuple2<Integer, Tuple2<MatrixBlock, MatrixBlock>> i

// Initialize the buffer pool and register it in the JVM shutdown hook so that it is cleaned up at the end
RemoteParForUtils.setupBufferPool(_workerID);

// Create the ps proxy
_ps = PSRpcFactory.createSparkPSProxy(_host);

Contributor

When configuring the worker, you might want to double-check that the list objects are deserialized with the right update status. Otherwise, this might again be one of the reasons for invalid cleanups.

public class SparkPSProxy extends ParamServer {

private TransportClient _client;
private static final long RPC_TIME_OUT = 1000 * 60 * 5; // 5 minutes of timeout

Contributor

Please try to get the spark.rpc.* settings (e.g., spark.rpc.lookupTimeout) from the current Spark configuration (e.g., create a new SparkConf(), which reads the configuration inside the executors from their system properties).
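
A minimal sketch of that suggestion (the 120s fallback is an assumption, not taken from the PR):

import org.apache.spark.SparkConf;

// new SparkConf() loads the spark.* settings from the executor's system properties,
// so the same code works on the driver and inside the executors.
SparkConf conf = new SparkConf();
// Read the RPC lookup timeout instead of hard-coding RPC_TIME_OUT;
// fall back to an assumed default of 120s if the property is not set.
long rpcTimeoutMs = conf.getTimeAsMs("spark.rpc.lookupTimeout", "120s");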

@EdgarLGB
Member Author

Thanks @mboehm7 for the early feedback. I've made some modifications accordingly.

@mboehm7
Contributor

mboehm7 commented Jul 22, 2018

To answer your question on spark execution, I have to separate two things here: (1) APIs and (2) execution modes, both of which are orthogonal, except for certain APIs that only support a limited set of execution modes.

Regarding APIs, we support command line, MLContext, JMLC, ML pipelines, and Keras2DML/Caffe2DML. The command line itself covers several variants: you can run standalone through Java (as you showed above), via the spark-submit script (where SystemML's driver runs in Spark's driver process), or through the hadoop script (where SystemML's driver runs in a client process or YARN container).

However, your actual question is more about execution modes, which you can influence with the command-line flag -exec. There we support singlenode (all operations in CP), hadoop (all matrix operations in MR), spark (all matrix operations in SPARK), hybrid (CP or MR per operation), and hybrid_spark (CP or SPARK per operation). Note that hybrid_spark is the default if you come through MLContext or the spark-submit command line, while singlenode is the default in JMLC. These three configurations of API/exec modes are what most applications use.
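
For illustration, a minimal, hypothetical sketch of the MLContext path mentioned above (the app name and script path are placeholders):

import org.apache.spark.sql.SparkSession;
import org.apache.sysml.api.mlcontext.MLContext;
import org.apache.sysml.api.mlcontext.Script;
import org.apache.sysml.api.mlcontext.ScriptFactory;

public class MLContextExample {
    public static void main(String[] args) {
        // MLContext defaults to the hybrid_spark execution mode.
        SparkSession spark = SparkSession.builder().appName("paramserv-example").getOrCreate();
        MLContext ml = new MLContext(spark);
        // Placeholder script path - point this at the actual DML script.
        Script script = ScriptFactory.dmlFromFile("scripts/paramserv-test.dml");
        ml.execute(script);
        spark.stop();
    }
}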

@mboehm7
Contributor

mboehm7 commented Jul 22, 2018

Furthermore, thanks for catching the issue with the in-place binary operations. As it turned out, these issues occurred in special cases where dense matrices were converted to sparse matrices in CSR format, whereas the in-place sparse binary operation implementation assumed our default MCSR. With the fix in SYSTEMML-2462, all your tests run perfectly fine. The tests with update per batch run very long, so I'll likely reduce the number of epochs there.

Contributor
mboehm7 left a comment

LGTM - thanks @EdgarLGB. Overall this is a very good start, and because it works correctly in local mode, we can already merge it in. However, I would suggest (1) reworking the communication to a deep serialization of the actual matrix blocks, and (2) using accumulators to collect the statistics from remote workers. Both can be done in subsequent PRs.

sb.append(EMPTY);
} else {
flushListObject(_data);
sb.append(ProgramConverter.serializeDataObject(DATA_KEY, _data));

Contributor

Perf: This serialization/deserialization approach should be replaced with serialization of in-memory matrix blocks. Right now, this PR exports (serializes and writes) the matrices to HDFS, which replicates them to multiple nodes via RPC; on the other side, we load (read and deserialize) them again, which causes another RPC for remote lookups if the data was not already replicated to the target node. Instead, we could simply use the existing matrix serializers and deserializers to send the matrices via RPC. For that, I would recommend defining a simple binary format (e.g., a 4-byte int method, a 4-byte int worker id, followed by name-value pairs of matrices, where each value is the byte sequence of a serialized matrix). The size of the output buffer can be determined exactly via MatrixBlock.getExactSizeOnDisk.
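
A hedged sketch of the suggested header layout (class and method names are illustrative, not the committed implementation):

import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import org.apache.sysml.runtime.matrix.data.MatrixBlock;

public class PSRpcMessageSketch {
    // Serialize a single (name, matrix) pair together with the method and worker id.
    public static byte[] serialize(int method, int workerID, String name, MatrixBlock mb)
        throws IOException
    {
        // Initial capacity hint: 2 ints + name + exact on-disk size of the block.
        ByteArrayOutputStream bos = new ByteArrayOutputStream(
            (int) (4 + 4 + 2 + name.length() + mb.getExactSizeOnDisk()));
        DataOutputStream dos = new DataOutputStream(bos);
        dos.writeInt(method);   // 4-byte int method
        dos.writeInt(workerID); // 4-byte int worker id
        dos.writeUTF(name);     // matrix name
        mb.write(dos);          // matrix payload via the existing block serializer
        dos.flush();
        return bos.toByteArray();
    }
}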

sb.append(EMPTY);
} else {
flushListObject((ListObject) _data);
sb.append(ProgramConverter.serializeDataObject(DATA_KEY, (ListObject) _data));

Contributor

Perf: Same as the previous comment on serialization/deserialization.

protected String bufferToString(ByteBuffer buffer) {
byte[] result = new byte[buffer.limit()];
buffer.get(result, 0, buffer.limit());
return new String(result);

Contributor

This can be removed once we have modified the serialization/deserialization. It's usually not a good idea to convert anything other than metadata into string representations, because the conversion and parsing are very expensive for floating-point data.

// TODO the port should be configurable by the user
public class PSRpcFactory {

private static final int PORT = 5055;

Contributor

Could you please have a look at how Spark assigns ports for RPC communication? It would be great if we could use a similar approach, to keep it consistent and ensure we're not conflicting with Spark and other daemon processes.
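
One possible sketch, assuming we mimic Spark's retry-based port assignment (the helper name and retry scheme are assumptions, not Spark internals):

import java.io.IOException;
import java.net.ServerSocket;

public class PSPortUtilSketch {
    // Try basePort, basePort+1, ... similar in spirit to Spark's own behavior,
    // which retries a bounded number of times (spark.port.maxRetries) before giving up.
    public static int findAvailablePort(int basePort, int maxRetries) throws IOException {
        for (int i = 0; i <= maxRetries; i++) {
            try (ServerSocket probe = new ServerSocket(basePort + i)) {
                return probe.getLocalPort(); // free right now; the RPC server binds to it next
            }
            catch (IOException e) {
                // port already in use - try the next candidate
            }
        }
        throw new IOException("No available port found after " + maxRetries + " retries");
    }
}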


try {
ParamservUtils.doPartitionOnSpark(sec, features, labels, scheme, workerNum) // Do data partitioning
.foreach(worker); // Run remote workers

Contributor

Please use accumulators to collect the statistics (the ones we show with -stats) from all executor processes, unless we're running in local spark mode as indicated by SparkExecutionContext.isLocalMaster() (parfor can serve as an example). Right now, the statistics collection would only work in local mode, where all "remote" tasks are executed in the driver process and hence correctly update the static statistics; in cluster mode, all statistics from the executors would be lost.

Also, it might be a good idea to use additional accumulators for the number of executed batches and epochs. This gives users a rough progress indicator in the Spark UI, which will be very useful for long-running paramserv instances.
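
A minimal sketch of such accumulators (class and accumulator names are hypothetical):

import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.util.LongAccumulator;

public class PSProgressAccumulators {
    private final LongAccumulator _numBatches;
    private final LongAccumulator _numEpochs;

    public PSProgressAccumulators(JavaSparkContext jsc) {
        // Named accumulators are registered with the driver and shown in the Spark UI.
        _numBatches = jsc.sc().longAccumulator("paramserv executed batches");
        _numEpochs = jsc.sc().longAccumulator("paramserv executed epochs");
    }

    // Called from the remote workers (executor side).
    public void incBatches() { _numBatches.add(1); } // after each processed batch
    public void incEpochs() { _numEpochs.add(1); }   // after each completed epoch
}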

asfgit closed this in 15ecb72 on Jul 22, 2018
@EdgarLGB
Member Author

Thanks @mboehm7 for the final feedback. I will work on it in a follow-up PR.
