# SQL Queries

## Set up Database

In [1]:
import psycopg2 as pg2
import json
from typing import Tuple
from psycopg2.extensions import cursor, connection


def load_credentials(path: str) -> Tuple[str, str, str]:
    "Extract db credentials from json file."
    try:
        with open(path) as f:
            data = json.load(f)
            USER = data['username']
            PASS = data['password']
            NAME = data['name']
    except Exception as e:
        print(e)
    
    return (NAME, USER, PASS)

def create_connect_db(NAME: str, USER: str, PASS: str) -> Tuple[connection, cursor]:
    "Stablish connection and cursor to db given the credentials."
    try:
        conn = pg2.connect(
            host='localhost',
            database=NAME,
            user=USER,
            password=PASS,
        )
        conn.set_session(autocommit=True)
        cur = conn.cursor()

        cur.execute("DROP DATABASE IF EXISTS sparkifydb")
        cur.execute("CREATE DATABASE sparkifydb WITH ENCODING 'utf8' TEMPLATE template0")

        cur.close()
        conn.close()

        conn = pg2.connect(
            host='localhost',
            database='sparkifydb',
            user=USER,
            password=PASS
        )
        conn.set_session(autocommit=True)
        cur = conn.cursor()
    except Exception as e:
        print(e)

    return (conn, cur)

## Connect to Database

In [2]:
path = './db_credentials.json'
NAME, USER, PASS = load_credentials(path)
conn, cur = create_connect_db(NAME, USER, PASS)

## DROP and CREATE

In [3]:
drop_songplays = "DROP TABLE IF EXISTS songplays"

create_songplays = """
CREATE TABLE IF NOT EXISTS songplays (
    songplay_id SERIAL PRIMARY KEY,
    start_time TIMESTAMP,
    user_id INT,
    level VARCHAR(10),
    song_id VARCHAR(20),
    artist_id VARCHAR(20),
    session_id INT,
    location VARCHAR(50),
    user_agent VARCHAR(150)
)
"""

drop_users = "DROP TABLE IF EXISTS users"

create_users = """
CREATE TABLE IF NOT EXISTS users (
    user_id INT PRIMARY KEY,
    first_name VARCHAR(50),
    last_name VARCHAR(50),
    gender CHAR(1),
    level VARCHAR(10)
)
"""

drop_songs = "DROP TABLE IF EXISTS songs"

create_songs = """
CREATE TABLE IF NOT EXISTS songs (
    song_id VARCHAR(20) PRIMARY KEY,
    title VARCHAR(100),
    artist_id VARCHAR(20) NOT NULL,
    year INT,
    duration FLOAT(5)
)
"""

drop_artists = "DROP TABLE IF EXISTS artists"

create_artists = """
CREATE TABLE IF NOT EXISTS artists (
    artist_id VARCHAR(20) PRIMARY KEY,
    name VARCHAR(100),
    location VARCHAR(100),
    latitude FLOAT(5),
    longitude FLOAT(5)
)
"""

drop_time = "DROP TABLE IF EXISTS time"

create_time = """
CREATE TABLE IF NOT EXISTS time (
    start_time TIMESTAMP PRIMARY KEY,
    hour INT,
    day INT,
    week SMALLINT,
    month SMALLINT,
    year SMALLINT,
    weekday SMALLINT
)
"""

drop_queries = [drop_songplays, drop_songs, drop_artists, drop_time, drop_users]
create_queries = [create_songplays, create_songs, create_artists, create_time, create_users]

## Run DROP and CREATE

In [4]:
for query in drop_queries:
    "DROP TABLE IF EXISTS ..."
    try:
        cur.execute(query)
    except Exception as e:
        print(e)

for query in create_queries:
    "CREATE TABLE IF NOT EXISTS ..."
    try:
        cur.execute(query)
    except Exception as e:
        print(e)

## INSERT INTO

In [5]:
insert_songplays = ("""
INSERT INTO songplays
(start_time, user_id, level, song_id, artist_id, session_id, location, user_agent)
VALUES 
(%s, %s, %s, %s, %s, %s, %s, %s)
""")

