--------

# PART1: STAGING TABLES

## This part is about:
#### 1. how to create staging tables in Redshift
#### 2. how to move data from AWS S3 into these staging tables
#### 3. how to test whether we can access the data in these staging tables

In [1]:
# import the needed packages 
import configparser
import psycopg2

In [62]:
# Read the settings that let us connect to Redshift from dwh.cfg.
# ConfigParser.read returns the list of files it parsed; the recorded cell
# output right after this cell shows ['dwh.cfg'].
config = configparser.ConfigParser()
config.read('dwh.cfg')

['dwh.cfg']

In [63]:
# Open the Redshift connection and point the session at the Sparkify schema.
# The DSN placeholders are filled positionally from the [CLUSTER] section, so
# the options in dwh.cfg must be listed as host, dbname, user, password, port.
cluster_settings = config['CLUSTER'].values()
dsn = "host={} dbname={} user={} password={} port={}".format(*cluster_settings)
conn = psycopg2.connect(dsn)
cur = conn.cursor()
# This only selects the schema for the session; nothing here creates it.
cur.execute('SET search_path TO Sparkify')

In [4]:
# Remove staging tables left over from an earlier run so that the CREATE
# statements can be executed again.
staging_songs_table_drop = 'DROP TABLE IF EXISTS "staging_songs_table"'
staging_events_table_drop = 'DROP TABLE IF EXISTS "staging_events_table"'

for drop_statement in (staging_songs_table_drop, staging_events_table_drop):
    cur.execute(drop_statement)

In [5]:
# DDL for the two staging tables that receive the raw S3 data.
# staging_events_table holds the log events and staging_songs_table holds the
# song metadata; both are loaded later by the COPY statements.
staging_events_table_create = """
    CREATE TABLE "staging_events_table" (
    "artist" varchar(max),
    "auth" varchar(10),
    "firstName" varchar(max),
    "gender" varchar(10),
    "iteminSession" int,
    "lastName" varchar(max),
    "length" double precision,
    "level" varchar(10),
    "location" varchar(max),
    "method" varchar(10),
    "page" varchar(max),
    "registration"bigint,
    "sessionId" int,
    "song" varchar(max),
    "status" int,
    "ts" bigint,
    "userAgent" varchar(max),
    "userId" varchar(max));
"""

staging_songs_table_create = """
    CREATE TABLE "staging_songs_table" (
    "num_songs" int,
    "artist_id" varchar(max),
    "artist_latitude" varchar(max),
    "artist_longitude" varchar(max),
    "artist_location" varchar(max),
    "artist_name" varchar(max),
    "song_id" varchar(max),
    "title" varchar(max),
    "duration" double precision,
    "year" int);
"""

# Run the DDL: events table first, then songs table.
for create_statement in (staging_events_table_create, staging_songs_table_create):
    cur.execute(create_statement)

In [8]:
# Commit the pending transaction so the statements executed so far on this
# connection are made permanent.
conn.commit()

In [9]:
# COPY statements that bulk-load the staging tables from S3.
# The events load maps JSON fields through the log_json_path.json file, while
# the songs load uses JSON 'auto'. Both use the dwhRole IAM role and read from
# the us-west-2 region.
staging_events_copy = """COPY staging_events_table 
                            from 's3://udacity-dend/log_data'
                            credentials 'aws_iam_role=arn:aws:iam::582084574000:role/dwhRole'
                            region 'us-west-2'
                            JSON 's3://udacity-dend/log_json_path.json';
                        """

staging_songs_copy = """COPY staging_songs_table 
                         from 's3://udacity-dend/song_data'
                         credentials 'aws_iam_role=arn:aws:iam::582084574000:role/dwhRole'
                         region 'us-west-2'
                         JSON 'auto';
                        """

# Move the data from S3 into the staging tables: songs first, then events.
for copy_statement in (staging_songs_copy, staging_events_copy):
    cur.execute(copy_statement)

