# Cloud Computing Assignment 2022-2023
Implementation of an application that processes large data sets in parallel on a distributed cloud environment (i.e. AWS).

### Solution setup - Pre-requisites:
1. Make sure the AWS credentials taken from the Learner Lab are up to date in the `~/.aws/credentials` file (test the connection locally with `aws sts get-caller-identity`).
2. Specify the path to the "labsuser.pem" private key (taken from the Learner Lab); paramiko needs it to connect to the EC2 instances and execute SSH commands.
3. Create EC2, S3 and SQS resources and clients using boto3.
### Solution setup steps (Using Boto3):
1. Create a cluster of EC2 instances on AWS, using Amazon Linux 2 images.
2. Create an S3 bucket to store the data.
3. Create an SQS queue to store batches of messages.

### IMPORTS:

In [None]:
import sys
import boto3
import numpy as np
import paramiko
import time
import json

### CONFIGURATION:

In [None]:
# Path to the Learner Lab private key, used by paramiko for SSH access to EC2:
pem_key = 'learner-lab-cfg/labsuser.pem'

# High-level boto3 resource handles used by the helper functions below,
# created in the order EC2, S3, SQS:
ec2, s3, sqs = (boto3.resource(service_name) for service_name in ('ec2', 's3', 'sqs'))

# Low-level SSM client, used to run shell commands on instances via SSM:
ssm_client = boto3.client('ssm')

### BOTO 3 - APP INTERFACE

In [None]:
# Get boto3 session credentials, successfully authenticated using updated local credentials:
def get_boto3_session_credentials():
    """Return the credentials resolved by a fresh default boto3 session.

    The values come from boto3's default credential resolution (for this
    project, the Learner Lab credentials placed in ~/.aws/credentials).
    """
    return boto3.session.Session().get_credentials()

### SSM - APP INTERFACE

In [None]:
# EXECUTE SSH COMMAND ON INSTANCE USING SSM - BY INSTANCE ID:
def exec_SSH_on_instance_using_SSM(instance_id, command):
    """Send *command* to one EC2 instance through SSM's AWS-RunShellScript document.

    Despite the name, no SSH connection is opened; the command is delivered
    via ``ssm_client.send_command``. The command's output is not read here:
    only the raw ``send_command`` response is returned.
    """
    run_parameters = {'commands': [command]}
    return ssm_client.send_command(
        InstanceIds=[instance_id],
        DocumentName='AWS-RunShellScript',
        Parameters=run_parameters,
    )

### SQS - APP INTERFACE

In [None]:
# CREATE SQS QUEUE:
def create_sqs_queue(queue_name):
    """Create a FIFO SQS queue named *queue_name* and return it.

    Messages are retained for 86400 s (one day) and content-based
    deduplication is enabled. Any error is printed and None is returned
    instead of raising.
    """
    fifo_attributes = {
        'FifoQueue': 'true',
        'MessageRetentionPeriod': '86400',
        'ContentBasedDeduplication': 'true',
    }
    try:
        return sqs.create_queue(QueueName=queue_name, Attributes=fifo_attributes)
    except Exception as err:
        print(err)
        return None

# CREATE MULTIPLE SQS QUEUES:
def create_sqs_queues(queue_names):
    """Create one FIFO queue per name (via ``create_sqs_queue``) and return them.

    Queues are created one after another; nothing waits beyond each create
    call itself.

    Returns:
        A list aligned with *queue_names*. An entry is None when that queue
        could not be created (``create_sqs_queue`` prints the error and returns
        None rather than raising).
    """
    queues = []
    total = len(queue_names)
    for position, name in enumerate(queue_names, start=1):
        queue = create_sqs_queue(name)
        queues.append(queue)
        # create_sqs_queue swallows its errors, so inspect the result instead
        # of relying on an exception to detect failure.
        if queue is None:
            print(position, '/', total, ': FAILED to create queue: ', name)
        else:
            print(position, '/', total, ': Created queue: ', name)
    return queues

# DELETE SQS QUEUE:
def delete_sqs_queue(queue_name):
    """Delete the SQS queue named *queue_name*.

    Returns True on success. On any error the exception is printed and False
    is returned.
    """
    try:
        sqs.get_queue_by_name(QueueName=queue_name).delete()
    except Exception as err:
        print(err)
        return False
    return True

# GET ALL SQS QUEUES:
def get_all_sqs_queues():
    """Return a list of every SQS queue resource listed for the current credentials."""
    return list(sqs.queues.all())

# VIEW ALL SQS QUEUES:
def view_all_sqs_queues():
    """Print the URL of every SQS queue returned by ``get_all_sqs_queues``, one per line."""
    for sqs_queue in get_all_sqs_queues():
        print(sqs_queue.url)

# GET QUEUE STATUS:
def get_queue_attributes(queue_name):
    """Return the attribute dict of the queue named *queue_name*.

    Unlike most helpers in this section, errors are not caught here; they
    propagate to the caller.
    """
    named_queue = sqs.get_queue_by_name(QueueName=queue_name)
    return named_queue.attributes

