In [33]:
import boto3
import base64
import paramiko
import time
import uuid
import datetime
import tempfile
import os

In [34]:
#%%writefile meters_jn.py
import boto3
import datetime

class SpotMeter(object):
    """
    APPROXIMATE cost meter for a spot instance.

    Prices are read from the EC2 spot price history for Linux/UNIX
    instances of one instance type in one availability zone. The meter
    starts running when it is created.
    """
    def __init__(self, instance_type, availability_zone, count=1):
        """
        Initialise the meter and record the start time.

        Args:
            instance_type (str): The EC2 instance type
            availability_zone (str): The EC2 availability zone
            count (int): The number of instances - multiplier for all costs
        """
        self.instance_type = instance_type
        self.availability_zone = availability_zone
        self.count = count
        # Query the region the zone belongs to (zone name minus its letter)
        # rather than whatever the default boto3 region happens to be.
        self.ec2_resource = boto3.resource('ec2',
                                           region_name=availability_zone[:-1])
        # The timezone of EC2 timestamps is not known yet. botocore treats a
        # naive datetime as UTC, so a naive UTC "now" is the correct value.
        history = self._price_history(StartTime=datetime.datetime.utcnow())

        # Adopt the timezone of the EC2 timestamps so that later datetime
        # arithmetic compares aware datetimes with aware datetimes.
        self.tz = history[0]['Timestamp'].tzinfo
        self.start_time = datetime.datetime.now(self.tz)

    def _price_history(self, **time_range):
        """
        Fetch spot price history records for this meter.

        Args:
            **time_range: Optional StartTime / EndTime keyword arguments,
                passed through to describe_spot_price_history.

        Returns:
            list: The 'SpotPriceHistory' records from the response.
        """
        dph = self.ec2_resource.meta.client.describe_spot_price_history
        data = dph(InstanceTypes=[self.instance_type],
                   Filters=[{'Name': 'product-description',
                             'Values': ['Linux/UNIX']},
                            {'Name': 'availability-zone',
                             'Values': [self.availability_zone]}
                           ],
                   **time_range)
        return data['SpotPriceHistory']

    def current_price(self):
        """
        Get the current spot price.
        The value is for *count* instances of type *instance_type* in
        availability zone *availability_zone*.

        Returns:
            price (float): The current spot price in US dollars per hour.
        """
        history = self._price_history(StartTime=datetime.datetime.now(self.tz))
        return float(history[0]['SpotPrice']) * self.count

    def total_cost(self):
        """
        Total cost since the meter was started

        Each price record is charged from its timestamp (clamped to the
        meter start time) up to the timestamp of the following record.

        Returns:
            cost (float): The total cost in US dollars.
        """
        end_time = datetime.datetime.now(self.tz)
        history = self._price_history(StartTime=self.start_time,
                                      EndTime=end_time)
        costsum = 0.0
        period_end = end_time
        # NOTE(review): assumes records arrive newest first - confirm
        # against the EC2 API documentation.
        for record in history:
            period_start = max(record['Timestamp'], self.start_time)
            # total_seconds() keeps whole days; .seconds would drop them.
            hours = (period_end - period_start).total_seconds() / 3600.0
            costsum += hours * float(record['SpotPrice'])
            period_end = period_start

        return costsum * self.count

    def total_time(self):
        """
        Total time (in hours) the meter has been running.

        Returns:
            time (float): total time in hours.
        """
        period = datetime.datetime.now(self.tz) - self.start_time
        return period.total_seconds() / 3600.0

In [35]:
#%%writefile instances_jn.py
import boto3
import paramiko
import time
import uuid
import datetime

class ConnectedInstance(object):
    """An EC2 instance with an SSH connection for commands and file transfer."""
    def __init__(self, instance, username, key_filename):
        """
        Create a ConnectedInstance.

        The SSH connection is only opened when the instance is "usable" at
        construction time. Otherwise the object is returned unconnected and
        its status stays "unknown".

        Args:
            instance (boto3 Instance): A boto3 Instance
            username (str): The username required to connect to the instance
            key_filename (str): Name of the .pem file

        Attributes:
            status (str): Information about whether the instance can, or is,
                doing anything. Can take values "unknown", "ready", "busy", and
                "unavailable".
            state (str): Information about the general health of the instance.
                Can take standard EC2 values ("running", "terminated", etc.) but
                also "usable" when the instance is running and has also passed
                accessibility checks.
            output (str): The output received from the instance so far, since
                the last command was sent to it.
            exit_status (int): The exit status of the last command sent to the
                instance, -1 if the command could not be sent, or None while
                no result is available.

        """
        self.instance = instance
        # The region is the availability zone name minus its trailing letter.
        region = instance.placement['AvailabilityZone'][:-1]
        self.resource = boto3.resource('ec2', region_name=region)
        self.status = 'unknown'
        self.state = 'unknown'
        self.output = None
        self.exit_status = None
        self.update()
        if self.state != 'usable':
            return

        self.sshclient = paramiko.SSHClient()
        self.sshclient.set_missing_host_key_policy(paramiko.AutoAddPolicy())
        self.sshclient.connect(instance.public_ip_address, username=username,
                               key_filename=key_filename, timeout=10)
        self.status = 'ready'

    def update(self):
        """
        Update status info.

        Updates the *state*, *status*, *output* and *exit_status* attributes
        of the ConnectedInstance.
        """

        self.instance.reload()
        self.state = self.instance.state['Name']
        if self.state == 'running':
            dis = self.resource.meta.client.describe_instance_status
            statuses = dis(InstanceIds=[self.instance.id])['InstanceStatuses']
            # An empty list means no status report is available yet; the
            # instance then simply stays "running" rather than "usable".
            if statuses:
                checks = statuses[0]
                if (checks['SystemStatus']['Status'] == 'ok' and
                        checks['InstanceStatus']['Status'] == 'ok'):
                    self.state = 'usable'

        if self.status == 'busy':
            # Drain whatever output has arrived, then look for completion.
            while self.channel.recv_ready():
                self.output += self.channel.recv(1024)
            if self.channel.exit_status_ready():
                self.exit_status = self.channel.recv_exit_status()
                if self.state == 'usable':
                    self.status = 'ready'
                else:
                    self.status = 'unavailable'

    def wait(self, timeout=None):
        """
        Wait until not busy.

        Args:
            timeout (float, optional): The maximum time to wait, in seconds. If
                not supplied, wait will wait as long as required.
        """
        start_time = time.time()
        max_wait_exceeded = False
        time.sleep(1)
        self.update()
        while self.status == 'busy' and not max_wait_exceeded:
            self.update()
            if timeout is not None:
                # time.time() values are floats, so their difference is
                # already the elapsed time in seconds.
                max_wait_exceeded = (time.time() - start_time) > timeout
            if not max_wait_exceeded:
                time.sleep(5)

    def exec_command(self, script, block=True):
        """
        Send a command to the instance.

        If the instance is not "ready" the command is not sent; *output* is
        set to an error message and *exit_status* to -1 instead.

        Args:
            script (str): The unix command to execute on the instance.
            block (bool, optional): Whether to wait for the command to complete
                or return immediately.
        """

        self.update()
        if self.status == 'unavailable':
            self.output = 'Error - this instance is unavailable'
            self.exit_status = -1
            return

        if self.status != 'ready':
            self.output = 'Error - this instance is not ready'
            self.exit_status = -1
            return

        transport = self.sshclient.get_transport()
        self.channel = transport.open_session()
        # Interleave stderr with stdout so *output* captures both.
        self.channel.set_combine_stderr(True)
        self.channel.exec_command(script)
        self.status = 'busy'
        self.exit_status = None
        self.output = ''
        if block:
            self.wait()

    def upload(self, localfile, remotefile):
        """
        Upload a file to the instance.

        Args:
            localfile (str): name of the file on the local filesystem.
            remotefile (str): name of the file on the instance's filesystem.
                Relative paths are interpreted relative to the $HOME directory
                of the instance's *username*.

        """
        transport = self.sshclient.get_transport()
        sftp = paramiko.SFTPClient.from_transport(transport)
        try:
            sftp.put(localfile, remotefile)
        finally:
            # Close the SFTP session even if the transfer fails.
            sftp.close()

    def download(self, remotefile, localfile):
        """
        Download a file from the instance.

        Args:
            remotefile (str): name of the file on the instance's filesystem.
                Relative paths are interpreted relative to the $HOME directory
                of the instance's *username*.
            localfile (str): name of the file on the local filesystem.
        """
        transport = self.sshclient.get_transport()
        sftp = paramiko.SFTPClient.from_transport(transport)
        try:
            sftp.get(remotefile, localfile)
        finally:
            # Close the SFTP session even if the transfer fails.
            sftp.close()

