In [1]:
import pandas as pd
import boto3
import json
import psycopg2
import configparser

## Loading Cluster Params

In [9]:
# Read the infrastructure parameters (AWS credentials + Redshift cluster
# settings). The context manager closes the file handle once it is parsed.
config = configparser.ConfigParser()
with open('clusterparams.cfg') as cluster_cfg_file:
    config.read_file(cluster_cfg_file)

# AWS credentials used by the boto3 clients below.
KEY                    = config.get('AWS','KEY')
SECRET                 = config.get('AWS','SECRET')

# Cluster hardware (all values are strings; DWH_NUM_NODES is cast to int later).
DWH_CLUSTER_TYPE       = config.get("DWH","DWH_CLUSTER_TYPE")
DWH_NUM_NODES          = config.get("DWH","DWH_NUM_NODES")
DWH_NODE_TYPE          = config.get("DWH","DWH_NODE_TYPE")

# Cluster identity and master-user database credentials.
DWH_CLUSTER_IDENTIFIER = config.get("DWH","DWH_CLUSTER_IDENTIFIER")
DWH_DB                 = config.get("DWH","DWH_DB")
DWH_DB_USER            = config.get("DWH","DWH_DB_USER")
DWH_DB_PASSWORD        = config.get("DWH","DWH_DB_PASSWORD")
DWH_PORT               = config.get("DWH","DWH_PORT")

# Name of the IAM role that lets the cluster read from S3.
DWH_IAM_ROLE_NAME      = config.get("DWH", "DWH_IAM_ROLE_NAME")

# Display the parameters for review. The password is masked so it never ends
# up in the saved notebook output.
pd.DataFrame({"Param":["DWH_CLUSTER_TYPE", "DWH_NUM_NODES", "DWH_NODE_TYPE", "DWH_CLUSTER_IDENTIFIER", "DWH_DB", "DWH_DB_USER", "DWH_DB_PASSWORD", "DWH_PORT", "DWH_IAM_ROLE_NAME"],
              "Value":[DWH_CLUSTER_TYPE, DWH_NUM_NODES, DWH_NODE_TYPE, DWH_CLUSTER_IDENTIFIER, DWH_DB, DWH_DB_USER, "********", DWH_PORT, DWH_IAM_ROLE_NAME]
             })




Unnamed: 0,Param,Value
0,DWH_CLUSTER_TYPE,multi-node
1,DWH_NUM_NODES,4
2,DWH_NODE_TYPE,dc2.large
3,DWH_CLUSTER_IDENTIFIER,dwhCluster
4,DWH_DB,dwh
5,DWH_DB_USER,dwhuser
6,DWH_DB_PASSWORD,********
7,DWH_PORT,5439
8,DWH_IAM_ROLE_NAME,dwhRole


## Creating Redshift, S3 and IAM clients

In [10]:
# One session carries the credentials and region for every AWS client, so all
# four (including the S3 resource) use the same KEY/SECRET loaded above.
aws_session = boto3.Session(
    aws_access_key_id=KEY,
    aws_secret_access_key=SECRET,
    region_name="us-west-2",
)

iam = aws_session.client("iam")            # create/attach/delete the S3 role
redshift = aws_session.client("redshift")  # create/describe/delete the cluster
s3 = aws_session.resource("s3")            # browse the source bucket
ec2 = aws_session.resource("ec2")          # open the cluster port in the VPC

## Creating an IAM role that gives Redshift read-only access to S3

In [11]:
# Create the IAM role that Redshift assumes in order to read from S3.
# It must be named DWH_IAM_ROLE_NAME: the attach-policy, get-ARN and cleanup
# cells all look the role up by that name.
try:
    print('Creating a new IAM Role')
    dwhRole = iam.create_role(
        Path='/',
        RoleName=DWH_IAM_ROLE_NAME,
        Description='Allow Redshift clusters to call AWS services on your behalf.',
        AssumeRolePolicyDocument=json.dumps(
            {'Statement': [{'Action': 'sts:AssumeRole',
                            'Effect': 'Allow',
                            'Principal': {'Service': 'redshift.amazonaws.com'}}],
             'Version': '2012-10-17'}),
    )
