In [None]:
# Load the `sql` IPython extension so the %sql / %%sql magics used in the
# cells below are available (presumably provided by the ipython-sql package).
%load_ext sql

In [None]:
from time import time
import configparser
import matplotlib.pyplot as plt
import pandas as pd
import boto3

# STEP 1: Get the parameters of the created Redshift cluster

In [None]:
# Read the cluster and credential settings from the local dwh.cfg file.
config = configparser.ConfigParser()
# read_file (rather than read) is kept on purpose: a missing file raises
# instead of being silently skipped. The context manager closes the handle,
# which a bare open() call leaves to the garbage collector.
with open('dwh.cfg') as cfg_file:
    config.read_file(cfg_file)

# AWS access key pair.
KEY = config.get('AWS', 'KEY')
SECRET = config.get('AWS', 'SECRET')

# Redshift connection settings.
DWH_DB = config.get("CLUSTER", "DB_NAME")
DWH_ENDPOINT = config.get('CLUSTER', 'END_POINT')
DWH_DB_USER = config.get("CLUSTER", "DB_USER")
DWH_DB_PASSWORD = config.get("CLUSTER", "DB_PASSWORD")
DWH_PORT = config.get("CLUSTER", "DB_PORT")

# IAM role that the COPY commands pass to Redshift for S3 access.
DWH_ROLE_ARN = config.get("IAM_ROLE", "ARN")

# STEP 2: Connect to the Redshift Cluster

In [None]:
conn_string="postgresql://{}:{}@{}:{}/{}".format(DWH_DB_USER, DWH_DB_PASSWORD, DWH_ENDPOINT, DWH_PORT,DWH_DB)
print(conn_string)
%sql $conn_string

# STEP 3: Create Staging Tables

In [None]:
%%sql 
-- Landing table for the song metadata files loaded by the COPY step.
DROP TABLE IF EXISTS staging_songs;
CREATE TABLE "staging_songs" (
    "num_songs" INTEGER ,
    "artist_id" VARCHAR(60),
    "artist_latitude" VARCHAR(50),
    "artist_longitude" VARCHAR(50),
    "artist_location" VARCHAR(200),
    -- Same width as staging_events.artist so that truncation during COPY
    -- cannot make the two sides of the songplays join disagree.
    "artist_name" VARCHAR(200),
    "song_id" VARCHAR(60),
    -- Same width as staging_events.song, for the same reason.
    "title" VARCHAR(300),
    "duration" numeric(10,5),
    "year" INTEGER
);

-- Landing table for the application event log loaded by the COPY step.
DROP TABLE IF EXISTS staging_events;
CREATE TABLE "staging_events" (
    "artist" VARCHAR(200),
    "auth" VARCHAR(12),
    "firstName" VARCHAR(30),
    "gender" VARCHAR(1),
    "itemInSession" SMALLINT,
    "lastName" VARCHAR(30),
    "length" numeric(10,5),
    "level" VARCHAR(6),
    "location" VARCHAR(300), 
    -- Wide enough for any HTTP verb. A width of 3 would silently cut POST
    -- down to POS because the COPY runs with TRUNCATECOLUMNS.
    "method" VARCHAR(10),
    "page" VARCHAR(20),
    -- NOTE(review) the column name is misspelled (registration). It is kept
    -- as is because renaming could break readers outside this notebook.
    "registeration" DECIMAL,
    "sessionId" BIGINT,
    "song" VARCHAR(300),
    "status" VARCHAR(4),
    "ts" BIGINT,
    "userAgent" VARCHAR(300),
    "userId" INTEGER

);

# STEP 4: Write ETL to copy data from JSON files in S3 to Redshift

In [None]:
%%time
song_data = config.get('S3','SONG_DATA')
qry = """
    copy staging_songs from {}
    credentials 'aws_iam_role={}'
    format as json 'auto' compupdate off STATUPDATE ON TRUNCATECOLUMNS region 'us-west-2';
""".format(song_data, DWH_ROLE_ARN)

%sql $qry

In [None]:
%%time
log_data = config.get('S3','LOG_DATA')
log_json_path = config.get('S3', 'LOG_JSONPATH')
qry = """
    copy staging_events from {}
    credentials 'aws_iam_role={}'
    format as json {} compupdate off TRUNCATECOLUMNS region 'us-west-2';
""".format(log_data, DWH_ROLE_ARN, log_json_path)

%sql $qry

# STEP 5: Create Analytics Tables

In [None]:
%%sql 
-- Start from a clean slate so the cell can be re-run.
DROP TABLE IF EXISTS songplays;
DROP TABLE IF EXISTS users;
DROP TABLE IF EXISTS songs;
DROP TABLE IF EXISTS artists;
DROP TABLE IF EXISTS times;