def create_connected_instance(image_id, instance_type, region=None, name=None,
                    user_data=None, security_groups=None, username=None,
                    shared_file_system=None, mount_point=None):
    """
    Creates a single connected instance - not in the spot pool

    A key pair named after the instance is created if EC2 does not already
    have one, and its private key is written to a .pem file. If a shared
    file system is requested, commands that mount it are put in front of
    *user_data*.

    Args:
        image_id (str): The AMI to use.
        instance_type (str): The EC2 instance type.
        region (str, optional): The EC2 region. If not supplied, the default
            region of the boto3 session is used.
        name (str, optional): Name used for the key pair and .pem file. If
            not supplied, a short random name is generated.
        user_data (str, optional): Commands to be executed at start-up.
        security_groups (list, optional): Names of the security groups for
            the instance.
        username (str, optional): The username to connect to the instance.
            If not supplied it is read from the "username" tag of the AMI.
            If supplied and the AMI has no such tag, the tag is created.
        shared_file_system (str, optional): Creation token of an EFS file
            system to mount; the file system is created if it does not exist.
            Creating it reads the security group names from the module-level
            variable ``efs_security_groups``.
        mount_point (str, optional): Mount directory for the shared file
            system. Required when *shared_file_system* is given.

    Returns:
        ConnectedInstance

    Raises:
        ValueError: if no region or username can be determined, or if a
            shared file system is requested without a mount point.
    """
    if region is None:
        region = boto3.session.Session().region_name
    if region is None:
        raise ValueError('Error - no region identified')
    if shared_file_system is not None and mount_point is None:
        raise ValueError('Error - a mount point is required for the shared file system')
    ec2_resource = boto3.resource('ec2', region_name=region)
    ec2_client = ec2_resource.meta.client
    if name is None:
        name = str(uuid.uuid4())[:8]
    key_name = name
    pem_file = '/Users/pazcal/.xbow/{}.pem'.format(key_name)
    response = ec2_client.describe_key_pairs(Filters=[{'Name': 'key-name', 'Values': [key_name]}])
    if len(response['KeyPairs']) == 0:
        response = ec2_client.create_key_pair(KeyName=key_name)
        with open(pem_file, 'w') as f:
            f.write(response['KeyMaterial'])
        # Restrict the private key to its owner.
        os.chmod(pem_file, 0o600)

    image = ec2_resource.Image(image_id)
    tagdict = dict((tag['Key'], tag['Value']) for tag in (image.tags or []))
    if username is None:
        username = tagdict.get('username')
        if username is None:
            raise ValueError('Error - a username is required ')
    elif 'username' not in tagdict:
        # Record the username on the image for later lookups. An explicitly
        # supplied username is always the one used for this connection.
        image.create_tags(Tags=[{'Key': 'username', 'Value': username}])

    mount_command = None
    if shared_file_system is not None:
        efs_client = boto3.client('efs', region_name=region)
        dfs = efs_client.describe_file_systems
        response = dfs(CreationToken=shared_file_system)['FileSystems']
        if len(response) > 0:
            FileSystemId = response[0]['FileSystemId']
        else:
            cfs = efs_client.create_file_system
            response = cfs(CreationToken=shared_file_system)
            FileSystemId = response['FileSystemId']

            subnets = ec2_resource.subnets.all()
            sgf = ec2_resource.security_groups.filter
            # Use a separate name here: the EFS groups must not replace the
            # instance's own *security_groups* argument.
            efs_groups = sgf(GroupNames=efs_security_groups)

            efs_security_groupid = [security_group.group_id
                                    for security_group in efs_groups]
            for subnet in subnets:
                cmt = efs_client.create_mount_target
                cmt(FileSystemId=FileSystemId,
                    SubnetId=subnet.id,
                    SecurityGroups=efs_security_groupid
                   )
        mount_command = '#!/bin/bash\n mkdir {}\n'.format(mount_point)
        dnsname = '{}.efs.{}.amazonaws.com'.format(FileSystemId, region)
        mount_command += 'mount -t nfs -o nfsvers=4.1,rsize=1048576,'
        mount_command += 'wsize=1048576,hard,timeo=600,retrans=2 '
        mount_command += '{}:/ {}\n'.format(dnsname, mount_point)
        mount_command += ' chmod go+rw {}\n'.format(mount_point)

    if mount_command is not None:
        # The mount commands carry the shebang line, so they go first.
        user_data = mount_command + (user_data or '')

    launch_args = {'ImageId': image_id,
                   'InstanceType': instance_type,
                   'KeyName': key_name,
                   'ClientToken': str(uuid.uuid4()),
                   'MaxCount': 1,
                   'MinCount': 1}
    # boto3 rejects None for these parameters, so only pass what was given.
    if user_data is not None:
        launch_args['UserData'] = user_data
    if security_groups is not None:
        launch_args['SecurityGroups'] = security_groups

    instance = ec2_resource.create_instances(**launch_args)[0]
    instance.wait_until_running()
    # ConnectedInstance only opens its SSH connection once the status checks
    # pass, so wait for them; otherwise the object comes back unconnected.
    status_waiter = ec2_client.get_waiter('instance_status_ok')
    status_waiter.wait(InstanceIds=[instance.id])
    ci = ConnectedInstance(instance, username, pem_file)
    return ci

