# ETL

A notebook with ETL code for testing ETL pipeline and checking data.

In [3]:
%load_ext sql

import pandas as pd
import boto3
import json
import amazon_redshift_cluster as aws

The sql extension is already loaded. To reload it, use:
  %reload_ext sql


In [15]:
# for reload only
import amazon_redshift_cluster as aws
import importlib
importlib.reload(aws)

<module 'amazon_redshift_cluster' from '/home/workspace/amazon_redshift_cluster.py'>

## Create cluster

In [17]:
conn_string = aws.Cluster.getConnString()
%sql $conn_string

'Connected: dwhuser@dwh'

In [None]:
cluster = aws.Cluster('<key>', '<secret key>')
cluster.show_config()
cluster.create()

In [4]:
cluster.get_status()

'available'

## Open TCP port and connect to DWH

In [6]:
# Open TCP port and connect to Redshift database
cluster.open_tcp()
conn_string = cluster.get_conn_string()
%sql $conn_string

ec2.SecurityGroup(id='sg-9afb0fb8')
An error occurred (InvalidPermission.Duplicate) when calling the AuthorizeSecurityGroupIngress operation: the specified rule "peer: 0.0.0.0/0, TCP, from port: 5439, to port: 5439, ALLOW" already exists


'Connected: dwhuser@dwh'

In [7]:
%%sql 

select table_catalog, table_schema, table_name, table_type
from information_schema.tables 
where table_schema = 'public';

 * postgresql://dwhuser:***@dwhcluster.czw6kev3ol8q.us-west-2.redshift.amazonaws.com:5439/dwh
7 rows affected.


table_catalog,table_schema,table_name,table_type
dwh,public,staging_events,BASE TABLE
dwh,public,staging_songs,BASE TABLE
dwh,public,time,BASE TABLE
dwh,public,users,BASE TABLE
dwh,public,songs,BASE TABLE
dwh,public,songplays,BASE TABLE
dwh,public,artists,BASE TABLE


## ETL: Copy into staging_songs 

In [8]:
%%sql  
    
    DROP TABLE IF EXISTS staging_songs;
    
    CREATE TABLE IF NOT EXISTS staging_songs (
        num_songs int NOT NULL,
        artist_id varchar(18) NOT NULL,
        artist_latitude varchar(30) NULL,
        artist_longitude varchar(30) NULL,
        artist_location varchar(200) NULL,
        artist_name varchar(200) NOT NULL,
        song_id varchar(18) NOT NULL,
        title varchar(200) NOT NULL,
        duration numeric NOT NULL,
        year smallint NOT NULL
    );

 * postgresql://dwhuser:***@dwhcluster.czw6kev3ol8q.us-west-2.redshift.amazonaws.com:5439/dwh
Done.
Done.


[]

In [20]:
%%time
%%sql

    COPY staging_songs 
    FROM 's3://udacity-dend/song_data'
    CREDENTIALS '<credentials>'
    REGION 'us-west-2'
    JSON 'auto';

 * postgresql://dwhuser:***@dwhcluster.czw6kev3ol8q.us-west-2.redshift.amazonaws.com:5439/dwh
Done.
CPU times: user 4.16 ms, sys: 11 µs, total: 4.17 ms
Wall time: 2min 58s


[]

In [10]:
%sql select count(*) from staging_songs

 * postgresql://dwhuser:***@dwhcluster.czw6kev3ol8q.us-west-2.redshift.amazonaws.com:5439/dwh
1 rows affected.


count
14896


In [11]:
%sql select * from stl_load_errors order by starttime desc limit 1;

 * postgresql://dwhuser:***@dwhcluster.czw6kev3ol8q.us-west-2.redshift.amazonaws.com:5439/dwh
0 rows affected.


userid,slice,tbl,starttime,session,query,filename,line_number,colname,type,col_length,position,raw_line,raw_field_value,err_code,err_reason


#### Detect nullable columns

 - artist_latitude
 - artist_longitude
 - artist_location

In [109]:
%%sql

select count(*) from staging_songs
where year is null

 * postgresql://dwhuser:***@dwhcluster.czw6kev3ol8q.us-west-2.redshift.amazonaws.com:5439/dwh
1 rows affected.


count
0


#### Detect varchar length

In [113]:
%%sql

select max(len(artist_id)) as max_artist_id
    , max(len(artist_latitude)) as max_artist_latitude
    , max(len(artist_longitude)) as max_artist_longitude
    , max(len(artist_location)) as max_artist_location
    , max(len(artist_name)) as max_artist_name
    , max(len(song_id)) as max_song_id
    , max(len(title)) as max_title