-- Song dimension, distributed and sorted on song_id.
CREATE TABLE songs 
(
    song_id VARCHAR PRIMARY KEY NOT NULL sortkey distkey,
    title VARCHAR,
    artist_id VARCHAR,
    year INTEGER,
    duration numeric(10,5)
);

-- Artist dimension, sorted on artist_id.
CREATE TABLE artists 
(
    artist_id VARCHAR PRIMARY KEY NOT NULL sortkey,
    name VARCHAR,
    location VARCHAR, 
    lattitude VARCHAR,
    longitude VARCHAR
);

-- Fact table. songplay_id is generated by the IDENTITY column.
-- Distributed on song_id (the same key as songs) and sorted on artist_id.
CREATE TABLE songplays 
(
    songplay_id INTEGER IDENTITY(0,1) PRIMARY KEY,
    start_time TIMESTAMP NOT NULL,
    user_id INTEGER NOT NULL,
    level VARCHAR,
    song_id VARCHAR distkey, 
    artist_id VARCHAR sortkey,
    -- BIGINT to match staging_events.sessionId, which feeds this column.
    -- An INTEGER here would make the insert fail on any larger value.
    session_id BIGINT,
    location VARCHAR,
    user_agent VARCHAR
);

-- User dimension, sorted on user_id.
CREATE TABLE users 
(
    user_id INTEGER PRIMARY KEY NOT NULL sortkey,
    first_name VARCHAR,
    last_name VARCHAR,
    gender VARCHAR,
    level VARCHAR
);

-- Time dimension, one row per timestamp with its calendar parts.
CREATE TABLE times 
(
    starttime TIMESTAMP PRIMARY KEY NOT NULL sortkey,
    hour INTEGER,
    day INTEGER,
    week INTEGER,
    month INTEGER,
    year INTEGER,
    weekday INTEGER
);

# STEP 6: Insert from staging tables into analytics tables

In [None]:
%%time

# Populate the times table from the distinct timestamps of every row in
# staging_events (no page filter is applied here).
# The inner query turns the BIGINT ts column into a TIMESTAMP by adding
# ts/1000 seconds to the epoch (presumably ts is epoch milliseconds, TODO
# confirm); the outer query splits that timestamp into its calendar parts.
# NOTE(review): the INSERT has no column list, so the SELECT order must stay
# aligned with the column order of the times table definition.
qry = """
    INSERT INTO times
    SELECT 
        distinct starttime,
        EXTRACT(hour from starttime) as hour,
        EXTRACT(day from starttime) as day,
        EXTRACT(week from starttime) as week,
        EXTRACT(month from starttime) as month,
        EXTRACT(year from starttime) as year,
        EXTRACT(weekday from starttime) as weekday 
        FROM   
            (SELECT TIMESTAMP 'epoch' + e.ts/1000 * interval '1 second' AS starttime
            FROM staging_events e)
     """

%sql $qry

In [None]:
%%time

qry = """
    INSERT INTO users
    SELECT 
        DISTINCT userId,
        firstName,
        lastName,
        gender,
        level FROM staging_events where page != 'NextSong' 
        AND userID is not NULL"""

%sql $qry

In [None]:
%%time

qry = """
    INSERT INTO artists
    SELECT 
        DISTINCT artist_id,
        artist_name,
        artist_location,
        artist_latitude,
        artist_longitude FROM staging_songs where artist_id is not null """

%sql $qry

In [None]:
%%time

# Populate the songs table with the distinct
# (song_id, title, artist_id, year, duration) rows of staging_songs,
# skipping rows that have no song_id.
# NOTE(review): the INSERT has no column list, so the SELECT order must stay
# aligned with the column order of the songs table definition.
qry = """
    INSERT INTO songs
    SELECT 
        DISTINCT song_id,
        title,
        artist_id,
        year,
        duration FROM staging_songs where song_id is not null"""

%sql $qry

In [None]:
%%time

# Populate the songplays fact table from 'NextSong' events.
# Each event is matched to a staged song on artist name, title and duration;
# because this is an inner join, events with no matching song are not inserted.
# songplay_id is left out of the column list so the table's IDENTITY column
# supplies it. start_time uses the same epoch + ts/1000 seconds conversion as
# the times insert.
qry = """
    INSERT INTO songplays (start_time, user_id, level, song_id, artist_id, session_id, location, user_agent)
    SELECT 
        TIMESTAMP 'epoch' + e.ts/1000 * interval '1 second' AS starttime,
        userId,
        level,
        song_id,
        artist_id,
        sessionId,
        location,
        userAgent
        FROM staging_songs s JOIN staging_events e
        ON s.artist_name = e.artist 
        AND s.title = e.song
        AND e.length = s.duration
        WHERE e.page ='NextSong' """

%sql $qry