In [1]:
import pandas as pd
import boto3
import json
import configparser

Pre-step1: load data warehouse parameters

In [2]:
config = configparser.ConfigParser()
config.read_file(open('dwh.cfg'))

KEY                    = config.get('AWS','KEY')
SECRET                 = config.get('AWS','SECRET')

DWH_CLUSTER_TYPE       = config.get("DWH","DWH_CLUSTER_TYPE")
DWH_NUM_NODES          = config.get("DWH","DWH_NUM_NODES")
DWH_NODE_TYPE          = config.get("DWH","DWH_NODE_TYPE")
DWH_CLUSTER_IDENTIFIER = config.get("DWH","DWH_CLUSTER_IDENTIFIER")

DB_NAME                = config.get("DWH","DB_NAME")
DB_USER                = config.get("DWH","DB_USER")
DB_PASSWORD            = config.get("DWH","DB_PASSWORD")
DB_PORT                = config.get("DWH","DB_PORT")

IAM_ROLE_NAME          = config.get("DWH", "IAM_ROLE_NAME")


pd.DataFrame({"Param":
                  ["DWH_CLUSTER_TYPE", "DWH_NUM_NODES", "DWH_NODE_TYPE", "DWH_CLUSTER_IDENTIFIER", "DB_NAME", "DB_USER", "DB_PASSWORD", "DB_PORT", "IAM_ROLE_NAME"],
              "Value":
                  [DWH_CLUSTER_TYPE, DWH_NUM_NODES, DWH_NODE_TYPE, DWH_CLUSTER_IDENTIFIER, DB_NAME, DB_USER, DB_PASSWORD, DB_PORT, IAM_ROLE_NAME]
             })

Unnamed: 0,Param,Value
0,DWH_CLUSTER_TYPE,multi-node
1,DWH_NUM_NODES,4
2,DWH_NODE_TYPE,dc2.large
3,DWH_CLUSTER_IDENTIFIER,dwhCluster
4,DB_NAME,dwh
5,DB_USER,dwhuser
6,DB_PASSWORD,Passw0rd
7,DB_PORT,5439
8,IAM_ROLE_NAME,dwhRole


Pre-step 2: Create clients for IAM, EC2, S3, and Redshift

In [3]:
import boto3

ec2 = boto3.resource('ec2',
                       region_name="us-west-2",
                       aws_access_key_id=KEY,
                       aws_secret_access_key=SECRET
                    )

s3 = boto3.resource('s3',
                       region_name="us-west-2",
                       aws_access_key_id=KEY,
                       aws_secret_access_key=SECRET
                   )

iam = boto3.client('iam',aws_access_key_id=KEY,
                     aws_secret_access_key=SECRET,
                     region_name='us-west-2'
                  )

redshift = boto3.client('redshift',
                       region_name="us-west-2",
                       aws_access_key_id=KEY,
                       aws_secret_access_key=SECRET
                       )

Pre-step3: check out the data sources on S3

In [4]:
data_sources = s3.Bucket("udacity-dend")
log_data = [file for file in data_sources.objects.filter(Prefix="log-data")]
log_data[:5]

[s3.ObjectSummary(bucket_name='udacity-dend', key='log-data/'),
 s3.ObjectSummary(bucket_name='udacity-dend', key='log-data/2018/11/2018-11-01-events.json'),
 s3.ObjectSummary(bucket_name='udacity-dend', key='log-data/2018/11/2018-11-02-events.json'),
 s3.ObjectSummary(bucket_name='udacity-dend', key='log-data/2018/11/2018-11-03-events.json'),
 s3.ObjectSummary(bucket_name='udacity-dend', key='log-data/2018/11/2018-11-04-events.json')]

In [5]:
song_data = [file for file in data_sources.objects.filter(Prefix="song-data/A/A/B")]
song_data[:5]

[s3.ObjectSummary(bucket_name='udacity-dend', key='song-data/A/A/B/TRAABCL128F4286650.json'),
 s3.ObjectSummary(bucket_name='udacity-dend', key='song-data/A/A/B/TRAABDL12903CAABBA.json'),
 s3.ObjectSummary(bucket_name='udacity-dend', key='song-data/A/A/B/TRAABEV12903CC53A4.json'),
 s3.ObjectSummary(bucket_name='udacity-dend', key='song-data/A/A/B/TRAABFH128F92C812E.json'),
 s3.ObjectSummary(bucket_name='udacity-dend', key='song-data/A/A/B/TRAABGU12903CC8DCF.json')]

