In [1]:
import configparser
import psycopg2
from sql_queries import create_table_queries, drop_table_queries
%load_ext sql

In [2]:
config = configparser.ConfigParser()
config.read('dwh.cfg')

['dwh.cfg']

In [3]:
conn = psycopg2.connect("host={} dbname={} user={} password={} port={}".format(*config['CLUSTER'].values()))
cur = conn.cursor()

In [4]:
DB_ENDPOINT = config.get("CLUSTER","HOST")
DB_NAME= config.get("CLUSTER","DB_NAME")
DB_USER= config.get("CLUSTER","DB_USER")
DB_PASSWORD= config.get("CLUSTER","DB_PASSWORD")
DB_PORT = config.get("CLUSTER","DB_PORT")


IAM_ROLE = config['IAM_ROLE']['ARN']
LOG_DATA = config['S3']['LOG_DATA']
LOG_JSONPATH = config['S3']['LOG_JSONPATH']
SONG_DATA = config['S3']['SONG_DATA']


In [5]:
conn_string = "postgresql://{}:{}@{}:{}/{}" \
                        .format(DB_USER, DB_PASSWORD, DB_ENDPOINT, DB_PORT, DB_NAME)

In [6]:
%sql $conn_string

'Connected: dwhuser@dwh'

In [7]:
%%sql

DROP TABLE IF EXISTS staging_events;
DROP TABLE IF EXISTS staging_songs;


CREATE TABLE IF NOT EXISTS staging_events (
    artist character varying(150),
    auth character varying(20) NOT NULL,
    firstName character varying(20),
    gender character varying (1),
    itemInSession double precision NOT NULL,
    lastName character varying(20),
    length double precision,
    level character varying(20) NOT NULL,
    location character varying(50),
    method character varying(20) NOT NULL,
    page character varying(20) NOT NULL,
    registration double precision,
    sessionId double precision NOT NULL,
    song character varying(200),
    status double precision NOT NULL,
    ts double precision NOT NULL,
    userAgent character varying(200),
    userId character varying(20) NOT NULL,
    PRIMARY KEY(userId, sessionId, itemInSession)
    );

CREATE TABLE IF NOT EXISTS staging_songs (
    num_songs bigint IDENTITY(0,1) PRIMARY KEY,
    artist_id character varying(50) NOT NULL,
    artist_latitude character varying(50),
    artist_longitude character varying(50),
    artist_location character varying(200),
    artist_name character varying(200) NOT NULL,
    song_id character varying(50) NOT NULL,
    title character varying(200) NOT NULL,
    duration double precision NOT NULL,
    year numeric(4,0) NOT NULL
    )

 * postgresql://dwhuser:***@dwhcluster.cbkbyouhw7x4.us-west-2.redshift.amazonaws.com:5439/dwh
Done.
Done.
Done.
Done.


[]

In [8]:
staging_events_copy = ("""COPY staging_events FROM {}
credentials 'aws_iam_role={}'
region 'us-west-2'
format as json {};
""").format(LOG_DATA,IAM_ROLE,LOG_JSONPATH)

staging_songs_copy = ("""COPY staging_songs FROM {}
credentials 'aws_iam_role={}'
region 'us-west-2'
json 'auto'
""").format(SONG_DATA, IAM_ROLE)

%sql $staging_events_copy
%sql $staging_songs_copy

 * postgresql://dwhuser:***@dwhcluster.cbkbyouhw7x4.us-west-2.redshift.amazonaws.com:5439/dwh
Done.
 * postgresql://dwhuser:***@dwhcluster.cbkbyouhw7x4.us-west-2.redshift.amazonaws.com:5439/dwh
Done.


[]

In [9]:
%%sql

DROP TABLE IF EXISTS songplays;

CREATE TABLE IF NOT EXISTS songplays (
    songplay_id bigint IDENTITY(0,1) PRIMARY KEY,
    start_time timestamp NOT NULL,
    user_id character varying(20) NOT NULL,
    level character varying(20) NOT NULL,
    song_id character varying(50) NOT NULL,
    artist_id character varying(50) NOT NULL,
    session_id double precision NOT NULL,
    location character varying(50),
    user_agent character varying(200)
);


 * postgresql://dwhuser:***@dwhcluster.cbkbyouhw7x4.us-west-2.redshift.amazonaws.com:5439/dwh
