### Import Libraries

In [13]:
#  import libraries
import os
import glob
import psycopg2
import pandas as pd
import sqlalchemy
from datetime import datetime

### Define connection variables

In [14]:
    # Define connection variables
    initConn = "host=localhost, dbname=postgres user=student password=student"  # intial connection to create SparkifyDB
    constr = "host=localhost dbname=sparkifydb user=student password=student"  # connect to SparkifyDB
    pgConn = "postgresql://student:student@localhost:5432/sparkifydb"   # connect to sparkify via sqlalchemy

### Define python functions for: 
* get_files: parse through filepath and retrieve json files
* getSongFiles: retrieving all song files from path
* getSongData: from each song file, retrieve song data into a list 
* prepareInsertQueries: from each song data, prepare an insert query statement
* defcon: define connection to Postgres DB

In [15]:
def get_files(filepath):
    all_files = []
    for root,dirs,files in os.walk(filepath):
        files = glob.glob(os.path.join(root,'*.json'))
        for file in files:
            all_files.append(os.path.abspath(file))
    return all_files

In [16]:
def extract_data(dataFiles):
    extract = []
    for file in dataFiles:
        extract.append(pd.read_json(file,lines=True))
    return extract

In [17]:
# query = """INSERT INTO songs VALUES('{id}','{title}','{artist}',{duration});""".format(id=df.song_id[0],title=df.title[0].replace("'","''"),artist=df.artist_id[0],year=df.year[0],duration=df.duration[0])

def preapare_insert_queries(songData):
    insertQueries = []
    for data in songData:
        insertQueries.append(        
            """INSERT INTO songs VALUES('{id}','{title}','{artist}','{year}',{duration});""".format(id=data.song_id[0],title=data.title[0].replace("'","''"),artist=data.artist_id[0],year=data.year[0],duration=data.duration[0])
        )
    return insertQueries

In [18]:
def defcon(constr,commit):       
    dbconn = constr
    conn = psycopg2.connect(dbconn)
    conn.set_session(autocommit=commit)
    return conn

In [19]:
def dfToSql(dfName, data, constr, index, conn):
    if conn == '':
        engine = sqlalchemy.create_engine(constr)
        conn = engine.connect()
    print(
        "==> {name} table created with {number} records".format(
            name=dfName.capitalize(),
            number=data.to_sql(dfName, conn, index=index),
        )
    )
    return conn

In [20]:
def createDB(constr,name):
    dropDB = "DROP DATABASE IF EXISTS {} WITH (FORCE);".format(name)
    createSql = "CREATE DATABASE {} WITH ENCODING 'utf8';".format(name)
    conn = defcon(constr,True)
    cur = conn.cursor()
    cur.execute(dropDB)
    cur.execute(createSql)
    conn.close()

In [21]:
def insertSql(sql,data,constr): 
    conn = defcon(constr,True)
    cur = conn.cursor()
    cur = cur.execute(sql,data)
    conn.commit()
    return cur

In [22]:
def getTimeDet(timestamp):
    stamp = datetime.fromtimestamp(timestamp/1000)
    return [timestamp, stamp.hour, stamp.day, stamp.strftime('%V'), stamp.month, stamp.year, stamp.strftime('%A')]

### Connect to initial DB, create sparkifyDB and generate connect to sparkifyDB

In [23]:
createDB(initConn,'studentdb')

In [24]:
conn = defcon(constr,True)
cur = conn.cursor()

### Create Tables

In [25]:
createSongs = """
    CREATE TABLE IF NOT EXISTS songs (
    song_id VARCHAR, 
    title VARCHAR, 
    artist_id VARCHAR, 
    year int, 
    duration float 
    );
"""

createArtists = """
    CREATE TABLE IF NOT EXISTS artists(
    artist_id VARCHAR, 
    name VARCHAR, 
    location VARCHAR, 
    latitude float, 
    longitude float 
    );
"""

createTime = """
    CREATE TABLE IF NOT EXISTS time(
    timestamp BIGINT, 
    hour int, 
    day int, 
    week VARCHAR, 
    month int, 
    year int, 
    weekday VARCHAR 
    );
"""

createUsers = """
    CREATE TABLE IF NOT EXISTS users (
    user_id VARCHAR, 
    firstname VARCHAR, 
    lastnamt VARCHAR, 
    gender VARCHAR, 
    level VARCHAR 
    );
"""

createSongPlay = """
    CREATE TABLE IF NOT EXISTS songPlays (
    timestamp BIGINT, 
    user_id VARCHAR, 
    level VARCHAR, 
    song_id VARCHAR, 
    artist_id VARCHAR, 
    session_id VARCHAR, 
    location VARCHAR, 
    useragent VARCHAR 
    );
"""

In [26]:
insertSql(createSongs,'',constr)
insertSql(createArtists,'',constr)
insertSql(createTime,'',constr)
insertSql(createUsers,'',constr)
insertSql(createSongPlay,'',constr)

### Extract Data from source

In [27]:
song_files = extract_data(get_files('data/song_data/'))
log_files = extract_data(get_files('data/log_data/'))

In [28]:
song_info = pd.concat(song_files).drop('num_songs',axis=1)
dropCols = "auth itemInSession method registration status".split()
log_info = pd.concat(log_files).drop(dropCols,axis=1)
log_info = log_info[log_info['page']=='NextSong']

### Prepare dataframes

In [29]:
song_info.head(1)

