In [None]:
%config IPCompleter.greedy=True

import boto3
import networkx as nx
import numpy as np
import uuid
import requests

import time
import os
import io
import configparser

In [None]:
# Write an example config to disk so a new user can see which keys are expected.
# A separate parser object is used so that the name `config` only ever refers
# to the settings that were actually loaded.
sample_config = configparser.ConfigParser()
sample_config['AWS'] = {
    'ACCESS_KEY': '',
    'SECRET_KEY': ''
}
sample_config['Master'] = {
    'MASTER_URL': '',
    'API_KEY': ''
}

with open('config.txt.sample', 'w') as configfile:
    sample_config.write(configfile)

# Load the real settings. ConfigParser.read() returns the list of files it
# managed to parse and silently skips missing ones, so an absent config.txt
# would otherwise only surface below as an opaque KeyError: 'AWS'.
config = configparser.ConfigParser()
if not config.read('config.txt'):
    raise FileNotFoundError(
        "config.txt not found: copy config.txt.sample to config.txt and fill in the values."
    )

# Shared boto3 session used by the EC2 helper functions in this notebook.
session = boto3.Session(
    aws_access_key_id=config['AWS']['access_key'],
    aws_secret_access_key=config['AWS']['secret_key'],
)


In [None]:
def getInstanceIdsByType():
    """Group EC2 instance ids by the value of their ``instance_type`` tag.

    Uses the module-level ``session`` to list instances. Every page of the
    ``describe_instances`` result is read, so the listing is not cut short
    when the account holds more instances than fit in one response.

    Returns:
        dict: maps each ``instance_type`` tag value (e.g. ``'master'``) to a
        list of instance ids carrying that tag. Instances without the tag,
        or without any tags at all, are left out.
    """
    paginator = session.client("ec2").get_paginator("describe_instances")

    instancesByType = dict()
    for page in paginator.paginate():
        for reservation in page['Reservations']:
            for instance in reservation['Instances']:
                # The 'Tags' key may be absent for untagged instances, so it
                # must not be indexed directly.
                for tag in instance.get('Tags', []):
                    if tag['Key'] == 'instance_type':
                        instancesByType.setdefault(tag['Value'], []).append(instance['InstanceId'])

    return instancesByType

def terminateInstancesWithIds(ids):
    """Terminate the EC2 instances with the given ids via the module-level ``session``.

    Args:
        ids: iterable of instance id strings. ``None`` or an empty iterable
            means there is nothing to terminate and no request is sent,
            instead of issuing a TerminateInstances call with an empty
            ``InstanceIds`` list.
    """
    ids = list(ids or [])
    if not ids:
        return

    session.client("ec2").terminate_instances(InstanceIds=ids)

def createMaster(session):
    """Placeholder for launching a master instance.

    Currently this only obtains an EC2 client from ``session`` and returns
    ``None``; no instance is started here.

    Args:
        session: object exposing ``client(service_name)``, e.g. a boto3 session.
    """
    # The client is not used yet; the launch logic has not been written.
    ec2 = session.client('ec2')

def generate_graph_bytes(directory, n, p):
    """Generate a random directed weighted graph and store it in a binary file.

    The file is written as ``<directory>/<uuid4>-<n>-<p>.graph`` and holds two
    sections back to back:

    1. the flattened (row by row) ``n x n`` adjacency matrix as a bitmask,
       packed with ``np.packbits``;
    2. the float32 weights of the entries set in that bitmask, in the same
       order.

    Args:
        directory: output directory; created if it does not exist.
        n: number of nodes (passed through ``int()`` for graph creation).
        p: edge probability.

    Returns:
        str: path of the file that was written.
    """
    os.makedirs(directory, exist_ok=True)
    path = os.path.join(directory, '%s-%d-%s.graph' % (uuid.uuid4(), n, p))

    G = nx.fast_gnp_random_graph(int(n), p, directed=True)

    # Generate random edge weights.
    # NOTE: a weight that rounds to 0.0 cannot be told apart from "no edge"
    # by the `> 0` mask below, so such an edge is left out of the file.
    for (u, v) in G.edges():
        G.edges[u, v]['weight'] = round(np.random.random(), 2)

    # to_numpy_array replaces to_numpy_matrix, which newer NetworkX releases
    # no longer provide; the flattened values are the same.
    weights = nx.to_numpy_array(G, dtype=np.float32).flatten()
    has_edge = weights > 0

    # Single open: bitmask first, then the weights of the present edges.
    with open(path, 'wb') as f:
        f.write(np.packbits(has_edge).tobytes())
        f.write(weights[has_edge].tobytes())

    return path

