In [1]:
import os
import time
import json
import wget
import yaml
import urllib
import boto3
import base64
import logging
import asyncio
import paramiko
from utils import *
from constants import *
from scp import SCPClient
from collections import defaultdict
from concurrent.futures import ThreadPoolExecutor
from botocore.exceptions import NoCredentialsError, ClientError

# Shared thread pool, created with default settings. execute_fmbench hands it to
# loop.run_in_executor so the synchronous SSH helpers run off the event loop.
executor = ThreadPoolExecutor()


In [2]:
# EC2 instance IDs tracked by this notebook; read by the run and delete cells.
instance_id_list = []
# List of {instance_id: fmbench_config} dicts; passed to generate_instance_details
# in the run cell.
fmbench_config_map = []


In [3]:
def load_yaml_file(file_path):
    """
    Load and parse a YAML file.

    Args:
        file_path (str): The path to the YAML file to be read.

    Returns:
        dict | None: Parsed content of the YAML file (a dictionary when the
        document is a mapping), or None when the content is not valid YAML.
        In that case the parse error is printed.

    Raises:
        OSError: If the file cannot be opened (for example, it does not exist).
            Only YAML parse errors are handled here.
    """
    # Pin the encoding so parsing does not depend on the platform's locale
    # default (e.g. cp1252 on Windows).
    with open(file_path, "r", encoding="utf-8") as file:
        try:
            return yaml.safe_load(file)
        except yaml.YAMLError as error:
            print(f"Error reading the YAML file: {error}")
            return None


In [4]:
# NOTE(review): yaml_file_path is not defined in the visible cells; presumably it
# comes from `from constants import *` — confirm.
config_data = load_yaml_file(yaml_file_path)
# load_yaml_file returns None on a YAML parse error, and an empty document also
# parses to None. Stop here with a clear message instead of letting the next
# cells fail with "'NoneType' object is not subscriptable".
if config_data is None:
    raise ValueError(f"No configuration could be loaded from {yaml_file_path}")


In [5]:
# Sanity check: dump the parsed configuration.
# NOTE(review): this prints the whole config, including the IAM instance profile
# ARN (which embeds the AWS account ID). Consider clearing this cell's output
# before sharing the notebook.
print(config_data)


{'aws': {'region': 'us-east-1', 'iam_instance_profile_arn': 'arn:aws:iam::471112568442:instance-profile/EC2'}, 'run_steps': {'security_group_creation': True, 'key_pair_generation': False, 'deploy_ec2_instance': True, 'run_bash_script': True, 'delete_ec2_instance': True}, 'security_group': {'group_name': 'ec2_multi_deploy', 'description': 'MultiDeploy EC2 Security Group', 'vpc_id': None}, 'key_pair_gen': {'key_pair_name': 'ec2_multi_deploy_kp1', 'key_pair_fpath': 'ec2_multi_deploy_kp1.pem'}, 'instances': [{'instance_type': 'g5.2xlarge', 'ami_id': 'ami-05c3e698bd0cffe7e', 'startup_script': 'startup_scripts/gpu_ubuntu_startup.txt', 'fmbench_config': 'https://raw.githubusercontent.com/dheerajoruganty/multi-deploy-ec2/refs/heads/main/configs/config-ec2-llama3-8b.yml'}]}


In [6]:
# Bind sg_id up front so it is always defined, even when this step is disabled
# or the AWS call below fails. Later steps can then test for None instead of
# hitting a NameError.
sg_id = None

if config_data["run_steps"]["security_group_creation"]:
    GROUP_NAME = config_data["security_group"].get("group_name")
    DESCRIPTION = config_data["security_group"].get("description", " ")
    VPC_ID = config_data["security_group"].get("vpc_id", "")
    try:
        sg_id = create_security_group(GROUP_NAME, DESCRIPTION, VPC_ID)

        if sg_id:
            # Add inbound rules only when a security group ID was obtained
            authorize_inbound_rules(sg_id)
    except ClientError as e:
        # Best-effort: report the AWS error and carry on with sg_id possibly None.
        print(f"An error occurred while creating or getting the security group: {e}")


Security Group 'ec2_multi_deploy' already exists. Fetching existing security group ID.
Inbound rule already exists for Security Group sg-013c79f5ffe82fb52. Skipping...


