Skip to content
This repository was archived by the owner on Nov 3, 2020. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ script:
# https://github.com/boto/boto/issues/3263
- export BOTO_CONFIG=/tmp/bogusvalue
- travis_wait 30 py.test --cov=dask_ec2 --cov-report term-missing -s -vv --durations=0
- travis_wait 30 py.test --flake8 dask_ec2

after_success:
- pip install coveralls
Expand Down
2 changes: 2 additions & 0 deletions dask_ec2/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,3 +5,5 @@
from ._version import get_versions
__version__ = get_versions()['version']
del get_versions

__all__ = ["Cluster", "EC2", "Instance", "__version__"]
57 changes: 22 additions & 35 deletions dask_ec2/_version.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ class NotThisMethod(Exception):
HANDLERS = {}


def register_vcs_handler(vcs, method): # decorator
def register_vcs_handler(vcs, method): # decorator

def decorate(f):
if vcs not in HANDLERS:
Expand Down Expand Up @@ -104,10 +104,7 @@ def versions_from_parentdir(parentdir_prefix, root, verbose):
print("guessing rootdir is '%s', but '%s' doesn't start with "
"prefix '%s'" % (root, dirname, parentdir_prefix))
raise NotThisMethod("rootdir doesn't start with parentdir_prefix")
return {"version": dirname[len(parentdir_prefix):],
"full-revisionid": None,
"dirty": False,
"error": None}
return {"version": dirname[len(parentdir_prefix):], "full-revisionid": None, "dirty": False, "error": None}


@register_vcs_handler("git", "get_keywords")
Expand Down Expand Up @@ -167,17 +164,16 @@ def git_versions_from_keywords(keywords, tag_prefix, verbose):
r = ref[len(tag_prefix):]
if verbose:
print("picking %s" % r)
return {"version": r,
"full-revisionid": keywords["full"].strip(),
"dirty": False,
"error": None}
return {"version": r, "full-revisionid": keywords["full"].strip(), "dirty": False, "error": None}
# no suitable tags, so version is "0+unknown", but full hex is still there
if verbose:
print("no suitable tags, using unknown + full revision id")
return {"version": "0+unknown",
"full-revisionid": keywords["full"].strip(),
"dirty": False,
"error": "no suitable tags"}
return {
"version": "0+unknown",
"full-revisionid": keywords["full"].strip(),
"dirty": False,
"error": "no suitable tags"
}


@register_vcs_handler("git", "pieces_from_vcs")
Expand All @@ -197,9 +193,7 @@ def git_pieces_from_vcs(tag_prefix, root, verbose, run_command=run_command):
GITS = ["git.cmd", "git.exe"]
# if there is a tag, this yields TAG-NUM-gHEX[-dirty]
# if there are no tags, this yields HEX[-dirty] (no NUM)
describe_out = run_command(GITS,
["describe", "--tags", "--dirty", "--always", "--long"],
cwd=root)
describe_out = run_command(GITS, ["describe", "--tags", "--dirty", "--always", "--long"], cwd=root)
# --long was added in git-1.5.5
if describe_out is None:
raise NotThisMethod("'git describe' failed")
Expand All @@ -211,7 +205,7 @@ def git_pieces_from_vcs(tag_prefix, root, verbose, run_command=run_command):

pieces = {}
pieces["long"] = full_out
pieces["short"] = full_out[:7] # maybe improved later
pieces["short"] = full_out[:7] # maybe improved later
pieces["error"] = None

# parse describe_out. It will be like TAG-NUM-gHEX[-dirty] or HEX[-dirty]
Expand Down Expand Up @@ -254,7 +248,7 @@ def git_pieces_from_vcs(tag_prefix, root, verbose, run_command=run_command):
# HEX: no tags
pieces["closest-tag"] = None
count_out = run_command(GITS, ["rev-list", "HEAD", "--count"], cwd=root)
pieces["distance"] = int(count_out) # total number of commits
pieces["distance"] = int(count_out) # total number of commits

return pieces

Expand Down Expand Up @@ -389,13 +383,10 @@ def render_git_describe_long(pieces):

def render(pieces, style):
if pieces["error"]:
return {"version": "unknown",
"full-revisionid": pieces.get("long"),
"dirty": None,
"error": pieces["error"]}
return {"version": "unknown", "full-revisionid": pieces.get("long"), "dirty": None, "error": pieces["error"]}

if not style or style == "default":
style = "pep440" # the default
style = "pep440" # the default

if style == "pep440":
rendered = render_pep440(pieces)
Expand All @@ -412,10 +403,7 @@ def render(pieces, style):
else:
raise ValueError("unknown style '%s'" % style)

return {"version": rendered,
"full-revisionid": pieces["long"],
"dirty": pieces["dirty"],
"error": None}
return {"version": rendered, "full-revisionid": pieces["long"], "dirty": pieces["dirty"], "error": None}


