### Imports & Settings

In [1]:
## Imports

# From standard library
import json
import logging
import os
import time
from typing import Tuple

# From third party
import boto3
import botocore
from dotenv import dotenv_values
import pandas as pd
import psycopg2

In [2]:
# Logging configuration: records at INFO and above go to ./project.log
logging.basicConfig(
    level=logging.INFO,
    filename="./project.log",
    format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
)

In [3]:
# Overall region
REGION_NAME = "us-west-2"

# Managed policies the calling IAM user is expected to have attached.
# NOTE(review): "AdministratorAccess" is far broader than the other three entries;
# confirm whether it is really required.
REQUIRED_POLICIES = [
    "AmazonRedshiftFullAccess",
    "AmazonS3ReadOnlyAccess",
    "IAMFullAccess",
    "AdministratorAccess",
]

# IAM Role Global Variables
ROLE_NAME = "dwhRole"
# Trust policy: lets the Redshift service assume the role.
ASSUME_ROLE_POLICY_DOCUMENT = {
    "Version": "2012-10-17",
    "Statement": [
        {
            "Action": "sts:AssumeRole",
            "Effect": "Allow",
            "Principal": {"Service": "redshift.amazonaws.com"},
        }
    ],
}
POLICY_ARN = "arn:aws:iam::aws:policy/AmazonS3ReadOnlyAccess"  # For attaching policies

# Redshift Global Variables
CLUSTER_IDENTIFIER = "dwhCluster"
CLUSTER_TYPE = "multi-node"
NUMBER_OF_NODES = 4
NODE_TYPE = "dc2.large"
DB_NAME = "dwh"
MASTER_USER_NAME = "dwhuser"
# Prefer a password supplied through the DWH_MASTER_USER_PASSWORD environment variable.
# NOTE(review): the literal fallback keeps existing runs working but is a hard-coded
# credential; remove it once every environment sets the variable.
MASTER_USER_PASSWORD = os.environ.get("DWH_MASTER_USER_PASSWORD") or "Passw0rd"

# Connection Global Variables
PORT = 5439
# NOTE(review): 0.0.0.0/0 matches every IPv4 address; narrow it for anything but a demo.
CIDR_IP = "0.0.0.0/0"
IP_PROTOCOL = "TCP"

# S3 Global Variables
LOG_DATA = "s3://udacity-dend/log_data"
LOG_JSONPATH = "s3://udacity-dend/log_json_path.json"
SONG_DATA = "s3://udacity-dend/song_data"

In [4]:
# Read the AWS key pair from the .env file; a missing key raises KeyError here.
env = dotenv_values()

AWS_ACCESS_KEY_ID, AWS_SECRET_ACCESS_KEY = (
    env["AWS_ACCESS_KEY_ID"],
    env["AWS_SECRET_ACCESS_KEY"],
)

### Create Clients

In [5]:
def create_clients(
        aws_access_key_id: str,
        aws_secret_access_key: str,
        region_name: str = REGION_NAME
    ) -> Tuple[botocore.client, ...]:
    """Build one boto3 client per AWS service used in this notebook.

    Args:
        aws_access_key_id: Access key id passed to every client.
        aws_secret_access_key: Secret access key passed to every client.
        region_name: Region passed to every client.

    Returns:
        The clients in the fixed order ``(sts, iam, s3, redshift, ec2)``.

    Raises:
        Exception: Any error raised while building a client is logged and re-raised.
    """
    service_names = ("sts", "iam", "s3", "redshift", "ec2")
    try:
        # The generator keeps the creation order, so callers can unpack positionally.
        return tuple(
            boto3.client(
                service_name,
                region_name=region_name,
                aws_access_key_id=aws_access_key_id,
                aws_secret_access_key=aws_secret_access_key,
            )
            for service_name in service_names
        )
    except Exception as error:
        logging.error(error)
        raise

In [6]:
# Build the five service clients once and echo them as the cell output.
sts, iam, s3, redshift, ec2 = create_clients(AWS_ACCESS_KEY_ID, AWS_SECRET_ACCESS_KEY, region_name=REGION_NAME)
sts, iam, s3, redshift, ec2