Done.
Done.


[]

In [112]:
%%sql

select * from staging_events limit 1

 * postgresql://dwhuser:***@dwhcluster.cbkbyouhw7x4.us-west-2.redshift.amazonaws.com:5439/dwh
1 rows affected.


artist,auth,firstname,gender,iteminsession,lastname,length,level,location,method,page,registration,sessionid,song,status,ts,useragent,uderid
,Logged Out,,,0.0,,,free,,PUT,Login,,52.0,,307.0,1541207073796.0,,


In [111]:
songplay_table_insert = ("""
    INSERT INTO songplays (start_time, user_id, level, song_id, artist_id, session_id, location, user_agent)
    SELECT DISTINCT 
        timestamp 'epoch' + se.ts/1000 * interval '1 second' as start_time, 
        se.userId as user_id, 
        se.level, 
        ss.song_id, 
        ss.artist_id, 
        se.sessionid as session_id, 
        se.location,
        se.userAgent as user_agent
    FROM staging_events se
    JOIN staging_songs ss on se.artist = ss.artist_name and se.song = ss.title and se.length = ss.duration
""")

%sql $songplay_table_insert

 * postgresql://dwhuser:***@dwhcluster.cbkbyouhw7x4.us-west-2.redshift.amazonaws.com:5439/dwh
(psycopg2.ProgrammingError) column se.userid does not exist
 [SQL: "INSERT INTO songplays (start_time, user_id, level, song_id, artist_id, session_id, location, user_agent)\n    SELECT DISTINCT \n        timestamp 'epoch' + se.ts/1000 * interval '1 second' as start_time, \n        se.userId as user_id, \n        se.level, \n        ss.song_id, \n        ss.artist_id, \n        se.sessionid as session_id, \n        se.location,\n        se.userAgent as user_agent\n    FROM staging_events se\n    JOIN staging_songs ss on se.artist = ss.artist_name and se.song = ss.title and se.length = ss.duration"]


In [34]:
%%sql
select * from songplays limit 1

 * postgresql://dwhuser:***@dwhcluster.cbkbyouhw7x4.us-west-2.redshift.amazonaws.com:5439/dwh
1 rows affected.


songplay_id,start_time,user_id,level,song_id,artist_id,session_id,location,user_agent
4,2018-11-04 09:19:03.796000,44,paid,SOCSXKQ12A6D4F95A0,ARRE7IQ1187FB4CF13,196.0,"Waterloo-Cedar Falls, IA",Mozilla/5.0 (Macintosh; Intel Mac OS X 10.9; rv:31.0) Gecko/20100101 Firefox/31.0


In [115]:
time_table_create = ("""
    CREATE TABLE IF NOT EXISTS time (
        start_time timestamp PRIMARY KEY,
        hour numeric(2,0) NOT NULL,
        day numeric(2,0) NOT NULL,
        week numeric(2,0) NOT NULL,
        month numeric(2,0) NOT NULL,
        year numeric(4,0) NOT NULL,
        weekday numeric(1,0) NOT NULL
        );
""")
%sql $time_table_create

 * postgresql://dwhuser:***@dwhcluster.cbkbyouhw7x4.us-west-2.redshift.amazonaws.com:5439/dwh
Done.


[]

In [116]:
time_table_insert = ("""
    INSERT INTO time (start_time, hour, day, week, month, year, weekday)
    SELECT start_time,
    EXTRACT(hour FROM start_time) as hour,
    EXTRACT(day FROM start_time) as day,
    EXTRACT(week FROM start_time) as week,
    EXTRACT(month FROM start_time) as month,
    EXTRACT(year FROM start_time) as year,
    EXTRACT(weekday FROM start_time) as weekday
    FROM songplays
""")
%sql $time_table_insert

 * postgresql://dwhuser:***@dwhcluster.cbkbyouhw7x4.us-west-2.redshift.amazonaws.com:5439/dwh