def get_versions():
Expand All @@ -440,10 +428,12 @@ def get_versions():
for i in cfg.versionfile_source.split('/'):
root = os.path.dirname(root)
except NameError:
return {"version": "0+unknown",
"full-revisionid": None,
"dirty": None,
"error": "unable to find root of source tree"}
return {
"version": "0+unknown",
"full-revisionid": None,
"dirty": None,
"error": "unable to find root of source tree"
}

try:
pieces = git_pieces_from_vcs(cfg.tag_prefix, root, verbose)
Expand All @@ -457,7 +447,4 @@ def get_versions():
except NotThisMethod:
pass

return {"version": "0+unknown",
"full-revisionid": None,
"dirty": None,
"error": "unable to compute version"}
return {"version": "0+unknown", "full-revisionid": None, "dirty": None, "error": "unable to compute version"}
32 changes: 14 additions & 18 deletions dask_ec2/cli/daskd.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,13 +58,15 @@ def dask(ctx, filepath, nprocs, source):
def dask_install(ctx, filepath, shell, nprocs, source):
cluster = Cluster.from_filepath(filepath)
scheduler_public_ip = cluster.instances[0].ip
upload_pillar(cluster, "dask.sls", {"dask": {
"scheduler_public_ip": scheduler_public_ip,
"source_install": source,
"dask-worker": {
"nprocs": nprocs
}
}})
upload_pillar(cluster, "dask.sls", {
"dask": {
"scheduler_public_ip": scheduler_public_ip,
"source_install": source,
"dask-worker": {
"nprocs": nprocs
}
}
})

click.echo("Installing scheduler")
cluster.pepper.local("node-0", "grains.append", ["roles", "dask.distributed.scheduler"])
Expand Down Expand Up @@ -134,9 +136,7 @@ def dask_address(ctx, filepath):


@dask.command(
"shell",
short_help=
"Open a python (ipython if available) shell connected to the dask.distributed cluster")
"shell", short_help="Open a python (ipython if available) shell connected to the dask.distributed cluster")
@click.pass_context
@click.option("--file",
"filepath",
Expand All @@ -147,13 +147,12 @@ def dask_address(ctx, filepath):
help="Filepath to the instances metadata")
def dask_shell(ctx, filepath):
try:
import distributed
import distributed # noqa
except:
click.echo("ERROR: `distributed` package not found, not starting the python shell",
err=True)
click.echo("ERROR: `distributed` package not found, not starting the python shell", err=True)
sys.exit(1)
try:
import IPython
import IPython # noqa
shell = "ipython"
except:
shell = "python"
Expand All @@ -169,10 +168,7 @@ def dask_shell(ctx, filepath):
subprocess.call(cmd)


@dask.command(
"ui",
short_help=
"Open a web browser pointing to the Dask UI")
@dask.command("ui", short_help="Open a web browser pointing to the Dask UI")
@click.pass_context
@click.option("--file",
"filepath",
Expand Down
49 changes: 26 additions & 23 deletions dask_ec2/cli/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,12 @@

from botocore.exceptions import ClientError

import dask_ec2
from ..salt import Response
from ..cluster import Cluster
from ..exceptions import DaskEc2Exception
from ..config import setup_logging
from ..exceptions import DaskEc2Exception
from ..salt import Response
from .utils import Table
import dask_ec2


def start():
Expand All @@ -28,8 +28,7 @@ def start():
click.echo("Unexpected EC2 error: %s" % e, err=True)
sys.exit(1)
except KeyboardInterrupt:
click.echo(
"Interrupted by Ctrl-C. One or more actions could be still running in the cluster")
click.echo("Interrupted by Ctrl-C. One or more actions could be still running in the cluster")
sys.exit(1)
except Exception as e:
click.echo(traceback.format_exc(), err=True)
Expand All @@ -53,7 +52,10 @@ def cli(ctx):
required=True,
type=click.Path(exists=True),
help="Path to the keypair that matches the keyname")
@click.option("--name", required=False, default="dask-ec2-cluster", help="Tag name on EC2")
@click.option("--name",
required=False,
default="dask-ec2-cluster",
help="Tag name on EC2")
@click.option("--region-name",
default="us-east-1",
show_default=True,
Expand Down Expand Up @@ -139,19 +141,22 @@ def cli(ctx):
help="Install Dask/Distributed from git master")
def up(ctx, name, keyname, keypair, region_name, vpc_id, subnet_id,
iaminstance_name, ami, username, instance_type, count,
security_group_name, security_group_id, volume_type, volume_size, filepath, _provision, anaconda_,
dask, notebook, nprocs, source):
security_group_name, security_group_id, volume_type, volume_size,
filepath, _provision, anaconda_, dask, notebook, nprocs, source):
import os
import yaml
from ..ec2 import EC2

