In [118]:
# create a Keypair for ec2 Instance
import boto3
import os
from botocore.exceptions import ClientError
from rich import print as rprint

def create_ec2_key_pair(key_name, file_name):
    """Create an EC2 key pair and save its private key to a local file.

    If a key pair named ``key_name`` already exists, nothing is created and a
    notice is printed. All errors are reported through ``rprint`` instead of
    being raised.

    Args:
        key_name: Name of the EC2 key pair to look up or create.
        file_name: Path of the file that receives the private key material.
            The file is created (or truncated) with owner-only permissions.

    Returns:
        None.
    """
    ec2 = boto3.client('ec2')

    try:
        # Check if the key pair already exists
        ec2.describe_key_pairs(KeyNames=[key_name])
        rprint(f"[yellow]Key pair '{key_name}' already exists.[/yellow]")
    except ClientError as e:
        if e.response['Error']['Code'] == 'InvalidKeyPair.NotFound':
            try:
                # Create a key pair
                key_pair = ec2.create_key_pair(KeyName=key_name)

                # Save the private key to a file that only the owner can read
                # or write. A plain open(..., 'w') would create the file with
                # the default (umask-dependent) permissions.
                fd = os.open(file_name, os.O_WRONLY | os.O_CREAT | os.O_TRUNC, 0o600)
                with os.fdopen(fd, 'w') as file:
                    # A pre-existing file keeps its old mode on open, so
                    # tighten it explicitly before the secret is written.
                    os.chmod(file_name, 0o600)
                    file.write(key_pair['KeyMaterial'])

                rprint(f"[green]Key pair '{key_name}' created and saved to '{file_name}'[/green]")
            except Exception as create_error:
                rprint(f"[red]An error occurred while creating the key pair: {create_error}[/red]")
        else:
            rprint(f"[red]An error occurred: {e}[/red]")

if __name__ == "__main__":
    # The private key is written next to the notebook as <key_name>.pem.
    key_name = "BastionKey"
    file_name = os.path.join(os.getcwd(), f"{key_name}.pem")

    create_ec2_key_pair(key_name=key_name, file_name=file_name)

In [137]:
# create security group for Bastion Host in VPC
import boto3
from botocore.exceptions import ClientError

def get_vpc_id(tag_name, tag_value):
    """Return the ID of the first VPC whose tag ``tag_name`` equals ``tag_value``.

    Args:
        tag_name: Tag key to filter on (e.g. ``'Name'``).
        tag_value: Required value of that tag.

    Returns:
        The ``VpcId`` of the first matching VPC.

    Raises:
        ValueError: If no VPC matches the tag filter.
    """
    ec2 = boto3.client('ec2')
    vpcs = ec2.describe_vpcs(Filters=[{'Name': f'tag:{tag_name}', 'Values': [tag_value]}])
    # An empty match would otherwise surface as an opaque IndexError.
    if not vpcs['Vpcs']:
        raise ValueError(f"No VPC found with tag {tag_name}={tag_value}")
    return vpcs['Vpcs'][0]['VpcId']

def create_security_group(vpc_id, group_name, description, ingress_rules, tags):
    """Create a security group in a VPC, or return the existing one's ID.

    After creation, each entry of ``ingress_rules`` is authorized and the
    group is tagged. If the create call fails with ``InvalidGroup.Duplicate``
    the ID of the existing group with that name in ``vpc_id`` is returned;
    in that case no rules or tags are applied.

    Args:
        vpc_id: ID of the VPC that owns the group.
        group_name: Name of the security group.
        description: Description passed to the create call.
        ingress_rules: Iterable of ``IpPermissions`` dicts, authorized one
            call per rule.
        tags: List of ``{'Key': ..., 'Value': ...}`` dicts; skipped if empty.

    Returns:
        The security group ID.

    Raises:
        ClientError: For any AWS error other than ``InvalidGroup.Duplicate``.
    """
    ec2 = boto3.client('ec2')
    try:
        response = ec2.create_security_group(
            GroupName=group_name,
            Description=description,
            VpcId=vpc_id
        )
        security_group_id = response['GroupId']
        print(f"Security Group '{group_name}' created with ID: {security_group_id}")

        # Authorize Ingress Rules
        for rule in ingress_rules:
            ec2.authorize_security_group_ingress(
                GroupId=security_group_id,
                IpPermissions=[rule]
            )

        # Tag the Security Group (nothing to send when no tags were given)
        if tags:
            ec2.create_tags(Resources=[security_group_id], Tags=tags)

        return security_group_id
    except ClientError as e:
        if e.response['Error']['Code'] == 'InvalidGroup.Duplicate':
            print(f"Security Group '{group_name}' already exists.")
            # Retrieve the existing group's ID. The lookup is restricted to
            # this VPC so that a same-named group in another VPC is never
            # returned by mistake.
            response = ec2.describe_security_groups(
                Filters=[
                    {'Name': 'vpc-id', 'Values': [vpc_id]},
                    {'Name': 'group-name', 'Values': [group_name]}
                ]
            )
            return response['SecurityGroups'][0]['GroupId']
        # Bare raise keeps the original traceback intact.
        raise

