diff --git a/continuous_integration/simple_test.sh b/continuous_integration/simple_test.sh index 5fe39ac..74980f3 100644 --- a/continuous_integration/simple_test.sh +++ b/continuous_integration/simple_test.sh @@ -1,6 +1,6 @@ #!/usr/bin/env bash source activate test -conda install -c conda-forge -y lxml py4j +conda install -c conda-forge -y py4j hdfs3 cd /knit python setup.py install mvn py.test -vv \ No newline at end of file diff --git a/knit/core.py b/knit/core.py index 949e896..b3b4d23 100644 --- a/knit/core.py +++ b/knit/core.py @@ -18,6 +18,7 @@ from .env import CondaCreator from .exceptions import KnitException, YARNException from .yarn_api import YARNAPI +from .utils import triple_slash from py4j.protocol import Py4JError from py4j.java_gateway import JavaGateway, GatewayClient @@ -313,12 +314,15 @@ def preexec_func(): " Check that java is installed and the Knit JAR" " file exists.") - gateway = JavaGateway(GatewayClient(port=gateway_port), auto_convert=True) + gateway = JavaGateway(GatewayClient(port=gateway_port), + auto_convert=True) self.client = gateway.entry_point self.client_gateway = gateway - files = [(f if self.check_needs_upload(f) else ('hdfs://' + f)) - for f in files] - jfiles = ListConverter().convert(files, gateway._gateway_client) + logger.debug("Files submitted: %s" % files) + upfiles = [f for f in files if (not f.startswith('hdfs://') + and self.check_needs_upload(f))] + logger.debug("Files to upload: %s" % upfiles) + jfiles = ListConverter().convert(upfiles, gateway._gateway_client) jenv = MapConverter().convert(envvars, gateway._gateway_client) self.app_id = self.client.start(jfiles, jenv, app_name, queue) @@ -347,7 +351,11 @@ def preexec_func(): gateway = JavaGateway(GatewayClient( address=master_rpchost, port=master_rpcport), auto_convert=True) self.master = gateway.entry_point - jfiles = ListConverter().convert(files, gateway._gateway_client) + rfiles = [triple_slash(f) if f.startswith('hdfs://') else + '/'.join([self.hdfs_home, '.knitDeps', os.path.basename(f)]) + for f in files] + logger.debug("Resource files: %s" % rfiles) + jfiles = ListConverter().convert(rfiles, gateway._gateway_client) jenv = MapConverter().convert(envvars, gateway._gateway_client) self.master.init(jfiles, jenv, cmd, num_containers, virtual_cores, memory) @@ -585,7 +593,7 @@ def check_needs_upload(self, path): """Upload is needed if file does not exist in HDFS or is older""" if self.upload_always: return True - fn = (self.hdfs_home + '/.knitDeps/' + os.path.basename(path)) + fn = '/'.join([self.hdfs_home, '.knitDeps', os.path.basename(path)]) if self.hdfs and self.hdfs.exists(fn): st = os.stat(path) size = st.st_size diff --git a/knit/dask_yarn.py b/knit/dask_yarn.py index 0452572..9c1217a 100644 --- a/knit/dask_yarn.py +++ b/knit/dask_yarn.py @@ -29,7 +29,10 @@ class DaskYARNCluster(object): ---------- nn, nn_port, rm, rm_port, user, autodetect, lang: see knit.Knit env: str or None - If provided, the path of a zipped conda env to put in containers + If provided, the path of a zipped conda env to put in containers. This + can be a local zip file to upload, a zip file already on HDFS (hdfs://) + or a directory to zip and upload. If not provided, a default environment + will be built, containing dask. packages: list of str Packages to install in the env to provide to containers *if* env is None. Uses conda spec for pinning versions. dask and distributed will @@ -111,7 +114,8 @@ def start(self, n_workers=1, cpus=1, memory=2048, checks=True, # create env from scratch self.env = c.create_env(env_name=env_name, packages=self.packages) - elif not self.env.endswith('.zip'): + elif (not self.env.endswith('.zip') and + not self.env.startswith('hdfs://')): # given env directory, so zip it self.env = zip_path(self.env) @@ -124,7 +128,8 @@ def start(self, n_workers=1, cpus=1, memory=2048, checks=True, ''.format(cpus=cpus, mem=memory * 1e6, pref=pref, addr=self.local_cluster.scheduler.address)) - app_id = self.knit.start(command, files=[self.env], + files = [self.env] + kwargs.pop('files', []) + app_id = self.knit.start(command, files=files, num_containers=n_workers, virtual_cores=cpus, memory=memory, checks=checks, **kwargs) self.app_id = app_id diff --git a/knit/tests/test_core.py b/knit/tests/test_core.py index e17ffaf..ac0d9ef 100644 --- a/knit/tests/test_core.py +++ b/knit/tests/test_core.py @@ -208,10 +208,10 @@ def test_hdfs_home(): try: hdfs.mkdir(d) k = Knit(nn='localhost', rm='localhost', nn_port=8020, rm_port=8088, - replication_factor=1, hdfs_home=d) + replication_factor=1, hdfs_home=d) env_zip = k.create_env(env_name='dev', packages=['python=2.7'], remove=True) - k.start('env', env=env_zip, memory=128) + k.start('env', files=[env_zip], memory=128) assert d + '/.knitDeps' in hdfs.ls(d, False) assert d + "/.knitDeps/knit-1.0-SNAPSHOT.jar" in hdfs.ls(d + '/.knitDeps', False) @@ -219,10 +219,6 @@ def test_hdfs_home(): if not k.wait_for_completion(30): k.kill() - time.sleep(5) # log aggregation - logs = k.logs() - assert "PYTHON_BIN=" in str(logs) - assert "CONDA_PREFIX=" in str(logs) finally: hdfs.rm(d, True) k.kill() @@ -338,3 +334,17 @@ def test_kill_all(k): k.yarn_api.kill_all() time.sleep(2) assert k.runtime_status() == 'KILLED' + + +def test_existing_path(k): + # TODO: is this a good test if we noisily log the upload of files? + hdfs3 = pytest.importorskip('hdfs3') + hdfs = hdfs3.HDFileSystem() + k.hdfs = hdfs + cmd = 'ls -l' + hdfs.put(__file__, '/tmp/mytestfile') + k.start(cmd, files=['hdfs://tmp/mytestfile']) + wait_for_status(k, 'FINISHED') + time.sleep(2) + out = k.logs() + assert ' mytestfile ' in str(out) diff --git a/knit/tests/test_dask.py b/knit/tests/test_dask.py index a1873f6..9a25124 100644 --- a/knit/tests/test_dask.py +++ b/knit/tests/test_dask.py @@ -110,7 +110,7 @@ def test_yarn_cluster_add_stop(loop): client = Client(cluster) future = client.submit(lambda x: x + 1, 10) - assert future.result() == 11 + assert future.result() == 11 # waits for cluster to have a worker info = client.scheduler_info() workers = info['workers'] @@ -120,37 +120,28 @@ def test_yarn_cluster_add_stop(loop): num_containers = status['runningContainers'] assert num_containers == 2 # 1 container for the worker and 1 for the AM + # Add a worker cluster.add_workers(n_workers=1, cpus=1, memory=128) - - while num_containers != 3: - status = cluster.knit.status() - num_containers = status['runningContainers'] - - # wait a tad to let workers connect to scheduler - start = time.time() while len(client.scheduler_info()['workers']) < 2: - time.sleep(0.1) - assert time.time() < start + 10 + time.sleep(1) + assert time.time() < start + 60 + status = cluster.knit.status() + num_containers = status['runningContainers'] assert num_containers == 3 - info = client.scheduler_info() - workers = info['workers'] - assert len(workers) == 2 - assert len(cluster.workers) == 2 + # Remove a worker cluster.remove_worker(cluster.workers[1]) - while num_containers != 2: - status = cluster.knit.status() - num_containers = status['runningContainers'] - - time.sleep(2) - assert len(cluster.workers) == 1 + start = time.time() + while len(client.scheduler_info()['workers']) == 2: + time.sleep(1) + assert time.time() < start + 10 # STOP ALL WORKERS! cluster.stop() - time.sleep(2) - - workers = client.scheduler_info()['workers'] - assert len(workers) == 0 + start = time.time() + while len(client.scheduler_info()['workers']) > 0: + time.sleep(2) + assert time.time() < start + 10 diff --git a/knit/tests/test_utils.py b/knit/tests/test_utils.py index 4b49841..38bd352 100644 --- a/knit/tests/test_utils.py +++ b/knit/tests/test_utils.py @@ -1,6 +1,6 @@ import logging -from knit.utils import set_logging +from knit.utils import set_logging, triple_slash def test_set_logging(): @@ -9,3 +9,9 @@ def test_set_logging(): assert logger.level == logging.DEBUG set_logging(logging.INFO) assert logger.level == logging.INFO + + +def test_slashes(): + assert triple_slash('hdfs://hello/path') == 'hdfs:///hello/path' + assert triple_slash('hdfs:///hello/path') == 'hdfs:///hello/path' + assert triple_slash('hdfs:////hello/path') == 'hdfs:////hello/path' diff --git a/knit/utils.py b/knit/utils.py index 8b951ee..ff0bec6 100644 --- a/knit/utils.py +++ b/knit/utils.py @@ -43,3 +43,10 @@ def get_log_content(s): ind1 = s[ind0:].find("") out = s[ind0:ind0+ind1] return out.lstrip('\n
').rstrip('
\n ') + + +def triple_slash(s): + if s.startswith('hdfs://') and not s.startswith('hdfs:///'): + return 'hdfs:///' + s[7:] + else: + return s diff --git a/knit_jvm/src/main/scala/io/continuum/knit/ApplicationMaster.scala b/knit_jvm/src/main/scala/io/continuum/knit/ApplicationMaster.scala index 876fc7f..d67e279 100644 --- a/knit_jvm/src/main/scala/io/continuum/knit/ApplicationMaster.scala +++ b/knit_jvm/src/main/scala/io/continuum/knit/ApplicationMaster.scala @@ -193,9 +193,8 @@ object ApplicationMaster extends Logging with AMRMClientAsync.CallbackHandler wi val fileUpload = Records.newRecord(classOf[LocalResource]) var p = new Path(fileName) val name = p.getName - val p2 = new Path(".knitDeps/" + name) - logger.info(f"RESOURCE: $p => $p2 archive=$iszip") - setUpLocalResource(p2, fileUpload, archived=iszip) + logger.info(f"RESOURCE: $p archive=$iszip") + setUpLocalResource(p, fileUpload, archived=iszip) localResources(name) = fileUpload } }