def generate_graph(output, n, p, prepend_vertex_weight=False):
    """Write a random directed weighted graph to ``output`` as a CSV edge list.

    Args:
        output: writable binary stream; the optional header line is written
            as bytes and the same object is then handed to
            ``nx.write_weighted_edgelist``.
        n: number of nodes. Coerced with ``int()`` once up front so that a
            float such as ``1e3`` works for the header as well as for graph
            creation.
        p: edge probability.
        prepend_vertex_weight: when true, first write one line holding ``n``
            comma-separated copies of ``1.0 / n``.
    """
    n = int(n)

    if prepend_vertex_weight:
        # Uniform for pagerank.
        node_weights = [str(1.0 / n)] * n
        output.write((",".join(node_weights) + "\n").encode())

    # Generate ER graph with n nodes and edge probability p.
    G = nx.erdos_renyi_graph(n, p, directed=True)

    # Generate random edge weights.
    for (u, v) in G.edges():
        G.edges[u, v]['weight'] = round(np.random.random(), 2)

    nx.write_weighted_edgelist(G, output, delimiter=",")

def _build_master_user_data(max_workers, api_url, api_key, log_file):
    """Render the bash script that is passed to the master instance as UserData."""
    # `\\K` produces the literal backslash-K that grep's -P pattern needs.
    # Writing it unescaped is an invalid Python escape sequence and triggers
    # a warning on recent interpreters; the resulting text is unchanged.
    return """#!/bin/bash

    curl -L https://github.com/doriandekoning/IN4392-cloud-computing-lab/releases/download/$(curl --silent "https://api.github.com/repos/doriandekoning/IN4392-cloud-computing-lab/releases/latest" | grep -Po '"tag_name": "\\K.*?(?=")')/master_amd64_linux  > /home/ubuntu/go/bin/master && chmod +x /home/ubuntu/go/bin/master

    echo 'export MAXWORKERS=%s' > /home/ubuntu/env
    echo 'export MASTER_PORT=8000' >> /home/ubuntu/env
    echo 'export MASTER_ADDRESS=%s' >>/home/ubuntu/env
    echo 'export OWN_PORT=8888' >> /home/ubuntu/env
    echo 'export OWN_ADDRESS=http://`wget http://ipecho.net/plain -O - -q ; echo`' >> /home/ubuntu/env
    echo 'export API_KEY=%s' >> /home/ubuntu/env
    echo 'export OWN_INSTANCEID=`ec2metadata --instance-id`' >> /home/ubuntu/env
    echo 'export LOGFILE=%s' >> /home/ubuntu/env
    """ % (max_workers, api_url, api_key, log_file)


def _wait_for_master(api_url, auth_headers):
    """Poll the master's /health endpoint every 5 seconds until it reports workers."""
    while True:
        time.sleep(5)
        try:
            r = requests.get('%s/health' % api_url, headers=auth_headers, timeout=5)
        except (requests.Timeout, requests.ConnectionError):
            print("Waiting for master...")
            continue

        # Compare by value (`==`), not identity, and tolerate a health
        # payload that does not carry a 'Workers' entry yet.
        if r.status_code == 200 and r.json().get('Workers') is not None:
            print("Master is ready!")
            return
        print("Waiting for workers...")


