In [94]:
!python --version


Python 3.10.16


In [95]:
import boto3
import paramiko
import os
import time

In [96]:
# Deployment settings shared by the cells below.
AWS_REGION = "us-east-1"  # region passed to every boto3 client created in this cell
CLUSTER_NAME = "LSC-cluster"  # used as the "Name" tag value in create_ec2_cluster
INSTANCE_TYPE = "t2.medium"  # passed to create_ec2_cluster by the driver cell
# NOTE(review): KEY_NAME and SSH_KEY_PATH are not referenced by the visible cells;
# the driver cell connects with "MyKeyPair.pem" and create_ec2_cluster defaults to
# key_name="MyKeyPair" -- confirm which key pair is actually intended.
KEY_NAME = "my-key-pair"
SSH_KEY_PATH = "./labsuser.pem"
# NOTE(review): S3_BUCKET and PROJECT_FOLDER are not used in the visible cells.
S3_BUCKET = "LSC-bucket"
PROJECT_FOLDER = "./"
REMOTE_FOLDER = "./LSC_proj/"  # remote checkout directory; used as a string prefix, so the trailing slash matters

# Boto3 clients, all bound to AWS_REGION.
# Only ec2_client is used by the visible cells; emr_client and s3_client are
# presumably used elsewhere -- TODO confirm.
ec2_client = boto3.client("ec2", region_name=AWS_REGION)
emr_client = boto3.client("emr", region_name=AWS_REGION)
s3_client = boto3.client("s3", region_name=AWS_REGION)

In [97]:
def get_security_group_id(group_name):
    """
    Look up a security group by name.
    :param group_name: Value matched against the "group-name" filter.
    :return: GroupId of the first match, or None when nothing matches.
    """
    name_filter = {"Name": "group-name", "Values": [group_name]}
    matches = ec2_client.describe_security_groups(Filters=[name_filter])["SecurityGroups"]
    return matches[0]["GroupId"] if matches else None


In [98]:
def create_security_group(group_name="EC2ClusterSecurityGroup", ssh_cidr="0.0.0.0/0"):
    """
    Return the ID of the security group named `group_name`, creating it if needed.

    When a group with that name already exists it is reused as-is: its rules are
    neither inspected nor modified. A newly created group gets a single inbound
    rule allowing TCP port 22 (SSH) from `ssh_cidr`.

    :param group_name: Name of the security group to reuse or create.
    :param ssh_cidr: IPv4 CIDR block allowed to reach port 22. The default
        "0.0.0.0/0" keeps the previous behaviour (open to everyone); pass a
        narrower range such as "203.0.113.7/32" to restrict access.
    :return: Security group ID.
    """
    existing_group_id = get_security_group_id(group_name)
    if existing_group_id:
        print(f"Security group '{group_name}' already exists. Reusing Group ID: {existing_group_id}")
        return existing_group_id

    # Create a new security group if it doesn't exist
    response = ec2_client.create_security_group(
        GroupName=group_name,
        Description="Security group for EC2 cluster",
    )
    security_group_id = response["GroupId"]
    print(f"Created security group '{group_name}' with ID: {security_group_id}")

    # Add the inbound rule that allows SSH from the configured range
    ec2_client.authorize_security_group_ingress(
        GroupId=security_group_id,
        IpPermissions=[
            {
                "IpProtocol": "tcp",
                "FromPort": 22,
                "ToPort": 22,
                "IpRanges": [{"CidrIp": ssh_cidr}],
            }
        ],
    )
    return security_group_id


In [99]:
def terminate_instances(instance_ids):
    """
    Terminate EC2 instances and block until the termination waiter completes.
    :param instance_ids: List of instance IDs to terminate.
    """
    print("Terminating EC2 instances...")
    ec2_client.terminate_instances(InstanceIds=instance_ids)
    # The waiter is requested only after the terminate call has been issued.
    ec2_client.get_waiter("instance_terminated").wait(InstanceIds=instance_ids)
    print("All instances terminated.")


In [100]:
def delete_security_group(group_name="EC2ClusterSecurityGroup"):
    """
    Delete the security group with the given name, if one is found.
    :param group_name: Name of the security group to delete.
    """
    group_id = get_security_group_id(group_name)
    if not group_id:
        # Nothing to delete; stay silent just like a missing group always did.
        return

    print(f"Deleting security group '{group_name}' with ID: {group_id}")
    ec2_client.delete_security_group(GroupId=group_id)
    print("Security group deleted.")