(<botocore.client.STS at 0x1244fdb50>,
 <botocore.client.IAM at 0x1268eaa90>,
 <botocore.client.S3 at 0x126c44f50>,
 <botocore.client.Redshift at 0x126f5af10>,
 <botocore.client.EC2 at 0x12aa265d0>)

### Check User

In [7]:
def get_user_name(
        # Quoted: the client class is botocore.client.BaseClient (botocore.client is a module).
        sts: "botocore.client.BaseClient"
    ) -> str:
    """Return the name of the principal behind the STS client's credentials.

    The name is taken from the caller ARN
    (``arn:partition:service:region:account-id:resource``):

    * ``user/<path>/<name>`` -> ``<name>`` (IAM users may live under a path),
    * ``<type>/<name>/...``  -> ``<name>`` (e.g. the role of an assumed-role ARN),
    * a bare resource such as ``root`` -> that resource.

    Args:
        sts: STS client used to call ``get_caller_identity``.

    Returns:
        The resolved name. When AWS rejects the credentials with
        ``InvalidClientTokenId`` a hint is printed and ``None`` is returned.

    Raises:
        botocore.exceptions.ClientError: Any other client error (logged first).
        Exception: Any other failure (logged first).
    """
    try:
        caller_arn = sts.get_caller_identity()["Arn"]
        # At most five splits so the resource part stays intact.
        resource = caller_arn.split(":", 5)[-1]
        segments = resource.split("/")
        if segments[0] == "user":
            # With a path ("user/team/alice") the user name is the last segment.
            return segments[-1]
        # Other principals keep the segment after the resource type; a bare
        # resource ("root") has no slash and is returned as is.
        return segments[1] if len(segments) > 1 else segments[0]
    except botocore.exceptions.ClientError as e:
        if e.response['Error']['Code'] == 'InvalidClientTokenId':
            print('The key and/or secret is invalid.')
            return None
        logging.error(e)
        raise
    except Exception as e:
        logging.error(e)
        raise

In [8]:
# A boto3 Session holding the region and credentials; clients can be created from it.
session = boto3.Session(region_name=REGION_NAME, aws_access_key_id=AWS_ACCESS_KEY_ID, aws_secret_access_key=AWS_SECRET_ACCESS_KEY)

In [9]:
# Create an STS client from the session
sts_client = session.client("sts")

# Print the segment after the first "/" of the caller ARN's resource part.
# Reuses sts_client rather than building a second client; errors propagate unchanged.
print(sts_client.get_caller_identity()["Arn"].split(":")[-1].split("/")[1])

airflow_redshift_user


In [10]:
# Display the name resolved for the current credentials.
get_user_name(sts)

'airflow_redshift_user'

In [11]:
# Keep the resolved user name in a variable and display it.
user_name = get_user_name(sts)
user_name

'airflow_redshift_user'

In [12]:
def user_has_required_policies(
        iam: botocore.client,
        user_name: str,
        required_policies: list = REQUIRED_POLICIES
    ) -> bool:
    """Tell whether every required policy is attached to the user.

    Only the policy names returned by ``list_attached_user_policies`` are considered.

    Args:
        iam: IAM client.
        user_name: Name of the IAM user to inspect.
        required_policies: Policy names that must all be attached.

    Returns:
        True when none of the required names is missing, otherwise False.

    Raises:
        Exception: Any failure is logged and re-raised.
    """
    try:
        attached = iam.list_attached_user_policies(UserName=user_name)["AttachedPolicies"]
        attached_names = [entry["PolicyName"] for entry in attached]
        missing = [name for name in required_policies if name not in attached_names]
        return not missing
    except Exception as error:
        logging.error(error)
        raise

In [13]:
# Verify that every policy in REQUIRED_POLICIES is attached to the user.
user_has_required_policies(iam, user_name=user_name, required_policies=REQUIRED_POLICIES)

True

### Role Handling

