# ETL Processes
We'll use this notebook to develop the ETL process for each of the database tables before completing the `etl.py` file to load the full datasets.

In [1]:
import os
import glob
import psycopg2
import json
import pandas as pd
from sql_queries import *

In [2]:
conn = psycopg2.connect("host=127.0.0.1 dbname=sparkifydb user=student password=student")
cur = conn.cursor()

In [3]:
def get_files(filepath):
    all_files = []
    for root, dirs, files in os.walk(filepath):
        files = glob.glob(os.path.join(root,'*.json'))
        for f in files :
            all_files.append(os.path.abspath(f))
    
    return all_files

# Process `song_data`
In this first part we'll perform ETL on the first dataset, `song_data`, to create the `songs` and `artists` dimensional tables.


- The `get_files` function defined above is used to get a list of all song JSON files in `data/song_data`.
- A new function is defined to loop over all the JSON song files and combine their contents in a new JSON file in the project's root directory.
- The data is filtered to create new `song_data` and `artist_data` variables.
- The filtered data is saved to csv files in the project's root directory.
- The `song_data` and `artist_data` are inserted in bulk into the `songs` and `artists` tables respectively, using the `COPY` command with pre-defined bulk insert queries from the `sql_queries.py` file. This is much faster than running single insert queries in a `for` loop.

In [4]:
song_files = get_files('data/song_data')

In [5]:
def merge_json_files(files):
    result = list()
    fname = 'all_songs.json'
    for f in files:
        with open(f, 'r') as infile:
            result.append(json.load(infile))

    with open(fname, 'w') as output_file:
        json.dump(result, output_file, indent=0)
    return fname


json_file = merge_json_files(song_files)

In [6]:
df = pd.read_json(json_file, lines=False)
df.head()

Unnamed: 0,artist_id,artist_latitude,artist_location,artist_longitude,artist_name,duration,num_songs,song_id,title,year
0,ARD7TVE1187B99BFB1,,California - LA,,Casual,218.93179,1,SOMZWCG12A8C13C480,I Didn't Mean To,0
1,ARNTLGG11E2835DDB9,,,,Clp,266.39628,1,SOUDSGM12AC9618304,Insatiable (Instrumental Version),0
2,AR8ZCNI1187B9A069B,,,,Planet P Project,269.81832,1,SOIAZJW12AB01853F1,Pink World,1984
3,AR10USD1187B99F3F1,,"Burlington, Ontario, Canada",,Tweeterfriendly Music,189.57016,1,SOHKNRJ12A6701D1F8,Drop of Rain,0
4,ARMJAGH1187FB546F3,35.14968,"Memphis, TN",-90.04892,The Box Tops,148.03546,1,SOCIWDW12A8C13D406,Soul Deep,1969


## #1: `songs` Table
#### Extracting Data for Songs Table
- We'll extract columns for song ID, title, artist ID, year, and duration.


- The data will be checked for duplicate values in the `song_id` column and the duplicates will be dropped.


- The data will be saved to a `csv` file in the project's root directory.


- The data will be inserted in bulk into the `songs` table from the `csv` file using the `song_table_bulk_insert` query
  that is imported from the `sql_queries.py` file.

In [7]:
song_data = df[['song_id', 'title', 'artist_id', 'year', 'duration']]
song_data.head()

Unnamed: 0,song_id,title,artist_id,year,duration
0,SOMZWCG12A8C13C480,I Didn't Mean To,ARD7TVE1187B99BFB1,0,218.93179
1,SOUDSGM12AC9618304,Insatiable (Instrumental Version),ARNTLGG11E2835DDB9,0,266.39628
2,SOIAZJW12AB01853F1,Pink World,AR8ZCNI1187B9A069B,1984,269.81832
3,SOHKNRJ12A6701D1F8,Drop of Rain,AR10USD1187B99F3F1,0,189.57016
4,SOCIWDW12A8C13D406,Soul Deep,ARMJAGH1187FB546F3,1969,148.03546


Next we'll check the `song_id` column for duplicate values and drop any we find. Bulk loading with the `COPY` command is much faster and more efficient than row-by-row inserts, but currently (at the time of doing this project) `COPY` cannot be combined with an `ON CONFLICT` clause outside "AnalyticDB". We could get around that by copying the csv into a temporary table that has no unique primary key (or whichever constraint would be violated) and then inserting from it into the main table we intend to keep. Even then, `ON CONFLICT` tolerates only one duplicate for a given row in this context: an `ON CONFLICT DO UPDATE` statement cannot affect the same row twice.

Because this dataset is small, we'll clean it in Python before inserting. If the data were much bigger and collected so that it couldn't contain more than one duplicate per row, the temporary-table approach would likely be more efficient, since the database would resolve the duplicates as part of the load. In a production pipeline (depending on the data size and on the capacity and availability of the database hosting service), doing so could also take some load off the main server hosting the back-end.
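The temporary-table workaround described above could be sketched roughly as follows. This is a minimal illustration and not the project's actual query: the `songs` table and `song_id` column come from this notebook, but the staging table name `songs_staging`, the helper `bulk_load_skipping_conflicts`, and the assumptions that `song_id` is the primary key and that the csv columns are in table order are all made up for the example.

```python
# All names below except `songs` and `song_id` are hypothetical.
# LIKE copies the column definitions but not the primary key,
# so the staging table accepts repeated song_id values.
CREATE_STAGING = "CREATE TEMP TABLE songs_staging (LIKE songs) ON COMMIT DROP"
COPY_INTO_STAGING = "COPY songs_staging FROM STDIN WITH (FORMAT csv, HEADER true)"
MERGE_INTO_TARGET = (
    "INSERT INTO songs SELECT * FROM songs_staging "
    "ON CONFLICT (song_id) DO NOTHING"
)


def bulk_load_skipping_conflicts(cur, csv_file):
    """Load csv rows into songs, skipping rows whose song_id already exists."""
    cur.execute(CREATE_STAGING)                             # 1. staging table
    cur.copy_expert(sql=COPY_INTO_STAGING, file=csv_file)   # 2. fast bulk copy
    cur.execute(MERGE_INTO_TARGET)                          # 3. conflict-aware insert
```

With a psycopg2 cursor this would be called inside a single transaction, followed by `conn.commit()`, so that the temporary table is dropped once the load is committed.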

In [8]:
# Checking duplicate values for the song_id column
song_data.song_id.duplicated().value_counts()

False    71
True      3
Name: song_id, dtype: int64

In [9]:
# Dropping duplicates while keeping the last value, and hence the latest info associated with each song ID
# (reassigning instead of inplace=True avoids pandas' SettingWithCopyWarning on a dataframe slice)
song_data = song_data.drop_duplicates(subset='song_id', keep='last')

In [10]:
# Confirming duplicates drop
song_data.song_id.duplicated().value_counts()

False    71
Name: song_id, dtype: int64

#### Inserting Records into Song Table
- We'll save the cleaned data into a `csv` file.
- We'll use the `song_table_bulk_insert` query from `sql_queries.py` to copy the data from the `csv` file and insert it in bulk, which is much faster and more efficient than looping over the rows with a `for` loop and doing single inserts.
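As a variation on the steps above, the intermediate file can be skipped by writing the csv into an in-memory buffer and handing that buffer to `copy_expert`. This is only a sketch: the helper name `copy_dataframe` and the example `COPY` statement are assumptions, since the real `song_table_bulk_insert` query lives in `sql_queries.py` and is not shown in this notebook.

```python
import io

# Assumed shape of a bulk insert query; the real one is defined in sql_queries.py.
example_copy_sql = "COPY songs FROM STDIN WITH (FORMAT csv, HEADER true)"


def copy_dataframe(cur, frame, copy_sql):
    """Stream a dataframe to the database through COPY without writing a file."""
    buffer = io.StringIO()
    frame.to_csv(buffer, index=False)  # header row kept, index column omitted
    buffer.seek(0)                     # rewind so COPY reads from the start
    cur.copy_expert(sql=copy_sql, file=buffer)
```

With the notebook's objects this would be called as `copy_dataframe(cur, song_data, song_table_bulk_insert)` followed by `conn.commit()`. Writing to disk, as done in the next cells, has the advantage of leaving a file that can be inspected afterwards.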

In [11]:
song_fname = 'songs.csv'

# Saving the song data to a csv file
song_data.to_csv(song_fname, index=False) # index=False omits the dataframe index so the csv columns match the table columns for the bulk insert

# Getting the current working directory path
working_dir = os.getcwd()

song_data_path = f"{working_dir}/{song_fname}"

In [12]:
# Copying the data from the csv file to the database table
with open(song_data_path, 'r') as f:
    cur.copy_expert(sql=song_table_bulk_insert, file=f)
    conn.commit()

Run `test.ipynb` to see if you've successfully added a record to this table.

## #2: `artists` Table
#### Extracting Data for Artists Table

The artist data will be extracted, checked, prepared and then saved into a csv file in the same manner used before with the song table.

In [13]:
artist_data = df[['artist_id', 'artist_name', 'artist_location', 'artist_latitude', 'artist_longitude']]
artist_data.head()

Unnamed: 0,artist_id,artist_name,artist_location,artist_latitude,artist_longitude
0,ARD7TVE1187B99BFB1,Casual,California - LA,,
1,ARNTLGG11E2835DDB9,Clp,,,
2,AR8ZCNI1187B9A069B,Planet P Project,,,
3,AR10USD1187B99F3F1,Tweeterfriendly Music,"Burlington, Ontario, Canada",,
4,ARMJAGH1187FB546F3,The Box Tops,"Memphis, TN",35.14968,-90.04892


In [14]:
# Checking for duplicate artist id values
artist_data.artist_id.duplicated().value_counts()

False    69
True      5
Name: artist_id, dtype: int64

In [15]:
# Dropping duplicate values
# (reassigning instead of inplace=True avoids pandas' SettingWithCopyWarning on a dataframe slice)
artist_data = artist_data.drop_duplicates(subset='artist_id', keep='last')


In [16]:
# Confirming duplicates drop
artist_data.artist_id.duplicated().value_counts()

False    69
Name: artist_id, dtype: int64

#### Insert Records into Artist Table
We'll save the cleaned data to a `csv` file and use the `artist_table_bulk_insert` query from `sql_queries.py` to insert it in bulk into the `artists` table. Remember to run `create_tables.py` before running the cells below to ensure the `artists` table in the sparkify database has been created or reset.

In [17]:
artist_fname = 'artists.csv'

# Saving artist data to a csv file
artist_data.to_csv(artist_fname, index=False) # index=False omits the dataframe index so the csv columns match the table columns for the bulk insert

# Getting the current working directory path
working_dir = os.getcwd()

artist_data_path = f"{working_dir}/{artist_fname}"

In [18]:
# Copying the data from the csv to the database table
with open(artist_data_path, 'r') as f:
    cur.copy_expert(sql=artist_table_bulk_insert, file=f)
    conn.commit()

# Process `log_data`
In this part, we'll perform ETL on the second dataset, `log_data`, to create the `time` and `users` dimensional tables, as well as the `songplays` fact table.

- The `get_files` function is used to get all the log data file paths.

- A function is defined to loop over all log files extracting their data and appending it to a dataframe.

- We'll extract the time data and user data, check it, clean it, pre-process it, and then save each to a csv file and bulk insert each file into its respective database table.

In [19]:
# Getting the log data file paths
log_files = get_files('data/log_data')

In [20]:
# Looping over the files and combining the data from all of them into one dataframe
def merge_LogFiles(file_names):
    # DataFrame.append was removed in pandas 2.0; pd.concat combines all frames in one pass
    frames = [pd.read_json(file, lines=True) for file in file_names]
    return pd.concat(frames, ignore_index=True)

df = merge_LogFiles(log_files)

In [21]:
# Checking the amount of data / dataframe dimensions
df.shape

(8056, 18)

## #3: `time` Table
#### Extracting Data for Time Table

- We'll filter records by the `NextSong` action.
- We'll convert the `ts` timestamp column to datetime and extract useful datetime data from it using the pandas `dt` accessor.
    - We'll extract the timestamp, hour, day, week of year, month, year, and weekday from the `ts` column and set `time_data` to a tuple containing these
      values in order.
- We'll specify labels for these columns and assign them to `column_labels`.
- We'll create a dataframe, `time_df`, containing the time data by combining `column_labels` and `time_data` into a dictionary and converting this
  into a dataframe.

In [22]:
df = df[df.page == 'NextSong']
df.head()

Unnamed: 0,artist,auth,firstName,gender,itemInSession,lastName,length,level,location,method,page,registration,sessionId,song,status,ts,userAgent,userId
0,Stephen Lynch,Logged In,Jayden,M,0,Bell,182.85669,free,"Dallas-Fort Worth-Arlington, TX",PUT,NextSong,1540992000000.0,829,Jim Henson's Dead,200,1543537327796,Mozilla/5.0 (compatible; MSIE 10.0; Windows NT...,91
1,Manowar,Logged In,Jacob,M,0,Klein,247.562,paid,"Tampa-St. Petersburg-Clearwater, FL",PUT,NextSong,1540558000000.0,1049,Shell Shock,200,1543540121796,"""Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_4...",73
2,Morcheeba,Logged In,Jacob,M,1,Klein,257.41016,paid,"Tampa-St. Petersburg-Clearwater, FL",PUT,NextSong,1540558000000.0,1049,Women Lose Weight (Feat: Slick Rick),200,1543540368796,"""Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_4...",73
3,Maroon 5,Logged In,Jacob,M,2,Klein,231.23546,paid,"Tampa-St. Petersburg-Clearwater, FL",PUT,NextSong,1540558000000.0,1049,Won't Go Home Without You,200,1543540625796,"""Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_4...",73
4,Train,Logged In,Jacob,M,3,Klein,216.76363,paid,"Tampa-St. Petersburg-Clearwater, FL",PUT,NextSong,1540558000000.0,1049,Hey_ Soul Sister,200,1543540856796,"""Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_4...",73


In [23]:
t = pd.to_datetime(df['ts'] , unit='ms')
t.head()

0   2018-11-30 00:22:07.796
1   2018-11-30 01:08:41.796
2   2018-11-30 01:12:48.796
3   2018-11-30 01:17:05.796
4   2018-11-30 01:20:56.796
Name: ts, dtype: datetime64[ns]

In [24]:
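# Series.dt.week was removed in pandas 2.0; dt.isocalendar().week (pandas >= 1.1) replaces it
time_data = (t, t.dt.hour, t.dt.day, t.dt.isocalendar().week, t.dt.month, t.dt.year, t.dt.weekday)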
column_labels = ('timestamp', 'hour', 'day', 'week of year', 'month', 'year', 'weekday')


In [25]:
time_df = pd.DataFrame.from_dict(dict(zip(column_labels,time_data)))
time_df.head()

Unnamed: 0,timestamp,hour,day,week of year,month,year,weekday
0,2018-11-30 00:22:07.796,0,30,48,11,2018,4
1,2018-11-30 01:08:41.796,1,30,48,11,2018,4
2,2018-11-30 01:12:48.796,1,30,48,11,2018,4
3,2018-11-30 01:17:05.796,1,30,48,11,2018,4
4,2018-11-30 01:20:56.796,1,30,48,11,2018,4


#### Insert Records into Time Table

- We'll save the time data into a `csv` file.

- We'll use the `time_table_bulk_insert` query from `sql_queries.py` to insert the data in bulk from the `csv` file into the database table with the `COPY` command, combined with the technique mentioned earlier: the csv data is copied into a temporary unconstrained table and then inserted into the main `time` table, which lets us handle rare timestamp conflicts. Because the combined amount of data is bigger, copying it in bulk and letting SQL do the processing is more efficient than inserting single rows in a `for` loop or having Python process the data for occasional duplicates.

In [26]:
time_fname = 'time.csv'

# Saving time data into a csv file
time_df.to_csv(time_fname, index=False) # index=False omits the dataframe index so the csv columns match the table columns for the bulk insert

# Getting the current working directory path
working_dir = os.getcwd()

time_data_path = f"{working_dir}/{time_fname}"

In [27]:
# Copying the time data from the csv file into the database table
with open(time_data_path, 'r') as f:
    cur.copy_expert(sql=time_table_bulk_insert, file=f)
    conn.commit()

Run `test.ipynb` to see if you've successfully added records to this table.

## #4: `users` Table
#### Extracting Data for Users Table
We'll extract the user data, check it, clean it, pre-process it, and then save it to a csv file and insert it in bulk into the database table in the same manner as before.

In [28]:
# Extracting user data
user_df = df[['userId', 'firstName', 'lastName', 'gender', 'level']]
# Checking data dimensions and properties
print(user_df.shape, user_df.dtypes)

(6820, 5) userId       object
firstName    object
lastName     object
gender       object
level        object
dtype: object


We'll convert the `userId` column from `object` to `int` so that the duplicate check doesn't miss any duplicates.
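To see why the conversion matters, consider a small made-up column (not taken from the log data) in which the same ID appears once as an integer and once as a string. With `object` dtype the two values are compared as different Python objects, so `duplicated()` does not flag them; after `pd.to_numeric` they compare equal.

```python
import pandas as pd

# Toy data for illustration only: 91 appears both as an int and as a str.
ids = pd.Series([91, "91", 73], dtype=object)

# Compared as-is, the int 91 and the str "91" are different values.
missed = int(ids.duplicated().sum())

# After converting to numbers, the repeated ID is detected.
found = int(pd.to_numeric(ids).duplicated().sum())

print(missed, found)
```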

In [29]:
# Transforming the "userId" column data type from object to int
# (assign returns a new dataframe, which avoids pandas' SettingWithCopyWarning on a slice)
user_df = user_df.assign(userId=pd.to_numeric(user_df.userId))

# Dropping duplicate values
user_df = user_df.drop_duplicates(subset='userId')

# Confirming duplicate values drop
print(user_df.shape)
print(user_df.userId.duplicated().value_counts())

user_df.head(10)

(96, 5)
False    96
Name: userId, dtype: int64


Unnamed: 0,userId,firstName,lastName,gender,level
0,91,Jayden,Bell,M,free
1,73,Jacob,Klein,M,paid
23,86,Aiden,Hess,M,free
30,24,Layla,Griffin,F,paid
40,26,Ryan,Smith,M,free
59,49,Chloe,Cuevas,F,paid
62,57,Katherine,Gay,F,free
69,30,Avery,Watkins,F,paid
79,92,Ryann,Smith,F,free
125,74,Braden,Parker,M,free


#### Insert Records into Users Table
We'll use the `user_table_bulk_insert` query from `sql_queries.py` to insert the data in bulk from the `csv` file into the database table.

In [30]:
user_fname = 'users.csv'

# Saving the data into a csv file 
user_df.to_csv(user_fname, index=False) # index=False omits the dataframe index so the csv columns match the table columns for the bulk insert

# Getting the current working directory path
working_dir = os.getcwd()

user_data_path = f"{working_dir}/{user_fname}"

In [31]:
# Copying the user data from the csv file into the database table
with open(user_data_path, 'r') as f:
    cur.copy_expert(sql=user_table_bulk_insert, file=f)
    conn.commit()

## #5: `songplays` Table
#### Extracting Data for Songplays Table

- We'll extract the songplay table data from the log data, minus the song ID and artist ID, and save it into a csv file.

- Then we'll copy the data from the csv file and insert it in bulk into a temporary table called `records`, **implementing the `songplay_table_bulk_insert` query from `sql_queries.py`**.

#### Insert Records into Songplays Table
We'll select the full songplay data (including song ID and artist ID) from a left outer join of the `records` table and the `songs` table on matching song names, **implementing the `songplay_table_bulk_insert` query from `sql_queries.py`**. The left outer join keeps every record even when no song ID or artist ID is found.
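The effect of the left outer join can be tried out without a PostgreSQL server. The sketch below uses Python's built-in `sqlite3` module as a stand-in; the table layouts, column names, and sample rows are simplified assumptions made for illustration and are not the project's actual schema or query. A play whose title has no match in `songs` is kept, with `NULL` (Python `None`) for the song ID and artist ID.

```python
import sqlite3

# In-memory SQLite database used purely as a stand-in for PostgreSQL.
demo = sqlite3.connect(":memory:")
demo.executescript("""
    CREATE TABLE songs (song_id TEXT, title TEXT, artist_id TEXT);
    CREATE TABLE records (user_id INTEGER, song TEXT);
    INSERT INTO songs VALUES ('SOCIWDW12A8C13D406', 'Soul Deep', 'ARMJAGH1187FB546F3');
    INSERT INTO records VALUES (73, 'Soul Deep'), (91, 'Shell Shock');
""")

# LEFT JOIN keeps every row of records, matched or not.
rows = demo.execute("""
    SELECT r.user_id, r.song, s.song_id, s.artist_id
    FROM records AS r
    LEFT JOIN songs AS s ON r.song = s.title
    ORDER BY r.user_id
""").fetchall()
demo.close()

for row in rows:
    print(row)
```

An inner join would have dropped the unmatched play entirely, which is why the left outer join is used when most log entries have no counterpart in the song data.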

In [32]:
# Checking for null values
df.song.isnull().value_counts()

False    6820
Name: song, dtype: int64

In [33]:
df.head()

Unnamed: 0,artist,auth,firstName,gender,itemInSession,lastName,length,level,location,method,page,registration,sessionId,song,status,ts,userAgent,userId
0,Stephen Lynch,Logged In,Jayden,M,0,Bell,182.85669,free,"Dallas-Fort Worth-Arlington, TX",PUT,NextSong,1540992000000.0,829,Jim Henson's Dead,200,1543537327796,Mozilla/5.0 (compatible; MSIE 10.0; Windows NT...,91
1,Manowar,Logged In,Jacob,M,0,Klein,247.562,paid,"Tampa-St. Petersburg-Clearwater, FL",PUT,NextSong,1540558000000.0,1049,Shell Shock,200,1543540121796,"""Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_4...",73
2,Morcheeba,Logged In,Jacob,M,1,Klein,257.41016,paid,"Tampa-St. Petersburg-Clearwater, FL",PUT,NextSong,1540558000000.0,1049,Women Lose Weight (Feat: Slick Rick),200,1543540368796,"""Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_4...",73
3,Maroon 5,Logged In,Jacob,M,2,Klein,231.23546,paid,"Tampa-St. Petersburg-Clearwater, FL",PUT,NextSong,1540558000000.0,1049,Won't Go Home Without You,200,1543540625796,"""Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_4...",73
4,Train,Logged In,Jacob,M,3,Klein,216.76363,paid,"Tampa-St. Petersburg-Clearwater, FL",PUT,NextSong,1540558000000.0,1049,Hey_ Soul Sister,200,1543540856796,"""Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_4...",73


In [34]:
# Extracting songplay data (.copy() avoids pandas' SettingWithCopyWarning when adding the converted column)
records_df = df[['ts', 'userId', 'level', 'sessionId', 'location', 'userAgent', 'song']].copy()
records_df['ts'] = pd.to_datetime(records_df['ts'], unit='ms')
records_df.head()

Unnamed: 0,ts,userId,level,sessionId,location,userAgent,song
0,2018-11-30 00:22:07.796,91,free,829,"Dallas-Fort Worth-Arlington, TX",Mozilla/5.0 (compatible; MSIE 10.0; Windows NT...,Jim Henson's Dead
1,2018-11-30 01:08:41.796,73,paid,1049,"Tampa-St. Petersburg-Clearwater, FL","""Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_4...",Shell Shock
2,2018-11-30 01:12:48.796,73,paid,1049,"Tampa-St. Petersburg-Clearwater, FL","""Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_4...",Women Lose Weight (Feat: Slick Rick)
3,2018-11-30 01:17:05.796,73,paid,1049,"Tampa-St. Petersburg-Clearwater, FL","""Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_4...",Won't Go Home Without You
4,2018-11-30 01:20:56.796,73,paid,1049,"Tampa-St. Petersburg-Clearwater, FL","""Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_4...",Hey_ Soul Sister


In [35]:
records_fname = 'records.csv'

# Saving the data into a csv file
records_df.to_csv(records_fname, index=False) # index=False omits the dataframe index so the csv columns match the table columns for the bulk insert

# Getting the current working directory path
working_dir = os.getcwd()

records_data_path = f"{working_dir}/{records_fname}"

In [36]:
# Copying the songplay data from the csv file into the database table
with open(records_data_path, 'r') as f:
    cur.copy_expert(sql=songplay_table_bulk_insert, file=f)
    conn.commit()

# Close Connection to Sparkify Database

In [37]:
conn.close()

# Implement `etl.py`
Use what you've completed in this notebook to implement `etl.py`.