# SEND MESSAGE TO SQS QUEUE:
def send_message_to_sqs_queue(queue_name, message, group_id):
    """Send one message body to a FIFO queue under message group *group_id*.

    Returns the ``send_message`` response. If the lookup or the send fails,
    the error is printed and None is returned.
    """
    try:
        target_queue = sqs.get_queue_by_name(QueueName=queue_name)
        return target_queue.send_message(MessageBody=message, MessageGroupId=group_id)
    except Exception as err:
        print(err)
        return None

# BULK SEND MESSAGES TO SQS QUEUE, AND CONTROL RESPONSE:
def send_bulk_messages_to_sqs_queue(queue_name, messages, group_id, max_retries=None):
    """Send every string in *messages* to a FIFO queue, one API call per message.

    A message is re-sent while its response looks unsuccessful: the HTTP status
    is not 200, or the response HTTP headers contain a ``connection`` entry.
    NOTE(review): the ``connection``-header heuristic is inherited from the
    original code; confirm it really signals a failed send. If SQS always
    returned that header, every message would be retried until *max_retries*
    (or forever when it is None).

    Args:
        queue_name: name of the target queue.
        messages: sequence of message bodies (strings).
        group_id: MessageGroupId used for every message.
        max_retries: maximum number of re-sends per message after the first
            attempt. None (the default) retries indefinitely, as before.

    Returns:
        None. If the queue lookup fails, the error is printed and nothing is sent.

    Raises:
        RuntimeError: if a message still looks unsent after *max_retries* re-sends.
        Exceptions raised by ``send_message`` itself propagate unchanged.
    """
    try:
        queue = sqs.get_queue_by_name(QueueName=queue_name)
    except Exception as e:
        print(e)
        return None

    def _looks_failed(response):
        # Single definition of "this send needs to be retried" (previously duplicated).
        metadata = response['ResponseMetadata']
        return metadata['HTTPStatusCode'] != 200 or 'connection' in metadata['HTTPHeaders']

    total = len(messages)
    for index, body in enumerate(messages, start=1):
        response = queue.send_message(MessageBody=body, MessageGroupId=group_id)
        retries = 0
        while _looks_failed(response):
            if max_retries is not None and retries >= max_retries:
                raise RuntimeError(
                    'Failed to send message %d/%d to SQS queue %s after %d retries.'
                    % (index, total, queue_name, retries)
                )
            retries += 1
            response = queue.send_message(MessageBody=body, MessageGroupId=group_id)
        print('Step '+str(index)+'/'+str(total)+' done.')

# RECEIVE UP TO 10 MESSAGES FROM SQS QUEUE:
def get_last_ten_messages_from_sqs_queue(queue_name, flight_time, wait_time_seconds=None):
    """Receive up to 10 messages from the queue named *queue_name*.

    Despite the name, this does not select the "last" messages. It asks SQS
    for up to 10 currently receivable messages.

    Args:
        queue_name: name of the queue to read from.
        flight_time: passed as ``VisibilityTimeout`` (seconds) for the received messages.
        wait_time_seconds: if given, passed as ``WaitTimeSeconds`` so the call
            long-polls (SQS allows at most 20 s) instead of returning an empty
            list immediately. The default None omits the parameter, as before.

    Returns:
        A list of message resources (possibly empty), or None if the lookup or
        the receive raised (the error is printed).
    """
    receive_kwargs = {'MaxNumberOfMessages': 10, 'VisibilityTimeout': flight_time}
    if wait_time_seconds is not None:
        receive_kwargs['WaitTimeSeconds'] = wait_time_seconds
    try:
        queue = sqs.get_queue_by_name(QueueName=queue_name)
        return queue.receive_messages(**receive_kwargs)
    except Exception as e:
        print(e)
        return None

# RECEIVE AT MOST ONE MESSAGE FROM SQS QUEUE:
def get_last_message_from_sqs_queue(queue_name, flight_time, wait_time_seconds=None):
    """Receive at most one message from the queue named *queue_name*.

    Despite the name, this does not select the "last" message. It asks SQS
    for one currently receivable message.

    Args:
        queue_name: name of the queue to read from.
        flight_time: passed as ``VisibilityTimeout`` (seconds) for the received message.
        wait_time_seconds: if given, passed as ``WaitTimeSeconds`` to long-poll
            (SQS allows at most 20 s). The default None omits it, as before.

    Returns:
        A list with zero or one message resource, or None if the lookup or the
        receive raised (the error is printed).
    """
    receive_kwargs = {'MaxNumberOfMessages': 1, 'VisibilityTimeout': flight_time}
    if wait_time_seconds is not None:
        receive_kwargs['WaitTimeSeconds'] = wait_time_seconds
    try:
        queue = sqs.get_queue_by_name(QueueName=queue_name)
        return queue.receive_messages(**receive_kwargs)
    except Exception as e:
        print(e)
        return None

# GET THE AMOUNT OF AVAILABLE MESSAGES IN SQS QUEUE:
def get_amount_of_available_messages_in_sqs_queue(queue_name):
    """Return the queue's ``ApproximateNumberOfMessages`` attribute as an int.

    The value is approximate, as the attribute name says. On error the
    exception is printed and None is returned, so callers comparing the
    result with ``>`` must handle that case.
    """
    try:
        queue_attrs = sqs.get_queue_by_name(QueueName=queue_name).attributes
        return int(queue_attrs['ApproximateNumberOfMessages'])
    except Exception as err:
        print(err)
        return None

