## Introduction
  This notebook is for the Udacity Data Engineering Postgres data modeling project (Project 1). It is an exercise in loading 5
  tables in a star schema with music data taken from user listening logs and song JSON files.

  The project sets up a Postgres database, using a star schema with 5 tables, for a
  fictitious startup called Sparkify, which wants to analyze the data it has been collecting on
  songs and user activity in its new music streaming app.
  
### Table Overview
    The database consists of 5 tables.  
    * Fact Table:  Songplay
    * Dimension Table: Songs
    * Dimension Table: Artists
    * Dimension Table: Time
    * Dimension Table: Users
    
  ![image.png](attachment:image.png)
  

### ETL Description
      The ETL will:
       1. Drop existing tables, then create tables using a script called create_tables.py.  
           - create_tables.py uses the statements in the script sql_queries.py

       2. Loop through a series of song files which are in JSON format.   For each song record:
           - Load the Songs dimension table
           - Load the Artists dimension table
           
       3. Loop through user log files.   For each log record:
           - Load the users dimension table.  Update Level if match on user_id
           - Parse the timestamp and load a time dimension table
           - Load a songplay fact table using the log record plus data from the songs and artists dimension tables
      
 
 ## Table Metadata
 
     1.  Songplay - fact table for the streaming data.   
         - songplay_id (serial) - a sequential primary key
         - start_time (timestamp) - time when user started listening
         - user_id (integer) - Identifier of the user who is listening
         - level (varchar)  - Subscription level, paid or free
         - song_id (varchar) - identifier for the song being played
         - artist_id (varchar) - identifier for the artist of the song played
         - session_id (integer) - identifier for the session
         - location (varchar) - location of the user
         - user_agent (varchar) - application song is streamed from

    2.  User Table: Contains information on the person streaming the song
         - user_id (integer) - Identifier of the user who is listening
         - first_name (varchar) - first name of the user
         - last_name (varchar) - last name of the user
         - gender (varchar) - gender of the user
         - level (varchar) - Subscription level, paid or free

    3.  Song table:  Dimension table with data on the song:
         - song_id (varchar) - unique identifier of the song
         - title (varchar) - title of the song
         - artist_id (varchar) - unique identifier of the artist who created the song
         - year (integer) - year the song was published
         - duration (numeric) - length of play time for the song

     4. Artist table: Dimension table with data on the artist
         - artist_id (varchar) - unique identifier for the artist
         - name (varchar) - artist name
         - location (varchar) - location of the artist
         - latitude (double precision) - latitude of the artist's location
         - longitude (double precision) - longitude of the artist's location

      5. Time table: contains the time the user played the song
         - start_time (timestamp) - time when user started listening
         - hour (integer) -  the hour from the timestamp
         - day (integer) - the day from the timestamp
         - week (integer) - the week from the timestamp
         - month (integer) - the month from the timestamp
         - year (integer) - the year from the timestamp
         - weekday (integer) - the day of the week from the timestamp
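
The metadata above could translate into `CREATE TABLE` statements along the following lines. This is only a sketch: the real `sql_queries.py` is not shown in this notebook, so the variable names and the primary keys on the four dimension tables are assumptions (the `ON CONFLICT` inserts used in this notebook do imply some unique key on each dimension table).

```python
# Hypothetical sketch of the table definitions; variable names and
# dimension-table primary keys are assumptions, column types follow the metadata.
songplay_table_create = """
CREATE TABLE IF NOT EXISTS songplays (
    songplay_id SERIAL PRIMARY KEY,
    start_time  TIMESTAMP,
    user_id     INTEGER,
    level       VARCHAR,
    song_id     VARCHAR,
    artist_id   VARCHAR,
    session_id  INTEGER,
    location    VARCHAR,
    user_agent  VARCHAR
);
"""

user_table_create = """
CREATE TABLE IF NOT EXISTS users (
    user_id    INTEGER PRIMARY KEY,
    first_name VARCHAR,
    last_name  VARCHAR,
    gender     VARCHAR,
    level      VARCHAR
);
"""

song_table_create = """
CREATE TABLE IF NOT EXISTS songs (
    song_id   VARCHAR PRIMARY KEY,
    title     VARCHAR,
    artist_id VARCHAR,
    year      INTEGER,
    duration  NUMERIC
);
"""

artist_table_create = """
CREATE TABLE IF NOT EXISTS artists (
    artist_id VARCHAR PRIMARY KEY,
    name      VARCHAR,
    location  VARCHAR,
    latitude  DOUBLE PRECISION,
    longitude DOUBLE PRECISION
);
"""

time_table_create = """
CREATE TABLE IF NOT EXISTS time (
    start_time TIMESTAMP PRIMARY KEY,
    hour       INTEGER,
    day        INTEGER,
    week       INTEGER,
    month      INTEGER,
    year       INTEGER,
    weekday    INTEGER
);
"""

# One list that a script such as create_tables.py could loop over
create_table_queries = [
    songplay_table_create,
    user_table_create,
    song_table_create,
    artist_table_create,
    time_table_create,
]
```

Keeping the statements in one list lets a setup script execute them in a loop, one `cur.execute(query)` per table.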

     This is a Jupyter notebook version of etl.py.  It can also be run to load the fact and dimension tables:  
        - Calls create_tables.py to drop existing tables and create tables
        - Reads song JSON files
        - For each song record: loads a pandas dataframe for the Songs and Artists dimension tables
        - Loops through the song data array and performs an insert into the Song dimension table
        - Loops through the song data array and performs an insert into the Artist dimension table
        - Loops through the log data and loads it into a pandas dataframe
        - For each record in each log file, builds arrays for loading the time, users and songplay tables
        - Splits time into multiple columns containing hour, day, week, month, year and day of week
        - Loops through the time array and loads the time table
        - Loops through the user array and loads the user table,
             updating the level column if user_id already exists
        - Builds the songplay dataframe
        - Loops through the songplay dataframe, looking up the song and artist IDs by title, artist name and duration
             in the song and artist dimension tables, and inserts a record into the songplay table
        - Closes the connection to the database

In [1]:
import os
import glob
import psycopg2
import pandas as pd
from sql_queries import *

In [2]:
!python create_tables.py

In [3]:
conn = psycopg2.connect("host=127.0.0.1 dbname=sparkifydb user=student password=student")
cur = conn.cursor()

In [4]:
conn.set_session(autocommit=True)

Description: This function collects the absolute paths of all JSON files found under the path passed into it and returns them in a single list.

Arguments: filepath: log data or song data file path
Returns: list of absolute file paths


In [5]:
def get_files(filepath):
    all_files = []
    for root, dirs, files in os.walk(filepath):
        files = glob.glob(os.path.join(root,'*.json'))
        for f in files :
            all_files.append(os.path.abspath(f))
    
    return all_files
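
To see what `get_files` returns without needing the project data, the following sketch repeats the function and runs it against a temporary directory. The file names (`a.json`, `b.json`, `notes.txt`) and the `2018/11` subfolder are made up for illustration.

```python
import glob
import os
import tempfile

def get_files(filepath):
    """Return the absolute path of every *.json file under filepath."""
    all_files = []
    for root, dirs, files in os.walk(filepath):
        for f in glob.glob(os.path.join(root, '*.json')):
            all_files.append(os.path.abspath(f))
    return all_files

# Build a throwaway directory tree (hypothetical file names) to exercise the function
with tempfile.TemporaryDirectory() as tmp:
    nested = os.path.join(tmp, "2018", "11")
    os.makedirs(nested)
    for name in (os.path.join(tmp, "a.json"),
                 os.path.join(nested, "b.json"),
                 os.path.join(nested, "notes.txt")):
        with open(name, "w") as fh:
            fh.write("{}")
    # Only the JSON files are picked up, at any depth
    found = sorted(os.path.basename(p) for p in get_files(tmp))

print(found)  # ['a.json', 'b.json']
```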

# Process `song_data`
Perform ETL on the first dataset, `song_data`, to create the `songs` and `artists` dimensional tables.


In [6]:
song_files = get_files("data/song_data")


In [7]:
filepath = song_files[0]

In [8]:
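# Read every song file and combine the records into one dataframe.
# DataFrame.append was removed in pandas 2.0, so pd.concat is used instead.
songdf = pd.concat([pd.read_json(filepath, lines=True) for filepath in song_files])
print(len(songdf))
songdf.head()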

74


Unnamed: 0,artist_id,artist_latitude,artist_location,artist_longitude,artist_name,duration,num_songs,song_id,title,year
0,ARD7TVE1187B99BFB1,,California - LA,,Casual,218.93179,1,SOMZWCG12A8C13C480,I Didn't Mean To,0
0,ARNTLGG11E2835DDB9,,,,Clp,266.39628,1,SOUDSGM12AC9618304,Insatiable (Instrumental Version),0
0,AR8ZCNI1187B9A069B,,,,Planet P Project,269.81832,1,SOIAZJW12AB01853F1,Pink World,1984
0,AR10USD1187B99F3F1,,"Burlington, Ontario, Canada",,Tweeterfriendly Music,189.57016,1,SOHKNRJ12A6701D1F8,Drop of Rain,0
0,ARMJAGH1187FB546F3,35.14968,"Memphis, TN",-90.04892,The Box Tops,148.03546,1,SOCIWDW12A8C13D406,Soul Deep,1969


#### Replace non-numeric values in numeric fields with 0

In [9]:
## Coerce non-numeric values to NaN, then replace any NaN with 0

songdf['duration'] = pd.to_numeric(songdf['duration'], errors='coerce').fillna(0)
songdf['year'] = pd.to_numeric(songdf['year'], errors='coerce').fillna(0)
songdf['artist_latitude'] = pd.to_numeric(songdf['artist_latitude'], errors='coerce').fillna(0)
songdf['artist_longitude'] = pd.to_numeric(songdf['artist_longitude'], errors='coerce').fillna(0)


## #1: `songs` Table
#### Extract Data for Songs Table
- Select columns for song ID, title, artist ID, year, and duration
- Use `df.values` to select just the values from the dataframe
- Keep all rows, since `songdf` holds the records from every song file
- Convert the array to a list of rows and set it to `song_data`

In [10]:
## Columns to extract: song_id, title, artist_id, year, duration
 
song_df = songdf[['song_id','title','artist_id','year','duration']]

song_data=song_df.values
song_data=song_data.tolist()
print(len(song_data))

74


#### Insert Record into Song Table
Implement the `song_table_insert` query in `sql_queries.py` and run the cell below to insert the song records into the `songs` table. Remember to run `create_tables.py` before running the cell below to ensure you've created/reset the `songs` table in the sparkify database.

#### Runs Insert for all Song records

In [11]:
song_table_insert = ("INSERT INTO songs (song_id, title, artist_id, year, duration) \
                       VALUES (%s, %s, %s, %s, %s) ON CONFLICT DO NOTHING")
 
for songdata in song_data:
    cur.execute(song_table_insert,songdata)
    conn.commit()
cur.execute ("select count(*) from songs;")
print(cur.fetchone())
 

(71,)


Run `test.ipynb` to see if you've successfully added a record to this table.

## #2: `artists` Table
#### Extract Data for Artists Table
- Select columns for artist ID, name, location, latitude, and longitude
- Use `df.values` to select just the values from the dataframe
- Keep all rows, since `songdf` holds the records from every song file
- Convert the array to a list of rows and set it to `artist_data`

In [12]:
## Columns to extract: artist_id, name, location, latitude, longitude

artist_df = songdf[['artist_id','artist_name','artist_location','artist_latitude','artist_longitude']]

artist_data=artist_df.values.tolist()
print(len(artist_data))
artist_df.head()


74


Unnamed: 0,artist_id,artist_name,artist_location,artist_latitude,artist_longitude
0,ARD7TVE1187B99BFB1,Casual,California - LA,0.0,0.0
0,ARNTLGG11E2835DDB9,Clp,,0.0,0.0
0,AR8ZCNI1187B9A069B,Planet P Project,,0.0,0.0
0,AR10USD1187B99F3F1,Tweeterfriendly Music,"Burlington, Ontario, Canada",0.0,0.0
0,ARMJAGH1187FB546F3,The Box Tops,"Memphis, TN",35.14968,-90.04892


#### Insert Record into Artist Table
Implement the `artist_table_insert` query in `sql_queries.py` and run the cell below to insert a record for this song's artist into the `artists` table.  

#### Runs Insert for All Artist Rows

In [13]:
artist_table_insert = ("INSERT INTO artists (artist_id, name, location, latitude, longitude) \
                       VALUES (%s, %s, %s, %s, %s) ON CONFLICT DO NOTHING")
 
for artistdata in artist_data:
    cur.execute(artist_table_insert,artistdata)
    conn.commit()
cur.execute ("select count(*) from artists;")
print(cur.fetchone())

(69,)


Run `test.ipynb` to see if you've successfully added a record to this table.

# Process `log_data`
In this part, you'll perform ETL on the second dataset, `log_data`, to create the `time` and `users` dimensional tables, as well as the `songplays` fact table.

Let's perform ETL on a single log file and load a single record into each table.
- Use the `get_files` function provided above to get a list of all log JSON files in `data/log_data`
- Select the first log file in this list
- Read the log file and view the data

In [14]:
log_files = get_files("data/log_data")
print(log_files[0])

/home/workspace/data/log_data/2018/11/2018-11-30-events.json


In [15]:
filepath = log_files[0]

In [16]:
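# Read every log file and combine the records into one dataframe.
# DataFrame.append was removed in pandas 2.0, so pd.concat is used instead.
logdf = pd.concat([pd.read_json(filepath, lines=True) for filepath in log_files])
print(len(logdf))
logdf.head(3)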

8056


Unnamed: 0,artist,auth,firstName,gender,itemInSession,lastName,length,level,location,method,page,registration,sessionId,song,status,ts,userAgent,userId
0,Stephen Lynch,Logged In,Jayden,M,0,Bell,182.85669,free,"Dallas-Fort Worth-Arlington, TX",PUT,NextSong,1540992000000.0,829,Jim Henson's Dead,200,1543537327796,Mozilla/5.0 (compatible; MSIE 10.0; Windows NT...,91
1,Manowar,Logged In,Jacob,M,0,Klein,247.562,paid,"Tampa-St. Petersburg-Clearwater, FL",PUT,NextSong,1540558000000.0,1049,Shell Shock,200,1543540121796,"""Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_4...",73
2,Morcheeba,Logged In,Jacob,M,1,Klein,257.41016,paid,"Tampa-St. Petersburg-Clearwater, FL",PUT,NextSong,1540558000000.0,1049,Women Lose Weight (Feat: Slick Rick),200,1543540368796,"""Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_4...",73


In [17]:
## Replace any NaN with 0 for numeric 

logdf['userId'] = pd.to_numeric(logdf['userId']).fillna(0)
 

## #3: `time` Table
#### Extract Data for Time Table
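- Filter records by `NextSong` action (the cells below skip this filter and use every log record, which is why the counts match the full 8056 rows)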
- Convert the `ts` timestamp column to datetime
  

In [18]:

time_df = logdf[['ts']]
time_df.head()

Unnamed: 0,ts
0,1543537327796
1,1543540121796
2,1543540368796
3,1543540625796
4,1543540856796


In [19]:

t= pd.to_datetime(time_df['ts'], unit='ms')

In [20]:
# Time table columns: start_time, hour, day, week, month, year, weekday
# Series.dt.weekofyear was removed in pandas 2.0; isocalendar().week gives the same ISO week number.
time_data = pd.DataFrame(list(zip(t, t.dt.hour,t.dt.day,t.dt.isocalendar().week.astype(int),t.dt.month,t.dt.year,t.dt.dayofweek)),
               columns =['start_time', 'hour','day','week','month','year','weekday'])

time_data2=time_data.values.tolist()
print(len(time_data2))


8056
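
To make the column split concrete, here is a small standard-library sketch that breaks one epoch-millisecond value into the same seven fields. The helper name `split_timestamp` is hypothetical. It uses `datetime` rather than pandas, with the ISO week number and Monday counted as weekday 0, which mirrors the pandas `.dt` accessors used above.

```python
from datetime import datetime, timedelta

def split_timestamp(ts_ms):
    """Turn epoch milliseconds into (start_time, hour, day, week, month, year, weekday)."""
    # Adding a timedelta to the epoch avoids float rounding of the milliseconds
    start = datetime(1970, 1, 1) + timedelta(milliseconds=ts_ms)
    return (start, start.hour, start.day, start.isocalendar()[1],
            start.month, start.year, start.weekday())

row = split_timestamp(1543537327796)  # first `ts` value in the log data shown earlier
print(row[0])   # 2018-11-30 00:22:07.796000
print(row[1:])  # (0, 30, 48, 11, 2018, 4)
```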


#### Insert Records into Time Table
Implement the `time_table_insert` query in `sql_queries.py` and run the cell below to insert records for the timestamps in this log file into the `time` table.  

In [21]:
time_table_insert = ("INSERT INTO time (start_time, hour, day, week, month, year, weekday) \
                      VALUES(%s,%s,%s,%s,%s,%s,%s) ON CONFLICT DO NOTHING")

for timerecord in time_data2:
    cur.execute(time_table_insert,timerecord)
    conn.commit()
cur.execute ("select count(*) from time;")
print(cur.fetchone())
  


(8023,)


Run `test.ipynb` to see if you've successfully added records to this table.

## #4: `users` Table
#### Extract Data for Users Table
- Select columns for user ID, first name, last name, gender and level and set to `user_df`

In [22]:
## Columns to extract: user_id, first_name, last_name, gender, level 

user_df = logdf[['userId','firstName','lastName','gender','level']]

user_data=user_df.values.tolist()
print(len(user_data))
user_df.head()
 

8056


Unnamed: 0,userId,firstName,lastName,gender,level
0,91.0,Jayden,Bell,M,free
1,73.0,Jacob,Klein,M,paid
2,73.0,Jacob,Klein,M,paid
3,73.0,Jacob,Klein,M,paid
4,73.0,Jacob,Klein,M,paid


#### Insert Records into Users Table
Implement the `user_table_insert` query in `sql_queries.py` and run the cell below to insert records for the users in this log file into the `users` table.  

In [23]:
user_table_insert = ("INSERT INTO users (user_id, first_name, last_name, gender, level) \
                      VALUES(%s,%s,%s,%s,%s) ON CONFLICT DO NOTHING")

for userdata in user_data:
    cur.execute(user_table_insert,userdata)
    conn.commit()
cur.execute ("select count(*) from users;")
print(cur.fetchone())

(98,)


Run `test.ipynb` to see if you've successfully added records to this table.
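
The ETL description says a user's `level` should be updated when the `user_id` already exists, but `ON CONFLICT DO NOTHING` keeps the first row seen and ignores later ones. One way to get the update behaviour is an upsert. The sketch below uses Python's built-in `sqlite3` as a stand-in so it runs without a Postgres server; it assumes `user_id` is the primary key, and the change from `free` to `paid` for this user is invented for illustration. With psycopg2 the placeholders would be `%s` instead of `?`, and the equivalent Postgres clause is `ON CONFLICT (user_id) DO UPDATE SET level = EXCLUDED.level`.

```python
import sqlite3

# In-memory SQLite database standing in for Postgres (assumption for this demo)
demo_conn = sqlite3.connect(":memory:")
demo_conn.execute("""
CREATE TABLE users (
    user_id INTEGER PRIMARY KEY,
    first_name TEXT, last_name TEXT, gender TEXT, level TEXT
)""")

# Insert a new user, or overwrite only `level` when the user_id already exists
user_upsert = """
INSERT INTO users (user_id, first_name, last_name, gender, level)
VALUES (?, ?, ?, ?, ?)
ON CONFLICT (user_id) DO UPDATE SET level = excluded.level
"""

# Two log rows for the same user; the second carries a different (made-up) level
log_rows = [
    (73, "Jacob", "Klein", "M", "free"),
    (73, "Jacob", "Klein", "M", "paid"),
]
for r in log_rows:
    demo_conn.execute(user_upsert, r)

users = demo_conn.execute("SELECT user_id, level FROM users").fetchall()
print(users)  # [(73, 'paid')]
demo_conn.close()
```

Because log rows are processed in order, the last `level` seen for each user is the one that remains in the table.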

## #5: `songplays` Table
#### Extract Data for Songplays Table
This one is a little more complicated since information from the songs table, artists table, and original log file are all needed for the `songplays` table. Since the log file does not specify an ID for either the song or the artist, you'll need to get the song ID and artist ID by querying the songs and artists tables to find matches based on song title, artist name, and song duration time.
- Implement the `song_select` query in `sql_queries.py` to find the song ID and artist ID based on the title, artist name, and duration of a song.
- Select the timestamp, user ID, level, song ID, artist ID, session ID, location, and user agent and set to `songplay_data`


#### Insert Records into Songplays Table
- Implement the `songplay_table_insert` query and run the cell below to insert records for the songplay actions in this log file into the `songplays` table. Remember to run `create_tables.py` before running the cell below to ensure you've created/reset the `songplays` table in the sparkify database.

In [24]:

song_select = ("SELECT song_id, songs.artist_id \
               FROM songs JOIN artists on songs.artist_id = artists.artist_id \
               WHERE title = %s and duration = %s and name=%s")

songplay_table_insert = ("INSERT INTO songplays (start_time, user_id,\
                level, song_id, artist_id, session_id, location, user_agent) \
                VALUES(%s,%s,%s,%s,%s,%s,%s,%s)")

for index, row in logdf.iterrows():
    # get songid and artistid from song and artist tables
    cur.execute(song_select,[row.song, row.length, row.artist])
    results = cur.fetchone()
    
    if results:
        songid, artistid = results
    else:
        songid, artistid = None, None

    # insert songplay record
    t= pd.to_datetime(row.ts, unit='ms')
    songplay_data = (t, row.userId, row.level, songid, artistid, row.sessionId, row.location, row.userAgent)
    cur.execute(songplay_table_insert, songplay_data)
    conn.commit()

Run `test.ipynb` to see if you've successfully added records to this table.
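
The lookup step can be tried in isolation with the sketch below. It uses an in-memory `sqlite3` database as a stand-in for Postgres (so the placeholders are `?` rather than `%s`), loads one song and its artist from the sample output shown earlier, and returns `(None, None)` when no match is found, as the loop above does. The names `demo`, `song_select_demo` and `lookup` are hypothetical.

```python
import sqlite3

# In-memory SQLite database standing in for the sparkify Postgres database
demo = sqlite3.connect(":memory:")
demo.executescript("""
CREATE TABLE songs (song_id TEXT PRIMARY KEY, title TEXT, artist_id TEXT,
                    year INTEGER, duration REAL);
CREATE TABLE artists (artist_id TEXT PRIMARY KEY, name TEXT, location TEXT,
                      latitude REAL, longitude REAL);
""")
demo.execute("INSERT INTO songs VALUES (?, ?, ?, ?, ?)",
             ("SOMZWCG12A8C13C480", "I Didn't Mean To", "ARD7TVE1187B99BFB1", 0, 218.93179))
demo.execute("INSERT INTO artists VALUES (?, ?, ?, ?, ?)",
             ("ARD7TVE1187B99BFB1", "Casual", "California - LA", 0.0, 0.0))

# Same join and filters as song_select, written with SQLite placeholders
song_select_demo = """
SELECT songs.song_id, songs.artist_id
FROM songs JOIN artists ON songs.artist_id = artists.artist_id
WHERE songs.title = ? AND songs.duration = ? AND artists.name = ?
"""

def lookup(title, duration, artist_name):
    """Return (song_id, artist_id) for a log record, or (None, None) if unmatched."""
    row = demo.execute(song_select_demo, (title, duration, artist_name)).fetchone()
    return row if row else (None, None)

hit = lookup("I Didn't Mean To", 218.93179, "Casual")
miss = lookup("Jim Henson's Dead", 182.85669, "Stephen Lynch")
print(hit)   # ('SOMZWCG12A8C13C480', 'ARD7TVE1187B99BFB1')
print(miss)  # (None, None)
demo.close()
```

Matching on title, artist name and duration together is strict: a log record only links to a song when all three values agree exactly, so unmatched rows are stored with null `song_id` and `artist_id`.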

# Close Connection to Sparkify Database

In [25]:
conn.close()

# Implement `etl.py`
Use what you've completed in this notebook to implement `etl.py`.