In [36]:
#%%writefile pools_jn.py
def create_spot_pool(count=1, price=1.0, image_id=None, region=None,
                     instance_type=None, name=None, user_data=None,
                     security_groups=None, username=None,
                     shared_file_system=None, mount_point=None):
    """
    Creates an instance of a SpotInstancePool.

    Args:
        count (int, optional): Number of ConnectedInstances in the pool.
        price (float or str, optional): The target spot price, in dollars.
        image_id (str): The AMI to use.
        region (str, optional): The EC2 region to create instances in. If not
            specified the value in the boto3 configuration file is used.
        instance_type (str): The EC2 instance type.
        name (str, optional): The name to give the SpotInstancePool. If
            supplied, it must not match any known pool in the same region.
        security_groups (list): List of security groups for the instance.
        username (str, optional): The username to connect to the instance. If
            not supplied, an attempt will be made to find it from the tags
            associated with the AMI. If supplied and the AMI has no
            "username" tag, the tag is created, because SpotInstancePool
            reads the username from that tag.
        user_data (str, optional): Commands to be executed at start-up.
        shared_file_system (str, optional): Name of an efs file system to
            attach to each instance. Creating a new file system reads the
            security group names from the module-level variable
            ``efs_security_groups``.
        mount_point (str, optional): Mount directory for the shared file system.

    Returns:
        SpotInstancePool

    Raises:
        ValueError: if no region or username can be determined, if image_id
            or instance_type is missing, if a shared file system is requested
            without a mount point, or if a pool called *name* already exists.
    """
    if region is None:
        region = boto3.session.Session().region_name
    if region is None:
        raise ValueError('Error - no region identified')
    if image_id is None or instance_type is None:
        raise ValueError('Error - image_id and instance_type are required')
    if shared_file_system is not None and mount_point is None:
        raise ValueError('Error - a mount point is required for the shared file system')
    ec2_resource = boto3.resource('ec2', region_name=region)
    ec2_client = ec2_resource.meta.client
    if name is not None:
        response = ec2_client.describe_spot_instance_requests(Filters=[
            {'Name': 'launch-group', 'Values': [name]},
            {'Name': 'state', 'Values': ['open', 'active']}])
        if len(response['SpotInstanceRequests']) > 0:
            raise ValueError('Error - spot pool {} already exists'.format(name))
        launch_group = name
    else:
        launch_group = str(uuid.uuid4())[:8]
    key_name = launch_group

    # Resolve the username before anything is created, so that a failure
    # here leaves no key pair or spot request behind.
    image = ec2_resource.Image(image_id)
    tagdict = dict((tag['Key'], tag['Value']) for tag in (image.tags or []))
    if username is None:
        username = tagdict.get('username')
        if username is None:
            raise ValueError('Error - a username is required ')
    elif 'username' not in tagdict:
        image.create_tags(Tags=[{'Key': 'username', 'Value': username}])

    pem_file = '/Users/pazcal/.xbow/{}.pem'.format(launch_group)
    response = ec2_client.describe_key_pairs(Filters=[{'Name': 'key-name', 'Values': [key_name]}])
    if len(response['KeyPairs']) == 0:
        response = ec2_client.create_key_pair(KeyName=key_name)
        with open(pem_file, 'w') as f:
            f.write(response['KeyMaterial'])
        # Restrict the private key to its owner.
        os.chmod(pem_file, 0o600)

    mount_command = None
    if shared_file_system is not None:
        efs_client = boto3.client('efs', region_name=region)
        dfs = efs_client.describe_file_systems
        response = dfs(CreationToken=shared_file_system)['FileSystems']
        if len(response) > 0:
            FileSystemId = response[0]['FileSystemId']
        else:
            cfs = efs_client.create_file_system
            response = cfs(CreationToken=shared_file_system)
            FileSystemId = response['FileSystemId']

            subnets = ec2_resource.subnets.all()
            sgf = ec2_resource.security_groups.filter
            # Use a separate name here: the EFS groups must not replace the
            # instances' own *security_groups* argument.
            efs_groups = sgf(GroupNames=efs_security_groups)

            efs_security_groupid = [security_group.group_id
                                    for security_group in efs_groups]
            for subnet in subnets:
                cmt = efs_client.create_mount_target
                cmt(FileSystemId=FileSystemId,
                    SubnetId=subnet.id,
                    SecurityGroups=efs_security_groupid
                   )
        mount_command = '#!/bin/bash\n mkdir {}\n'.format(mount_point)
        dnsname = '{}.efs.{}.amazonaws.com'.format(FileSystemId, region)
        mount_command += 'mount -t nfs -o nfsvers=4.1,rsize=1048576,'
        mount_command += 'wsize=1048576,hard,timeo=600,retrans=2 '
        mount_command += '{}:/ {}\n'.format(dnsname, mount_point)
        mount_command += ' chmod go+rw {}\n'.format(mount_point)

    if mount_command is not None:
        # The mount commands carry the shebang line, so they go first.
        user_data = mount_command + (user_data or '')

    print(user_data)

    launch_specification = {'ImageId': image_id,
                            'InstanceType': instance_type,
                            'KeyName': key_name}
    # boto3 rejects None for these entries, so only add what was given.
    if security_groups is not None:
        launch_specification['SecurityGroups'] = security_groups
    if user_data is not None:
        # b64encode needs bytes and the API needs text; convert both ways so
        # this works whether user_data is a byte string or a unicode string.
        raw_user_data = user_data
        if not isinstance(raw_user_data, bytes):
            raw_user_data = raw_user_data.encode('utf-8')
        encoded = base64.b64encode(raw_user_data).decode('ascii')
        launch_specification['UserData'] = encoded

    rsi = ec2_client.request_spot_instances
    rsi(ClientToken=str(uuid.uuid4()),
        InstanceCount=count,
        SpotPrice=str(price),  # the API expects the price as a string
        Type='persistent',
        LaunchGroup=launch_group,
        LaunchSpecification=launch_specification)

    # Poll until the new spot requests are visible to describe calls, since
    # SpotInstancePool looks the pool up by its launch group.
    n_up = 0
    while n_up == 0:
        time.sleep(5)
        response = ec2_client.describe_spot_instance_requests(Filters=[
            {'Name': 'launch-group', 'Values': [launch_group]},
            {'Name': 'state', 'Values': ['open', 'active']}])
        n_up = len(response['SpotInstanceRequests'])
    sip = SpotInstancePool(launch_group, region)
    return sip

