
Updated docs to include Horovod Spark Estimators and Keras callbacks (#1656)

Signed-off-by: Travis Addair <taddair@uber.com>
tgaddair committed Jan 10, 2020
1 parent 4fc321f commit d40169415ca92c6a923003dbb27e508ba4202426
@@ -8,10 +8,14 @@ horovod.tensorflow
horovod.tensorflow.keras
------------------------
.. automodule:: horovod.tensorflow.keras

.. automodule:: horovod.tensorflow.keras.callbacks
    :special-members: __init__

horovod.keras
-------------
.. automodule:: horovod.keras

.. automodule:: horovod.keras.callbacks
    :special-members: __init__

horovod.torch
-------------
@@ -25,6 +29,29 @@ horovod.spark
-------------
.. automodule:: horovod.spark

horovod.spark.keras
-------------------
.. autoclass:: horovod.spark.keras.KerasEstimator
    :show-inheritance:

.. autoclass:: horovod.spark.keras.KerasModel
    :show-inheritance:

horovod.spark.torch
-------------------
.. autoclass:: horovod.spark.torch.TorchEstimator
    :show-inheritance:

.. autoclass:: horovod.spark.torch.TorchModel
    :show-inheritance:

horovod.spark.common
--------------------
.. autoclass:: horovod.spark.common.estimator.HorovodEstimator

.. autoclass:: horovod.spark.common.estimator.HorovodModel

.. automodule:: horovod.spark.common.backend
    :show-inheritance:

.. automodule:: horovod.spark.common.store
    :show-inheritance:

horovod.run
-------------
.. automodule:: horovod.run
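
The new sections above document the full Spark Estimator workflow. For orientation, a minimal end-to-end sketch of the API being documented (a rough sketch only: `model`, `optimizer`, `train_df`, `test_df`, and the column names are hypothetical placeholders, and the store path and `num_proc` are illustrative):

    from horovod.spark.common.store import HDFSStore
    from horovod.spark.keras import KerasEstimator

    store = HDFSStore('/user/username/experiments')   # persists data, checkpoints, logs
    estimator = KerasEstimator(num_proc=4,
                               store=store,
                               model=model,
                               optimizer=optimizer,
                               loss='sparse_categorical_crossentropy',
                               feature_cols=['features'],
                               label_cols=['label'],
                               batch_size=32,
                               epochs=10)

    keras_model = estimator.fit(train_df)      # returns a KerasModel transformer
    pred_df = keras_model.transform(test_df)   # appends prediction columns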
@@ -61,9 +61,7 @@
autodoc_default_options = {
    'members': None,
    'member-order': 'bysource',
    'special-members': '__init__',
    'imported-members': None,
    'undoc-members': None,
    'exclude-members': 'contextmanager, LooseVersion, tf, keras, torch, mx, pyspark',
}

@@ -17,11 +17,42 @@
from unittest.mock import MagicMock


class Empty(object):
    pass


class HasOutputCols(object):
    pass


class Params(object):
    @staticmethod
    def _dummy():
        return MagicMock()

MOCK_MODULES = [
    'cloudpickle',
    'ctypes',
    'h5py',
    'psutil',

    'pyarrow',
    'pyarrow.parquet',

    'numpy',
    'numpy.core.multiarray',
    'numpy.dtype',

    'pyspark',
    'pyspark.ml',
    'pyspark.ml.linalg',
    'pyspark.ml.param',
    'pyspark.ml.param.shared',
    'pyspark.ml.util',
    'pyspark.sql',
    'pyspark.sql.functions',
    'pyspark.sql.types',

    'tensorflow',
    'tensorflow.python',
@@ -34,6 +65,9 @@
    'keras.backend',

    'torch',
    'torch.utils',
    'torch.utils.data',
    'torch.utils.tensorboard',

    'mxnet',
    'mxnet.base',
@@ -67,12 +101,44 @@
    'torch': {
        '__version__': '1.0.0',
    },
    'pyspark': {
        'ml': {
            'Estimator': Empty,
            'Model': Empty,
            'param': {
                'shared': {
                    'HasOutputCols': HasOutputCols,
                    'Param': MagicMock,
                    'Params': Params,
                    'TypeConverters': MagicMock(),
                },
            },
            'util': {
                'MLReadable': Empty,
                'MLWritable': Empty,
            }
        },
    },
    'horovod': {
        'common': {
            'util': {
                'get_ext_suffix': lambda: 'xyz',
            },
        },
        'spark': {
            'keras': {
                'estimator': {
                    'KerasEstimatorParamsReadable': MagicMock,
                    'KerasEstimatorParamsWritable': MagicMock,
                },
            },
            'torch': {
                'estimator': {
                    'TorchEstimatorParamsReadable': MagicMock,
                    'TorchEstimatorParamsWritable': MagicMock,
                },
            },
        },
    },
}
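
For context, this is the usual Sphinx pattern for building API docs without installing heavy dependencies: every name in MOCK_MODULES is replaced by a mock before autodoc imports horovod, and the dictionary above pins the handful of attributes (base classes, version strings) that the real modules must expose for those imports to succeed. A minimal sketch of the wiring, assuming conf.py installs the mocks into sys.modules in the conventional way:

    import sys
    from unittest.mock import MagicMock

    # Substitute each heavyweight dependency with a mock so that
    # `import horovod.*` succeeds on the docs build machine.
    for mod_name in MOCK_MODULES:
        sys.modules[mod_name] = MagicMock()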

@@ -126,7 +126,7 @@ def load_model(filepath, custom_optimizers=None, custom_objects=None, compressio
    and wrapped without needing to specify any `custom_optimizers` or
    `custom_objects`.

    Arguments:
        filepath: One of the following:
            - string, path to the saved model, or
            - h5py.File object from which to load the model
@@ -138,10 +138,10 @@ def load_model(filepath, custom_optimizers=None, custom_objects=None, compressio
            sent and received by each worker node. Defaults to not
            using compression.

    Returns:
        A Keras model instance.

    Raises:
        ImportError: If h5py is not available.
        ValueError: In case of an invalid savefile.
    """
@@ -116,19 +116,16 @@ class LearningRateWarmupCallback(_impl.LearningRateWarmupCallbackImpl, keras.cal
    ImageNet in 1 Hour". See https://arxiv.org/pdf/1706.02677.pdf for details.

    Math recap:

    .. math::
        epoch &= full\\_epochs + \\frac{batch}{steps\\_per\\_epoch}

        lr'(epoch) &= \\frac{lr}{size} * (\\frac{size - 1}{warmup} * epoch + 1)

        lr'(epoch = 0) &= \\frac{lr}{size}

        lr'(epoch = warmup) &= lr
    """

    def __init__(self, warmup_epochs=5, momentum_correction=True, steps_per_epoch=None,
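
For readers of the callback docs, a minimal sketch of how the warmup callback is typically combined with the broadcast callback in `model.fit` (the model and training data are placeholders):

    import horovod.keras as hvd

    callbacks = [
        # Broadcast initial variable state from rank 0 to all other processes.
        hvd.callbacks.BroadcastGlobalVariablesCallback(0),
        # Ramp the learning rate from lr/size up to lr over the first 5 epochs.
        hvd.callbacks.LearningRateWarmupCallback(warmup_epochs=5, verbose=1),
    ]
    model.fit(x_train, y_train, batch_size=32, epochs=10, callbacks=callbacks)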
@@ -30,23 +30,41 @@ def default_num_proc():


class Backend(object):
    """Interface for remote execution of the distributed training function.

    A custom backend can be used in cases where the training environment running Horovod is different
    from the Spark application running the HorovodEstimator.
    """

    def run(self, fn, args=(), kwargs={}, env=None):
        """Executes the training `fn` and returns results from each worker in a list (ordered by ascending rank).

        Args:
            fn: Function to run.
            args: Arguments to pass to `fn`.
            kwargs: Keyword arguments to pass to `fn`.
            env: Environment dictionary to use in Horovod run. Defaults to `os.environ`.

        Returns:
            List of results returned by running `fn` on each rank.
        """
        raise NotImplementedError()

    def num_processes(self):
        """Returns the number of processes to use for training."""
        raise NotImplementedError()
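
Because `Backend` only requires these two methods, a custom training environment can plug in with a small subclass. A hypothetical sketch (illustrative only, not part of this commit):

    class SerialBackend(Backend):
        """Hypothetical backend that runs `fn` serially in the driver process."""

        def __init__(self, num_proc=1):
            self._num_proc = num_proc

        def run(self, fn, args=(), kwargs={}, env=None):
            # The interface contract: results ordered by ascending rank.
            return [fn(*args, **kwargs) for _ in range(self._num_proc)]

        def num_processes(self):
            return self._num_proc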


class SparkBackend(Backend):
"""
Uses `horovod.spark.run` to execute the distributed training `fn`.
"""Uses `horovod.spark.run` to execute the distributed training `fn`."""

Args:
num_proc: Number of Horovod processes. Defaults to `spark.default.parallelism`.
env: Environment dictionary to use in Horovod run. Defaults to `os.environ`.
**kwargs: Additional arguments passed to `horovod.spark.run` at training time.
"""
    def __init__(self, num_proc=None, env=None, **kwargs):
        """
        Args:
            num_proc: Number of Horovod processes. Defaults to `spark.default.parallelism`.
            env: Environment dictionary to use in Horovod run. Defaults to `os.environ`.
            **kwargs: Additional arguments passed to `horovod.spark.run` at training time.
        """
        self._num_proc = num_proc or default_num_proc()
        self._env = env
        self._kwargs = kwargs
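
Construction is then a one-liner, with any extra keyword arguments flowing through to `horovod.spark.run` at training time (the `num_proc` value is illustrative):

    backend = SparkBackend(num_proc=4)   # omit num_proc to use spark.default.parallelism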
@@ -17,15 +17,34 @@

import horovod.spark.common._namedtuple_fix

from pyspark.ml import Estimator, Model

from horovod.spark.common import util
from horovod.spark.common.backend import SparkBackend
from horovod.spark.common.params import EstimatorParams, ModelParams


class HorovodEstimator(Estimator, EstimatorParams):
    def fit(self, df, params=None):
        """Fits the model to the DataFrame.

        Args:
            df: Input dataset, which is an instance of :py:class:`pyspark.sql.DataFrame`.
            params: An optional param map that overrides embedded params.

        Returns:
            `HorovodModel` transformer wrapping the trained model.
        """
        return super(HorovodEstimator, self).fit(df, params)

    def fit_on_parquet(self, params=None):
        """Trains the model on a saved Parquet file at `store.get_train_path()`.

        Args:
            params: An optional param map that overrides embedded params.

        Returns:
            Trained HorovodModel transformer of the appropriate subclass wrapping the trained model.
        """
        if params:
            return self.copy(params)._fit_on_parquet()
        else:
@@ -75,3 +94,21 @@ def _has_checkpoint(self, run_id):
        store = self.getStore()
        last_ckpt_path = store.get_checkpoint_path(run_id)
        return last_ckpt_path is not None and store.exists(last_ckpt_path)


class HorovodModel(Model, ModelParams):
    def transform(self, df, params=None):
        """Transforms the input dataset with prediction columns representing model predictions.

        Prediction column names default to <label_column>__output. Override column names
        by calling `transformer.setOutputCols(col_names)`.

        Args:
            df: Input dataset, which is an instance of :py:class:`pyspark.sql.DataFrame`.
            params: An optional param map that overrides embedded params.

        Returns:
            Transformed dataset.
        """
        return super(HorovodModel, self).transform(df, params)
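
A brief sketch of the contract described in the docstring (`estimator`, `train_df`, `test_df`, and the column name are placeholders):

    model = estimator.fit(train_df)        # returns a HorovodModel subclass
    pred_df = model.transform(test_df)     # adds '<label_column>__output' columns
    model.setOutputCols(['label_prob'])    # override the default prediction column names
    pred_df = model.transform(test_df)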
