In [1]:
import os
import glob
import psycopg2
import pandas as pd

In [2]:
def create_database(db_name):
    """
    - Creates and connects to the database
    - Returns the connection and cursor to the created database
    """
    # connect to default database
    conn = psycopg2.connect(host="localhost", dbname="studentdb", user="student", password="student")
    conn.set_session(autocommit=True)
    cur = conn.cursor()
    
    # create sparkify database with UTF8 encoding
    cur.execute("DROP DATABASE IF EXISTS {db_name}".format(db_name=db_name))
    cur.execute("CREATE DATABASE {db_name} WITH ENCODING 'utf8' TEMPLATE template0".format(db_name=db_name))

    # close connection to default database
    conn.close()    
    
    # connect to sparkify database
    conn = psycopg2.connect(host="localhost", dbname=db_name, user="student", password="student")
    cur = conn.cursor()
    conn.commit()
    
    return conn, cur

In [3]:
conn, cur = create_database('sparkifydb')

In [4]:
def connect_to_db(db_name):
    try:
        #connect to database
        conn = psycopg2.connect(host="localhost", dbname=db_name, user="student", password="student")
        cur = conn.cursor()
        return conn, cur
    except Exception as e:
        return print("error connecting to {db}\n".format(db=db_name),e)

In [5]:
def get_files(filepath):
    """extracts files from a particular dir/folder """
    all_files = []
    for root, dirs, files in os.walk(filepath):
        files = glob.glob(os.path.join(root,'*.json'))
        for f in files :
            all_files.append(os.path.abspath(f))
    
    return all_files

## Processing/ETL_ing songs data begins here

In [6]:
song_files = get_files("./data/song_data")
song_files[0]

'/home/workspace/data/song_data/A/A/A/TRAAAAW128F429D538.json'

In [7]:
filepath = song_files[0]

In [8]:
df = pd.read_json(filepath, typ='series')
df

num_songs                            1
artist_id           ARD7TVE1187B99BFB1
artist_latitude                   None
artist_longitude                  None
artist_location        California - LA
artist_name                     Casual
song_id             SOMZWCG12A8C13C480
title                 I Didn't Mean To
duration                       218.932
year                                 0
dtype: object

In [9]:
#extracting data for songs to songs tables
song_data = [df.song_id, df.title, df.artist_id, df.year, df.duration]
song_data

['SOMZWCG12A8C13C480', "I Didn't Mean To", 'ARD7TVE1187B99BFB1', 0, 218.93179]

In [10]:
# inserting songs data into songs table requires the creation of songs table if not exist, then do the data insertion later.
# since we will create a lot of tables, we write a function designated for creating table so as not to repeat ourselves

def create_table(conn_,cur_, create_table_query):
    try:
        #create table
        cur_.execute(create_table_query)
        conn_.commit()
    except Exception as e:
        return print("error creating table\n",e)
    
def drop_table(conn_, cur_, drop_table_query):
    #we can choose to drop table if need be as in this project, cause we were instructed to do so.
    create_table(conn_,cur_,drop_table_query)

In [11]:
# now, we utilize our above function
songs_table_create_query = """CREATE TABLE IF NOT EXISTS songs 
            (song_id varchar PRIMARY KEY, song_title varchar, artist_id varchar, year int, duration numeric)"""

songs_table_drop_query = "DROP TABLE IF EXISTS songs"



In [12]:
cur, conn

(<cursor object at 0x7faddf46d618; closed: 0>,
 <connection object at 0x7faddeb153d8; dsn: 'user=student password=xxx dbname=sparkifydb host=localhost', closed: 0>)

In [13]:
drop_table(conn, cur, songs_table_drop_query)
create_table(conn, cur, songs_table_create_query)

In [14]:
%load_ext sql
%sql postgresql://student:student@localhost/sparkifydb
%sql SELECT * FROM songs LIMIT 5;

 * postgresql://student:***@localhost/sparkifydb
