In [None]:
import boto3
import pandas as pd
import configparser
import json

In [None]:
conf = configparser.ConfigParser()
conf.read_file(open('dwh.cfg'))

KEY=conf.get("AWS","KEY")
SECRET=conf.get("AWS","SECRET")
DWH_CLUSTER_IDENTIFIER = conf.get("CLUSTER","DWH_CLUSTER_IDENTIFIER")
DWH_DB                 = conf.get("CLUSTER","DB_NAME")
DWH_DB_USER            = conf.get("CLUSTER","DB_USER")
DWH_DB_PASSWORD        = conf.get("CLUSTER","DB_PASSWORD")
DWH_PORT               = conf.get("CLUSTER","DB_PORT")
DWH_ENDPOINT           = conf.get("CLUSTER","HOST")

In [None]:
s3 = boto3.resource('s3',
                       region_name="us-west-2",
                       aws_access_key_id=KEY,
                       aws_secret_access_key=SECRET
                     )


In [None]:
log_bucket = s3.Bucket("udacity-dend")
song_bucket = s3.Bucket(conf.get("S3","SONG_DATA"))
JSONPATH_bucket = s3.Bucket(conf.get("S3","LOG_JSONPATH"))

In [None]:
for obj in log_bucket.objects.filter(Prefix='log-data'):
    print(obj)

In [None]:
#for obj in song_bucket.objects.filter(Prefix='song-data/A/A/A'):
#    print(obj)

In [None]:
#for obj in song_bucket.objects.filter(Prefix='log_json_path'):
#    print(obj)

In [None]:
%load_ext sql

In [None]:
conn_string="postgresql://{}:{}@{}:{}/{}".format(DWH_DB_USER, DWH_DB_PASSWORD, DWH_ENDPOINT, DWH_PORT,DWH_DB)
print(conn_string)
%sql $conn_string

In [None]:
## cell below function for date to convert from timestamp to date

In [None]:
%%sql
CREATE OR REPLACE FUNCTION from_unixtime(epoch BIGINT)
  RETURNS TIMESTAMP  AS
'from datetime import datetime

x = lambda epoch : datetime.fromtimestamp(epoch/1000.0)

return x(epoch)

#return datetime.datetime.fromtimestamp(epoch)
'
LANGUAGE plpythonu IMMUTABLE;


In [None]:
%%sql
Drop table if exists events;
create table if not exists events 
(
artist text,
auth varchar(200),
firstName varchar(200),
gender varchar(200),
itemInSession int,
lastName varchar(200),
length float,
level varchar(200),
location varchar(50),
method varchar(50),
page varchar(50),
registration float,
session_id int,
song varchar(200),
status int,
ts bigint,
userAgent text,
userId int
)

In [None]:
LOG_DATA=conf.get("S3","LOG_DATA")
ARN=conf.get("IAM_ROLE","ARN")
JSONPATH_DATA = conf.get("S3","LOG_JSONPATH")

staging_events_copy = (""" 
    copy events from {}
    credentials 'aws_iam_role={}'
    FORMAT AS json {};
""").format(LOG_DATA,ARN,JSONPATH_DATA)


In [None]:
%sql $staging_events_copy

In [None]:
%%sql
SELECT count(*) FROM events

In [None]:
%%sql 
SELECT * FROM events limit 5

In [None]:
%%sql 
SELECT from_unixtime(ts) from events limit 5;


In [None]:
%%sql 
select top 5 * from stl_load_errors 

In [None]:
%%sql
drop table if exists songs;

In [None]:
SONG_DATA=conf.get("S3","SONG_DATA")


In [None]:
sampleDbBucket =  s3.Bucket("udacity-dend")

In [None]:
#for obj in sampleDbBucket.objects.filter(Prefix="song-data/A/A"):
 #   print(obj)

In [None]:
%%sql
Create table if not exists songs 
(
num_songs int,
artist_id varchar(200) NOT NULL ,
artist_latitude DOUBLE PRECISION,
artist_longitude DOUBLE PRECISION,
artist_location varchar(2000),
artist_name varchar(2000),
song_id     varchar(200),
title  varchar(200),
duration DOUBLE PRECISION,
year int,
PRIMARY KEY (artist_id)
)

In [None]:
SONG_DATA=conf.get("S3","SONG_DATA")


staging_songs_copy = ("""
    copy songs from 's3://udacity-dend/song-data/A/A'
    credentials 'aws_iam_role={}'
    json 'auto'
    region 'us-west-2' 
 COMPUPDATE OFF STATUPDATE OFF
;
""").format(ARN)


In [None]:
%sql $staging_songs_copy

In [None]:
%%sql 
Select * from songs limit 5

In [None]:
%%sql 
Select count(*) from songs

## dim_songs 

song_id, title, artist_id, year, duration


In [None]:
%%sql 


DROP TABLE if exists dim_songs ;

CREATE TABLE if not exists dim_songs 
(
song_key int IDENTITY(0,1) sortkey,
song_id varchar(200) NOT NULL,
title varchar(200),
artist_id varchar(200),
year int,
duration DOUBLE PRECISION,
PRIMARY KEY (song_key)
);



In [None]:
%%sql 

insert into dim_songs (song_id,title,artist_id,year,duration)
select 
DISTINCT song_id,
title,
artist_id,
year,
duration
from songs

In [None]:
%%sql 