from staging_songs

 * postgresql://dwhuser:***@dwhcluster.czw6kev3ol8q.us-west-2.redshift.amazonaws.com:5439/dwh
1 rows affected.


max_artist_id,max_artist_latitude,max_artist_longitude,max_artist_location,max_artist_name,max_song_id,max_title
18,19,22,176,177,18,173


## ETL: Copy into staging_events

In [13]:
%%sql  
    
    DROP TABLE IF EXISTS staging_events;
    
    CREATE TABLE IF NOT EXISTS staging_events (
        artist varchar(200) NULL,
        auth varchar(10) NOT NULL,
        firstName varchar(20) NULL,
        gender char(1) NULL,
        itemInSession int NOT NULL,
        lastName varchar(50) NULL,
        length numeric NULL,
        level char(4) NOT NULL,
        location varchar(50) NULL,
        method char(3) NOT NULL,
        page varchar(30) NOT NULL,
        registration bigint NULL,
        sessionId int NOT NULL,
        song varchar(200) NULL,
        status char(3) NOT NULL,
        ts bigint NOT NULL,
        userAgent varchar(200) NULL,
        userId int NULL
    );

 * postgresql://dwhuser:***@dwhcluster.czw6kev3ol8q.us-west-2.redshift.amazonaws.com:5439/dwh
Done.
Done.


[]

In [16]:
%%time
%%sql 
    
    COPY staging_events
    FROM 's3://udacity-dend/log_data'
    CREDENTIALS '<credentials>'
    REGION 'us-west-2'
    FORMAT AS JSON 's3://udacity-dend/log_json_path.json';

 * postgresql://dwhuser:***@dwhcluster.czw6kev3ol8q.us-west-2.redshift.amazonaws.com:5439/dwh
Done.
CPU times: user 4.16 ms, sys: 0 ns, total: 4.16 ms
Wall time: 1.65 s


[]

In [23]:
%sql select count(*) from staging_events

 * postgresql://dwhuser:***@dwhcluster.czw6kev3ol8q.us-west-2.redshift.amazonaws.com:5439/dwh
1 rows affected.


count
8056


In [18]:
%sql select * from staging_events order by ts limit 3;

 * postgresql://dwhuser:***@dwhcluster.czw6kev3ol8q.us-west-2.redshift.amazonaws.com:5439/dwh
3 rows affected.


artist,auth,firstname,gender,iteminsession,lastname,length,level,location,method,page,registration,sessionid,song,status,ts,useragent,userid
,Logged In,Walter,M,0,Frye,,free,"San Francisco-Oakland-Hayward, CA",GET,Home,1540919166796,38,,200,1541105830796,"""Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_4) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/36.0.1985.143 Safari/537.36""",39
,Logged In,Kaylee,F,0,Summers,,free,"Phoenix-Mesa-Scottsdale, AZ",GET,Home,1540344794796,139,,200,1541106106796,"""Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/35.0.1916.153 Safari/537.36""",8
Des'ree,Logged In,Kaylee,F,1,Summers,246.0,free,"Phoenix-Mesa-Scottsdale, AZ",PUT,NextSong,1540344794796,139,You Gotta Be,200,1541106106796,"""Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/35.0.1916.153 Safari/537.36""",8


#### Detect nullable columns

 - artist
 - firstName
 - lastName
 - gender
 - userId
 - length
 - location
 - registration
 - song
 - userAgent

#### Detect varchar length

In [86]:
%sql select distinct gender from staging_events;

 * postgresql://dwhuser:***@dwhcluster.czw6kev3ol8q.us-west-2.redshift.amazonaws.com:5439/dwh
3 rows affected.


gender
F
M
""


In [87]:
%sql select distinct level from staging_events;

 * postgresql://dwhuser:***@dwhcluster.czw6kev3ol8q.us-west-2.redshift.amazonaws.com:5439/dwh
2 rows affected.


level
free
paid


In [24]:
%sql select distinct status from staging_events;

 * postgresql://dwhuser:***@dwhcluster.czw6kev3ol8q.us-west-2.redshift.amazonaws.com:5439/dwh
3 rows affected.


status
200
307
404


In [89]:
%sql select distinct page from staging_events;

 * postgresql://dwhuser:***@dwhcluster.czw6kev3ol8q.us-west-2.redshift.amazonaws.com:5439/dwh
13 rows affected.