# GET THE AMOUNT OF IN-FLIGHT MESSAGES IN SQS QUEUE:
def get_amount_of_in_flight_messages_in_sqs_queue(queue_name):
    """Return the queue's ``ApproximateNumberOfMessagesNotVisible`` attribute as an int.

    These are messages that have been received but are not yet deleted or
    visible again. On error the exception is printed and None is returned.
    """
    try:
        queue_attrs = sqs.get_queue_by_name(QueueName=queue_name).attributes
        return int(queue_attrs['ApproximateNumberOfMessagesNotVisible'])
    except Exception as err:
        print(err)
        return None

# DELETE MESSAGE FROM SQS QUEUE - BY RECEIPT HANDLE:
def delete_message_from_sqs_queue(queue_name, receipt_handle):
    """Delete one received message, identified by its receipt handle.

    Uses the batch ``delete_messages`` call with a single entry. The batch
    API reports per-entry problems in the response's ``Failed`` list rather
    than raising, so those entries are printed here instead of being ignored.

    Returns:
        The ``delete_messages`` response, or None if the lookup or the call
        raised (the error is printed).
    """
    try:
        queue = sqs.get_queue_by_name(QueueName=queue_name)
        response = queue.delete_messages(
            Entries=[
                {
                    'Id': '1',
                    'ReceiptHandle': receipt_handle
                }
            ]
        )
    except Exception as e:
        print(e)
        return None
    for failure in response.get('Failed', []):
        print('ERROR: Failed to delete message from SQS queue', queue_name, ':', failure)
    return response

# GET MAX MESSAGE SIZE FROM SQS QUEUE - IN BYTES:
def get_max_message_size_from_sqs_queue(queue_name):
    """Return the queue's ``MaximumMessageSize`` attribute (bytes) as an int.

    On error the exception is printed and None is returned.
    """
    try:
        size_attr = sqs.get_queue_by_name(QueueName=queue_name).attributes['MaximumMessageSize']
        return int(size_attr)
    except Exception as err:
        print(err)
        return None

# PURGE SQS QUEUE:
def purge_queue(queue_name):
    """Purge all messages from the queue named *queue_name*.

    Returns True on success. On any error the exception is printed and False
    is returned.
    """
    try:
        sqs.get_queue_by_name(QueueName=queue_name).purge()
    except Exception as err:
        print(err)
        return False
    return True

### S3 - APP INTERFACE

In [None]:
# CREATE S3 BUCKET:
def create_s3_bucket(bucket_name):
    """Create S3 bucket *bucket_name* (ObjectOwnership=BucketOwnerPreferred).

    Any creation error is printed, not raised. The ``s3.Bucket`` handle for
    *bucket_name* is returned in either case.
    """
    try:
        s3.create_bucket(Bucket=bucket_name, ObjectOwnership='BucketOwnerPreferred')
    except Exception as err:
        print(err)
    return s3.Bucket(bucket_name)

# DELETE S3 BUCKET:
def delete_s3_bucket(bucket_name):
    """Delete every object in bucket *bucket_name*, then the bucket itself.

    Errors are printed, not raised.
    """
    try:
        target_bucket = s3.Bucket(bucket_name)
        # The objects are removed first, before the bucket delete call.
        target_bucket.objects.all().delete()
        target_bucket.delete()
    except Exception as err:
        print(err)

# PRINT S3 BUCKET DETAILS:
def print_s3_bucket_details(bucket):
    """Print a one-line ``{name=..., creation_date=...}`` summary of *bucket*."""
    print(f"{{name={bucket.name}, creation_date={bucket.creation_date}}}")

# VIEW ALL S3 BUCKETS:
def view_all_s3_buckets():
    """Print a summary line (via ``print_s3_bucket_details``) for every listed S3 bucket."""
    for listed_bucket in s3.buckets.all():
        print_s3_bucket_details(listed_bucket)

# UPLOAD LOCAL FILE TO S3 BUCKET:
def upload_local_file_to_s3(filename, bucketname, destination_path):
    """Upload local file *filename* to *bucketname* under key ``destination_path + filename``.

    *destination_path* is prepended verbatim, so it should end with '/' when a
    "folder" prefix is intended.
    """
    object_key = destination_path + filename
    s3.Bucket(bucketname).upload_file(filename, object_key)

# DELETE FILE FROM S3 BUCKET:
def delete_file_from_s3(filename, bucketname, path):
    """Delete the object with key ``path + filename`` from bucket *bucketname*."""
    object_key = path + filename
    s3.Bucket(bucketname).Object(object_key).delete()

# DELETE DIRECTORY FROM S3 BUCKET:
def delete_directory_from_s3_bucket(bucket_name, directory_name):
    """Delete every object under the "directory" *directory_name* in *bucket_name*.

    Objects are selected by key prefix. A trailing '/' is appended to a
    non-empty *directory_name* so that deleting "data" does not also delete
    sibling keys such as "database/...". An empty *directory_name* still
    matches every object in the bucket.
    """
    prefix = directory_name
    if prefix and not prefix.endswith('/'):
        prefix += '/'
    # Reuse the module-level S3 resource (the original built a new one per call,
    # shadowing the global name).
    s3.Bucket(bucket_name).objects.filter(Prefix=prefix).delete()