In [None]:
# Commit the pending transaction so the rows written by the statements
# executed since the last commit are made permanent.
conn.commit()

In [None]:
# Smoke test: read back every row of staging_events_table and print it.
cur.execute("select * from staging_events_table")
for row in cur.fetchall():
    print(row)

In [None]:
# Smoke test: read back every row of staging_songs_table and print it.
cur.execute("select * from staging_songs_table")
for row in cur.fetchall():
    print(row)

In [13]:
# Commit again to end the current transaction. In file order only SELECT
# statements were issued since the previous commit, so no data changes here.
conn.commit()

In [56]:
# Close the connection used for the staging part; PART2 opens a new one.
conn.close()

-----

# PART2: ANALYTICAL TABLES

## This part is about:
#### 1. how to create analytical tables in Redshift
#### 2. how to move data from staging tables to analytical tables
#### 3. how to test whether we can access the data in these analytical tables

In [57]:
# Reconnect to Redshift for the analytical part and select the Sparkify
# schema for this session (the schema itself is not created here).
# The [CLUSTER] values are consumed in file order: host, dbname, user,
# password, port.
connection_string = "host={} dbname={} user={} password={} port={}".format(
    *config['CLUSTER'].values()
)
conn = psycopg2.connect(connection_string)
cur = conn.cursor()
cur.execute('SET search_path TO Sparkify')

In [21]:
# DROP statements for the analytical tables and the timetemp helper table.
songplay_table_drop = 'DROP TABLE IF EXISTS "songplay_table"'
user_table_drop = 'DROP TABLE IF EXISTS "user_table"'
song_table_drop = 'DROP TABLE IF EXISTS "song_table"'
artist_table_drop = 'DROP TABLE IF EXISTS "artist_table"'
time_table_drop = 'DROP TABLE IF EXISTS "time_table"'
timetemp_table_drop = 'DROP TABLE IF EXISTS "timetemp"'

# Drop the tables if they exist. songplay_table goes first because its
# definition declares foreign keys to the other four analytical tables.
analytical_drops = (
    songplay_table_drop,
    user_table_drop,
    song_table_drop,
    artist_table_drop,
    time_table_drop,
    timetemp_table_drop,
)
for drop_statement in analytical_drops:
    cur.execute(drop_statement)

In [22]:
# DDL for the analytical (star schema) tables.
#
# songplay_table is the fact table (even distribution); user_table,
# song_table, artist_table and time_table are dimensions copied to every node
# (diststyle all). Each table sorts on its key column, and songplay_table
# sorts on start_time.
#
# start_time is declared as a real timestamp and the calendar parts of
# time_table as integers. The insert statements already supply a TIMESTAMP
# expression and EXTRACT() results, so storing them as varchar(max) only
# forced a conversion to text and made the sort key and comparisons operate
# on strings. The two start_time columns must keep the same type because
# songplay_table references time_table through a foreign key.
songplay_table_create = ("""
    CREATE TABLE "songplay_table" (
    "songplay_id" int identity(1,1) NOT NULL,
    "start_time" timestamp sortkey,
    "user_id" varchar(max),
    "level" varchar(5),
    "song_id" varchar(max),
    "artist_id" varchar(max),
    "session_id" int,
    "location" varchar(max),
    "user_agent" varchar(max),
    PRIMARY KEY (songplay_id),
    FOREIGN KEY (start_time) REFERENCES time_table (start_time),
    FOREIGN KEY (user_id) REFERENCES user_table (user_id),
    FOREIGN KEY (song_id) REFERENCES song_table (song_id),
    FOREIGN KEY (artist_id) REFERENCES artist_table (artist_id))
    diststyle even;
""")


user_table_create = ("""
    CREATE TABLE "user_table" (
    "user_id" varchar(max) sortkey NOT NULL,
    "first_name" varchar(max),
    "last_name" varchar(max),
    "gender" varchar(5),
    "level" varchar(5),
    PRIMARY KEY (user_id))
    diststyle all;
""")


