In [3]:
import psycopg2
from sql_queries import create_table_queries, drop_table_queries


def create_database():
    """
    - Creates and connects to the sparkifydb
    - Returns the connection and cursor to sparkifydb
    """
    
    # connect to default database
    conn = psycopg2.connect("host=127.0.0.1 dbname=studentdb user=student password=student")
    conn.set_session(autocommit=True)
    cur = conn.cursor()
    
    # create sparkify database with UTF8 encoding
    cur.execute("DROP DATABASE IF EXISTS sparkifydb")
    cur.execute("CREATE DATABASE sparkifydb WITH ENCODING 'utf8' TEMPLATE template0")

    # close connection to default database
    conn.close()    
    
    # connect to sparkify database
    conn = psycopg2.connect("host=127.0.0.1 dbname=sparkifydb user=student password=student")
    cur = conn.cursor()
    
    return cur, conn


def drop_tables(cur, conn):
    """
    Drops each table using the queries in `drop_table_queries` list.
    """
    for query in drop_table_queries:
        cur.execute(query)
        conn.commit()


def create_tables(cur, conn):
    """
    Creates each table using the queries in `create_table_queries` list. 
    """
    for query in create_table_queries:
        cur.execute(query)
        conn.commit()


def main():
    """
    - Drops (if exists) and Creates the sparkify database. 
    
    - Establishes connection with the sparkify database and gets
    cursor to it.  
    
    - Drops all the tables.  
    
    - Creates all tables needed. 
    
    - Finally, closes the connection. 
    """
    cur, conn = create_database()
    
    drop_tables(cur, conn)
    create_tables(cur, conn)

    conn.close()


if __name__ == "__main__":
    main()

OperationalError: database "sparkifydb" is being accessed by other users
DETAIL:  There are 3 other sessions using the database.


In [None]:
# DROP TABLES
songplay_table_drop = "DROP TABLE IF EXISTS songplays"
user_table_drop = "DROP TABLE IF EXISTS users"
song_table_drop = "DROP TABLE IF EXISTS songs"
artist_table_drop = "DROP TABLE IF EXISTS artists"
time_table_drop = "DROP TABLE IF EXISTS time"


# CREATE TABLES
songplay_table_create = ("""CREATE TABLE songplays(
songplay_id SERIAL,
start_time TIMESTAMP REFERENCES time(start_time),
user_id VARCHAR(50) REFERENCES users(user_id),
level VARCHAR(50),
song_id VARCHAR(100) REFERENCES songs(song_id),
artist_id VARCHAR(100) REFERENCES artists(artist_id),
session_id BIGINT,
location VARCHAR(255),
user_agent TEXT,
PRIMARY KEY (songplay_id)) """)

user_table_create = ("""CREATE TABLE users(
user_id VARCHAR,
firstName VARCHAR(255),
lastName VARCHAR(255),
gender VARCHAR(1),
level VARCHAR(50),
PRIMARY KEY (user_id)) """)

song_table_create = ("""CREATE TABLE songs(
song_id VARCHAR(100),
title VARCHAR(255),
artist_id VARCHAR(100),
year INTEGER,
duration DOUBLE PRECISION,
PRIMARY KEY (song_id)) """)

artist_table_create = ("""CREATE TABLE artists(
artist_id VARCHAR(100),
name VARCHAR(255),
location VARCHAR(255),
latitude DOUBLE PRECISION,
longitude DOUBLE PRECISION,
PRIMARY KEY (artist_id)) """)


time_table_create = ("""CREATE TABLE time(
start_time TIMESTAMP,
hour INTEGER,
day INTEGER,
week INTEGER,
month INTEGER,
year INTEGER,
weekday INTEGER,
PRIMARY KEY (start_time)) """)

# INSERT RECORDS

songplay_table_insert = ("""INSERT INTO songplays (start_time, user_id, level, song_id, artist_id, session_id, location, user_agent) 
 VALUES (%s, %s, %s, %s, %s, %s, %s, %s) """)

user_table_insert = ("""INSERT INTO users (user_id, firstName, lastName, gender, level) VALUES (%s, %s, %s, %s, %s) 
ON CONFLICT (user_id) DO UPDATE SET firstName=users.firstName, lastName=users.lastName, gender=users.gender, level=users.level """)

song_table_insert = ("""INSERT INTO songs (song_id, title, artist_id, year, duration) VALUES (%s, %s, %s, %s, %s)
ON CONFLICT (song_id) DO UPDATE SET title=songs.title, artist_id=songs.artist_id,
year=songs.year, duration=songs.duration """)

artist_table_insert = ("""INSERT INTO artists (artist_id, name, location, latitude, longitude) VALUES (%s, %s, %s, %s, %s)
ON CONFLICT (artist_id) DO UPDATE SET name=artists.name, location=artists.location, latitude=artists.latitude, 
longitude=artists.longitude """)

time_table_insert = ("""INSERT INTO time (start_time, hour, day, week, month, year, weekday) VALUES (%s, %s, %s, %s, %s, %s, %s)
ON CONFLICT (start_time) DO UPDATE SET hour=time.hour, day=time.day, week=time.week, month=time.month, 
year=time.year, weekday=time.weekday """)

# FIND SONGS BY SONG ID AND ARTIST ID

song_select_by_song_id_artist_id = ("""SELECT s.song_id, a.artist_id FROM songs s, artists a
WHERE s.artist_id = a.artist_id  
    AND s.title = %s
    AND a.name = %s
    AND s.duration = %s """)

# FIND SONGS BY ID

song_select = ("""SELECT COUNT(*) FROM songs s
WHERE s.song_id = %s """)