class SpotInstancePool(object):
    """A pool of persistent connected spot instances"""

    def __init__(self, name, region=None):
        """
        Load an instance of a SpotInstancePool.

        Args:
            name (str): name of the SpotInstancePool, as given when it was
                created.
            region (str, optional): Name of the EC2 region. If not supplied,
                the default value from the boto3 configuration file is used.

        Attributes:
            name (str): Name of the pool.
            status (str): Status of the pool. Takes the value of the most
                significant status of any instance in the pool, in the order:
                "busy" > "unavailable" > "ready".
            meter (SpotMeter): a SpotMeter for the pool, initialised
                at the time of loading (not of pool creation!).
            outputs (list): Outputs from the last command run on each instance
                in the pool.
            exit_statuses (list): Exit status of the last command run on each
                instance in the pool.

        Raises:
            ValueError: if no region can be determined, if no open or active
                spot request belongs to the pool, or if the pool's image has
                no "username" tag.
        """

        if region is None:
            region = boto3.session.Session().region_name
        if region is None:
            raise ValueError('Error - no region identified')
        self.ec2_resource = boto3.resource('ec2', region_name=region)
        self.name = name
        response = self.ec2_resource.meta.client.describe_spot_instance_requests(Filters=[
            {'Name': 'launch-group', 'Values': [name]},
            {'Name': 'state', 'Values': ['open', 'active']}])
        requests = response['SpotInstanceRequests']
        self.spot_instance_request_ids = [s['SpotInstanceRequestId'] for s in requests]
        if len(self.spot_instance_request_ids) == 0:
            raise ValueError('Error - spot pool {} does not exist'.format(name))
        self.instance_count = len(self.spot_instance_request_ids)

        # All requests in a pool share one launch specification, so the
        # first request is used to describe the whole pool.
        launch_specification = requests[0]['LaunchSpecification']
        az = launch_specification['Placement']['AvailabilityZone']
        instance_type = launch_specification['InstanceType']
        self.meter = SpotMeter(instance_type, az, count=self.instance_count)
        self.launch_group = name
        self.key_name = name
        self.kp = self.ec2_resource.KeyPair(self.key_name)
        self.pem_file = '/Users/pazcal/.xbow/{}.pem'.format(name)

        image_id = launch_specification['ImageId']
        image = self.ec2_resource.Image(image_id)
        if image.tags is None:
            raise ValueError('Error - the chosen image does not define the username')
        tagdict = dict((tag['Key'], tag['Value']) for tag in image.tags)
        self.username = tagdict.get('username')

        if self.username is None:
            raise ValueError('Error - the chosen image does not define the username')

        self.outputs = None
        self.exit_statuses = None
        self.status = None
        self.instances = None
        self.connected_instances = None
        self.refresh()

    def _collect_results(self):
        """Copy per-instance outputs and exit statuses; return the statuses."""
        self.outputs = [ci.output for ci in self.connected_instances]
        self.exit_statuses = [ci.exit_status for ci in self.connected_instances]
        return [ci.status for ci in self.connected_instances]

    def update(self):
        """
        Update the status of the pool
        Updates all pool attributes.
        """
        for ci in self.connected_instances:
            ci.update()
        statuses = self._collect_results()
        if 'busy' in statuses:
            self.status = 'busy'
        elif 'unavailable' in statuses:
            self.status = 'unavailable'
        else:
            self.status = 'ready'

    def wait(self):
        """Wait until the pool is not busy"""
        for ci in self.connected_instances:
            ci.wait()
        statuses = self._collect_results()
        if 'unavailable' in statuses:
            self.status = 'unavailable'
        else:
            self.status = 'ready'

    def refresh(self):
        """Update the list of running instances.

        Checks the status of each instance in the pool, and if they appear
        to have died, waits for them to be replaced by the EC2 persistent
        spot instance process. Every instance is then reconnected.

        Raises:
            RuntimeError: if the pool is busy.
        """
        if self.connected_instances is not None:
            if self.status == 'busy':
                raise RuntimeError("Error - cannot refresh while the pool is busy")

        if self.instances is None:
            n_up = 0
        else:
            for i in self.instances:
                i.reload()
            # Instance.state is a dict; the state name is under 'Name'.
            n_up = [i.state['Name'] for i in self.instances].count('running')

        while n_up < self.instance_count:
            # Materialise the lazy collection so that the reload and
            # terminate loops act on these same objects.
            self.instances = list(self.ec2_resource.instances.filter(Filters=[
                {'Name': 'instance-state-name', 'Values': ['running']},
                {'Name': 'spot-instance-request-id', 'Values': self.spot_instance_request_ids}
            ]))
            self.instance_ids = [i.id for i in self.instances]
            n_up = len(self.instance_ids)
            if n_up < self.instance_count:
                time.sleep(15)

        # ConnectedInstance only connects to instances whose status checks
        # pass, so wait for them before connecting.
        my_waiter = self.ec2_resource.meta.client.get_waiter('instance_status_ok')
        my_waiter.wait(InstanceIds=self.instance_ids)
        self.connected_instances = [ConnectedInstance(i,
                                                      self.username,
                                                      self.pem_file)
                                    for i in self.instances]
        self.update()


    def terminate(self):
        """Terminate the pool of instances"""
        # Cancel the persistent requests first so that EC2 does not replace
        # the instances terminated below.
        csr = self.ec2_resource.meta.client.cancel_spot_instance_requests
        csr(SpotInstanceRequestIds=self.spot_instance_request_ids)
        for i in self.instances:
            i.terminate()
        self.kp.delete()

    def exec_command(self, command, block=True):
        """
        Run a command on all instances in the pool.

        Args:
             command (str): The script to execute.
             block (bool, optional): Whether or not to wait until the command
                 completes before returning.
        """
        self.update()
        if self.status == 'unavailable':
            self.refresh()
        for ci in self.connected_instances:
            ci.output = None
            ci.exec_command(command, block=False)
        if block:
            self.wait()

    def exec_commands(self, commandlist, block=True):
        """
        Run each command in commandlist on a different instance.

        Args:
            commandlist (list): List of commands (str) to run. The list length
                must be less than or equal to the pool size.
            block (bool, optional): Whether or not to wait until the commands
                complete before returning.

        Raises:
            ValueError: if there are more commands than instances.
        """

        self.update()
        if self.status == 'unavailable':
            self.refresh()
        if len(commandlist) > self.instance_count:
            raise ValueError('Error - more commands than available instances')
        for ci in self.connected_instances:
            ci.output = None
        for ci, command in zip(self.connected_instances, commandlist):
            ci.exec_command(command, block=False)
        if block:
            self.wait()

    def upload(self, localfiles, remotefiles):
        """
        Upload files to the pool.

        Each local file in the list localfiles is uploaded to a different
        instance, with the remote name taken from the same element in
        remotefiles. If only one of localfiles and remotefiles is a string,
        it is repeated to match the length of the other; if both are
        strings, both are expanded to [filename] * *pool_size*. The two lists
        must be equal in length and less than or equal to *pool_size*. If
        the list lengths are less than *pool_size*, later instances get no
        files.

        Raises:
            ValueError: if a list is longer than the pool, or the lists
                differ in length.
        """
        self.update()
        if self.status == 'unavailable':
            self.refresh()
        local_is_list = isinstance(localfiles, list)
        remote_is_list = isinstance(remotefiles, list)
        if local_is_list and len(localfiles) > self.instance_count:
            raise ValueError('Error - more elements in localfiles list than instances in the pool')
        if remote_is_list and len(remotefiles) > self.instance_count:
            raise ValueError('Error - more elements in remotefiles list than instances in the pool')
        if local_is_list and not remote_is_list:
            remotefiles = [remotefiles] * len(localfiles)
        elif remote_is_list and not local_is_list:
            localfiles = [localfiles] * len(remotefiles)
        elif not local_is_list:
            # Neither argument is a list: send the same file everywhere.
            localfiles = [localfiles] * self.instance_count
            remotefiles = [remotefiles] * self.instance_count
        if len(localfiles) != len(remotefiles):
            raise ValueError('Error - filelists must be the same length')
        for ci, localfile, remotefile in zip(self.connected_instances,
                                             localfiles, remotefiles):
            ci.upload(localfile, remotefile)

    def download(self, localfiles, remotefiles):
        """
        Download files from the pool.

        localfiles must be a list of length <= *pool_size*; remotefiles
        can be the same, or a single string in which case it is expanded
        to [remotefiles] * len(localfiles).

        Raises:
            ValueError: if localfiles is not a list, if a list is longer
                than the pool, or if the lists differ in length.
        """
        self.update()
        if self.status == 'unavailable':
            self.refresh()
        if not isinstance(localfiles, list):
            raise ValueError('Error - localfiles must be a list')
        if len(localfiles) > self.instance_count:
            raise ValueError('Error - more elements in localfiles list than instances in the pool')
        if isinstance(remotefiles, list):
            if len(remotefiles) > self.instance_count:
                raise ValueError('Error - more elements in remotefiles list than instances in the pool')
        else:
            remotefiles = [remotefiles] * len(localfiles)
        if len(localfiles) != len(remotefiles):
            raise ValueError('Error - filelists must be the same length')
        for ci, localfile, remotefile in zip(self.connected_instances,
                                             localfiles, remotefiles):
            # ConnectedInstance.download takes the remote name first.
            ci.download(remotefile, localfile)