song_table_create = ("""
    CREATE TABLE "song_table" (
    "song_id" varchar(max) sortkey NOT NULL,
    "title" varchar(max),
    "artist_id" varchar(max),
    "year" int,
    "duration" double precision,
    PRIMARY KEY (song_id))
    diststyle all;
""")


artist_table_create = ("""
    CREATE TABLE "artist_table" (
    "artist_id" varchar(max) sortkey NOT NULL,
    "name" varchar(max),
    "location" varchar(max),
    "latitude" varchar(max),
    "longitude" varchar(max),
    PRIMARY KEY (artist_id))
    diststyle all;
""")


time_table_create = ("""
    CREATE TABLE "time_table" (
    "start_time" timestamp sortkey NOT NULL,
    "hour" int NOT NULL,
    "day" int NOT NULL,
    "week" int NOT NULL,
    "month" int NOT NULL,
    "year" int NOT NULL,
    "weekday" int NOT NULL,
    PRIMARY KEY (start_time))
    diststyle all;
""")


In [23]:
# Create the analytical tables. The four dimension tables are created before
# songplay_table because its definition references them in foreign keys.
analytical_creates = (
    user_table_create,
    song_table_create,
    artist_table_create,
    time_table_create,
    songplay_table_create,
)
for create_statement in analytical_creates:
    cur.execute(create_statement)

In [24]:
# Commit the pending transaction so the statements executed so far on this
# connection are made permanent.
conn.commit()

In [25]:
# Build the helper table "timetemp": one row per distinct event time, obtained
# by dividing staging_events_table.ts by 1000 and adding that many seconds to
# the epoch. time_table is later filled from this table.
# It is created with a plain CREATE TABLE, so it is a regular table that
# stays in the schema until it is dropped.
timetemp_table_create = """create table timetemp as 
        SELECT DISTINCT TIMESTAMP 'epoch' + ts/1000 * interval '1 second' AS time
        FROM staging_events_table
        """
cur.execute(timetemp_table_create)

In [26]:
# Commit the pending transaction so the statements executed since the last
# commit are made permanent.
conn.commit()

In [None]:
# Smoke test: read back every row of timetemp and print it.
cur.execute("select * from timetemp")
for row in cur.fetchall():
    print(row)

In [28]:
# SQL that fills time_table from the timetemp helper table.
# Each row keeps the timestamp itself as start_time and derives hour, day,
# week, month, year and weekday from it with EXTRACT (weekday uses the "dow"
# date part). There is no DISTINCT here; uniqueness of start_time relies on
# timetemp having been built with SELECT DISTINCT.
time_table_insert = ("""
INSERT INTO time_table (start_time, hour, day, week, month, year, weekday)
SELECT time AS start_time,
    EXTRACT(hour FROM time) AS hour,
    EXTRACT(day FROM time) AS day,
    EXTRACT(week FROM time) AS week,
    EXTRACT(month FROM time) AS month,
    EXTRACT(year FROM time) AS year,
    EXTRACT(dow FROM time) AS weekday
FROM timetemp;
""")

# insert data into time table
cur.execute(time_table_insert)

In [None]:
# Smoke test: read back every row of time_table and print it.
cur.execute("select * from time_table")
for row in cur.fetchall():
    print(row)

In [31]:
# SQL that fills artist_table from staging_songs_table.
# DISTINCT applies to the whole selected row (id, name, location, latitude,
# longitude), not to artist_id alone.
# NOTE(review): an artist_id that appears with differing name or location
# values would therefore produce more than one row -- confirm against the data.
artist_table_insert = ("""
INSERT INTO artist_table (artist_id, name, location, latitude, longitude)
SELECT DISTINCT artist_id AS artist_id,
       artist_name AS name,
       artist_location AS location,
       artist_latitude AS latitude,
       artist_longitude AS longitude
FROM staging_songs_table;
""")

