# AWS Resources deployment using Infrastructure-as-Code (on a Jupyter Notebook)

### Libraries

In [None]:
# !python3 -m pip install boto3
# !python3 -m pip install requests
# !python3 -m pip install tqdm
# !python3 -m pip install pandas
# !python3 -m pip install s3fs
# !python3 -m pip install ipywidgets
# !python3 -m pip install -q -U paramiko
# !python3 -m pip install -q -U scp
# !pip install -q -U ipython-sql
# !pip install -q -U psycopg2-binary

In [1]:
import IPython
import boto3
import time
import os
import json
import requests
import pandas as pd
import paramiko
import scp
from zipfile import ZipFile
from urllib.request import urlopen
from tqdm.notebook import tqdm
import psycopg2
from botocore.config import Config

#### Some basic settings

In [2]:
#core
my_bucket_name = 'dantohe-my-experimental-iac-01'
# the covid-19 data lake is located in us-east-2
my_region = 'us-east-2'
stem = 'my-experimental'

#ec2
my_InstanceProfileName = f'{stem}-InstanceProfileName-iac-01'
ec2_pem_name      = f'{stem}-kp-june-2021-01'
my_role_name = f'{stem}-ec2-role-01'
my_security_group_name = f'{stem}-Airflow-security-group-01'

#redshift
my_redshift_role_name = f'{stem}-Redshift-role-01'
redshift_port = 5439
redshift_user = 'redshift'
# Only printable ASCII characters except for '/', '@', '"', ' ', '\', ''' may be used.
redshift_MasterUserPassword = 'kljhdfsKLJDD12345'
redshift_db=f'{stem}-capstone-db'
redshift_ClusterIdentifier=f'{stem}-redshift-cluster'
#https://aws.amazon.com/redshift/pricing/
redshift_NodeType='dc2.large'
# https://docs.aws.amazon.com/redshift/latest/mgmt/working-with-clusters.html
redshift_NumberOfNodes=1
# https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/redshift.html#Redshift.Client.create_cluster
redshift_ClusterType='single-node'


#### Configuring the clients

In [3]:
my_config = Config(
    region_name = my_region,
    signature_version = 'v4',
    retries = {
        'max_attempts': 10,
        'mode': 'standard'
    }
)

# client = boto3.client('kinesis', config=my_config)
ec2 = boto3.client('ec2', config=my_config)
iam = boto3.client('iam', config=my_config)
redshift = boto3.client('redshift', config=my_config)

#### Create an EC2 key-pair    
If the key already exists then don't do anything.    

In [4]:
key_exists = False
response = ec2.describe_key_pairs()['KeyPairs']
for key in response:
    if key['KeyName'] == ec2_pem_name:
        key_exists = True
    found_instance = ec2.describe_instances(
        Filters=[
            {
                'Name': 'key-name',
                'Values': [key['KeyName']]
            }
        ]
    )['Reservations']

if key_exists:
    print('key already exists')
else:
    ec2_pem_path = f'./{ec2_pem_name}.pem'
    if os.path.isfile(ec2_pem_path):
        os.remove(ec2_pem_path)
    ec2_keypair = ec2.create_key_pair(KeyName=ec2_pem_name)
    with open(ec2_pem_path, 'w+') as ec2_pem_file:
        ec2_pem_file.write(str(ec2_keypair['KeyMaterial']))
    !chmod 400 {ec2_pem_path}
    print(f'{ec2_pem_name} has been created sucessfully and the pem is available at\n{ec2_pem_path}')
    

my-experimental-kp-june-2021-01 has been created sucessfully and the pem is available at
./my-experimental-kp-june-2021-01.pem


## EC2 Resources    
IAM - refrences: https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/iam.html#role   


### Create a role and attach the s3 access policies

#### A utility that checks if a given role already exists    
If already in place returns the role object, otherwise returns a None.    

In [5]:
def does_role_already_exist(role_name):
    roles = iam.list_roles()
    role_list = roles['Roles']
    requested_role= None

    for role in role_list:
        if role['RoleName'] == role_name:
            requested_role = role
            return requested_role
    return requested_role

### Create role for ec2 and attach s3 access policies

In [6]:
roles = iam.list_roles()
role_list = roles['Roles']
ec2_role= None

for key in role_list:
    if key['RoleName'] == my_role_name:
        ec2_role = key

if ec2_role is not None:
    print(f'Role {my_role_name} already exists')
else:
    ec2_role = iam.create_role(
        Path='/',
        RoleName=my_role_name,
        Description='',
        MaxSessionDuration=3600,
        AssumeRolePolicyDocument="""{
      "Version": "2012-10-17",
      "Statement": [
        {
          "Effect": "Allow",
          "Principal": { "Service": "ec2.amazonaws.com"},
          "Action": "sts:AssumeRole"
        }
      ]
    }""".replace('<dw_bucket>', my_bucket_name))['Role']
    ### Also atach the S3 policy to the role
    for ec2_policy in  [
        'arn:aws:iam::aws:policy/AmazonS3FullAccess']:
        assert iam.attach_role_policy(
            RoleName=ec2_role['RoleName'],
            PolicyArn=ec2_policy)['ResponseMetadata']['HTTPStatusCode'] == 200
    
    print(f'{my_role_name} has been createed - the S3 policies have been attached')
#     ec2_role['Arn']

my-experimental-ec2-role-01 has been createed - the S3 policies have been attached


