# Notebook to create an AWS Redshift cluster

In [1]:
import pandas as pd
import boto3
import json

from time import time

#### Set verbosity to 0|1|2 (default=0)

In [2]:
VERBOSE = 1

## STEP-1:  Load cluster parameters from `myDWH.cfg`

In [3]:
import configparser
config = configparser.ConfigParser()
config.read_file(open('../myDWH.cfg'))

# Retrieve the USER-related  parameters
USR_KEY                = config.get('USR','USR_KEY')
USR_SECRET             = config.get('USR','USR_SECRET')

# Retrieve the AWS-related  parameters
AWS_REGION             = config.get('AWS','AWS_REGION')

# Retrieve the CLUSTER-related  parameters
CLUSTER_NAME           = config.get("CLUSTER","CLUSTER_NAME")
CLUSTER_TYPE           = config.get("CLUSTER","CLUSTER_TYPE")
CLUSTER_NODE_TYPE      = config.get("CLUSTER","CLUSTER_NODE_TYPE")
CLUSTER_NODE_COUNT     = config.get("CLUSTER","CLUSTER_NODE_COUNT")

CLUSTER_DB_NAME        = config.get("CLUSTER","CLUSTER_DB_NAME")
CLUSTER_DB_USER        = config.get("CLUSTER","CLUSTER_DB_USER")
CLUSTER_DB_PASSWORD    = config.get("CLUSTER","CLUSTER_DB_PASSWORD")
CLUSTER_DB_PORT        = config.get("CLUSTER","CLUSTER_DB_PORT")

# Retrieve the IAM-related parameters
AWS_RESOURCE_NAME      = config.get("IAM_ROLE", "AWS_RESOURCE_NAME")

(CLUSTER_DB_USER, CLUSTER_DB_PASSWORD, CLUSTER_DB_NAME)

if VERBOSE > 0:
    df = pd.DataFrame({"Param":
                       ["CLUSTER_TYPE", "CLUSTER_NODE_COUNT", "CLUSTER_NODE_TYPE", "CLUSTER_NAME", "CLUSTER_DB_NAME", "CLUSTER_DB_USER", "CLUSTER_DB_PASSWORD", "CLUSTER_DB_PORT", "AWS_RESOURCE_NAME", "AWS_REGION"],
                       "Value":
                       [ CLUSTER_TYPE ,  CLUSTER_NODE_COUNT ,  CLUSTER_NODE_TYPE ,  CLUSTER_NAME ,  CLUSTER_DB_NAME ,  CLUSTER_DB_USER ,  CLUSTER_DB_PASSWORD ,  CLUSTER_DB_PORT ,  AWS_RESOURCE_NAME ,  AWS_REGION ]
                      })
    print(df)

                 Param       Value
0         CLUSTER_TYPE  multi-node
1   CLUSTER_NODE_COUNT           2
2    CLUSTER_NODE_TYPE   dc2.large
3         CLUSTER_NAME  dwhCluster
4      CLUSTER_DB_NAME         dwh
5      CLUSTER_DB_USER     dwhuser
6  CLUSTER_DB_PASSWORD    Passw0rd
7      CLUSTER_DB_PORT        5439
8    AWS_RESOURCE_NAME     dwhRole
9           AWS_REGION   us-west-2


## STEP-2: Create clients for EC2, S3, IAM, and Redshift   

In [4]:
import boto3

# Note-1: To use Boto3, we must indicate which services we are going to use.
#         In our case it will be: ec2, s3, iam and redshift.

# Note-2: If you have the AWS CLI installed, then you can use the AWS configure
#         command to configure the credentials file instead of passing them as parameters.

if VERBOSE > 0:
    print("Create clients for EC2, S3, IAM, and Redshift")

ec2 = boto3.resource('ec2',
                       region_name=AWS_REGION,
                       aws_access_key_id=USR_KEY,
                       aws_secret_access_key=USR_SECRET
                    )

s3 = boto3.resource('s3',
                       region_name=AWS_REGION,
                       aws_access_key_id=USR_KEY,
                       aws_secret_access_key=USR_SECRET
                   )

iam = boto3.client('iam',aws_access_key_id=USR_KEY,
                     aws_secret_access_key=USR_SECRET,
                     region_name=AWS_REGION
                  )

redshift = boto3.client('redshift',
                       region_name=AWS_REGION,
                       aws_access_key_id=USR_KEY,
                       aws_secret_access_key=USR_SECRET
                       )

Create clients for EC2, S3, IAM, and Redshift


## STEP-3: Create an IAM Role

In [5]:
from botocore.exceptions import ClientError