Step 1: IAM ROLE

Create an iam role that enables redshift to access S2 bucket read only.

In [6]:
from botocore.exceptions import ClientError

#1.1 Create the role, 
print("1.1 Creating a new IAM Role") 
dwhRole = iam.create_role(
    Path='/',
    RoleName=IAM_ROLE_NAME,
    Description = "Allows Redshift clusters to call AWS services on your behalf.",
    AssumeRolePolicyDocument=json.dumps(
        {'Statement': [{'Action': 'sts:AssumeRole',
                        'Effect': 'Allow',
                        'Principal': {'Service': 'redshift.amazonaws.com'}
                       }],
        'Version': '2012-10-17'})
)

1.1 Creating a new IAM Role


In [7]:
# 1.2 Attach the policy   
print("1.2 Attaching Policy")

iam.attach_role_policy(RoleName=IAM_ROLE_NAME,
                       PolicyArn="arn:aws:iam::aws:policy/AmazonS3ReadOnlyAccess"
                      )['ResponseMetadata']['HTTPStatusCode']

1.2 Attaching Policy


200

In [8]:
print("1.3 Get the IAM role ARN")
roleArn = iam.get_role(RoleName=IAM_ROLE_NAME)['Role']['Arn']

roleArn

1.3 Get the IAM role ARN


'arn:aws:iam::156097180916:role/dwhRole'

Step 2: Redshift Cluster

2.1 Create a redshift cluster

In [9]:
response = redshift.create_cluster(        
        #HW
    ClusterType=DWH_CLUSTER_TYPE,
    NodeType=DWH_NODE_TYPE,
    NumberOfNodes=int(DWH_NUM_NODES),

        #Identifiers & Credentials
    DBName=DB_NAME,
    ClusterIdentifier=DWH_CLUSTER_IDENTIFIER,
    MasterUsername=DB_USER,
    MasterUserPassword=DB_PASSWORD,
        
    #Roles (for s3 access)
    IamRoles=[roleArn]  
    )

2.2 Describe the cluster and check its status

In [10]:
def RedshiftProperties(props):
    pd.set_option('display.max_colwidth', -1)
    keysToShow = ["ClusterIdentifier", "NodeType", "ClusterStatus", "MasterUsername", "DBName", "Endpoint", "NumberOfNodes", 'VpcId']
    x = [(k, v) for k,v in props.items() if k in keysToShow]
    return pd.DataFrame(data=x, columns=["Key", "Value"])

In [12]:
ClusterProps = redshift.describe_clusters(ClusterIdentifier=DWH_CLUSTER_IDENTIFIER)['Clusters'][0]
RedshiftProperties(ClusterProps)

Unnamed: 0,Key,Value
0,ClusterIdentifier,dwhcluster
1,NodeType,dc2.large
2,ClusterStatus,available
3,MasterUsername,dwhuser
4,DBName,dwh
5,Endpoint,"{'Address': 'dwhcluster.cgwvtrmff8lt.us-west-2.redshift.amazonaws.com', 'Port': 5439}"
6,VpcId,vpc-05514a400a5909259
7,NumberOfNodes,4


2.3 Cluster endpoint and role ARN 

In [13]:
DWH_ENDPOINT = ClusterProps['Endpoint']['Address']
DWH_ENDPOINT

'dwhcluster.cgwvtrmff8lt.us-west-2.redshift.amazonaws.com'

In [14]:
DWH_ROLE_ARN = ClusterProps['IamRoles'][0]['IamRoleArn']
DWH_ROLE_ARN

'arn:aws:iam::156097180916:role/dwhRole'

2.4: Open an incoming TCP port to access the cluster endpoint

In [15]:
try:
    vpc = ec2.Vpc(id=ClusterProps['VpcId'])
    defaultSg = list(vpc.security_groups.all())[0]
    print(defaultSg)
    defaultSg.authorize_ingress(
        GroupName=defaultSg.group_name,
        CidrIp='0.0.0.0/0',
        IpProtocol='TCP',
        FromPort=int(DB_PORT),
        ToPort=int(DB_PORT)
        )
except Exception as e:
    print(e)

ec2.SecurityGroup(id='sg-08dd3b472fbff46c0')
An error occurred (InvalidPermission.Duplicate) when calling the AuthorizeSecurityGroupIngress operation: the specified rule "peer: 0.0.0.0/0, TCP, from port: 5439, to port: 5439, ALLOW" already exists