### Creates the instance profile AND adds the role to instance profile

In [7]:
instance_profiles = iam.list_instance_profiles()
instance_profiles_list = instance_profiles['InstanceProfiles']
ec2_instance_profile = None

# existing_instance_profile_names =[]

for key in instance_profiles_list:
    if key['InstanceProfileName'] == my_InstanceProfileName:
        ec2_instance_profile =key
#     existing_instance_profile_names.append(key['InstanceProfileName'])

# if my_InstanceProfileName in existing_instance_profile_names:
#     print(f'{my_InstanceProfileName} already exists')
if ec2_instance_profile is not None:
    print(f'{my_InstanceProfileName} already exists')
else:
    #creates the instaance profile
    ec2_instance_profile = iam.create_instance_profile(InstanceProfileName=my_InstanceProfileName)['InstanceProfile']
    iam.get_waiter('instance_profile_exists').wait(InstanceProfileName=my_InstanceProfileName)

    #adds the role to the instance profile
    assert iam.add_role_to_instance_profile(InstanceProfileName=ec2_instance_profile['InstanceProfileName'], RoleName=ec2_role['RoleName'])['ResponseMetadata']['HTTPStatusCode'] == 200
    print(f'{my_InstanceProfileName} has been created')

my-experimental-InstanceProfileName-iac-01 has been created


### Createting a security group

In [8]:
security_groups = ec2.describe_security_groups()
existing_security_groups = security_groups['SecurityGroups']

ec2_sg = None

for key in existing_security_groups:
    if key['GroupName'] == my_security_group_name:
      ec2_sg=key  
    
if ec2_sg is not None:
    print(f'The security group {my_security_group_name} already exists')
else:
    ec2_sg = ec2.create_security_group(
        Description='Allows 22 trafic',
        GroupName=my_security_group_name)
    ec2.authorize_security_group_ingress(CidrIp='0.0.0.0/0', FromPort=22, ToPort=22, GroupId=ec2_sg['GroupId'], IpProtocol='TCP')
    ec2.authorize_security_group_ingress(CidrIp='0.0.0.0/0', FromPort=8080, ToPort=8080, GroupId=ec2_sg['GroupId'], IpProtocol='TCP')
    ec2.authorize_security_group_ingress(CidrIp='0.0.0.0/0', FromPort=5555, ToPort=5555, GroupId=ec2_sg['GroupId'], IpProtocol='TCP')
    ec2.authorize_security_group_ingress(CidrIp='0.0.0.0/0', FromPort=3306, ToPort=3306, GroupId=ec2_sg['GroupId'], IpProtocol='TCP')
    print(f'The security group {my_security_group_name} has been created')
    print(f"SG ID: {ec2_sg['GroupId']}")

The security group my-experimental-Airflow-security-group-01 has been created
SG ID: sg-073b8ab85a18dbe75


### Requesting spot instance(s)

In [9]:
# !aws configure set region 'us-east-2'
my_session = boto3.session.Session()
my_region = my_session.region_name
ags_west = boto3.client('autoscaling', region_name=my_region)
print(f"We are in Region: {my_region}")

We are in Region: us-east-2


In [10]:
# time.sleep(30) #wait instance profile...
#Amazon Linux AMI - it has some issues and complications with installing mysql and airflow
# ec2_ami_id = 'ami-0aeeebd8d2ab47354'
#defaulting to ubuntu
# us-east-1 'ami-09e67e426f25ce0d7'
# us-west-2 'ami-03d5c68bab01f3496'
# us-east-2 'ami-00399ec92321828f5'
ec2_ami_id = 'ami-00399ec92321828f5'
ec2_spot = ec2.request_spot_instances(
    AvailabilityZoneGroup=my_region,
    InstanceCount=1,
    LaunchSpecification={
        'SecurityGroupIds': [ec2_sg['GroupId']],
        'EbsOptimized': False,
        'KeyName': ec2_pem_name,
        'ImageId': ec2_ami_id,
        'InstanceType': 't3.large',
        'IamInstanceProfile': {
            'Arn': ec2_instance_profile['Arn']
        },
        "BlockDeviceMappings": [
            {
                "DeviceName": "/dev/sda1",
                "Ebs": {
                        "DeleteOnTermination": True,
                        "VolumeSize": 30,
                        "Encrypted": False,
                        "VolumeType": "gp2"
                }
            }
        ],
    },
    SpotPrice='0.10',
    Type='one-time',
    InstanceInterruptionBehavior='terminate'
)
ec2_spot_id = ec2_spot['SpotInstanceRequests'][0]['SpotInstanceRequestId']
ec2.get_waiter('spot_instance_request_fulfilled').wait(SpotInstanceRequestIds=[ec2_spot_id])
print(f'Spot instance request: {ec2_spot_id}')

Spot instance request: sir-6ytrsgdg


### Gets the instance ID

In [11]:
ec2_vm_id = ec2.describe_spot_instance_requests(SpotInstanceRequestIds=[ ec2_spot_id ]) \
    ['SpotInstanceRequests'] \
    [0] \
    ['InstanceId']
ec2.get_waiter('instance_status_ok').wait(InstanceIds=[ ec2_vm_id ])
print(f'InstanceIds: {ec2_vm_id}')

InstanceIds: i-0b3ac31d882114985


### Allocating a public IP address