In [14]:
def role_exists(
    iam: botocore.client,
    role_name: str = ROLE_NAME
) -> bool:
    """Tell whether an IAM role with the given name exists.

    Args:
        iam: IAM client.
        role_name: Name of the role to look up.

    Returns:
        True when ``get_role`` succeeds, False when it reports ``NoSuchEntity``.

    Raises:
        botocore.exceptions.ClientError: Any other client error (logged first).
    """
    try:
        iam.get_role(RoleName=role_name)
    except iam.exceptions.NoSuchEntityException:
        found = False
    except botocore.exceptions.ClientError as error:
        logging.error(error)
        raise
    else:
        found = True
    return found

In [15]:
# Check whether the role is already present.
role_exists(iam, role_name=ROLE_NAME)

False

In [16]:
def create_role(
        iam: botocore.client,
        role_name: str = ROLE_NAME,
        assume_role_policy_document: dict = ASSUME_ROLE_POLICY_DOCUMENT
    ) -> None:
    """Create the IAM role unless a role with that name already exists.

    Args:
        iam: IAM client.
        role_name: Name of the role to create.
        assume_role_policy_document: Trust policy as a dict. An already serialised
            JSON string is accepted too and passed through unchanged.

    Raises:
        AssertionError: The role is still not visible after the waiter returned.
        Exception: Any failure is logged and re-raised.
    """
    if role_exists(iam, role_name):
        return
    try:
        # json.dumps on a str would wrap it in quotes and produce an invalid policy.
        if isinstance(assume_role_policy_document, str):
            policy_json = assume_role_policy_document
        else:
            policy_json = json.dumps(assume_role_policy_document)
        iam.create_role(
            RoleName=role_name,
            AssumeRolePolicyDocument=policy_json,
        )
        # Block until IAM reports the new role.
        waiter = iam.get_waiter("role_exists")
        waiter.wait(RoleName=role_name)
        assert role_exists(iam, role_name), "Something went wrong. Role was not created."
    except Exception as e:
        logging.error(e)
        raise

In [17]:
# Create the role with the default name and trust policy (nothing happens if it exists).
create_role(iam)

In [18]:
# Confirm the role now exists.
role_exists(iam)

True

In [19]:
def role_assumes_relavant_role_policy_document(
    iam: botocore.client,
    role_name: str = ROLE_NAME,
    assume_role_policy_document: dict = ASSUME_ROLE_POLICY_DOCUMENT
) -> bool:
    """Tell whether the role's trust policy equals the expected document.

    Args:
        iam: IAM client.
        role_name: Name of the role to inspect.
        assume_role_policy_document: Expected trust policy.

    Returns:
        False when the role does not exist; otherwise the result of comparing the
        role's ``AssumeRolePolicyDocument`` with the expected document.

    Raises:
        Exception: Any failure while reading the role is logged and re-raised.
    """
    if not role_exists(iam, role_name):
        return False
    try:
        current_document = iam.get_role(RoleName=role_name)["Role"]["AssumeRolePolicyDocument"]
        return current_document == assume_role_policy_document
    except Exception as error:
        logging.error(error)
        raise

In [20]:
# Confirm the role's trust policy equals ASSUME_ROLE_POLICY_DOCUMENT.
role_assumes_relavant_role_policy_document(iam, role_name=ROLE_NAME, assume_role_policy_document=ASSUME_ROLE_POLICY_DOCUMENT)

True

In [21]:
def role_has_required_policy_attached(
    iam: botocore.client,
    role_name: str = ROLE_NAME,
    policy_arn: str = POLICY_ARN
) -> bool:
    """Tell whether the given managed policy is attached to the role.

    Args:
        iam: IAM client.
        role_name: Name of the role to inspect.
        policy_arn: ARN of the policy that should be attached.

    Returns:
        False when the role does not exist; otherwise whether ``policy_arn`` is
        among the ARNs returned by ``list_attached_role_policies``.

    Raises:
        Exception: Any failure while listing the policies is logged and re-raised.
    """
    if not role_exists(iam, role_name):
        return False
    try:
        attached = iam.list_attached_role_policies(RoleName=role_name)["AttachedPolicies"]
        attached_arns = [entry["PolicyArn"] for entry in attached]
        return any(policy_arn == arn for arn in attached_arns)
    except Exception as error:
        logging.error(error)
        raise

In [22]:
# Check whether POLICY_ARN is attached to the role yet.
role_has_required_policy_attached(iam, role_name=ROLE_NAME, policy_arn=POLICY_ARN)

