In [2]:
# !pip install paramiko
# !pip install scp

In [1]:
import os
import boto3
import subprocess
import numpy as np
import time
import datetime
import paramiko
import io
from scp import SCPClient, SCPException
import sys
from ast import literal_eval
from files.file_helper import fetch_local_files

In [2]:
# Raise numpy's summarisation threshold to sys.maxsize so printed arrays are shown in full.
np.set_printoptions(threshold=sys.maxsize)

In [86]:
# Number of worker instances; also drives how many SSH clients and queue pairs are created.
INSTANCE_SIZE = 2

In [87]:
# Low-level EC2 client pinned to us-east-1.
client = boto3.client('ec2', region_name='us-east-1')
# SQS service resource (note: no explicit region is passed here, unlike the EC2 client).
sqs = boto3.resource('sqs')

In [115]:
def get_default_security_group(client, key_name):
    """
    Collect one attribute from every security group named 'default'.

    :param client: EC2 client exposing ``describe_security_groups()``.
    :param key_name: attribute to read from each matching group (e.g. 'GroupId').
    :return: list with the requested attribute of each 'default' group.
    """
    matches = []
    for security_group in client.describe_security_groups()['SecurityGroups']:
        if security_group['GroupName'] != 'default':
            continue
        matches.append(security_group[key_name])
    return matches

def get_key_pairs(client, removeExisting=False, key_name='airscholar-key', pem_path='labsuser.pem'):
    """
    Return the EC2 key pair called ``key_name``, creating it when it does not exist.

    When a new key pair is created, its private key material is written to
    ``pem_path`` with owner-only permissions (0600). When the key pair already
    exists, nothing is written locally.
    NOTE(review): in that case the PEM file is presumably expected to be on disk
    already from an earlier run — confirm.

    :param client: EC2 client (``delete_key_pair``, ``describe_key_pairs``, ``create_key_pair``).
    :param removeExisting: delete the key pair first, which forces a fresh one to be created.
    :param key_name: name of the key pair to look up or create.
    :param pem_path: local file that receives the private key of a newly created pair.
    :return: the matching ``describe_key_pairs`` entry, or the ``create_key_pair`` response.
    """
    if removeExisting:
        client.delete_key_pair(KeyName=key_name)

    existing = [
        pair for pair in client.describe_key_pairs()['KeyPairs']
        if pair['KeyName'] == key_name
    ]
    if existing:
        return existing[0]

    keypair = client.create_key_pair(KeyName=key_name)
    # Create the file with restrictive permissions from the start: it holds a private key.
    descriptor = os.open(pem_path, os.O_WRONLY | os.O_CREAT | os.O_TRUNC, 0o600)
    with os.fdopen(descriptor, 'w') as pem_file:
        pem_file.write(keypair['KeyMaterial'])
    # A pre-existing file keeps its old mode through os.open, so tighten it explicitly.
    os.chmod(pem_path, 0o600)
    return keypair

def launch_new_instance(client, keypair, count, image_id='ami-05723c3b9cf4bf4ff',
                        instance_type='t2.micro', security_group_ids=None):
    """
    Launch ``count`` EC2 instances and block until all of them are running.

    :param client: EC2 client.
    :param keypair: name of the key pair to attach to the instances.
    :param count: exact number of instances to start (used as both MinCount and MaxCount).
    :param image_id: AMI to boot.
    :param instance_type: EC2 instance type.
    :param security_group_ids: security group IDs to attach; when None, the IDs
        of the 'default' security groups are looked up.
    :return: flat list of the new instance IDs.
    """
    if security_group_ids is None:
        security_group_ids = get_default_security_group(client, key_name='GroupId')

    response = client.run_instances(
        ImageId=image_id,
        InstanceType=instance_type,
        KeyName=keypair,
        MaxCount=count,
        MinCount=count,
        Monitoring={
            'Enabled': True
        },
        SecurityGroupIds=security_group_ids
    )
    # The launched instances are listed under the 'Instances' key; iterating the
    # response dict itself would only yield its top-level key names.
    ec2_inst_ids = [instance["InstanceId"] for instance in response["Instances"]]
    waiter = client.get_waiter('instance_running')
    # InstanceIds must be a flat list of ID strings.
    waiter.wait(InstanceIds=ec2_inst_ids)
    return ec2_inst_ids