In [7]:
# The key pair name is needed in both branches. Previously it was only bound
# when an existing key was loaded, leaving KEY_PAIR_NAME undefined after a fresh
# key pair was generated.
KEY_PAIR_NAME = config_data["key_pair_gen"]["key_pair_name"]
# Bind private_key up front so it is defined even if reading the key file fails.
private_key = None

if config_data["run_steps"]["key_pair_generation"]:
    # NOTE(review): as before, the key pair *name* doubles as PRIVATE_KEY_FNAME
    # here, and PRIVATE_KEY_FNAME is later passed to generate_instance_details.
    # Confirm that create_key_pair stores the key under exactly this name.
    PRIVATE_KEY_FNAME = KEY_PAIR_NAME
    private_key = create_key_pair(PRIVATE_KEY_FNAME)
else:
    # Re-use an existing key pair: read the private key from the configured path.
    PRIVATE_KEY_FNAME = config_data["key_pair_gen"]["key_pair_fpath"]
    try:
        with open(PRIVATE_KEY_FNAME, "r") as file:
            private_key = file.read()
    except FileNotFoundError:
        print(f"File not found: {PRIVATE_KEY_FNAME}")
    except IOError as e:
        print(f"Error reading file {PRIVATE_KEY_FNAME}: {e}")


In [8]:
# Debug aid: print each entry of the "instances" list from the config, one per line.
for i in config_data["instances"]:
    print(i)


{'instance_type': 'g5.2xlarge', 'ami_id': 'ami-05c3e698bd0cffe7e', 'startup_script': 'startup_scripts/gpu_ubuntu_startup.txt', 'fmbench_config': 'https://raw.githubusercontent.com/dheerajoruganty/multi-deploy-ec2/refs/heads/main/configs/config-ec2-llama3-8b.yml'}


In [9]:
# if config_data["run_steps"]["deploy_ec2_instance"]:
#     iam_arn = config_data["aws"]["iam_instance_profile_arn"]
#     print(iam_arn)
#     # WIP Parallelize This.
#     for instance in config_data["instances"]:
#         instance_type = instance["instance_type"]
#         ami_id = instance["ami_id"]
#         startup_script = instance["startup_script"]
#         # command_to_run = instance["command_to_run"]
#         with open(f"{startup_script}", "r") as file:
#             user_data_script = file.read()
#         # user_data_script += command_to_run
#         # Create an EC2 instance with the user data script
#         instance_id = create_ec2_instance(
#             KEY_PAIR_NAME,
#             sg_id,
#             user_data_script,
#             ami_id,
#             instance_type,
#             iam_arn,
#         )
#         instance_id_list.append(instance_id)
#         fmbench_config_map.append({instance_id: instance["fmbench_config"]})


In [10]:
# Hard-coded instance used while the deployment cell above is commented out.
# Both structures are assigned (not appended to) so re-running this cell stays
# idempotent and the list and the config map always describe the same instances.
instance_id_list = ["i-03d3d545852de1106"]
fmbench_config_map = [
    {
        "i-03d3d545852de1106": "https://raw.githubusercontent.com/dheerajoruganty/multi-deploy-ec2/refs/heads/main/configs/config-ec2-llama3-8b.yml"
    }
]


In [11]:
def check_completion_flag(
    hostname, username, key_file_path, flag_file_path="/tmp/startup_complete.flag"
):
    """
    Checks if a flag file exists on the EC2 instance.

    Opens an SSH connection, runs `test -f <flag_file_path>` remotely and closes
    the connection again, whether or not the check succeeded.

    Args:
        hostname (str): The public IP or DNS of the EC2 instance.
        username (str): The SSH username (e.g., 'ubuntu').
        key_file_path (str): The path to the PEM key file (loaded as an RSA key).
        flag_file_path (str): The path to the flag file on the instance.
            Default is '/tmp/startup_complete.flag'.

    Returns:
        bool: True if the flag file exists. False if it does not exist or if any
        error occurred while connecting or running the check (the error is printed).
    """
    # Local import keeps this cell self-contained; shlex is standard library.
    import shlex

    ssh_client = None
    try:
        # Initialize the SSH client.
        # NOTE: AutoAddPolicy accepts unknown host keys without verification.
        ssh_client = paramiko.SSHClient()
        ssh_client.set_missing_host_key_policy(paramiko.AutoAddPolicy())

        # Load the private key
        private_key = paramiko.RSAKey.from_private_key_file(key_file_path)

        # Connect to the instance
        ssh_client.connect(hostname, username=username, pkey=private_key)
        print(f"Connected to {hostname} as {username}")

        # Quote the path so spaces or shell metacharacters cannot break (or alter)
        # the remote command; ordinary paths are passed through unchanged.
        _, stdout, _ = ssh_client.exec_command(
            f"test -f {shlex.quote(str(flag_file_path))} && echo 'File exists'"
        )
        output = stdout.read().decode().strip()

        # The marker is only echoed when `test -f` succeeded
        return output == "File exists"

    except Exception as e:
        print(f"Error connecting via SSH to {hostname}: {e}")
        return False

    finally:
        # Always release the connection, including when connect/exec raised.
        if ssh_client is not None:
            ssh_client.close()


