Spark Model Training with multiple executors #5330

Closed
dmmiller612 opened this issue Mar 10, 2017 · 7 comments

@dmmiller612
Contributor

This is more of a feature request that I may even be willing to work on in my spare time. However, I first wanted to see whether anyone is already working on this or has specific plans. Right now, it seems like you can only use one executor for training an MXNet model.

It would be nice if MXNet could do model parameter averaging across Spark partitions after each training iteration (or every few iterations). Each partition would hold its own copy of the model and train it on its subset of the data; after one or more iterations complete, the weights and biases would be averaged into a central model. The updated model would then be passed back to each partition for the next iteration (or series of iterations), and the process repeated, roughly as in the sketch below.
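Something like this rough sketch is what I have in mind (the trainOnePass helper and the flat weight representation are made up purely to illustrate the idea, not anything from the MXNet API):

    import org.apache.spark.rdd.RDD

    // Illustration only: `trainOnePass` stands in for "train a local copy of the
    // model for one iteration on this partition and return its weights".
    def averageOneIteration(
        trainData: RDD[Array[Float]],
        current: Map[String, Array[Float]],
        trainOnePass: (Iterator[Array[Float]], Map[String, Array[Float]]) => Map[String, Array[Float]]
      ): Map[String, Array[Float]] = {
      val n = trainData.getNumPartitions
      trainData
        .mapPartitions(part => Iterator(trainOnePass(part, current)))  // local training per partition
        .reduce { (a, b) =>                                            // element-wise sum of the weights
          a.map { case (k, w) => k -> w.zip(b(k)).map { case (x, y) => x + y } }
        }
        .map { case (k, w) => k -> w.map(_ / n) }                      // divide by the number of partitions
    }
    // The driver would then broadcast the averaged weights back to the partitions
    // and repeat for the next iteration (or series of iterations).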

I know there are other deep learning libraries that have something similar, and I wanted to see where you guys were on this.

@piiswrong
Contributor

@Javelinjs Do we have distributed training with Spark?

@yzhliu
Member

yzhliu commented Mar 11, 2017

Yes, we have distributed training on Spark, though more features are required to bring it to a production environment; see #2268.
It does not do model averaging; currently you need to do that manually.

@dmmiller612
Contributor Author

" Also, remember to set --executor-cores 1 to ensure there's only one task run in one Spark executor." Is this outdated? This seems to insinuate that it won't work with multiple partitions per executor.

@yzhliu
Member

yzhliu commented Mar 12, 2017

That is used to avoid running multiple threads on one MXNet engine, which may cause problems.
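For example (just to illustrate; this is the standard Spark setting that corresponds to passing --executor-cores 1 to spark-submit):

    import org.apache.spark.{SparkConf, SparkContext}

    // Pin each executor to a single concurrent task, so that only one training
    // task touches the MXNet engine in that JVM at a time.
    val conf = new SparkConf()
      .setAppName("mxnet-spark-training")   // app name is just a placeholder
      .set("spark.executor.cores", "1")
    val sc = new SparkContext(conf)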

@dmmiller612
Contributor Author

    val job = trainData.mapPartitions { partition =>
      val dataIter = new LabeledPointIter(
        partition, params.dimension,
        params.batchSize,
        dataName = params.dataName,
        labelName = params.labelName)

      // TODO: more natural way to get the # of examples?
      var numExamples = 0
      while (dataIter.hasNext) {
        val dataBatch = dataIter.next()
        numExamples += dataBatch.label.head.shape(0)
      }
      logger.debug("Number of samples: {}", numExamples)
      dataIter.reset()

      logger.info("Launching worker ...")
      logger.info("Batch {}", params.batchSize)
      // give enough time for ps-lite to detect the dead nodes
      Thread.sleep(20000)
      KVStoreServer.init(ParameterServer.buildEnv(role = "worker",
        rootUri = schedulerIP, rootPort = schedulerPort,
        numServer = params.numServer,
        numWorker = params.numWorker))
      val kv = KVStore.create("dist_async")
      kv.setBarrierBeforeExit(false)

      val optimizer: Optimizer = new SGD(learningRate = 0.01f,
        momentum = 0.9f, wd = 0.00001f)

      logger.debug("Define model")
      val model = new FeedForward(ctx = params.context,
        symbol = params.getNetwork,
        numEpoch = params.numEpoch,
        optimizer = optimizer,
        initializer = new Xavier(factorType = "in", magnitude = 2.34f),
        argParams = null,
        auxParams = null,
        beginEpoch = 0,
        epochSize = numExamples / params.batchSize / kv.numWorkers)
      logger.info("Start training ...")
      model.fit(trainData = dataIter,
        evalData = null,
        evalMetric = new Accuracy(),
        kvStore = kv)

      logger.info("Training finished, waiting for other workers ...")
      dataIter.dispose()
      kv.setBarrierBeforeExit(true)
      kv.dispose()
      Iterator(new MXNetModel(
        model, params.dimension, params.batchSize,
        dataName = params.dataName, labelName = params.labelName))
    }.cache()

    // force the lazy Spark job to actually run on every partition
    job.foreachPartition(_ => ())
    // simply take the first model
    val mxModel = job.first()

So this is the code that brought me to write the ticket. I may be totally misreading it, but it looks like it is using a separate model inside each partition of the mapPartitions call. Then, once you have an RDD of models, it returns the first model, which is only representative of one partition of data. So it seems like some of the data would not be represented if there were more than one partition. If this is the case, I guess I am a little confused about how this is distributed, since everything would have to be on a single node to work correctly.

@yzhliu
Member

yzhliu commented Mar 12, 2017

Model parameters are stored in the parameter server and pulled by the workers during training. This is how the workers 'see' each other.
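Roughly, each worker interacts with the store like this (illustrative key and shape only; "dist_async" assumes the scheduler and servers have already been launched, as in the code you pasted, while "local" lets you try the same calls on one machine):

    import ml.dmlc.mxnet.{KVStore, NDArray, Shape}

    // Each worker pushes its updates and pulls the aggregated parameters back,
    // so every worker trains against the same server-side weights.
    val kv = KVStore.create("dist_async")   // or "local" on a single machine
    val shape = Shape(2, 3)
    val weight = NDArray.ones(shape)

    kv.init(0, weight)                 // key 0 holds this parameter on the servers
    val update = NDArray.ones(shape)   // stand-in for a locally computed update
    kv.push(0, update)                 // send the update to the servers
    kv.pull(0, weight)                 // fetch the current global value back
    kv.dispose()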

@yajiedesign
Contributor

This issue is closed due to lack of activity in the last 90 days. Feel free to reopen if this is still an active issue. Thanks!