0 rows affected.


song_id,song_title,artist_id,year,duration


In [15]:
def data_inserter(conn_, cur_, table, col_format, data_row):
    query = "INSERT INTO {table_name} VALUES (".format(table_name=table)
    query += col_format +")" 
    try:
        cur_.execute(query,data_row)
        conn_.commit()
    except Exception as e:
        print("error inserting values into table\n",e)

In [16]:
data_inserter(conn,cur,'songs',"%s,%s,%s,%s,%s",song_data)

In [17]:
#insert_into_songs_query = "INSERT INTO songs VALUES(%s, %s, %s, %s, %s);", song_data

In [18]:
#data_inserter(conn,cur,insert_into_songs_query)

In [19]:
# cur.execute("INSERT INTO songs VALUES (%s, %s, %s, %s, %s);", song_data)
# conn.commit()

In [20]:
#%sql rollback;

In [21]:
%sql SELECT * FROM songs LIMIT 5;

 * postgresql://student:***@localhost/sparkifydb
1 rows affected.


song_id,song_title,artist_id,year,duration
SOMZWCG12A8C13C480,I Didn't Mean To,ARD7TVE1187B99BFB1,0,218.93179


# create and insert into artist table

In [22]:
artist_table_create_query = """CREATE TABLE IF NOT EXISTS artists 
            (artist_id varchar PRIMARY KEY, artist_name varchar, artist_location varchar, artist_longitude point, artist_latitude point)"""

artist_table_drop_query = "DROP TABLE IF EXISTS artists"

In [23]:
artist_data = [df.artist_id, df.artist_name, df.artist_location, df.artist_longitude, df.artist_latitude]
artist_data

['ARD7TVE1187B99BFB1', 'Casual', 'California - LA', None, None]

In [24]:
drop_table(conn, cur, artist_table_drop_query)
create_table(conn, cur, artist_table_create_query)

In [25]:
data_inserter(conn,cur,'artists',"%s,%s,%s,%s,%s",artist_data)

In [26]:
%sql SELECT * FROM artists LIMIT 5;

 * postgresql://student:***@localhost/sparkifydb
1 rows affected.


artist_id,artist_name,artist_location,artist_longitude,artist_latitude
ARD7TVE1187B99BFB1,Casual,California - LA,,


# Processing log data

In [27]:
log_files = get_files("./data/log_data")
log_files[0]

'/home/workspace/data/log_data/2018/11/2018-11-30-events.json'

In [28]:
filepath = log_files[0]

In [29]:
df = pd.read_json(filepath, lines=True)
df.head()