False

In [23]:
def attach_required_policy_to_role(
    iam: botocore.client,
    role_name: str = ROLE_NAME,
    policy_arn: str = POLICY_ARN
    ) -> None:
    """Attach the managed policy to the role when it is not attached yet.

    Nothing happens when the role does not exist or already has the policy.

    Args:
        iam: IAM client.
        role_name: Name of the role that receives the policy.
        policy_arn: ARN of the managed policy to attach.

    Raises:
        AssertionError: The policy is not reported as attached right after the call.
        Exception: Any failure is logged and re-raised.
    """
    if not role_exists(iam, role_name):
        return
    if role_has_required_policy_attached(iam, role_name, policy_arn):
        return
    try:
        iam.attach_role_policy(RoleName=role_name, PolicyArn=policy_arn)
        assert role_has_required_policy_attached(iam, role_name, policy_arn), "Something went wrong, Policy is not attached."
    except Exception as error:
        logging.error(error)
        raise

In [24]:
# Attach POLICY_ARN to the role (skipped when it is already attached).
attach_required_policy_to_role(iam, role_name=ROLE_NAME, policy_arn=POLICY_ARN)

In [25]:
# Confirm the policy is now attached.
role_has_required_policy_attached(iam, ROLE_NAME, POLICY_ARN)

True

In [26]:
def role_completed(
        iam: botocore.client,
        role_name: str = ROLE_NAME,
        assume_role_policy_document: dict = ASSUME_ROLE_POLICY_DOCUMENT,
        policy_arn: str = POLICY_ARN
    ) -> bool:
    """Tell whether the role is fully set up.

    The role counts as complete when it exists, its trust policy equals
    ``assume_role_policy_document`` and ``policy_arn`` is attached to it.

    Args:
        iam: IAM client.
        role_name: Name of the role to inspect.
        assume_role_policy_document: Expected trust policy.
        policy_arn: ARN of the policy that must be attached.

    Returns:
        True only when all three checks pass.
    """
    # Logical `and` (not bitwise `&`) so later IAM calls are skipped once a check fails.
    return (
        role_exists(iam, role_name)
        and role_assumes_relavant_role_policy_document(iam, role_name, assume_role_policy_document)
        and role_has_required_policy_attached(iam, role_name, policy_arn)
    )

In [27]:
# All role checks in one call: exists, trust policy matches, policy attached.
role_completed(iam, role_name=ROLE_NAME, assume_role_policy_document=ASSUME_ROLE_POLICY_DOCUMENT, policy_arn=POLICY_ARN)

True

In [28]:
def build_role_as_necessary(
    iam: botocore.client,
    role_name: str = ROLE_NAME,
    assume_role_policy_document: dict = ASSUME_ROLE_POLICY_DOCUMENT,
    policy_arn: str = POLICY_ARN
) -> str:
    """Create whatever part of the role setup is missing and return the role ARN.

    Steps, in order: create the role if absent, verify its trust policy, attach
    the policy if absent, then assert the role is complete.

    Args:
        iam: IAM client.
        role_name: Name of the role.
        assume_role_policy_document: Trust policy the role must have.
        policy_arn: ARN of the managed policy the role must have attached.

    Returns:
        The ARN of the role.

    Raises:
        Exception: The role exists with a different trust policy (logged first).
        AssertionError: The role is still incomplete after all steps.
    """
    if not role_exists(iam, role_name):
        create_role(iam, role_name, assume_role_policy_document)

    trust_matches = role_assumes_relavant_role_policy_document(iam, role_name, assume_role_policy_document)
    if not trust_matches:
        # An existing trust policy is never overwritten here; the user must fix it.
        error_message = (
            "Role already exists but with a different trust relationship. "
            "Please visit the AWS console to amend the policy."
        )
        logging.error(error_message)
        raise Exception(error_message)

    if not role_has_required_policy_attached(iam, role_name, policy_arn):
        attach_required_policy_to_role(iam, role_name, policy_arn)

    assert role_completed(iam, role_name, assume_role_policy_document, policy_arn), "Something went wrong. The role is not completed."
    role_description = iam.get_role(RoleName=role_name)
    return role_description["Role"]["Arn"]

