Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add horovod.run.run to make horovod notebook friendly (new impl) #1307

Merged
merged 35 commits into from
Oct 24, 2019
Merged
Show file tree
Hide file tree
Changes from 16 commits
Commits
Show all changes
35 commits
Select commit Hold shift + click to select a range
75812cd
init pr
WeichenXu123 Aug 29, 2019
e5a26fd
address comments
WeichenXu123 Aug 30, 2019
6875f30
update
WeichenXu123 Aug 30, 2019
e46572f
fix
WeichenXu123 Aug 31, 2019
d5bf075
handle exception
WeichenXu123 Sep 2, 2019
46007c3
fix
WeichenXu123 Sep 3, 2019
175794a
update
WeichenXu123 Sep 3, 2019
51f3595
add unit test
WeichenXu123 Sep 3, 2019
79c7f15
fix test
WeichenXu123 Sep 3, 2019
46936df
update test script
WeichenXu123 Sep 4, 2019
6afc7e2
fix test
WeichenXu123 Sep 4, 2019
2de20f1
fix
WeichenXu123 Sep 4, 2019
4b35408
fix
WeichenXu123 Sep 4, 2019
1edc557
fix
WeichenXu123 Sep 4, 2019
51c2c92
fix
WeichenXu123 Sep 4, 2019
479bc0c
remove mpich test
WeichenXu123 Sep 4, 2019
a2f0bad
merge master & resolve conflicts
WeichenXu123 Sep 5, 2019
1226091
address comments
WeichenXu123 Sep 5, 2019
147c384
fix
WeichenXu123 Sep 5, 2019
ce7ff74
Merge branch 'master' into issue1176_3
WeichenXu123 Sep 8, 2019
7212622
add doc and negative test
WeichenXu123 Sep 9, 2019
d73a086
merge master
WeichenXu123 Oct 17, 2019
33024ec
address comments
WeichenXu123 Oct 17, 2019
90e58f2
fix gen-pipeline
WeichenXu123 Oct 17, 2019
b86c6f3
merge master
WeichenXu123 Oct 19, 2019
5b48b13
restore mpich test
WeichenXu123 Oct 19, 2019
7318d61
add mpi_args
WeichenXu123 Oct 19, 2019
6739508
collect all process ret val
WeichenXu123 Oct 19, 2019
78ce593
add mpich support
WeichenXu123 Oct 19, 2019
6e0bd83
improve test
WeichenXu123 Oct 20, 2019
cb0a152
exclude mpich for run_interactiverun
WeichenXu123 Oct 22, 2019
c8a8643
address comments
WeichenXu123 Oct 23, 2019
1b23e7d
fix test
WeichenXu123 Oct 23, 2019
3535ba2
update get_env_rank_and_size
WeichenXu123 Oct 24, 2019
7147aab
fix
WeichenXu123 Oct 24, 2019
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 11 additions & 4 deletions .buildkite/gen-pipeline.sh
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@ tests=( \
test-cpu-openmpi-py3_6-tf2_0_0-keras2_2_4-torch1_2_0-mxnet1_5_0-pyspark2_4_0 \
test-cpu-openmpi-py2_7-tfhead-kerashead-torchhead-mxnethead-pyspark2_4_0 \
test-cpu-openmpi-py3_6-tfhead-kerashead-torchhead-mxnethead-pyspark2_4_0 \
test-cpu-mpich-py3_6-tf1_14_0-keras2_2_4-torch1_1_0-mxnet1_5_0-pyspark2_4_0 \
WeichenXu123 marked this conversation as resolved.
Show resolved Hide resolved
WeichenXu123 marked this conversation as resolved.
Show resolved Hide resolved
test-cpu-mlsl-py3_6-tf1_14_0-keras2_2_4-torch1_1_0-mxnet1_5_0-pyspark2_4_0 \
test-gpu-openmpi-py2_7-tf1_6_0-keras2_1_2-torch0_4_1-mxnet1_4_1-pyspark2_3_2 \
test-gpu-openmpi-py3_5-tf1_6_0-keras2_1_2-torch0_4_1-mxnet1_4_1-pyspark2_3_2 \
Expand All @@ -42,7 +41,6 @@ tests=( \
test-gpu-openmpi-py3_6-tf2_0_0-keras2_2_4-torch1_2_0-mxnet1_5_0-pyspark2_4_0 \
test-gpu-openmpi-py2_7-tfhead-kerashead-torchhead-mxnethead-pyspark2_4_0 \
test-gpu-openmpi-py3_6-tfhead-kerashead-torchhead-mxnethead-pyspark2_4_0 \
test-gpu-mpich-py3_6-tf1_14_0-keras2_2_4-torch1_1_0-mxnet1_5_0-pyspark2_4_0 \
test-mixed-openmpi-py3_6-tf1_14_0-keras2_2_4-torch1_1_0-mxnet1_5_0-pyspark2_4_0 \
)

Expand Down Expand Up @@ -116,9 +114,16 @@ run_all() {
exclude_keras_if_needed="| sed 's/[a-z_]*keras[a-z_.]*//g'"
fi

local exclude_interactiverun="| sed 's/test_interactiverun.py//g'"

run_test "${test}" "${queue}" \
":pytest: Run PyTests (${test})" \
"bash -c \"cd /horovod/test && (echo test_*.py ${exclude_keras_if_needed} | xargs -n 1 \\\$(cat /mpirun_command) pytest -v --capture=no)\""
"bash -c \"cd /horovod/test && (echo test_*.py ${exclude_keras_if_needed} ${exclude_interactiverun} | xargs -n 1 \\\$(cat /mpirun_command) pytest -v --capture=no)\""

# Run test_interactiverun.py
run_test "${test}" "${queue}" \
":pytest: Run PyTests test_interactiverun (${test})" \
"bash -c \"cd /horovod/test && pytest -v --capture=no test_interactiverun.py\""

# Legacy TensorFlow tests
if [[ ${test} != *"tf2_"* ]]; then
Expand Down Expand Up @@ -185,9 +190,11 @@ run_gloo() {
exclude_spark_if_needed="| sed 's/[a-z_]*spark[a-z_.]*//g'"
fi

local exclude_interactiverun="| sed 's/test_interactiverun.py//g'"

run_test "${test}" "${queue}" \
":pytest: Run PyTests (${test})" \
"bash -c \"cd /horovod/test && (echo test_*.py ${exclude_spark_if_needed} | xargs -n 1 horovodrun -np 2 -H localhost:2 --gloo pytest -v --capture=no)\""
"bash -c \"cd /horovod/test && (echo test_*.py ${exclude_spark_if_needed} ${exclude_interactiverun} | xargs -n 1 horovodrun -np 2 -H localhost:2 --gloo pytest -v --capture=no)\""

run_test "${test}" "${queue}" \
":muscle: Test Keras MNIST (${test})" \
Expand Down
4 changes: 2 additions & 2 deletions bin/horovodrun
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
# limitations under the License.
# ==============================================================================

from horovod.run import run
from horovod.run.run import run_commandline

if __name__ == '__main__':
run.run()
run_commandline()
1 change: 1 addition & 0 deletions docs/requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -2,3 +2,4 @@ git+https://github.com/sphinx-doc/sphinx@2.0
sphinxcontrib-napoleon
alabaster
nbsphinx
pyyaml
WeichenXu123 marked this conversation as resolved.
Show resolved Hide resolved
16 changes: 16 additions & 0 deletions horovod/run/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
# Copyright 2019 Uber Technologies, Inc. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# =============================================================================

from .run import run
7 changes: 6 additions & 1 deletion horovod/run/common/util/settings.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
class Settings(object):

def __init__(self, verbose=0, ssh_port=None, key=None, timeout=None,
num_hosts=None, num_proc=None, hosts=None, command=None):
num_hosts=None, num_proc=None, hosts=None):
"""
:param verbose: level of verbosity
:type verbose: int
Expand All @@ -44,4 +44,9 @@ def __init__(self, verbose=0, ssh_port=None, key=None, timeout=None,
self.num_hosts = num_hosts
self.num_proc = num_proc
self.hosts = hosts
self.command = None
self.run_func_mode = None

def set_command(self, command, run_func_mode):
WeichenXu123 marked this conversation as resolved.
Show resolved Hide resolved
self.command = command
self.run_func_mode = run_func_mode
19 changes: 5 additions & 14 deletions horovod/run/gloo_run.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
import threading
import signal

from horovod.run.rendezvous.http_server import RendezvousServer
from horovod.run.http.http_server import RendezvousServer
from horovod.run.common.util import env as env_util, safe_shell_exec
from horovod.run.util import threads
from psutil import net_if_addrs
Expand Down Expand Up @@ -154,7 +154,7 @@ def set_event_on_sigterm(signum, frame):

args_list = []
for alloc_info in host_alloc_plan:
# generate env for rendezvous
# generate env for rendezvous
WeichenXu123 marked this conversation as resolved.
Show resolved Hide resolved
horovod_rendez_env = 'HOROVOD_RANK={rank} HOROVOD_SIZE={size} ' \
'HOROVOD_LOCAL_RANK={local_rank} HOROVOD_LOCAL_SIZE={local_size} ' \
'HOROVOD_CROSS_RANK={cross_rank} HOROVOD_CROSS_SIZE={cross_size} ' \
Expand Down Expand Up @@ -195,25 +195,16 @@ def set_event_on_sigterm(signum, frame):
block_until_all_done=True)


def gloo_run(settings, remote_host_names, common_intfs, env):
def gloo_run(settings, remote_host_names, common_intfs, env, server_ip):
# allocate processes into slots
host_alloc_plan = _allocate(settings.hosts, settings.num_proc)

# create global rendezvous server
# create global rendezvous server
WeichenXu123 marked this conversation as resolved.
Show resolved Hide resolved
global_rendezv = RendezvousServer(settings.verbose)
# Start rendezvous server and get port that it is listening
# Start rendezvous server and get the port it is listening on
WeichenXu123 marked this conversation as resolved.
Show resolved Hide resolved
global_rendezv_port = global_rendezv.start_server(host_alloc_plan)

# get the server IPv4 address
iface = list(common_intfs)[0]
server_ip = None
for addr in net_if_addrs()[iface]:
if addr.family == AF_INET:
server_ip = addr.address

if not server_ip:
raise RuntimeError(
'Cannot find an IPv4 address of the common interface.')

run_command = (
'HOROVOD_GLOO_RENDEZVOUS_ADDR={addr} '
Expand Down
File renamed without changes.
48 changes: 48 additions & 0 deletions horovod/run/http/http_client.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
# Copyright 2019 Uber Technologies, Inc. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# =============================================================================
import sys
import base64
if sys.version < '3':
WeichenXu123 marked this conversation as resolved.
Show resolved Hide resolved
from urllib2 import urlopen
from urllib2 import Request
from urllib2 import HTTPError, URLError
else:
from urllib.request import urlopen
from urllib.request import Request
from urllib.error import HTTPError, URLError


def read_data_from_kvstore(addr, port, scope, key):
    """Read the value stored under ``/{scope}/{key}`` on a KVStore HTTP server.

    :param addr: host name or IP address of the KVStore server
    :param port: port of the KVStore server (converted with ``str``)
    :param scope: first path component of the request URL
    :param key: second path component of the request URL
    :return: the response body after base64 decoding
    :raises RuntimeError: if the request fails with HTTPError or URLError;
        the original exception is passed as the second argument.
    """
    url = "http://{addr}:{port}/{scope}/{key}".format(
        addr=addr, port=str(port), scope=scope, key=key
    )
    try:
        resp = urlopen(Request(url))
        try:
            return base64.b64decode(resp.read())
        finally:
            # Close explicitly instead of using ``with``: the urllib2
            # response object on Python 2 is not a context manager.
            resp.close()
    except (HTTPError, URLError) as e:
        raise RuntimeError("Read data from KVStore server failed.", e)
WeichenXu123 marked this conversation as resolved.
Show resolved Hide resolved


def put_data_into_kvstore(addr, port, scope, key, value):
    """Store ``value`` under ``/{scope}/{key}`` on a KVStore HTTP server.

    The value is base64 encoded and sent as the body of an HTTP PUT request.

    :param addr: host name or IP address of the KVStore server
    :param port: port of the KVStore server (converted with ``str``)
    :param scope: first path component of the request URL
    :param key: second path component of the request URL
    :param value: payload handed to ``base64.b64encode``
    :raises RuntimeError: if the request fails with HTTPError or URLError;
        the original exception is passed as the second argument.
    """
    url = "http://{addr}:{port}/{scope}/{key}".format(
        addr=addr, port=str(port), scope=scope, key=key
    )
    try:
        req = Request(url, data=base64.b64encode(value))
        req.get_method = lambda: "PUT"  # for urllib2 compatibility
        # The response body is not needed; close it right away so the
        # underlying connection is released.
        urlopen(req).close()
    except (HTTPError, URLError) as e:
        raise RuntimeError("Put data into KVStore server failed.", e)
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@
OK = 200


class RendezvousHandler(SimpleHTTPServer.SimpleHTTPRequestHandler):
class KVStoreHandler(SimpleHTTPServer.SimpleHTTPRequestHandler):
# Set timeout
timeout = SINGLE_REQUEST_TIMEOUT

Expand All @@ -39,7 +39,7 @@ def do_GET(self):
paths = self.path.split('/')
if len(paths) < 3:
print(
'Rendezvous ERROR: Invalid request path: {path}.'.format(
'KVStore ERROR: Invalid request path: {path}.'.format(
path=self.path))
self.send_status_code(BAD_REQUEST)
return
Expand All @@ -61,7 +61,7 @@ def do_PUT(self):
paths = self.path.split('/')
if len(paths) < 3:
print(
'Rendezvous ERROR: Invalid request path: {path}.'.format(
'KVStore ERROR: Invalid request path: {path}.'.format(
path=self.path))
self.send_status_code(BAD_REQUEST)
return
Expand All @@ -75,7 +75,7 @@ def do_PUT(self):
except socket.timeout:
if self.server.verbose:
print(
'Rendezvous ERROR: Timeout when receiving {content_bytes} '
'KVStore ERROR: Timeout when receiving {content_bytes} '
'bytes, aborting this incomplete request.' .format(
content_bytes=content_length))

Expand All @@ -91,6 +91,18 @@ def do_PUT(self):

self.send_status_code(OK)

# Send a header-only response with the given status code. Content-Length is
# set to 0 because this method writes no body.
def send_status_code(self, status_code):
self.send_response(status_code)
self.send_header("Content-Length", 0)
self.end_headers()

# Override this function to prevent SimpleHTTPServer printing every
# request out. The arguments are accepted only to keep the signature
# compatible and are deliberately ignored.
def log_message(self, format, *args):
pass


class RendezvousHandler(KVStoreHandler):
# Override DELETE handler
def do_DELETE(self):
paths = self.path.split('/')
Expand All @@ -108,24 +120,14 @@ def do_DELETE(self):

self.send_status_code(OK)

def send_status_code(self, status_code):
self.send_response(status_code)
self.send_header("Content-Length", 0)
self.end_headers()

# Override this function to prevent SimpleHTTPServer printing every
# request out.
def log_message(self, format, *args):
pass


class RendezvousHTTPServer(BaseHTTPServer.HTTPServer, object):
def __init__(self, addr, handler, verbose):
# This class has to inherit from object since HTTPServer is an old-style
# class that does not inherit from object.
super(RendezvousHTTPServer, self).__init__(addr, handler)

# Lists for finished rendezvous workers
# Lists for finished rendezvous workers
WeichenXu123 marked this conversation as resolved.
Show resolved Hide resolved
self.finished_list_lock = threading.Lock()
self.finished_list = collections.defaultdict(list)

Expand All @@ -146,7 +148,7 @@ def extract_scope_size(self, host_alloc_plan):
local_rank = slot_info.local_rank
self.scope_size['cross_' + str(local_rank)] = slot_info.cross_size

# Decide whether all ranks have confirmed rendezvous completion.
# Decide whether all ranks have confirmed rendezvous completion.
WeichenXu123 marked this conversation as resolved.
Show resolved Hide resolved
def should_continue(self):
should_continue = False
with self.finished_list_lock:
Expand Down Expand Up @@ -185,7 +187,7 @@ def start_server(self, host_alloc_plan):
addr, RendezvousHandler, self.verbose))
self.httpd.extract_scope_size(host_alloc_plan)
if self.verbose:
print('Rendezvous INFO: HTTP rendezvous server started.')
print('Rendezvous INFO: HTTP http server started.')
WeichenXu123 marked this conversation as resolved.
Show resolved Hide resolved

# start the listening loop
self.listen_thread = threading.Thread(target=self.listen_loop)
Expand All @@ -199,6 +201,53 @@ def listen_loop(self):
while self.httpd.should_continue():
self.httpd.handle_request()

self.httpd.server_close()

if self.verbose:
print('Rendezvous INFO: Rendezvous finishes.')
# Because this thread is daemonized, no need to join.


# HTTP server that additionally carries an in-memory dict (`cache`), a lock
# (`cache_lock`) and a verbosity flag as instance attributes.
# `object` is listed as an extra base so that super() can be used where
# HTTPServer is an old-style class.
# NOTE(review): presumably request handlers access `cache` under
# `cache_lock` through `self.server` - confirm against KVStoreHandler.
class KVStoreHTTPServer(BaseHTTPServer.HTTPServer, object):
def __init__(self, addr, handler, verbose):
# `addr` and `handler` are forwarded unchanged to the base constructor.
super(KVStoreHTTPServer, self).__init__(addr, handler)

# Cache that provides the store
self.cache_lock = threading.Lock()
self.cache = {}

self.verbose = verbose


class KVStoreServer(object):
    """Runs a KVStoreHTTPServer on a background daemon thread.

    Inherits from ``object`` so it is a new-style class on Python 2 as well.

    :param verbose: when truthy, start and stop messages are printed
    """

    def __init__(self, verbose):
        # Both stay None until start_server() has been called.
        self.httpd = None
        self.listen_thread = None
        self.verbose = verbose

    def start_server(self):
        """Find an available port, create the HTTP server and start serving.

        The listening loop runs on a daemon thread so that it never keeps
        the process alive on its own.

        :return: the port returned by ``find_port``
        """
        self.httpd, port = find_port(
            lambda addr: KVStoreHTTPServer(
                addr, KVStoreHandler, self.verbose))

        self.listen_thread = threading.Thread(
            target=self.httpd.serve_forever)
        self.listen_thread.daemon = True
        self.listen_thread.start()

        if self.verbose:
            print('KVStoreServer INFO: KVStore server started. Listen on port ' + str(port))

        return port

    def shutdown_server(self):
        """Stop the serving loop and close the listening socket.

        Does nothing if the server was never started.
        """
        if self.httpd is None:
            return

        self.httpd.shutdown()

        self.httpd.server_close()

        if self.verbose:
            print('KVStoreServer INFO: KVStore server finishes.')
        # Because this thread is daemonized, no need to join.
9 changes: 8 additions & 1 deletion horovod/run/mpi_run.py
Original file line number Diff line number Diff line change
Expand Up @@ -98,5 +98,12 @@ def mpi_run(settings, common_intfs, env):

if settings.verbose >= 2:
print(mpirun_command)

# Execute the mpirun command.
os.execve('/bin/sh', ['/bin/sh', '-c', mpirun_command], env)
if settings.run_func_mode:
exit_code = safe_shell_exec.execute(mpirun_command, env=env)
if exit_code != 0:
raise RuntimeError("mpirun failed with exit code {exit_code}".format(exit_code=exit_code))
else:
os.execve('/bin/sh', ['/bin/sh', '-c', mpirun_command], env)

Loading