### EC2 - APP INTERFACE

In [None]:
# START ALL EC2 INSTANCES:
def start_all_instances():
    """Start every EC2 instance in the 'stopped' state, printing its id and Name tag.

    The printed name is the value of the instance's 'Name' tag, or None when
    it has no such tag.
    """
    stopped_filter = [{'Name': 'instance-state-name', 'Values': ['stopped']}]
    for instance in ec2.instances.filter(Filters=stopped_filter):
        instance.start()
        # Look the Name tag up by key. instance.tags may be None (untagged
        # instance) and the tag order is not guaranteed, so tags[0] could crash
        # the loop after start() had already been issued, or print the wrong tag.
        name = next((tag['Value'] for tag in (instance.tags or []) if tag['Key'] == 'Name'), None)
        print('Starting instance: ', instance.id, name)

# STOP EC2 INSTANCE - BY NAME:
def stop_instance_by_name(instance_name):
    """Stop every running EC2 instance whose 'Name' tag equals *instance_name*.

    Each stopped instance is printed with its id and name.
    """
    # Two separate filter entries so that both the tag and the state conditions
    # apply. Repeating the 'Name'/'Values' keys inside one dict literal silently
    # keeps only the last pair, which would drop the name filter and stop
    # every running instance.
    filters = [
        {'Name': 'tag:Name', 'Values': [instance_name]},
        {'Name': 'instance-state-name', 'Values': ['running']},
    ]
    for instance in ec2.instances.filter(Filters=filters):
        instance.stop()
        print('Stopping instance: ', instance.id, instance_name)

# STOP ALL EC2 INSTANCES:
def stop_all_instances():
    """Stop every EC2 instance in the 'running' state, printing its id and Name tag.

    The printed name is the value of the instance's 'Name' tag, or None when
    it has no such tag.
    """
    running_filter = [{'Name': 'instance-state-name', 'Values': ['running']}]
    for instance in ec2.instances.filter(Filters=running_filter):
        instance.stop()
        # Look the Name tag up by key. tags may be None or unordered, and
        # tags[0] would crash the loop after stop() was already issued.
        name = next((tag['Value'] for tag in (instance.tags or []) if tag['Key'] == 'Name'), None)
        print('Stopping instance: ', instance.id, name)

# PRINT EC2 INSTANCE DETAILS:
def print_instance_details(instance):
    """Print ``{id=..., name=..., state=..., type=...}`` for one EC2 instance.

    *name* is the value of the instance's 'Name' tag, or None when the
    instance has no tags or no 'Name' tag. The original used tags[0], which
    crashed on untagged instances and printed the wrong tag when 'Name' was
    not first.
    """
    name = next((tag['Value'] for tag in (instance.tags or []) if tag['Key'] == 'Name'), None)
    print("{id=%s, name=%s, state=%s, type=%s}" % (instance.id, name, instance.state['Name'], instance.instance_type))

# VIEW ALL EC2 INSTANCES:
def view_all_instances(include_terminated):
    """Print details of every EC2 instance; terminated ones only when *include_terminated* is true."""
    for listed_instance in ec2.instances.all():
        if not include_terminated and listed_instance.state['Name'] == 'terminated':
            continue
        print_instance_details(listed_instance)

# VIEW ALL EC2 INSTANCES - BY STATE FILTER:
def view_instances_by_state(state):
    """Print details of every EC2 instance whose state name equals *state* (e.g. 'running')."""
    state_filter = [{'Name': 'instance-state-name', 'Values': [state]}]
    for matching_instance in ec2.instances.filter(Filters=state_filter):
        print_instance_details(matching_instance)

# TERMINATE ALL EC2 INSTANCES:
def terminate_all_instances():
    """Terminate every EC2 instance that is not already terminated, printing its id and Name tag.

    The printed name is the value of the 'Name' tag, or None when absent.
    """
    for instance in ec2.instances.all():
        if instance.state['Name'] == 'terminated':
            continue
        instance.terminate()
        # Look the Name tag up by key; tags[0] crashed on untagged instances
        # and aborted the loop, leaving later instances running.
        name = next((tag['Value'] for tag in (instance.tags or []) if tag['Key'] == 'Name'), None)
        print('Terminated instance: ', instance.id, name)

# TERMINATE EC2 INSTANCE - BY NAME:
def terminate_instance_by_name(name):
    """Terminate every non-terminated EC2 instance whose 'Name' tag equals *name*.

    Matching is exact and by the 'Name' tag key. The original compared
    tags[0], which crashed on any untagged instance in the account.
    """
    for instance in ec2.instances.all():
        instance_name = next((tag['Value'] for tag in (instance.tags or []) if tag['Key'] == 'Name'), None)
        if instance_name == name and instance.state['Name'] != 'terminated':
            instance.terminate()
            print('Terminated instance: ', instance.id, instance_name)