In [37]:
# Settings shared by the scheduler and worker-pool cells of this notebook.
region = 'eu-west-1'
# Target spot price; handed to create_spot_pool as a string.
price = '0.4'
worker_instance_type = 'm4.large'
scheduler_instance_type = 't2.small'
image_id = 'ami-9f0164e6' # new image created 22nd Jan
# Security groups applied to the EC2 instances themselves.
ec2_security_groups = ['efs-walkthrough1-ec2-sg']
# Read as a module-level name by create_connected_instance and
# create_spot_pool when they have to create the EFS mount targets.
efs_security_groups = ['efs-walkthrough1-mt-sg']
# Used as the EFS creation token when looking up or creating the file system.
shared_file_system = 'MyTestFileSystem'
mount_point = '/home/ubuntu/shared'
scheduler_name = 'MyDaskScheduler'
worker_pool_name = 'MyDaskWorkers'

In [38]:
# Notebook-level EC2 resource and EFS client for the configured region.
# The functions defined above build their own handles, so these are only
# needed for interactive use.
ec2_resource = boto3.resource('ec2', region_name=region)
efs_client = boto3.client('efs', region_name=region)

In [39]:
# Launch the scheduler instance. The start-up command runs dask-scheduler in
# the background and has it write its scheduler file onto the shared mount.
user_data = 'dask-scheduler --scheduler-file {}/.dsf.json &\n'.format(mount_point)
ci = create_connected_instance(
                        name=scheduler_name,
                        image_id=image_id,
                        instance_type=scheduler_instance_type,
                        shared_file_system=shared_file_system,
                        mount_point=mount_point,
                        security_groups=ec2_security_groups,
                        user_data=user_data,
                        username='ubuntu'
                      )


