Merge pull request #108 from martindurant/clearer_files
File fixes
martindurant committed Oct 26, 2017
2 parents 80d3dae + b5c6b24 commit 9abf1e9
Showing 8 changed files with 70 additions and 44 deletions.
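
In short, paths given to Knit.start via files= are now handled by prefix: plain local paths are uploaded to <hdfs_home>/.knitDeps when needed, while hdfs:// paths are taken to already exist on HDFS and are used in place. The sketch below restates that usage based on the tests in this change; hostnames, ports and file names are placeholders, not code from the diff.

from knit import Knit

k = Knit(nn='localhost', nn_port=8020, rm='localhost', rm_port=8088)

# Local paths are uploaded to <hdfs_home>/.knitDeps unless an up-to-date
# copy is already there; hdfs:// paths are assumed to be on HDFS already
# and are passed through to the application master untouched.
app_id = k.start('env',
                 files=['dev.zip',                 # local: uploaded if needed
                        'hdfs://tmp/mytestfile'],  # remote: used in place
                 memory=128)
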
2 changes: 1 addition & 1 deletion continuous_integration/simple_test.sh
@@ -1,6 +1,6 @@
#!/usr/bin/env bash
source activate test
conda install -c conda-forge -y lxml py4j
conda install -c conda-forge -y py4j hdfs3
cd /knit
python setup.py install mvn
py.test -vv
20 changes: 14 additions & 6 deletions knit/core.py
@@ -18,6 +18,7 @@
from .env import CondaCreator
from .exceptions import KnitException, YARNException
from .yarn_api import YARNAPI
from .utils import triple_slash

from py4j.protocol import Py4JError
from py4j.java_gateway import JavaGateway, GatewayClient
@@ -313,12 +314,15 @@ def preexec_func():
" Check that java is installed and the Knit JAR"
" file exists.")

gateway = JavaGateway(GatewayClient(port=gateway_port), auto_convert=True)
gateway = JavaGateway(GatewayClient(port=gateway_port),
auto_convert=True)
self.client = gateway.entry_point
self.client_gateway = gateway
files = [(f if self.check_needs_upload(f) else ('hdfs://' + f))
for f in files]
jfiles = ListConverter().convert(files, gateway._gateway_client)
logger.debug("Files submitted: %s" % files)
upfiles = [f for f in files if (not f.startswith('hdfs://')
and self.check_needs_upload(f))]
logger.debug("Files to upload: %s" % upfiles)
jfiles = ListConverter().convert(upfiles, gateway._gateway_client)
jenv = MapConverter().convert(envvars, gateway._gateway_client)

self.app_id = self.client.start(jfiles, jenv, app_name, queue)
@@ -347,7 +351,11 @@ def preexec_func():
gateway = JavaGateway(GatewayClient(
address=master_rpchost, port=master_rpcport), auto_convert=True)
self.master = gateway.entry_point
jfiles = ListConverter().convert(files, gateway._gateway_client)
rfiles = [triple_slash(f) if f.startswith('hdfs://') else
'/'.join([self.hdfs_home, '.knitDeps', os.path.basename(f)])
for f in files]
logger.debug("Resource files: %s" % rfiles)
jfiles = ListConverter().convert(rfiles, gateway._gateway_client)
jenv = MapConverter().convert(envvars, gateway._gateway_client)
self.master.init(jfiles, jenv, cmd, num_containers,
virtual_cores, memory)
@@ -585,7 +593,7 @@ def check_needs_upload(self, path):
"""Upload is needed if file does not exist in HDFS or is older"""
if self.upload_always:
return True
fn = (self.hdfs_home + '/.knitDeps/' + os.path.basename(path))
fn = '/'.join([self.hdfs_home, '.knitDeps', os.path.basename(path)])
if self.hdfs and self.hdfs.exists(fn):
st = os.stat(path)
size = st.st_size
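
Taken together, the core.py hunks above split the files argument into two views: upfiles, the local paths that actually need uploading to HDFS, and rfiles, the resource paths handed to the application master. The standalone helper below restates that logic for illustration only; it is a sketch, not a function in the repository.

import os

from knit.utils import triple_slash


def plan_files(files, hdfs_home, check_needs_upload):
    # Upload only local files whose copy in HDFS is missing or stale.
    upfiles = [f for f in files
               if not f.startswith('hdfs://') and check_needs_upload(f)]
    # Every file becomes a YARN resource: hdfs:// paths are used in place
    # (normalised to hdfs:///); local ones resolve to <hdfs_home>/.knitDeps.
    rfiles = [triple_slash(f) if f.startswith('hdfs://')
              else '/'.join([hdfs_home, '.knitDeps', os.path.basename(f)])
              for f in files]
    return upfiles, rfiles
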
11 changes: 8 additions & 3 deletions knit/dask_yarn.py
Expand Up @@ -29,7 +29,10 @@ class DaskYARNCluster(object):
----------
nn, nn_port, rm, rm_port, user, autodetect, lang: see knit.Knit
env: str or None
If provided, the path of a zipped conda env to put in containers
If provided, the path of a zipped conda env to put in containers. This
can be a local zip file to upload, a zip file already on HDFS (hdfs://)
or a directory to zip and upload. If not provided, a default environment
will be built, containing dask.
packages: list of str
Packages to install in the env to provide to containers *if* env is
None. Uses conda spec for pinning versions. dask and distributed will
@@ -111,7 +114,8 @@ def start(self, n_workers=1, cpus=1, memory=2048, checks=True,
# create env from scratch
self.env = c.create_env(env_name=env_name,
packages=self.packages)
elif not self.env.endswith('.zip'):
elif (not self.env.endswith('.zip') and
not self.env.startswith('hdfs://')):
# given env directory, so zip it
self.env = zip_path(self.env)

@@ -124,7 +128,8 @@
''.format(cpus=cpus, mem=memory * 1e6, pref=pref,
addr=self.local_cluster.scheduler.address))

app_id = self.knit.start(command, files=[self.env],
files = [self.env] + kwargs.pop('files', [])
app_id = self.knit.start(command, files=files,
num_containers=n_workers, virtual_cores=cpus,
memory=memory, checks=checks, **kwargs)
self.app_id = app_id
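
With the widened env docstring and the new files = [self.env] + kwargs.pop('files', []) line, a cluster can be started from any of the three env forms and can ship extra files alongside the environment. A sketch of that usage follows; the env path and the extra file name are invented for illustration.

from knit.dask_yarn import DaskYARNCluster

# env may be a local .zip (uploaded), a .zip already on HDFS (used in
# place), or a directory (zipped and then uploaded); omit it to have a
# default dask environment built instead.
cluster = DaskYARNCluster(env='hdfs:///envs/dev.zip')

# extra per-container files now ride along with the environment zip
cluster.start(n_workers=2, cpus=1, memory=2048, files=['settings.yaml'])
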
22 changes: 16 additions & 6 deletions knit/tests/test_core.py
@@ -208,21 +208,17 @@ def test_hdfs_home():
try:
hdfs.mkdir(d)
k = Knit(nn='localhost', rm='localhost', nn_port=8020, rm_port=8088,
replication_factor=1, hdfs_home=d)
replication_factor=1, hdfs_home=d)

env_zip = k.create_env(env_name='dev', packages=['python=2.7'], remove=True)
k.start('env', env=env_zip, memory=128)
k.start('env', files=[env_zip], memory=128)

assert d + '/.knitDeps' in hdfs.ls(d, False)
assert d + "/.knitDeps/knit-1.0-SNAPSHOT.jar" in hdfs.ls(d + '/.knitDeps', False)
assert d + "/.knitDeps/dev.zip" in hdfs.ls(d + '/.knitDeps', False)
if not k.wait_for_completion(30):
k.kill()

time.sleep(5) # log aggregation
logs = k.logs()
assert "PYTHON_BIN=" in str(logs)
assert "CONDA_PREFIX=" in str(logs)
finally:
hdfs.rm(d, True)
k.kill()
@@ -338,3 +334,17 @@ def test_kill_all(k):
k.yarn_api.kill_all()
time.sleep(2)
assert k.runtime_status() == 'KILLED'


def test_existing_path(k):
# TODO: is this a good test if we noisily log the upload of files?
hdfs3 = pytest.importorskip('hdfs3')
hdfs = hdfs3.HDFileSystem()
k.hdfs = hdfs
cmd = 'ls -l'
hdfs.put(__file__, '/tmp/mytestfile')
k.start(cmd, files=['hdfs://tmp/mytestfile'])
wait_for_status(k, 'FINISHED')
time.sleep(2)
out = k.logs()
assert ' mytestfile ' in str(out)
39 changes: 15 additions & 24 deletions knit/tests/test_dask.py
@@ -110,7 +110,7 @@ def test_yarn_cluster_add_stop(loop):

client = Client(cluster)
future = client.submit(lambda x: x + 1, 10)
assert future.result() == 11
assert future.result() == 11 # waits for cluster to have a worker

info = client.scheduler_info()
workers = info['workers']
@@ -120,37 +120,28 @@
num_containers = status['runningContainers']
assert num_containers == 2 # 1 container for the worker and 1 for the AM

# Add a worker
cluster.add_workers(n_workers=1, cpus=1, memory=128)

while num_containers != 3:
status = cluster.knit.status()
num_containers = status['runningContainers']

# wait a tad to let workers connect to scheduler

start = time.time()
while len(client.scheduler_info()['workers']) < 2:
time.sleep(0.1)
assert time.time() < start + 10
time.sleep(1)
assert time.time() < start + 60

status = cluster.knit.status()
num_containers = status['runningContainers']
assert num_containers == 3
info = client.scheduler_info()
workers = info['workers']
assert len(workers) == 2

assert len(cluster.workers) == 2

# Remove a worker
cluster.remove_worker(cluster.workers[1])
while num_containers != 2:
status = cluster.knit.status()
num_containers = status['runningContainers']

time.sleep(2)
assert len(cluster.workers) == 1
start = time.time()
while len(client.scheduler_info()['workers']) == 2:
time.sleep(1)
assert time.time() < start + 10

# STOP ALL WORKERS!
cluster.stop()
time.sleep(2)

workers = client.scheduler_info()['workers']
assert len(workers) == 0
start = time.time()
while len(client.scheduler_info()['workers']) > 0:
time.sleep(2)
assert time.time() < start + 10
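
The reworked waits in test_dask.py all follow the same poll-with-deadline shape: check a condition, sleep, and assert that the elapsed time stays under a bound. A generic helper in that style (hypothetical; not part of this change) would look like:

import time


def wait_until(condition, timeout=60, interval=1):
    # Poll until the condition holds or the deadline passes.
    start = time.time()
    while not condition():
        time.sleep(interval)
        assert time.time() < start + timeout, "condition not met in time"

# e.g.: wait_until(lambda: len(client.scheduler_info()['workers']) >= 2)
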
8 changes: 7 additions & 1 deletion knit/tests/test_utils.py
@@ -1,6 +1,6 @@
import logging

from knit.utils import set_logging
from knit.utils import set_logging, triple_slash


def test_set_logging():
@@ -9,3 +9,9 @@ def test_set_logging():
assert logger.level == logging.DEBUG
set_logging(logging.INFO)
assert logger.level == logging.INFO


def test_slashes():
assert triple_slash('hdfs://hello/path') == 'hdfs:///hello/path'
assert triple_slash('hdfs:///hello/path') == 'hdfs:///hello/path'
assert triple_slash('hdfs:////hello/path') == 'hdfs:////hello/path'
7 changes: 7 additions & 0 deletions knit/utils.py
@@ -43,3 +43,10 @@ def get_log_content(s):
ind1 = s[ind0:].find("</td>")
out = s[ind0:ind0+ind1]
return out.lstrip('\n <pre>').rstrip('</pre>\n ')


def triple_slash(s):
if s.startswith('hdfs://') and not s.startswith('hdfs:///'):
return 'hdfs:///' + s[7:]
else:
return s
5 changes: 2 additions & 3 deletions ApplicationMaster.scala
@@ -193,9 +193,8 @@ object ApplicationMaster extends Logging with AMRMClientAsync.CallbackHandler wi
val fileUpload = Records.newRecord(classOf[LocalResource])
var p = new Path(fileName)
val name = p.getName
val p2 = new Path(".knitDeps/" + name)
logger.info(f"RESOURCE: $p => $p2 archive=$iszip")
setUpLocalResource(p2, fileUpload, archived=iszip)
logger.info(f"RESOURCE: $p archive=$iszip")
setUpLocalResource(p, fileUpload, archived=iszip)
localResources(name) = fileUpload
}
}
