# cloud_data_wh_project_sparkify
<img src="https://upload.wikimedia.org/wikipedia/commons/thumb/9/93/Amazon_Web_Services_Logo.svg/2560px-Amazon_Web_Services_Logo.svg.png" width="100" height="100">

In [13]:
import json
import configparser

import pandas as pd

#!pip install boto3
import boto3
%load_ext sql

The sql extension is already loaded. To reload it, use:
  %reload_ext sql


## 1.2. Load configuration from a file

In [None]:
# Load configuration file
config = configparser.ConfigParser()
config.read_file(open("dwh.cfg"))

# AWS Credentials
aws_key = config.get("AWS", "KEY")
aws_secret = config.get("AWS", "SECRET")

# AWS Region
dwh_region = config.get("DWH", "DWH_REGION")

# IAM Role
dwh_iam_role_name = config.get("DWH", "DWH_IAM_ROLE_NAME")

# Redshift Cluster Configuration
dwh_cluster_type = config.get("DWH", "DWH_CLUSTER_TYPE")
dwh_node_type = config.get("DWH", "DWH_NODE_TYPE")
dwh_num_nodes = config.get("DWH", "DWH_NUM_NODES")
dwh_cluster_identifier = config.get("DWH", "DWH_CLUSTER_IDENTIFIER")

# Database Configuration
dwh_db = config.get("DWH", "DWH_DB")
dwh_db_user = config.get("DWH", "DWH_DB_USER")
dwh_db_password = config.get("DWH", "DWH_DB_PASSWORD")
dwh_port = config.get("DWH", "DWH_PORT")

# Display parameters in a DataFrame
pd.DataFrame(
    {
        "Param": [
            "DWH_REGION",
            "DWH_IAM_ROLE_NAME",
            "DWH_CLUSTER_TYPE",
            "DWH_NODE_TYPE",
            "DWH_NUM_NODES",
            "DWH_CLUSTER_IDENTIFIER",
            "DWH_DB",
            "DWH_DB_USER",
            "DWH_DB_PASSWORD",
            "DWH_PORT",
        ],
        "Value": [
            dwh_region,
            dwh_iam_role_name,
            dwh_cluster_type,
            dwh_node_type,
            dwh_num_nodes,
            dwh_cluster_identifier,
            dwh_db,
            dwh_db_user,
            dwh_db_password,
            dwh_port,
        ],
    }
)

# 2. Initialize AWS services

In [3]:
iam = boto3.client(
    "iam",
    region_name=dwh_region,
    aws_access_key_id=aws_key,
    aws_secret_access_key=aws_secret,
)

ec2 = boto3.resource(
    "ec2",
    region_name=dwh_region,
    aws_access_key_id=aws_key,
    aws_secret_access_key=aws_secret,
)

s3 = boto3.resource(
    "s3",
    region_name=dwh_region,
    aws_access_key_id=aws_key,
    aws_secret_access_key=aws_secret,
)

redshift = boto3.client(
    "redshift",
    region_name=dwh_region,
    aws_access_key_id=aws_key,
    aws_secret_access_key=aws_secret,
)

## 2.1. IAM Role Setup
Create an IAM role that allows Redshift to access S3 bucket (ReadOnly)

In [4]:
# Create the role
try:
    print("1.1 Creating a new IAM Role")
    dwh_role = iam.create_role(
        Path="/",
        RoleName=dwh_iam_role_name,
        Description="Allows Redshift clusters to call AWS services on your behalf.",
        AssumeRolePolicyDocument=json.dumps({
            "Statement": [{
                "Action": "sts:AssumeRole",
                "Effect": "Allow",
                "Principal": {"Service": "redshift.amazonaws.com"}
            }],
            "Version": "2012-10-17"
        })
    )
except Exception as e:
    print(e)

# Attach Policy
print("1.2 Attaching Policy")