Unnamed: 0,artist,auth,firstName,gender,itemInSession,lastName,length,level,location,method,page,registration,sessionId,song,status,ts,userAgent,userId
0,Stephen Lynch,Logged In,Jayden,M,0,Bell,182.85669,free,"Dallas-Fort Worth-Arlington, TX",PUT,NextSong,1540992000000.0,829,Jim Henson's Dead,200,1543537327796,Mozilla/5.0 (compatible; MSIE 10.0; Windows NT...,91
1,Manowar,Logged In,Jacob,M,0,Klein,247.562,paid,"Tampa-St. Petersburg-Clearwater, FL",PUT,NextSong,1540558000000.0,1049,Shell Shock,200,1543540121796,"""Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_4...",73
2,Morcheeba,Logged In,Jacob,M,1,Klein,257.41016,paid,"Tampa-St. Petersburg-Clearwater, FL",PUT,NextSong,1540558000000.0,1049,Women Lose Weight (Feat: Slick Rick),200,1543540368796,"""Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_4...",73
3,Maroon 5,Logged In,Jacob,M,2,Klein,231.23546,paid,"Tampa-St. Petersburg-Clearwater, FL",PUT,NextSong,1540558000000.0,1049,Won't Go Home Without You,200,1543540625796,"""Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_4...",73
4,Train,Logged In,Jacob,M,3,Klein,216.76363,paid,"Tampa-St. Petersburg-Clearwater, FL",PUT,NextSong,1540558000000.0,1049,Hey_ Soul Sister,200,1543540856796,"""Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_4...",73


In [30]:
df = df[df['page']=='NextSong']
df.head()

Unnamed: 0,artist,auth,firstName,gender,itemInSession,lastName,length,level,location,method,page,registration,sessionId,song,status,ts,userAgent,userId
0,Stephen Lynch,Logged In,Jayden,M,0,Bell,182.85669,free,"Dallas-Fort Worth-Arlington, TX",PUT,NextSong,1540992000000.0,829,Jim Henson's Dead,200,1543537327796,Mozilla/5.0 (compatible; MSIE 10.0; Windows NT...,91
1,Manowar,Logged In,Jacob,M,0,Klein,247.562,paid,"Tampa-St. Petersburg-Clearwater, FL",PUT,NextSong,1540558000000.0,1049,Shell Shock,200,1543540121796,"""Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_4...",73
2,Morcheeba,Logged In,Jacob,M,1,Klein,257.41016,paid,"Tampa-St. Petersburg-Clearwater, FL",PUT,NextSong,1540558000000.0,1049,Women Lose Weight (Feat: Slick Rick),200,1543540368796,"""Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_4...",73
3,Maroon 5,Logged In,Jacob,M,2,Klein,231.23546,paid,"Tampa-St. Petersburg-Clearwater, FL",PUT,NextSong,1540558000000.0,1049,Won't Go Home Without You,200,1543540625796,"""Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_4...",73
4,Train,Logged In,Jacob,M,3,Klein,216.76363,paid,"Tampa-St. Petersburg-Clearwater, FL",PUT,NextSong,1540558000000.0,1049,Hey_ Soul Sister,200,1543540856796,"""Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_4...",73


In [31]:
#converts integer datetime since epoch to datetime
df['date'] = pd.to_datetime(df['ts'], unit='ms')

In [32]:
#df.date.dt.date.head(5)
#df.head()
#hour, day, week of year, month, year, and weekday
df['hour'] = df['date'].dt.hour
df['day'] = df['date'].dt.day
df['week_of_year'] = df['date'].dt.week
df['month'] = df['date'].dt.month
df['year'] = df['date'].dt.year
df['weekday'] = df['date'].dt.dayofweek

In [33]:
time_df = df[['hour','day','week_of_year','month','year','weekday']]
time_df.head(2)

Unnamed: 0,hour,day,week_of_year,month,year,weekday
0,0,30,48,11,2018,4
1,1,30,48,11,2018,4


In [34]:
# checking to see which columns is unique to be used as primary key for our table
dic = {}
for col in df.columns:
    res = df[col].is_unique
    dic[col] = res
dic

{'artist': False,
 'auth': False,
 'firstName': False,
 'gender': False,
 'itemInSession': False,
 'lastName': False,
 'length': False,
 'level': False,
 'location': False,
 'method': False,
 'page': False,
 'registration': False,
 'sessionId': False,
 'song': False,
 'status': False,
 'ts': True,
 'userAgent': False,
 'userId': False,
 'date': True,
 'hour': False,
 'day': False,
 'week_of_year': False,
 'month': False,
 'year': False,
 'weekday': False}

In [35]:
#create time table
time_table_create_query = """CREATE TABLE IF NOT EXISTS time 
            (hour int, day int, week_of_year int, month int, year int, weekday int)"""

time_table_drop_query = "DROP TABLE IF EXISTS time"
drop_table(conn,cur,time_table_drop_query)
create_table(conn,cur,time_table_create_query)

In [36]:
#insert time records into time table
for i, row in time_df.iterrows():
#     cur.execute(time_table_insert, list(row))
#     conn.commit()
    data_inserter(conn,cur,'time',"%s,%s,%s,%s,%s,%s",list(row))   

In [37]:
%sql SELECT * FROM time LIMIT 5;

 * postgresql://student:***@localhost/sparkifydb
5 rows affected.


hour,day,week_of_year,month,year,weekday
0,30,48,11,2018,4
1,30,48,11,2018,4
1,30,48,11,2018,4
1,30,48,11,2018,4
1,30,48,11,2018,4


# Users table

In [38]:
user_df = df[['userId','firstName','lastName','gender','level']]
user_df.head(3)

Unnamed: 0,userId,firstName,lastName,gender,level
0,91,Jayden,Bell,M,free
1,73,Jacob,Klein,M,paid
2,73,Jacob,Klein,M,paid


In [39]:
#change from camelcase to snake case
user_df.rename(columns={'userId':'user_id','firstName':'first_name','lastName':'last_name'},inplace=True)

A value is trying to be set on a copy of a slice from a DataFrame

See the caveats in the documentation: http://pandas.pydata.org/pandas-docs/stable/indexing.html#indexing-view-versus-copy
  return super(DataFrame, self).rename(**kwargs)


In [40]:
#user table create query
user_table_create_query = """CREATE TABLE IF NOT EXISTS users
        (user_id int, first_name varchar, last_name varchar, gender varchar, level varchar)"""

user_table_drop_query = "DROP TABLE IF EXISTS users"

In [41]:
drop_table(conn,cur,user_table_drop_query)
create_table(conn,cur,user_table_create_query)
#%sql rollback

In [42]:
#insert time records into time table
for i, row in user_df.iterrows():
#     cur.execute(time_table_insert, list(row))
#     conn.commit()
    data_inserter(conn,cur,'users',"%s,%s,%s,%s,%s",list(row))   

In [43]:
%sql select * from users limit 5;

 * postgresql://student:***@localhost/sparkifydb
5 rows affected.


user_id,first_name,last_name,gender,level
91,Jayden,Bell,M,free
73,Jacob,Klein,M,paid
73,Jacob,Klein,M,paid
73,Jacob,Klein,M,paid
73,Jacob,Klein,M,paid


In [45]:
%sql select * from songs limit 5;

 * postgresql://student:***@localhost/sparkifydb
1 rows affected.


song_id,song_title,artist_id,year,duration
SOMZWCG12A8C13C480,I Didn't Mean To,ARD7TVE1187B99BFB1,0,218.93179


In [51]:
df.head(2).iloc[:,:19]

Unnamed: 0,artist,auth,firstName,gender,itemInSession,lastName,length,level,location,method,page,registration,sessionId,song,status,ts,userAgent,userId,date
0,Stephen Lynch,Logged In,Jayden,M,0,Bell,182.85669,free,"Dallas-Fort Worth-Arlington, TX",PUT,NextSong,1540992000000.0,829,Jim Henson's Dead,200,1543537327796,Mozilla/5.0 (compatible; MSIE 10.0; Windows NT...,91,2018-11-30 00:22:07.796
1,Manowar,Logged In,Jacob,M,0,Klein,247.562,paid,"Tampa-St. Petersburg-Clearwater, FL",PUT,NextSong,1540558000000.0,1049,Shell Shock,200,1543540121796,"""Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_4...",73,2018-11-30 01:08:41.796


In [55]:
#%sql select * from artists limit 5;
%sql select song_id, a.artist_id from artists a inner join songs s on a.artist_id=s.artist_id;

 * postgresql://student:***@localhost/sparkifydb
1 rows affected.


song_id,artist_id
SOMZWCG12A8C13C480,ARD7TVE1187B99BFB1


In [56]:
%sql select * from artists;

 * postgresql://student:***@localhost/sparkifydb
1 rows affected.


artist_id,artist_name,artist_location,artist_longitude,artist_latitude
ARD7TVE1187B99BFB1,Casual,California - LA,,


In [None]:
def print_my_name():
    print("Jubril Khan is my name")
    