def _submit_graphs(api_url, auth_headers, n_graphs, n_graph_nodes, p_graph_edge):
    """Generate ``n_graphs`` random graphs and POST each one to /processgraph."""
    for i in range(n_graphs):
        path = generate_graph_bytes("./graphs", n_graph_nodes, p_graph_edge)

        try:
            print('Submitting graph', i, end='\r')
            # The context manager closes the upload handle after each request.
            with open(path, 'rb') as graph_file:
                requests.request(
                    "POST",
                    "%s/processgraph" % api_url,
                    files={'file': graph_file},
                    headers=auth_headers,
                    # "size" is the node count the graph was generated with.
                    params={"algorithm": "pagerank", "maxsteps": "1000", "size": n_graph_nodes},
                )
        except Exception as e:
            # Best effort: one failed submission must not abort the run.
            print("Error submitting graph for processing.", e)


def run_experiment(max_workers=5, log_file='default_log', n_graphs=10,
                   n_graph_nodes=8, p_graph_edge=0.5):
    """Run one end-to-end experiment against a freshly started master.

    Steps: start a master from the "Master" launch template, wait until its
    /health endpoint reports workers, submit random graphs for pagerank,
    ask the master to kill its workers and flush metrics, and finally
    terminate every instance tagged ``master`` or ``worker``.

    The master URL and API key come from the module-level ``config``; EC2
    access goes through the module-level ``session``.

    Args:
        max_workers: value exported as MAXWORKERS on the master.
        log_file: value exported as LOGFILE on the master.
        n_graphs: number of graphs to generate and submit.
        n_graph_nodes: nodes per generated graph. Defaults to 8, the size
            that was actually generated before this became a parameter.
        p_graph_edge: edge probability of the generated graphs.

    Errors raised while the experiment runs are printed, not re-raised, so
    that the cleanup in ``finally`` always gets a chance to run.
    """
    api_url = config['Master']['master_url']
    api_key = config['Master']['api_key']
    auth_headers = {'X-Auth': api_key}

    master_user_data = _build_master_user_data(max_workers, api_url, api_key, log_file)

    try:
        # Connect to EC2
        ec2_client = session.client('ec2')

        # Starting up a master.
        print("Starting master...")
        ec2_client.run_instances(LaunchTemplate={'LaunchTemplateName': "Master"}, MinCount=1, MaxCount=1, UserData=master_user_data)

        # Wait until the master is ready.
        _wait_for_master(api_url, auth_headers)

        # Generate graphs and post them to the system.
        _submit_graphs(api_url, auth_headers, n_graphs, n_graph_nodes, p_graph_edge)
        print("All graphs submitted!")

        # TODO: no completion check is implemented; a single fixed 10 second
        # wait stands in for polling the master.
        time.sleep(10)
        print("Master has finished processing...")

        # Make sure all workers are being killed.
        print("Request to kill workers.")
        requests.get('%s/killworkers' % api_url, headers=auth_headers, timeout=20)

        # Make sure all logs are being written.
        print("Request to write metrics...")
        requests.get('%s/forcewritemetrics' % api_url, headers=auth_headers, timeout=20)
    except Exception as e:
        print(e)
    finally:
        # Sleep so the master can finish/kill workers.
        time.sleep(10)

        print("Terminating master...")
        instances = getInstanceIdsByType()
        # Terminate all masters, then workers to be sure everything is
        # cleaned up. A group with no instances is skipped instead of
        # raising KeyError and cutting the cleanup short.
        for instance_type in ('master', 'worker'):
            ids = instances.get(instance_type)
            if ids:
                terminateInstancesWithIds(ids)

        print("Finished...")


In [None]:
# Runs the whole experiment with its default settings. This starts a real EC2
# master instance and, at the end, terminates every instance tagged
# 'master' or 'worker'.
run_experiment()