except iam.exceptions.EntityAlreadyExistsException as e:
    # Re-running the notebook reuses the existing role; any other failure
    # (bad credentials, missing permissions) is raised instead of hidden.
    print(e)

Creating a new IAM Role


In [None]:
# Grant the role read-only S3 access so Redshift COPY can read the source data.
print('1.2 Attaching Policy')

attach_response = iam.attach_role_policy(
    RoleName=DWH_IAM_ROLE_NAME,
    PolicyArn="arn:aws:iam::aws:policy/AmazonS3ReadOnlyAccess",
)
# Show the HTTP status code of the call (expected to be 200).
attach_response['ResponseMetadata']['HTTPStatusCode']

In [None]:
# Look up the role's ARN; create_cluster attaches the role by ARN.
print('1.3 Get the IAM role ARN')
role_description = iam.get_role(RoleName=DWH_IAM_ROLE_NAME)
roleArn = role_description["Role"]['Arn']

## Creating the Redshift cluster

In [None]:
# Request the Redshift cluster. Check ClusterStatus in the following cells
# before connecting to it.
try:
    response = redshift.create_cluster(
        # hardware
        ClusterType=DWH_CLUSTER_TYPE,
        NodeType=DWH_NODE_TYPE,
        NumberOfNodes=int(DWH_NUM_NODES),

        # identifiers & credentials
        DBName=DWH_DB,
        ClusterIdentifier=DWH_CLUSTER_IDENTIFIER,
        MasterUsername=DWH_DB_USER,
        MasterUserPassword=DWH_DB_PASSWORD,

        # role that lets the cluster read from S3
        IamRoles=[roleArn],
    )
except redshift.exceptions.ClusterAlreadyExistsFault as e:
    # Tolerate re-runs against an existing cluster; surface every other error
    # (quota, invalid parameters, permissions) instead of printing and moving on.
    print(e)

In [None]:
# See cluster status

def prettyRedshiftProps(props):
    """Summarise one ``describe_clusters`` entry as a two-column table.

    Args:
        props: a single cluster dict (``describe_clusters(...)['Clusters'][i]``).

    Returns:
        A DataFrame with columns ``Key`` and ``Value`` holding only the
        interesting keys, in the order they appear in ``props``.
    """
    wanted_keys = {
        "ClusterIdentifier", "NodeType", "ClusterStatus", "MasterUsername",
        "DBName", "Endpoint", "NumberOfNodes", 'VpcId',
    }
    rows = []
    for prop_name, prop_value in props.items():
        if prop_name in wanted_keys:
            rows.append((prop_name, prop_value))
    return pd.DataFrame(data=rows, columns=["Key", "Value"])

cluster_list = redshift.describe_clusters(ClusterIdentifier=DWH_CLUSTER_IDENTIFIER)['Clusters']
myClusterProps = cluster_list[0]
prettyRedshiftProps(myClusterProps)


In [None]:
# 'Endpoint' may be missing from the cluster description while the cluster is
# still being created. Block until it is available, then refresh
# myClusterProps (the following cells also read VpcId and IamRoles from it).
redshift.get_waiter('cluster_available').wait(ClusterIdentifier=DWH_CLUSTER_IDENTIFIER)
myClusterProps = redshift.describe_clusters(ClusterIdentifier=DWH_CLUSTER_IDENTIFIER)['Clusters'][0]

cluster_address = myClusterProps['Endpoint']['Address']
print('Cluster Address:', cluster_address)

In [None]:
# ARN of the first IAM role attached to the cluster.
attached_roles = myClusterProps['IamRoles']
IamRoleArn = attached_roles[0]['IamRoleArn']
print('IamRoleArn:', IamRoleArn)

## Opening an incoming TCP port to access the cluster endpoint