0 rows affected.


[]

In [10]:
%%sql
DROP TABLE IF EXISTS users;
DROP TABLE IF EXISTS songs;
DROP TABLE IF EXISTS artists;


CREATE TABLE IF NOT EXISTS users (
    user_id character varying(20) NOT NULL,
    first_name character varying(20),
    last_name character varying(20),
    gender character varying (1),
    level character varying(20) NOT NULL
);

CREATE TABLE IF NOT EXISTS songs (
    song_id character varying(50) PRIMARY KEY,
    title character varying(200) NOT NULL,
    artist_id character varying(50) NOT NULL,
    year numeric(4,0) NOT NULL,
    duration double precision NOT NULL
);

CREATE TABLE IF NOT EXISTS artists (
    artist_id character varying(50) PRIMARY KEY,
    name character varying(200) NOT NULL,
    location character varying(200),
    latitude character varying(50),
    longitude character varying(50)
);


 * postgresql://dwhuser:***@dwhcluster.cbkbyouhw7x4.us-west-2.redshift.amazonaws.com:5439/dwh
Done.
Done.
Done.
Done.
Done.
Done.


[]

In [91]:
%%sql
select len(artist_latitude), len(artist_longitude) from staging_songs
where len(artist_latitude) >25 or len(artist_longitude)>25

 * postgresql://dwhuser:***@dwhcluster.cbkbyouhw7x4.us-west-2.redshift.amazonaws.com:5439/dwh
0 rows affected.


len,len_1


In [95]:
%%sql

INSERT INTO artists (artist_id, name, location, latitude, longitude)
SELECT DISTINCT
    artist_id,
    artist_name as name,
    artist_location as location,
    artist_latitude as latitude,
    artist_longitude as longitude
FROM staging_songs

 * postgresql://dwhuser:***@dwhcluster.cbkbyouhw7x4.us-west-2.redshift.amazonaws.com:5439/dwh
10025 rows affected.


[]

In [60]:
%%sql

INSERT INTO songs (song_id, title, artist_id, year, duration)
SELECT DISTINCT 
    song_id,
    title,
    artist_id,
    year,
    duration
FROM staging_songs

 * postgresql://dwhuser:***@dwhcluster.cbkbyouhw7x4.us-west-2.redshift.amazonaws.com:5439/dwh
14896 rows affected.


[]

In [14]:
%%sql

INSERT INTO users (user_id, first_name, last_name, gender, level)
SELECT 
    userId as user_id,
    firstName as first_name,
    lastName as last_name,
    gender,
    level
FROM staging_events
WHERE page = 'NextSong'
ON CONFLICT (user_id)
DO UPDATE
    SET level = EXCLUDED.level

 * postgresql://dwhuser:***@dwhcluster.cbkbyouhw7x4.us-west-2.redshift.amazonaws.com:5439/dwh
(psycopg2.ProgrammingError) syntax error at or near "ON"
LINE 10: ON CONFLICT (user_id)
         ^
 [SQL: "INSERT INTO users (user_id, first_name, last_name, gender, level)\nSELECT \n    userId as user_id,\n    firstName as first_name,\n    lastName as last_name,\n    gender,\n    level\nFROM staging_events\nWHERE page = 'NextSong'\nON CONFLICT (user_id)\nDO UPDATE\n    SET level = EXCLUDED.level"]


In [54]:
%%sql
select * from staging_events limit 1

 * postgresql://dwhuser:***@dwhcluster.cbkbyouhw7x4.us-west-2.redshift.amazonaws.com:5439/dwh
1 rows affected.


artist,auth,firstname,gender,iteminsession,lastname,length,level,location,method,page,registration,sessionid,song,status,ts,useragent,userid
,Logged In,Theodore,M,0.0,Smith,,free,"Houston-The Woodlands-Sugar Land, TX",GET,Home,1540306145796.0,154.0,,200.0,1541290555796.0,Mozilla/5.0 (Windows NT 6.1; WOW64; rv:31.0) Gecko/20100101 Firefox/31.0,52


In [105]:
conn.close()