### STEP-3.1: CREATE IAM ROLE #############################
if VERBOSE > 0:
    print("Creating IAM Role")
    
try:
    dwhRole = iam.create_role(
        Path='/',
        RoleName=AWS_RESOURCE_NAME,
        Description = "Allows Redshift clusters to call AWS services on your behalf.",
        AssumeRolePolicyDocument=json.dumps(
            {'Statement': [{'Action': 'sts:AssumeRole',
               'Effect': 'Allow',
               'Principal': {'Service': 'redshift.amazonaws.com'}}],
               'Version': '2012-10-17'})
    )
except Exception as e:
    print("\nWARNING: A role with similar name most likely already exists.")
    if VERBOSE > 0:
        print(e)
else:
    if VERBOSE > 0:
        print("\tFYI - Details of the current role with name \'%s\':" % dwhRole['Role']['RoleName'])
        print(dwhRole)
        print()

### STEP-3.2: ATTACH POLICY ###############################
response = iam.attach_role_policy(RoleName=AWS_RESOURCE_NAME, 
                                  PolicyArn="arn:aws:iam::aws:policy/AmazonS3ReadOnlyAccess"
                                 )['ResponseMetadata']['HTTPStatusCode']
if VERBOSE > 0:
    print("Attaching Policy - Allow access to all Amazon S3 buckets (ReadOnly)") 
    # See also - https://docs.aws.amazon.com/redshift/latest/dg/c-getting-started-using-spectrum-create-role.html
    print('Response: %i\n' % response)

### STEP-3.3: RETRIEVE THE IAM ROLE ARN ###################
roleArn = iam.get_role(RoleName=AWS_RESOURCE_NAME)['Role']['Arn']
if VERBOSE > 0:
    print('Getting the IAM role ARN')
    print(roleArn + '\n')

Creating IAM Role

An error occurred (EntityAlreadyExists) when calling the CreateRole operation: Role with name dwhRole already exists.
Attaching Policy - Allow access to all Amazon S3 buckets (ReadOnly)
Response: 200

Getting the IAM role ARN
arn:aws:iam::449575054145:role/dwhRole