In [None]:
# Allow inbound TCP on the Redshift port through the VPC's default security
# group so this notebook can connect to the cluster.
# WARNING: 0.0.0.0/0 exposes the port to the whole internet. Narrow this to
# your own address (e.g. 'x.x.x.x/32') whenever possible.
INGRESS_CIDR = '0.0.0.0/0'

vpc = ec2.Vpc(id=myClusterProps['VpcId'])
# Select the group named 'default' explicitly rather than whichever group the
# API happens to list first.
defaultSg = next(iter(vpc.security_groups.filter(
    Filters=[{'Name': 'group-name', 'Values': ['default']}]
)))
print(defaultSg)

try:
    # The SecurityGroup resource supplies its own GroupId to the request.
    defaultSg.authorize_ingress(
        CidrIp=INGRESS_CIDR,
        IpProtocol='TCP',
        FromPort=int(DWH_PORT),
        ToPort=int(DWH_PORT),
    )
except ec2.meta.client.exceptions.ClientError as e:
    # Re-running the cell reports the rule as a duplicate; anything else is a
    # real failure and is re-raised.
    if e.response['Error']['Code'] != 'InvalidPermission.Duplicate':
        raise
    print(e)

## Loading the ETL / database parameters

In [None]:
# ETL settings from dwh.cfg: S3 source paths for the logs and songs, the
# JSONPaths file for the log data, and the IAM role ARN used by COPY.
configETL = configparser.ConfigParser()
with open('dwh.cfg') as etl_cfg_file:
    configETL.read_file(etl_cfg_file)
LOG_DATA = configETL.get("S3","LOG_DATA")
LOGPATH = configETL.get("S3","LOG_JSONPATH")
SONG_DATA = configETL.get("S3","SONG_DATA")
IAMROLE = configETL.get("IAM_ROLE", "ARN")

## CONNECTING TO CLUSTER DB

In [None]:
# Connect to the cluster database. The first five values of the [CLUSTER]
# section of dwh.cfg are used positionally, so they must be listed in the
# order host, database name, user, password, port.
# Passing them as keyword arguments (rather than formatting a
# "key=value ..." DSN string) keeps a value containing spaces or quotes,
# such as a password, from corrupting the connection string.
db_host, db_name, db_user, db_password, db_port = list(configETL['CLUSTER'].values())[:5]
conn = psycopg2.connect(host=db_host, dbname=db_name, user=db_user,
                        password=db_password, port=db_port)
cur = conn.cursor()
cur

## Connecting to the S3 "udacity-dend" bucket and previewing its files

In [None]:
# Preview the first five object keys under song_data/. `.limit(5)` stops the
# listing after five keys instead of paging through the whole prefix only to
# slice it afterwards.
song_data = [obj.key for obj in s3.Bucket("udacity-dend").objects.filter(Prefix='song_data').limit(5)]
song_data

In [None]:
# Preview the first five object keys under log_data/, stopping the listing
# after five keys rather than enumerating the whole prefix.
log_data = [obj.key for obj in s3.Bucket("udacity-dend").objects.filter(Prefix='log_data').limit(5)]
log_data

## DROP TABLES IF EXISTS

In [None]:
# Drop every staging, fact and dimension table so the schema is rebuilt from
# scratch. Each statement is committed and echoed as it runs.
drop_staging_events = "DROP TABLE IF EXISTS staging_events"
drop_staging_songs = "DROP TABLE IF EXISTS staging_songs"
drop_fact_songplay = "DROP TABLE IF EXISTS fact_songplay"
drop_dim_users = "DROP TABLE IF EXISTS dim_users"
drop_dim_songs = "DROP TABLE IF EXISTS dim_songs"
drop_dim_artists = "DROP TABLE IF EXISTS dim_artists"
drop_dim_time = "DROP TABLE IF EXISTS dim_time"

tables_to_drop = [
    drop_staging_events,
    drop_staging_songs,
    drop_fact_songplay,
    drop_dim_users,
    drop_dim_songs,
    drop_dim_artists,
    drop_dim_time,
]

