### Playing with terraform

We start with a few basics:

1. `base.tf`: contains the 'boilerplate' - everything except the description of any instances
2. `terraform_ec2_key` and `terraform_ec2_key.pub` - key pair for accessing the instances
3. `simplejob.sh` - a simple shell script with Slurm-style xbow directives
4. `input1.dat` and `input2.dat` - input files needed by `simplejob.sh`

Our aim is to run `simplejob.sh` on a remote instance, and get the output back.

One design feature: rather than staging files directly between the local machine and the remote instance, we
go via an intermediate s3 bucket. That way the output data stays safe even after the instance has been deleted.

The desired workflow is as follows:

1. We run 'terraform apply' to get to a base state
2. By parsing the terraform.tfstate file, we get a few useful fixed values, such as the name of the s3 bucket.
3. We parse 'simplejob.sh' to find out what resources it is requesting.
4. We create a new job. As with a conventional job scheduler, we need to give the job a unique id (just a sequence number). The files for each job go into their own "directory" in the s3 bucket, named after the job id, so we inspect the bucket and set the new job id to one higher than the highest number we find there.
5. We create a .tf file that contains the specification of the new instance. This includes tagging it with the job id.
6. We run 'terraform apply' to create the new instance.
7. While it launches, we upload the input files to the s3 bucket - not forgetting to add the job script file (simplejob.sh) as well.
8. Once we get confirmation that the new instance is up and running, we transfer the job files to it, placing them in a subdirectory $HOME/job_id.
9. We submit the job to tsp.
10. We poll tsp every so often until the job is complete.
11. We re-sync the job directory with the s3 bucket, which uploads the output files.
12. We delete the .tf file for the worker we have just finished using
13. We run 'terraform apply' to destroy the worker
14. We download the output files from the s3 bucket
15. We clean out the s3 bucket

The design means that it is possible to have multiple jobs running at any time, each on its own worker and each reading/writing to a unique 'directory' in the s3 bucket. 

It also makes it possible to have an independent "reaper" daemon process that periodically checks each current worker to see whether it has finished running its job, and if so executes steps 11-13 above.
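A minimal sketch of such a reaper follows. It assumes three hypothetical callables that are not part of this notebook: `get_workers()` returns the current `{job_id: worker}` mapping (for instance `get_tfstate()['workers']`), `is_finished(worker)` checks the worker's tsp queue, and `finish(job_id, worker)` performs steps 11-13.

```python
import time


def reap_once(workers, is_finished, finish):
    """
    One reaper pass: finish every worker whose job has completed.
    Returns the list of job ids that were reaped.
    """
    reaped = []
    for job_id, worker in list(workers.items()):
        if is_finished(worker):
            # Hypothetical callback standing in for steps 11-13:
            # re-sync to s3, delete worker_<job_id>.tf, run 'terraform apply'
            finish(job_id, worker)
            reaped.append(job_id)
    return reaped


def reaper(get_workers, is_finished, finish, interval=60, max_passes=None):
    """
    Call reap_once repeatedly, sleeping `interval` seconds between passes.
    Runs forever unless max_passes is given (useful for testing).
    """
    passes = 0
    while max_passes is None or passes < max_passes:
        reap_once(get_workers(), is_finished, finish)
        passes += 1
        if max_passes is None or passes < max_passes:
            time.sleep(interval)
```

Keeping the callables as parameters means the reaper loop itself can be exercised without touching AWS.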

In real code there would be scope to speed things up by running steps concurrently: for example, files can be uploaded to the s3 bucket while the instance boots, and copied from the s3 bucket back into the local directory while the instance is being destroyed.
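One way to get that overlap, sketched here with the standard library's `concurrent.futures` (an assumption; the notebook itself runs each step in turn), is to hand the slow steps to a thread pool. Threads are adequate because each step mostly waits on an external process such as terraform or the aws CLI.

```python
from concurrent.futures import ThreadPoolExecutor


def run_concurrently(*tasks):
    """
    Run zero-argument callables in parallel threads.
    Returns their results in the order the tasks were given;
    an exception raised by any task is re-raised here.
    """
    with ThreadPoolExecutor(max_workers=max(len(tasks), 1)) as pool:
        futures = [pool.submit(task) for task in tasks]
        return [future.result() for future in futures]
```

In the notebook, the two tasks might be `lambda: terraform('apply -no-color -auto-approve')` and `lambda: stager.upload(files_to_upload)`, since uploading to s3 does not need the instance's IP address.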

As a command-line tool, it might look something like:

1. To initialize everything, only run once:
```
xbow init
```
2. To submit a job:
```
xbow submit <jobfile>
```  
3. To check on the status of running jobs:
```
xbow stat
```
4. To look at the progress of a particular job (ls -l of the remote directory):
```
xbow ls <job_id>
```
5. To retrieve data from a finished job:
```
xbow retrieve <job_id>
```    
6. To stop a job (or clean out a finished one manually):
```
xbow rm <job_id>
```   
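A skeleton for that command-line interface could be built with `argparse` subcommands. This is only a sketch of the argument parsing: the subcommand names come from the list above, while the integer `job_id` type and the help texts are assumptions.

```python
import argparse


def build_parser():
    """Hypothetical argument parser for the proposed 'xbow' command."""
    parser = argparse.ArgumentParser(prog='xbow')
    sub = parser.add_subparsers(dest='command', required=True)

    sub.add_parser('init', help='initialise the base terraform state (run once)')

    submit = sub.add_parser('submit', help='submit a job script')
    submit.add_argument('jobfile')

    sub.add_parser('stat', help='show the status of running jobs')

    # These subcommands all act on a single job, identified by its id
    for name, help_text in [('ls', 'list the remote job directory'),
                            ('retrieve', 'download data from a finished job'),
                            ('rm', 'stop a job or clean out a finished one')]:
        cmd = sub.add_parser(name, help=help_text)
        cmd.add_argument('job_id', type=int)
    return parser
```

For example, `build_parser().parse_args(['submit', 'simplejob.sh'])` yields `command='submit'` and `jobfile='simplejob.sh'`; dispatching each command to the functions below is left open.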
    

In [1]:
import json
import os.path as op
import os
import subprocess

First, a helper to run terraform (assumed here to run via the `hashicorp/terraform:light` Docker image):

In [2]:
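def terraform(command):
    """
    Run terraform with the given command, e.g. 'apply -no-color -auto-approve'.
    Returns the subprocess.CompletedProcess (stdout and stderr as bytes).
    """
    # Mount the current directory as terraform's working directory, and the
    # local AWS credentials where the container (running as root) looks for them
    base_command = 'docker run -i -v "$PWD":/wd -w /wd -v "$HOME"/.aws:/root/.aws hashicorp/terraform:light'
    result = subprocess.run(base_command + ' ' + command, shell=True, capture_output=True)
    return result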
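`terraform()` hands back the raw `subprocess.CompletedProcess`, so every caller has to check `returncode` itself, and terraform reports its errors on stderr rather than stdout. A small alternative, sketched here as a generic helper (the name `run_checked` is hypothetical), raises an exception that carries stderr instead:

```python
import subprocess


def run_checked(command):
    """
    Run a shell command; return its stdout as text, or raise RuntimeError
    containing the exit code and stderr if the command fails.
    """
    result = subprocess.run(command, shell=True, capture_output=True, text=True)
    if result.returncode != 0:
        raise RuntimeError('{!r} failed with exit code {}: {}'.format(
            command, result.returncode, result.stderr.strip()))
    return result.stdout
```

It could be called with the same Docker command line that `terraform()` builds, followed by the terraform arguments.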

Run 'terraform apply':

In [36]:
result = terraform('apply -no-color -auto-approve')
if result.returncode != 0:
    # terraform reports errors on stderr
    print(result.stdout.decode())
    print(result.stderr.decode())

Load the terraform.tfstate file and pull out the useful information: the current workers (keyed by job id), the s3 bucket name and region, and the key pair name.

In [37]:
def get_tfstate(statefile='terraform.tfstate'):
    """
    Extract the workers, bucket name, region and key name from a terraform state file
    """
    with open(statefile) as f:
        data = json.load(f)

    workers = {}
    bucket = None
    key_name = None
    for resource in data['resources']:
        for instance in resource['instances']:
            attributes = instance['attributes']
            # Workers are recognised by having an 'ami' attribute
            if 'ami' in attributes:
                workers[attributes['tags']['JobId']] = attributes
            # The s3 bucket is recognised by its 'bucket' attribute
            if 'bucket' in attributes:
                bucket = attributes
            # The key pair (and each worker) carries a 'key_name' attribute
            if 'key_name' in attributes:
                key_name = attributes['key_name']

    if bucket is None:
        raise RuntimeError('No s3 bucket found in {}'.format(statefile))

    return {
        'workers': workers,
        'bucket_name': bucket['bucket'],
        'region': bucket['region'],
        'key_name': key_name,
    }

In [38]:
tfstate = get_tfstate()
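Reading terraform.tfstate directly ties the code to the state file's internal layout. An alternative, sketched here under the assumption that each worker's .tf file declares a `worker_<id>_public_ip` output (as the instance template further down does), is to parse the JSON printed by `terraform output -json`, which maps each output name to an object with a `value` field:

```python
import json
import re


def worker_ips_from_outputs(outputs_json):
    """
    Map job id -> public IP, from the text printed by 'terraform output -json'.
    Outputs whose names do not match worker_<id>_public_ip are ignored.
    """
    outputs = json.loads(outputs_json)
    ips = {}
    for name, output in outputs.items():
        match = re.fullmatch(r'worker_(\d+)_public_ip', name)
        if match:
            ips[match.group(1)] = output['value']
    return ips
```

In the notebook this might be called as `worker_ips_from_outputs(terraform('output -json').stdout.decode())`.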

Check existing workers (there may be none at this stage)

In [40]:
workers = tfstate['workers']
for w in workers:
    print(w, workers[w]['public_ip'], workers[w]['spot_bid_status'])

In [41]:
print(tfstate['bucket_name'])
print(tfstate['key_name'])
print(tfstate['region'])
bucket_name = tfstate['bucket_name']
key_name = tfstate['key_name']

laughtongroup.charlie.xbow
terraform_ec2_key
eu-west-1


Parse a job script and extract its `#XBOW` options:

In [42]:
def parse_script(scriptfile):
    """
    Extract xbow parameters from a script file.

    Lines of the form "#XBOW --key=value" are collected into a dict; a key
    that appears more than once maps to a list of its values.
    """
    with open(scriptfile) as f:
        lines = f.readlines()
    result = {}
    for line in lines:
        if not line.startswith('#XBOW '):
            continue
        words = line.split()
        # Expect exactly two words: "#XBOW" and "--key=value"
        if len(words) != 2 or not words[1].startswith('--') or '=' not in words[1]:
            raise ValueError('Error cannot parse {}'.format(line))
        # Split on the first '=' only, so values may themselves contain '='
        key, value = words[1][2:].split('=', 1)
        result.setdefault(key, []).append(value)
    # Unwrap options that were given only once
    for key in result:
        if len(result[key]) == 1:
            result[key] = result[key][0]
    return result

In [43]:
job_script = 'simplejob.sh'
job_options = parse_script(job_script)
print(job_options)

{'instance_type': 't2.small', 'job_name': 'mysim', 'upload': ['input1.dat', 'input2.dat']}


In [None]:
def next_job_id(bucket_name):
    """
    The next job should have an id one greater than the largest so far
    (or 0 if the bucket holds no jobs yet)
    """
    result = subprocess.run(['aws', 's3', 'ls', 's3://{}/'.format(bucket_name)], capture_output=True)
    if result.returncode != 0:
        raise RuntimeError('Error getting job ids from bucket: {}'.format(result.stderr.decode()))
    job_ids = []
    for line in result.stdout.decode().split('\n'):
        words = line.split()
        # Each job "directory" is listed as a line of the form "PRE <job_id>/"
        if len(words) == 2 and words[0] == 'PRE' and words[1][:-1].isdigit():
            job_ids.append(int(words[1][:-1]))
    if job_ids:
        return max(job_ids) + 1
    return 0

In [66]:
next_job = next_job_id(bucket_name)
print(next_job)

0
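The id logic is easier to test if parsing the `aws s3 ls` listing is separated from the call itself. Below is a sketch of such a pure function, assuming the usual listing format in which each "directory" appears as a line `PRE <name>/` and files appear as date, time, size and name; any prefix that is not an integer is skipped.

```python
def next_id_from_listing(listing):
    """
    Given the text output of 'aws s3 ls s3://<bucket>/', return one more than
    the largest integer "directory" name, or 0 if there are none.
    """
    job_ids = []
    for line in listing.splitlines():
        words = line.split()
        # Prefix lines look like: "PRE 3/"
        if len(words) == 2 and words[0] == 'PRE' and words[1].endswith('/'):
            name = words[1][:-1]
            if name.isdigit():
                job_ids.append(int(name))
    return max(job_ids) + 1 if job_ids else 0
```

Because the ids are compared as integers, job `10` correctly counts as later than job `2`.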


In [45]:
def create_instance_tf_file(instance_spec):
    """
    Create a .tf file for a new instance
    """
    required_keys = ['job_index', 'instance_type', 'xbow_bucket', 'key_name']
    for key in required_keys:
        if not key in instance_spec:
            raise ValueError('Error - instance specification missing required key {}'.format(key))
    
    tf_instance_template = """resource "aws_spot_instance_request" "worker_{job_index}" {{
  ami           = data.aws_ami.base.id
  instance_type = "{instance_type}"
  key_name = "{key_name}"
  security_groups = ["allow_ssh"]
  iam_instance_profile = "EC2InstanceRole"

  depends_on = [aws_s3_bucket.xbow_bucket]

  tags = {{
    Name = "Worker-{job_index}"
    JobId = "{job_index}"
  }}
}}

output "worker_{job_index}_public_ip" {{
  value = aws_spot_instance_request.worker_{job_index}.public_ip
}}
output "worker_{job_index}_spot_request_state" {{
  value = aws_spot_instance_request.worker_{job_index}.spot_request_state
}}
output "worker_{job_index}_spot_bid_status" {{
  value = aws_spot_instance_request.worker_{job_index}.spot_bid_status
}}
    """
    tf_file = 'worker_{job_index}.tf'.format(**instance_spec)
    with open(tf_file, 'w') as f:
        f.write(tf_instance_template.format(**instance_spec))
    return tf_file
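The template is filled in with Python's `str.format`, so every literal brace that HCL needs has to be doubled (`{{` and `}}`), while single-brace fields such as `{job_index}` are substituted. A cut-down illustration:

```python
# A small fragment written in the same style as tf_instance_template
template = '''tags = {{
  Name = "Worker-{job_index}"
}}'''

# The doubled braces come out as single literal braces
print(template.format(job_index=3))
```

Extra keys passed to `format` (such as `xbow_bucket`, which the template does not currently use) are silently ignored, which is why `create_instance_tf_file` can pass the whole `instance_spec` dict.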

In [46]:
instance_spec = {
    'job_index':      next_job,
    'instance_type':  job_options['instance_type'],
    'xbow_bucket':    bucket_name,
    'key_name':       key_name
}
tf_file = create_instance_tf_file(instance_spec)

In [47]:
result = terraform('apply -no-color -auto-approve')
if result.returncode != 0:
    # terraform reports errors on stderr
    print(result.stdout.decode())
    print(result.stderr.decode())

Check the state of the instance. If it is not ready yet, that's fine: we can upload the data to the s3 bucket in the meantime.

In [49]:
tfstate = get_tfstate()
workers = tfstate['workers']
for w in workers:
    print(w, workers[w]['public_ip'], workers[w]['spot_request_state'])

0 None open


Transferring files onto the instance via an intermediate s3 bucket (for resilience/backup)

In [50]:
class S3Stager(object):
    """
    Moves files to and from an instance via an s3 bucket
    """
    def __init__(self, bucket_id, remote_ip, key_name, remote_dir):
        self.bucket_uri = 's3://{}'.format(bucket_id)
        # Build the s3 prefix with '/' explicitly (os.path.join would use a backslash on Windows)
        self.blob_base = '{}/{}'.format(self.bucket_uri, remote_dir)
        self.remote_ip = remote_ip
        self.key_name = key_name
        self.remote_dir = remote_dir
    
    def upload(self, filenames):
        """
        Upload local files to the job's "directory" in the s3 bucket (sync() then copies them to the instance)
        """
        if not isinstance(filenames, list):
            filenames = [filenames]
        targetdir = self.blob_base + '/'
        for filename in filenames:
            result = subprocess.run(['aws', 's3', 'cp', filename, targetdir], capture_output=True)
            if result.returncode != 0:
                return result
        return result
    
    def sync(self):
        """
        Synchronise all files between the s3 bucket and the instance, in both directions
        """
        result = subprocess.run(['ssh', '-i', self.key_name, '-o', 'StrictHostKeyChecking=no', 'ubuntu@{}'.format(self.remote_ip), 
                                  'aws', 's3', 'sync', self.blob_base, self.remote_dir], capture_output=True)
        if result.returncode != 0:
            return result
        result = subprocess.run(['ssh', '-i', self.key_name, '-o', 'StrictHostKeyChecking=no', 'ubuntu@{}'.format(self.remote_ip), 
                                  'aws', 's3', 'sync', self.remote_dir, self.blob_base], capture_output=True)
        return result
        
    def download(self, filenames):
        """
        Download files from the s3 bucket to the current directory;
        filenames may include wildcards, e.g. "*" for everything
        """
        if not isinstance(filenames, list):
            filenames = [filenames]
        # No shell is involved, so the patterns must not be quoted. Exclude
        # everything, then re-include the requested files (later filters win).
        command = ['aws', 's3', 'sync', self.blob_base, '.', '--exclude', '*']
        for filename in filenames:
            command += ['--include', filename]
        result = subprocess.run(command, capture_output=True)
        return result
    
    def ls(self):
        """
        List the contents of the s3 bucket
        """
        result = subprocess.run(['aws', 's3', 'ls', self.blob_base + '/'], capture_output=True)
        return result
    
    def purge(self):
        """
        Remove all files from the s3 bucket
        """
        result = subprocess.run(['aws', 's3', 'rm', self.blob_base + '/', '--recursive'], capture_output=True)
        return result
        

Transfer files to the instance, via the s3 bucket:

In [51]:
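next_job = str(next_job)  # the JobId tags used as keys in tfstate are strings
stager = S3Stager(bucket_name, workers[next_job]['public_ip'], key_name, next_job)
# A single --upload option is parsed as a string rather than a list; build a
# new list so that job_options itself is not modified
uploads = job_options.get('upload', [])
if not isinstance(uploads, list):
    uploads = [uploads]
files_to_upload = uploads + [job_script]
result = stager.upload(files_to_upload)
if result.returncode != 0:
    print(result)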

In [52]:
result = stager.ls()
print(result.stdout.decode())

2020-01-24 17:28:11         40 input1.dat
2020-01-24 17:28:11         40 input2.dat
2020-01-24 17:28:12        158 simplejob.sh



If necessary, wait for confirmation that the instance is ready:

In [53]:
result = terraform('refresh -no-color')
if result.returncode != 0:
    # terraform reports errors on stderr
    print(result.stdout.decode())
    print(result.stderr.decode())
tfstate = get_tfstate()
workers = tfstate['workers']
for w in workers:
    print(w, workers[w]['public_ip'], workers[w]['spot_request_state'])

0 34.253.86.87 active
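Instead of re-running the previous cell by hand, the wait can be written as a polling loop. In this sketch the `check` callable is an assumption: in the notebook it might run `terraform('refresh -no-color')`, call `get_tfstate()`, and return True once the worker's `spot_request_state` is `'active'` and its `public_ip` is set.

```python
import time


def wait_until(check, timeout=300, interval=10):
    """
    Call check() every `interval` seconds until it returns True.
    Returns True on success, or False if `timeout` seconds pass first.
    """
    deadline = time.monotonic() + timeout
    while True:
        if check():
            return True
        if time.monotonic() >= deadline:
            return False
        time.sleep(interval)
```

Returning False rather than raising leaves the caller free to decide whether a slow spot request should be retried or cancelled.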


Now the files can be transferred from s3 to the new instance (note that we have to recreate the stager object, so that it has a valid IP address to connect to):

In [56]:
stager = S3Stager(bucket_name, workers[next_job]['public_ip'], key_name, next_job)
result = stager.sync()
if result.returncode != 0:
    print(result)

Run a command on a remote instance:

In [57]:
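def remote_run(public_ip, key_name, command):
    """
    Run a shell command on the remote instance over ssh
    """
    # Pass the command as a single string: ssh hands it to the remote shell,
    # so operators such as '&&' work and any quoting is preserved
    result = subprocess.run(['ssh', '-i', key_name, '-o', 'StrictHostKeyChecking=no',
                             'ubuntu@{}'.format(public_ip), command], capture_output=True)
    return result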

Submit the job:

In [58]:
result = remote_run(workers[next_job]['public_ip'], key_name, 'cd {} &&  tsp sh {}'.format(next_job, job_script))
print(result.stdout.decode())

0



Check the job:

In [59]:
result = remote_run(workers[next_job]['public_ip'], key_name, 'tsp')
print(result.stdout.decode())

ID   State      Output               E-Level  Times(r/u/s)   Command [run=0/1]
0    finished   /tmp/ts-out.yOAhtp   0        0.00/0.00/0.00 sh simplejob.sh
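Step 10 of the workflow polls tsp until the job is complete. Below is a sketch of that check, assuming the column layout of the listing above (ID first, State second); tsp also reports other states, for example `running` or `queued`, which count as not finished.

```python
def tsp_job_states(tsp_output):
    """
    Parse the table printed by 'tsp' into {job_id: state}.
    Assumes the first line is the header and that ID and State are the
    first two whitespace-separated columns.
    """
    states = {}
    for line in tsp_output.splitlines()[1:]:
        words = line.split()
        if len(words) >= 2 and words[0].isdigit():
            states[words[0]] = words[1]
    return states


def all_finished(tsp_output):
    """True if tsp lists at least one job and every listed job has finished."""
    states = tsp_job_states(tsp_output)
    return bool(states) and all(state == 'finished' for state in states.values())
```

A poll might then be `all_finished(remote_run(ip, key_name, 'tsp').stdout.decode())`, repeated every so often until it returns True.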



In [60]:
result = remote_run(workers[next_job]['public_ip'], key_name, 'tsp -c')
print(result.stdout.decode())




In [61]:
result = remote_run(workers[next_job]['public_ip'], key_name, 'ls {}'.format(next_job))
print(result.stdout.decode())

input1.dat
input2.dat
output.dat
simplejob.sh



Re-sync the worker with the s3 bucket, so it's safe to destroy it

In [63]:
result = stager.sync()
if result.returncode != 0:
    print(result)

Remove the worker: delete its .tf file, then re-apply so that terraform destroys the instance.

In [67]:
os.remove(tf_file)

In [68]:
result = terraform('apply -no-color -auto-approve')
if result.returncode != 0:
    # terraform reports errors on stderr
    print(result.stdout.decode())
    print(result.stderr.decode())

Download the results file from the s3 bucket, which can then be cleaned out

In [64]:
result = stager.download("*")
if result.returncode != 0:
    print(result)

In [65]:
result = stager.purge()
if result.returncode != 0:
    print(result)