In [29]:
# Create or complete the role as needed and keep its ARN.
iam_roles = build_role_as_necessary(iam, role_name=ROLE_NAME, assume_role_policy_document=ASSUME_ROLE_POLICY_DOCUMENT, policy_arn=POLICY_ARN)
iam_roles

'arn:aws:iam::130246939765:role/dwhRole'

In [30]:
def destroy_role(
    iam: botocore.client,
    role_name: str = ROLE_NAME
) -> None:
    """Detach every attached managed policy from the role, then delete the role.

    Nothing happens when the role does not exist.

    Args:
        iam: IAM client.
        role_name: Name of the role to remove.

    Raises:
        AssertionError: The role still exists after ``delete_role``.
        Exception: Any failure is logged and re-raised.
    """
    if not role_exists(iam, role_name):
        return
    try:
        attached = iam.list_attached_role_policies(RoleName=role_name)["AttachedPolicies"]
        for entry in attached:
            attached_arn = entry["PolicyArn"]
            iam.detach_role_policy(RoleName=role_name, PolicyArn=attached_arn)
        iam.delete_role(RoleName=role_name)
        assert not role_exists(iam, role_name), "Something went wrong. The role still exists."
    except Exception as error:
        logging.error(error)
        raise

In [31]:
# Detach the role's policies and delete the role.
destroy_role(iam)

In [32]:
# Confirm the role is gone.
role_exists(iam, role_name=ROLE_NAME)

False

In [33]:
# Build the role again after the deletion and keep its ARN.
iam_roles = build_role_as_necessary(iam, role_name=ROLE_NAME, assume_role_policy_document=ASSUME_ROLE_POLICY_DOCUMENT, policy_arn=POLICY_ARN)
iam_roles

'arn:aws:iam::130246939765:role/dwhRole'

### Redshift Handling

In [34]:
def redshift_cluster_exists(
    redshift: botocore.client,
    cluster_identifier: str = CLUSTER_IDENTIFIER
) -> bool:
    """Tell whether a Redshift cluster with the given identifier exists.

    Args:
        redshift: Redshift client.
        cluster_identifier: Identifier of the cluster to look up.

    Returns:
        True when ``describe_clusters`` returns exactly one cluster, False when it
        returns another count or reports ``ClusterNotFound``.

    Raises:
        Exception: Any other failure is logged and re-raised.
    """
    try:
        response = redshift.describe_clusters(ClusterIdentifier=cluster_identifier)
        matches = response["Clusters"]
        return len(matches) == 1
    except redshift.exceptions.ClusterNotFoundFault:
        return False
    except Exception as error:
        logging.error(error)
        raise

In [35]:
# Check whether the cluster already exists.
redshift_cluster_exists(redshift, cluster_identifier=CLUSTER_IDENTIFIER)

False

In [36]:
def create_redshift_cluster(
    redshift: botocore.client,
    iam_roles: str,
    cluster_identifier: str = CLUSTER_IDENTIFIER,
    cluster_type: str = CLUSTER_TYPE,
    node_type: str = NODE_TYPE,
    number_of_nodes: int = NUMBER_OF_NODES,
    db_name: str = DB_NAME,
    master_user_name: str = MASTER_USER_NAME,
    master_user_password: str = MASTER_USER_PASSWORD,
) -> str:
    """Create the Redshift cluster and return its first VPC security group id.

    After ``create_cluster`` the function waits for the ``cluster_available``
    waiter. When the cluster already exists nothing is created or waited for and
    the security group id of the existing cluster is returned.

    Args:
        redshift: Redshift client.
        iam_roles: ARN of the single IAM role to associate with the cluster.
        cluster_identifier: Identifier of the cluster.
        cluster_type: Cluster type passed to ``create_cluster``.
        node_type: Node type passed to ``create_cluster``.
        number_of_nodes: Number of nodes passed to ``create_cluster``.
        db_name: Name of the initial database.
        master_user_name: Master user name.
        master_user_password: Master user password.

    Returns:
        ``VpcSecurityGroupId`` of the cluster's first VPC security group.

    Raises:
        AssertionError: The cluster is not found after the waiter returned.
        Exception: Any other failure during creation is logged and re-raised.
    """
    def first_security_group_id() -> str:
        # Shared by the "created" and "already exists" paths.
        described = redshift.describe_clusters(ClusterIdentifier=cluster_identifier)
        return described["Clusters"][0]["VpcSecurityGroups"][0]["VpcSecurityGroupId"]

    try:
        redshift.create_cluster(
            IamRoles=[iam_roles],
            ClusterIdentifier=cluster_identifier,
            ClusterType=cluster_type,
            NodeType=node_type,
            NumberOfNodes=number_of_nodes,
            DBName=db_name,
            MasterUsername=master_user_name,
            MasterUserPassword=master_user_password,
        )
        redshift.get_waiter("cluster_available").wait(ClusterIdentifier=cluster_identifier)
        assert redshift_cluster_exists(redshift, cluster_identifier), "Something went wrong. The cluster does not exist."
        return first_security_group_id()
    except redshift.exceptions.ClusterAlreadyExistsFault:
        return first_security_group_id()
    except Exception as error:
        logging.error(error)
        raise