In [12]:
ec2_ip = ec2.allocate_address(Domain='vpc')
print(f"PublicIp: {ec2_ip['PublicIp']}\nAllocationId: {ec2_ip['AllocationId']}")

PublicIp: 3.128.44.183
AllocationId: eipalloc-0fd0464f53378e7e8


### Associates the IP address with the instance

In [13]:
ec2_vm_ip = ec2.associate_address(
     InstanceId = ec2_vm_id,
     AllocationId = ec2_ip["AllocationId"])
print(f"IP AssociationId: {ec2_vm_ip['AssociationId']}")

IP AssociationId: eipassoc-0e183da27eea0857e


## SSH

### SSH utilities

In [14]:
def get_ssh(ip, pem_path):
    print(f"ssh -i {pem_path} ubuntu@{ip}")
    ssh = paramiko.SSHClient()
    ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
    ssh.connect(hostname=ip, username='ubuntu', pkey=paramiko.RSAKey.from_private_key_file(pem_path))
    return ssh

def run_via_ssh(
        ip,
        pem_path,
        commands,
        display_output=False):
    
    ssh = get_ssh(ip, pem_path)
    try:
        for command in tqdm(commands):
            stdin, stdout, stderr = ssh.exec_command(command)
            exit_status = stdout.channel.recv_exit_status()
            if exit_status == 0:
                print(('command executed successfuly:::', command))
                if display_output:
                    output_buffer = stdout.read().decode('utf-8')
                    if output_buffer:
                        print(f">>> {output_buffer}")
            else:
                error_buffer = stderr.read().decode('utf-8')
                print(('!!!failed', command))
                print(f"!!! {error_buffer}")
    finally:
        ssh.close()

### Instaling MySql and Airflow

In [15]:
run_via_ssh(
    ip=ec2_ip['PublicIp'],
    pem_path=ec2_pem_path,
    commands=[
        'sudo apt-get -y update',
        'sudo apt-get install -y libmysqlclient-dev mysql-server',
        f"sudo mysql -e \"SET GLOBAL explicit_defaults_for_timestamp = 1;\"",
        f"sudo mysql -e \"DROP DATABASE IF EXISTS airflow;\"",       
        f"sudo mysql -e \"CREATE DATABASE airflow CHARACTER SET UTF8mb3 COLLATE utf8_general_ci;\"",
        f"sudo mysql -e \"CREATE USER 'airflow'@'localhost' IDENTIFIED BY 'airflow';\"",
        f"sudo mysql -e \"GRANT ALL PRIVILEGES ON airflow.* TO 'airflow'@'localhost';\"",
        f"sudo apt install -y redis-server",
        'sudo apt-get install -y python3 python3-pip python3-setuptools',
        'sudo pip3 install -U pip',
        'sudo pip3 install -U apache-airflow',
        'sudo pip3 install -U apache-airflow[mysql]',
        'sudo pip3 install -U apache-airflow[celery]'
    ])

ssh -i ./my-experimental-kp-june-2021-01.pem ubuntu@3.128.44.183


  0%|          | 0/13 [00:00<?, ?it/s]