def main(ssh_cidr='0.0.0.0/0'):
    """Create (or look up) the 'SGBastion' security group in the 'MyVPC' VPC.

    The group gets a single ingress rule for TCP port 22 and a ``Name`` tag.
    The resulting security group ID is printed.

    Args:
        ssh_cidr: IPv4 CIDR block allowed to reach port 22. The default
            ``'0.0.0.0/0'`` keeps the original behavior and admits every
            IPv4 address; pass a narrower range (for example an office or
            VPN CIDR) to restrict access.
    """
    vpc_id = get_vpc_id('Name', 'MyVPC')
    group_name = 'SGBastion'
    description = 'Bastion host security group'
    ingress_rules = [
        {
            'IpProtocol': 'tcp',
            'FromPort': 22,
            'ToPort': 22,
            # SECURITY: with the default CIDR this opens SSH to any IPv4 source.
            'IpRanges': [{'CidrIp': ssh_cidr}]
        }
    ]
    tags = [
        {
            'Key': 'Name',
            'Value': 'SGBastion'
        }
    ]
    security_group_id = create_security_group(vpc_id, group_name, description, ingress_rules, tags)
    print(f"Security Group ID: {security_group_id}")

# Run the security-group setup when this cell/script is executed directly.
if __name__ == "__main__":
    main()

Security Group 'SGBastion' already exists.
Security Group ID: sg-0ad81ad39818e2710


In [138]:
import boto3
from rich import print as rprint

# Get the ID of the VPC whose Name tag equals `vpc_name`.
vpc_name = 'MyVPC'

ec2 = boto3.client('ec2')
response = ec2.describe_vpcs(
    Filters=[
        {'Name': 'tag:Name', 'Values': [vpc_name]}
    ]
)

# Fail with a clear message instead of an IndexError when nothing matches.
if not response['Vpcs']:
    raise ValueError(f"No VPC found with the name {vpc_name}")

# The subnet and security-group lookup cells below read the global `vpc_id`,
# so define it here rather than relying on leftover kernel state.
vpc_id = response['Vpcs'][0]['VpcId']
# Kept for backward compatibility: this cell originally stored the ID as `vpcs`.
vpcs = vpc_id
rprint(vpc_id)

In [139]:
ec2 = boto3.client('ec2')

# `vpcs` is the VPC ID string produced by the VPC lookup cell above. The
# previous `vpc_id = vpc_id` was a self-assignment that raises NameError on a
# fresh kernel.
vpc_id = vpcs
subnet_name = 'public_subnet_1'
availability_zone = 'eu-west-1a'

response = ec2.describe_subnets(
    Filters=[
        {'Name': 'vpc-id', 'Values': [vpc_id]},
        {'Name': 'availability-zone', 'Values': [availability_zone]},
        {'Name': 'tag:Name', 'Values': [subnet_name]}
    ]
)

# Check for an empty result BEFORE indexing; indexing first would raise
# IndexError and this check could never fire.
if not response['Subnets']:
    raise ValueError(f"No subnet found in {availability_zone} with the name {subnet_name}")

subnets = response['Subnets'][0]['SubnetId']

rprint(subnets)

In [140]:
ec2 = boto3.client('ec2')
security_group_name = 'SGBastion'

# Look the group up by name within the VPC held in the global `vpc_id`.
response = ec2.describe_security_groups(
    Filters=[
        {'Name': 'vpc-id', 'Values': [vpc_id]},
        {'Name': 'group-name', 'Values': [security_group_name]}
    ]
)

# Fail with a clear message instead of an IndexError when nothing matches.
if not response['SecurityGroups']:
    raise ValueError(f"No security group named {security_group_name} found in VPC {vpc_id}")

security_group_id = response['SecurityGroups'][0]['GroupId']
rprint(security_group_id)