for drop_stmt in tables_to_drop:
    cur.execute(drop_stmt)
    conn.commit()
    print(drop_stmt)

## DESIGNING STAGING, FACT & DIMENSION TABLES

In [None]:
# DDL for the staging tables (raw S3 JSON loaded with COPY) and for the star
# schema (one fact table plus four dimensions) populated from them.
# NOTE(review): Redshift treats PRIMARY KEY as informational rather than
# enforced, so key uniqueness depends on the INSERT queries — confirm.

# Staging table for the event logs. `ts` is a TIMESTAMP; the COPY cell loads it
# with TIMEFORMAT 'epochmillisecs'.
create_staging_events = ("""CREATE TABLE IF NOT EXISTS staging_events(
artist VARCHAR,
auth VARCHAR,
firstName VARCHAR,
gender VARCHAR,
itemInSession INTEGER,
lastName VARCHAR,
length FLOAT,
level VARCHAR,
location VARCHAR,
method VARCHAR,
page VARCHAR,
registration BIGINT,
sessionId INTEGER,
song VARCHAR,
status INTEGER,
ts TIMESTAMP,
userAgent VARCHAR,
userId INTEGER
)
""")


# Staging table for the song metadata (loaded from SONG_DATA with JSON 'auto').
# NOTE(review): num_songs is declared VARCHAR although the name suggests a
# count — confirm the intended type.
create_staging_songs = ("""CREATE TABLE IF NOT EXISTS staging_songs(
num_songs VARCHAR,
artist_id VARCHAR, 
artist_latitude FLOAT, 
artist_longitude FLOAT, 
artist_location VARCHAR, 
artist_name VARCHAR, 
song_id VARCHAR, 
title VARCHAR, 
duration FLOAT,
year INT
)
""")

# Fact table: one row per song play. songplay_id is generated by
# IDENTITY(0,1) and declared as the sort key.
create_fact_songplay = ("""CREATE TABLE IF NOT EXISTS fact_songplay
(
songplay_id INTEGER IDENTITY(0,1) PRIMARY KEY sortkey,
start_time TIMESTAMP,
user_id INTEGER, 
level VARCHAR, 
song_id VARCHAR,
artist_id VARCHAR,
session_id INTEGER,
location VARCHAR,
user_agent VARCHAR)
""")

# Users dimension, distributed on user_id.
create_dim_users = ("""CREATE TABLE IF NOT EXISTS dim_users
(
user_id INTEGER PRIMARY KEY distkey,
first_name VARCHAR,
last_name VARCHAR,
gender VARCHAR,
level VARCHAR)
""")

# Songs dimension, distributed on artist_id.
create_dim_songs = ("""CREATE TABLE IF NOT EXISTS dim_songs
(
song_id VARCHAR PRIMARY KEY,
title VARCHAR, 
artist_id VARCHAR distkey,
year INTEGER, 
duration FLOAT)
""")

# Artists dimension, distributed on artist_id.
# NOTE: the latitude column is spelled `lattitude`; the dim_artists INSERT uses
# the same spelling, so rename both together if correcting it.
create_dim_artists = ("""CREATE TABLE IF NOT EXISTS dim_artists
(
artist_id VARCHAR PRIMARY KEY distkey,
name VARCHAR, 
location VARCHAR, 
lattitude FLOAT, 
longitude FLOAT)
""")

# Time dimension: the event timestamp broken into calendar parts; start_time
# is both the sort key and the distribution key.
create_dim_time = ("""CREATE TABLE IF NOT EXISTS dim_time
(
start_time TIMESTAMP PRIMARY KEY sortkey distkey, 
hour INTEGER, 
day INTEGER, 
week INTEGER, 
month INTEGER, 
year INTEGER, 
weekday INTEGER)
""")

## CREATING STAGING, FACT & DIMENSION TABLES

In [None]:
# Run each CREATE TABLE statement, committing after every one.
tables_to_create = [
    create_staging_events,
    create_staging_songs,
    create_fact_songplay,
    create_dim_users,
    create_dim_songs,
    create_dim_artists,
    create_dim_time,
]