# CREATE INSTANCE - BY NAME, USING "amazon linux 2" AMI, "t2.micro" INSTANCE TYPE, "vockey" KEY PAIR, AND "default" SECURITY GROUP:
def create_instance(name,
                    image_id='ami-0b0dcb5067f052a63',
                    instance_type='t2.micro',
                    key_name='vockey',
                    security_group_ids=None,
                    instance_profile_arn='arn:aws:iam::868429207081:instance-profile/LabInstanceProfile'):
    """Launch one EC2 instance tagged with Name=*name* and return it.

    The defaults reproduce the previously hard-coded values. The default
    instance-profile ARN contains a specific account id (868429207081), so it
    must be overridden when running under another account.

    Args:
        name: value of the instance's 'Name' tag.
        image_id: AMI id to launch.
        instance_type: EC2 instance type.
        key_name: name of the key pair to install.
        security_group_ids: list of security group ids; None means
            ['sg-0f52fa9fe5477133b'].
        instance_profile_arn: IAM instance profile ARN attached to the instance.

    Returns:
        The created ``ec2.Instance`` (the original returned nothing).
    """
    if security_group_ids is None:
        # Built per call to avoid a shared mutable default argument.
        security_group_ids = ['sg-0f52fa9fe5477133b']
    created = ec2.create_instances(
        ImageId=image_id,
        MinCount=1,
        MaxCount=1,
        InstanceType=instance_type,
        KeyName=key_name,
        SecurityGroupIds=security_group_ids,
        TagSpecifications=[
            {
                'ResourceType': 'instance',
                'Tags': [
                    {
                        'Key': 'Name',
                        'Value': name
                    },
                ]
            },
        ],
        IamInstanceProfile={
            'Arn': instance_profile_arn
        },
    )
    return created[0]

# CREATE MULTIPLES INSTANCES - BY NAME, USING "amazon linux 2" AMI, "t2.micro" INSTANCE TYPE, "vockey" KEY PAIR, AND "default" SECURITY GROUP:
def create_instances_and_wait_for_running(names):
    """Create one instance per name, then poll once a second until none of them is 'pending'.

    The number of still-pending instances is printed whenever it changes.

    NOTE(review): the loop only waits while instances are reported as
    'pending'. If a freshly created instance is not yet returned by the
    describe call on the first poll, this may return before it is running.
    """
    for name in names:
        create_instance(name)
    print('All instances created. Waiting for them to be running...')
    pending_filters = [
        {'Name': 'tag:Name', 'Values': names},
        {'Name': 'instance-state-name', 'Values': ['pending']},
    ]
    last_reported = 0
    while True:
        # Materialize the collection once per poll. Each list() over a boto3
        # collection issues a new describe request, and the original did that
        # up to three times per iteration.
        remaining = len(list(ec2.instances.filter(Filters=pending_filters)))
        if remaining == 0:
            break
        if remaining != last_reported:
            print('Remaining instances: ', remaining)
            last_reported = remaining
        time.sleep(1)
    print('All instances are running.')

# UPDATE INSTANCE AWS CREDENTIALS:
def update_instance_credentials_using_boto3_session_credentials(instance_name, region='us-east-1'):
    """Copy the local boto3 session credentials into the AWS CLI config on an instance.

    Runs ``aws configure set ...`` through ``exec_SSH_on_instance`` for the
    access key, the secret key, the session token (skipped when the
    credentials have none) and *region*.

    NOTE(review): the secrets are placed on the remote command line, so they
    may be visible to other processes on the instance while the commands run.
    """
    # Resolve the credentials once and freeze them so all three values belong to
    # the same snapshot. The original resolved a new session for every value.
    credentials = get_boto3_session_credentials().get_frozen_credentials()
    exec_SSH_on_instance(instance_name, 'aws configure set aws_access_key_id '+credentials.access_key)
    exec_SSH_on_instance(instance_name, 'aws configure set aws_secret_access_key '+credentials.secret_key)
    if credentials.token is not None:
        # Long-term credentials have no token; concatenating None raised TypeError.
        exec_SSH_on_instance(instance_name, 'aws configure set aws_session_token '+credentials.token)
    exec_SSH_on_instance(instance_name, 'aws configure set region '+region)

# GET INSTANCE ID - BY NAME:
def get_instance_id_by_name(name):
    """Return the id of the first non-terminated instance whose 'Name' tag equals *name*, else None.

    The tag is looked up by key. The original compared tags[0], which crashed
    on untagged instances and could match a non-Name tag.
    """
    for instance in ec2.instances.all():
        instance_name = next((tag['Value'] for tag in (instance.tags or []) if tag['Key'] == 'Name'), None)
        if instance_name == name and instance.state['Name'] != 'terminated':
            return instance.id
    return None

# GET INSTANCE PUBLIC DNS NAME - BY NAME:
def get_instance_public_dns_by_name(name):
    """Return the public DNS name of the first non-terminated instance whose 'Name' tag equals *name*.

    Returns None when no instance matches. The tag is looked up by key; the
    original's tags[0] crashed on untagged instances.
    """
    for instance in ec2.instances.all():
        instance_name = next((tag['Value'] for tag in (instance.tags or []) if tag['Key'] == 'Name'), None)
        if instance_name == name and instance.state['Name'] != 'terminated':
            return instance.public_dns_name
    return None

