Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Make Spark's Hadoop token file available to Python method #1532

Merged
merged 4 commits into from
Dec 2, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 11 additions & 1 deletion horovod/run/common/service/task_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
# limitations under the License.
# ==============================================================================

import os
import threading
import time

Expand Down Expand Up @@ -52,10 +53,11 @@ def __init__(self, result):


class BasicTaskService(network.BasicService):
def __init__(self, name, key, nic):
def __init__(self, name, key, nic, service_env_keys):
    """Create a basic task service.

    :param name: human-readable service name used for registration
    :param key: shared secret used to authenticate clients
    :param nic: network interface to bind to (passed through to the base service)
    :param service_env_keys: names of environment variables of this service's
        process that will be injected into the environment of any executed command
    """
    super(BasicTaskService, self).__init__(name, key, nic)
    # guards the state below; commands and registration arrive from other threads
    self._wait_cond = threading.Condition()
    self._initial_registration_complete = False
    self._command_thread = None
    self._fn_result = None
    self._service_env_keys = service_env_keys

Expand All @@ -64,6 +66,14 @@ def _handle(self, req, client_address):
self._wait_cond.acquire()
try:
if self._command_thread is None:
# we inject all these environment variables
# to make them available to the executed command
# NOTE: this will overwrite environment variables that exist in req.env
for key in self._service_env_keys:
value = os.environ.get(key)
if value is not None:
req.env[key] = value

# We only permit executing exactly one command, so this is idempotent.
self._command_thread = threading.Thread(
target=safe_shell_exec.execute,
Expand Down
5 changes: 4 additions & 1 deletion horovod/spark/task/task_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,12 @@

class SparkTaskService(task_service.BasicTaskService):
    """Task service running inside a Spark executor.

    Forwards the environment variables named in SERVICE_ENV_KEYS from this
    service's process into the environment of the command it executes, so the
    command (e.g. a Python function launched by Horovod) can pick up Spark's
    Hadoop delegation token file.
    """
    # NOTE: the scraped diff contained both the pre- and post-change super()
    # call; this is the merged post-change version of the class.
    NAME_FORMAT = 'task service #%d'
    # Spark exposes the Hadoop token file location to executors via this variable.
    SERVICE_ENV_KEYS = ['HADOOP_TOKEN_FILE_LOCATION']

    def __init__(self, index, key, nic):
        super(SparkTaskService, self).__init__(SparkTaskService.NAME_FORMAT % index,
                                               key, nic,
                                               SparkTaskService.SERVICE_ENV_KEYS)


class SparkTaskClient(task_service.BasicTaskClient):
Expand Down
22 changes: 22 additions & 0 deletions test/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,10 @@
from __future__ import division
from __future__ import print_function

import contextlib
import os
import shutil
import tempfile


def mpi_env_rank_and_size():
Expand Down Expand Up @@ -54,3 +57,22 @@ def mpi_env_rank_and_size():

# Default to rank zero and size one if there are no environment variables
return 0, 1


@contextlib.contextmanager
def tempdir():
    """Context manager that yields a freshly created temporary directory.

    The directory (including any contents created inside it) is removed
    when the context exits, even on error.
    """
    d = tempfile.mkdtemp()
    try:
        yield d
    finally:
        shutil.rmtree(d)


@contextlib.contextmanager
def temppath():
    """Context manager that yields a temporary file system path.

    Nothing exists at the path when it is yielded; the caller may create a
    file or a directory there. Whatever was created is removed on exit.

    Fix over the original: cleanup used ``shutil.rmtree`` unconditionally,
    which raises ``NotADirectoryError`` when the caller created a regular
    file at the path (the common case). Now files and directories are both
    handled.
    """
    # NOTE(review): tempfile.mktemp is race-prone by design; kept here because
    # callers need a path that does not yet exist.
    path = tempfile.mktemp()
    try:
        yield path
    finally:
        if os.path.isdir(path):
            shutil.rmtree(path)
        elif os.path.exists(path):
            os.remove(path)
29 changes: 29 additions & 0 deletions test/test_spark.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,12 +27,16 @@
import unittest
import warnings

from horovod.run.common.util import secret
from horovod.run.mpi_run import _get_mpi_implementation_flags
import horovod.spark
from horovod.spark.task.task_service import SparkTaskService, SparkTaskClient
import horovod.torch as hvd

from mock import MagicMock

from common import tempdir


@contextlib.contextmanager
def spark(app, cores=2, *args):
Expand Down Expand Up @@ -212,3 +216,28 @@ def fn():
self.assertTrue(len(actual_secret) > 0)
self.assertEqual(actual_stdout, stdout)
self.assertEqual(actual_stderr, stderr)

def test_spark_task_service_env(self):
    """Verify that SparkTaskService injects its SERVICE_ENV_KEYS variables
    into the environment of the command it runs, alongside the command's
    own env, while unrelated service-process variables are not forwarded.
    """
    key = secret.make_secret_key()
    # Use a loop variable distinct from `key`: under Python 2 a list
    # comprehension leaks its loop variable into the enclosing scope and
    # would silently replace the secret key above.
    service_env = dict([(env_key, '{} value'.format(env_key))
                        for env_key in SparkTaskService.SERVICE_ENV_KEYS])
    service_env.update({"other": "value"})
    with os_environ(service_env):
        service = SparkTaskService(1, key, None)
        client = SparkTaskClient(1, service.addresses(), key, 3)

        with tempdir() as d:
            # `file` shadows a Python 2 builtin; use a distinct name
            env_file = '{}/env'.format(d)
            # PWD is filtered out because tempdir changes make it unstable
            command = "env | grep -v '^PWD='> {}".format(env_file)
            command_env = {"test": "value"}

            try:
                client.run_command(command, command_env)
                client.wait_for_command_termination()
            finally:
                service.shutdown()

            with open(env_file) as f:
                env = sorted([line.strip() for line in f.readlines()])
                expected = ['HADOOP_TOKEN_FILE_LOCATION=HADOOP_TOKEN_FILE_LOCATION value',
                            'test=value']
                self.assertEqual(env, expected)