insert_users = ("""
INSERT INTO users (user_id, first_name, last_name, gender, level)
VALUES (%s, %s, %s, %s, %s) ON CONFLICT (user_id) DO UPDATE SET level = EXCLUDED.level;
""")

insert_songs = ("""
INSERT INTO songs (song_id, title, artist_id, year, duration)
VALUES (%s, %s, %s, %s, %s) ON CONFLICT DO NOTHING;
""")

insert_artists = ("""
INSERT INTO artists (artist_id, name, location, latitude, longitude)
VALUES (%s, %s, %s, %s, %s) ON CONFLICT DO NOTHING;
""")


insert_time = ("""
INSERT INTO time (start_time, hour, day, week, month, year, weekday)
VALUES (%s, %s, %s, %s, %s, %s, %s) ON CONFLICT DO NOTHING;
""")

INSERT = {
    'songplays': insert_songplays,
    'users': insert_users,
    'time': insert_time,
    'artists': insert_artists,
    'sogns': insert_songs,
}

## Test Creation

In [6]:
test_songplays = "SELECT COUNT(*) FROM songplays"

test_users = "SELECT COUNT(*) FROM users"

test_artists = "SELECT COUNT(*) FROM artists"

test_songs = "SELECT COUNT(*) FROM songs"

test_time = "SELECT COUNT(*) FROM time"


test_queries = [test_songplays, test_users, test_artists, test_songs, test_time]
table_names = ['test_songplays', 'test_users', 'test_artists', 'test_songs', 'test_time']

for table_name, query in zip(table_names, test_queries):
    try:
        cur.execute(query)
        row = cur.fetchone()

        print('Table:', table_name)
        if not row:
            print('No data yet.\n')
        while row:
            print(row)
            row = cur.fetchone()
    except Exception as e:
        print(e)
    

Table: test_songplays
(0,)
Table: test_users
(0,)
Table: test_artists
(0,)
Table: test_songs
(0,)
Table: test_time
(0,)


## SELECT specific song

In [7]:
select_song = """
SELECT songs.song_id, artists.artist_id FROM songs
JOIN artists ON songs.artist_id = artists.artist_id
WHERE songs.title = %s
AND artists.name = %s
AND songs.duration = %s
"""

## ETL Pipeline

In [8]:
import os
import glob
from typing import List


current_dir = os.getcwd()

def get_all_files(path: str) -> List[str]:
    all_files = []
    for root, sub_dirs, files in os.walk(path):
        for file in files:
            if file.endswith('.json'):
                file_path = os.path.join(root, file)
                all_files.append(file_path)

    return all_files

song_path = './song_data/'
song_files = get_all_files(song_path)

log_path = './log_data/'
log_files = get_all_files(log_path)

print(song_files[:5])
print()
print(log_files[:5])

['./song_data/A/B/B/TRABBAM128F429D223.json', './song_data/A/B/B/TRABBNP128F932546F.json', './song_data/A/B/B/TRABBXU128F92FEF48.json', './song_data/A/B/B/TRABBOR128F4286200.json', './song_data/A/B/B/TRABBVJ128F92F7EAA.json']

['./log_data/2018/11/2018-11-07-events.json', './log_data/2018/11/2018-11-22-events.json', './log_data/2018/11/2018-11-18-events.json', './log_data/2018/11/2018-11-04-events.json', './log_data/2018/11/2018-11-01-events.json']


In [9]:
import numpy as np
import pandas as pd

sample_song = pd.read_json(song_files[0], lines=True)
sample_log = pd.read_json(log_files[0], lines=True)

In [10]:
sample_song.head()

Unnamed: 0,num_songs,artist_id,artist_latitude,artist_longitude,artist_location,artist_name,song_id,title,duration,year
0,1,ARBGXIG122988F409D,37.77916,-122.42005,California - SF,Steel Rain,SOOJPRH12A8C141995,Loaded Like A Gun,173.19138,0