In [141]:
import boto3
from botocore.exceptions import ClientError

def get_vpc_id(vpc_name):
    """Return the ID of the first VPC whose ``Name`` tag equals ``vpc_name``.

    NOTE(review): an earlier cell defines ``get_vpc_id(tag_name, tag_value)``;
    running this cell replaces that definition in the kernel namespace.

    Args:
        vpc_name: Value of the VPC's ``Name`` tag.

    Returns:
        The ``VpcId`` of the first matching VPC.

    Raises:
        ValueError: If no VPC carries that name.
    """
    client = boto3.client('ec2')
    name_filter = {'Name': 'tag:Name', 'Values': [vpc_name]}
    matches = client.describe_vpcs(Filters=[name_filter])['Vpcs']
    if matches:
        return matches[0]['VpcId']
    raise ValueError(f"No VPC found with the name {vpc_name}")

def get_subnet_id(vpc_id, subnet_name, availability_zone):
    """Return the ID of the first subnet matching VPC, availability zone and name.

    Args:
        vpc_id: ID of the VPC the subnet must belong to.
        subnet_name: Value of the subnet's ``Name`` tag.
        availability_zone: Availability zone the subnet must be in.

    Returns:
        The ``SubnetId`` of the first matching subnet.

    Raises:
        ValueError: If no subnet satisfies all three filters.
    """
    client = boto3.client('ec2')
    subnet_filters = [
        {'Name': 'vpc-id', 'Values': [vpc_id]},
        {'Name': 'availability-zone', 'Values': [availability_zone]},
        {'Name': 'tag:Name', 'Values': [subnet_name]},
    ]
    matches = client.describe_subnets(Filters=subnet_filters)['Subnets']
    if matches:
        return matches[0]['SubnetId']
    raise ValueError(f"No subnet found in {availability_zone} with the name {subnet_name}")

def get_security_group_id(security_group_name, vpc_id):
    """Return the ID of the first security group with the given name in a VPC.

    Args:
        security_group_name: Name of the security group (``group-name`` filter).
        vpc_id: ID of the VPC the group must belong to.

    Returns:
        The ``GroupId`` of the first matching security group.

    Raises:
        ValueError: If no security group matches, mirroring the behavior of
            the VPC and subnet lookup helpers in this cell.
    """
    ec2 = boto3.client('ec2')
    response = ec2.describe_security_groups(
        Filters=[
            {'Name': 'vpc-id', 'Values': [vpc_id]},
            {'Name': 'group-name', 'Values': [security_group_name]}
        ]
    )
    # An empty match would otherwise surface as an opaque IndexError.
    if not response['SecurityGroups']:
        raise ValueError(
            f"No security group found in {vpc_id} with the name {security_group_name}"
        )
    return response['SecurityGroups'][0]['GroupId']

def get_instance_public_ip(tag_key, tag_value):
    """Return the public IP of the first pending/running instance with a tag.

    Args:
        tag_key: Tag key to filter instances on.
        tag_value: Required value of that tag.

    Returns:
        The instance's ``PublicIpAddress``; the string
        ``'No public IP assigned'`` if the instance has none; or ``None``
        when no pending or running instance matches.
    """
    client = boto3.client('ec2')
    instance_filters = [
        {'Name': f'tag:{tag_key}', 'Values': [tag_value]},
        {'Name': 'instance-state-name', 'Values': ['pending', 'running']},
    ]
    reservations = client.describe_instances(Filters=instance_filters)['Reservations']
    if not reservations:
        return None
    first_instance = reservations[0]['Instances'][0]
    return first_instance.get('PublicIpAddress', 'No public IP assigned')