page
NextSong
Home
Logout
Login
Settings
Error
Downgrade
Help
About
Upgrade


In [90]:
%%sql

select max(len(artist)) as max_artist
    , max(len(auth)) as max_auth
    , max(len(firstName)) as max_firstName
    , max(len(lastName)) as max_lastName
    , max(len(location)) as max_location
    , max(len(method)) as max_method
    , max(len(page)) as max_page
    , max(len(song)) as max_song
    , max(len(userAgent)) as max_userAgent
from staging_events

 * postgresql://dwhuser:***@dwhcluster.czw6kev3ol8q.us-west-2.redshift.amazonaws.com:5439/dwh
1 rows affected.


max_artist,max_auth,max_firstname,max_lastname,max_location,max_method,max_page,max_song,max_useragent
89,10,10,9,46,3,16,151,139


## Start Schema

### Songplays

In [10]:
%%sql

    DROP TABLE IF EXISTS songplays;
    DROP TABLE IF EXISTS users;
    DROP TABLE IF EXISTS songs;
    DROP TABLE IF EXISTS artists;
    DROP TABLE IF EXISTS time;
      
    CREATE TABLE IF NOT EXISTS users (
        user_id int NOT NULL PRIMARY KEY sortkey,
        first_name varchar(20) NOT NULL,
        last_name varchar(50) NOT NULL,
        gender char(1) NOT NULL,
        level char(4) NOT NULL
    )
    diststyle all;
    
    CREATE TABLE IF NOT EXISTS artists (
        artist_id varchar(20) NOT NULL PRIMARY KEY sortkey,
        name varchar(200) NOT NULL,
        location varchar(200) NULL,
        latitude varchar(30) NULL,
        longitude varchar(30) NULL
    )
    diststyle all;
    
    CREATE TABLE IF NOT EXISTS songs (
        song_id varchar(18) NOT NULL PRIMARY KEY sortkey distkey,
        title varchar(200) NOT NULL,
        artist_id varchar(18) NOT NULL REFERENCES artists(artist_id),
        year int NOT NULL,
        duration numeric NOT NULL
    );
    
    CREATE TABLE IF NOT EXISTS time (
        start_time timestamp NOT NULL PRIMARY KEY sortkey,
        hour int NOT NULL,
        day int NOT NULL,
        week int NOT NULL,
        month int NOT NULL,
        year int NOT NULL,
        weekday int NOT NULL
    )
    diststyle all;
       
    CREATE TABLE IF NOT EXISTS songplays (
        songplay_id int IDENTITY(0,1) PRIMARY KEY,
        start_time timestamp NOT NULL UNIQUE sortkey,
        user_id int NOT NULL REFERENCES users(user_id),
        level char(4) NOT NULL,
        song_id varchar(18) NOT NULL REFERENCES songs(song_id) distkey,
        artist_id varchar(18) NOT NULL REFERENCES artists(artist_id),
        session_id int NOT NULL,
        location varchar(50) NOT NULL,
        user_agent varchar(200) NOT NULL
    );    

 * postgresql://dwhuser:***@dwhcluster.czw6kev3ol8q.us-west-2.redshift.amazonaws.com:5439/dwh
Done.
Done.
Done.
Done.
Done.
Done.
Done.
Done.
Done.
Done.


[]

###  songplays

In [61]:
%%sql    
/*
    Songs from events table that do not match with the staging_songs table.
    Suppose the mismatch comes from the INCOMPLETE staging data.
*/
    SELECT COUNT(*)
    FROM staging_events AS A
    LEFT OUTER JOIN staging_songs AS B
        ON A.song = B.title
        AND A.artist = B.artist_name
    WHERE A.page = 'NextSong'
        AND B.song_id IS NULL

 * postgresql://dwhuser:***@dwhcluster.czw6kev3ol8q.us-west-2.redshift.amazonaws.com:5439/dwh
1 rows affected.


count
6487


In [11]:
%%time
%%sql

    INSERT INTO songplays
    (start_time, user_id, level, song_id, artist_id, session_id, location, user_agent)
    SELECT 
        TIMESTAMP 'epoch' + (A.ts/1000)::BIGINT * INTERVAL '1 second'
        , A.userId::INTEGER
        , A.level
        , B.song_id
        , B.artist_id
        , A.sessionId
        , A.location
        , A.userAgent
    FROM staging_events AS A
    INNER JOIN staging_songs AS B
        ON A.song = B.title
        AND A.artist = B.artist_name;


 * postgresql://dwhuser:***@dwhcluster.czw6kev3ol8q.us-west-2.redshift.amazonaws.com:5439/dwh