if os.path.exists(filepath):
if not click.confirm("A file named {} already exists, proceding will overwrite this file. Continue?".format(filepath)):
msg = "A file named {} already exists, proceding will overwrite this file. Continue?".format(filepath)
if not click.confirm(msg):
click.echo("Not doing anything")
sys.exit(0)

driver = EC2(region=region_name, vpc_id=vpc_id, subnet_id=subnet_id,
default_vpc=not(vpc_id), default_subnet=not(subnet_id),
driver = EC2(region=region_name,
vpc_id=vpc_id,
subnet_id=subnet_id,
default_vpc=not (vpc_id),
default_subnet=not (subnet_id),
iaminstance_name=iaminstance_name)
click.echo("Launching nodes")
instances = driver.launch(name=name,
Expand All @@ -171,8 +176,8 @@ def up(ctx, name, keyname, keypair, region_name, vpc_id, subnet_id,
cluster.to_file(filepath)

if _provision:
ctx.invoke(provision, filepath=filepath, anaconda_=anaconda_, dask=dask,
notebook=notebook, nprocs=nprocs, source=source)
ctx.invoke(provision, filepath=filepath, anaconda_=anaconda_,
dask=dask, notebook=notebook, nprocs=nprocs, source=source)


@cli.command(short_help="Destroy cluster")
Expand All @@ -186,15 +191,16 @@ def up(ctx, name, keyname, keypair, region_name, vpc_id, subnet_id,
required=False,
help="Filepath to the instances metadata")
@click.option('--yes', '-y', is_flag=True, default=False, help='Answers yes to questions')
@click.option("--region-name", default="us-east-1", show_default=True, required=False, help="AWS region")
def destroy(ctx, filepath, yes):
import os
from ..ec2 import EC2
cluster = Cluster.from_filepath(filepath)

question = 'Are you sure you want to destroy the cluster?'
if yes or click.confirm(question):
driver = EC2(region=cluster.region, default_vpc=False, default_subnet=False)
#needed if there is no default vpc or subnet
driver = EC2(region=cluster.region_name, default_vpc=False, default_subnet=False)
# needed if there is no default vpc or subnet
ids = [i.uid for i in cluster.instances]
click.echo("Terminating instances")
driver.destroy(ids)
Expand Down Expand Up @@ -313,17 +319,14 @@ def provision(ctx, filepath, ssh_check, master, minions, upload, anaconda_, dask
upload_formulas(cluster)
click.echo("Uploading conda and cluster settings")
upload_pillar(cluster, "conda.sls", {"conda": {"pyversion": 2 if six.PY2 else 3}})
upload_pillar(cluster, "cluster.sls",
{"cluster": {
"username": cluster.instances[0].username
}
})
upload_pillar(cluster, "cluster.sls", {"cluster": {"username": cluster.instances[0].username}})
if anaconda_:
ctx.invoke(anaconda, filepath=filepath)
if dask:
from .daskd import dask_install
ctx.invoke(dask_install, filepath=filepath, nprocs=nprocs, source=source)
if notebook:
from .notebook import notebook_install
ctx.invoke(notebook_install, filepath=filepath)


Expand Down Expand Up @@ -363,5 +366,5 @@ def print_state(output):
return response


from .daskd import *
from .notebook import *
from .daskd import * # noqa
from .notebook import * # noqa
14 changes: 3 additions & 11 deletions dask_ec2/cli/notebook.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
from ..cluster import Cluster
from ..salt import upload_pillar


@cli.group('notebook', invoke_without_command=True, short_help='Provision the Jupyter notebook')
@click.pass_context
@click.option("--file",
Expand All @@ -30,21 +31,13 @@ def notebook(ctx, filepath):
show_default=True,
required=False,
help="Filepath to the instances metadata")
@click.option("--password",
default="jupyter",
show_default=True,
required=False,
help="Password for Jupyter Notebook")
@click.option("--password", default="jupyter", show_default=True, required=False, help="Password for Jupyter Notebook")
@click.pass_context
def notebook_install(ctx, filepath, password):
click.echo("Installing Jupyter notebook on the head node")
cluster = Cluster.from_filepath(filepath)

upload_pillar(cluster, "jupyter.sls",
{"jupyter": {
"password": password
}
})
upload_pillar(cluster, "jupyter.sls", {"jupyter": {"password": password}})

# only install on head node
output = cluster.salt_call("node-0", "state.sls", ["jupyter.notebook"])
Expand All @@ -59,7 +52,6 @@ def notebook_install(ctx, filepath, password):
(cluster.head.ip, password))



@notebook.command("open", short_help="Open a web browser pointing to the Notebook UI")
@click.pass_context
@click.option("--file",
Expand Down
Loading