In [11]:
sample_log.head(1)

Unnamed: 0,artist,auth,firstName,gender,itemInSession,lastName,length,level,location,method,page,registration,sessionId,song,status,ts,userAgent,userId
0,Miami Horror,Logged In,Kate,F,88,Harrell,250.8273,paid,"Lansing-East Lansing, MI",PUT,NextSong,1540473000000.0,293,Sometimes,200,1541548876796,"""Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/5...",97


## INSERT files

In [12]:
def insert_songs_data(song_files: List[str], cur=cur, base_query=insert_songs) -> None:
    for path in song_files:
        df = pd.read_json(path, lines=True)
        data = df[['song_id', 'title', 'artist_id', 'year', 'duration']].values[0].tolist()
        cur.execute(base_query, data)

insert_songs_data(song_files)

In [13]:
def insert_user_data(log_files: List[str], cur=cur, base_query=insert_users) -> None:
    for path in log_files:
        df = pd.read_json(path, lines=True)
        data = df[['userId', 'firstName', 'lastName', 'gender', 'level']].values[0].tolist()
        if data[0] == '':
            continue
        cur.execute(base_query, data)

insert_user_data(log_files)

In [14]:
def insert_artists_data(song_files: List[str], base_query=insert_artists) -> None:
    for path in song_files:
        df = pd.read_json(path, lines=True)
        data = df[['artist_id', 'artist_name', 'artist_location', 'artist_latitude', 'artist_longitude']].values[0].tolist()
        cur.execute(base_query, data)

insert_artists_data(song_files)

In [15]:
def insert_time_data(log_files: List[str], base_query=insert_time) -> None:
    for path in log_files:
        df = pd.read_json(path, lines=True)
        df = df[df['page'] == 'NextSong']
        df['ts'] = pd.to_datetime(df['ts'], unit='ms')
        for row_idx in range(len(df)):
            start_time = df['ts'].iloc[row_idx]
            data = [start_time, start_time.hour, start_time.day, start_time.week, start_time.month, start_time.year, start_time.weekday()]
            cur.execute(base_query, data)

insert_time_data(log_files)

In [16]:
def insert_songplay_data(log_files: List[str], base_query=insert_songplays) -> None:

    for path in log_files:
        df = pd.read_json(path, lines=True)
        df = df[df['page'] == 'NextSong']
        df['ts'] = pd.to_datetime(df['ts'], unit='ms')
        
        for index, row in df.iterrows():
            cur.execute(select_song, (row.song, row.artist, row.length))
            results = cur.fetchone()
            song_id, artist_id = None, None
            if results:
                song_id, artist_id = results

            songplay_data = (row.ts, row.userId, row.level, song_id, \
                             artist_id, row.sessionId, row.location, row.userAgent)
            
            cur.execute(base_query, songplay_data)

insert_songplay_data(log_files)

In [17]:
for table_name, query in zip(table_names, test_queries):
    try:
        cur.execute(query)
        row = cur.fetchone()

        print('Table:', table_name)
        if not row:
            print('No data yet.\n')
        while row:
            print(row)
            row = cur.fetchone()
    except Exception as e:
        print(e)
    

Table: test_songplays
(6820,)
Table: test_users
(25,)
Table: test_artists
(69,)
Table: test_songs
(71,)
Table: test_time
(6813,)


In [18]:
def close_db(conn, cur) -> None:
    "Drop db and tables, close connection and cursor."
    for query in drop_queries:
        cur.execute(query)
    cur.close()
    conn.close()
    NAME, USER, PASS = load_credentials('./db_credentials.json')
    conn = pg2.connect(database=NAME, user=USER, password=PASS)
    conn.set_session(autocommit=True)
    cur = conn.cursor()
    cur.execute('DROP DATABASE IF EXISTS sparkifydb')
    cur.close()
    conn.close()
    print('Connection to databse was closed successfully.')

close_db(conn, cur)

Connection to databse was closed successfully.