333 rows affected.
CPU times: user 8.33 ms, sys: 29 µs, total: 8.36 ms
Wall time: 743 ms


[]

In [13]:
%%sql

select * 
from songplays 
order by start_time
limit 3

 * postgresql://dwhuser:***@dwhcluster.czw6kev3ol8q.us-west-2.redshift.amazonaws.com:5439/dwh
3 rows affected.


songplay_id,start_time,user_id,level,song_id,artist_id,session_id,location,user_agent
550,2018-11-01 21:11:13,8,free,SOEIQUY12AF72A086A,ARHUC691187B9AD27F,139,"Phoenix-Mesa-Scottsdale, AZ","""Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/35.0.1916.153 Safari/537.36"""
422,2018-11-02 16:35:00,50,free,SOBONKR12A58A7A7E0,AR5E44Z1187B9A1D74,156,"New Haven-Milford, CT","""Mozilla/5.0 (Windows NT 6.3; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/36.0.1985.143 Safari/537.36"""
606,2018-11-02 17:31:45,10,free,SOHTKMO12AB01843B0,AR5EYTL1187B98EDA0,182,"Washington-Arlington-Alexandria, DC-VA-MD-WV","""Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_4) AppleWebKit/537.77.4 (KHTML, like Gecko) Version/7.0.5 Safari/537.77.4"""


### users

In [64]:
%%sql

select count(distinct userId) from staging_events

 * postgresql://dwhuser:***@dwhcluster.czw6kev3ol8q.us-west-2.redshift.amazonaws.com:5439/dwh
1 rows affected.


count
97


In [67]:
%%sql

/*
    The both keys (userId) and (userId, firstName, lastName, gender)
    have the same aggregation power.
*/

select count(*)
from (
    select distinct userId, firstName, lastName, gender  
    from staging_events
    where userId is not null
) t

 * postgresql://dwhuser:***@dwhcluster.czw6kev3ol8q.us-west-2.redshift.amazonaws.com:5439/dwh
1 rows affected.


count
97


In [97]:
%%sql

/*
    Let's check NULL users.
*/

    SELECT COUNT(*)
    FROM staging_events
    WHERE userId IS NULL;
    

 * postgresql://dwhuser:***@dwhcluster.czw6kev3ol8q.us-west-2.redshift.amazonaws.com:5439/dwh
1 rows affected.


count
286


In [14]:
%%time
%%sql

/* 
    First we'll insert user data with default (=free) level
    since the level attribute is a user-time attribute.
*/

    INSERT INTO users
    (user_id, first_name, last_name, gender, level)
    SELECT DISTINCT A.userId::INTEGER, A.firstName, A.lastName, A.gender, A.level
    FROM staging_events AS A
    INNER JOIN (
        SELECT userId, MAX(ts) AS tsLast
        FROM staging_events
        GROUP BY userId
    ) AS B
        ON A.userId = B.userId
        AND A.ts = B.tsLast
    WHERE A.userId IS NOT NULL;
    

 * postgresql://dwhuser:***@dwhcluster.czw6kev3ol8q.us-west-2.redshift.amazonaws.com:5439/dwh
97 rows affected.
CPU times: user 6.86 ms, sys: 0 ns, total: 6.86 ms
Wall time: 397 ms


[]

In [76]:
%%sql

/* 
    Check the differences between first and last user's level.
*/

    SELECT DISTINCT A.userId::INTEGER, A.firstName, A.lastName, A.gender, A.level
    INTO #TEMP
    FROM staging_events AS A
    INNER JOIN (
        SELECT userId, MIN(ts) AS tsFirst
        FROM staging_events
        GROUP BY userId
    ) AS B
        ON A.userId = B.userId
        AND A.ts = B.tsFirst
    WHERE A.userId IS NOT NULL;
    

 * postgresql://dwhuser:***@dwhcluster.czw6kev3ol8q.us-west-2.redshift.amazonaws.com:5439/dwh
Done.


[]

In [78]:
%%sql

/* 
    A list of users with different firs-last levels.
*/

SELECT A.user_id, B.level AS FirstLevel, A.level AS LastLevel
FROM users AS A
INNER JOIN #TEMP AS B ON A.user_id = B.userId
WHERE B.level <> A.level;

 * postgresql://dwhuser:***@dwhcluster.czw6kev3ol8q.us-west-2.redshift.amazonaws.com:5439/dwh