iam.attach_role_policy(RoleName=dwh_iam_role_name,
                       PolicyArn="arn:aws:iam::aws:policy/AmazonS3ReadOnlyAccess"
                      )['ResponseMetadata']['HTTPStatusCode']

# Get and print the IAM role ARN
print("1.3 Get the IAM role ARN")
role_arn = iam.get_role(RoleName=dwh_iam_role_name)["Role"]["Arn"]

print(role_arn)

1.1 Creating a new IAM Role
An error occurred (EntityAlreadyExists) when calling the CreateRole operation: Role with name dwhRole already exists.
1.2 Attaching Policy
1.3 Get the IAM role ARN
arn:aws:iam::905621050021:role/dwhRole


## 2.2. S3: check sample data, verify dataset presence.  
The code lists and prints objects in the S3 bucket "udacity" that have keys starting with "tickets".

In [5]:
sample_song_db_bucket = s3.Bucket("udacity-dend")

for obj in sample_song_db_bucket.objects.filter(Prefix="song_data"):
    print(obj)

# Uncomment the following lines to list all objects in the bucket
#for obj in sample_db_bucket.objects.all():
#    print(obj)

s3.ObjectSummary(bucket_name='udacity-dend', key='song_data/')
s3.ObjectSummary(bucket_name='udacity-dend', key='song_data/A/A/A/TRAAAAK128F9318786.json')
s3.ObjectSummary(bucket_name='udacity-dend', key='song_data/A/A/A/TRAAAAV128F421A322.json')
s3.ObjectSummary(bucket_name='udacity-dend', key='song_data/A/A/A/TRAAABD128F429CF47.json')
s3.ObjectSummary(bucket_name='udacity-dend', key='song_data/A/A/A/TRAAACN128F9355673.json')
s3.ObjectSummary(bucket_name='udacity-dend', key='song_data/A/A/A/TRAAAEA128F935A30D.json')
s3.ObjectSummary(bucket_name='udacity-dend', key='song_data/A/A/A/TRAAAED128E0783FAB.json')
s3.ObjectSummary(bucket_name='udacity-dend', key='song_data/A/A/A/TRAAAEM128F93347B9.json')
s3.ObjectSummary(bucket_name='udacity-dend', key='song_data/A/A/A/TRAAAEW128F42930C0.json')
s3.ObjectSummary(bucket_name='udacity-dend', key='song_data/A/A/A/TRAAAFD128F92F423A.json')
s3.ObjectSummary(bucket_name='udacity-dend', key='song_data/A/A/A/TRAAAGR128F425B14B.json')
s3.ObjectSummary(

KeyboardInterrupt: 

In [6]:
sample_log_db_bucket = s3.Bucket("udacity-dend")

for obj in sample_log_db_bucket.objects.filter(Prefix="log_data"):
    print(obj)

# Uncomment the following lines to list all objects in the bucket
#for obj in sample_db_bucket.objects.all():
#    print(obj)

s3.ObjectSummary(bucket_name='udacity-dend', key='log_data/')
s3.ObjectSummary(bucket_name='udacity-dend', key='log_data/2018/11/2018-11-01-events.json')
s3.ObjectSummary(bucket_name='udacity-dend', key='log_data/2018/11/2018-11-02-events.json')
s3.ObjectSummary(bucket_name='udacity-dend', key='log_data/2018/11/2018-11-03-events.json')
s3.ObjectSummary(bucket_name='udacity-dend', key='log_data/2018/11/2018-11-04-events.json')
s3.ObjectSummary(bucket_name='udacity-dend', key='log_data/2018/11/2018-11-05-events.json')
s3.ObjectSummary(bucket_name='udacity-dend', key='log_data/2018/11/2018-11-06-events.json')
s3.ObjectSummary(bucket_name='udacity-dend', key='log_data/2018/11/2018-11-07-events.json')
s3.ObjectSummary(bucket_name='udacity-dend', key='log_data/2018/11/2018-11-08-events.json')
s3.ObjectSummary(bucket_name='udacity-dend', key='log_data/2018/11/2018-11-09-events.json')
s3.ObjectSummary(bucket_name='udacity-dend', key='log_data/2018/11/2018-11-10-events.json')
s3.ObjectSummary(b

## 2.3. Redshift: create cluster

In [7]:
try:
    response = redshift.create_cluster(
        # Parameters for hardware
        ClusterType=dwh_cluster_type,
        NodeType=dwh_node_type,
        NumberOfNodes=int(dwh_num_nodes),

        # Parameters for identifiers & credentials
        ClusterIdentifier=dwh_cluster_identifier,
        DBName=dwh_db,
        MasterUsername=dwh_db_user,
        MasterUserPassword=dwh_db_password,

        # Parameter for role (to allow S3 access)
        IamRoles=[role_arn],

        # Make the cluster publicly accessible
        PubliclyAccessible=True  
    )
except Exception as e:
    print(e)

An error occurred (ClusterAlreadyExists) when calling the CreateCluster operation: Cluster already exists


## 2.4. Monitor cluster status
Format and display the cluster properties in a DataFrame. Run this block multiple times until **ClusterStatus** is **Available**.

In [76]:
def pretty_redshift_props(props):
    pd.set_option("display.max_colwidth", None)
    
    keys_to_show = [
        "ClusterIdentifier", "NodeType", "ClusterStatus", "MasterUsername", 
        "DBName", "Endpoint", "NumberOfNodes", "VpcId", "PubliclyAccessible"
    ]
    
    x = [(k, v) for k, v in props.items() if k in keys_to_show]
    return pd.DataFrame(data=x, columns=["Key", "Value"])

my_cluster_props = redshift.describe_clusters(ClusterIdentifier=dwh_cluster_identifier)["Clusters"][0]
pretty_redshift_props(my_cluster_props)

Unnamed: 0,Key,Value
0,ClusterIdentifier,dwhcluster
1,NodeType,dc2.large
2,ClusterStatus,available
3,MasterUsername,dwhuser
4,DBName,dwh
5,Endpoint,"{'Address': 'dwhcluster.cohyd9mb0d9g.us-east-1.redshift.amazonaws.com', 'Port': 5439}"
6,VpcId,vpc-0953b28bef8130913
7,NumberOfNodes,1
8,PubliclyAccessible,True


## 2.5 Take note of endpoint & IAM role
Do not run this unless **ClusterStatus** is **Available**.

In [10]:
dwh_endpoint = my_cluster_props["Endpoint"]["Address"]
dwh_role_arn = my_cluster_props["IamRoles"][0]["IamRoleArn"]

print("DWH_ENDPOINT:", dwh_endpoint)
print("DWH_ROLE_ARN:", dwh_role_arn)

DWH_ENDPOINT: dwhcluster.cohyd9mb0d9g.us-east-1.redshift.amazonaws.com
DWH_ROLE_ARN: arn:aws:iam::905621050021:role/dwhRole


# 3. Open network access
## 3.1 Allow inbound traffic to Redshift by updating the security group.

In [11]:
try:
    vpc = ec2.Vpc(id=my_cluster_props["VpcId"])
    default_sg = list(vpc.security_groups.all())[0]
    print(default_sg)

    default_sg.authorize_ingress(
        GroupName=default_sg.group_name,
        CidrIp="0.0.0.0/0",
        IpProtocol="TCP",
        FromPort=int(dwh_port),
        ToPort=int(dwh_port),
    )
except Exception as e:
    print(e)

ec2.SecurityGroup(id='sg-0a14bf0e53b9d569a')
An error occurred (InvalidPermission.Duplicate) when calling the AuthorizeSecurityGroupIngress operation: the specified rule "peer: 0.0.0.0/0, TCP, from port: 5439, to port: 5439, ALLOW" already exists


## 3.2. PostgreSQL: connect to the cluster

In [12]:
conn_string = f"postgresql://{dwh_db_user}:{dwh_db_password}@{dwh_endpoint}:{dwh_port}/{dwh_db}?sslmode=verify-full&sslrootcert=system"
%sql $conn_string
%sql SELECT current_user;

 * postgresql://dwhuser:***@dwhcluster.cohyd9mb0d9g.us-east-1.redshift.amazonaws.com:5439/dwh?sslmode=verify-full&sslrootcert=system
1 rows affected.


current_user
dwhuser


# 4. Data ingestion into Redshift
## 4.1. Create tables with a distribution strategy in the song_play_analysis schema

In [220]:
%%sql

CREATE SCHEMA IF NOT EXISTS song_play_analysis;
SET search_path TO song_play_analysis;

DROP TABLE IF EXISTS staging_events;
DROP TABLE IF EXISTS staging_songs;
DROP TABLE IF EXISTS users;
DROP TABLE IF EXISTS songs;
DROP TABLE IF EXISTS artists;
DROP TABLE IF EXISTS time;
DROP TABLE IF EXISTS songplays;

CREATE TABLE staging_events (
    artist varchar(max),
    auth varchar(10),
    firstName varchar(25),
    gender varchar(1),
    itemInSession integer sortkey distkey,
    lastName varchar(25),
    length float,
    level varchar(4),
    location varchar(max),
    method varchar(4),
    page varchar(max),
    registration float,
    sessionId integer,
    song varchar(max),
    status smallint,
    ts bigint,
    user_agent varchar(max),
    user_id integer
);

CREATE TABLE staging_songs (
    num_songs integer sortkey distkey,
    artist_id varchar(20),
    artist_latitude float,
    artist_longitude float,
    artist_location varchar(max),
    artist_name varchar(max),
    song_id varchar(20),
    title varchar(max),
    duration float,
    year smallint
);

CREATE TABLE songplays (
    songplay_id integer identity(0,1) not null sortkey distkey,
    start_time timestamp not null,
    user_id integer not null,
    level varchar(4) not null,
    song_id varchar(20) not null,
    artist_id varchar(20) not null,
    session_id integer not null,
    location varchar(max),
    user_agent varchar(max) not null
);

CREATE TABLE users (
    user_id integer not null sortkey distkey,
    first_name varchar(25) not null,
    last_name varchar(25) not null,
    gender varchar(1) not null,
    level varchar(4) not null
);

CREATE TABLE songs (
    song_id varchar(20) not null sortkey distkey,
    title varchar(max) not null,
    artist_id varchar(20) not null,
    year smallint not null,
    duration float not null
);

CREATE TABLE artists (
    artist_id varchar(20) not null sortkey distkey,
    name varchar(max),
    location varchar(max),
    latitude float,
    longitude float
);

CREATE TABLE time (
    time_id integer identity(0,1) not null sortkey distkey,
    start_time timestamp not null,
    hour smallint not null,
    day smallint not null,
    week smallint not null,
    month smallint not null,
    year smallint not null,
    weekday integer not null
);

 * postgresql://dwhuser:***@dwhcluster.cohyd9mb0d9g.us-east-1.redshift.amazonaws.com:5439/dwh?sslmode=verify-full&sslrootcert=system
Done.
Done.
Done.
Done.
Done.
Done.
Done.
Done.
Done.
Done.
Done.
Done.
Done.
Done.
Done.
Done.


[]

## 4.2. Load partitioned events and songs data into the cluster

In [221]:
qry = """
COPY staging_events
    FROM 's3://udacity-dend/log_data/'
    credentials 'aws_iam_role={}'
    JSON 's3://udacity-dend/log_json_path.json'
    COMPUPDATE off 
    REGION 'us-west-2'
""".format(dwh_role_arn)

%sql $qry

 * postgresql://dwhuser:***@dwhcluster.cohyd9mb0d9g.us-east-1.redshift.amazonaws.com:5439/dwh?sslmode=verify-full&sslrootcert=system
Done.


[]

In [222]:
qry = """
    COPY staging_songs 
    FROM 's3://udacity-dend/song_data/'
    credentials 'aws_iam_role={}'
    JSON 'auto'
    COMPUPDATE off 
    REGION 'us-west-2'
""".format(dwh_role_arn)

%sql $qry

 * postgresql://dwhuser:***@dwhcluster.cohyd9mb0d9g.us-east-1.redshift.amazonaws.com:5439/dwh?sslmode=verify-full&sslrootcert=system
Done.


[]

## 4.3. Check for data loading errors

In [None]:
%%sql
SELECT *
FROM stl_load_errors
ORDER BY starttime DESC
LIMIT 5;

## 4.4. Check for created and loaded tables

In [150]:
%sql SELECT tablename FROM pg_tables WHERE schemaname = 'song_play_analysis';

 * postgresql://dwhuser:***@dwhcluster.cohyd9mb0d9g.us-east-1.redshift.amazonaws.com:5439/dwh?sslmode=verify-full&sslrootcert=system
7 rows affected.


tablename
staging_events
staging_songs
songplays
users
songs
time
artists


# 5. Insert data into the 5 tables

In [223]:
%%sql

INSERT INTO songplays (
    start_time, user_id, level, song_id, artist_id, session_id, location, user_agent
)
SELECT
    TIMESTAMP 'epoch' + se.ts/1000 * INTERVAL '1 second' AS start_time,
    se.user_id,
    se.level,
    ss.song_id,
    ss.artist_id,
    se.sessionId,
    se.location,
    se.user_agent
FROM staging_events se
JOIN staging_songs ss
  ON se.song = ss.title AND se.artist = ss.artist_name
WHERE se.page = 'NextSong';

INSERT INTO users (
    user_id, first_name, last_name, gender, level
)
SELECT DISTINCT
    user_id,
    firstName,
    lastName,
    gender,
    level
FROM staging_events
WHERE page = 'NextSong' AND user_id IS NOT NULL;

INSERT INTO songs (
    song_id, title, artist_id, year, duration
)
SELECT DISTINCT
    song_id,
    title,
    artist_id,
    year,
    duration
FROM staging_songs;

INSERT INTO artists (
    artist_id, name, location, latitude, longitude
)
SELECT DISTINCT
    artist_id,
    artist_name AS name,
    artist_location AS location,
    artist_latitude AS latitude,
    artist_longitude AS longitude
FROM staging_songs;

INSERT INTO time (
    start_time, hour, day, week, month, year, weekday
)
SELECT DISTINCT
    start_time,
    EXTRACT(hour FROM start_time),
    EXTRACT(day FROM start_time),
    EXTRACT(week FROM start_time),
    EXTRACT(month FROM start_time),
    EXTRACT(year FROM start_time),
    EXTRACT(weekday FROM start_time)
FROM (
    SELECT TIMESTAMP 'epoch' + ts/1000 * INTERVAL '1 second' AS start_time
    FROM staging_events
    WHERE page = 'NextSong'
) AS time_data;

 * postgresql://dwhuser:***@dwhcluster.cohyd9mb0d9g.us-east-1.redshift.amazonaws.com:5439/dwh?sslmode=verify-full&sslrootcert=system
333 rows affected.
104 rows affected.
14896 rows affected.
10025 rows affected.
6813 rows affected.


[]

In [225]:
%%sql
SET search_path TO song_play_analysis;

SELECT table_name, column_name, data_type, is_nullable, column_default
FROM information_schema.columns
WHERE table_schema = 'song_play_analysis'
ORDER BY table_name, column_name;

 * postgresql://dwhuser:***@dwhcluster.cohyd9mb0d9g.us-east-1.redshift.amazonaws.com:5439/dwh?sslmode=verify-full&sslrootcert=system
Done.
60 rows affected.


table_name,column_name,data_type,is_nullable,column_default
artists,artist_id,character varying,NO,
artists,latitude,double precision,YES,
artists,location,character varying,YES,
artists,longitude,double precision,YES,
artists,name,character varying,YES,
songplays,artist_id,character varying,NO,
songplays,level,character varying,NO,
songplays,location,character varying,YES,
songplays,session_id,integer,NO,
songplays,song_id,character varying,NO,


# 6. Data analysis
## 6.1. Top 5 most played artists

In [226]:
%%sql
SELECT a.name, COUNT(sp.artist_id) AS artist_count
FROM artists a
JOIN songplays sp ON a.artist_id = sp.artist_id
GROUP BY a.name
ORDER BY artist_count DESC
LIMIT 5;

 * postgresql://dwhuser:***@dwhcluster.cohyd9mb0d9g.us-east-1.redshift.amazonaws.com:5439/dwh?sslmode=verify-full&sslrootcert=system
5 rows affected.


name,artist_count
Dwight Yoakam,37
Kid Cudi / Kanye West / Common,10
Kid Cudi,10
Lonnie Gordon,9
Ron Carter,9


## 6.2. Top listeners by gender

In [227]:
%%sql
SELECT gender, COUNT(gender) AS gender_count
FROM staging_events
WHERE page = 'NextSong'
GROUP BY gender
ORDER BY gender_count DESC;

 * postgresql://dwhuser:***@dwhcluster.cohyd9mb0d9g.us-east-1.redshift.amazonaws.com:5439/dwh?sslmode=verify-full&sslrootcert=system
2 rows affected.


gender,gender_count
F,4887
M,1933


## 6.3. Listening peak hours

In [228]:
%%sql
SELECT hour, COUNT(hour) AS peak_hour_count
FROM time
GROUP BY hour
ORDER BY peak_hour_count DESC
LIMIT 5;

 * postgresql://dwhuser:***@dwhcluster.cohyd9mb0d9g.us-east-1.redshift.amazonaws.com:5439/dwh?sslmode=verify-full&sslrootcert=system
5 rows affected.


hour,peak_hour_count
16,541
18,497
17,493
15,477
14,430


## 6.3.1. Listening peak hours by gender

In [243]:
%%sql
SELECT se.gender, t.hour, COUNT(*) AS peak_hour_count
FROM songplays sp
JOIN time t ON sp.start_time = t.start_time
JOIN staging_events se ON sp.session_id = se.sessionId
WHERE se.page = 'NextSong'
GROUP BY se.gender, t.hour
ORDER BY peak_hour_count DESC
LIMIT 15;

 * postgresql://dwhuser:***@dwhcluster.cohyd9mb0d9g.us-east-1.redshift.amazonaws.com:5439/dwh?sslmode=verify-full&sslrootcert=system
15 rows affected.


gender,hour,peak_hour_count
F,17,1188
F,15,832
F,20,794
F,14,714
F,18,709
F,16,616
F,19,609
F,11,517
F,7,500
F,21,482


# 6. Clean up AWS resources
## 6.1. Delete cluster

In [None]:
redshift.delete_cluster( ClusterIdentifier=dwh_cluster_identifier, SkipFinalClusterSnapshot=True)

Run the following block several times until the cluster is deleted = `ClusterNotFoundFault`.

In [None]:
myClusterProps = redshift.describe_clusters(ClusterIdentifier=dwh_cluster_identifier)['Clusters'][0]
pretty_redshift_props(myClusterProps)

## 6.2. Delete IAM role

In [None]:
iam.detach_role_policy(RoleName=dwh_iam_role_name, PolicyArn="arn:aws:iam::aws:policy/AmazonS3ReadOnlyAccess")
iam.delete_role(RoleName=dwh_iam_role_name)