('command executed successfuly:::', 'sudo apt-get -y update')
('command executed successfuly:::', 'sudo apt-get install -y libmysqlclient-dev mysql-server')
('command executed successfuly:::', 'sudo mysql -e "SET GLOBAL explicit_defaults_for_timestamp = 1;"')
('command executed successfuly:::', 'sudo mysql -e "DROP DATABASE IF EXISTS airflow;"')
('command executed successfuly:::', 'sudo mysql -e "CREATE DATABASE airflow CHARACTER SET UTF8mb3 COLLATE utf8_general_ci;"')
('command executed successfuly:::', 'sudo mysql -e "CREATE USER \'airflow\'@\'localhost\' IDENTIFIED BY \'airflow\';"')
('command executed successfuly:::', 'sudo mysql -e "GRANT ALL PRIVILEGES ON airflow.* TO \'airflow\'@\'localhost\';"')
('command executed successfuly:::', 'sudo apt install -y redis-server')
('command executed successfuly:::', 'sudo apt-get install -y python3 python3-pip python3-setuptools')
('command executed successfuly:::', 'sudo pip3 install -U pip')
('command executed successfuly:::', 'sudo pip3 in

### Configure Airflow

In [16]:
run_via_ssh(ip=ec2_ip['PublicIp'],
    pem_path=ec2_pem_path,
    commands=[
        'airflow db init',
        'sudo apt-get install -y crudini',
        "crudini --set ~/airflow/airflow.cfg core load_examples False",
        "crudini --set ~/airflow/airflow.cfg core load_default_connections False",
        "crudini --set ~/airflow/airflow.cfg core sql_alchemy_conn 'mysql://airflow:airflow@localhost/airflow'",
        "crudini --set ~/airflow/airflow.cfg core executor CeleryExecutor",
        "crudini --set ~/airflow/airflow.cfg core sql_alchemy_schema airflow",
        "crudini --set ~/airflow/airflow.cfg scheduler min_file_process_interval 10",
        "crudini --set ~/airflow/airflow.cfg scheduler dag_dir_list_interval 60",
        "crudini --set ~/airflow/airflow.cfg celery result_backend 'redis://127.0.0.1:6379/0'",
        "crudini --set ~/airflow/airflow.cfg celery broker_url 'db+mysql://airflow:airflow@localhost/airflow'",
        'airflow db init',
    ])

ssh -i ./my-experimental-kp-june-2021-01.pem ubuntu@3.128.44.183


  0%|          | 0/12 [00:00<?, ?it/s]

('command executed successfuly:::', 'airflow db init')
('command executed successfuly:::', 'sudo apt-get install -y crudini')
('command executed successfuly:::', 'crudini --set ~/airflow/airflow.cfg core load_examples False')
('command executed successfuly:::', 'crudini --set ~/airflow/airflow.cfg core load_default_connections False')
('command executed successfuly:::', "crudini --set ~/airflow/airflow.cfg core sql_alchemy_conn 'mysql://airflow:airflow@localhost/airflow'")
('command executed successfuly:::', 'crudini --set ~/airflow/airflow.cfg core executor CeleryExecutor')
('command executed successfuly:::', 'crudini --set ~/airflow/airflow.cfg core sql_alchemy_schema airflow')
('command executed successfuly:::', 'crudini --set ~/airflow/airflow.cfg scheduler min_file_process_interval 10')
('command executed successfuly:::', 'crudini --set ~/airflow/airflow.cfg scheduler dag_dir_list_interval 60')
('command executed successfuly:::', "crudini --set ~/airflow/airflow.cfg celery result_

### Create Airflow dag directory

In [17]:
run_via_ssh(
    ip=ec2_ip['PublicIp'],
    pem_path=ec2_pem_path,
    commands=[
        'mkdir -p ~/airflow/dags'
    ])

ssh -i ./my-experimental-kp-june-2021-01.pem ubuntu@3.128.44.183


  0%|          | 0/1 [00:00<?, ?it/s]

('command executed successfuly:::', 'mkdir -p ~/airflow/dags')


### Install python modules

In [18]:
run_via_ssh(
    ip=ec2_ip['PublicIp'],
    pem_path=ec2_pem_path,
    commands=[
        'sudo pip3 install -U tensorflow',
        'sudo pip3 install -U pandas',
        'sudo pip3 install -U scikit-learn',
        'sudo pip3 install -U numpy',
        'sudo pip3 install -U psycopg2-binary',
        'sudo pip3 install -U requests',
        'sudo pip3 install -U boto3',
        'sudo pip3 install -U matplotlib',
        'sudo pip3 install -U reportlab',
        'sudo pip3 install -U flower',
        'sudo pip3 install -U proj',
        'sudo pip3 install -U redis'
    ])

ssh -i ./my-experimental-kp-june-2021-01.pem ubuntu@3.128.44.183


  0%|          | 0/12 [00:00<?, ?it/s]

('command executed successfuly:::', 'sudo pip3 install -U tensorflow')
('command executed successfuly:::', 'sudo pip3 install -U pandas')
('command executed successfuly:::', 'sudo pip3 install -U scikit-learn')
('command executed successfuly:::', 'sudo pip3 install -U numpy')
('command executed successfuly:::', 'sudo pip3 install -U psycopg2-binary')
('command executed successfuly:::', 'sudo pip3 install -U requests')
('command executed successfuly:::', 'sudo pip3 install -U boto3')
('command executed successfuly:::', 'sudo pip3 install -U matplotlib')
('command executed successfuly:::', 'sudo pip3 install -U reportlab')
('command executed successfuly:::', 'sudo pip3 install -U flower')
('command executed successfuly:::', 'sudo pip3 install -U proj')
('command executed successfuly:::', 'sudo pip3 install -U redis')


### Start Airflow

In [19]:
# airflow kerberos -D
# airflow scheduler -D
# airflow webserver -D
run_via_ssh(
    ip=ec2_ip['PublicIp'],
    pem_path=ec2_pem_path,
    commands=[
        'airflow users  create --role Admin --username admin --email admin --firstname admin --lastname admin --password admin',
        'airflow scheduler -D',
        'airflow celery worker -D',
        'airflow celery flower -D',
        'airflow webserver -p 8080 -D'
    ])

ssh -i ./my-experimental-kp-june-2021-01.pem ubuntu@3.128.44.183


  0%|          | 0/5 [00:00<?, ?it/s]

('command executed successfuly:::', 'airflow users  create --role Admin --username admin --email admin --firstname admin --lastname admin --password admin')
('command executed successfuly:::', 'airflow scheduler -D')
('command executed successfuly:::', 'airflow celery worker -D')
('command executed successfuly:::', 'airflow celery flower -D')
('command executed successfuly:::', 'airflow webserver -p 8080 -D')


### Accesing the environment 

In [20]:
print(f"SSH      : ssh -i {ec2_pem_name} ubuntu@{ec2_ip['PublicIp']}")
print(f"WebServer: http://{ec2_ip['PublicIp']}:8080")
# print(f"Flower   : http://{ec2_ip['PublicIp']}:5555")

SSH      : ssh -i my-experimental-kp-june-2021-01 ubuntu@3.128.44.183
WebServer: http://3.128.44.183:8080
Flower   : http://3.128.44.183:5555


## Refshift Setup

### Creates a role for Redshift and attach the needed policies.   

In [21]:
redshift_role = does_role_already_exist(my_redshift_role_name)

if redshift_role is None:
    redshift_role = iam.create_role(
        Path='/',
        RoleName=my_redshift_role_name,
        Description='role used for for capstone project',
        MaxSessionDuration=3600,
        AssumeRolePolicyDocument="""{
      "Version": "2012-10-17",
      "Statement": [
        {
          "Effect": "Allow",
          "Principal": {
            "Service": "redshift.amazonaws.com"
          },
          "Action": "sts:AssumeRole"
        }
      ]
    }""")['Role']
#     attaching the policies
    for redshift_policy in  [
        'arn:aws:iam::aws:policy/AmazonS3ReadOnlyAccess',
        'arn:aws:iam::aws:policy/AWSGlueConsoleFullAccess']:
        assert iam.attach_role_policy(
            RoleName=redshift_role['RoleName'],
            PolicyArn=redshift_policy)['ResponseMetadata']['HTTPStatusCode'] == 200
    print(f'Redshift role {my_redshift_role_name} has been created. The policies were also attached. ')
else:
    print(f'Redshift role {my_redshift_role_name} already exists ')
print(f"The Redshift role has been ceated with the ARN: {redshift_role['Arn']}")

Redshift role my-experimental-Redshift-role-01 has been created. The policies were also attached. 
The Redshift role has been ceated with the ARN: arn:aws:iam::986106953013:role/my-experimental-Redshift-role-01


### Create a security group for Redshift

In [22]:
redshift_sg = ec2.create_security_group(
    Description='Allows 5432 trafic',
    GroupName='Redshift')
ec2.authorize_security_group_ingress(CidrIp='0.0.0.0/0', FromPort=5439, ToPort=5439, GroupId=redshift_sg['GroupId'], IpProtocol='TCP')
print(f"Refshift security group {redshift_sg['GroupId']} created successfuly")

Refshift security group sg-0cb96482aa669bba5 created successfuly


### Allocate a public IP for Redshift

In [23]:
redshift_ip = ec2.allocate_address(Domain='vpc')
# [ redshift_ip['PublicIp'], redshift_ip['AllocationId'] ]
print(f"Redshift PublicIp: {redshift_ip['PublicIp']} for AllocationId: {redshift_ip['AllocationId']}")

Redshift PublicIp: 3.129.99.152 for AllocationId: eipalloc-0f2173c8888d522fe


### Create a Redshift cluster

In [24]:
# redshift_host = redshift_ip['PublicIp']
# https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/redshift.html#Redshift.Client.create_cluster
redshift_cluster = redshift.create_cluster(
    DBName=redshift_db,
    ClusterIdentifier=redshift_ClusterIdentifier,
    NodeType=redshift_NodeType,
    ClusterType=redshift_ClusterType,
#     AvailabilityZone=my_region,
#     change the next one (uncomment) if the redshift_ClusterType is multinode
#     see https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/redshift.html#Redshift.Client.create_cluster
#     NumberOfNodes=redshift_NumberOfNodes,
    MasterUsername=redshift_user,
    MasterUserPassword=redshift_MasterUserPassword,
    VpcSecurityGroupIds=[ redshift_sg['GroupId'] ],
    IamRoles=[ redshift_role['Arn'] ],
    ElasticIp=redshift_ip['PublicIp'],
    PubliclyAccessible=True,
    Encrypted=False)['Cluster']
redshift.get_waiter('cluster_available').wait(ClusterIdentifier=redshift_cluster['ClusterIdentifier'])
print(f"Redshift cluster {redshift_cluster['ClusterIdentifier']} has been created")

Redshift cluster my-experimental-redshift-cluster has been created


### Connecting to the cluster

In [25]:
%load_ext sql

In [26]:
redshift_url = f"postgresql://{redshift_user}:{redshift_MasterUserPassword}@{redshift_ip['PublicIp']}:5439/{redshift_db}"
print(f"user: {redshift_user}\npassword: {redshift_MasterUserPassword}\nserver: {redshift_ip['PublicIp']}:5439/{redshift_db}")    
print(f"Redshift connection string = '{redshift_url}'")
%sql $redshift_url

user: redshift
password: kljhdfsKLJDD12345
server: 3.129.99.152:5439/my-experimental-capstone-db
Redshift connection string = 'postgresql://redshift:kljhdfsKLJDD12345@3.129.99.152:5439/my-experimental-capstone-db'


In [27]:
%sql $redshift_url

### Using the AWS covid-19 Data Lake      
Abouy the covid-19 data lake : https://aws.amazon.com/blogs/big-data/a-public-data-lake-for-analysis-of-covid-19-data/     
The url to the CloudFormation template that will create the data lake within the account :https://us-east-2.console.aws.amazon.com/cloudformation/home?region=us-east-2#/stacks/create/review?templateURL=https://covid19-lake.s3.us-east-2.amazonaws.com/cfn/CovidLakeStack.template.json&stackName=CovidLakeStack     
Additional resources: https://aws.amazon.com/blogs/big-data/exploring-the-public-aws-covid-19-data-lake/     


### Create an external table using the Glue Data Catalog 

In [34]:
#Identify redshift role ARN and ust it for creating an external table using the Glue data catalog.
print(f"The Redshift role has been ceated with the ARN: {redshift_role['Arn']}")

The Redshift role has been ceated with the ARN: arn:aws:iam::986106953013:role/my-experimental-Redshift-role-01


In [35]:
%%sql
create external schema spectrum_schema from data catalog
 database 'covid-19'
 iam_role 'arn:aws:iam::986106953013:role/my-experimental-Redshift-role-01'
 create external database if not exists;		
# redshift_role['Arn']    

 * postgresql://redshift:***@3.129.99.152:5439/my-experimental-capstone-db
(psycopg2.errors.DuplicateSchema) Schema "spectrum_schema" already exists

[SQL: create external schema spectrum_schema from data catalog
 database 'covid-19'
 iam_role 'arn:aws:iam::986106953013:role/my-experimental-Redshift-role-01'
 create external database if not exists;]
(Background on this error at: http://sqlalche.me/e/14/f405)


### Create an table using the external table

In [36]:
%%sql
drop table if exists public.alleninstitute_metadata;
create table public.alleninstitute_metadata
as select * from 
spectrum_schema.alleninstitute_metadata;


 * postgresql://redshift:***@3.129.99.152:5439/my-experimental-capstone-db
Done.
Done.


[]

### Verify table 

In [37]:
%%sql
select count(*) from 
spectrum_schema.alleninstitute_metadata;

 * postgresql://redshift:***@3.129.99.152:5439/my-experimental-capstone-db
1 rows affected.


count
161564


In [39]:
%%sql
select * from 
spectrum_schema.alleninstitute_metadata
limit 1;

 * postgresql://redshift:***@3.129.99.152:5439/my-experimental-capstone-db
1 rows affected.


cord_uid,sha,source_x,title,doi,pmcid,pubmed_id,license,abstract,publish_time,authors,journal,microsoft academic paper id,who #covidence,has_full_text,full_text_file,url
lom1dbk7,,WHO,Factors Associated With Surgical Mortality and Complications Among Patients With and Without Coronavirus Disease 2019 (COVID-19) in Italy,,,,unk,"Importance: There are limited data on mortality and complications rates in patients with coronavirus disease 2019 (COVID-19) who undergo surgery. Objective: To evaluate early surgical outcomes of patients with COVID-19 in different subspecialties. Design, Setting, and Participants: This matched cohort study conducted in the general, vascular and thoracic surgery, orthopedic, and neurosurgery units of Spedali Civili Hospital (Brescia, Italy) included patients who underwent surgical treatment from February 23 to April 1, 2020, and had positive test results for COVID-19 either before or within 1 week after surgery. Gynecological and minor surgical procedures were excluded. Patients with COVID-19 were matched with patients without COVID-19 with a 1:2 ratio for sex, age group, American Society of Anesthesiologists score, and comorbidities recorded in the surgical risk calculator of the American College of Surgeons National Surgical Quality Improvement Program. Patients older than 65 years were also matched for the Clinical Frailty Scale score. Exposures: Patients with positive results for COVID-19 and undergoing surgery vs matched surgical patients without infection. Screening for COVID-19 was performed with reverse transcriptase-polymerase chain reaction assay in nasopharyngeal swabs, chest radiography, and/or computed tomography. Diagnosis of COVID-19 was based on positivity of at least 1 of these investigations. Main Outcomes and Measures: The primary end point was early surgical mortality and complications in patients with COVID-19; secondary end points were the modeling of complications to determine the importance of COVID-19 compared with other surgical risk factors. Results: Of 41 patients (of 333 who underwent operation during the same period) who underwent mainly urgent surgery, 33 (80.5%) had positive results for COVID-19 preoperatively and 8 (19.5%) had positive results within 5 days from surgery. Of the 123 patients of the combined cohorts (78 women [63.4%]; mean [SD] age, 76.6 [14.4] years), 30-day mortality was significantly higher for those with COVID-19 compared with control patients without COVID-19 (odds ratio [OR], 9.5; 95% CI, 1.77-96.53). Complications were also significantly higher (OR, 4.98; 95% CI, 1.81-16.07); pulmonary complications were the most common (OR, 35.62; 95% CI, 9.34-205.55), but thrombotic complications were also significantly associated with COVID-19 (OR, 13.2; 95% CI, 1.48-&#8734;). Different models (cumulative link model and classification tree) identified COVID-19 as the main variable associated with complications. Conclusions and Relevance: In this matched cohort study, surgical mortality and complications were higher in patients with COVID-19 compared with patients without COVID-19. These data suggest that, whenever possible, surgery should be postponed in patients with COVID-19.",2020,"Doglietto, Francesco; Vezzoli, Marika; Gheza, Federico; Lussardi, Gian Luca; Domenicucci, Marco; Vecchiarelli, Luca; Zanin, Luca; Saraceno, Giorgio; Signorini, Liana; Panciani, Pier Paolo; Castelli, Francesco; Maroldi, Roberto; Rasulo, Francesco Antonio; Benvenuti, Mauro Roberto; Portolani, Nazario; Bonardelli, Stefano; Milano, Giuseppe; Casiraghi, Alessandro; Calza, Stefano; Fontanella, Marco Maria",JAMA surg. (Online),,,,,


### Create a dataframe from the table

In [40]:
from sqlalchemy import create_engine
# import pandas as pd
engine = create_engine(redshift_url)
data_frame = pd.read_sql('SELECT * FROM public.alleninstitute_metadata;', engine)

In [41]:
data_frame.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 161564 entries, 0 to 161563
Data columns (total 17 columns):
 #   Column                       Non-Null Count   Dtype 
---  ------                       --------------   ----- 
 0   cord_uid                     161564 non-null  object
 1   sha                          161564 non-null  object
 2   source_x                     161564 non-null  object
 3   title                        161564 non-null  object
 4   doi                          161564 non-null  object
 5   pmcid                        161564 non-null  object
 6   pubmed_id                    161564 non-null  object
 7   license                      161564 non-null  object
 8   abstract                     161564 non-null  object
 9   publish_time                 161564 non-null  object
 10  authors                      161564 non-null  object
 11  journal                      161564 non-null  object
 12  microsoft academic paper id  0 non-null       object
 13  who #covidence

#### Columns 
 0   cord_uid    
 1   sha    
 2   source_x    
 3   title    
 4   doi    
 5   pmcid    
 6   pubmed_id    
 7   license    
 8   abstract    
 9   publish_time    
 10  authors    
 11  journal    
 12  microsoft academic paper id    
 13  who #covidence    
 14  has_full_text    
 15  full_text_file    
 16  url    
 

In [42]:
data_frame.head()

Unnamed: 0,cord_uid,sha,source_x,title,doi,pmcid,pubmed_id,license,abstract,publish_time,authors,journal,microsoft academic paper id,who #covidence,has_full_text,full_text_file,url
0,lom1dbk7,,WHO,Factors Associated With Surgical Mortality and...,,,,unk,Importance: There are limited data on mortalit...,2020,"Doglietto, Francesco; Vezzoli, Marika; Gheza, ...",JAMA surg. (Online),,,,,
1,f6pioe0x,,WHO,COVID-19 outbreak in Northern Italy: Viewpoint...,,,,unk,,2020,"Kurihara, Hayato; Bisagni, Pietro; Faccincani,...",J Trauma Acute Care Surg,,,,,
2,pzkltzv2,,WHO,Keeping the country positive during the COVID ...,,,,unk,,2020,"Prabhu, Arvind N; Kamath, Giridhar B; Pai, Div...",Asian J Psychiatr,,,,,
3,ysogcprc,,WHO,An analysis of spatiotemporal pattern for COIV...,,,,unk,This study seeks to examine and analyze the sp...,2020,"Mo, Chunbao; Tan, Dechan; Mai, Tingyu; Bei, Ch...",J. med. virol,,,,,
4,wkbddypb,,WHO,Atypical Presentation of COVID-19 Incidentally...,,,,unk,"The incidence of COVID-19, a severe acute resp...",2020,"Mattoli, Maria Vittoria; Taralli, Silvia; Penn...",Clin. nucl. med,,,,,


In [44]:
data_frame.to_csv('temp/alleninstitute_metadata.csv')

## Clean Up

### Delete Redshift cluster

In [45]:
redshift.delete_cluster(ClusterIdentifier=redshift_cluster['ClusterIdentifier'], SkipFinalClusterSnapshot=True)
redshift.get_waiter('cluster_deleted').wait(ClusterIdentifier=redshift_cluster['ClusterIdentifier'])
print('Redshift cluster deleted.')

Redshift cluster deleted.


### Release Redshift public IP

In [46]:
ec2.release_address(AllocationId=redshift_ip['AllocationId'])

{'ResponseMetadata': {'RequestId': '534829e0-aaae-4e70-88a7-42e66efc6511',
  'HTTPStatusCode': 200,
  'HTTPHeaders': {'x-amzn-requestid': '534829e0-aaae-4e70-88a7-42e66efc6511',
   'cache-control': 'no-cache, no-store',
   'strict-transport-security': 'max-age=31536000; includeSubDomains',
   'content-type': 'text/xml;charset=UTF-8',
   'content-length': '229',
   'date': 'Wed, 30 Jun 2021 16:19:09 GMT',
   'server': 'AmazonEC2'},
  'RetryAttempts': 0}}

### Delete Redshift security group

In [47]:
ec2.delete_security_group(GroupId=redshift_sg['GroupId'])

{'ResponseMetadata': {'RequestId': 'b9e75d7b-be44-4f9c-b812-6b9ab319b9dc',
  'HTTPStatusCode': 200,
  'HTTPHeaders': {'x-amzn-requestid': 'b9e75d7b-be44-4f9c-b812-6b9ab319b9dc',
   'cache-control': 'no-cache, no-store',
   'strict-transport-security': 'max-age=31536000; includeSubDomains',
   'content-type': 'text/xml;charset=UTF-8',
   'content-length': '239',
   'date': 'Wed, 30 Jun 2021 16:19:12 GMT',
   'server': 'AmazonEC2'},
  'RetryAttempts': 0}}

### Delete Redshift role

In [48]:
for attached_policy in iam.list_attached_role_policies(RoleName=redshift_role['RoleName'])['AttachedPolicies']:
        iam.detach_role_policy(RoleName=redshift_role['RoleName'], PolicyArn=attached_policy['PolicyArn'])
for policy_name in iam.list_role_policies(RoleName=redshift_role['RoleName'])['PolicyNames']:
    iam.delete_role_policy(RoleName=redshift_role['RoleName'], PolicyName=policy_name)
iam.delete_role(RoleName=redshift_role['RoleName'])

{'ResponseMetadata': {'RequestId': '494a9421-692a-426e-8b5f-bf01a4f2666c',
  'HTTPStatusCode': 200,
  'HTTPHeaders': {'x-amzn-requestid': '494a9421-692a-426e-8b5f-bf01a4f2666c',
   'content-type': 'text/xml',
   'content-length': '200',
   'date': 'Wed, 30 Jun 2021 16:19:15 GMT'},
  'RetryAttempts': 0}}

### Cancel the spot instance request

In [49]:
ec2.cancel_spot_instance_requests(SpotInstanceRequestIds=[ ec2_spot_id ])
# ec2.cancel_spot_instance_requests(SpotInstanceRequestIds=[ 'sir-rk8sj4bj' ])

{'CancelledSpotInstanceRequests': [{'SpotInstanceRequestId': 'sir-6ytrsgdg',
   'State': 'cancelled'}],
 'ResponseMetadata': {'RequestId': '9c7b02ba-7d2d-4acc-a95b-da3665db49f4',
  'HTTPStatusCode': 200,
  'HTTPHeaders': {'x-amzn-requestid': '9c7b02ba-7d2d-4acc-a95b-da3665db49f4',
   'cache-control': 'no-cache, no-store',
   'strict-transport-security': 'max-age=31536000; includeSubDomains',
   'content-type': 'text/xml;charset=UTF-8',
   'content-length': '426',
   'date': 'Wed, 30 Jun 2021 16:19:18 GMT',
   'server': 'AmazonEC2'},
  'RetryAttempts': 0}}

### Terminate the spot instance

In [50]:
ec2.terminate_instances(InstanceIds=[ ec2_vm_id ])
# ec2.terminate_instances(InstanceIds=[ 'i-080cf3bee321c8357' ])

{'TerminatingInstances': [{'CurrentState': {'Code': 32,
    'Name': 'shutting-down'},
   'InstanceId': 'i-0b3ac31d882114985',
   'PreviousState': {'Code': 16, 'Name': 'running'}}],
 'ResponseMetadata': {'RequestId': '643446d7-4d82-4c6b-8e74-4d1e7a8458f9',
  'HTTPStatusCode': 200,
  'HTTPHeaders': {'x-amzn-requestid': '643446d7-4d82-4c6b-8e74-4d1e7a8458f9',
   'cache-control': 'no-cache, no-store',
   'strict-transport-security': 'max-age=31536000; includeSubDomains',
   'content-type': 'text/xml;charset=UTF-8',
   'transfer-encoding': 'chunked',
   'vary': 'accept-encoding',
   'date': 'Wed, 30 Jun 2021 16:19:21 GMT',
   'server': 'AmazonEC2'},
  'RetryAttempts': 0}}

### Wait for the instance to terminate and release the IP address 

In [51]:
ec2.get_waiter('instance_terminated').wait(InstanceIds=[ ec2_vm_id ])
ec2.release_address(AllocationId=ec2_ip['AllocationId'])

#if stuck needs to be removed manualy from the console ( VPC -> Elastic IPs)
# ec2.get_waiter('instance_terminated').wait(InstanceIds=['i-080cf3bee321c8357'])
# ec2.release_address(AllocationId='eipassoc-026af42e9e3b0fb02')



{'ResponseMetadata': {'RequestId': '15fb227e-a6f1-409f-b597-6fdbc3da1b3a',
  'HTTPStatusCode': 200,
  'HTTPHeaders': {'x-amzn-requestid': '15fb227e-a6f1-409f-b597-6fdbc3da1b3a',
   'cache-control': 'no-cache, no-store',
   'strict-transport-security': 'max-age=31536000; includeSubDomains',
   'content-type': 'text/xml;charset=UTF-8',
   'content-length': '229',
   'date': 'Wed, 30 Jun 2021 16:20:55 GMT',
   'server': 'AmazonEC2'},
  'RetryAttempts': 0}}

### Delete security group

In [None]:
ec2.delete_security_group(GroupId=ec2_sg['GroupId'])

# ec2.delete_security_group(GroupId='sg-0a1592d4d67e19433')

### Delete the key-pair

In [None]:
ec2.delete_key_pair(KeyName=ec2_pem_name)

### Detach and delete the role policies

In [None]:
for attached_policy in iam.list_attached_role_policies(RoleName=ec2_role['RoleName'])['AttachedPolicies']:
    iam.detach_role_policy(RoleName=ec2_role['RoleName'], PolicyArn=attached_policy['PolicyArn'])
for policy_name in iam.list_role_policies(RoleName=ec2_role['RoleName'])['PolicyNames']:
    iam.delete_role_policy(RoleName=ec2_role['RoleName'], PolicyName=policy_name)

### Remove role from instance profile

In [None]:
iam.remove_role_from_instance_profile(InstanceProfileName=ec2_instance_profile['InstanceProfileName'], RoleName=ec2_role['RoleName'])

### Delete instance profile

In [None]:
iam.delete_instance_profile(InstanceProfileName=ec2_instance_profile['InstanceProfileName'])

### Delete the role

In [None]:
iam.delete_role(RoleName=ec2_role['RoleName'])

## ALL DONE

## Refrences    
https://www.cloudwalker.io/2019/09/30/airflow-scale-out-with-redis-and-celery/   
https://aws.amazon.com/blogs/big-data/a-public-data-lake-for-analysis-of-covid-19-data/     
https://us-east-2.console.aws.amazon.com/cloudformation/home?region=us-east-2#/stacks/create/review?templateURL=https://covid19-lake.s3.us-east-2.amazonaws.com/cfn/CovidLakeStack.template.json&stackName=CovidLakeStack
https://aws.amazon.com/blogs/big-data/exploring-the-public-aws-covid-19-data-lake/   
https://medium.com/@hudsonmendes/data-pipeline-for-data-science-part-1-problem-solution-fit-3b092880efa3     