7 rows affected.


user_id,firstlevel,lastlevel
80,free,paid
88,free,paid
16,free,paid
29,free,paid
85,free,paid
36,free,paid
49,free,paid


### artists

In [99]:
%%time
%%sql

/*
    We opt for inserting ALL staging artists.
    The other option would be to insert only artists from MATCHING songs.
*/

    INSERT INTO artists
    (artist_id, name, location, latitude, longitude)
    SELECT artist_id, MIN(artist_name), MIN(artist_location), MIN(artist_latitude), MIN(artist_longitude)
    FROM staging_songs
    GROUP BY artist_id;
    

 * postgresql://dwhuser:***@dwhcluster.czw6kev3ol8q.us-west-2.redshift.amazonaws.com:5439/dwh
14896 rows affected.
CPU times: user 4.69 ms, sys: 0 ns, total: 4.69 ms
Wall time: 4.47 s


[]

### songs

In [48]:
%%time
%%sql

/*
    We opt for inserting ALL staging songs.
    The other option would be to insert only MATCHING songs.
*/

    INSERT INTO songs
    (song_id, title, artist_id, year, duration)
    SELECT song_id, MIN(title), MIN(artist_id), MIN(year), MIN(duration)
    FROM staging_songs
    GROUP BY song_id;
    

 * postgresql://dwhuser:***@dwhcluster.czw6kev3ol8q.us-west-2.redshift.amazonaws.com:5439/dwh
14896 rows affected.
CPU times: user 4.63 ms, sys: 100 µs, total: 4.73 ms
Wall time: 592 ms


[]

### time

In [101]:
%%time
%%sql

    INSERT INTO time
    (start_time, hour, day, week, month, year, weekday)
    SELECT A.start_time
        , EXTRACT(hour FROM A.start_time)
        , EXTRACT(day FROM A.start_time)
        , EXTRACT(week FROM A.start_time)
        , EXTRACT(month FROM A.start_time)
        , EXTRACT(year FROM A.start_time)
        , EXTRACT(weekday FROM A.start_time)        
    FROM (
        SELECT DISTINCT TIMESTAMP 'epoch' + (ts/1000)::BIGINT 
            * INTERVAL '1 second' AS start_time
        FROM staging_events
    ) AS A;
    

 * postgresql://dwhuser:***@dwhcluster.czw6kev3ol8q.us-west-2.redshift.amazonaws.com:5439/dwh
8023 rows affected.
CPU times: user 7.19 ms, sys: 666 µs, total: 7.85 ms
Wall time: 1.05 s


[]

#### Song Uniqueness

Song uniqueness is obtained by the key (**song**, **artist**).

In [18]:
%%sql    
    
    select a.title, a.artist_name
    from staging_songs as a
    where exists (
        select 1
        from staging_events as aa 
        where aa.song = a.title
        and aa.artist = a.artist_name
    )
    group by a.title, a.artist_name
    having count(*) > 1

 * postgresql://dwhuser:***@dwhcluster.czw6kev3ol8q.us-west-2.redshift.amazonaws.com:5439/dwh
0 rows affected.


title,artist_name


#### Check row count

We'll check if the row count of a fact table equals the row count of a denormalized (inner joined) data.

In [60]:
%%sql

/* Row count of a fact table */

SELECT COUNT(*) AS row_count
FROM songplays;

 * postgresql://dwhuser:***@dwhcluster.czw6kev3ol8q.us-west-2.redshift.amazonaws.com:5439/dwh
1 rows affected.


row_count
333


In [61]:
%%sql

/* Row count of a denormalized data */

SELECT COUNT(*) AS row_count
FROM songplays AS A
INNER JOIN users AS B ON A.user_id = B.user_id
INNER JOIN songs AS C ON A.song_id = C.song_id
INNER JOIN artists AS D ON A.artist_id = D.artist_id
INNER JOIN time AS E ON A.start_time = E.start_time;

 * postgresql://dwhuser:***@dwhcluster.czw6kev3ol8q.us-west-2.redshift.amazonaws.com:5439/dwh
1 rows affected.


row_count
333


## Delete cluster

In [62]:
cluster.delete()

Amazon Redshift cluster is being deleted. Please wait...


In [63]:
cluster.get_status()

ClusterNotFoundFault: An error occurred (ClusterNotFound) when calling the DescribeClusters operation: Cluster dwhcluster not found.

In [64]:
cluster.delete_iam_role()

IAM role deleted.
