diff --git a/.travis.yml b/.travis.yml index 0f4134a..b4d4fa9 100644 --- a/.travis.yml +++ b/.travis.yml @@ -62,6 +62,7 @@ script: # https://github.com/boto/boto/issues/3263 - export BOTO_CONFIG=/tmp/bogusvalue - travis_wait 30 py.test --cov=dask_ec2 --cov-report term-missing -s -vv --durations=0 + - travis_wait 30 py.test --flake8 dask_ec2 after_success: - pip install coveralls diff --git a/dask_ec2/__init__.py b/dask_ec2/__init__.py index 5a52496..cc91401 100644 --- a/dask_ec2/__init__.py +++ b/dask_ec2/__init__.py @@ -5,3 +5,5 @@ from ._version import get_versions __version__ = get_versions()['version'] del get_versions + +__all__ = ["Cluster", "EC2", "Instance", "__version__"] diff --git a/dask_ec2/_version.py b/dask_ec2/_version.py index 3e73593..066627f 100644 --- a/dask_ec2/_version.py +++ b/dask_ec2/_version.py @@ -50,7 +50,7 @@ class NotThisMethod(Exception): HANDLERS = {} -def register_vcs_handler(vcs, method): # decorator +def register_vcs_handler(vcs, method): # decorator def decorate(f): if vcs not in HANDLERS: @@ -104,10 +104,7 @@ def versions_from_parentdir(parentdir_prefix, root, verbose): print("guessing rootdir is '%s', but '%s' doesn't start with " "prefix '%s'" % (root, dirname, parentdir_prefix)) raise NotThisMethod("rootdir doesn't start with parentdir_prefix") - return {"version": dirname[len(parentdir_prefix):], - "full-revisionid": None, - "dirty": False, - "error": None} + return {"version": dirname[len(parentdir_prefix):], "full-revisionid": None, "dirty": False, "error": None} @register_vcs_handler("git", "get_keywords") @@ -167,17 +164,16 @@ def git_versions_from_keywords(keywords, tag_prefix, verbose): r = ref[len(tag_prefix):] if verbose: print("picking %s" % r) - return {"version": r, - "full-revisionid": keywords["full"].strip(), - "dirty": False, - "error": None} + return {"version": r, "full-revisionid": keywords["full"].strip(), "dirty": False, "error": None} # no suitable tags, so version is "0+unknown", but full hex is still there if verbose: print("no suitable tags, using unknown + full revision id") - return {"version": "0+unknown", - "full-revisionid": keywords["full"].strip(), - "dirty": False, - "error": "no suitable tags"} + return { + "version": "0+unknown", + "full-revisionid": keywords["full"].strip(), + "dirty": False, + "error": "no suitable tags" + } @register_vcs_handler("git", "pieces_from_vcs") @@ -197,9 +193,7 @@ def git_pieces_from_vcs(tag_prefix, root, verbose, run_command=run_command): GITS = ["git.cmd", "git.exe"] # if there is a tag, this yields TAG-NUM-gHEX[-dirty] # if there are no tags, this yields HEX[-dirty] (no NUM) - describe_out = run_command(GITS, - ["describe", "--tags", "--dirty", "--always", "--long"], - cwd=root) + describe_out = run_command(GITS, ["describe", "--tags", "--dirty", "--always", "--long"], cwd=root) # --long was added in git-1.5.5 if describe_out is None: raise NotThisMethod("'git describe' failed") @@ -211,7 +205,7 @@ def git_pieces_from_vcs(tag_prefix, root, verbose, run_command=run_command): pieces = {} pieces["long"] = full_out - pieces["short"] = full_out[:7] # maybe improved later + pieces["short"] = full_out[:7] # maybe improved later pieces["error"] = None # parse describe_out. It will be like TAG-NUM-gHEX[-dirty] or HEX[-dirty] @@ -254,7 +248,7 @@ def git_pieces_from_vcs(tag_prefix, root, verbose, run_command=run_command): # HEX: no tags pieces["closest-tag"] = None count_out = run_command(GITS, ["rev-list", "HEAD", "--count"], cwd=root) - pieces["distance"] = int(count_out) # total number of commits + pieces["distance"] = int(count_out) # total number of commits return pieces @@ -389,13 +383,10 @@ def render_git_describe_long(pieces): def render(pieces, style): if pieces["error"]: - return {"version": "unknown", - "full-revisionid": pieces.get("long"), - "dirty": None, - "error": pieces["error"]} + return {"version": "unknown", "full-revisionid": pieces.get("long"), "dirty": None, "error": pieces["error"]} if not style or style == "default": - style = "pep440" # the default + style = "pep440" # the default if style == "pep440": rendered = render_pep440(pieces) @@ -412,10 +403,7 @@ def render(pieces, style): else: raise ValueError("unknown style '%s'" % style) - return {"version": rendered, - "full-revisionid": pieces["long"], - "dirty": pieces["dirty"], - "error": None} + return {"version": rendered, "full-revisionid": pieces["long"], "dirty": pieces["dirty"], "error": None} def get_versions(): @@ -440,10 +428,12 @@ def get_versions(): for i in cfg.versionfile_source.split('/'): root = os.path.dirname(root) except NameError: - return {"version": "0+unknown", - "full-revisionid": None, - "dirty": None, - "error": "unable to find root of source tree"} + return { + "version": "0+unknown", + "full-revisionid": None, + "dirty": None, + "error": "unable to find root of source tree" + } try: pieces = git_pieces_from_vcs(cfg.tag_prefix, root, verbose) @@ -457,7 +447,4 @@ def get_versions(): except NotThisMethod: pass - return {"version": "0+unknown", - "full-revisionid": None, - "dirty": None, - "error": "unable to compute version"} + return {"version": "0+unknown", "full-revisionid": None, "dirty": None, "error": "unable to compute version"} diff --git a/dask_ec2/cli/daskd.py b/dask_ec2/cli/daskd.py index d121210..8589a65 100644 --- a/dask_ec2/cli/daskd.py +++ b/dask_ec2/cli/daskd.py @@ -58,13 +58,15 @@ def dask(ctx, filepath, nprocs, source): def dask_install(ctx, filepath, shell, nprocs, source): cluster = Cluster.from_filepath(filepath) scheduler_public_ip = cluster.instances[0].ip - upload_pillar(cluster, "dask.sls", {"dask": { - "scheduler_public_ip": scheduler_public_ip, - "source_install": source, - "dask-worker": { - "nprocs": nprocs - } - }}) + upload_pillar(cluster, "dask.sls", { + "dask": { + "scheduler_public_ip": scheduler_public_ip, + "source_install": source, + "dask-worker": { + "nprocs": nprocs + } + } + }) click.echo("Installing scheduler") cluster.pepper.local("node-0", "grains.append", ["roles", "dask.distributed.scheduler"]) @@ -134,9 +136,7 @@ def dask_address(ctx, filepath): @dask.command( - "shell", - short_help= - "Open a python (ipython if available) shell connected to the dask.distributed cluster") + "shell", short_help="Open a python (ipython if available) shell connected to the dask.distributed cluster") @click.pass_context @click.option("--file", "filepath", @@ -147,13 +147,12 @@ def dask_address(ctx, filepath): help="Filepath to the instances metadata") def dask_shell(ctx, filepath): try: - import distributed + import distributed # noqa except: - click.echo("ERROR: `distributed` package not found, not starting the python shell", - err=True) + click.echo("ERROR: `distributed` package not found, not starting the python shell", err=True) sys.exit(1) try: - import IPython + import IPython # noqa shell = "ipython" except: shell = "python" @@ -169,10 +168,7 @@ def dask_shell(ctx, filepath): subprocess.call(cmd) -@dask.command( - "ui", - short_help= - "Open a web browser pointing to the Dask UI") +@dask.command("ui", short_help="Open a web browser pointing to the Dask UI") @click.pass_context @click.option("--file", "filepath", diff --git a/dask_ec2/cli/main.py b/dask_ec2/cli/main.py index 6497965..f550b14 100644 --- a/dask_ec2/cli/main.py +++ b/dask_ec2/cli/main.py @@ -5,12 +5,12 @@ from botocore.exceptions import ClientError -import dask_ec2 -from ..salt import Response from ..cluster import Cluster -from ..exceptions import DaskEc2Exception from ..config import setup_logging +from ..exceptions import DaskEc2Exception +from ..salt import Response from .utils import Table +import dask_ec2 def start(): @@ -28,8 +28,7 @@ def start(): click.echo("Unexpected EC2 error: %s" % e, err=True) sys.exit(1) except KeyboardInterrupt: - click.echo( - "Interrupted by Ctrl-C. One or more actions could be still running in the cluster") + click.echo("Interrupted by Ctrl-C. One or more actions could be still running in the cluster") sys.exit(1) except Exception as e: click.echo(traceback.format_exc(), err=True) @@ -53,7 +52,10 @@ def cli(ctx): required=True, type=click.Path(exists=True), help="Path to the keypair that matches the keyname") -@click.option("--name", required=False, default="dask-ec2-cluster", help="Tag name on EC2") +@click.option("--name", + required=False, + default="dask-ec2-cluster", + help="Tag name on EC2") @click.option("--region-name", default="us-east-1", show_default=True, @@ -139,19 +141,22 @@ def cli(ctx): help="Install Dask/Distributed from git master") def up(ctx, name, keyname, keypair, region_name, vpc_id, subnet_id, iaminstance_name, ami, username, instance_type, count, - security_group_name, security_group_id, volume_type, volume_size, filepath, _provision, anaconda_, - dask, notebook, nprocs, source): + security_group_name, security_group_id, volume_type, volume_size, + filepath, _provision, anaconda_, dask, notebook, nprocs, source): import os - import yaml from ..ec2 import EC2 if os.path.exists(filepath): - if not click.confirm("A file named {} already exists, proceding will overwrite this file. Continue?".format(filepath)): + msg = "A file named {} already exists, proceding will overwrite this file. Continue?".format(filepath) + if not click.confirm(msg): click.echo("Not doing anything") sys.exit(0) - driver = EC2(region=region_name, vpc_id=vpc_id, subnet_id=subnet_id, - default_vpc=not(vpc_id), default_subnet=not(subnet_id), + driver = EC2(region=region_name, + vpc_id=vpc_id, + subnet_id=subnet_id, + default_vpc=not (vpc_id), + default_subnet=not (subnet_id), iaminstance_name=iaminstance_name) click.echo("Launching nodes") instances = driver.launch(name=name, @@ -171,8 +176,8 @@ def up(ctx, name, keyname, keypair, region_name, vpc_id, subnet_id, cluster.to_file(filepath) if _provision: - ctx.invoke(provision, filepath=filepath, anaconda_=anaconda_, dask=dask, - notebook=notebook, nprocs=nprocs, source=source) + ctx.invoke(provision, filepath=filepath, anaconda_=anaconda_, + dask=dask, notebook=notebook, nprocs=nprocs, source=source) @cli.command(short_help="Destroy cluster") @@ -186,6 +191,7 @@ def up(ctx, name, keyname, keypair, region_name, vpc_id, subnet_id, required=False, help="Filepath to the instances metadata") @click.option('--yes', '-y', is_flag=True, default=False, help='Answers yes to questions') +@click.option("--region-name", default="us-east-1", show_default=True, required=False, help="AWS region") def destroy(ctx, filepath, yes): import os from ..ec2 import EC2 @@ -193,8 +199,8 @@ def destroy(ctx, filepath, yes): question = 'Are you sure you want to destroy the cluster?' if yes or click.confirm(question): - driver = EC2(region=cluster.region, default_vpc=False, default_subnet=False) - #needed if there is no default vpc or subnet + driver = EC2(region=cluster.region_name, default_vpc=False, default_subnet=False) + # needed if there is no default vpc or subnet ids = [i.uid for i in cluster.instances] click.echo("Terminating instances") driver.destroy(ids) @@ -313,17 +319,14 @@ def provision(ctx, filepath, ssh_check, master, minions, upload, anaconda_, dask upload_formulas(cluster) click.echo("Uploading conda and cluster settings") upload_pillar(cluster, "conda.sls", {"conda": {"pyversion": 2 if six.PY2 else 3}}) - upload_pillar(cluster, "cluster.sls", - {"cluster": { - "username": cluster.instances[0].username - } - }) + upload_pillar(cluster, "cluster.sls", {"cluster": {"username": cluster.instances[0].username}}) if anaconda_: ctx.invoke(anaconda, filepath=filepath) if dask: from .daskd import dask_install ctx.invoke(dask_install, filepath=filepath, nprocs=nprocs, source=source) if notebook: + from .notebook import notebook_install ctx.invoke(notebook_install, filepath=filepath) @@ -363,5 +366,5 @@ def print_state(output): return response -from .daskd import * -from .notebook import * +from .daskd import * # noqa +from .notebook import * # noqa diff --git a/dask_ec2/cli/notebook.py b/dask_ec2/cli/notebook.py index c6eaad6..076e275 100644 --- a/dask_ec2/cli/notebook.py +++ b/dask_ec2/cli/notebook.py @@ -6,6 +6,7 @@ from ..cluster import Cluster from ..salt import upload_pillar + @cli.group('notebook', invoke_without_command=True, short_help='Provision the Jupyter notebook') @click.pass_context @click.option("--file", @@ -30,21 +31,13 @@ def notebook(ctx, filepath): show_default=True, required=False, help="Filepath to the instances metadata") -@click.option("--password", - default="jupyter", - show_default=True, - required=False, - help="Password for Jupyter Notebook") +@click.option("--password", default="jupyter", show_default=True, required=False, help="Password for Jupyter Notebook") @click.pass_context def notebook_install(ctx, filepath, password): click.echo("Installing Jupyter notebook on the head node") cluster = Cluster.from_filepath(filepath) - upload_pillar(cluster, "jupyter.sls", - {"jupyter": { - "password": password - } - }) + upload_pillar(cluster, "jupyter.sls", {"jupyter": {"password": password}}) # only install on head node output = cluster.salt_call("node-0", "state.sls", ["jupyter.notebook"]) @@ -59,7 +52,6 @@ def notebook_install(ctx, filepath, password): (cluster.head.ip, password)) - @notebook.command("open", short_help="Open a web browser pointing to the Notebook UI") @click.pass_context @click.option("--file", diff --git a/dask_ec2/cli/utils.py b/dask_ec2/cli/utils.py index 4834e5a..31df411 100644 --- a/dask_ec2/cli/utils.py +++ b/dask_ec2/cli/utils.py @@ -13,7 +13,8 @@ def __init__(self, data, tabletype=0): ok = True if not ok: self.tabletype = 0 - #self.tabletype = tabletypes.index(tabletype) + # self.tabletype = tabletypes.index(tabletype) + def formatRowBorder(self, colLengths): s = "+" # TODO: Simplify? @@ -27,8 +28,9 @@ def formatRow(self, row, columns, colLengths): colsInRow = len(row) for n in range(columns): if n <= (colsInRow - 1): - s += "| " + row[n] + (" " * ((colLengths[n] - len(row[n])) + 1)) - else: # index out of bounds + s += "| " + row[n] + s += (" " * ((colLengths[n] - len(row[n])) + 1)) + else: # index out of bounds s += "|" + (" " * (colLengths[n] + 2)) s += "|" return s @@ -45,14 +47,14 @@ def write(self): for i, r in enumerate(data): for j, c in enumerate(r): data[i][j] = str(c) - #DEBUG: print data + # DEBUG: print data # Find max length in each column maxColLengths = [0] * columns for i, r in enumerate(data): for j, c in enumerate(r): if (len(str(c)) > maxColLengths[j]): maxColLengths[j] = len(str(c)) - #DEBUG: print maxColLengths + # DEBUG: print maxColLengths # Print rable if self.tabletype == 0: border = self.formatRowBorder(maxColLengths) diff --git a/dask_ec2/cluster.py b/dask_ec2/cluster.py index 4c0e674..905b611 100644 --- a/dask_ec2/cluster.py +++ b/dask_ec2/cluster.py @@ -2,14 +2,14 @@ import logging -import six import yaml from . import libpepper -from .compatibility import URLError from .exceptions import DaskEc2Exception from .instance import Instance +from six.moves.urllib.error import URLError + logger = logging.getLogger(__name__) @@ -53,9 +53,8 @@ def get_pepper_client(self): try: self._pepper = libpepper.Pepper(url, ignore_ssl_errors=True) self._pepper.login('saltdev', 'saltdev', 'pam') - except URLError as e: - raise DaskEc2Exception( - "Could not connect to salt server. Try `dask-ec2 provision` and try again") + except URLError: + raise DaskEc2Exception("Could not connect to salt server. Try `dask-ec2 provision` and try again") return self._pepper pepper = property(get_pepper_client, None, None) @@ -64,9 +63,8 @@ def salt_call(self, target, module, args=None): args = [] or args try: return self.pepper.local(target, module, args) - except URLError as e: - raise DaskEc2Exception( - "Could not connect to salt server. Try `dask-ec2 provision` and try again") + except URLError: + raise DaskEc2Exception("Could not connect to salt server. Try `dask-ec2 provision` and try again") def append(self, instance): if isinstance(instance, Instance): diff --git a/dask_ec2/compatibility.py b/dask_ec2/compatibility.py deleted file mode 100644 index c633606..0000000 --- a/dask_ec2/compatibility.py +++ /dev/null @@ -1,7 +0,0 @@ -import six - - -if six.PY2: - from urllib2 import URLError -else: - from urllib.error import URLError diff --git a/dask_ec2/ec2.py b/dask_ec2/ec2.py index 397a285..4945d89 100644 --- a/dask_ec2/ec2.py +++ b/dask_ec2/ec2.py @@ -25,8 +25,9 @@ def __init__(self, region, vpc_id=None, subnet_id=None, default_vpc=True, self.iaminstance_name = iaminstance_name if test: - collection = self.ec2.instances.filter(Filters=[{"Name": "instance-state-name", "Values": ["running"]}]) - _ = list(collection) + filters = [{"Name": "instance-state-name", "Values": ["running"]}] + collection = self.ec2.instances.filter(Filters=filters) + list(collection) def get_default_vpc(self): """ @@ -62,6 +63,8 @@ def get_default_subnet(self, availability_zone=None): if subnet.availability_zone == availability_zone and subnet.default_for_az: logger.debug("Default subnet found - Using Subnet ID: %s", subnet.id) return subnet.id + if not self.vpc_id: + raise DaskEc2Exception("There is no VPC, please pass VPC ID or assign a default VPC") raise DaskEc2Exception("There is no default subnet on VPC %s, please pass a subnet ID" % self.vpc_id) def check_keyname(self, keyname): @@ -74,11 +77,12 @@ def check_keyname(self, keyname): logger.debug("Checking that keyname '%s' exists on EC2", keyname) try: key_pair = self.client.describe_key_pairs(KeyNames=[keyname]) - _ = [i for i in key_pair] + [i for i in key_pair] except ClientError as e: error_code = e.response["Error"]["Code"] if error_code == "InvalidKeyPair.NotFound": - raise DaskEc2Exception("The keyname '%s' does not exist, please create it in the EC2 console" % keyname) + raise DaskEc2Exception("The keyname '%s' does not exist, " + "please create it in the EC2 console" % keyname) else: raise e @@ -119,7 +123,7 @@ def check_sg(self, security_group): else: raise DaskEc2Exception("Security group '%s' not found, please create or use the default '%s'" % (security_group, DEFAULT_SG_GROUP_NAME)) - except ClientError as e: + except ClientError: raise DaskEc2Exception("Security group '%s' not found, please create or use the default '%s'" % (security_group, DEFAULT_SG_GROUP_NAME)) @@ -146,36 +150,34 @@ def create_default_sg(self): logger.debug("Setting up default values for the '%s' security group", DEFAULT_SG_GROUP_NAME) security_group = self.get_security_groups(DEFAULT_SG_GROUP_NAME)[0] - IpPermissions = [ - { - "IpProtocol": "tcp", - "FromPort": 0, - "ToPort": 65535, - "IpRanges": [ - { - "CidrIp": "0.0.0.0/0" - }, - ], - }, { - "IpProtocol": "udp", - "FromPort": 0, - "ToPort": 65535, - "IpRanges": [ - { - "CidrIp": "0.0.0.0/0" - }, - ], - }, { - "IpProtocol": "icmp", - "FromPort": -1, - "ToPort": -1, - "IpRanges": [ - { - "CidrIp": "0.0.0.0/0" - }, - ], - } - ] + IpPermissions = [{ + "IpProtocol": "tcp", + "FromPort": 0, + "ToPort": 65535, + "IpRanges": [ + { + "CidrIp": "0.0.0.0/0" + }, + ], + }, { + "IpProtocol": "udp", + "FromPort": 0, + "ToPort": 65535, + "IpRanges": [ + { + "CidrIp": "0.0.0.0/0" + }, + ], + }, { + "IpProtocol": "icmp", + "FromPort": -1, + "ToPort": -1, + "IpRanges": [ + { + "CidrIp": "0.0.0.0/0" + }, + ], + }] try: security_group.authorize_egress(IpPermissions=IpPermissions) @@ -205,8 +207,8 @@ def check_image_is_ebs(self, image_id): root_type = image["RootDeviceType"] if root_type != "ebs": - raise DaskEc2Exception("The AMI {} Root Device Type is not EBS. Only EBS Root Device AMI are supported.".format( - image_id)) + raise DaskEc2Exception("The AMI {} Root Device Type is not EBS. " + "Only EBS Root Device AMI are supported.".format(image_id)) def launch(self, name, image_id, instance_type, count, keyname, security_group_name=DEFAULT_SG_GROUP_NAME, @@ -233,7 +235,7 @@ def launch(self, name, image_id, instance_type, count, keyname, }, ] - if security_group_id is not None: + if security_group_id: security_groups_ids = [security_group_id] else: security_groups_ids = self.get_security_groups_ids(security_group_name) @@ -244,7 +246,7 @@ def launch(self, name, image_id, instance_type, count, keyname, MinCount=count, MaxCount=count, InstanceType=instance_type, - SecurityGroupIds=self.get_security_groups_ids(security_group_name), + SecurityGroupIds=security_groups_ids, BlockDeviceMappings=device_map) if self.subnet_id is not None and self.subnet_id != "": kwargs['SubnetId'] = self.subnet_id @@ -259,8 +261,8 @@ def launch(self, name, image_id, instance_type, count, keyname, try: waiter.wait(InstanceIds=ids) except WaiterError: - raise DaskEc2Exception( - "An unexpected error occurred when launching the requested instances. Refer to the AWS Management Console for more information.") + raise DaskEc2Exception("An unexpected error occurred when launching the requested instances. " + "Refer to the AWS Management Console for more information.") collection = self.ec2.instances.filter(InstanceIds=ids) instances = [] diff --git a/dask_ec2/instance.py b/dask_ec2/instance.py index 67a8395..7d06bf3 100644 --- a/dask_ec2/instance.py +++ b/dask_ec2/instance.py @@ -1,10 +1,8 @@ from __future__ import print_function, division, absolute_import -import os import socket import logging -import paramiko from paramiko.ssh_exception import BadHostKeyException, AuthenticationException, SSHException from .ssh import SSHClient @@ -28,7 +26,8 @@ def from_boto3_instance(cls, instance): self = cls(ip=instance.public_ip_address, uid=instance.id) return self - @retry(catch=(BadHostKeyException, AuthenticationException, SSHException, socket.error, TypeError, DaskEc2Exception)) + @retry(catch=(BadHostKeyException, AuthenticationException, SSHException, socket.error, TypeError, + DaskEc2Exception)) def check_ssh(self): logger.debug('Checking ssh connection for %s', self.ip) self.ssh_client.exec_command("ls") diff --git a/dask_ec2/libpepper.py b/dask_ec2/libpepper.py index d070aca..2a23811 100644 --- a/dask_ec2/libpepper.py +++ b/dask_ec2/libpepper.py @@ -4,10 +4,8 @@ (Specifically the rest_cherrypy netapi module.) ''' -import functools import json import logging -import os import ssl try: ssl._create_default_https_context = ssl._create_stdlib_context @@ -75,7 +73,7 @@ def __init__(self, api_url='https://localhost:8000', debug_http=False, ignore_ss ''' split = urlparse.urlsplit(api_url) - if not split.scheme in ['http', 'https']: + if split.scheme not in ['http', 'https']: raise PepperException("salt-api URL missing HTTP(s) protocol: {0}".format(self.api_url)) self.api_url = api_url @@ -93,8 +91,7 @@ def req(self, path, data=None): :rtype: dictionary ''' - if (hasattr(data, 'get') and - data.get('eauth') == 'kerberos') or self.auth.get('eauth') == 'kerberos': + if (hasattr(data, 'get') and data.get('eauth') == 'kerberos') or self.auth.get('eauth') == 'kerberos': return self.req_requests(path, data) headers = { @@ -128,8 +125,8 @@ def req(self, path, data=None): try: if not (self._ssl_verify): con = ssl.SSLContext(ssl.PROTOCOL_SSLv23) - #con.check_hostname = False - #con.verify_mode = ssl.CERT_NONE + # con.check_hostname = False + # con.verify_mode = ssl.CERT_NONE f = urlopen(req, context=con) else: f = urlopen(req) @@ -175,11 +172,13 @@ def req_requests(self, path, data=None): headers.setdefault('X-Auth-Token', self.auth['token']) # Optionally toggle SSL verification self._ssl_verify = self.ignore_ssl_errors - params = {'url': self._construct_url(path), - 'headers': headers, - 'verify': self._ssl_verify == True, - 'auth': auth, - 'data': json.dumps(data),} + params = { + 'url': self._construct_url(path), + 'headers': headers, + 'verify': self._ssl_verify, + 'auth': auth, + 'data': json.dumps(data) + } logger.debug('postdata {0}'.format(params)) resp = requests.post(**params) if resp.status_code == 401: @@ -206,7 +205,7 @@ def local(self, tgt, fun, arg=None, kwarg=None, expr_form='glob', timeout=None, Wraps :meth:`low`. ''' - low = {'client': 'local', 'tgt': tgt, 'fun': fun,} + low = {'client': 'local', 'tgt': tgt, 'fun': fun} if arg: low['arg'] = arg @@ -231,7 +230,7 @@ def local_async(self, tgt, fun, arg=None, kwarg=None, expr_form='glob', timeout= Wraps :meth:`low`. ''' - low = {'client': 'local_async', 'tgt': tgt, 'fun': fun,} + low = {'client': 'local_async', 'tgt': tgt, 'fun': fun} if arg: low['arg'] = arg @@ -266,7 +265,7 @@ def runner(self, fun, **kwargs): Usage:: runner('jobs.lookup_jid', jid=12345) ''' - low = {'client': 'runner', 'fun': fun,} + low = {'client': 'runner', 'fun': fun} low.update(kwargs) @@ -278,11 +277,9 @@ def login(self, username, password, eauth): authentication token or an empty dict ''' - self.auth = self.req('/login', { - 'username': username, - 'password': password, - 'eauth': eauth - }).get('return', [{}])[0] + self.auth = self.req('/login', {'username': username, + 'password': password, + 'eauth': eauth}).get('return', [{}])[0] return self.auth diff --git a/dask_ec2/salt.py b/dask_ec2/salt.py index a31b2ba..e3f9290 100644 --- a/dask_ec2/salt.py +++ b/dask_ec2/salt.py @@ -1,6 +1,7 @@ """ Utilies to manage salt bootstrap and other stuff """ +import copy import os import logging import itertools @@ -87,14 +88,14 @@ def aggregated_success(self): def group_by_id(self, ignore_fields=None, sort=True): if ignore_fields: - copy = copy_(self) - for id_ in copy: + clone = copy.deepcopy(self) + for id_ in clone: for field in ignore_fields: - del copy[id_][field] + del clone[id_][field] else: - copy = self + clone = self - items = sorted(copy.items(), key=lambda x: x[1]) + items = sorted(clone.items(), key=lambda x: x[1]) groups = [] for key, group in itertools.groupby(items, key=lambda x: x[1]): groups.append((key, [item[0] for item in group])) @@ -133,8 +134,7 @@ def __install_salt_api(): try: __install_salt_api() except RetriesExceededException as e: - raise DaskEc2Exception("%s\nCouldn't bootstrap salt-api. Error is above (maybe try again)" % - e.last_exception) + raise DaskEc2Exception("%s\nCouldn't bootstrap salt-api. Error is above (maybe try again)" % e.last_exception) @retry(retries=3, wait=0) def __setup_salt_master(): @@ -145,9 +145,8 @@ def __setup_salt_master(): try: __setup_salt_master() except RetriesExceededException as e: - raise DaskEc2Exception( - "%s\nCouldn't setup salt-master settings. Error is above (maybe try again)" % - e.last_exception) + raise DaskEc2Exception("%s\nCouldn't setup salt-master settings. Error is above (maybe try again)" % + e.last_exception) @retry(retries=3, wait=0) def __apt_installs(): @@ -172,9 +171,7 @@ def __upgrade_pip(): try: __upgrade_pip() except RetriesExceededException as e: - raise DaskEc2Exception( - "%s\nCouldn't upgrade pip. Error is above (maybe try again)" % - e.last_exception) + raise DaskEc2Exception("%s\nCouldn't upgrade pip. Error is above (maybe try again)" % e.last_exception) @retry(retries=3, wait=0) def __install_salt_rest_api(): @@ -186,8 +183,7 @@ def __install_salt_rest_api(): try: __install_salt_rest_api() except RetriesExceededException as e: - raise DaskEc2Exception("%s\nCouldn't install CherryPy. Error is above (maybe try again)" % - e.last_exception) + raise DaskEc2Exception("%s\nCouldn't install CherryPy. Error is above (maybe try again)" % e.last_exception) @retry(retries=3, wait=0) def __install_pyopensll(): @@ -199,8 +195,7 @@ def __install_pyopensll(): try: __install_pyopensll() except RetriesExceededException as e: - raise DaskEc2Exception("%s\nCouldn't install PyOpenSSL. Error is above (maybe try again)" % - e.last_exception) + raise DaskEc2Exception("%s\nCouldn't install PyOpenSSL. Error is above (maybe try again)" % e.last_exception) @retry(retries=3, wait=0) def __create_ssl_cert(): @@ -212,9 +207,8 @@ def __create_ssl_cert(): try: __create_ssl_cert() except RetriesExceededException as e: - raise DaskEc2Exception( - "%s\nCouldn't generate SSL certificate. Error is above (maybe try again)" % - e.last_exception) + raise DaskEc2Exception("%s\nCouldn't generate SSL certificate. Error is above (maybe try again)" % + e.last_exception) @retry(retries=3, wait=0) def __setup_rest_cherrypy(): @@ -237,9 +231,8 @@ def __setup_salt_external_auth(): try: __setup_salt_external_auth() except RetriesExceededException as e: - raise DaskEc2Exception( - "%s\nCouldn't setup salt external auth system. Error is above (maybe try again)" % - e.last_exception) + raise DaskEc2Exception("%s\nCouldn't setup salt external auth system. Error is above (maybe try again)" % + e.last_exception) @retry(retries=3, wait=0) def __create_saltdev_user(): @@ -264,9 +257,8 @@ def __restart_salt_master(): try: __restart_salt_master() except RetriesExceededException as e: - raise DaskEc2Exception( - "%s\nCouldn't restart salt-master service. Error is above (maybe try again)" % - e.last_exception) + raise DaskEc2Exception("%s\nCouldn't restart salt-master service. Error is above (maybe try again)" % + e.last_exception) @retry(retries=3, wait=0) def __restart_salt_api(): @@ -278,9 +270,8 @@ def __restart_salt_api(): try: __restart_salt_api() except RetriesExceededException as e: - raise DaskEc2Exception( - "%s\nCouldn't restart salt-api service. Error is above (maybe try again)" % - e.last_exception) + raise DaskEc2Exception("%s\nCouldn't restart salt-api service. Error is above (maybe try again)" % + e.last_exception) def async_cmd(results, instance, command): @@ -295,7 +286,7 @@ def __remote_cmd(): try: results[instance.ip] = __remote_cmd() - except RetriesExceededException as e: + except RetriesExceededException: results[instance.ip] = False @@ -309,7 +300,7 @@ def __remote_upload(): try: results[instance.ip] = __remote_upload() - except RetriesExceededException as e: + except RetriesExceededException: results[instance.ip] = False @@ -323,23 +314,21 @@ def install_salt_minion(cluster): for i, instance in enumerate(cluster.instances): minion_id = "node-{}".format(i) cmd = "curl -L https://bootstrap.saltstack.com | sh -s -- " - cmd += "-d -X -P -L -A {master_ip} -i {minion_id} stable".format(master_ip=master_ip, - minion_id=minion_id) + cmd += "-d -X -P -L -A {master_ip} -i {minion_id} stable".format(master_ip=master_ip, minion_id=minion_id) t = threading.Thread(target=async_cmd, args=(results, instance, cmd)) t.start() threads.append(t) for t in threads: t.join() - all_ok = all([r == False for r in results]) + all_ok = all([r is False for r in results]) if not all_ok: failed_nodes = [] for minion_ip, minion_data in results.items(): if minion_data is False: failed_nodes.append(minion_ip) if failed_nodes: - raise DaskEc2Exception("Error bootstraping salt-minion at nodes: %s (maybe try again)" % - failed_nodes) + raise DaskEc2Exception("Error bootstraping salt-minion at nodes: %s (maybe try again)" % failed_nodes) logger.debug("Configuring salt-mine on the salt minions") results, threads = {}, [] @@ -359,9 +348,8 @@ def install_salt_minion(cluster): if minion_data is False: failed_nodes.append(minion_ip) if failed_nodes: - raise DaskEc2Exception( - "Error configuring the salt-mine in the salt-minion at nodes: %s (maybe try again)" - % failed_nodes) + raise DaskEc2Exception("Error configuring the salt-mine in the salt-minion at nodes: %s (maybe try again)" % + failed_nodes) logger.debug("Restarting the salt-minion service") results, threads = {}, [] @@ -380,8 +368,7 @@ def install_salt_minion(cluster): if minion_data is False: failed_nodes.append(minion_ip) if failed_nodes: - raise DaskEc2Exception("Error restarting the salt-minion at nodes: %s (maybe try again)" % - failed_nodes) + raise DaskEc2Exception("Error restarting the salt-minion at nodes: %s (maybe try again)" % failed_nodes) def upload_formulas(cluster): diff --git a/dask_ec2/ssh.py b/dask_ec2/ssh.py index 82a6c18..b93415a 100644 --- a/dask_ec2/ssh.py +++ b/dask_ec2/ssh.py @@ -82,9 +82,11 @@ def exec_command(self, command, sudo=False, **kwargs): while not (channel.recv_ready() or channel.closed or channel.exit_status_ready()): time.sleep(.2) - ret = {'stdout': stdout.read().strip().decode('utf-8'), - 'stderr': stderr.read().strip().decode('utf-8'), - 'exit_code': channel.recv_exit_status()} + ret = { + 'stdout': stdout.read().strip().decode('utf-8'), + 'stderr': stderr.read().strip().decode('utf-8'), + 'exit_code': channel.recv_exit_status() + } return ret def get_sftp(self): @@ -109,17 +111,17 @@ def mkdir(self, path, mode=511): dirname, basename = posixpath.split(path) if self.dir_exists(dirname): logger.debug("Creating directory %s mode=%s", path, mode) - self.sftp.mkdir(basename, mode=mode) # sub-directory missing, so created it + self.sftp.mkdir(basename, mode=mode) # sub-directory missing, so created it self.sftp.chdir(basename) else: - self.mkdir(dirname) # Make parent directories + self.mkdir(dirname) # Make parent directories self.mkdir(path) def dir_exists(self, path): try: self.sftp.chdir(path) return True - except IOError as error: + except IOError: return False def put(self, local, remote, sudo=False): @@ -143,9 +145,9 @@ def put_single(self, local, remote, sudo=False): if sudo: cmd = 'cp -rf {} {}'.format(remote, real_remote) - output = self.exec_command(cmd, sudo=True) + self.exec_command(cmd, sudo=True) cmd = 'rm -rf {}'.format(remote) - output = self.exec_command(cmd, sudo=True) + self.exec_command(cmd, sudo=True) def put_dir(self, local, remote, sudo=False): logger.debug("Uploading directory %s to %s", local, remote) @@ -166,6 +168,6 @@ def put_dir(self, local, remote, sudo=False): if sudo: cmd = 'cp -rf {}/* {}'.format(remote, real_remote) - output = self.exec_command(cmd, sudo=True) + self.exec_command(cmd, sudo=True) cmd = 'rm -rf {}'.format(remote) - output = self.exec_command(cmd, sudo=True) + self.exec_command(cmd, sudo=True) diff --git a/dask_ec2/tests/conftest.py b/dask_ec2/tests/conftest.py new file mode 100644 index 0000000..1f2bd4b --- /dev/null +++ b/dask_ec2/tests/conftest.py @@ -0,0 +1,19 @@ +from __future__ import absolute_import + +import os +import pytest + + +@pytest.yield_fixture(scope="module") +def driver(): + from dask_ec2.ec2 import EC2 + driver = EC2(region="us-east-1", default_vpc=False, default_subnet=False, test=False) + + yield driver + + +@pytest.yield_fixture(scope='module') +def cluster(): + from dask_ec2 import Cluster + clusterfile = os.environ['TEST_CLUSTERFILE'] + yield Cluster.from_filepath(clusterfile) diff --git a/dask_ec2/tests/salt/test_distributed.py b/dask_ec2/tests/salt/test_distributed.py index 96b8c3c..94b0356 100644 --- a/dask_ec2/tests/salt/test_distributed.py +++ b/dask_ec2/tests/salt/test_distributed.py @@ -2,10 +2,9 @@ import pytest -requests = pytest.importorskip("distributed") - -from utils import remotetest, cluster, invoke, assert_all_true +from ..utils import remotetest, assert_all_true +requests = pytest.importorskip("distributed") # def setup_module(module): # utils.invoke('dask-distributed', 'install') @@ -13,5 +12,6 @@ @remotetest def test_dask(cluster): - response = cluster.salt_call("*", "state.sls", ["dask.distributed"])["return"][0] + output = cluster.salt_call("*", "state.sls", ["dask.distributed"]) + response = output["return"][0] assert_all_true(response) diff --git a/dask_ec2/tests/salt/test_salt.py b/dask_ec2/tests/salt/test_salt.py index 1fac8f4..f7f57f1 100644 --- a/dask_ec2/tests/salt/test_salt.py +++ b/dask_ec2/tests/salt/test_salt.py @@ -1,8 +1,6 @@ from __future__ import absolute_import, print_function, division -import pytest - -from ..utils import remotetest, cluster, invoke +from ..utils import remotetest, invoke @remotetest @@ -20,6 +18,7 @@ def test_provision_salt(cluster): print(result.output_bytes) assert result.exit_code == 0 + @remotetest def test_salt_ping(cluster): response = cluster.salt_call("*", "test.ping") diff --git a/dask_ec2/tests/test_cluster.py b/dask_ec2/tests/test_cluster.py index ac993da..b0d2172 100644 --- a/dask_ec2/tests/test_cluster.py +++ b/dask_ec2/tests/test_cluster.py @@ -6,7 +6,7 @@ from dask_ec2 import Cluster, Instance from dask_ec2.exceptions import DaskEc2Exception -from .utils import remotetest, cluster, driver +from .utils import remotetest def test_cluster(): @@ -27,7 +27,7 @@ def test_append_instance(): def test_append_non_instance_type(): cluster = Cluster("foo") - with pytest.raises(DaskEc2Exception) as excinfo: + with pytest.raises(DaskEc2Exception): cluster.append({"wrong": "type"}) @@ -64,7 +64,7 @@ def test_set_keypair(): def test_dict_serde(): cluster = Cluster("foo") username = "user" - keypair="~/.ssh/key" + keypair = "~/.ssh/key" n = 5 for i in range(n): instance = Instance(uid="%i" % i, ip="{0}.{0}.{0}.{0}".format(i), username=username, keypair=keypair) @@ -89,7 +89,7 @@ def test_from_filepath(request, tmpdir): cluster = Cluster("foo") username = "user" - keypair="~/.ssh/key" + keypair = "~/.ssh/key" n = 5 for i in range(n): instance = Instance(uid="%i" % i, ip="{0}.{0}.{0}.{0}".format(i), username=username, keypair=keypair) @@ -142,4 +142,4 @@ def test_from_boto3(driver): keypair=keypair, check_ami=False) - instance = Cluster.from_boto3_instances(region, instances) + Cluster.from_boto3_instances(region, instances) diff --git a/dask_ec2/tests/test_ec2.py b/dask_ec2/tests/test_ec2.py index 822c206..a19cff4 100644 --- a/dask_ec2/tests/test_ec2.py +++ b/dask_ec2/tests/test_ec2.py @@ -6,7 +6,6 @@ from dask_ec2.ec2 import DEFAULT_SG_GROUP_NAME from dask_ec2.exceptions import DaskEc2Exception -from .utils import driver # Some default values name = "test_launch" @@ -14,12 +13,28 @@ instance_type = "m3.2xlarge" count = 3 keyname = "mykey" -keypair = None # Skip check +keypair = None # Skip check volume_type = "gp2" volume_size = 500 security_group = "another-sg" +@mock_ec2 +def test_get_default_vpc(driver): + with pytest.raises(DaskEc2Exception) as e: + driver.get_default_vpc() + + assert "There is no default VPC, please pass VPC ID" == str(e.value) + + +@mock_ec2 +def test_get_default_subnet(driver): + with pytest.raises(DaskEc2Exception) as e: + driver.get_default_subnet() + + assert "There is no VPC, please pass VPC ID or assign a default VPC" == str(e.value) + + @mock_ec2 def test_launch_no_keyname(driver): with pytest.raises(DaskEc2Exception) as e: @@ -33,7 +48,9 @@ def test_launch_no_keyname(driver): volume_size=volume_size, keypair=keypair, check_ami=False) - assert "The keyname 'mykey' does not exist, please create it in the EC2 console" == str(e.value) + + assert ("The keyname 'mykey' does not exist, " + "please create it in the EC2 console") == str(e.value) collection = driver.ec2.instances.filter() instances = [i for i in collection] @@ -59,32 +76,36 @@ def test_launch_no_keyname(driver): @mock_ec2 def test_create_default_security_group(driver): - collection = driver.ec2.security_groups.filter() + security_groups = driver.ec2.security_groups + + collection = security_groups.filter() sgs = [i for i in collection] assert len(sgs) == 1 created_sg = driver.create_default_sg() - collection = driver.ec2.security_groups.filter() + collection = security_groups.filter() sgs = [i for i in collection] assert len(sgs) == 2 - collection = driver.ec2.security_groups.filter(GroupNames=[DEFAULT_SG_GROUP_NAME]) + collection = security_groups.filter(GroupNames=[DEFAULT_SG_GROUP_NAME]) sgs = [i for i in collection] assert len(sgs) == 1 default_sg = driver.ec2.SecurityGroup(created_sg.id) assert len(default_sg.ip_permissions) == 3 - assert default_sg.ip_permissions[0]['FromPort'] == 0 - assert default_sg.ip_permissions[0]['ToPort'] == 65535 - assert default_sg.ip_permissions[0]['IpProtocol'] == 'tcp' - assert default_sg.ip_permissions[0]['IpRanges'] == [{'CidrIp': '0.0.0.0/0'}] + ip_permission = default_sg.ip_permissions[0] + assert ip_permission['FromPort'] == 0 + assert ip_permission['ToPort'] == 65535 + assert ip_permission['IpProtocol'] == 'tcp' + assert ip_permission['IpRanges'] == [{'CidrIp': '0.0.0.0/0'}] assert len(default_sg.ip_permissions_egress) == 4 - assert default_sg.ip_permissions_egress[1]['FromPort'] == 0 - assert default_sg.ip_permissions_egress[1]['ToPort'] == 65535 - assert default_sg.ip_permissions_egress[1]['IpProtocol'] == 'tcp' - assert default_sg.ip_permissions_egress[1]['IpRanges'] == [{'CidrIp': '0.0.0.0/0'}] + ip_permission_egress = default_sg.ip_permissions_egress[1] + assert ip_permission_egress['FromPort'] == 0 + assert ip_permission_egress['ToPort'] == 65535 + assert ip_permission_egress['IpProtocol'] == 'tcp' + assert ip_permission_egress['IpRanges'] == [{'CidrIp': '0.0.0.0/0'}] @mock_ec2 @@ -98,5 +119,7 @@ def test_check_sg(driver): with pytest.raises(DaskEc2Exception) as e: driver.check_sg("ANOTHER_FAKE_SG") - assert "Security group 'ANOTHER_FAKE_SG' not found, please create or use the default 'dask-ec2-default'" == str( - e.value) + + assert ("Security group 'ANOTHER_FAKE_SG' not found, " + "please create or use the default " + "'dask-ec2-default'") == str(e.value) diff --git a/dask_ec2/tests/test_instance.py b/dask_ec2/tests/test_instance.py index 1d14bd7..c78dec2 100644 --- a/dask_ec2/tests/test_instance.py +++ b/dask_ec2/tests/test_instance.py @@ -1,12 +1,9 @@ from __future__ import absolute_import, print_function, division -import pytest - from moto import mock_ec2 from dask_ec2 import Instance -from dask_ec2.exceptions import DaskEc2Exception -from .utils import remotetest, cluster, driver +from .utils import remotetest def test_instance(): @@ -38,7 +35,7 @@ def test_dict_serde(): @remotetest def test_check_ssh(cluster): head = cluster.head - assert head.check_ssh() == True + assert head.check_ssh() @mock_ec2 @@ -51,7 +48,6 @@ def test_from_boto3(driver): keypair = None # Skip check volume_type = "gp2" volume_size = 500 - security_group = "another-sg" driver.ec2.create_key_pair(KeyName=keyname) instances = driver.launch(name=name, @@ -65,4 +61,4 @@ def test_from_boto3(driver): keypair=keypair, check_ami=False) - instance = Instance.from_boto3_instance(instances[0]) + Instance.from_boto3_instance(instances[0]) diff --git a/dask_ec2/tests/test_ssh.py b/dask_ec2/tests/test_ssh.py index 1b6af49..e6f5f4e 100644 --- a/dask_ec2/tests/test_ssh.py +++ b/dask_ec2/tests/test_ssh.py @@ -4,7 +4,7 @@ from dask_ec2.ssh import SSHClient from dask_ec2.exceptions import DaskEc2Exception -from .utils import remotetest, cluster +from .utils import remotetest @remotetest @@ -12,33 +12,36 @@ def test_ssh_ok_pkey_obj(cluster): import os import paramiko instance = cluster.head - pkey = paramiko.RSAKey.from_private_key_file(os.path.expanduser(instance.keypair)) - client = SSHClient(host=instance.ip, username=instance.username, port=instance.port, password=None, pkey=pkey) + keypath = os.path.expanduser(instance.keypair) + pkey = paramiko.RSAKey.from_private_key_file(keypath) + SSHClient(host=instance.ip, username=instance.username, port=instance.port, password=None, pkey=pkey) @remotetest def test_wrong_pkey_type(cluster): instance = cluster.head pkey = {"wrong": "obj"} - with pytest.raises(DaskEc2Exception) as excinfo: - client = SSHClient(host=instance.ip, username=instance.username, port=instance.port, password=None, pkey=pkey) + with pytest.raises(DaskEc2Exception): + SSHClient(host=instance.ip, username=instance.username, port=instance.port, password=None, pkey=pkey) @remotetest def test_ssh_ok_password(cluster): + # NOTE: password is a little bit hardcoded to docker setup instance = cluster.head - password = "root" # NOTE: this is a little bit hardcoded to docker setup + password = "root" client = SSHClient(host=instance.ip, username=instance.username, port=instance.port, password=password, pkey=None) - reponse = client.exec_command("ls") + client.exec_command("ls") client.close() @remotetest def test_ssh_fail_password(cluster): + # NOTE: password is a little bit hardcoded to docker setup instance = cluster.head - password = "root_not" # NOTE: this is a little bit hardcoded to docker setup + password = "root_not" with pytest.raises(DaskEc2Exception) as excinfo: - client = SSHClient(host=instance.ip, username=instance.username, port=instance.port, password=password, pkey=None) + SSHClient(host=instance.ip, username=instance.username, port=instance.port, password=password, pkey=None) assert "Authentication Error" in str(excinfo.value) @@ -56,7 +59,7 @@ def test_ssh_fail_user(cluster): def test_ssh_fail_host(cluster): client = cluster.head.ssh_client client.host = "1.1.1.1" - client.timeout = 3 # so test runs faster + client.timeout = 3 # so test runs faster with pytest.raises(DaskEc2Exception) as excinfo: client.connect() assert "Error connecting to host" in str(excinfo.value) @@ -97,6 +100,7 @@ def fin(): response = client.exec_command("rm -rf /{}".format(testname), sudo=True) assert response["exit_code"] == 0 client.close() + request.addfinalizer(fin) @@ -110,11 +114,11 @@ def test_mkdir(cluster, request): dir2 = posixpath.join(dir1, "dir2") dir3 = posixpath.join(dir2, "dir3") client.mkdir(dir3) - assert client.dir_exists(dir1) == True - assert client.dir_exists(dir2) == True - assert client.dir_exists(dir3) == True + assert client.dir_exists(dir1) + assert client.dir_exists(dir2) + assert client.dir_exists(dir3) - assert client.dir_exists("test -d /FAKEFAKE") == False + assert not client.dir_exists("test -d /FAKEFAKE") assert client.exec_command("test -d /FAKEFAKE")["exit_code"] == 1 assert client.exec_command("test -d {}".format(dir1))["exit_code"] == 0 assert client.exec_command("test -d {}".format(dir2))["exit_code"] == 0 @@ -129,6 +133,7 @@ def fin(): response = client.exec_command("rm -rf /tmp/{}".format(testname), sudo=True) assert response["exit_code"] == 0 client.close() + request.addfinalizer(fin) @@ -169,16 +174,26 @@ def test_put_dir(cluster, tmpdir, request): local = d1.strpath remote = "/tmp/{}".format(testname) client.put(local, remote, sudo=True) - assert client.dir_exists(posixpath.join(remote)) == True - assert client.dir_exists(posixpath.join(remote, "subdir")) == True - assert client.dir_exists(posixpath.join(remote, "subdir", "subsubdir")) == True - assert client.exec_command("test -e {}".format(posixpath.join(remote)))["exit_code"] == 0 - assert client.exec_command("cat {}".format(posixpath.join(remote, "upload1.txt")))["stdout"] == "content1" - assert client.exec_command("cat {}".format(posixpath.join(remote, "subdir", "upload2.txt")))["stdout"] == "content2" - assert client.exec_command("cat {}".format(posixpath.join(remote, "subdir", "subsubdir", "upload3.txt")))["stdout"] == "content3" + + assert client.dir_exists(posixpath.join(remote)) + assert client.dir_exists(posixpath.join(remote, "subdir")) + assert client.dir_exists(posixpath.join(remote, "subdir", "subsubdir")) + + path = posixpath.join(remote) + assert client.exec_command("test -e {}".format(path))["exit_code"] == 0 + + path = posixpath.join(remote, "upload1.txt") + assert client.exec_command("cat {}".format(path))["stdout"] == "content1" + + path = posixpath.join(remote, "subdir", "upload2.txt") + assert client.exec_command("cat {}".format(path))["stdout"] == "content2" + + path = posixpath.join(remote, "subdir", "subsubdir", "upload3.txt") + assert client.exec_command("cat {}".format(path))["stdout"] == "content3" def fin(): response = client.exec_command("rm -rf /tmp/{}".format(testname), sudo=True) assert response["exit_code"] == 0 client.close() + request.addfinalizer(fin) diff --git a/dask_ec2/tests/test_utils_retry.py b/dask_ec2/tests/test_utils_retry.py index 41d684b..e696dbd 100644 --- a/dask_ec2/tests/test_utils_retry.py +++ b/dask_ec2/tests/test_utils_retry.py @@ -41,7 +41,9 @@ def catch_NotImplementedException_raises_Exception(): def test_ok(): assert ok() == 35 - assert ok_with_args('pew', 123, kw='args') == (('pew', 123), {'kw': 'args'}) + + expected = (('pew', 123), {'kw': 'args'}) + assert ok_with_args('pew', 123, kw='args') == expected def test_fails_after_retries(): diff --git a/dask_ec2/tests/utils.py b/dask_ec2/tests/utils.py index bb7a99b..e31e70f 100644 --- a/dask_ec2/tests/utils.py +++ b/dask_ec2/tests/utils.py @@ -1,25 +1,14 @@ from __future__ import absolute_import, print_function, division import os -import sys import pytest -import boto3 from click.testing import CliRunner from dask_ec2.cli.main import cli - -@pytest.yield_fixture(scope="module") -def driver(): - from dask_ec2.ec2 import EC2 - driver = EC2(region="us-east-1", default_vpc=False, default_subnet=False, test=False) - - yield driver - - -remotetest = pytest.mark.skipif('TEST_CLUSTERFILE' not in os.environ, - reason="Environment variable 'TEST_CLUSTERFILE' is required") +remotetest = pytest.mark.skipif( + 'TEST_CLUSTERFILE' not in os.environ, reason="Environment variable 'TEST_CLUSTERFILE' is required") def invoke(*args): @@ -27,14 +16,7 @@ def invoke(*args): args = list(args) args.extend(['--file', clusterfile]) runner = CliRunner() - return runner.invoke(cli, args, catch_exceptions=False, input=sys.stdin) - - -@pytest.yield_fixture(scope='module') -def cluster(): - from dask_ec2 import Cluster - clusterfile = os.environ['TEST_CLUSTERFILE'] - yield Cluster.from_filepath(clusterfile) + return runner.invoke(cli, args, catch_exceptions=False, input=os.devnull) def assert_all_true(salt_output, none_is_ok=False): diff --git a/dask_ec2/utils.py b/dask_ec2/utils.py index 4e1249e..4b23a0f 100644 --- a/dask_ec2/utils.py +++ b/dask_ec2/utils.py @@ -11,19 +11,18 @@ def retry(retries=10, wait=5, catch=None): Decorator to retry on exceptions raised """ catch = catch or (Exception,) - last_exception = None def real_retry(function): def wrapper(*args, **kwargs): + last_exception = None for attempt in range(1, retries + 1): try: ret = function(*args, **kwargs) return ret except catch as e: last_exception = e - logger.debug("Attempt %i/%i of function '%s' failed", attempt, retries, - function.__name__) + logger.debug("Attempt %i/%i of function '%s' failed", attempt, retries, function.__name__) time.sleep(wait) except Exception as e: raise e diff --git a/environment.yml b/environment.yml index 4a80d25..1a70f5d 100644 --- a/environment.yml +++ b/environment.yml @@ -12,4 +12,5 @@ dependencies: - pylint - pip: - moto==0.4.30 + - pytest-flake8 - twine diff --git a/requirements.txt b/requirements.txt index eac329b..ea74fb1 100644 --- a/requirements.txt +++ b/requirements.txt @@ -8,6 +8,7 @@ pyyaml pytest coverage pytest-cov +pytest-flake8 moto # Utils diff --git a/setup.cfg b/setup.cfg index 18f9b72..3b7797e 100644 --- a/setup.cfg +++ b/setup.cfg @@ -9,3 +9,6 @@ versionfile_source = dask_ec2/_version.py versionfile_build = dask_ec2/_version.py tag_prefix = parentdir_prefix = dask_ec2- + +[tool:pytest] +flake8-max-line-length = 120 diff --git a/versioneer.py b/versioneer.py index b2e4d21..9e590fc 100644 --- a/versioneer.py +++ b/versioneer.py @@ -418,6 +418,7 @@ def get(parser, name): class NotThisMethod(Exception): pass + # these dictionaries contain VCS-specific tools LONG_VERSION_PY = {} HANDLERS = {} @@ -1501,7 +1502,7 @@ def run(self): "STYLE": cfg.style, "TAG_PREFIX": cfg.tag_prefix, "PARENTDIR_PREFIX": cfg.parentdir_prefix, - "VERSIONFILE_SOURCE": cfg.versionfile_source,}) + "VERSIONFILE_SOURCE": cfg.versionfile_source}) cmds["build_exe"] = cmd_build_exe del cmds["build_py"] @@ -1601,7 +1602,7 @@ def do_setup(): "STYLE": cfg.style, "TAG_PREFIX": cfg.tag_prefix, "PARENTDIR_PREFIX": cfg.parentdir_prefix, - "VERSIONFILE_SOURCE": cfg.versionfile_source,}) + "VERSIONFILE_SOURCE": cfg.versionfile_source}) ipy = os.path.join(os.path.dirname(cfg.versionfile_source), "__init__.py") if os.path.exists(ipy):