for create_stmt in tables_to_create:
    cur.execute(create_stmt)
    print("Table created")
    conn.commit()

## COPY staging_events

In [None]:
# Bulk-load the event logs from S3 into staging_events. TIMEFORMAT
# 'epochmillisecs' parses the numeric `ts` field into the TIMESTAMP column,
# and LOGPATH (the JSONPaths file) tells COPY how to map JSON fields.
copy_events_template = """
COPY staging_events FROM {}
CREDENTIALS 'aws_iam_role={}'
COMPUPDATE OFF region 'us-west-2'
TIMEFORMAT as 'epochmillisecs'
TRUNCATECOLUMNS BLANKSASNULL EMPTYASNULL
FORMAT AS JSON {};
"""
copy_staging_events = copy_events_template.format(LOG_DATA, IAMROLE, LOGPATH)

cur.execute(copy_staging_events)
print("staging events copied")
conn.commit()

In [None]:
# PREVIEW staging_events
##  artist, auth, firstName, gender, itemInSession, lastName, length, level, location, method, page, registration, sessionId, song, status, ts, userAgent, userId

# LIMIT keeps the server from sending the whole table to the client (psycopg2
# buffers the full result set) just to print two rows.
cur.execute("""SELECT * FROM staging_events LIMIT 2""")
for row in cur.fetchall():
    print(row)
    print("")

## COPY staging_songs

In [None]:
# Bulk-load the song metadata from S3 into staging_songs; JSON 'auto' matches
# JSON keys to column names.
copy_songs_template = """
COPY staging_songs FROM {}
CREDENTIALS 'aws_iam_role={}'
COMPUPDATE OFF region 'us-west-2'
FORMAT AS JSON 'auto'
TRUNCATECOLUMNS BLANKSASNULL EMPTYASNULL
"""
copy_staging_songs = copy_songs_template.format(SONG_DATA, IAMROLE)

cur.execute(copy_staging_songs)
print("staging songs copied")
conn.commit()

In [None]:
# PREVIEW staging_songs
## num_songs, artist_id,  artist_latitude ,  artist_longitude ,  artist_location,  artist_name,  song_id,  title,  duration, year

# LIMIT avoids transferring the whole staging table to print two rows.
cur.execute("""SELECT * FROM staging_songs LIMIT 2""")
for row in cur.fetchall():
    print(row)

## INSERT INTO FACT_songplay

In [None]:
# PREVIEW QUERY B4 INSERTING: NextSong events matched to a staged song by
# title and artist name. LIMIT stops the full join result being sent to the
# client just to print two rows.
cur.execute("""SELECT DISTINCT e.ts,
                e.userId as user_id,
                e.level as level,
                s.song_id as song_id,
                s.artist_id as artist_id,
                e.sessionId as session_id,
                e.location as location,
                e.userAgent as user_agent
FROM staging_events e
JOIN staging_songs s ON e.song = s.title AND e.artist = s.artist_name
WHERE e.page='NextSong'
LIMIT 2
""")

for row in cur.fetchall():
    print(row)
    print("")

In [None]:
# Populate the fact table from NextSong events that match a staged song by
# title and artist name. Commit so the rows persist beyond this transaction,
# as the DDL and COPY cells do.
cur.execute("""INSERT INTO fact_songplay (start_time, user_id, level, song_id, artist_id, session_id, location, user_agent)
                                    SELECT DISTINCT e.ts,
                                                    e.userId as user_id,
                                                    e.level as level,
                                                    s.song_id as song_id,
                                                    s.artist_id as artist_id,
                                                    e.sessionId as session_id,
                                                    e.location as location,
                                                    e.userAgent as user_agent
                                    FROM staging_events e
                                    JOIN staging_songs s ON e.song = s.title AND e.artist = s.artist_name
                                    WHERE e.page='NextSong'
                                  """)
conn.commit()

# Preview newly created fact_songplay table (first three rows only).
cur.execute("""SELECT * FROM fact_songplay LIMIT 3""")
for row in cur.fetchall():
    print(row)
    print("")