# insert data into artist_table
cur.execute(artist_table_insert)

In [None]:
# Smoke test: read back every row of artist_table and print it.
cur.execute("select * from artist_table")
for row in cur.fetchall():
    print(row)

In [32]:
# SQL that fills song_table from staging_songs_table.
# Columns are copied as-is (song_id, title, artist_id, year, duration);
# DISTINCT removes rows that are identical across all five columns.
song_table_insert = ("""
INSERT INTO song_table (song_id, title, artist_id, year, duration)
SELECT DISTINCT song_id AS song_id,
       title AS title,
       artist_id AS artist_id,
       year AS year,
       duration AS duration
FROM staging_songs_table;
""")

# insert data into song_table
cur.execute(song_table_insert)

In [None]:
# Smoke test: read back every row of song_table and print it.
cur.execute("select * from song_table")
for row in cur.fetchall():
    print(row)

In [35]:
# SQL that fills user_table with exactly one row per user.
#
# A plain SELECT DISTINCT over (userId, names, gender, level) returns several
# rows for a user whose level differs between events, and also a row for
# events that carry no user id. user_table declares user_id as its primary
# key, but Redshift treats that constraint as informational and does not
# reject duplicates, so they would be stored silently.
#
# Instead, events are ranked per user by ts (newest first) and only the newest
# one is kept, so each user appears once with the most recent level. Events
# with a NULL or blank userId are skipped.
user_table_insert = ("""
INSERT INTO user_table (user_id, first_name, last_name, gender, level)
SELECT user_id, first_name, last_name, gender, level
FROM (
    SELECT userId AS user_id,
           firstName AS first_name,
           lastName AS last_name,
           gender AS gender,
           level AS level,
           ROW_NUMBER() OVER (PARTITION BY userId ORDER BY ts DESC) AS recency_rank
    FROM staging_events_table
    WHERE userId IS NOT NULL
      AND TRIM(userId) <> ''
) AS ranked_events
WHERE recency_rank = 1;
""")

# insert data into user_table
cur.execute(user_table_insert)

In [None]:
# Smoke test: read back every row of user_table and print it.
cur.execute("select * from user_table")
for row in cur.fetchall():
    print(row)

In [38]:
# SQL that fills the songplay_table fact table.
# Events are inner-joined to songs on artist name, song title and an exact
# match of length/duration, so an event without a matching song row is left
# out. Only events whose page is 'NextSong' are considered.
# start_time uses the same expression as the timetemp table: ts divided by
# 1000, added as seconds to the epoch.
# songplay_id is not listed because the table defines it as an identity column.
# NOTE(review): "location" is taken from the song's artist_location rather
# than from the event's own location column -- confirm this is intended.
songplay_table_insert = ("""
INSERT INTO songplay_table (start_time, user_id, level, song_id, artist_id, session_id, location, user_agent)
SELECT TIMESTAMP 'epoch' + e.ts/1000 * interval '1 second' AS start_time,
       e.userId AS user_id,
       e.level AS level,
       s.song_id AS song_id,
       s.artist_id AS artist_id,
       e.sessionId AS session_id,
       s.artist_location AS location,
       e.userAgent AS user_agent
FROM staging_events_table e
JOIN staging_songs_table s
on 
(e.artist = s.artist_name 
AND e.song = s.title
AND e.length = s.duration)
WHERE e.page = 'NextSong';
""")

# insert data into songplay_table
cur.execute(songplay_table_insert)

In [None]:
# Smoke test: read back every row of songplay_table and print it.
cur.execute("select * from songplay_table")
for row in cur.fetchall():
    print(row)

In [41]:
# Commit the pending transaction so the rows inserted since the last commit
# are made permanent.
conn.commit()

In [42]:
# Close the connection opened for the analytical part.
conn.close()