# **AWS DATA WAREHOUSE INTERFACE**

Run the code cells in this notebook, in order, to set the state of the data warehouse:
- PREREQUISITE: IMPORT LIBRARIES
- COLLECT CLUSTER CONFIGURATION
- CONFIGURE ROLE AND POLICY
- CREATE REDSHIFT CLUSTER
- GET CLUSTER DESCRIPTION
- CONFIGURE REDSHIFT CLUSTER
- INSPECT REDSHIFT DATABASE
- DELETE REDSHIFT CLUSTER

### **PREREQUISITE: IMPORT LIBRARIES**

In [None]:
import boto3
import json
import pandas as pd

import config

Reusable Functions:

In [None]:
def prettyRedshiftProps(props):
    """Summarise selected Redshift cluster properties as a two-column table.

    Parameters
    ----------
    props : dict
        A cluster description mapping, such as one element of the
        ``Clusters`` list returned by ``describe_clusters``.

    Returns
    -------
    pandas.DataFrame
        Columns ``Key`` and ``Value`` with one row per property in ``props``
        whose key is in the fixed list of properties to show, in the
        iteration order of ``props``.

    Notes
    -----
    As a side effect this disables column-width truncation in the global
    pandas display options so long values are shown in full.
    """
    # None means "do not truncate". The former sentinel -1 was deprecated in
    # pandas 1.0 and is rejected by later releases; fall back to it only for
    # old pandas versions that do not accept None.
    try:
        pd.set_option('display.max_colwidth', None)
    except ValueError:
        pd.set_option('display.max_colwidth', -1)
    keysToShow = ["ClusterIdentifier", "NodeType", "ClusterStatus", "MasterUsername", "DBName", "Endpoint", "NumberOfNodes", 'VpcId']
    x = [(k, v) for k, v in props.items() if k in keysToShow]
    return pd.DataFrame(data=x, columns=["Key", "Value"])

### **COLLECT CLUSTER CONFIGURATION**

In [None]:
#config = configparser.ConfigParser()
#config.read('dwh.cfg')

#REGION = config.get('AWS', 'REGION')
#KEY = config.get('AWS', 'KEY')
#SECRET = config.get('AWS', 'SECRET')
#CLUSTER_TYPE = config.get('REDSHIFT', 'DWH_CLUSTER_TYPE')
#NUM_NODES = config.get('REDSHIFT', 'DWH_NUM_NODES')
#NODE_TYPE = config.get('REDSHIFT', 'DWH_NODE_TYPE')
#CLUSTER_ID = config.get('REDSHIFT', 'DWH_CLUSTER_IDENTIFIER')
#DB_NAME = config.get('REDSHIFT', 'DWH_DB')
#USER = config.get('REDSHIFT', 'DWH_DB_USER')
#PW = config.get('REDSHIFT', 'DWH_DB_PASSWORD')
#PORT = config.get('REDSHIFT', 'DWH_PORT')
#IAM_ROLE = config.get('REDSHIFT', 'DWH_IAM_ROLE_NAME')
#IAM_ROLE_ARN = config.get('IAM_ROLE', 'ARN')

### **CONFIGURE ROLE AND POLICY**

In [None]:
# Build the IAM client from the credentials held in the local `config` module.
iam = boto3.client(
    'iam',
    aws_access_key_id=config.KEY,
    aws_secret_access_key=config.SECRET,
    region_name=config.REGION
)
# Only the region is echoed. Printing the access key and secret would store
# them in the saved notebook output, where they can leak when the file is shared.
print(config.REGION)

# Trust policy allowing the Redshift service to assume the role.
assume_role_policy = {
    'Statement': [
        {
            'Action': 'sts:AssumeRole',
            'Effect': 'Allow',
            'Principal': {
                'Service': 'redshift.amazonaws.com'
            }
        }
    ],
    'Version': '2012-10-17'
}

try:
    print("Creating a new IAM Role")
    dwhRole = iam.create_role(
        Path='/',
        RoleName=config.IAM_ROLE,
        Description="Allows Redshift clusters to call AWS services on your behalf.",
        AssumeRolePolicyDocument=json.dumps(assume_role_policy)
    )