In [39]:
# Debugging cell: refresh the instance, force its status to 'unknown', and
# try a command. exec_command only runs when status is 'ready', so this
# yields the "not ready" error even though the state is 'usable'.
ci.update()
ci.status='unknown'
ci.update()
ci.exec_command('df')
print ci.output
print ci.status, ci.state, ci.instance.state['Name']

Error - this instance is not ready
unknown usable running


In [22]:
# Wrap the same EC2 instance in a fresh ConnectedInstance; the constructor
# opens the SSH connection when the instance is 'usable'.
ci2 = ConnectedInstance(ci.instance, 'ubuntu', '/Users/pazcal/.xbow/MyDaskScheduler.pem')

In [24]:
# List a directory on the shared mount through the new connection, then
# show the command output and the instance status and state.
ci2.exec_command('ls shared/CoCo-MD_example')
print ci2.output
print ci2.status, ci2.state, ci2.instance.state['Name']

10wk002ns100its.py
csaw_amber.pdb
csaw.min1
csaw.top
production_md.in
README
run_cocomd.py
tmd_1.in
tmd_2.in

ready usable running


In [29]:
# Copy a local file to the instance under a different remote name; the
# remote path is relative, so it is resolved by the SFTP server.
ci2.upload('interface_jn.py', 'interfacess.py')

In [15]:
# Request a pool of two spot workers. Each worker starts dask-worker in the
# background, pointed at the scheduler file on the shared mount.
# No username is passed here, so it has to come from the AMI's tags.
user_data = 'dask-worker --scheduler-file {}/.dsf.json --worker-port 45792 &\n'.format(mount_point)
sip = create_spot_pool(count=2,
                        name=worker_pool_name,
                        price=price,
                        image_id=image_id,
                        instance_type=worker_instance_type,
                        shared_file_system=shared_file_system,
                        mount_point=mount_point,
                        security_groups=ec2_security_groups,
                        user_data=user_data
                      )

#!/bin/bash
 mkdir /home/ubuntu/shared
mount -t nfs -o nfsvers=4.1,rsize=1048576,wsize=1048576,hard,timeo=600,retrans=2 fs-d014ac19.efs.eu-west-1.amazonaws.com:/ /home/ubuntu/shared
 chmod go+rw /home/ubuntu/shared
dask-worker --scheduler-file /home/ubuntu/shared/.dsf.json --worker-port 45792 &



In [32]:
# Cancel the pool's spot requests, terminate its instances, and delete its
# key pair.
sip.terminate()

In [None]:
# NOTE(review): sip1 is not defined anywhere in the visible notebook -
# presumably a SpotInstancePool from an earlier session; confirm before running.
# With string arguments, upload sends the same file to every instance.
sip1.upload('bpti.crd', 'shared/bpti.crd')
sip1.upload('min1.in', 'shared/min1.in')

In [None]:
# NOTE(review): sip2 is not defined anywhere in the visible notebook -
# presumably a SpotInstancePool from an earlier session; confirm before running.
# Runs the pip install on every instance and prints the per-instance output.
sip2.exec_command('sudo pip install extasy.coco')
print sip2.outputs

In [None]:
# Build bin/jobrunner.sh on every instance, one echoed line at a time, then
# print the resulting script as seen by the first instance. The backslashes
# keep the remote shell from interpreting #, !, $, >, < and & while echoing.
sip.exec_command('mkdir test; mkdir bin')
sip.exec_command("echo \#\!/bin/bash > bin/jobrunner.sh; echo chmod +x \$1 >> bin/jobrunner.sh")
sip.exec_command("echo nohup ./\$1 2\>\&1 \</dev/null \& >> bin/jobrunner.sh")
# NOTE(review): in the generated script this line follows a backgrounded
# command, so $? may not be the job's own exit code - confirm the intent.
sip.exec_command("echo echo \$? \> _EXITCODE_ >> bin/jobrunner.sh")
#sip.exec_command("echo ps -e \| grep \$1 \| awk \'{print $1}\' >> bin/jobrunner.sh")
sip.exec_command('cat bin/jobrunner.sh')
print sip.outputs[0]

In [None]:
sip.exec_command("rm test/runme.sh; echo \#\!/bin/bash > test/runme.sh")
sip.exec_command('echo sleep 60; echo ls -al .. >> test/runme.sh; cat test/runme.sh')
print sip.outputs[0]