## INSERT INTO dim_users 

In [None]:
# PREVIEW QUERY B4 INSERTING: one row per user, taking the attributes from the
# user's most recent event (latest ts). A plain DISTINCT over all five columns
# would return several rows for the same user_id whenever a user's level
# changed. LIMIT 3 keeps the preview from pulling every user to the client.
cur.execute("""SELECT user_id, first_name, last_name, gender, level
FROM (
    SELECT userId AS user_id,
           firstName AS first_name,
           lastName AS last_name,
           gender,
           level,
           ROW_NUMBER() OVER (PARTITION BY userId ORDER BY ts DESC) AS rn
    FROM staging_events
    WHERE userId IS NOT NULL
) latest
WHERE rn = 1
LIMIT 3;
""")

for row in cur.fetchall():
    print(row)
    print("")

In [None]:
# Load dim_users with exactly one row per user_id. ROW_NUMBER() keeps each
# user's most recent event (latest ts); a DISTINCT over all five columns would
# insert several rows for one user_id when that user's level changed.
# Commit so the rows persist.
cur.execute("""INSERT INTO dim_users (user_id, first_name, last_name, gender, level)
SELECT user_id, first_name, last_name, gender, level
FROM (
    SELECT userId AS user_id,
           firstName AS first_name,
           lastName AS last_name,
           gender,
           level,
           ROW_NUMBER() OVER (PARTITION BY userId ORDER BY ts DESC) AS rn
    FROM staging_events
    WHERE userId IS NOT NULL
) latest
WHERE rn = 1;
""")
conn.commit()

# Preview newly created dim_users table (first three rows only).
cur.execute("""SELECT * FROM dim_users LIMIT 3""")
for row in cur.fetchall():
    print(row)
    print("")

## INSERT INTO dim_songs

In [None]:
# PREVIEW query b4 inserting (LIMIT avoids sending every song to the client).
cur.execute("""SELECT DISTINCT song_id, title, artist_id, year, duration
                    FROM staging_songs
                    WHERE song_id IS NOT NULL
                    LIMIT 3
                    """)
for row in cur.fetchall():
    print(row)

In [None]:
# Load dim_songs from the distinct staged songs, then commit so the rows persist.
cur.execute("""INSERT INTO dim_songs (song_id, title, artist_id, year, duration)
                SELECT DISTINCT song_id, title, artist_id, year, duration
                FROM staging_songs
                WHERE song_id IS NOT NULL
                """)
conn.commit()

# Preview newly created dim_songs table (first three rows only).
cur.execute("""SELECT * FROM dim_songs LIMIT 3""")
for row in cur.fetchall():
    print(row)
    print("")


## INSERT INTO dim_artists

In [None]:
# PREVIEW query b4 inserting: artists that appear in both the event logs and
# the song metadata (matched by name). LIMIT keeps the preview small.
cur.execute("""SELECT DISTINCT artist_id, e.artist as name, s.artist_location, s.artist_latitude, s.artist_longitude
                        FROM staging_events e
                        JOIN staging_songs s ON e.artist = s.artist_name
                        WHERE e.artist IS NOT NULL
                        LIMIT 3
                    """)
for row in cur.fetchall():
    print(row)

In [None]:
# Load dim_artists from artists matched between events and songs by name, then
# commit so the rows persist. (`lattitude` matches the column name in the DDL.)
cur.execute("""INSERT INTO dim_artists(artist_id, name, location, lattitude, longitude)
                   SELECT DISTINCT artist_id, e.artist as name, s.artist_location, s.artist_latitude, s.artist_longitude
                   FROM staging_events e
                   JOIN staging_songs s ON e.artist = s.artist_name
                   WHERE e.artist IS NOT NULL
            """)
conn.commit()

# Preview newly created dim_artists table (first three rows only).
cur.execute("""SELECT * FROM dim_artists LIMIT 3""")
for row in cur.fetchall():
    print(row)
    print("")

## INSERTING INTO dim_time