2.5: Connect to redshift

In [16]:
%load_ext sql

In [17]:
conn_string="postgresql://{}:{}@{}:{}/{}".format(DB_USER, DB_PASSWORD, DWH_ENDPOINT, DB_PORT, DB_NAME)
print(conn_string)
%sql $conn_string

postgresql://dwhuser:Passw0rd@dwhcluster.cgwvtrmff8lt.us-west-2.redshift.amazonaws.com:5439/dwh


'Connected: dwhuser@dwh'

STEP 3 Load and Transform data

- Create tables: staging_events, staging_songs, songplay, users, song, artist, time.
- Extract source data in S3 and stage them to staging_events and staging_songs in redshift
- Transform the data into a set of dimensional tables, i.e. songplay, users, song, artist, time
- songplay is a fact table, while users, song, artist and time are dimensional tables.

In [18]:
import configparser


# CONFIG
config = configparser.ConfigParser()
config.read('dwh.cfg')

IAM_ARN  = config.get('IAM_ROLE', 'ARN')
LOG_DATA = config.get('S3', 'LOG_DATA')
LOG_JSONPATH = config.get('S3', 'LOG_JSONPATH')
SONG_DATA = config.get('S3', 'SONG_DATA')


# DROP TABLES

staging_events_table_drop = "DROP TABLE IF EXISTS staging_events"
staging_songs_table_drop = "DROP TABLE IF EXISTS staging_songs"
songplay_table_drop = "DROP TABLE IF EXISTS songplay"
user_table_drop = "DROP TABLE IF EXISTS users"
song_table_drop = "DROP TABLE IF EXISTS song"
artist_table_drop = "DROP TABLE IF EXISTS artist"
time_table_drop = "DROP TABLE IF EXISTS time"

# CREATE TABLES

staging_events_table_create= ("""
CREATE TABLE IF NOT EXISTS staging_events
(
artist          VARCHAR,
auth            VARCHAR,
firstName       VARCHAR,
gender          VARCHAR,
itemInSession   INTEGER,
lastName        VARCHAR,
length          FLOAT,
level           VARCHAR,
location        VARCHAR,
method          VARCHAR,
page            VARCHAR,
registration    BIGINT,
sessionId       INTEGER,
song            VARCHAR,
status          INTEGER,
ts              TIMESTAMP,
userAgent       VARCHAR,
userId          INTEGER
);
""")

staging_songs_table_create = ("""
CREATE TABLE IF NOT EXISTS staging_songs
(num_songs        INTEGER,
artist_id         VARCHAR,
artist_latitude   FLOAT,
artist_longitude  FLOAT,
artist_location   VARCHAR,
artist_name       VARCHAR,
song_id           VARCHAR,
title             VARCHAR,
duration          FLOAT,
year              INTEGER
);
""")

songplay_table_create = ("""
CREATE TABLE IF NOT EXISTS songplay
(
songplay_id      INTEGER IDENTITY(0,1) PRIMARY KEY, 
start_time       TIMESTAMP,
user_id          INTEGER, 
level            VARCHAR,
song_id          VARCHAR, 
artist_id        VARCHAR, 
session_id       INTEGER, 
location         VARCHAR, 
user_agent       VARCHAR);
""")

user_table_create = ("""
CREATE TABLE IF NOT EXISTS users
(
user_id     INTEGER, 
first_name  VARCHAR, 
last_name   VARCHAR, 
gender      VARCHAR, 
level       VARCHAR);
""")

song_table_create = ("""
CREATE TABLE IF NOT EXISTS song
(
song_id     VARCHAR PRIMARY KEY, 
title       VARCHAR, 
artist_id   VARCHAR, 
year        INTEGER, 
duration    FLOAT
);
""")

artist_table_create = ("""
CREATE TABLE IF NOT EXISTS artist
(
artist_id    VARCHAR PRIMARY KEY, 
name         VARCHAR, 
location     VARCHAR, 
lattitude    FLOAT, 
longitude    FLOAT
);
""")

time_table_create = ("""
CREATE TABLE IF NOT EXISTS time
(
start_time   TIMESTAMP PRIMARY KEY, 
hour         INTEGER, 
day          INTEGER, 
week         INTEGER, 
month        INTEGER, 
year         INTEGER, 
weekday      INTEGER);
""")