In [None]:
class BatchPool(object):
    """
    A simple batch job oriented pool class.

    A job is a small shell script, ``test/runme.sh``, written on every
    instance of the wrapped pool and launched through ``~/bin/jobrunner.sh``
    (which must already exist on the instances; this class never creates it).
    Job output is read back from ``test/runme.log`` and the exit code from
    ``test/_EXITCODE_``.

    Attributes:
        pool: the wrapped pool. It must provide ``exec_command(str)``,
            ``exec_commands(list)``, ``outputs`` (one string per instance)
            and ``instance_count``.
        status (list): status of each instance. ``update()`` sets each entry
            to "running" or "finished"; ``submit()`` sets "submitted";
            ``cancel()`` sets "terminated" for jobs it stopped. "idle" is
            used only transiently while the constructor runs.
        jobids (list): process id (int) of the runme.sh job on each
            instance, or the placeholder ``_NO_JOB`` when none was found.
        outputs (list): outputs (so far) from the last job on each instance.
        exit_statuses (list): exit status (int) of the last job on each
            instance, or None if no exit code has been recorded.
        commands (list): the commands passed to the last ``submit()`` call,
            one per instance (only set once ``submit()`` has been called).
    """

    # Placeholder job id recorded when no runme.sh process is found on an
    # instance.  update() polls it like a real id; process 1 is not expected
    # to be called runme.sh, so such an instance is reported as 'finished'.
    _NO_JOB = 1

    def __init__(self, pool):
        """Create a new BatchPool instance.

        Inspects the instances for an already running runme.sh job, then
        blocks in ``wait()`` until no job is running any more.

        Args:
            pool (SpotInstancePool): the pool of instances to run the jobs
        """
        self.pool = pool
        self.status = []
        self.jobids = []
        self.exit_statuses = []
        self.get_exitstatuses()
        self.pool.exec_command("ps -e | grep runme.sh")
        for output in self.pool.outputs:
            if 'runme.sh' in output:
                self.status.append('running')
                self.jobids.append(self._first_pid(output))
            else:
                self.status.append('idle')
                self.jobids.append(self._NO_JOB)
        # An idle instance that left an exit code behind has a finished job.
        # The status is tested and updated per instance.
        for i, (state, code) in enumerate(zip(self.status, self.exit_statuses)):
            if state == 'idle' and code is not None:
                self.status[i] = 'finished'
        self.outputs = []
        self.wait()

    def _first_pid(self, output):
        """Return the leading process id in *output* of ``ps``.

        Args:
            output (str): text printed by ``ps``; may be empty.

        Returns:
            int: the first whitespace-separated field as an integer, or
            ``_NO_JOB`` when *output* contains no fields at all (for example
            because the job had already exited when ``ps`` ran).

        Raises:
            ValueError: if the first field is not an integer.
        """
        fields = output.split()
        if not fields:
            return self._NO_JOB
        return int(fields[0])

    def submit(self, commands):
        """
        Submit commands to the pool.

        Only the "running" status blocks a submission; call ``update()``
        first if the statuses may be stale (e.g. still "submitted").

        Args:
            commands (str or list): commands to execute. If a string, the same command
                is executed on each instance. If a list, each element is sent to a different
                instance.

        Raises:
            RuntimeError: if any instance is marked as "running".
        """
        # Membership test: self.status is a list of per-instance states.
        if 'running' in self.status:
            raise RuntimeError('Error - the pool is still busy')
        if not isinstance(commands, list):
            self.commands = [commands] * self.pool.instance_count
        else:
            self.commands = commands
        self.cleanup()
        # Raw string: the backslashes must reach the remote shell unchanged
        # so that '#' and '!' are written literally into the script.
        self.pool.exec_command(r"echo \#\!/bin/bash > test/runme.sh")
        self.pool.exec_commands(["echo {} >> test/runme.sh".format(cmd) for cmd in self.commands])
        self.pool.exec_command("cd test; ~/bin/jobrunner.sh runme.sh > runme.log; ps -e | grep runme.sh")
        # A very short job may be gone before `ps` runs; _first_pid then
        # falls back to the placeholder id instead of raising IndexError.
        self.jobids = [self._first_pid(output) for output in self.pool.outputs]
        self.status = ['submitted'] * self.pool.instance_count

    def update(self):
        """
        Update the batch pool attributes.

        Polls every recorded job id with ``ps`` and marks the instance as
        "running" if runme.sh still shows up, otherwise "finished"; then
        refreshes ``outputs``.
        """
        commands = ['ps -p {} -h'.format(jid) for jid in self.jobids]
        self.pool.exec_commands(commands)
        self.status = ['running' if 'runme.sh' in output else 'finished'
                       for output in self.pool.outputs]
        self.get_output()

    def wait(self, poll_interval=10):
        """
        Wait until all commands have completed.

        Afterwards the exit statuses are collected and the job files are
        removed from the instances.

        Args:
            poll_interval (float): seconds to sleep between status polls.
        """
        self.update()
        while 'running' in self.status:
            time.sleep(poll_interval)
            self.update()
        self.get_exitstatuses()
        self.cleanup()

    def cancel(self):
        """
        Cancel all jobs.

        Kills runme.sh everywhere, records exit code 1 for every job that
        was marked "running" (which becomes "terminated"), then refreshes
        ``outputs`` and ``exit_statuses``.
        """
        self.pool.exec_command('killall runme.sh')
        cmds = []
        for i, state in enumerate(self.status):
            if state == 'running':
                cmds.append('echo 1 > test/_EXITCODE_')
                self.status[i] = 'terminated'
            else:
                # exec_commands is given one command per instance; `ls` is a
                # harmless filler for instances that had nothing to cancel.
                cmds.append('ls')
        self.pool.exec_commands(cmds)
        self.get_output()
        self.get_exitstatuses()

    def get_output(self):
        """
        Update the outputs attribute with data from the instances.
        """
        self.pool.exec_command('cat test/runme.log')
        self.outputs = self.pool.outputs

    def get_exitstatuses(self):
        """
        Update the exit_statuses attribute with data from the instances.

        An instance without a test/_EXITCODE_ file (empty or blank output)
        gets None; otherwise the file content is converted to int.

        Raises:
            ValueError: if a non-blank output is not an integer.
        """
        self.pool.exec_command('if [[ -a test/_EXITCODE_ ]]; then cat test/_EXITCODE_; fi')
        self.exit_statuses = []
        for out in self.pool.outputs:
            text = out.strip()
            # Blank output (including a lone newline) means "no exit code".
            self.exit_statuses.append(int(text) if text else None)

    def cleanup(self):
        """
        Remove the job script, its log and its exit code file on every instance.
        """
        self.pool.exec_command('rm test/runme.log test/runme.sh test/_EXITCODE_')

In [None]:
# Wrap the worker pool in a BatchPool.  The constructor itself calls wait(),
# so this cell returns only once no runme.sh job is running on the pool.
bp = BatchPool(sip)
print bp.status

In [None]:
# BatchPool.submit() launches jobs via ~/bin/jobrunner.sh, so the script
# must be executable on every instance.
sip.exec_command('chmod +x bin/jobrunner.sh')