except iam.exceptions.EntityAlreadyExistsException as e:
    # Re-running the cell after the role exists is expected: report and carry on.
    # Any other failure (bad credentials, missing permissions) is left to
    # propagate instead of being hidden behind a printed message.
    print(e)

print("Attaching policy to new role")
iam.attach_role_policy(
    RoleName=config.IAM_ROLE,
    PolicyArn="arn:aws:iam::aws:policy/AmazonS3ReadOnlyAccess"
)['ResponseMetadata']['HTTPStatusCode']

# The role ARN is needed by the cluster-creation cell.
roleArn = iam.get_role(RoleName=config.IAM_ROLE)['Role']['Arn']

### **CREATE REDSHIFT CLUSTER**

In [None]:
# Redshift client used by this cell and by the description / deletion cells.
redshift_client = boto3.client(
    'redshift',
    region_name=config.REGION,
    aws_access_key_id=config.KEY,
    aws_secret_access_key=config.SECRET
)

try:
    redshift_client.create_cluster(
        ClusterType=config.CLUSTER_TYPE,
        NodeType=config.NODE_TYPE,
        NumberOfNodes=int(config.NUM_NODES),
        DBName=config.DB_NAME,
        ClusterIdentifier=config.CLUSTER_ID,
        MasterUsername=config.USER,
        MasterUserPassword=config.PW,
        # config.PORT is what the ingress rule and the database connections
        # use, so the cluster must be created listening on that same port.
        Port=int(config.PORT),
        # roleArn is set by the role-and-policy cell.
        IamRoles=[roleArn]
    )
    print(redshift_client.describe_clusters(ClusterIdentifier=config.CLUSTER_ID)['Clusters'][0])
except Exception as error:
    # Best effort: report the failure (for example, the cluster already
    # exists) without stopping the notebook.
    print(error)

### **GET CLUSTER DESCRIPTION**

Run this cell whenever you need the current state of the cluster. It only reads the cluster description, so it is safe to run repeatedly.

In [None]:
# Fetch the description of the configured cluster and keep the first (only
# requested) entry; later cells read `myClusterProps`.
cluster_listing = redshift_client.describe_clusters(
    ClusterIdentifier=config.CLUSTER_ID
)
myClusterProps = cluster_listing['Clusters'][0]

# Last expression of the cell, so the summary table is rendered as output.
prettyRedshiftProps(myClusterProps)

In [None]:
# The description may lack 'Endpoint' or 'IamRoles' (e.g. while the cluster is
# still being created - TODO confirm). Fail with an actionable message instead
# of a bare KeyError / IndexError.
endpoint_info = myClusterProps.get('Endpoint')
attached_roles = myClusterProps.get('IamRoles') or []
if not endpoint_info or not attached_roles:
    raise RuntimeError(
        "Cluster endpoint or IAM role not available yet (status: {}). "
        "Re-run the cluster description cell and try again.".format(
            myClusterProps.get('ClusterStatus')
        )
    )

dwh_endpoint = endpoint_info['Address']
dwh_role_arn = attached_roles[0]['IamRoleArn']
print("DWH_ENDPOINT :: ", dwh_endpoint)
print("DWH_ROLE_ARN :: ", dwh_role_arn)

In [None]:
import configparser

# Make the discovered values available to later cells through the in-memory
# config module.
config.ENDPOINT = dwh_endpoint
config.IAM_ROLE_ARN = dwh_role_arn

# Persist the same values to dwh.cfg.
cfg_path = 'dwh.cfg'
cfg = configparser.ConfigParser()
# ConfigParser.read() silently skips files it cannot open and returns the list
# of files it did parse, so an empty result means the file was not found.
if not cfg.read(cfg_path):
    raise FileNotFoundError("Configuration file '{}' not found".format(cfg_path))

# set() raises NoSectionError for a missing section, so create it on demand.
for section in ('REDSHIFT', 'IAM_ROLE'):
    if not cfg.has_section(section):
        cfg.add_section(section)
cfg.set('REDSHIFT', 'DWH_ENDPOINT', dwh_endpoint)
cfg.set('IAM_ROLE', 'ARN', dwh_role_arn)