# GET INSTANCE PUBLIC IP - BY NAME:
def get_instance_public_ip_by_name(name):
    """Return the public IP address of the first non-terminated instance whose 'Name' tag equals *name*.

    Returns None when no instance matches. The tag is looked up by key; the
    original's tags[0] crashed on untagged instances.
    """
    for instance in ec2.instances.all():
        instance_name = next((tag['Value'] for tag in (instance.tags or []) if tag['Key'] == 'Name'), None)
        if instance_name == name and instance.state['Name'] != 'terminated':
            return instance.public_ip_address
    return None

# GET INSTANCE PUBLIC DNS NAME - BY ID:
def get_instance_public_dns_by_id(instance_id):
    """Return the public DNS name of the instance with id *instance_id*, or None if none matches."""
    matching_dns = (inst.public_dns_name for inst in ec2.instances.all() if inst.id == instance_id)
    return next(matching_dns, None)

# EXECUTE SSH COMMAND ON INSTANCE - BY NAME:
def exec_SSH_on_instance(instance_name, command):
    """Run *command* over SSH (paramiko) as ``ec2-user`` on the instance named *instance_name*.

    The host is the instance's public DNS name. Authentication uses the RSA
    key at ``pem_key``, and unknown host keys are accepted automatically
    (AutoAddPolicy).

    Returns:
        ``(stdout_bytes, stderr_bytes)`` on success. If resolving, connecting
        or executing fails, the exception object is *returned*, not raised, so
        callers must check the result type.

    Raises:
        Errors from loading the key file propagate.
    """
    paramiko_key = paramiko.RSAKey.from_private_key_file(pem_key)
    client = paramiko.SSHClient()
    client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
    try:
        hostname = get_instance_public_dns_by_id(get_instance_id_by_name(instance_name))
        client.connect(hostname=hostname, username='ec2-user', pkey=paramiko_key)
        _stdin, stdout, stderr = client.exec_command(command)
        # Both streams are fully read before the connection is closed below.
        return stdout.read(), stderr.read()
    except Exception as e:
        return e
    finally:
        # Release the SSH connection on every path; the original leaked one per call.
        client.close()

### SPARK - APP INTERFACE

In [None]:
# WORK IN PROGRESS ...

In [None]:
# from pyspark.sql import SparkSession
# name = "master"
#
# # Create a SparkSession:
# # TODO: WIP
# spark = SparkSession.builder.master("spark://" + get_instance_public_ip_by_name(name) + ":7077").getOrCreate()
# sc = spark.sparkContext
# sc.setLogLevel("OFF")
# sc.uiWebUrl
#
# spark.stop()

### MATRIX - FUNCTIONS

In [None]:
# Create a matrix of nxn size:
def create_random_square_matrix(n):
    """Return an n x n integer matrix with entries drawn from 0..9 (NumPy's global RNG)."""
    square_shape = (n, n)
    return np.random.randint(0, 10, size=square_shape)

# Split the matrix in blocks until the size of combined blocks is less than max_message_size:
def find_optimal_blocks_amount(matrix, max_message_size):
    """Return the smallest square block count i**2 (2 <= i < 10000) whose job messages fit.

    The size of one job message is estimated from the JSON length of the whole
    matrix. With an i x i grid, one block is about 1/i**2 of that length, and a
    job message carries 2*i blocks (a full block-row plus a full block-column).
    A candidate i is accepted when that estimate is below *max_message_size*
    and, in addition, ``n % i < np.round(n / i)`` (n = number of rows).
    Returns 1 if no candidate qualifies.
    """
    rows = matrix.shape[0]
    serialized_size = len(json.dumps({
        'i': 1,
        'j': 1,
        'temp-slice': matrix.tolist()
    }))
    for grid in range(2, 10000):
        estimated_message_size = (serialized_size / grid**2) * (2 * grid)
        fits = estimated_message_size < max_message_size
        if fits and rows % grid < np.round(rows / grid):
            return grid**2
    return 1

# Split matrix into blocks:
def split_matrix_in_blocks(matrix, amount_of_blocks):
    """Split *matrix* into a grid of sub-matrices, returned as an object ndarray.

    *amount_of_blocks* must be a perfect square k*k, otherwise ValueError is
    raised. Band boundaries are computed from ``matrix.shape[0]`` (a square
    matrix is expected):

    * if k divides the row count, the grid is k x k equal blocks;
    * otherwise bands of floor(n / k) rows/columns are cut, and the leftover
      rows/columns form one extra, smaller band, giving a (k+1) x (k+1) grid.
    """
    side_size = np.sqrt(amount_of_blocks)
    if not side_size.is_integer():
        raise ValueError('ERROR: Amount of blocks must be a square number!')
    k = int(side_size)
    n = matrix.shape[0]
    if n % side_size != 0:
        band = np.floor(n / side_size)
        # k full bands, then an open-ended remainder band running to the end of the axis.
        edges = [int(idx * band) for idx in range(k + 1)] + [None]
    else:
        band = n / side_size
        edges = [int(idx * band) for idx in range(k + 1)]
    grid = len(edges) - 1
    sub_matrices = np.empty((grid, grid), dtype=np.ndarray)
    for row in range(grid):
        for col in range(grid):
            sub_matrices[row][col] = matrix[edges[row]:edges[row + 1], edges[col]:edges[col + 1]]
    return sub_matrices