def prepare_instances(client, keypair, count):
    """
    Make sure at least ``count`` instances are running, launching the shortfall.

    Only instances whose state is 'running' count towards the target; stopped
    or terminated instances no longer reduce the number of new launches.

    :param client: EC2 client used for launching new instances.
    :param keypair: key pair name attached to newly launched instances.
    :param count: number of running instances required.
    :return: tuple ``(ec2_resource, instance_ids)`` where ``instance_ids`` is a
        flat list of ID strings (already-running instances first, then new ones).
    """
    ec2 = boto3.resource('ec2')
    ec2_inst_ids = [
        instance.id
        for instance in ec2.instances.all()
        if instance.state['Name'] == 'running'
    ]

    missing = count - len(ec2_inst_ids)
    if missing > 0:
        # extend (not append) keeps the result a flat list of IDs.
        ec2_inst_ids.extend(launch_new_instance(client, keypair, missing))

    return ec2, ec2_inst_ids

def configure_ssh():
    """
    Build one unconnected paramiko SSH client per worker instance.

    Every client gets the AutoAddPolicy missing-host-key policy.

    :return: list of INSTANCE_SIZE ``paramiko.SSHClient`` objects.
    """
    def _new_client():
        fresh = paramiko.SSHClient()
        fresh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
        return fresh

    return [_new_client() for _ in range(INSTANCE_SIZE)]

def ssh_connect_with_retry(ssh, ip_address, retries):
    """
    Open an SSH session to ``ip_address`` as 'ec2-user', retrying on failure.

    The private key is read from 'labsuser.pem'. Attempts continue while the
    attempt counter is at most 3, with a 5 second pause between attempts.

    :param ssh: paramiko SSH client to connect.
    :param ip_address: host to connect to.
    :param retries: number of attempts already made (pass 0 for a fresh start).
    :return: True once connected, False when the attempts are exhausted.
    """
    if retries > 3:
        return False

    # Load the key once and close the file right away.
    with open('labsuser.pem', 'r') as key_file:
        privkey = paramiko.RSAKey.from_private_key(key_file)

    interval = 5
    while retries <= 3:
        retries += 1
        try:
            print('SSH into the instance: {}'.format(ip_address))
            ssh.connect(hostname=ip_address,
                        username='ec2-user', pkey=privkey)
            return True
        except Exception as e:
            print(e)
            if retries > 3:
                # No attempt left: skip the pointless sleep.
                break
            time.sleep(interval)
            print('Retrying SSH connection to {}'.format(ip_address))
    return False

def ssh_disconnect(ssh):
    """Close the given SSH connection; a falsy argument is ignored."""
    if not ssh:
        return
    ssh.close()

def get_public_address(ec2, instance_id):
    """
    Wait for an instance to reach the running state and return its public IP.

    :param ec2: EC2 service resource.
    :param instance_id: ID of the instance to look up.
    :return: ``public_ip_address`` of the first instance matching ``instance_id``.
    """
    ec2.Instance(id=instance_id).wait_until_running()
    # Query again after the wait and read the address from the fresh listing.
    matching = list(ec2.instances.filter(InstanceIds=[instance_id]))
    return matching[0].public_ip_address

def get_queue(sqs, queue_name):
    """
    Create the work queue and the result queue for every worker index.

    For each idx in range(INSTANCE_SIZE) this creates '<queue_name><idx>'
    followed by 'result-queue-<idx>', all with the same attributes.

    :param sqs: SQS service resource exposing ``create_queue``.
    :param queue_name: prefix of the work queue names.
    :return: None
    """
    queue_settings = {
        'DelaySeconds': '0',
        'MessageRetentionPeriod': '86400',
        "ReceiveMessageWaitTimeSeconds": "0"
    }

    for idx in range(INSTANCE_SIZE):
        for name in (f"{queue_name}{idx}", f'result-queue-{idx}'):
            sqs.create_queue(QueueName=name, Attributes=queue_settings)