# STAGING TABLES

staging_events_copy = ("""

copy staging_events from {}
credentials 'aws_iam_role={}'
compupdate off region 'us-west-2'
timeformat as 'epochmillisecs'
json {} 
;
""").format(LOG_DATA, IAM_ARN, LOG_JSONPATH)

staging_songs_copy = ("""
copy staging_songs from {}
credentials 'aws_iam_role={}'
compupdate off region 'us-west-2'
timeformat as 'epochmillisecs'
json 'auto';
""").format(SONG_DATA, IAM_ARN)

# FINAL TABLES

songplay_table_insert = ("""
INSERT INTO songplay(start_time, user_id, level, song_id, artist_id, session_id, location, user_agent)
SELECT ts as start_time,
       userId  as user_id,
       level,
       song_id,
       artist_id,
       sessionId as session_id,
       location,
       userAgent as user_agent
FROM staging_events e
left join staging_songs s on e.song = s.title and e.artist = s.artist_name
WHERE page = 'NextSong'
""")

user_table_insert = ("""
INSERT INTO users(user_id, first_name, last_name, gender, level)
SELECT DISTINCT userId  as user_id, 
       firstName as first_name,
       lastName as last_name,
       gender,
       level
FROM staging_events
WHERE userId IS NOT NULL
""")

song_table_insert = ("""
INSERT INTO song(song_id, title, artist_id, year, duration)
SELECT DISTINCT song_id,
       title,
       artist_id,
       year,
       duration
FROM staging_songs
WHERE song_id IS NOT NULL
""")

artist_table_insert = ("""
INSERT INTO artist (artist_id, name, location, lattitude, longitude)
SELECT DISTINCT artist_id,
       artist_name as name,
       artist_location as location,
       artist_latitude as lattitude,
       artist_longitude as longitude
FROM staging_songs
WHERE artist_id IS NOT NULL
""")

time_table_insert = ("""
INSERT INTO time(start_time, hour, day, week, month, year, weekday)
SELECT DISTINCT start_time,
       EXTRACT(hour from start_time) as hour,
       EXTRACT(day from start_time) as day,
       EXTRACT(week from start_time) as week,
       EXTRACT(month from start_time) as month,
       EXTRACT(year from start_time) as year,
       EXTRACT(weekday from start_time) as weekday
FROM songplay
WHERE start_time IS NOT NULL
""")

# QUERY LISTS

create_table_queries = [staging_events_table_create, staging_songs_table_create, songplay_table_create, user_table_create, song_table_create, artist_table_create, time_table_create]
drop_table_queries = [staging_events_table_drop, staging_songs_table_drop, songplay_table_drop, user_table_drop, song_table_drop, artist_table_drop, time_table_drop]
copy_table_queries = [staging_events_copy, staging_songs_copy]
insert_table_queries = [songplay_table_insert, user_table_insert, song_table_insert, artist_table_insert, time_table_insert]


In [19]:
import configparser
import psycopg2


def drop_tables(cur, conn):
    for query in drop_table_queries:
        cur.execute(query)
        conn.commit()


def create_tables(cur, conn):
    for query in create_table_queries:
        cur.execute(query)
        conn.commit()


def main():
    config = configparser.ConfigParser()
    config.read('dwh.cfg')

    conn = psycopg2.connect("host={} dbname={} user={} password={} port={}".format(*config['CLUSTER'].values()))
    cur = conn.cursor()

    drop_tables(cur, conn)
    create_tables(cur, conn)

    conn.close()


if __name__ == "__main__":
    main()

In [20]:
import configparser
import psycopg2


def load_staging_tables(cur, conn):
    for query in copy_table_queries:
        cur.execute(query)
        conn.commit()


def insert_tables(cur, conn):
    for query in insert_table_queries:
        cur.execute(query)
        conn.commit()


def main():
    config = configparser.ConfigParser()
    config.read('dwh.cfg')

    conn = psycopg2.connect("host={} dbname={} user={} password={} port={}".format(*config['CLUSTER'].values()))
    cur = conn.cursor()
    
    load_staging_tables(cur, conn)
    insert_tables(cur, conn)

    conn.close()


if __name__ == "__main__":
    main()

Step 4 Test the data

In [21]:
%sql select count(*) from staging_events

 * postgresql://dwhuser:***@dwhcluster.cgwvtrmff8lt.us-west-2.redshift.amazonaws.com:5439/dwh