In [101]:
def cleanup_resources(instance_ids, group_name="EC2ClusterSecurityGroup"):
    """
    Tear down what the notebook created: instances first, then the security group.
    :param instance_ids: Instance IDs to terminate; termination is skipped when falsy.
    :param group_name: Name (not ID) of the security group to delete.
    """
    has_instances = bool(instance_ids)
    if has_instances:
        terminate_instances(instance_ids)

    delete_security_group(group_name)

In [102]:
def create_ec2_cluster(
    instance_type,
    security_group_id,
    instance_count=1,
    key_name="MyKeyPair",
    volume_size=50,
    image_id="ami-0c02fb55956c7d316",
):
    """
    Create an EC2 cluster with specified instance type, security group, and volume size.

    Every instance is tagged with Name=CLUSTER_NAME. The call returns as soon as
    the launch request is accepted; it does not wait for the instances to run.

    :param instance_type: EC2 instance type (e.g., "t2.micro", "m5.large").
    :param security_group_id: Security group ID for the instances.
    :param instance_count: Number of instances to launch (used as both MinCount and MaxCount).
    :param key_name: Name of the SSH key pair for the instances.
    :param volume_size: Size of the root volume in GB.
    :param image_id: AMI to launch. AMI IDs are region-specific, so override this
        when AWS_REGION changes. The default is the Amazon Linux 2 image the
        notebook was written against.
    :return: List of instance IDs.
    """
    instances = ec2_client.run_instances(
        ImageId=image_id,
        InstanceType=instance_type,
        MinCount=instance_count,
        MaxCount=instance_count,
        KeyName=key_name,
        SecurityGroupIds=[security_group_id],
        BlockDeviceMappings=[
            {
                "DeviceName": "/dev/xvda",  # Root volume
                "Ebs": {
                    "VolumeSize": volume_size,  # Size in GB
                    "VolumeType": "gp2",  # General Purpose SSD
                    "DeleteOnTermination": True,  # Automatically delete on instance termination
                },
            }
        ],
        TagSpecifications=[
            {
                "ResourceType": "instance",
                "Tags": [{"Key": "Name", "Value": CLUSTER_NAME}],
            }
        ],
    )

    instance_ids = [instance["InstanceId"] for instance in instances["Instances"]]
    print(f"Cluster created with instances: {instance_ids}")
    return instance_ids


In [103]:
def run_command(command, ssh_client):
    """
    Run a shell command over an open SSH connection and echo its output.

    :param command: Shell command line to execute on the remote host.
    :param ssh_client: Connected client exposing paramiko's `exec_command` interface.
    :return: The command's remote exit status (0 means success). Callers that
        ignore the return value behave exactly as before.
    """
    # Announce the command before starting it so the log reads in order.
    print(f"Running command: {command}")
    _stdin, stdout, stderr = ssh_client.exec_command(command)

    # errors="replace" keeps binary or mis-encoded output from aborting the run.
    output = stdout.read().decode(errors="replace")
    errors = stderr.read().decode(errors="replace")
    # Ask for the exit status only after draining the streams; asking earlier
    # can block when the command produces a lot of output.
    exit_status = stdout.channel.recv_exit_status()

    print(output)  # Output of the command
    print(errors)  # Errors, if any
    if exit_status != 0:
        # Report failures instead of raising: several setup commands are
        # expected to fail harmlessly on a re-run (e.g. cloning into an existing folder).
        print(f"Command exited with status {exit_status}: {command}")
    return exit_status

