
Horovod in PySpark #606

Merged
merged 54 commits into master from spark on Dec 31, 2018

Conversation

3 participants
@alsrgv
Collaborator

alsrgv commented Nov 2, 2018

This PR adds a convenient helper function, `horovod.spark.run(fn)`, that runs Horovod in a PySpark environment.

Under the hood, we set up a small TCP service on the Driver and on each Task that performs a few functions:

  1. The Horovod Driver asks the Spark Driver to spin up Spark Tasks, which run Horovod Tasks.
  2. Horovod Tasks register with the Horovod Driver, providing information about their network interfaces and hostnames.
  3. After all the Horovod Tasks have registered, each of them contacts one other Horovod Task to determine which network interfaces are routable, and reports its findings back to the Horovod Driver.
  4. The Horovod Driver computes the intersection of routable interfaces, reorders task ranks to optimize for host colocation, and executes `mpirun` with a custom remote executor (via `-mca plm_rsh_agent`). The custom remote executor allows us to avoid using `ssh`; a rough sketch of this invocation follows the diagram below.
  5. The custom remote executor contacts the Horovod Task to execute the command requested by `mpirun`; in the case of Open MPI, that command is `orted`.
  6. The Horovod Task executes `orted` and terminates when it finishes.
  7. `orted` establishes the handshake with `mpirun` and starts running the `mpirun_exec_fn.py` script. That script calls back to the Horovod Driver, fetches the user code definition, executes it, and sends the Horovod Task execution result when the user code finishes.
  8. `mpirun` terminates when the program finishes.
  9. Horovod Tasks report execution results to Spark Tasks.
  10. The Horovod Driver waits for the Spark Driver to collect execution results from the Spark Tasks.
  11. The execution results are surfaced to the user in rank order.

This flow is depicted in the following sequence diagram (also available in the editor):

*[sequence diagram image]*
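
For steps 4 and 5, the command the Horovod Driver ends up running is conceptually similar to the sketch below. The host list, the agent script name, and the exact flags are illustrative, not the command this PR actually builds:

```python
import subprocess

# Illustrative host slots gathered from the registered Horovod Tasks.
hosts = 'host1:4,host2:4'
num_proc = 8

# Hypothetical agent that forwards the orted launch command to the Horovod
# Task's TCP service instead of invoking ssh.
rsh_agent = 'python my_rsh_agent.py'

cmd = [
    'mpirun',
    '-np', str(num_proc),
    '-H', hosts,
    '-mca', 'plm_rsh_agent', rsh_agent,
    'python', 'mpirun_exec_fn.py',  # fetches and runs the user function (step 7)
]
subprocess.check_call(cmd)
```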

All external programs, such as `mpirun`, `orted`, and `python`, are executed via the `safe_shell_exec.py` wrapper, which guarantees termination of child processes if the parent process is killed.
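
The core idea behind such a wrapper can be sketched as follows; this is a simplified illustration rather than the actual `safe_shell_exec.py` code:

```python
import os
import signal
import subprocess

def safe_exec(command):
    # Start the child in a new session so it becomes a process-group leader.
    proc = subprocess.Popen(command, shell=True, preexec_fn=os.setsid)
    try:
        return proc.wait()
    finally:
        # On any exit path, terminate the entire process group so no
        # grandchildren are left behind.
        try:
            os.killpg(os.getpgid(proc.pid), signal.SIGTERM)
        except OSError:
            pass
```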

One detail not depicted in the diagram above: if there are multiple tasks on the same host, `mpirun` launches only one `orted` process per host, on the first task running on that host. To accommodate that `mpirun` behavior, tasks other than the first task on the host poll the first task for completion of its command. This does not cause an RPC polling explosion, since the maximum cardinality is the number of tasks per host.
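
Schematically, the polling done by the non-first tasks looks like this (the client object and its method are hypothetical stand-ins for the real RPC plumbing):

```python
import time

def wait_for_first_task(first_task_client, poll_interval=1.0):
    # `first_task_client` is a hypothetical RPC client for the first task's
    # TCP service on the same host; we wait until its command (orted) has
    # terminated instead of launching one ourselves.
    while not first_task_client.command_terminated():
        time.sleep(poll_interval)
```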

The data is expected to be saved in Parquet format and ingested using Petastorm. This has been found to be more reliable and scalable than directly ingesting Spark RDDs.
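
As a rough illustration only (the dataset URL is hypothetical and this assumes Petastorm's `make_reader` API; none of this code is part of the PR), ingestion inside the user function could look like:

```python
from petastorm import make_reader

# Iterate over a Petastorm-materialized Parquet dataset; each row comes back
# as a namedtuple of the dataset's fields.
with make_reader('hdfs:///path/to/train.parquet') as reader:
    for row in reader:
        pass  # feed `row` into the framework-specific training step
```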

This PR is a work in progress.

Example:

>>> def fn(magic_number):
...   import horovod.torch as hvd
...   hvd.init()
...   print('Hello, rank = %d, local_rank = %d, size = %d, local_size = %d, magic_number = %d' % (hvd.rank(), hvd.local_rank(), hvd.size(), hvd.local_size(), magic_number))
...   return hvd.rank()
...
>>> import horovod.spark
>>> horovod.spark.run(fn, args=(42,))
[Stage 0:>                                                        (0 + 16) / 16]
Hello, rank = 15, local_rank = 3, size = 16, local_size = 4, magic_number = 42
Hello, rank = 13, local_rank = 1, size = 16, local_size = 4, magic_number = 42
Hello, rank = 8, local_rank = 0, size = 16, local_size = 4, magic_number = 42
Hello, rank = 9, local_rank = 1, size = 16, local_size = 4, magic_number = 42
Hello, rank = 10, local_rank = 2, size = 16, local_size = 4, magic_number = 42
Hello, rank = 11, local_rank = 3, size = 16, local_size = 4, magic_number = 42
Hello, rank = 6, local_rank = 2, size = 16, local_size = 4, magic_number = 42
Hello, rank = 4, local_rank = 0, size = 16, local_size = 4, magic_number = 42
Hello, rank = 0, local_rank = 0, size = 16, local_size = 4, magic_number = 42
Hello, rank = 1, local_rank = 1, size = 16, local_size = 4, magic_number = 42
Hello, rank = 2, local_rank = 2, size = 16, local_size = 4, magic_number = 42
Hello, rank = 5, local_rank = 1, size = 16, local_size = 4, magic_number = 42
Hello, rank = 3, local_rank = 3, size = 16, local_size = 4, magic_number = 42
Hello, rank = 12, local_rank = 0, size = 16, local_size = 4, magic_number = 42
Hello, rank = 7, local_rank = 3, size = 16, local_size = 4, magic_number = 42
Hello, rank = 14, local_rank = 2, size = 16, local_size = 4, magic_number = 42
[0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15]
>>>

@alsrgv alsrgv requested a review from tgaddair Nov 2, 2018

@alsrgv alsrgv self-assigned this Nov 2, 2018

@alsrgv alsrgv force-pushed the spark branch 21 times, most recently from 2d858f8 to a0e372d Nov 2, 2018

@alsrgv alsrgv force-pushed the spark branch 3 times, most recently from c87c2a4 to f7cbc8a Nov 18, 2018

@alsrgv alsrgv force-pushed the spark branch 2 times, most recently from a7c18a7 to 0213124 Nov 25, 2018

@alsrgv alsrgv changed the title Horovod on PySpark Horovod in PySpark Nov 26, 2018

@alsrgv alsrgv force-pushed the spark branch from b96ef2e to 96e5b75 Dec 26, 2018

@alsrgv alsrgv requested a review from sblotner Dec 26, 2018

$ python keras_spark_rossmann.py
```

### Spark cluster setup


@alsrgv

alsrgv Dec 26, 2018

Author Collaborator

@felixcheung, do you have better recommendations for users to set up a GPU Spark cluster at this point?

verbose=verbose,
epochs=100)

# TODO (@yevgeni): Petastorm currently displays a wall of errors upon termination.


@alsrgv

alsrgv Dec 26, 2018

Author Collaborator

@selitvin, this is the current hack I have in place to avoid Petastorm errors upon process exit.


In order to run the example, please install the following dependencies:
* `pyspark`
* `petastorm >= 0.5.1`


@alsrgv

alsrgv Dec 26, 2018

Author Collaborator

@selitvin, thanks for uber/petastorm#272! Is the release version that includes it going to be called 0.5.1 as I speculated here?

@@ -247,88 +349,3 @@
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.

NVIDIA/cutlass


@alsrgv

alsrgv Dec 26, 2018

Author Collaborator

These are just moved above.

@alsrgv alsrgv requested a review from abditag2 Dec 26, 2018

@alsrgv alsrgv force-pushed the spark branch 2 times, most recently from 9277733 to 1df053c Dec 27, 2018

@sblotner
Collaborator

sblotner left a comment

Nice docs! Suggested some text edits.

The `horovod.spark` package provides a convenient wrapper around Open
MPI that makes running Horovod jobs in Spark clusters easy.

In situations where training data originates from Spark this enables


@sblotner

sblotner Dec 31, 2018

Collaborator

Add a comma after "Spark"

MPI that makes running Horovod jobs in Spark clusters easy.

In situations where training data originates from Spark this enables
tight model design loop in which data processing, model training and


@sblotner

sblotner Dec 31, 2018

Collaborator

tight model --> a tight model

add a comma after "training"

tight model design loop in which data processing, model training and
model evaluation are all done in Spark.

The toy example of running a Horovod job in Spark is provided below:


@sblotner

sblotner Dec 31, 2018

Collaborator

The --> A

and leverages the code of the notebook referenced in the article.

The example is split into three parts:
1. The first part performs complicated data preprocessing over initial set


@sblotner

sblotner Dec 31, 2018

Collaborator

over initial --> over an initial


The example is split into three parts:
1. The first part performs complicated data preprocessing over initial set
of CSV files provided by competition and gathered by the community.


@sblotner

sblotner Dec 31, 2018

Collaborator

by competition --> by the competition


### Security

Horovod in Spark uses Open MPI to run the Horovod jobs in Spark, and so


@sblotner

sblotner Dec 31, 2018

Collaborator

remove "and"

### Environment knobs

* `HOROVOD_SPARK_START_TIMEOUT` - sets the default timeout for Spark
tasks to spawn, register and start running the code. If executors for


@sblotner

sblotner Dec 31, 2018

Collaborator

Add comma after "register"


* `HOROVOD_SPARK_START_TIMEOUT` - sets the default timeout for Spark
tasks to spawn, register and start running the code. If executors for
Spark tasks are scheduled on-demand and can take long time to start,


@sblotner

sblotner Dec 31, 2018

Collaborator

take long --> take a long

"""
Used for serialization/deserialization of objects over the wire.
We use HMAC to protect services from unauthorized use. Key used for


@sblotner

sblotner Dec 31, 2018

Collaborator

Key --> The key

os.close(stderr_w)

# Redirect command stdout & stderr to provided streams or sys.stdout/sys.stderr.
# This is useful for Jupyter Notebook that uses custom sys.stdout/sys.stderr or


@sblotner

sblotner Dec 31, 2018

Collaborator

Notebook --> Notebooks
uses --> use


@alsrgv

alsrgv Dec 31, 2018

Author Collaborator

"Jupyter Notebook" is a name of software

@alsrgv alsrgv merged commit 3cc0f1e into master Dec 31, 2018

3 checks passed:

* License Compliance: All checks passed
* continuous-integration/travis-ci/pr: The Travis CI build passed
* license/cla: Contributor License Agreement is signed

@alsrgv alsrgv deleted the spark branch Dec 31, 2018