In [12]:
def wait_for_flag(
    instance,
    max_wait_time=600,
    check_interval=30,
    flag_file_path="/tmp/startup_complete.flag",
):
    """
    Polls an EC2 instance until a flag file appears or the wait time runs out.

    The time limit is measured on a monotonic clock, so time spent inside each
    SSH check counts towards max_wait_time, not just the sleeps between checks.

    Args:
        instance (dict): Instance details; the keys 'hostname', 'username' and
            'key_file_path' are read.
        max_wait_time (int): Maximum wait time in seconds (default: 600 seconds or 10 minutes).
        check_interval (int): Interval time in seconds between checks (default: 30 seconds).
        flag_file_path (str): Path of the flag file on the instance
            (default: '/tmp/startup_complete.flag').

    Returns:
        bool: True as soon as the flag file is found, False if it was not found
        within max_wait_time.
    """
    deadline = time.monotonic() + max_wait_time

    while time.monotonic() < deadline:
        # Check if the flag file exists on the instance
        flag_found = check_completion_flag(
            hostname=instance["hostname"],
            username=instance["username"],
            key_file_path=instance["key_file_path"],
            flag_file_path=flag_file_path,
        )

        if flag_found:
            return True

        # No further check fits before the deadline: give up now rather than
        # sleeping for an interval that cannot be followed by another check.
        if time.monotonic() + check_interval >= deadline:
            break

        # Wait for the specified check interval before trying again
        print(
            f"{flag_file_path} flag file not found. Checking again in {check_interval} seconds..."
        )
        time.sleep(check_interval)

    return False


In [13]:
async def execute_fmbench(
    instance,
    formatted_script,
    remote_script_path,
    startup_timeout=600,
    fmbench_timeout=1200,
    check_interval=30,
):
    """
    Runs the fmbench workflow on one instance without blocking the event loop.

    Steps: stage the config file, wait for the startup flag, upload and run the
    script, wait for the fmbench completion flag, then retrieve the "output"
    results folder. The synchronous helpers run in the shared thread pool.

    Args:
        instance (dict): Instance details; 'hostname', 'username' and
            'key_file_path' are read here and by the helpers.
        formatted_script (str): Script template containing a `{config_file}`
            placeholder. It is filled in with `str.format`, so any other
            literal braces in the template must be doubled.
        remote_script_path (str): Path on the instance where the script is saved.
        startup_timeout (int): Seconds to wait for '/tmp/startup_complete.flag'.
        fmbench_timeout (int): Seconds to wait for '/tmp/fmbench_completed.flag'.
        check_interval (int): Seconds between flag checks.

    Returns:
        None. A message is printed when a flag is not found in time and the
        remaining steps are skipped.
    """
    loop = asyncio.get_running_loop()

    # Handle configuration file (download/upload) and get the remote path
    remote_config_path = await handle_config_file_async(instance)

    # Fill the remote config path into the script template.
    formatted_script = formatted_script.format(config_file=remote_config_path)

    startup_complete = await loop.run_in_executor(
        executor,
        wait_for_flag,
        instance,
        startup_timeout,
        check_interval,
        "/tmp/startup_complete.flag",
    )
    if not startup_complete:
        # Previously this case returned silently, hiding the timeout.
        print(
            f"Startup flag not found on {instance['hostname']} within "
            f"{startup_timeout} seconds; skipping fmbench run"
        )
        return

    print("Startup Script complete, executing fmbench now")

    # Upload and execute the script on the instance
    script_output = await loop.run_in_executor(
        executor,
        upload_and_execute_script_invoke_shell,
        instance["hostname"],
        instance["username"],
        instance["key_file_path"],
        formatted_script,
        remote_script_path,
    )
    print(f"Script Output from {instance['hostname']}:\n{script_output}")

    # Check for the fmbench completion flag
    fmbench_complete = await loop.run_in_executor(
        executor,
        wait_for_flag,
        instance,
        fmbench_timeout,
        check_interval,
        "/tmp/fmbench_completed.flag",
    )
    if not fmbench_complete:
        print(
            f"Fmbench completion flag not found on {instance['hostname']} within "
            f"{fmbench_timeout} seconds; results were not retrieved"
        )
        return

    print("Fmbench Run successful, Getting the folders now")
    await loop.run_in_executor(
        executor, check_and_retrieve_results_folder, instance, "output"
    )