In [104]:
def wait_for_instances(instance_ids, timeout=900, poll_interval=10):
    """
    Block until the instances are running and pass their status checks.

    :param instance_ids: IDs of the instances to wait for.
    :param timeout: Maximum number of seconds to wait for the status checks.
        Pass None to wait indefinitely (the previous behaviour).
    :param poll_interval: Seconds to sleep between status polls.
    :return: Public DNS name of the first instance returned by describe_instances.
    :raises TimeoutError: If the status checks are not all "ok" within `timeout`.
    :raises RuntimeError: If the first instance has no public DNS name.
    """
    print("Waiting for instances to reach 'running' state...")
    ec2_client.get_waiter("instance_running").wait(InstanceIds=instance_ids)
    print("All instances are running.")

    print("Waiting for instances to pass status checks...")
    deadline = None if timeout is None else time.monotonic() + timeout
    while True:
        response = ec2_client.describe_instance_status(InstanceIds=instance_ids)
        statuses = [
            status["InstanceStatus"]["Status"] == "ok" and status["SystemStatus"]["Status"] == "ok"
            for status in response["InstanceStatuses"]
        ]
        # The length check matters: all([]) is True, and an instance missing
        # from the response must not count as healthy.
        if len(statuses) == len(instance_ids) and all(statuses):
            break
        if deadline is not None and time.monotonic() >= deadline:
            raise TimeoutError(
                f"Instances {instance_ids} did not pass status checks within {timeout} seconds"
            )
        print(f"Instances are not fully initialized. Retrying in {poll_interval} seconds...")
        time.sleep(poll_interval)
    print("All instances passed status checks.")

    # Fetch public DNS names
    instances = ec2_client.describe_instances(InstanceIds=instance_ids)
    public_dns = [
        instance["PublicDnsName"]
        for reservation in instances["Reservations"]
        for instance in reservation["Instances"]
    ]
    if not public_dns or not public_dns[0]:
        # Fail here with a clear message rather than passing an empty host
        # name on to the SSH connection.
        raise RuntimeError(f"No public DNS name available for instances {instance_ids}")
    return public_dns[0]


In [105]:
from paramiko import SSHClient, AutoAddPolicy

def install_dependencies_on_instances(ssh_client):
    """
    Install the tooling the project needs on the remote host.

    Runs, in order: a yum update, installation of Git and Docker, Docker service
    start/enable, adding ec2-user to the docker group, a standalone
    docker-compose binary (v2.2.3), and a Miniconda install into ~/miniconda3.
    Each command goes through run_command, so a failing step is reported but
    does not stop the remaining steps.

    :param ssh_client: Connected SSH client passed through to run_command.
    """
    commands = [
        "sudo yum update -y",
        "sudo yum install -y git docker docker-compose-plugin",  # Install Git, Docker and the Compose plugin
        "sudo systemctl start docker",  # Start Docker
        "sudo systemctl enable docker",  # Enable Docker to start on boot
        "sudo usermod -aG docker ec2-user",  # Add ec2-user to Docker group
        "sudo curl -L https://github.com/docker/compose/releases/download/v2.2.3/docker-compose-linux-x86_64 -o /usr/local/bin/docker-compose",  # Install Docker Compose
        "sudo chmod +x /usr/local/bin/docker-compose",  # Make Docker Compose executable
        # -f replaces an existing link, so re-running the cell against the same
        # host does not fail with "File exists".
        "sudo ln -sf /usr/local/bin/docker-compose /usr/bin/docker-compose",  # Make Docker Compose visible

        # Miniconda
        "mkdir -p ~/miniconda3 && wget https://repo.anaconda.com/miniconda/Miniconda3-latest-Linux-x86_64.sh -O ~/miniconda3/miniconda.sh && bash ~/miniconda3/miniconda.sh -b -u -p ~/miniconda3 && rm ~/miniconda3/miniconda.sh",
    ]

    for cmd in commands:
        run_command(cmd, ssh_client)



In [106]:
def clone_github_repo(repo_url, ssh_client, branch="main"):
    """
    Clone the project into REMOTE_FOLDER and build its conda environment.

    :param repo_url: Git URL of the repository to clone.
    :param ssh_client: Connected SSH client passed through to run_command.
    :param branch: Branch to check out.
    """
    clone_cmd = f"git clone -b {branch} {repo_url} {REMOTE_FOLDER}"

    # One chained shell line: each step runs only if the previous one succeeded.
    env_setup_cmd = " && ".join(
        [
            "source ~/miniconda3/bin/activate",
            "conda create -n myenv python=3.10 -y",
            "conda activate myenv",
            f"cd {REMOTE_FOLDER}CANCER",
            "conda install pytorch torchvision torchaudio cpuonly -c pytorch -y",
            "pip3 install --no-input --no-cache-dir -r requirments.txt",
        ]
    )

    run_command(clone_cmd, ssh_client)
    run_command(env_setup_cmd, ssh_client)
    