# Compute a single block:
def compute_single_block(A, B, i, j, size):
    """Return block (i, j) of the product of two block grids A and B, each *size* x *size*.

    Block-row i of A and block-column j of B are reassembled into full bands
    and multiplied with a single ``np.dot``.
    """
    row_band = np.concatenate([A[i][idx] for idx in range(size)], axis=1)
    column_band = np.concatenate([B[idx][j] for idx in range(size)], axis=0)
    return np.dot(row_band, column_band)

# Multiply two matrices:
def multiply_matrices(matrix1, matrix2, amount_of_blocks):
    """Multiply two matrices block-wise, locally (reference for the distributed run).

    Both operands are split with ``split_matrix_in_blocks`` using the same
    *amount_of_blocks*. Each result block comes from ``compute_single_block``,
    and the blocks are stitched back into one dense matrix.
    """
    left_grid = split_matrix_in_blocks(matrix1, amount_of_blocks)
    right_grid = split_matrix_in_blocks(matrix2, amount_of_blocks)
    size = left_grid.shape[0]
    product_blocks = np.empty((left_grid.shape[0], right_grid.shape[1]), dtype=np.ndarray)
    # np.ndindex walks (row, col) in the same row-major order as nested loops.
    for row, col in np.ndindex(*product_blocks.shape):
        product_blocks[row][col] = compute_single_block(left_grid, right_grid, row, col, size)
    block_rows = [np.concatenate([product_blocks[row][col] for col in range(size)], axis=1) for row in range(size)]
    return np.concatenate(block_rows, axis=0)

## AWS - SOLUTION SETUP AND TASKS EXECUTION:

In [None]:
# SETTINGS:
# Number of worker EC2 instances to create alongside the 'master' instance
# (only read by the commented-out EC2 creation code in the setup cell).
worker_amount = 2

In [None]:
# EC2 INSTANCES (disabled; uncomment to create 'master' plus `worker_amount` workers and wait for them):
# instances_names = np.concatenate((np.array(['master']), np.array(['worker' + str(i) for i in range(1, worker_amount+1)]))).tolist()
# create_instances_and_wait_for_running(instances_names)
# SQS QUEUES:
# Two FIFO queues: index 0 receives the jobs, index 1 receives the results.
# NOTE(review): the doubled 's' in "jobss"/"resultss" looks deliberate (e.g. to
# avoid reusing a recently deleted queue name). Confirm before renaming.
queues_names = ['main-protected-jobss.fifo', 'main-protected-resultss.fifo']
create_sqs_queues(queues_names)

In [None]:
# Show all non-terminated EC2 instances and the URL of every SQS queue:
view_all_instances(False)
view_all_sqs_queues()

In [None]:
# Approximate number of received-but-not-deleted messages in the jobs queue:
get_amount_of_in_flight_messages_in_sqs_queue(queues_names[0])

In [None]:
# CREATE A MATRIX - GIVING THE SIDE SIZE:
matrix_shape = 500
matrix = create_random_square_matrix(matrix_shape)

# SPLIT MATRIX INTO BLOCKS - USING OPTIMAL BLOCKS AMOUNT:
max_SQS_msg_size = get_max_message_size_from_sqs_queue(queues_names[0])
if max_SQS_msg_size is None:
    # The helper prints the error and returns None. Fail here with a clear
    # message instead of a TypeError inside find_optimal_blocks_amount.
    raise RuntimeError('Could not read MaximumMessageSize of queue ' + queues_names[0])
blocks = split_matrix_in_blocks(matrix, find_optimal_blocks_amount(matrix, max_SQS_msg_size))

# BUILD EACH BLOCK-ROW AND BLOCK-COLUMN BAND ONCE:
# Job (i, j) needs block-row i (left operand) and block-column j (right operand).
# A band does not depend on the other index, so it is concatenated and converted
# to a list once per i / j instead of once per (i, j) pair.
left_slices = [
    np.concatenate([blocks[i][k] for k in range(blocks.shape[0])], axis=1).tolist()
    for i in range(blocks.shape[0])
]
right_slices = [
    np.concatenate([blocks[k][j] for k in range(blocks.shape[0])], axis=0).tolist()
    for j in range(blocks.shape[1])
]

# CREATE A LIST OF MESSAGES TO SEND TO SQS QUEUE:
messages = []
for i in range(0, blocks.shape[0]):
    for j in range(0, blocks.shape[1]):
        message_body = {
            'i': i,
            'j': j,
            'left-slice': left_slices[i],
            'right-slice': right_slices[j]
        }
        messages.append(json.dumps(message_body))

# BULK SEND MESSAGES TO THE "JOBS" QUEUE:
send_bulk_messages_to_sqs_queue(queues_names[0], messages, 'work')