# NOTE: rewriting the file through ConfigParser does not preserve comments.
with open(cfg_path, 'w') as configfile:
    cfg.write(configfile)

### **CONFIGURE REDSHIFT CLUSTER**

In [None]:
# EC2 resource used to open the database port on the cluster's security group.
ec2 = boto3.resource(
    'ec2',
    region_name=config.REGION,
    aws_access_key_id=config.KEY,
    aws_secret_access_key=config.SECRET
)

cluster_props = redshift_client.describe_clusters(ClusterIdentifier=config.CLUSTER_ID)['Clusters'][0]
vpc = ec2.Vpc(id=cluster_props['VpcId'])

# Prefer the security group attached to the cluster. The first group returned
# for the VPC is in no guaranteed order and may not be the one protecting the
# cluster; it is kept only as a fallback.
attached_groups = cluster_props.get('VpcSecurityGroups') or []
if attached_groups:
    defaultSg = ec2.SecurityGroup(attached_groups[0]['VpcSecurityGroupId'])
else:
    defaultSg = list(vpc.security_groups.all())[0]

# NOTE(review): 0.0.0.0/0 exposes the port to the whole internet. Define
# ALLOWED_CIDR in config to restrict access; the default keeps the original
# behaviour.
allowed_cidr = getattr(config, 'ALLOWED_CIDR', '0.0.0.0/0')

try:
    defaultSg.authorize_ingress(
        GroupName=defaultSg.group_name,
        CidrIp=allowed_cidr,
        IpProtocol='TCP',
        FromPort=int(config.PORT),
        ToPort=int(config.PORT)
    )
except ec2.meta.client.exceptions.ClientError as error:
    # Re-running the cell after the rule exists must not fail; anything else
    # is a real error and is re-raised.
    if error.response['Error']['Code'] != 'InvalidPermission.Duplicate':
        raise
    print("Ingress rule already present:", error)

### **INSPECT REDSHIFT DATABASE**

In [None]:
import psycopg2

# Keyword arguments are used instead of a hand-built DSN string so that
# spaces or quotes in the password cannot break the connection parameters.
conn = psycopg2.connect(
    host=config.ENDPOINT,
    dbname=config.DB_NAME,
    user=config.USER,
    password=config.PW,
    port=config.PORT
)
q = """
SELECT DISTINCT tablename
FROM PG_TABLE_DEF
WHERE schemaname='public';
"""
try:
    cur = conn.cursor()
    cur.execute(q)
    # List the distinct table names found in the 'public' schema.
    print(cur.fetchall())
finally:
    # Always release the connection, even when the query fails.
    conn.close()

In [None]:
# Load the IPython extension named `sql`; the %sql magics in the following cells depend on it.
%load_ext sql

In [None]:
conn_string="postgresql://{}:{}@{}:{}/{}".format(config.USER, config.PW, config.ENDPOINT, config.PORT, config.DB_NAME)
%sql $conn_string

In [None]:
# Show up to five rows from stl_load_errors.
%sql SELECT * FROM stl_load_errors LIMIT 5

In [None]:
# Show up to five rows from the staging_events table.
%sql SELECT * FROM staging_events LIMIT 5

In [None]:
# Show up to five rows from the staging_songs table.
%sql SELECT * FROM staging_songs LIMIT 5

In [None]:
# Show up to five rows from the songs table.
%sql SELECT * FROM songs LIMIT 5

In [None]:
# Show up to five rows from the artists table.
%sql SELECT * FROM artists LIMIT 5

In [None]:
# Show up to five rows from the users table.
%sql SELECT * FROM users LIMIT 5

In [None]:
# Show up to five rows from the time table.
%sql SELECT * FROM time LIMIT 5

In [None]:
# Show up to five rows from the songplays table.
%sql SELECT * FROM songplays LIMIT 5

In [None]:
q = """
WITH uniq_staging_events AS (
        SELECT userId, firstName, lastName, gender, level, ROW_NUMBER() OVER(PARTITION BY userId ORDER BY ts DESC) AS rank
        FROM staging_events
        WHERE userId != NULL
    )
SELECT userId, firstName, lastName, gender, level
FROM uniq_staging_events
WHERE rank = 1;
"""
%sql :q