select * from dim_songs limit 5

## users table

In [None]:
%%sql 
DROP TABLE if exists users ;

CREATE TABLE if not exists users 
(
user_key int IDENTITY(0,1) sortkey,
userid int NOT NULL,
firstName varchar(200),
lastName varchar(200),
gender varchar(200),
level varchar(200),
PRIMARY KEY (user_key)
);


insert into users (userid,firstName,lastName,gender,level) 
SELECT
userid,
firstName,
lastName,
gender,
level
from (
        select
        userid,
        firstName,
        lastName,
        gender,
        level,
        from_unixtime(ts) as date,
        row_number() OVER (
            PARTITION BY userid
            ORDER BY date DESC
        ) as row_number

        from  events )
where userid is not null
and row_number = 1
;


In [None]:
%%sql 
select * from users limit 5

## time


In [None]:
%%sql
DROP Table if exists dim_time;
CREATE TABLE if not exists dim_time (
date_key int IDENTITY(0,1) sortkey,
start_time bigint NOT NULL,
hour int ,
day int ,
month int,
year int ,
dayofweek int,
PRIMARY KEY (date_key) 
)
diststyle all;



In [None]:
date = %sql Select from_unixtime(ts) from events

In [None]:
#date

In [None]:
#df = date.DataFrame()

In [None]:
#from datetime import datetime


In [None]:

#t = df['ts'].apply(lambda x : datetime.fromtimestamp(x/1000.0))
#
#start_time=df['ts']
#hour = t.dt.hour
#day = t.dt.day
#weekofyear = t.dt.weekofyear
#month = t.dt.month
#year = t.dt.year
#dayofweek = t.dt.dayofweek
## insert time data records
#time_data = list(zip(start_time,hour,day,weekofyear,month,year,dayofweek))
#column_labels = ('start_time','hour','day','weekofyear','month','year','dayofweek')
#time_df = pd.DataFrame(data=time_data,columns=column_labels)


In [None]:
%%sql 
Select date_part(hour,from_unixtime(ts)) from events limit 5


In [None]:
%%sql 

SELECT (TIMESTAMP 'epoch' + ts/1000 * INTERVAL '1 Second ') as mytimestamp
from events 
limit 5



In [None]:
%%sql 
INSERT INTO dim_time
(start_time,hour,day,month,year,dayofweek)
select 
ts,
EXtract(hour from mytimestamp),
EXtract(day from mytimestamp),
EXtract(month from mytimestamp),
EXtract(year from mytimestamp),
EXtract(dow from mytimestamp)

from (
    SELECT ts,(TIMESTAMP 'epoch' + ts/1000 * INTERVAL '1 Second ') as mytimestamp
from events 
)

In [None]:
%%sql
select count(*) from dim_time

In [None]:
%%sql 
select * from dim_time limit 5

## artist

In [None]:
%%sql 
DROP TABLE IF EXISTS dim_artists


In [None]:
%%sql
CREATE TABLE IF NOT EXISTS dim_artists
(
artist_key int IDENTITY(0,1) sortkey,
artist_id varchar(200) NOT NULL,
artist_name varchar(200),
artist_location varchar(200)
,artist_latitude DOUBLE PRECISION,
artist_longitude DOUBLE PRECISION
)
diststyle all

In [None]:
%%sql 
INSERT INTO dim_artists 
(artist_id,artist_name,artist_location,artist_latitude,artist_longitude)
select DISTINCT artist_id,artist_name,artist_location,artist_latitude,artist_longitude from songs


In [None]:
%%sql 
select * from dim_artists limit 5

In [None]:
%sql DROP TABLE IF EXISTS songplays

In [None]:
%%sql

create table if not exists songplays 
(
songplay_key int IDENTITY(0,1)  NOT NULL sortkey distkey,
start_time varchar(50),
song_id varchar(200),
artist_id varchar(200),
level varchar(50),
session_id int NOT NULL,
location varchar(50),
user_agent varchar(200),
user_id int NOT NULL,
PRIMARY KEY (songplay_key)
)
;

In [None]:
%%sql 

insert into songplays 
(artist_id,user_id,start_time,song_id,level,session_id,location,user_agent)
select 
artist_id,
userId,
ts as start_time,
song_id,
level,
session_id,
location,
userAgent
from events e
join songs s
on e.song = s.title
where e.page = 'NextSong'


In [None]:
%%sql 
Select * from songplays

In [None]:
staging_events_table_create= ("""
create table if not exists events 
(
artist text,\
auth varchar(200),\
firstName varchar(200),\
gender varchar(200),\
itemInSession int,\
lastName varchar(200),\
length float,\
level varchar(200),\
location varchar(50),\
method varchar(50),\
page varchar(50),\
registration float,\
session_id int,\
song varchar(200),\
status int,\
ts bigint,\
userAgent text,\
userId int
)
""")

staging_songs_table_create = ("""
Create table if not exists songs 
(
num_songs int,\
artist_id varchar(200) NOT NULL,\
artist_latitude DOUBLE PRECISION,\
artist_longitude DOUBLE PRECISION,\
artist_location varchar(2000),\
artist_name varchar(2000),\
song_id     varchar(200),\
title  varchar(200),\
duration DOUBLE PRECISION,\
year int,\
PRIMARY KEY (artist_id)
)


""")


In [None]:
%%sql

SELECT * from dim_time limit 20