In [None]:
def gather_jobs_then_compute_and_send_results(jobs_queue_name, results_queue_name):
    """Consume job messages, multiply their slices, and publish the result blocks.

    Loops while the jobs queue reports in-flight or visible messages. Each
    batch (up to 10 messages, received with a 300 s visibility timeout) is
    handled as follows: compute ``left-slice . right-slice`` for every job,
    send all results to *results_queue_name* under message group 'result', and
    only then delete the jobs. If the send fails, the jobs are not deleted.

    NOTE(review): every job is sent with the same MessageGroupId ('work'). FIFO
    message-group ordering may limit how many consumers can make progress at
    once. Confirm before relying on multiple workers for parallelism.

    Raises:
        RuntimeError: if the jobs queue's message counts cannot be read.
    """
    def _jobs_remaining():
        # Same order as before: check in-flight first, visible only if none are in flight.
        in_flight = get_amount_of_in_flight_messages_in_sqs_queue(jobs_queue_name)
        if in_flight is None:
            raise RuntimeError('Could not read the in-flight message count of queue ' + jobs_queue_name)
        if in_flight > 0:
            return True
        available = get_amount_of_available_messages_in_sqs_queue(jobs_queue_name)
        if available is None:
            raise RuntimeError('Could not read the available message count of queue ' + jobs_queue_name)
        return available > 0

    while _jobs_remaining():
        messages = get_last_ten_messages_from_sqs_queue(jobs_queue_name, 300)
        if not messages:
            # Nothing receivable right now (or the receive failed and printed its
            # error). Back off briefly instead of hammering the API.
            time.sleep(1)
            continue
        result_messages = []
        for message in messages:
            message_body = json.loads(message.body)
            left_slice = np.array(message_body['left-slice'])
            right_slice = np.array(message_body['right-slice'])
            result = np.dot(left_slice, right_slice)
            result_message_body = {
                'i': message_body['i'],
                'j': message_body['j'],
                'result': result.tolist()
            }
            result_messages.append(json.dumps(result_message_body))
        try:
            send_bulk_messages_to_sqs_queue(results_queue_name, result_messages, 'result')
        except Exception as e:
            print('Error while sending messages to the results queue.')
            print(e)
            # Stop here: the success message below would be false.
            return
        # Delete the jobs only once their results have been sent.
        for message in messages:
            delete_message_from_sqs_queue(jobs_queue_name, message.receipt_handle)
    print('The jobs queue is empty. All jobs have geen gathered and sent to the results queue.')

gather_jobs_then_compute_and_send_results(queues_names[0], queues_names[1])

In [None]:
def gather_results_and_reconstruct_matrix(results_queue_name, result_matrix_shape):
    """Drain the results queue and stitch the received blocks into one matrix.

    *result_matrix_shape* is the number of blocks per side of the result grid,
    not the number of elements. Each message carries 'i', 'j' and the 'result'
    block. The block is stored at grid position [i][j] and the message is
    deleted. Polling stops once the queue reports no visible and no in-flight
    messages.

    Returns:
        The dense result matrix.

    Raises:
        RuntimeError: if the queue's message counts cannot be read, or if the
            queue ran dry before every (i, j) block arrived. The latter replaces
            the opaque failure inside np.concatenate on a missing block.
    """
    def _results_remaining():
        # Same order as before: check visible first, in-flight only if none are visible.
        available = get_amount_of_available_messages_in_sqs_queue(results_queue_name)
        if available is None:
            raise RuntimeError('Could not read the available message count of queue ' + results_queue_name)
        if available > 0:
            return True
        in_flight = get_amount_of_in_flight_messages_in_sqs_queue(results_queue_name)
        if in_flight is None:
            raise RuntimeError('Could not read the in-flight message count of queue ' + results_queue_name)
        return in_flight > 0

    # An object array from np.empty starts filled with None, which marks missing blocks.
    result_matrix = np.empty((result_matrix_shape, result_matrix_shape), dtype=np.ndarray)
    while _results_remaining():
        messages = get_last_ten_messages_from_sqs_queue(results_queue_name, 300)
        if not messages:
            # Nothing receivable right now; back off briefly instead of busy-polling.
            time.sleep(1)
            continue
        for message in messages:
            message_body = json.loads(message.body)
            result_matrix[message_body['i']][message_body['j']] = np.array(message_body['result'])
            delete_message_from_sqs_queue(results_queue_name, message.receipt_handle)
    missing = [
        (i, j)
        for i in range(result_matrix_shape)
        for j in range(result_matrix_shape)
        if result_matrix[i][j] is None
    ]
    if missing:
        raise RuntimeError('%d result block(s) never arrived, e.g. block %s' % (len(missing), missing[0]))
    result_matrix = np.concatenate([np.concatenate([result_matrix[i][j] for j in range(result_matrix_shape)], axis=1) for i in range(result_matrix_shape)], axis=0)
    print('The results queue is empty. All results have been gathered and the result matrix has been reconstructed.')
    return result_matrix

result_matrix = gather_results_and_reconstruct_matrix(queues_names[1], blocks.shape[0])

In [None]:
# Check the distributed result against a local NumPy product of the matrix with itself
# (the jobs used `matrix` as both the left and the right operand):
print('The result matrix is equal to the self computed product: ' + str(np.array_equal(result_matrix, np.dot(matrix, matrix))))

In [None]:
# CLEAN-UP:
# Purge, then delete, both SQS queues created in the setup cell.
for queue_name in queues_names:
    purge_queue(queue_name)
    delete_sqs_queue(queue_name)
# EC2 clean-up (disabled; only relevant if the instances were created above):
# for instances_name in instances_names:
#     stop_instance_by_name(instances_name)
#     terminate_instance_by_name(instances_name)