def send_message_to_queue(sqs, queue_name, message):
    """
    Send a batch of entries to the named SQS queue.

    :param sqs: SQS service resource exposing ``get_queue_by_name``.
    :param queue_name: name of the destination queue.
    :param message: list of batch entries, passed through as ``Entries``.
    :return: whatever ``send_messages`` returns.
    """
    target_queue = sqs.get_queue_by_name(QueueName=queue_name)
    return target_queue.send_messages(Entries=message)

def install_required_packages(ssh):
    """
    Run the pip / numpy / boto3 installation command on the remote host.

    :param ssh: connected SSH client exposing ``exec_command``.
    :return: tuple ``(stdout, stderr)`` of the remote command.
    """
    install_command = "sudo yum install pip -y && sudo pip install numpy boto3"
    _, remote_out, remote_err = ssh.exec_command(install_command)
    return remote_out, remote_err

def initialise_instances(client):
    """
    Bring every worker instance to the point where its worker script is running.

    Steps: build the SSH clients, obtain the key pair, make sure the instances
    exist, resolve their public IPs, then for each SSH client: connect, install
    packages, push the AWS credentials, upload the files from './worker', and
    start 'worker.py' with the worker index as its argument.

    :param client: EC2 client passed on to the key pair and instance helpers.
    :return: None
    """
    sshs = configure_ssh()
    keypair = get_key_pairs(client, False)
    ec2, instances = prepare_instances(client, keypair['KeyName'], INSTANCE_SIZE)
    ip_addresses = [get_public_address(ec2, instance) for instance in instances]

    # One SSH client per worker; ip_addresses is indexed by the same position.
    for idx in range(0, len(sshs)):
        ssh = sshs[idx]
        ip_address = ip_addresses[idx]
        # connect to ssh
        print(f"Conencting to Instance-{idx} with IP Address {ip_address}")
        # NOTE(review): the return value is ignored, so a failed connection is not detected here.
        ssh_connect_with_retry(ssh, ip_address, 0)
        # install required python packages
        print(f"Installing required packages for Instance-{idx} with IP Address {ip_address}")
        stdout, stderr = install_required_packages(ssh)
        # Reading both streams to the end echoes the full remote install log.
        print(stdout.read().decode('utf-8'))
        print(stderr.read().decode('utf-8'))

        # configure aws access to the instance
        print(f"Configuring Instance -{idx} with IP Address {ip_address} for remote access")
        configure_aws_access_for_ssh(ssh, ip_address)

        # upload worker files to the instance's home directory
        scp = SCPClient(ssh.get_transport())
        bulk_upload(scp, fetch_local_files('./worker'), '~', ip_address)

        # start worker on the instance; its output streams are not read here
        print(f"Starting worker {idx}")
        stdin, stdout, stderr = ssh.exec_command(f'python ./worker.py {idx}')
        # # print(stdout.read().decode('utf-8'))
        # # print(stderr.read().decode('utf-8'))

def get_messages_from_queue(instance_size, queue, message_size=10):
    """
    Receive, parse and delete messages from the named SQS queue.

    Each message body is parsed with ``literal_eval`` and the message is then
    deleted from the queue.

    :param instance_size: not used by this function.
    :param queue: name of the queue to read from.
    :param message_size: maximum number of messages requested in the single receive call.
    :return: list of parsed message bodies.
    """
    source_queue = boto3.resource('sqs').get_queue_by_name(QueueName=queue)
    received = source_queue.receive_messages(
        MaxNumberOfMessages=message_size,
        MessageAttributeNames=['All'],
        WaitTimeSeconds=0,
    )

    parsed_bodies = []
    for sqs_message in received:
        parsed_bodies.append(literal_eval(sqs_message.body))
        sqs_message.delete()
    return parsed_bodies