In [107]:
# Scratch cell: renders the environment-setup command string with the current
# REMOTE_FOLDER so it can be inspected in the cell output. It has no side
# effects, and the same literal is used inside clone_github_repo.
f"source ~/miniconda3/bin/activate && conda create -n myenv python=3.10 -y && conda activate myenv && cd {REMOTE_FOLDER}CANCER && conda install pytorch torchvision torchaudio cpuonly -c pytorch -y && pip3 install --no-input --no-cache-dir -r requirments.txt"

'source ~/miniconda3/bin/activate && conda create -n myenv python=3.10 -y && conda activate myenv && cd ./LSC_proj/CANCER && conda install pytorch torchvision torchaudio cpuonly -c pytorch -y && pip3 install --no-input --no-cache-dir -r requirments.txt'

In [108]:
def run_app_on_instances(ssh_client):
    """
    Start the application stack on the remote host.

    :param ssh_client: Connected SSH client passed through to run_command.
    """
    # Start Docker Compose in detached mode from the project's CANCER directory.
    # Launching producer.py / consumer.py in the background under the conda
    # environment is currently disabled; only the compose stack is started.
    compose_up = f"cd {REMOTE_FOLDER}CANCER && docker-compose up -d"
    run_command(compose_up, ssh_client)

In [109]:
def _connect_ssh(hostname, username="ec2-user", key_filename="MyKeyPair.pem"):
    """
    Open an SSH session to `hostname` and return the connected client.

    Unknown host keys are accepted automatically (AutoAddPolicy). If the
    connection attempt fails, the client is closed before the error propagates.
    """
    client = SSHClient()
    client.set_missing_host_key_policy(AutoAddPolicy())
    try:
        client.connect(hostname=hostname, username=username, key_filename=key_filename)
    except Exception:
        client.close()
        raise
    return client


# Provision the instance, install tooling, deploy the project and start it.
try:
    security_group_id = create_security_group()
    print(f"security_group_id: {security_group_id}")
    instance_ids = create_ec2_cluster(INSTANCE_TYPE, security_group_id)
    print(f"instance_ids: {instance_ids}")
    public_dns = wait_for_instances(instance_ids)
    print(f"public_dns: {public_dns}")

    # First session: install dependencies and fetch the project. It is closed
    # explicitly so the connection is not leaked when ssh_client is rebound below.
    ssh_client = _connect_ssh(public_dns)
    try:
        install_dependencies_on_instances(ssh_client)

        repo_url = "https://github.com/Dodek69/Online-Learning-on-AWS.git"
        clone_github_repo(repo_url, ssh_client)
    finally:
        ssh_client.close()

    # Second, fresh session: presumably so the docker group membership added
    # during installation applies to the commands that start the app.
    # It is left open on purpose so ssh_client stays usable after this cell.
    ssh_client = _connect_ssh(public_dns)

    run_app_on_instances(ssh_client)

except Exception as e:
    print(f"Error encountered: {e}")
    # NOTE(review): if cleanup is re-enabled, cleanup_resources expects a group
    # *name* as its second argument, not security_group_id, and instance_ids is
    # unbound when the failure happens before the instances are created.
    #cleanup_resources(instance_ids, security_group_id)
    raise

Security group 'EC2ClusterSecurityGroup' already exists. Reusing Group ID: sg-01a07e2ce95e9a8c4
security_group_id: sg-01a07e2ce95e9a8c4
Cluster created with instances: ['i-04a611b94c245990d']
instance_ids: ['i-04a611b94c245990d']
Waiting for instances to reach 'running' state...
All instances are running.
Waiting for instances to pass status checks...
Instances are not fully initialized. Retrying in 10 seconds...
Instances are not fully initialized. Retrying in 10 seconds...
Instances are not fully initialized. Retrying in 10 seconds...
Instances are not fully initialized. Retrying in 10 seconds...
Instances are not fully initialized. Retrying in 10 seconds...
Instances are not fully initialized. Retrying in 10 seconds...
Instances are not fully initialized. Retrying in 10 seconds...
Instances are not fully initialized. Retrying in 10 seconds...
Instances are not fully initialized. Retrying in 10 seconds...
Instances are not fully initialized. Retrying in 10 seconds...
Instances are no