1 rows affected.


count
8056


In [22]:
%sql select count(*) from staging_songs

 * postgresql://dwhuser:***@dwhcluster.cgwvtrmff8lt.us-west-2.redshift.amazonaws.com:5439/dwh
1 rows affected.


count
14896


In [23]:
%sql select count(*) from songplay

 * postgresql://dwhuser:***@dwhcluster.cgwvtrmff8lt.us-west-2.redshift.amazonaws.com:5439/dwh
1 rows affected.


count
6820


In [24]:
%%sql
select count(*)
from staging_events e
left join staging_songs s on e.song = s.title and e.artist = s.artist_name
where page= 'NextSong'

 * postgresql://dwhuser:***@dwhcluster.cgwvtrmff8lt.us-west-2.redshift.amazonaws.com:5439/dwh
1 rows affected.


count
6820


In [25]:
%sql select count(*) from users

 * postgresql://dwhuser:***@dwhcluster.cgwvtrmff8lt.us-west-2.redshift.amazonaws.com:5439/dwh
1 rows affected.


count
105


In [26]:
%sql select count(*) from artist

 * postgresql://dwhuser:***@dwhcluster.cgwvtrmff8lt.us-west-2.redshift.amazonaws.com:5439/dwh
1 rows affected.


count
10025


In [27]:
%sql select count(*) from song

 * postgresql://dwhuser:***@dwhcluster.cgwvtrmff8lt.us-west-2.redshift.amazonaws.com:5439/dwh
1 rows affected.


count
14896


In [28]:
%sql select count(*) from time

 * postgresql://dwhuser:***@dwhcluster.cgwvtrmff8lt.us-west-2.redshift.amazonaws.com:5439/dwh
1 rows affected.


count
6813


In [41]:
%%sql
select hour, count(*) as song_plays
from songplay sp left join users u on sp. user_id = u.user_id 
left join time on sp.start_time = time.start_time
where u.level = 'free'
group by hour
order by hour

 * postgresql://dwhuser:***@dwhcluster.cgwvtrmff8lt.us-west-2.redshift.amazonaws.com:5439/dwh
24 rows affected.


hour,song_plays
0,67
1,65
2,71
3,81
4,96
5,101
6,135
7,117
8,103
9,141


In [53]:
%%sql
with cte as (select sp.user_id, count(*) as activity_counts
from songplay sp left join users u on sp.user_id = u.user_id
where u.level = 'free'
group by sp.user_id)
select cte.user_id, first_name, last_name, activity_counts
from cte, users u
where cte.user_id = u.user_id and level='free'
order by 4 desc
limit 20

 * postgresql://dwhuser:***@dwhcluster.cgwvtrmff8lt.us-west-2.redshift.amazonaws.com:5439/dwh
20 rows affected.


user_id,first_name,last_name,activity_counts
49,Chloe,Cuevas,689
80,Tegan,Levine,665
15,Lily,Koch,463
29,Jacqueline,Lynch,346
88,Mohammad,Rodriguez,270
36,Matthew,Jones,248
16,Rylan,George,223
85,Kinsley,Young,179
26,Ryan,Smith,114
32,Lily,Burns,56


STEP 5 Clean up resources

In [61]:
redshift.delete_cluster( ClusterIdentifier=DWH_CLUSTER_IDENTIFIER,  SkipFinalClusterSnapshot=True)

ClusterNotFoundFault: An error occurred (ClusterNotFound) when calling the DeleteCluster operation: Cluster dwhcluster not found.

In [62]:
ClusterProps = redshift.describe_clusters(ClusterIdentifier=DWH_CLUSTER_IDENTIFIER)['Clusters'][0]
RedshiftProperties(ClusterProps)

ClusterNotFoundFault: An error occurred (ClusterNotFound) when calling the DescribeClusters operation: Cluster dwhcluster not found.

In [63]:
iam.detach_role_policy(RoleName=IAM_ROLE_NAME, PolicyArn="arn:aws:iam::aws:policy/AmazonS3ReadOnlyAccess")

NoSuchEntityException: An error occurred (NoSuchEntity) when calling the DetachRolePolicy operation: The role with name dwhRole cannot be found.

In [64]:
iam.delete_role(RoleName=IAM_ROLE_NAME)

NoSuchEntityException: An error occurred (NoSuchEntity) when calling the DeleteRole operation: The role with name dwhRole cannot be found.