def split_row(array, nrows, ncols):
    """
    Cut a 2D array into tiles of shape (nrows, ncols).

    The tiles are returned stacked in one array of shape (n, nrows, ncols),
    ordered left to right within a band of rows, then band by band. Both
    dimensions of ``array`` must be exact multiples of the tile shape.
    """
    total_rows, total_cols = array.shape
    assert total_rows % nrows == 0, f"{total_rows} rows is not evenly divisible by {nrows}"
    assert total_cols % ncols == 0, f"{total_cols} cols is not evenly divisible by {ncols}"

    # (band, row-in-tile, tile-in-band, col-in-tile) -> swap the middle axes so
    # that each tile becomes contiguous along the leading axis.
    banded = array.reshape(total_rows // nrows, nrows, -1, ncols)
    tiles_first = banded.swapaxes(1, 2)
    return tiles_first.reshape(-1, nrows, ncols)

def split_col(array, nrows, ncols):
    """
    Split a matrix into sub-matrices of shape (nrows, ncols), grouped by column band.

    The matrix is first cut into vertical bands ``ncols`` wide; each band is
    then cut into pieces ``nrows`` tall. Both dimensions of ``array`` must be
    exact multiples of the sub-matrix shape (same rule as ``split_row``).

    :param array: 2D numpy array to split.
    :param nrows: number of rows in each sub-matrix.
    :param ncols: number of columns in each sub-matrix.
    :return: list (one entry per column band) of lists of (nrows, ncols) arrays.
    """
    height, width = array.shape
    assert height % nrows == 0, f"{height} rows is not evenly divisible by {nrows}"
    assert width % ncols == 0, f"{width} cols is not evenly divisible by {ncols}"
    # Operate on the argument itself (not a notebook global) and derive the
    # number of pieces from the requested sub-matrix shape.
    return [
        np.vsplit(column_band, height // nrows)
        for column_band in np.hsplit(array, width // ncols)
    ]

def generate_array(nrows, ncols):
    """
    Draw two random integer matrices of shape (nrows, ncols).

    Values come from ``np.random.randint(5, ...)``, i.e. integers in [0, 5).

    :return: tuple ``(first, second)`` of the two matrices, in draw order.
    """
    shape = (nrows, ncols)
    first = np.random.randint(5, size=shape)
    second = np.random.randint(5, size=shape)
    return first, second

def upload_file_to_s3(file_name, bucket, object_name=None):
    """Upload a file to an S3 bucket

    :param file_name: File to upload
    :param bucket: Bucket to upload to
    :param object_name: S3 object name. If not specified then the base name of file_name is used
    :return: True if file was uploaded, else False
    """

    # If S3 object_name was not specified, use the file's base name
    if object_name is None:
        object_name = os.path.basename(file_name)

    # Upload the file
    s3_client = boto3.client('s3')
    try:
        s3_client.upload_file(file_name, bucket, object_name)
    except Exception as e:
        # Report the reason instead of failing silently; catching Exception
        # (not a bare except) lets KeyboardInterrupt/SystemExit propagate.
        print(f"Failed to upload {file_name} to s3://{bucket}/{object_name}: {e}")
        return False
    return True

def bulk_upload(scp, filepaths: list[str], remote_path, host):
    """
    Copy a batch of local files to a directory on a remote host.

    Errors are reported with print() and are not re-raised.

    :param scp: object whose ``put`` method performs the transfer.
    :param List[str] filepaths: local files to be uploaded.
    :param remote_path: destination directory on the remote side.
    :param host: host label used only in the success message.
    """
    try:
        scp.put(filepaths, remote_path=remote_path, recursive=True)
        uploaded = len(filepaths)
        print(f"Finished uploading {uploaded} files to {remote_path} on {host}")
    except SCPException as scp_error:
        print(f"SCPException during bulk upload: {scp_error}")
    except Exception as other_error:
        print(f"Unexpected exception during bulk upload: {other_error}")

def configure_aws_access_for_ssh(ssh, ip_address):
    """
    Copy the local AWS credentials file (~/.aws/credentials) to the remote host.

    The file content is shell-quoted before being embedded in the remote
    command, and the directory creation and the write run as one command that
    is waited on, so the credentials are in place when this function returns.
    If the local file cannot be read, or the remote command fails, a message is
    printed and nothing further is done.

    :param ssh: connected SSH client exposing ``exec_command``.
    :param ip_address: host label used only in the printed messages.
    :return: None
    """
    import shlex  # only needed here, for safe shell quoting

    credentials_path = os.path.expanduser('~/.aws/credentials')
    try:
        with open(credentials_path, 'r') as credentials_file:
            credentials = credentials_file.read().rstrip('\n')
    except OSError as e:
        # Do not push an error message to the remote host as if it were credentials.
        print(f'Unable to read local AWS credentials for {ip_address}: {e}')
        return

    # mkdir -p makes the call repeatable; chaining with && keeps the steps ordered.
    command = (
        "mkdir -p ~/.aws && "
        f"printf '%s\\n' {shlex.quote(credentials)} > ~/.aws/credentials"
    )
    stdin, stdout, stderr = ssh.exec_command(command)
    # exec_command returns immediately; wait for the remote command to finish.
    exit_status = stdout.channel.recv_exit_status()
    if exit_status != 0:
        print(f'SSH AWS configuration failed for {ip_address} (exit status {exit_status})')
        return
    print(f'SSH AWS configuration done for {ip_address}')

def matrix_dot_product(matrix_a, matrix_b):
    """
    Multiply two matrices with plain Python loops and print the elapsed time.

    :param matrix_a: left operand, indexable as ``matrix_a[i][k]``.
    :param matrix_b: right operand, indexable as ``matrix_b[k][j]``.
    :return: the product as a list of row lists.
    """
    began = datetime.datetime.now()
    product = []
    for left_row in matrix_a:
        out_row = []
        for j in range(len(matrix_b[0])):
            total = 0
            for k, right_row in enumerate(matrix_b):
                total += left_row[k] * right_row[j]
            out_row.append(total)
        product.append(out_row)
    print('Computation time', datetime.datetime.now() - began)

    return product

def matrix_add(matrix_1, matrix_2):
    """
    Add two matrices element by element and print the elapsed time.

    The row and column counts are taken from ``matrix_1``.

    :return: the sum as a list of row lists.
    """
    began = datetime.datetime.now()
    summed = []
    for idx_row, left_row in enumerate(matrix_1):
        right_row = matrix_2[idx_row]
        summed.append([left_row[c] + right_row[c] for c in range(len(left_row))])
    print('Computation time', datetime.datetime.now() - began)
    return summed

In [89]:
def delete_queues(sqs):
    """
    Delete every SQS queue returned by ``list_queues``.

    :param sqs: not used; a fresh SQS client is created internally.
    :return: None
    """
    client = boto3.client('sqs')

    # Collect all queue URLs first, following NextToken across pages.
    queue_urls = []
    request = {'MaxResults': 123}
    while True:
        response = client.list_queues(**request)
        # 'QueueUrls' is absent from the response when there are no queues.
        queue_urls.extend(response.get('QueueUrls', []))
        next_token = response.get('NextToken')
        if not next_token:
            break
        request['NextToken'] = next_token

    for queue in queue_urls:
        client.delete_queue(QueueUrl=queue)
        print(f'{queue} deleted!')

# teardown(client)

In [90]:
# Prefix of the per-worker work queues; get_queue appends the worker index to it.
QUEUE_NAME = 'queue'

In [91]:
# Create the work queue and the result queue for every worker index.
get_queue(sqs, QUEUE_NAME)

In [96]:
# Provision the instances over SSH and start worker.py on each of them.
initialise_instances(client)

Conencting to Instance-0 with IP Address 54.91.68.78
SSH into the instance: 54.91.68.78
Installing required packages for Instance-0 with IP Address 54.91.68.78
Updating Subscription Management repositories.
Unable to read consumer identity

This system is not registered with an entitlement server. You can use subscription-manager to register.

Last metadata expiration check: 0:42:48 ago on Fri 16 Dec 2022 11:21:45 AM UTC.
Package python3-pip-21.2.3-6.el9.noarch is already installed.
Dependencies resolved.
Nothing to do.
Complete!


Configuring Instance -0 with IP Address 54.91.68.78 for remote access
SSH AWS configuration done for 54.91.68.78
Finished uploading 3 files to ~ on 54.91.68.78
Starting worker 0
Conencting to Instance-1 with IP Address 54.160.226.118
SSH into the instance: 54.160.226.118
Installing required packages for Instance-1 with IP Address 54.160.226.118
Updating Subscription Management repositories.
Unable to read consumer identity

This system is not registered with

### ADDITION

In [116]:
# Side length of the two square input matrices.
ARRAY_SIZE = 100
# Side length of each square block sent to the workers.
CHUNK_SIZE = 10 #int(ARRAY_SIZE/INSTANCE_SIZE)
# NOTE(review): no random seed is set, so the inputs differ on every run.
arr, arr1 = generate_array(ARRAY_SIZE,ARRAY_SIZE)
# Cut both matrices into CHUNK_SIZE x CHUNK_SIZE blocks; blocks with the same index pair up.
s_arr = split_row(arr, CHUNK_SIZE, CHUNK_SIZE)
s_arr1 = split_row(arr1, CHUNK_SIZE, CHUNK_SIZE)

In [121]:
# Reference result: element-wise sum computed locally with the pure-Python helper.
RES1 = np.array(matrix_add(arr, arr1))
# Echo the full array as the cell output (RES1 is already an ndarray).
np.array(RES1)

Computation time 0:00:00.004984


array([[4, 3, 7, 7, 6, 8, 7, 5, 6, 2, 2, 3, 6, 2, 7, 6, 8, 6, 2, 7, 7, 3,
        6, 4, 3, 5, 7, 6, 3, 0, 4, 4, 5, 4, 0, 3, 8, 3, 6, 1, 2, 4, 2, 7,
        0, 3, 1, 4, 4, 2, 4, 4, 6, 0, 5, 8, 5, 4, 2, 2, 7, 1, 3, 7, 6, 5,
        6, 2, 5, 4, 4, 4, 8, 5, 4, 5, 6, 6, 3, 0, 2, 4, 2, 6, 4, 4, 0, 5,
        1, 4, 4, 5, 8, 4, 3, 3, 4, 4, 6, 3],
       [1, 1, 4, 4, 6, 1, 4, 0, 4, 3, 4, 5, 3, 2, 3, 6, 4, 8, 1, 6, 2, 4,
        2, 5, 7, 3, 5, 5, 2, 2, 1, 4, 6, 7, 7, 3, 0, 5, 2, 5, 3, 2, 5, 5,
        0, 4, 3, 7, 1, 4, 3, 3, 5, 7, 5, 3, 4, 3, 3, 3, 1, 3, 7, 8, 1, 3,
        4, 3, 7, 4, 4, 8, 3, 5, 7, 3, 7, 7, 5, 6, 1, 2, 1, 3, 2, 0, 3, 4,
        4, 6, 4, 3, 1, 3, 4, 7, 3, 5, 7, 7],
       [7, 6, 2, 5, 6, 3, 2, 4, 2, 5, 5, 1, 3, 0, 1, 8, 2, 3, 0, 6, 4, 3,
        1, 6, 7, 3, 6, 4, 7, 3, 2, 4, 2, 4, 8, 3, 2, 6, 2, 3, 4, 1, 5, 5,
        2, 3, 6, 6, 4, 4, 7, 7, 1, 1, 2, 4, 5, 3, 3, 0, 3, 3, 2, 6, 1, 5,
        3, 5, 3, 3, 4, 4, 7, 3, 8, 0, 3, 4, 3, 1, 7, 0, 6, 4, 5, 5, 7, 6,
        6, 3, 4, 3, 5,

In [123]:
def reformat_data(data):
    """Return ``str(data)`` with every newline character removed."""
    text = str(data)
    return ''.join(text.split('\n'))

operation = 'addition'

# Share the block indices as evenly as possible between the worker queues and
# send one message per block pair.
for worker_id, block_indices in enumerate(np.array_split(np.arange(0, len(s_arr)), INSTANCE_SIZE)):
    # Iterate the share directly so an empty share is simply skipped (taking
    # min()/max() of an empty share raises ValueError). tolist() yields plain
    # Python ints, so the index is rendered as a bare number in the message body.
    for idx in block_indices.tolist():
        payload = (operation, (idx, (reformat_data(s_arr[idx]), reformat_data(s_arr1[idx]))))
        send_message_to_queue(
            sqs,
            f'queue{worker_id}',
            [{"Id": f"{idx+1}", "MessageBody": str(payload)}],
        )


In [124]:
# Parsed result blocks, appended queue by queue in sorted order by the collection loop below.
temp_res = []
def remove_space(input_data):
    """Replace double spaces with single spaces, in two consecutive passes."""
    squeezed = input_data
    for _ in range(2):
        squeezed = squeezed.replace('  ', ' ')
    return squeezed

def format_data(input_data):
    """
    Turn a space-separated array string into a comma-separated one.

    The substitutions are applied strictly in this order: double space to
    single space, double space to comma, '[ ' to '[', single space to comma.
    """
    substitutions = (
        ('  ', ' '),
        ('  ', ','),
        ('[ ', '['),
        (' ', ','),
    )
    formatted = input_data
    for old, new in substitutions:
        formatted = formatted.replace(old, new)
    return formatted

# Read back one result per dispatched block from each worker's result queue.
shares = np.array_split(np.arange(0, len(s_arr)), INSTANCE_SIZE)
for queue_idx, share in enumerate(shares):
    compute_res = []
    for _ in range(len(share)):
        compute_res.extend(get_messages_from_queue(INSTANCE_SIZE, f'result-queue-{queue_idx}', 1))
    # Each result is an (index, text) pair; sorting restores block order within the queue.
    compute_res.sort()
    for _block_index, block_text in compute_res:
        temp_res.append(literal_eval(block_text))

# Stitch the blocks back together: join each full row of blocks side by side,
# then stack those rows of blocks vertically.
final_res = []
temp = []
blocks_per_row = int(ARRAY_SIZE/CHUNK_SIZE)
for position, block in enumerate(temp_res, start=1):
    temp.append(block)
    if position % blocks_per_row == 0:
        final_res.append(np.hstack([tuple(rows) for rows in temp]))
        temp = []
RES2 = np.concatenate(final_res)

In [125]:
# Element-wise comparison of the local and the distributed result; every entry should be True.
RES1 == RES2

array([[ True,  True,  True,  True,  True,  True,  True,  True,  True,
         True,  True,  True,  True,  True,  True,  True,  True,  True,
         True,  True,  True,  True,  True,  True,  True,  True,  True,
         True,  True,  True,  True,  True,  True,  True,  True,  True,
         True,  True,  True,  True,  True,  True,  True,  True,  True,
         True,  True,  True,  True,  True,  True,  True,  True,  True,
         True,  True,  True,  True,  True,  True,  True,  True,  True,
         True,  True,  True,  True,  True,  True,  True,  True,  True,
         True,  True,  True,  True,  True,  True,  True,  True,  True,
         True,  True,  True,  True,  True,  True,  True,  True,  True,
         True,  True,  True,  True,  True,  True,  True,  True,  True,
         True],
       [ True,  True,  True,  True,  True,  True,  True,  True,  True,
         True,  True,  True,  True,  True,  True,  True,  True,  True,
         True,  True,  True,  True,  True,  True,  True,  Tru

In [19]:
#create instance
#configure instance
#create matrix
#split matrix
#send matrix to the queue
#read matrix from the queue on the instance created
#compute matrix
#send result to base


In [127]:
# Stage all changes in the working tree, commit them, and push to origin/main.
# NOTE(review): `git add .` stages every untracked file under the current directory;
# get_key_pairs writes the private key file labsuser.pem by relative path — confirm it is git-ignored.
!git add .
!git commit -am "Completed Addition Workflow"
!git push --set-upstream origin main

[main fef693e] Completed Addition Workflow
 1 file changed, 11 insertions(+), 14 deletions(-)
Enumerating objects: 16, done.
Counting objects: 100% (16/16), done.
Delta compression using up to 8 threads
Compressing objects: 100% (10/10), done.
Writing objects: 100% (10/10), 15.92 KiB | 5.31 MiB/s, done.
Total 10 (delta 5), reused 0 (delta 0), pack-reused 0
remote: Resolving deltas: 100% (5/5), completed with 3 local objects.[K
To https://github.com/airscholar/MLCloudComputing-python.git
 + b487c8a...fef693e main -> main (forced update)
branch 'main' set up to track 'origin/main'.