async def multi_deploy_fmbench(instance_details, bash_script, remote_script_path):
    """
    Run execute_fmbench for every instance concurrently and wait for all of them.

    Args:
        instance_details (iterable): One entry per instance; each entry is passed
            to execute_fmbench unchanged.
        bash_script (str): Script template handed to every instance as-is.
        remote_script_path (str): Remote path handed to every instance.

    Returns:
        None
    """
    # One coroutine per instance, all sharing the same script template.
    pending_runs = [
        execute_fmbench(instance, bash_script, remote_script_path)
        for instance in instance_details
    ]

    # Schedule them together and wait until every one has finished.
    await asyncio.gather(*pending_runs)


In [14]:
async def main():
    """
    Entry point for the run cell: deploy fmbench to all instances.

    Reads the notebook-level names instance_details, bash_script and
    remote_script_path when it is called. They must already be defined.
    """
    await multi_deploy_fmbench(
        instance_details=instance_details,
        bash_script=bash_script,
        remote_script_path=remote_script_path,
    )


In [15]:
if config_data["run_steps"]["run_bash_script"]:
    instance_details = generate_instance_details(
        instance_id_list, PRIVATE_KEY_FNAME, fmbench_config_map, region="us-east-1"
    )  # Call the async function
    await main()


Config is a URL. Downloading from https://raw.githubusercontent.com/dheerajoruganty/multi-deploy-ec2/refs/heads/main/configs/config-ec2-llama3-8b.yml...
Connected to ec2-98-84-43-205.compute-1.amazonaws.com as ubuntu
Uploaded downloaded_configs/config-ec2-llama3-8b.yml to ec2-98-84-43-205.compute-1.amazonaws.com:/home/ubuntu/config-ec2-llama3-8b.yml
Connected to ec2-98-84-43-205.compute-1.amazonaws.com as ubuntu
Startup Script complete, executing fmbench now
Connected to ec2-98-84-43-205.compute-1.amazonaws.com as ubuntu
Script uploaded to /home/ubuntu/run_fmbench.sh
Script Output from ec2-98-84-43-205.compute-1.amazonaws.com:
Last login: Tue Sep 24 17:49:19 2024 from 69.251.233.174
(fmbench_python311) ubuntu@ip-172-31-1-191:~/foundation-model-benchmarking-tool$ chmod +x /home/ubuntu/run_fmbench.sh
(fmbench_python311) ubuntu@ip-172-31-1-191:~/foundation-model-benchmarking-tool$ bash -l -c '/home/ubuntu/run_fmbench.sh'

EnvironmentNameNotFound: Could not find conda environment: fmbench_

In [16]:
# Inspect the per-instance details built by the run cell.
# NOTE: instance_details is only assigned when run_steps.run_bash_script is
# enabled and that cell has executed; otherwise this line raises NameError.
instance_details


NameError: name 'instance_details' is not defined

In [16]:
# Tear-down step: when enabled in the config, call delete_ec2_instance for every
# tracked instance ID.
if config_data["run_steps"]["delete_ec2_instance"]:
    for instance_id in instance_id_list:
        delete_ec2_instance(instance_id)
    # Forget the processed IDs, presumably so a re-run of this cell does not try
    # to delete them again. This line is only reached if every call above returned.
    instance_id_list = []


Instance i-07d4207115911759e has been terminated.