In [None]:
# Submit the same pip install command to every instance and show the
# bookkeeping attributes straight after submission.
bp.submit('sudo pip install dask distributed')
print 'status=', bp.status
print 'jobids = ', bp.jobids
print 'exit statuses = ', bp.exit_statuses

In [None]:
# Poll once after a short pause, then block until all jobs are done and
# show the final status, collected outputs and exit statuses.
time.sleep(5)
bp.update()
print 'status=', bp.status
bp.wait()
print 'status=', bp.status
print 'outputs=', bp.outputs
print 'exit status =', bp.exit_statuses

In [None]:
# Request a single-instance pool named 'MySchedulerInstance'.  Unlike the
# worker pool, no user_data is passed here.
sip2 = create_spot_pool(count=1,
                        name='MySchedulerInstance',
                        price=price,
                        image_id=image_id,
                        instance_type=instance_type,
                        shared_file_system=shared_file_system,
                        mount_point=mount_point,
                        security_groups=ec2_security_groups,
                        )



In [None]:
# Same jobrunner.sh installation as was done for `sip`, now on `sip2`: the
# script is written line by line with remote `echo` commands (backslashes
# keep the shell metacharacters literal) and then made executable.
sip2.exec_command('mkdir test; mkdir bin')
sip2.exec_command("echo \#\!/bin/bash > bin/jobrunner.sh; echo chmod +x \$1 >> bin/jobrunner.sh")
sip2.exec_command("echo nohup ./\$1 2\>\&1 \</dev/null \& >> bin/jobrunner.sh")
# NOTE(review): `$?` follows a backgrounded command in the generated script,
# so it is the launch status rather than the job's exit code - confirm.
sip2.exec_command("echo echo \$? \> _EXITCODE_ >> bin/jobrunner.sh")
#sip.exec_command("echo ps -e \| grep \$1 \| awk \'{print $1}\' >> bin/jobrunner.sh")
sip2.exec_command('cat bin/jobrunner.sh')
sip2.exec_command('chmod +x bin/jobrunner.sh')
# NOTE(review): outputs[0] now holds the output of the chmod command (the
# last one executed), not of the preceding `cat` - confirm what was meant
# to be printed.
print sip2.outputs[0]

In [None]:
# Wrap the scheduler pool in its own BatchPool (blocks in the constructor
# until no runme.sh job is running).
bp2 = BatchPool(sip2)
print bp2.status

In [None]:
# With the submit() call disabled, this cell only reports the current
# bookkeeping, waits for any running job and reports again.
#bp2.submit('sudo pip install dask distributed')
print 'status=', bp2.status
print 'jobids = ', bp2.jobids
bp2.wait()
print 'status=', bp2.status
print 'outputs=', bp2.outputs
print 'exit statuses = ', bp2.exit_statuses


In [None]:
# Start the dask scheduler as a batch job; it writes its connection details
# to a scheduler file under ~/shared.  After a short pause, poll once and
# show the job output collected so far.
bp2.submit('dask-scheduler --scheduler-file ~/shared/scheduler_file.json')
time.sleep(5)
bp2.update()
print bp2.outputs[0]

In [None]:
# Start a dask worker on every instance of the worker pool, pointing it at
# the same scheduler file the scheduler job was told to write.
bp.submit('dask-worker --scheduler-file ~/shared/scheduler_file.json')
time.sleep(5)
bp.update()
print 'status=', bp.status
# Output of the first two instances; assumes the pool has at least two.
print bp.outputs[0]
print bp.outputs[1]

In [None]:
# Poll again later and show the first instance's output.
bp.update()
print 'status=', bp.status
print 'outputs=', bp.outputs[0]

In [None]:
# Show the scheduler file content as seen from the scheduler pool.
sip2.exec_command('cat ~/shared/scheduler_file.json')
print sip2.outputs

In [None]:
# NOTE(review): `sip1` is not created in the visible part of the notebook.
sip1.terminate()

In [None]:
print sip.pem_file

In [None]:
# NOTE(review): only one command is supplied, yet outputs[1] is printed -
# confirm how exec_commands distributes a list shorter than the pool.
sip.exec_commands(['cp tios/test/examples/bpti.tpr ~/shared'])
print sip.outputs[1]
print sip.exit_statuses
print [ci.status for ci in sip.connected_instances]

In [None]:

# Two different commands, one per list element: a plain `cat spam` and a
# GROMACS mdrun started after sourcing GMXRC.
# NOTE(review): `cat spam` is presumably a deliberately failing command to
# exercise exit-status reporting - confirm.
sip.exec_commands(['cat spam', 'source /usr/local/gromacs/2016.4/bin/GMXRC; gmx mdrun -deffnm ~/shared/bpti'])
print sip.outputs[1]
print sip.exit_statuses
print [ci.status for ci in sip.connected_instances]

In [None]:
# Per-instance status of the pool's connected instances.
print [ci.status for ci in sip.connected_instances]
#sip.update()

In [None]:
sip.refresh()
print sip.status

In [None]:
# Terminate the first EC2 instance directly (bypassing the pool), then
# compare the pool's reported status before and after sip.update().
list(sip.instances)[0].terminate()
time.sleep(5)
print sip.get_status()
sip.update()
print sip.get_status()

In [None]:
# NOTE(review): `sip3` is not created in the visible part of the notebook.
sip3.terminate()

In [None]:
# Delete the EFS file system; `efs_client` and `FileSystemId` come from
# earlier cells.
efs_client.delete_file_system(FileSystemId=FileSystemId)

In [None]:
# Look up spot instance requests whose launch group is 'TestLaunchGroup'.
result = ec2_resource.meta.client.describe_spot_instance_requests(Filters=[{'Name': 'launch-group', 'Values': ['TestLaunchGroup']}])

In [None]:
# Ids of all matching spot requests.
print [s['SpotInstanceRequestId'] for s in result['SpotInstanceRequests']]

In [None]:
# Image id of the first matching request; raises IndexError if none matched.
print result['SpotInstanceRequests'][0]['LaunchSpecification']['ImageId']

In [None]:
sip.exec_command('sudo pip install dask distributed --upgrade')

In [None]:
# Output of the second instance for the last command run on `sip`.
print sip.outputs[1]

In [None]:
# Region name configured for a newly created boto3 session.
print boto3.session.Session().region_name