In [37]:
# Create the cluster (or reuse the existing one) and keep its VPC security group id.
group_id = create_redshift_cluster(redshift, iam_roles)
group_id

'sg-02c4055629a91d3aa'

In [38]:
# Confirm the cluster now exists.
redshift_cluster_exists(redshift, cluster_identifier=CLUSTER_IDENTIFIER)

True

In [39]:
def redshift_cluster_is_available(
    redshift: botocore.client,
    cluster_identifier: str = CLUSTER_IDENTIFIER
) -> bool:
    """Tell whether the cluster exists and its status is ``"available"``.

    Args:
        redshift: Redshift client.
        cluster_identifier: Identifier of the cluster to inspect.

    Returns:
        False when the cluster does not exist (previously an implicit ``None``);
        otherwise whether ``ClusterStatus`` equals ``"available"``.

    Raises:
        Exception: Any failure while describing the cluster is logged and re-raised.
    """
    if not redshift_cluster_exists(redshift, cluster_identifier):
        return False
    try:
        cluster = redshift.describe_clusters(ClusterIdentifier=cluster_identifier)["Clusters"][0]
        return cluster["ClusterStatus"] == "available"
    except Exception as e:
        logging.error(e)
        raise

In [40]:
# Confirm the cluster status is "available".
redshift_cluster_is_available(redshift, cluster_identifier=CLUSTER_IDENTIFIER)

True

In [41]:
def cluster_is_complete(
    redshift: botocore.client,
    iam_roles: str,
    cluster_identifier: str = CLUSTER_IDENTIFIER,
    node_type: str = NODE_TYPE,
    number_of_nodes: int = NUMBER_OF_NODES,
    db_name: str = DB_NAME,
    master_user_name: str = MASTER_USER_NAME,
) -> bool:
    """Tell whether the live cluster matches the expected configuration.

    Args:
        redshift: Redshift client.
        iam_roles: ARN of the IAM role that must be associated with the cluster.
        cluster_identifier: Identifier of the cluster to inspect.
        node_type: Expected node type.
        number_of_nodes: Expected number of nodes.
        db_name: Expected database name.
        master_user_name: Expected master user name.

    Returns:
        False when the cluster is missing or not available; otherwise True only
        when node type, node count, database name and master user match and the
        role ARN is among the cluster's associated roles.

    Raises:
        Exception: Any failure is logged and re-raised.
    """
    try:
        if not (
            redshift_cluster_exists(redshift, cluster_identifier)
            and redshift_cluster_is_available(redshift, cluster_identifier)
        ):
            return False
        cluster = redshift.describe_clusters(ClusterIdentifier=cluster_identifier)["Clusters"][0]
        # Look at every associated role: the list may be empty, and the wanted
        # role is not guaranteed to be the first entry.
        associated_role_arns = [role["IamRoleArn"] for role in cluster.get("IamRoles", [])]
        return (
            cluster["NodeType"] == node_type
            and cluster["NumberOfNodes"] == number_of_nodes
            and iam_roles in associated_role_arns
            and cluster["DBName"] == db_name
            and cluster["MasterUsername"] == master_user_name
        )
    except Exception as e:
        logging.error(e)
        raise