def create_ec2_instance(vpc_name, key_name, security_group_name,
                        instance_name='Bastion_AZ_1a',
                        availability_zone='eu-west-1a',
                        subnet_name='public_subnet_1',
                        instance_type='t2.micro'):
    """Launch a single EC2 instance unless one with the same Name tag exists.

    If a pending or running instance tagged ``Name=<instance_name>`` is found,
    its public IP is printed and nothing is launched. Otherwise the VPC,
    subnet and security group are resolved by name, the most recently created
    image matching the Amazon Linux 2 name filter is selected, the instance
    is started, and its public IP is printed once it is running.

    Args:
        vpc_name: ``Name`` tag of the VPC to launch into.
        key_name: Name of the EC2 key pair attached to the instance.
        security_group_name: Name of the security group to attach.
        instance_name: ``Name`` tag used both for the existence check and for
            the new instance.
        availability_zone: Availability zone for the subnet lookup and placement.
        subnet_name: ``Name`` tag of the subnet to launch into.
        instance_type: EC2 instance type.

    Returns:
        None. Progress and results are printed.

    Raises:
        ValueError: If the VPC/subnet/security-group lookups find nothing
            (raised by the lookup helpers) or no image matches the filter.
            ``ClientError`` from AWS is caught and printed, not raised.
    """
    ec2 = boto3.client('ec2')
    try:
        if check_instance_exists('Name', instance_name):
            public_ip = get_instance_public_ip('Name', instance_name)
            print(f"An instance with the tag '{instance_name}' already exists with Public IP: {public_ip}")
            return

        vpc_id = get_vpc_id(vpc_name)
        public_subnet_id = get_subnet_id(vpc_id, subnet_name, availability_zone)
        security_group_id = get_security_group_id(security_group_name, vpc_id)

        # Get the latest Amazon Linux 2 AMI
        response = ec2.describe_images(
            Filters=[
                {'Name': 'name', 'Values': ['amzn2-ami-hvm-2.0.*-x86_64-gp2']},
                {'Name': 'owner-alias', 'Values': ['amazon']}
            ],
            Owners=['amazon'],
            MaxResults=1000
        )
        images = response['Images']
        # Fail with a clear message instead of an IndexError when nothing matches.
        if not images:
            raise ValueError("No Amazon Linux 2 AMI matched the image filters")
        # CreationDate values are compared as strings, as in the original sort.
        latest_image = max(images, key=lambda image: image['CreationDate'])
        ami_id = latest_image['ImageId']
        print(f"Using AMI ID: {ami_id}")

        # Create the EC2 instance
        response = ec2.run_instances(
            ImageId=ami_id,
            KeyName=key_name,
            SecurityGroupIds=[security_group_id],
            SubnetId=public_subnet_id,
            InstanceType=instance_type,
            Placement={'AvailabilityZone': availability_zone},
            # NOTE(review): this maps a 100 GiB EBS volume to /dev/sdh; it
            # presumably adds a second volume rather than resizing the root
            # volume. Confirm that is the intent.
            BlockDeviceMappings=[{'DeviceName': '/dev/sdh', 'Ebs': {'VolumeSize': 100}}],
            MinCount=1,
            MaxCount=1,
            InstanceInitiatedShutdownBehavior='stop',
            TagSpecifications=[
                {
                    'ResourceType': 'instance',
                    'Tags': [{'Key': 'Name', 'Value': instance_name}]
                },
                {
                    'ResourceType': 'volume',
                    'Tags': [{'Key': 'Name', 'Value': 'Bastion'}]
                }
            ]
        )

        instance_id = response['Instances'][0]['InstanceId']
        print(f"Instance created: {instance_id}")

        # Wait until the instance reaches the 'running' state
        waiter = ec2.get_waiter('instance_running')
        waiter.wait(InstanceIds=[instance_id])

        # Get the public IP address. The key is absent when the instance has
        # no public IP, so fall back to the same placeholder that
        # get_instance_public_ip uses instead of raising KeyError.
        response = ec2.describe_instances(InstanceIds=[instance_id])
        instance = response['Reservations'][0]['Instances'][0]
        public_ip = instance.get('PublicIpAddress', 'No public IP assigned')
        print(f"Public IP Address: {public_ip}")

    except ClientError as e:
        print(f"An error occurred: {e}")

def check_instance_exists(tag_key, tag_value):
    """Report whether a pending or running instance carries the given tag.

    Args:
        tag_key: Tag key to filter instances on.
        tag_value: Required value of that tag.

    Returns:
        ``True`` if the describe call returns at least one reservation,
        otherwise ``False``.
    """
    client = boto3.client('ec2')
    instance_filters = [
        {'Name': f'tag:{tag_key}', 'Values': [tag_value]},
        {'Name': 'instance-state-name', 'Values': ['pending', 'running']},
    ]
    result = client.describe_instances(Filters=instance_filters)
    return bool(result['Reservations'])

if __name__ == "__main__":
    # Names of the pre-existing resources the bastion instance is built from.
    vpc_name, key_name, security_group_name = 'MyVPC', 'BastionKey', 'SGBastion'
    create_ec2_instance(vpc_name, key_name, security_group_name)

Using AMI ID: ami-00caa7df15d2e771f
Instance created: i-0576e458537401716
Public IP Address: 3.254.155.240