# FIND ARTISTS BY ID

artist_select = ("""SELECT COUNT(*) FROM artists a
WHERE a.artist_id = %s """)

# FIND TIMES BY ID

time_select = ("""SELECT COUNT(*) FROM time t
WHERE t.start_time = %s """)

# FIND USERS BY ID

user_select = ("""SELECT COUNT(*) FROM users u
WHERE u.user_id = %s """)


# QUERY LISTS
create_table_queries = [user_table_create, song_table_create, artist_table_create, time_table_create, songplay_table_create]
drop_table_queries = [user_table_drop, song_table_drop, artist_table_drop, time_table_drop, songplay_table_drop]

In [5]:
import os
import glob
import psycopg2
import pandas as pd
from sql_queries import *


def process_song_file(cur, filepath):
    
    # open song file
    df = pd.read_json(filepath, typ='series')

    # insert song record
    song_data = df[['song_id','title','artist_id', 'year', 'duration']]
  
    # check for song_id duplicates
    cur.execute(song_select, (song_data[['song_id']]))
    results = cur.fetchone()
    
    if results[0] == 0:
        cur.execute(song_table_insert, song_data)
        
    # insert artist record
    artist_data = df[['artist_id','artist_name','artist_location', 'artist_latitude', 'artist_longitude']]
    
    # check for artist_id duplicates
    cur.execute(artist_select, (artist_data[['artist_id']]))
    results = cur.fetchone()
   
    if results[0] == 0:
        cur.execute(artist_table_insert, artist_data)
    
        
def process_log_file(cur, filepath):
    # open log file
    df = pd.read_json(filepath, lines=True)

    # filter by NextSong action
    df = df[df['page'] == 'NextSong']
    # convert timestamp column to datetime
    df['ts'] = pd.to_datetime(df['ts'], unit='ms')
    
    # insert time data records
    time_data = [df['ts'], df['ts'].dt.hour, df['ts'].dt.day,
                 df['ts'].dt.weekofyear, df['ts'].dt.month,
                 df['ts'].dt.year,df['ts'].dt.weekday]
    column_labels = ['ts', 'hour', 'day', 'week of year', 'month', 'year', 'weekday']
    
    assert isinstance(time_data, list), 'time_data should be a list'
    assert isinstance(column_labels, list), 'column_labels should be a list'


    dictionary = dict(zip(column_labels, time_data))
    time_df = pd.DataFrame.from_dict(dictionary)
    assert isinstance(time_df, pd.DataFrame), 'time_df should be a dataframe'
    
    
    for i, row in time_df.iterrows():
        # check for start_time duplicates
        cur.execute(time_select, [row.ts])
        results = cur.fetchone()
        
        if results[0] == 0: 
            cur.execute(time_table_insert, list(row))
        

    # load user table
    user_df = df[['userId', 'firstName', 'lastName', 'gender', 'level']]
    
    # insert user records
    for i, row in user_df.iterrows():
        # check for user_id duplicates
        cur.execute(user_select, (str(row.userId),))
        results = cur.fetchone()
        
        if results[0] == 0:
            cur.execute(user_table_insert, row)

    # insert songplay records
    for index, row in df.iterrows():
        
        # get song id and artist id from song and artist tables
        cur.execute(song_select_by_song_id_artist_id, (row.song, row.artist, row.length))
        results = cur.fetchone()
        
        if results:
            songid, artistid = results
        else:
            songid, artistid = None, None

        # insert songplay record
        songplay_data = (row.ts, row.userId, row.level, songid, artistid, row.sessionId, row.location, row.userAgent)
        cur.execute(songplay_table_insert, songplay_data)
      
    
def process_data(cur, conn, filepath, func):
    # get all files matching extension from directory
    all_files = []
    for root, dirs, files in os.walk(filepath):
        files = glob.glob(os.path.join(root,'*.json'))
        for f in files :
            all_files.append(os.path.abspath(f))

    # get total number of files found
    num_files = len(all_files)
    print('{} files found in {}'.format(num_files, filepath))

    # iterate over files and process
    for i, datafile in enumerate(all_files, 1):
        func(cur, datafile)
        conn.commit()
        print('{}/{} files processed.'.format(i, num_files))

        
def main():
    conn = psycopg2.connect("host=127.0.0.1 dbname=sparkifydb user=student password=student")
    cur = conn.cursor()

    process_data(cur, conn, filepath='data/song_data', func=process_song_file)
    process_data(cur, conn, filepath='data/log_data', func=process_log_file)
 
    cur.close()
    conn.close()


if __name__ == "__main__":
    main()

71 files found in data/song_data
1/71 files processed.
2/71 files processed.
3/71 files processed.
4/71 files processed.
5/71 files processed.
6/71 files processed.
7/71 files processed.
8/71 files processed.
9/71 files processed.
10/71 files processed.
11/71 files processed.
12/71 files processed.
13/71 files processed.
14/71 files processed.
15/71 files processed.
16/71 files processed.
17/71 files processed.
18/71 files processed.
19/71 files processed.
20/71 files processed.
21/71 files processed.
22/71 files processed.
23/71 files processed.
24/71 files processed.
25/71 files processed.
26/71 files processed.
27/71 files processed.
28/71 files processed.
29/71 files processed.
30/71 files processed.
31/71 files processed.
32/71 files processed.
33/71 files processed.
34/71 files processed.
35/71 files processed.
36/71 files processed.
37/71 files processed.
38/71 files processed.
39/71 files processed.
40/71 files processed.
41/71 files processed.
42/71 files processed.
43/71 file