In [42]:
# Compare the live cluster against the configured settings.
cluster_is_complete(redshift, iam_roles, cluster_identifier=CLUSTER_IDENTIFIER, node_type=NODE_TYPE, number_of_nodes=NUMBER_OF_NODES, db_name=DB_NAME, master_user_name=MASTER_USER_NAME)#, port=PORT)

True

In [43]:
def get_redshift_host(
    redshift: botocore.client,
    cluster_identifier: str = CLUSTER_IDENTIFIER
) -> str:
    """Return the endpoint address of the cluster.

    Args:
        redshift: Redshift client.
        cluster_identifier: Identifier of the cluster to inspect.

    Returns:
        The ``Endpoint.Address`` value of the first described cluster.

    Raises:
        Exception: Any failure (including a missing key) is logged and re-raised.
    """
    try:
        description = redshift.describe_clusters(ClusterIdentifier=cluster_identifier)
        endpoint = description["Clusters"][0]["Endpoint"]
        return endpoint["Address"]
    except Exception as error:
        logging.error(error)
        raise

In [44]:
# Look up the cluster endpoint address and keep it as the database host.
host = get_redshift_host(redshift, cluster_identifier=CLUSTER_IDENTIFIER)
host

'dwhcluster.cu0kxfsouwy2.us-west-2.redshift.amazonaws.com'

In [45]:
def authorize_ingress(
    ec2: botocore.client,
    group_id: str,
    cidr_ip: str = CIDR_IP,
    ip_protocol: str = IP_PROTOCOL,
    port: int = PORT
) -> None:
    """Add an inbound rule for a single port to the security group.

    A rule that already exists is not an error: a short message is printed.

    Args:
        ec2: EC2 client.
        group_id: Id of the security group to modify.
        cidr_ip: CIDR range allowed to connect.
        ip_protocol: Protocol of the rule.
        port: Port used as both ``FromPort`` and ``ToPort``.

    Raises:
        botocore.exceptions.ClientError: Any client error other than
            ``InvalidPermission.Duplicate`` (logged first).
        Exception: Any other failure (logged first).
    """
    rule = {
        "GroupId": group_id,
        "CidrIp": cidr_ip,
        "IpProtocol": ip_protocol,
        "FromPort": port,
        "ToPort": port,
    }
    try:
        ec2.authorize_security_group_ingress(**rule)
    except botocore.exceptions.ClientError as error:
        if error.response['Error']['Code'] != 'InvalidPermission.Duplicate':
            logging.error(error)
            raise
        print('The rule already exists.')
    except Exception as error:
        logging.error(error)
        raise

In [46]:
# Add the inbound rule for PORT to the cluster's security group.
authorize_ingress(ec2, group_id, cidr_ip=CIDR_IP, ip_protocol=IP_PROTOCOL, port=PORT)

In [47]:
def ingress_is_authorized(
    ec2: botocore.client,
    group_id: str,
    cidr_ip: str = CIDR_IP,
    ip_protocol: str = IP_PROTOCOL,
    port: int = PORT
) -> bool:
    """Tell whether the security group has the expected inbound rule.

    Every entry of ``IpPermissions`` and every one of its ``IpRanges`` is
    inspected. A group without rules, or with rules that carry no port range,
    yields False instead of an IndexError/KeyError.

    Args:
        ec2: EC2 client.
        group_id: Id of the security group to inspect.
        cidr_ip: CIDR range the rule must allow.
        ip_protocol: Protocol of the rule; compared in lower case.
        port: Port that must equal both ``FromPort`` and ``ToPort``.

    Returns:
        True when some permission matches protocol, both ports and the CIDR range.

    Raises:
        Exception: Any failure while describing the group is logged and re-raised.
    """
    try:
        security_group = ec2.describe_security_groups(GroupIds=[group_id])["SecurityGroups"][0]
        wanted_protocol = ip_protocol.lower()
        for permission in security_group.get("IpPermissions", []):
            if permission.get("IpProtocol") != wanted_protocol:
                continue
            # .get(): not every rule carries FromPort/ToPort.
            if permission.get("FromPort") != port or permission.get("ToPort") != port:
                continue
            if any(ip_range.get("CidrIp") == cidr_ip for ip_range in permission.get("IpRanges", [])):
                return True
        return False
    except Exception as e:
        logging.error(e)
        raise