In [None]:
# Keep the query result (at most one staging_events row with userId = 69) in `data` for the next cell.
data = %sql SELECT * FROM staging_events WHERE userId = 69 LIMIT 1

In [None]:
# Column labels applied, by position, to the row captured in `data`.
# NOTE(review): 'level' is the last label here, while the later cell that
# loads all of staging_events places it right after 'length'. Both label a
# `SELECT *` on the same table, so one of the two orders is presumably wrong -
# confirm against the table definition.
event_columns = [
    'artist', 'auth', 'firstName', 'gender', 'itemInSession', 'lastName',
    'length', 'location', 'method', 'page', 'registration',
    'sessionId', 'song', 'status', 'ts', 'userAgent', 'userId', 'level',
]
df = pd.DataFrame(data, columns=event_columns)
print(df)

In [None]:
import config


def _as_int_user_id(raw_user_id):
    """Convert a float or numeric-string user id to int; return None otherwise.

    A float NaN (missing value) also yields None instead of raising.
    """
    if isinstance(raw_user_id, str):
        return int(float(raw_user_id))
    if isinstance(raw_user_id, float):
        # NaN is the only float that is not equal to itself.
        return None if raw_user_id != raw_user_id else int(raw_user_id)
    return None


# Keyword arguments avoid DSN-string problems with spaces or quotes in the
# password.
conn = psycopg2.connect(
    host=config.ENDPOINT,
    dbname=config.DB_NAME,
    user=config.USER,
    password=config.PW,
    port=config.PORT
)
try:
    cur = conn.cursor()
    cur.execute('SELECT * FROM staging_events;')
    staging_events_data = cur.fetchall()
finally:
    # The connection is no longer needed once the rows are in memory.
    conn.close()

staging_events_df = pd.DataFrame(
    staging_events_data,
    columns=[
        'artist', 'auth', 'firstName', 'gender', 'itemInSession', 'lastName',
        'length', 'level', 'location', 'method', 'page', 'registration',
        'sessionId', 'song', 'status', 'ts', 'userAgent', 'userId'
    ]
)

# Inspect the first event whose firstName is "Anabelle" and show how its
# userId converts to an integer.
anabelle_events = staging_events_df[staging_events_df['firstName'] == "Anabelle"]
if not anabelle_events.empty:
    row = anabelle_events.iloc[0]
    print(row)
    print(row['userId'])
    check = _as_int_user_id(row['userId'])
    print(check)

### **DELETE REDSHIFT CLUSTER**

The next cell deletes the cluster. Afterwards, run the cluster description cell that follows it to confirm that the cluster has been deleted.

In [None]:
# Ask Redshift to delete the configured cluster without taking a final
# snapshot; a failure is printed rather than raised.
try:
    redshift_client.delete_cluster(
        SkipFinalClusterSnapshot=True,
        ClusterIdentifier=config.CLUSTER_ID,
    )
except Exception as delete_error:
    print(delete_error)

In [None]:
# Once deletion has completed, describe_clusters raises ClusterNotFoundFault.
# Treat that as confirmation rather than letting the cell fail, so the
# clean-up cell below still runs under "Run All".
try:
    myClusterProps = redshift_client.describe_clusters(ClusterIdentifier=config.CLUSTER_ID)['Clusters'][0]
except redshift_client.exceptions.ClusterNotFoundFault:
    myClusterProps = None
    print("Cluster '{}' was not found - deletion is complete.".format(config.CLUSTER_ID))

# Last expression of the cell: renders the summary table while the cluster
# still exists, and nothing once it is gone.
prettyRedshiftProps(myClusterProps) if myClusterProps is not None else None

Clean up resources: detach the S3 read-only policy from the IAM role, then delete the role.

In [None]:
# The policy has to be detached before the role itself is deleted.
s3_read_only_policy_arn = "arn:aws:iam::aws:policy/AmazonS3ReadOnlyAccess"
iam.detach_role_policy(
    RoleName=config.IAM_ROLE,
    PolicyArn=s3_read_only_policy_arn,
)
iam.delete_role(RoleName=config.IAM_ROLE)