## STEP-4:  Create the Redshift cluster
- Create a [RedShift Cluster](https://console.aws.amazon.com/redshiftv2/home)
- For complete arguments to `create_cluster`, see [docs](https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/redshift.html#Redshift.Client.create_cluster)

In [15]:
if VERBOSE > 0:
    print("Creating Redshift cluster")

try:
    response = redshift.create_cluster(
        # Parameters for HW
        ClusterType   = CLUSTER_TYPE,
        NodeType      = CLUSTER_NODE_TYPE,
        NumberOfNodes = int(CLUSTER_NODE_COUNT),
        # Parameters for Identifiers & Credentials
        #  - DBName (string) -
        #      The name of the first database to be created when the cluster is created.
        #  - ClusterIdentifier (string) –
        #      A unique identifier for the cluster. You use this identifier to refer to
        #      the cluster for any subsequent cluster operations such as deleting or modifying.
        #  - MasterUsername (string) –
        #      The user name associated with the admin user for the cluster that is being created.
        DBName             = CLUSTER_DB_NAME,
        ClusterIdentifier  = CLUSTER_NAME,
        MasterUsername     = CLUSTER_DB_USER,
        MasterUserPassword = CLUSTER_DB_PASSWORD,       
        # Parameters for Roles (to allow s3 access)
        #   - IamRoles (list) – 
        #      A list of IAM roles that can be used by the cluster to access other AWS services.
        #      You must supply the IAM roles in their Amazon Resource Name (ARN) format.
        IamRoles=[roleArn]
    )
    if VERBOSE > 0:
        print(response)
except Exception as e:
    print(e)

Creating Redshift cluster
An error occurred (ClusterAlreadyExists) when calling the CreateCluster operation: Cluster already exists


## STEP-5:  Wait for the cluster to become available

In [6]:
import time

def prettyRedshiftProps(props):
    pd.set_option('display.max_colwidth', -1)
    keysToShow = ["ClusterIdentifier", "NodeType", "ClusterStatus", "MasterUsername", "DBName", "Endpoint", "NumberOfNodes", 'VpcId']
    x = [(k, v) for k,v in props.items() if k in keysToShow]
    return pd.DataFrame(data=x, columns=["Key", "Value"])

while redshift.describe_clusters(ClusterIdentifier=CLUSTER_NAME)['Clusters'][0]['ClusterStatus'] != 'available':
    if VERBOSE > 0:
        print("Waiting for cluster to come up...")
    time.sleep(5)
print('Cluster is available')

# Describe the created cluster
cluster_props = redshift.describe_clusters(ClusterIdentifier=CLUSTER_NAME)['Clusters'][0]

if VERBOSE > 0:
    prettyRedshiftProps(cluster_props)

# Display the cluster endpoint and role ARN 
CLUSTER_ENDPOINT = cluster_props['Endpoint']['Address']
CLUSTER_ROLE_ARN = cluster_props['IamRoles'][0]['IamRoleArn']
print("\nFYI: This is the created cluster ENDPOINT and cluster Role ARN")
print("\tCLUSTER_ENDPOINT :: ", CLUSTER_ENDPOINT)
print("\tCLUSTER_ROLE_ARN :: ", CLUSTER_ROLE_ARN)


Cluster is available

FYI: This is the created cluster ENDPOINT and cluster Role ARN
	CLUSTER_ENDPOINT ::  dwhcluster.c4p6b3uqdbp8.us-west-2.redshift.amazonaws.com
	CLUSTER_ROLE_ARN ::  arn:aws:iam::449575054145:role/dwhRole


## STEP-6: Open a TCP port to access the cluster endpoint

In [7]:
if VERBOSE > 0:
    print("Opening a TCP port to the endpoint")

vpc = ec2.Vpc(id=cluster_props['VpcId'])
    
# Get the list of security groups.
#  - A security group acts as a firewall for the traffic to and from the resources in your VPC.
default_sg = list(vpc.security_groups.all())[0]
if VERBOSE > 0:
    print("Default security group: %s" % default_sg)    
    
try:
    default_sg.authorize_ingress(
        GroupName=default_sg.group_name,
        CidrIp='0.0.0.0/0',
        IpProtocol='TCP',
        FromPort=int(CLUSTER_DB_PORT),
        ToPort=int(CLUSTER_DB_PORT)
    )
except Exception as e:
    print(e)

Opening a TCP port to the endpoint
Default security group: ec2.SecurityGroup(id='sg-001ef50f88654cbd0')
An error occurred (InvalidPermission.Duplicate) when calling the AuthorizeSecurityGroupIngress operation: the specified rule "peer: 0.0.0.0/0, TCP, from port: 5439, to port: 5439, ALLOW" already exists


## STEP-7: Make sure you can connect to the cluster

In [8]:
%load_ext sql

In [9]:
conn_string="postgresql://{}:{}@{}:{}/{}".format(CLUSTER_DB_USER, CLUSTER_DB_PASSWORD, CLUSTER_ENDPOINT, CLUSTER_DB_PORT, CLUSTER_DB_NAME)
print(conn_string)
%sql $conn_string

postgresql://dwhuser:Passw0rd@dwhcluster.c4p6b3uqdbp8.us-west-2.redshift.amazonaws.com:5439/dwh


'Connected: dwhuser@dwh'

# ===== TEST-AND-DEBUG-SECTION ===== 

### Let's try to check out our own bucket on S3 ("fab-se4s-bucket")

In [13]:
mySe4sBucket = s3.Bucket("fab-se4s-bucket")

# Iterate over bucket objects starting with "ssbgz" and print
for obj in mySe4sBucket.objects.all():
    print(obj)

s3.ObjectSummary(bucket_name='fab-se4s-bucket', key='fab-cartoon.jpg')


### Let's take a look at the `song_data` set on S3

In [12]:
udacity_bucket = s3.Bucket("udacity-dend")

# Iterate over bucket objects starting with "song_data/A/A" and print
for obj in udacity_bucket.objects.filter(Prefix="song_data/A/A/A/TRAAAA"):
    print(obj)

s3.ObjectSummary(bucket_name='udacity-dend', key='song_data/A/A/A/TRAAAAK128F9318786.json')
s3.ObjectSummary(bucket_name='udacity-dend', key='song_data/A/A/A/TRAAAAV128F421A322.json')


### Let's take a look at the log_data set on S3¶

In [13]:
udacity_bucket = s3.Bucket("udacity-dend")

# Iterate over bucket objects starting with "log_data" and print
for obj in udacity_bucket.objects.filter(Prefix="log_data/2018/11/2018-11-12"):
    print(obj)

s3.ObjectSummary(bucket_name='udacity-dend', key='log_data/2018/11/2018-11-12-events.json')


### Let's try to run a COPY query

In [14]:
table = "staging_events"
SQL_COPY = """
COPY {} FROM 's3://udacity-dend/log_data'
    credentials 'aws_iam_role=arn:aws:iam::449575054145:role/dwhRole'
    region 'us-west-2'
    TIMEFORMAT AS 'epochmillisecs' 
    JSON 's3://udacity-dend/log_json_path.json';
""".format(table)

print("======= LOADING TABLE: ** {} ** =======".format(table))
print(SQL_COPY)

t0 = time()
%sql $SQL_COPY
loadTime = time()-t0



COPY staging_events FROM 's3://udacity-dend/log_data'
    credentials 'aws_iam_role=arn:aws:iam::449575054145:role/dwhRole'
    region 'us-west-2'
    TIMEFORMAT AS 'epochmillisecs' 
    JSON 's3://udacity-dend/log_json_path.json';

 * postgresql://dwhuser:***@dwhcluster.c4p6b3uqdbp8.us-west-2.redshift.amazonaws.com:5439/dwh
Done.


### dimSong: Let's try to LOAD  the table by INSERTING new data from the staging tables while handling duplicates.

In [10]:
query = """
    SELECT song_id, title, artist_id, year, duration FROM staging_songs WHERE year=2000;
"""
%sql $query

 * postgresql://dwhuser:***@dwhcluster.c4p6b3uqdbp8.us-west-2.redshift.amazonaws.com:5439/dwh
1 rows affected.


song_id,title,artist_id,year,duration
SOHOZBI12A8C132E3C,Smash It Up,AR0MWD61187B9B2B12,2000,195


In [41]:
query = """SELECT * FROM staging_songs WHERE year=2009; """
%sql $query


 * postgresql://dwhuser:***@dwhcluster.c4p6b3uqdbp8.us-west-2.redshift.amazonaws.com:5439/dwh
5 rows affected.


artist_id,artist_latitude,artist_location,artist_longitude,artist_name,duration,num_songs,song_id,title,year
ARJNIUY12298900C91,,,,Adelitas Way,213,1,SOBLFFE12AF72AA5BA,Scream,2009
ARJNIUY12298900C91,,,,Adelitas Way,213,1,SOBLFFE12AF72AA5BA,Scream,2009
ARJNIUY12298900C91,,,,Adelitas Way,213,1,SOBLFFE12AF72AA5BA,Scream,2009
ARJNIUY12298900C91,,,,Adelitas Way,213,1,SOBLFFE12AF72AA5BA,Scream,2009
ARJNIUY12298900C91,,,,Adelitas Way,213,1,SOBLFFE12AF72AA5BA,Scream,2009


In [40]:
# STEP-1: Delete *ALL* the rows in the table before inserting again
#-------------------------------------------------------------------
delete_raws = """DELETE FROM dimSong;"""
%sql $delete_raws

# STEP-2: Load the 'dimSong' table by inserting new data from the 
#     staging tables while handling duplicates.
#-------------------------------------------------------------------
# insert_with_duplicates = """
#     INSERT INTO dimSong (song_id, title, artist_id, year, duration)
#     SELECT song_id, title, artist_id, year, duration
#     FROM staging_songs;
# """
# %sql $insert_with_duplicates

rank_the_duplicates = """
    (SELECT
        song_id, 
        title, 
        artist_id, 
        year, 
        duration, 
        ROW_NUMBER() OVER (PARTITION BY song_id) AS song_id_ranked 
    FROM staging_songs);
"""
%sql $rank_the_duplicates

insert_wo_duplicates = """
    INSERT INTO dimSong (song_id, title, artist_id, year, duration)
    SELECT song_id, title, artist_id, year, duration 
    FROM
        (SELECT
            song_id, 
            title, 
            artist_id, 
            year, 
            duration, 
            ROW_NUMBER() OVER (PARTITION BY song_id) AS song_id_ranked 
        FROM staging_songs)
    WHERE song_id_ranked = 1;
"""
%sql $insert_wo_duplicates



 * postgresql://dwhuser:***@dwhcluster.c4p6b3uqdbp8.us-west-2.redshift.amazonaws.com:5439/dwh
48 rows affected.
 * postgresql://dwhuser:***@dwhcluster.c4p6b3uqdbp8.us-west-2.redshift.amazonaws.com:5439/dwh
48 rows affected.
 * postgresql://dwhuser:***@dwhcluster.c4p6b3uqdbp8.us-west-2.redshift.amazonaws.com:5439/dwh
24 rows affected.


[]

In [10]:
query = """
    SELECT song_id, COUNT(*) as count 
    FROM dimsong 
    GROUP BY song_id ORDER BY count;
"""
%sql $query

 * postgresql://dwhuser:***@dwhcluster.c4p6b3uqdbp8.us-west-2.redshift.amazonaws.com:5439/dwh
24 rows affected.


song_id,count
SOEKAZG12AB018837E,5
SOAPERH12A58A787DC,5
SOSMJFC12A8C13DE0C,5
SOBRKGM12A8C139EF6,5
SOAFBCP12A8C13CC7D,5
SOCIWDW12A8C13D406,5
SOBLFFE12AF72AA5BA,5
SODZYPO12A8C13A91E,5
SOIGICF12A8C141BC5,5
SOERIDA12A6D4F8506,5


### dimArtist: Let's try to LOAD  the table by INSERTING new data from the staging tables while handling duplicates.

In [None]:
query ="""
    INSERT INTO dimArtist (artist_id, artist_name, artist_location,
                           artist_latitude, artist_longitude)
        SELECT artist_id, artist_name, artist_location, 
                artist_latitude, artist_longitude
        FROM 
            staging_songs;
"""
%sql $query

### dimTime: Let's try to LOAD  the table by INSERTING new data from the staging tables while handling duplicates.

In [30]:
query = """
    INSERT INTO dimTime (start_time, hour, day, week, month, year, weekday)
        SELECT
            ts                         AS start_time,
            EXTRACT(hour      FROM ts) AS hour,
            EXTRACT(day       FROM ts) AS day,
            EXTRACT(week      FROM ts) AS week,
            EXTRACT(month     FROM ts) AS month,
            EXTRACT(year      FROM ts) AS year,
            EXTRACT(dayofweek FROM ts) AS weekday
    FROM
        staging_events
    WHERE ts IS NOT NULL;    
"""

%sql $query

 * postgresql://dwhuser:***@dwhcluster.c4p6b3uqdbp8.us-west-2.redshift.amazonaws.com:5439/dwh
24168 rows affected.


[]

### dimUser: Let's try to LOAD the table by INSERTING new data from the staging tables while handling duplicates.

In [51]:
rank_the_duplicates = """
INSERT INTO dimUser (user_id, first_name, last_name, gender, level)
    SELECT
        userid    AS user_id,
        firstname AS first_name,
        lastname  AS last_name,
        gender    AS gender,
        level     AS level
    FROM
        (SELECT
            userid,
            firstname, 
            lastname, 
            gender, 
            level, 
            ROW_NUMBER() OVER (PARTITION BY userid) AS userid_ranked 
        FROM 
            staging_events)
    WHERE userid_ranked=1 AND userid IS NOT NULL;
"""
%sql $rank_the_duplicates

 * postgresql://dwhuser:***@dwhcluster.c4p6b3uqdbp8.us-west-2.redshift.amazonaws.com:5439/dwh
97 rows affected.


[]

### dimSongpaly: Let's try to LOAD the table by INSERTING new data from the staging tables while handling duplicates.

In [75]:

insert_query = """
INSERT INTO factSongplay (start_time, user_id, level, song_id, artist_id,
                          session_id,  location, user_agent)
    SELECT DISTINCT
        staging_events.ts        AS start_time,
        staging_events.userId    AS user_id,
        staging_events.level     AS level,
        staging_songs.song_id    AS song_id,
        staging_songs.artist_id  As artist_id,
        staging_events.sessionId AS session_id,
        staging_events.location  AS location,
        staging_events.userAgent AS user_agent
    FROM
        staging_events
    JOIN staging_songs
        ON staging_events.song   = staging_songs.title AND 
           staging_events.artist = staging_songs.artist_name        
    WHERE
        staging_events.page   = 'NextSong'
    ;
"""

%sql $insert_query

 * postgresql://dwhuser:***@dwhcluster.c4p6b3uqdbp8.us-west-2.redshift.amazonaws.com:5439/dwh
16 rows affected.


[]

## EMERGENCY STEPS TO CLEAR THE CLUSTER

<b><font color='red'>- - - - -> Please consider using the `delete_cluster.py` instead <- - - - - - <br/>

In [1]:
#### CAREFUL!!
#-- Uncomment & run to delete the created resources
redshift.delete_cluster( ClusterIdentifier=CLUSTER_NAME,  SkipFinalClusterSnapshot=True)
#### CAREFUL!!

NameError: name 'redshift' is not defined

- Run this block several times until the cluster really deleted

In [None]:
myClusterProps = redshift.describe_clusters(ClusterIdentifier=CLUSTER_NAME)['Clusters'][0]
prettyRedshiftProps(myClusterProps)

In [None]:
#### CAREFUL!!
#-- Uncomment & run to delete the created resources
iam.detach_role_policy(RoleName=AWS_RESOURCE_NAME, PolicyArn="arn:aws:iam::aws:policy/AmazonS3ReadOnlyAccess")
iam.delete_role(RoleName=AWS_RESOURCE_NAME)
#### CAREFUL!!