In [48]:
# Confirm the inbound rule is in place.
ingress_is_authorized(ec2, group_id, cidr_ip=CIDR_IP, ip_protocol=IP_PROTOCOL, port=PORT)

True

#### Check Database Connection

In [49]:
# Connection settings for the cluster endpoint and the master user.
configuration = dict(
    host=host,
    port=PORT,
    database=DB_NAME,
    user=MASTER_USER_NAME,
    password=MASTER_USER_PASSWORD,
)

# NOTE(review): neither the connection nor the cursor is closed in this cell.
connection = psycopg2.connect(**configuration)
cursor = connection.cursor()
# Displaying the encoding shows that the connection was established.
connection.encoding

'UNICODE'

In [None]:
# Show the connection settings; the password is masked so it never lands in saved cell output.
print(host, PORT, DB_NAME, MASTER_USER_NAME, "********")

In [None]:
def revoke_ingress(ec2: botocore.client, group_id: str, cidr_ip: str = CIDR_IP, ip_protocol: str = IP_PROTOCOL, port: int = PORT) -> None:
    """Remove the inbound rule for a single port from the security group.

    A rule that does not exist is not an error: a short message is printed,
    mirroring how ``authorize_ingress`` treats a duplicate rule.

    Args:
        ec2: EC2 client.
        group_id: Id of the security group to modify.
        cidr_ip: CIDR range of the rule.
        ip_protocol: Protocol of the rule.
        port: Port used as both ``FromPort`` and ``ToPort``.

    Raises:
        botocore.exceptions.ClientError: Any client error other than
            ``InvalidPermission.NotFound`` (logged first).
        Exception: Any other failure (logged first).
    """
    try:
        ec2.revoke_security_group_ingress(
            GroupId=group_id,
            IpProtocol=ip_protocol,
            CidrIp=cidr_ip,
            FromPort=port,
            ToPort=port,
        )
    except botocore.exceptions.ClientError as e:
        if e.response["Error"]["Code"] == "InvalidPermission.NotFound":
            print("The rule does not exist.")
        else:
            logging.error(e)
            raise
    except Exception as e:
        logging.error(e)
        raise

In [None]:
# Remove the inbound rule for PORT from the security group.
revoke_ingress(ec2, group_id, cidr_ip=CIDR_IP, ip_protocol=IP_PROTOCOL, port=PORT)

In [None]:
# Check the state of the inbound rule after revoking it.
ingress_is_authorized(ec2, group_id, cidr_ip=CIDR_IP, ip_protocol=IP_PROTOCOL, port=PORT)

In [None]:
def delete_cluster(redshift: botocore.client, cluster_identifier: str = CLUSTER_IDENTIFIER) -> None:
    """Delete the Redshift cluster without a final snapshot and wait until it is gone.

    A cluster that does not exist is not an error: a short message is printed.

    Args:
        redshift: Redshift client.
        cluster_identifier: Identifier of the cluster to delete.

    Raises:
        botocore.exceptions.ClientError: Any client error other than
            ``ClusterNotFound`` (logged first).
        Exception: Any other failure (logged first).
    """
    try:
        # Use the argument, not the module constant, so other clusters can be deleted too.
        redshift.delete_cluster(ClusterIdentifier=cluster_identifier, SkipFinalClusterSnapshot=True)
        waiter = redshift.get_waiter("cluster_deleted")
        waiter.wait(ClusterIdentifier=cluster_identifier)
    except botocore.exceptions.ClientError as e:
        if e.response["Error"]["Code"] == "ClusterNotFound":
            print("Cluster not found.")
        else:
            logging.error(e)
            raise
    except Exception as e:
        logging.error(e)
        raise

In [None]:
# Delete the cluster (no final snapshot) and wait for the deletion to finish.
delete_cluster(redshift, cluster_identifier=CLUSTER_IDENTIFIER)

In [None]:
# Remove the IAM role as well.
destroy_role(iam, role_name=ROLE_NAME)