Unnamed: 0,artist_id,artist_latitude,artist_longitude,artist_location,artist_name,song_id,title,duration,year
0,ARD7TVE1187B99BFB1,,,California - LA,Casual,SOMZWCG12A8C13C480,I Didn't Mean To,218.93179,0


In [30]:
log_info.head(1)

Unnamed: 0,artist,firstName,gender,lastName,length,level,location,page,sessionId,song,ts,userAgent,userId
2,Des'ree,Kaylee,F,Summers,246.30812,free,"Phoenix-Mesa-Scottsdale, AZ",NextSong,139,You Gotta Be,1541106106796,"""Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebK...",8


Prepare Song Dataframe

In [31]:
songcols = "song_id title artist_id year duration".split()
song_data = song_info[songcols]

In [32]:
song_data.head()

Unnamed: 0,song_id,title,artist_id,year,duration
0,SOMZWCG12A8C13C480,I Didn't Mean To,ARD7TVE1187B99BFB1,0,218.93179
0,SOCIWDW12A8C13D406,Soul Deep,ARMJAGH1187FB546F3,1969,148.03546
0,SOXVLOJ12AB0189215,Amor De Cabaret,ARKRRTF1187B9984DA,0,177.47546
0,SONHOTT12A8C13493C,Something Girls,AR7G5I41187FB4CE6C,1982,233.40363
0,SOFSOCN12A8C143F5D,Face the Ashes,ARXR32B1187FB57099,2007,209.60608


Artist Data Preparation

In [33]:
artistcols = "id name location latitude longitude".split()
artistcols = ["artist_"+ x for x in artistcols]
artist_data = song_info[artistcols]

In [34]:
artist_data.head()

Unnamed: 0,artist_id,artist_name,artist_location,artist_latitude,artist_longitude
0,ARD7TVE1187B99BFB1,Casual,California - LA,,
0,ARMJAGH1187FB546F3,The Box Tops,"Memphis, TN",35.14968,-90.04892
0,ARKRRTF1187B9984DA,Sonora Santanera,,,
0,AR7G5I41187FB4CE6C,Adam Ant,"London, England",,
0,ARXR32B1187FB57099,Gob,,,


Time Data Preparation

In [35]:
timecols = "ts hour day week month year weekday".split()
log_data = log_info[log_info['page']=='NextSong']
time_info = list(log_data['ts'].map(lambda x: getTimeDet(x)))
time_data = pd.DataFrame(time_info,columns=timecols)
time_data.head()

Unnamed: 0,ts,hour,day,week,month,year,weekday
0,1541106106796,2,2,44,11,2018,Friday
1,1541106352796,2,2,44,11,2018,Friday
2,1541106496796,2,2,44,11,2018,Friday
3,1541106673796,2,2,44,11,2018,Friday
4,1541107053796,2,2,44,11,2018,Friday


Users Data Preparation

In [36]:
usercols = "userId firstName lastName gender level".split()
user_data = log_info[usercols]
user_data.head()

Unnamed: 0,userId,firstName,lastName,gender,level
2,8,Kaylee,Summers,F,free
4,8,Kaylee,Summers,F,free
5,8,Kaylee,Summers,F,free
6,8,Kaylee,Summers,F,free
7,8,Kaylee,Summers,F,free


Songplays Data Preparation

In [37]:
log_data.head(1)

Unnamed: 0,artist,firstName,gender,lastName,length,level,location,page,sessionId,song,ts,userAgent,userId
2,Des'ree,Kaylee,F,Summers,246.30812,free,"Phoenix-Mesa-Scottsdale, AZ",NextSong,139,You Gotta Be,1541106106796,"""Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebK...",8


In [38]:
log_cols = "ts userId level song artist sessionId location userAgent length".split()
songplay_data = log_data[log_cols]

### Prepare Insert Query and Data

In [39]:
insertSongsSql = """INSERT INTO songs VALUES (%s,%s,%s,%s,%s);"""
insertArtistsSql = """INSERT INTO artists VALUES (%s,%s,%s,%s,%s);"""
insertUsersSql = """INSERT INTO users VALUES (%s,%s,%s,%s,%s);"""
insertTimeSql = """INSERT INTO time VALUES (%s,%s,%s,%s,%s,%s,%s);"""
insertSongplaySql = """INSERT INTO songplays VALUES (%s,%s,%s,%s,%s,%s,%s,%s);"""
queries = [insertSongsSql,insertArtistsSql,insertUsersSql,insertTimeSql]

In [40]:
song_values = list(song_data.values)[0]
artist_values = list(artist_data.values)[0]
user_values = list(user_data.values)[0]
datasets = [song_data,artist_data,user_data,time_data]

### Execute

In [41]:
for index,data in enumerate(datasets):
    for value in list(data.values):
        cur.execute(queries[index],value)

In [42]:
song_select ="""SELECT s.song_id, s.artist_id FROM songs s 
                JOIN artists a ON s.artist_id = a.artist_id
                WHERE s.title = %s AND a.name = %s AND s.duration = %s
            """

In [43]:
song = []
for index,row in songplay_data.iterrows():   
    cur.execute(song_select,(row.song, row.artist, row.length))
    results = cur.fetchone()
    if results:
        song_id, artist_id = results
        song.append([song_id,artist_id])
    else:
        song_id, artist_id = None, None
    
    songplay = (row.ts, row.userId, row.level, song_id, artist_id, row.sessionId, row.location, row.userAgent)
    cur.execute(insertSongplaySql,songplay)

### Close Connections

In [44]:
song

[['SOZCTXZ12AB0182364', 'AR5KOSW1187FB35FF4']]

In [45]:
cur.close()
conn.close()