In [None]:
# Preview one row of the dim_time query. LIMIT 1 stops every distinct
# timestamp being sent to the client just to display the first.
cur.execute("""SELECT DISTINCT ts, 
               extract(h from ts) AS hour, 
               extract(d from ts) AS day, 
               extract(w from ts) AS week, 
               extract(mon from ts) AS month, 
               extract(year from ts) AS year, 
               extract(dow from ts) AS weekday
               FROM staging_events WHERE ts IS NOT NULL
               LIMIT 1
""")
cur.fetchone()

In [None]:
# Load dim_time with the calendar parts of every distinct event timestamp,
# then commit so the rows persist.
cur.execute("""INSERT INTO dim_time(start_time, hour, day, week, month, year, weekday)
               SELECT DISTINCT ts, 
               extract(h from ts) AS hour, 
               extract(d from ts) AS day, 
               extract(w from ts) AS week, 
               extract(mon from ts) AS month, 
               extract(year from ts) AS year, 
               extract(dow from ts) AS weekday
               FROM staging_events WHERE ts IS NOT NULL
            """)
conn.commit()

# Preview newly created dim_time table (first three rows only).
cur.execute("""SELECT * FROM dim_time LIMIT 3""")
for row in cur.fetchall():
    print(row)
    print("")

In [None]:
# Join each dim table to the fact table and preview one joined row.
# LIMIT 1 avoids transferring the whole join result to the client.
cur.execute("""SELECT * FROM fact_songplay fs
JOIN dim_users on fs.user_id = dim_users.user_id
JOIN dim_artists on fs.artist_id = dim_artists.artist_id
JOIN dim_songs on fs.song_id = dim_songs.song_id
JOIN dim_time on fs.start_time = dim_time.start_time
LIMIT 1
""")
joined_row = cur.fetchone()
if joined_row is None:
    # fetchone() returns None when the join matched nothing; fail with a clear
    # message instead of an IndexError further down.
    raise ValueError("Joining fact_songplay to the dimension tables returned no rows")

# One-row frame labelled with the column names reported by the cursor.
df = pd.DataFrame([joined_row], columns=[col[0] for col in cur.description])

for value in joined_row:
    print(value)
df

## Delete the cluster and resources once they are no longer needed

In [None]:
# Delete the cluster without taking a final snapshot.
redshift.delete_cluster(
    ClusterIdentifier=DWH_CLUSTER_IDENTIFIER,
    SkipFinalClusterSnapshot=True,
)

In [None]:
# Check deletion status. Once the cluster is gone, describe_clusters raises
# ClusterNotFoundFault; report that instead of failing the cell.
status_table = None
try:
    myClusterProps = redshift.describe_clusters(ClusterIdentifier=DWH_CLUSTER_IDENTIFIER)['Clusters'][0]
    status_table = prettyRedshiftProps(myClusterProps)
except redshift.exceptions.ClusterNotFoundFault:
    print('Cluster {} has been deleted.'.format(DWH_CLUSTER_IDENTIFIER))
status_table


In [12]:
# Detach the S3 read-only policy, then delete the role. Both calls pass the
# DWH_IAM_ROLE_NAME variable (previously delete_role received the literal
# string "DWH_IAM_ROLE_NAME" and targeted the wrong role).
iam.detach_role_policy(RoleName=DWH_IAM_ROLE_NAME, PolicyArn="arn:aws:iam::aws:policy/AmazonS3ReadOnlyAccess")
iam.delete_role(RoleName=DWH_IAM_ROLE_NAME)

{'ResponseMetadata': {'RequestId': '7b0f3bf9-c99b-4b0e-a6b1-6c3871407371',
  'HTTPStatusCode': 200,
  'HTTPHeaders': {'x-amzn-requestid': '7b0f3bf9-c99b-4b0e-a6b1-6c3871407371',
   'content-type': 'text/xml',
   'content-length': '200',
   'date': 'Tue, 03 Aug 2021 17:41:04 GMT'},
  'RetryAttempts': 0}}