# ETL Processes
Use this notebook to develop the ETL process for each of your tables before completing the `etl.py` file to load the whole datasets.

#### Standard Libraries

In [1]:
import os
import glob
import psycopg2
import pandas as pd
import json
from io import StringIO
import logging
import datetime

#### Custom Libraries

In [2]:
from postgre import Postgre

#### Setting Up Logging

In [3]:
logger = logging.getLogger()
logger.setLevel(logging.INFO)
logger.addHandler(logging.StreamHandler())

### Utility Functions
The functions below serve as utility throughout the code to perform simple operations

In [4]:
def get_json_data(filepath):
    for root, dirs, files in os.walk(filepath):
        files = glob.glob(os.path.join(root,'*.json'))
        for file in files :
            with open(os.path.abspath(file), 'r') as f:
                try:
                    yield json.load(f)
                except:
                    with open(os.path.abspath(file), 'r') as f:
                        for line in f.readlines():
                            try:
                                yield json.loads(line)
                            except:
                                yield None


In [5]:
def df_rows_to_list(df:pd.DataFrame, number_of_rows:int=None):
    list_of_row_values = []
    number_of_rows = number_of_rows if number_of_rows else df.shape[0]
    for index in range(0, number_of_rows):
        list_of_row_values.append(list(df.values[index]))
    
    return list_of_row_values

In [6]:
def list_to_string_io(list_of_entries:list):
    """
    Return file like object of the type StringIO from a given list of list of strings.
    Argument:
        - list_of_entries {list} - list of list of strings to transform to StringIO
    Example:
        [
             ['AR8IEZO1187B99055E', 'SOINLJW12A8C13314C', 'City Slickers', 2008, 149.86404],
             ['AR558FS1187FB45658', 'SOGDBUF12A8C140FAA', 'Intro', 2003, 75.67628]
        ]
        
    Return:
        {StringIO} - file type object with values in input list concatenated.
    
    Example:
        'AR8IEZO1187B99055E\tSOINLJW12A8C13314C\tCity Slickers\t2008\t149.86404\n
         AR558FS1187FB45658\tSOGDBUF12A8C140FAA\tIntro\t2003\t75.67628'
    """
    return StringIO('\n'.join(['\t'.join([str(entry) for entry in set_of_entries]) for set_of_entries in list_of_entries]))

## Establish Connection to Postgre and Create Sparkify Database

In [7]:
sparkifydb = 'sparkifydb'
Postgre('studentdb').create_database(sparkifydb)
postgre = Postgre(sparkifydb)

Connection Established to: studentdb
Database sparkifydb dropped: True
Database sparkifydb created: True
Connection Established to: sparkifydb


# Process `song_data`

In [8]:
songs_filepath = "./data/song_data"
songs_data =[json_data for json_data in get_json_data(songs_filepath) if json_data]
songs_df = pd.DataFrame.from_dict(songs_data)
songs_df.sample(5)

Unnamed: 0,artist_id,artist_latitude,artist_location,artist_longitude,artist_name,duration,num_songs,song_id,title,year
39,ARAJPHH1187FB5566A,40.7038,"Queens, NY",-73.83168,The Shangri-Las,164.80608,1,SOYTPEP12AB0180E7B,Twist and Shout,1964
54,ARL7K851187B99ACD2,,,,Andy Andy,226.35057,1,SOMUYGI12AB0188633,La Culpa,0
38,ARGIWFO1187B9B55B7,,,,Five Bolt Main,225.09669,1,SOPSWQW12A6D4F8781,Made Like This (Live),0
11,ARLTWXK1187FB5A3F8,32.74863,"Fort Worth, TX",-97.32925,King Curtis,326.00771,1,SODREIN12A58A7F2E5,A Whiter Shade Of Pale (Live @ Fillmore West),0
31,AR7ZKHQ1187B98DD73,,,,Glad,270.602,1,SOTUKVB12AB0181477,Blessed Assurance,1993


### Establish Tables Info

In [34]:
songs_table_name = "songs"
artists_table_name = "artists"
time_table_name = "time"
users_table_name = "users"
songsplay_table_name = 'songplays'
table_info ={
    songs_table_name: {'artist_id': 'CHAR(18) NOT NULL',
                       'song_id': 'CHAR(18) NOT NULL',
                       'title': 'VARCHAR(100)',
                       'year': 'INTEGER',
                       'duration':'REAL'
                      },
    artists_table_name: {'artist_id': 'CHAR(18) NOT NULL',
                         'artist_name': 'VARCHAR(100)',
                         'artist_location': 'VARCHAR(100)',
                         'artist_latitude': 'REAL',
                         'artist_longitude':'REAL'
                      },
    time_table_name: {'day': 'INTEGER',
                      'hour': 'INTEGER',
                      'month': 'INTEGER',
                      'timestamp': 'TIMESTAMP(6)',
                      'week': 'INTEGER',
                      'weekday': 'INTEGER',
                      'year': 'INTEGER',       
                    },
    users_table_name: {'userId': 'INTEGER',
                      'firstName': 'VARCHAR(55)',
                      'lastName': 'VARCHAR(55)',
                      'gender': 'CHAR(1)',
                      'level': 'CHAR(10)'   
                        },
    songsplay_table_name:{
                "ts":"BIGINT",
                 "userId":"INTEGER",
                 "level":"CHAR(10)",
                 "song_id":"CHAR(18)",
                 "artist_id":"CHAR(18)",
                 "sessionId":"INTEGER",
                 "location":"VARCHAR(255)",
                 "userAgent":"VARCHAR(255)",
        }
    }

## #1: `songs` Table
#### Extract Data for Songs Table
- Select columns for song ID, title, artist ID, year, and duration
- Use `df.values` to select just the values from the dataframe
- Index to select the first (only) record in the dataframe
- Convert the array to a list and set it to `song_data`

#### Insert Record into Song Table
Implement the `song_table_insert` query in `sql_queries.py` and run the cell below to insert a record for this song into the `songs` table. Remember to run `create_tables.py` before running the cell below to ensure you've created/resetted the `songs` table in the sparkify database

#### Select Single Row from DF and Transform it to String IO

In [10]:
column_names = list(table_info.get(songs_table_name).keys())
song_selected_df = songs_df[column_names]
song_data = df_rows_to_list(song_selected_df, number_of_rows=1)
file_object = list_to_string_io(song_data)
song_data

[['AR8IEZO1187B99055E',
  'SOINLJW12A8C13314C',
  'City Slickers',
  2008,
  149.86404]]

#### Create Song Table and Insert Single Record

In [11]:
postgre.create_table(table_name=songs_table_name, columns_dict=table_info.get(songs_table_name))
postgre.copy_to_table(file_object, table_name=songs_table_name, columns=column_names)

Table songs dropped: True
Table songs created: True
Inserted 1 Records to Table songs


#### Verify Record got Inserted

In [12]:
results = postgre._execute_query("select * from songs;", results=True)
results

[{'artist_id': 'AR8IEZO1187B99055E',
  'song_id': 'SOINLJW12A8C13314C',
  'title': 'City Slickers',
  'year': 2008,
  'duration': 149.864}]

## #2: `artists` Table
#### Extract Data for Artists Table
- Select columns for artist ID, name, location, latitude, and longitude
- Use `df.values` to select just the values from the dataframe
- Index to select the first (only) record in the dataframe
- Convert the array to a list and set it to `artist_data`

#### Insert Record into Artist Table
Implement the `artist_table_insert` query in `sql_queries.py` and run the cell below to insert a record for this song's artist into the `artists` table. Remember to run `create_tables.py` before running the cell below to ensure you've created/resetted the `artists` table in the sparkify database.

In [13]:
column_names = list(table_info.get(artists_table_name).keys())
artist_df = songs_df[column_names]
artist_data = df_rows_to_list(artist_df, number_of_rows=1)
file_object = list_to_string_io(artist_data)
artist_data

[['AR8IEZO1187B99055E', 'Marc Shaiman', '', nan, nan]]

In [14]:
postgre.create_table(table_name=artists_table_name, columns_dict=table_info.get(artists_table_name))
postgre.copy_to_table(file_object, table_name=artists_table_name, columns=column_names)

Table artists dropped: True
Table artists created: True
Inserted 1 Records to Table artists


In [15]:
results = postgre._execute_query("select * from artists;", results=True)
results

[{'artist_id': 'AR8IEZO1187B99055E',
  'artist_name': 'Marc Shaiman',
  'artist_location': None,
  'artist_latitude': nan,
  'artist_longitude': nan}]

# Process `log_data`
In this part, you'll perform ETL on the second dataset, `log_data`, to create the `time` and `users` dimensional tables, as well as the `songplays` fact table.

Let's perform ETL on a single log file and load a single record into each table.
- Use the `get_files` function provided above to get a list of all log JSON files in `data/log_data`
- Select the first log file in this list
- Read the log file and view the data

In [16]:
logs_filepath = "./data/log_data"
logs_data =[json_data for json_data in get_json_data(logs_filepath) if json_data]
logs_df = pd.DataFrame.from_dict(logs_data)
logs_df.sample(5)

Unnamed: 0,artist,auth,firstName,gender,itemInSession,lastName,length,level,location,method,page,registration,sessionId,song,status,ts,userAgent,userId
1340,,Logged In,Jordan,F,0,Rodriguez,,free,"Los Angeles-Long Beach-Anaheim, CA",GET,Home,1540993000000.0,902,,200,1543584463796,Mozilla/5.0 (Windows NT 6.1; WOW64; rv:31.0) G...,68
590,Pixies,Logged In,Matthew,M,74,Jones,229.3024,paid,"Janesville-Beloit, WI",PUT,NextSong,1541063000000.0,957,Where Is My Mind?,200,1543350181796,"""Mozilla/5.0 (Windows NT 5.1) AppleWebKit/537....",36
182,The Strokes,Logged In,Chloe,F,53,Cuevas,155.34975,paid,"San Francisco-Oakland-Hayward, CA",PUT,NextSong,1540941000000.0,1041,Is This It,200,1543511966796,Mozilla/5.0 (Windows NT 5.1; rv:31.0) Gecko/20...,49
5530,,Logged In,Jacob,M,36,Klein,,paid,"Tampa-St. Petersburg-Clearwater, FL",GET,Home,1540558000000.0,294,,200,1542048506796,"""Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_4...",73
1501,Evanescence,Logged In,Rylan,M,45,George,220.3424,paid,"Birmingham-Hoover, AL",PUT,NextSong,1541020000000.0,1076,Hello,200,1543600424796,"""Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_4...",16


## #3: `time` Table
#### Extract Data for Time Table
- Filter records by `NextSong` action
- Convert the `ts` timestamp column to datetime
  - Hint: the current timestamp is in milliseconds
- Extract the timestamp, hour, day, week of year, month, year, and weekday from the `ts` column and set `time_data` to a list containing these values in order
  - Hint: use pandas' [`dt` attribute](https://pandas.pydata.org/pandas-docs/stable/reference/api/pandas.Series.dt.html) to access easily datetimelike properties.
- Specify labels for these columns and set to `column_labels`
- Create a dataframe, `time_df,` containing the time data for this file by combining `column_labels` and `time_data` into a dictionary and converting this into a dataframe

In [17]:
time_df = logs_df[logs_df['page'] == 'NextSong'].reset_index(drop=True)
time_df.head()

Unnamed: 0,artist,auth,firstName,gender,itemInSession,lastName,length,level,location,method,page,registration,sessionId,song,status,ts,userAgent,userId
0,Sydney Youngblood,Logged In,Jacob,M,53,Klein,238.07955,paid,"Tampa-St. Petersburg-Clearwater, FL",PUT,NextSong,1540558000000.0,954,Ain't No Sunshine,200,1543449657796,"""Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_4...",73
1,Gang Starr,Logged In,Layla,F,88,Griffin,151.92771,paid,"Lake Havasu City-Kingman, AZ",PUT,NextSong,1541057000000.0,984,My Advice 2 You (Explicit),200,1543449690796,"""Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebK...",24
2,3OH!3,Logged In,Layla,F,89,Griffin,192.522,paid,"Lake Havasu City-Kingman, AZ",PUT,NextSong,1541057000000.0,984,My First Kiss (Feat. Ke$ha) [Album Version],200,1543449841796,"""Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebK...",24
3,RÃÂ¶yksopp,Logged In,Jacob,M,54,Klein,369.81506,paid,"Tampa-St. Petersburg-Clearwater, FL",PUT,NextSong,1540558000000.0,954,The Girl and The Robot,200,1543449895796,"""Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_4...",73
4,Kajagoogoo,Logged In,Layla,F,90,Griffin,223.55546,paid,"Lake Havasu City-Kingman, AZ",PUT,NextSong,1541057000000.0,984,Too Shy,200,1543450033796,"""Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebK...",24


In [18]:
def convert_timestamp(timestamp_miliseconds_list:list):
    series = []
    for record in timestamp_miliseconds_list:
         series.append(datetime.datetime.fromtimestamp(record/1000.0))
    return series

time_df = time_df.apply(lambda x: convert_timestamp(x) if x.name == 'ts' else x)

In [19]:
time_data = (time_df.ts, time_df.ts.dt.hour, time_df.ts.dt.day, time_df.ts.dt.week, time_df.ts.dt.month, time_df.ts.dt.year, time_df.ts.dt.weekday)
column_labels = ('timestamp', 'hour', 'day', 'week', 'month', 'year', 'weekday')

In [20]:
time_dict = []
for index in range(0, len(time_data[0])):
    time_dict.append({key:value for key, value in zip(column_labels, (entry[index] for entry in time_data))})
time_df = pd.DataFrame(time_dict)
time_df.head()

Unnamed: 0,day,hour,month,timestamp,week,weekday,year
0,29,0,11,2018-11-29 00:00:57.796,48,3,2018
1,29,0,11,2018-11-29 00:01:30.796,48,3,2018
2,29,0,11,2018-11-29 00:04:01.796,48,3,2018
3,29,0,11,2018-11-29 00:04:55.796,48,3,2018
4,29,0,11,2018-11-29 00:07:13.796,48,3,2018


In [21]:
column_names = list(table_info.get(time_table_name).keys())
file_object = StringIO()
time_df.to_csv(file_object, columns=column_names, sep='\t', header=False, index=False)
file_object.seek(0)

0

In [22]:
postgre.create_table(table_name=time_table_name, columns_dict=table_info.get(time_table_name))
postgre.copy_to_table(file_object, table_name=time_table_name, columns=column_names)
results = postgre._execute_query("select * from time;", results=True)
results[0]

Table time dropped: True
Table time created: True
Inserted 6820 Records to Table time


{'day': 29,
 'hour': 0,
 'month': 11,
 'timestamp': datetime.datetime(2018, 11, 29, 0, 0, 57, 796000),
 'week': 48,
 'weekday': 3,
 'year': 2018}

## #4: `users` Table
#### Extract Data for Users Table
- Select columns for user ID, first name, last name, gender and level and set to `user_df`

#### Insert Records into Users Table
Implement the `user_table_insert` query in `sql_queries.py` and run the cell below to insert records for the users in this log file into the `users` table. Remember to run `create_tables.py` before running the cell below to ensure you've created/resetted the `users` table in the sparkify database.

In [23]:
column_names = list(table_info.get(users_table_name).keys())
user_df = logs_df[column_names]
file_object = StringIO()
user_df.to_csv(file_object, columns=column_names, sep='\t', header=False, index=False)
file_object.seek(0)

0

In [24]:
postgre.create_table(table_name=users_table_name, columns_dict=table_info.get(users_table_name))
postgre.copy_to_table(file_object, table_name=users_table_name, columns=column_names)
results = postgre._execute_query("select * from users;", results=True)
results[0]

Table users dropped: True
Table users created: True
Inserted 8056 Records to Table users


{'userid': 73,
 'firstname': 'Jacob',
 'lastname': 'Klein',
 'gender': 'M',
 'level': 'paid      '}

#### Insert full data to tables

In [25]:
tables_dataframes ={
    songs_table_name: song_selected_df,
    artists_table_name: artist_df,
    time_table_name: time_df,
    users_table_name: user_df
    }
for table_name, table_df in tables_dataframes.items():
    postgre.create_table(table_name=table_name, columns_dict=table_info.get(table_name))
    postgre.copy_dataframe_to_table(table_df, table_name)

Table songs dropped: True
Table songs created: True
Inserted 72 Records to Table songs
Table artists dropped: True
Table artists created: True
Inserted 72 Records to Table artists
Table time dropped: True
Table time created: True
Inserted 6820 Records to Table time
Table users dropped: True
Table users created: True
Inserted 8056 Records to Table users


## #5: `songplays` Table
#### Extract Data and Songplays Table
This one is a little more complicated since information from the songs table, artists table, and original log file are all needed for the `songplays` table. Since the log file does not specify an ID for either the song or the artist, you'll need to get the song ID and artist ID by querying the songs and artists tables to find matches based on song title, artist name, and song duration time.
- Implement the `song_select` query in `sql_queries.py` to find the song ID and artist ID based on the title, artist name, and duration of a song.
- Select the timestamp, user ID, level, song ID, artist ID, session ID, location, and user agent and set to `songplay_data`

#### Insert Records into Songplays Table
- Implement the `songplay_table_insert` query and run the cell below to insert records for the songplay actions in this log file into the `songplays` table. Remember to run `create_tables.py` before running the cell below to ensure you've created/resetted the `songplays` table in the sparkify database.

In [28]:
def get_song_artist_id(postgre:Postgre, song_title:str, artist_name, song_duration:int):
    results = [None, None]
    if all((song_title, artist_name, song_duration)):
        song_select ="""SELECT DISTINCT a.song_id, a.artist_id
                    FROM {SONGS_TABLE_NAME} a
                    FULL JOIN {ARTISTS_TABLE_NAME} b ON a.artist_id = b.artist_id
                    WHERE a.title='{SONG_TITLE}'
                    -- AND a.duration={SONG_DURATION}
                    -- AND b.artist_name='{ARTIST_NAME}'
                    """

        results = postgre._execute_query(song_select.format(
                                                    SONGS_TABLE_NAME=songs_table_name,
                                                    ARTISTS_TABLE_NAME=artists_table_name,
                                                    SONG_TITLE=song_title.replace("'","''"),
                                                    SONG_DURATION=song_duration,
                                                    ARTIST_NAME=artist_name.replace("'","''") ), 
                                         results=True)
        if results:
            results = list(results[0].values())

    return results

In [29]:
logs_df[['song_id', 'artist_id']] = logs_df.apply(lambda row: pd.Series(get_song_artist_id(postgre, row['song'], row['artist'], row['length'])), axis=1)

In [35]:
column_names = list(table_info.get(songsplay_table_name).keys())
songplay_df = logs_df[column_names]
postgre.create_table(table_name=songsplay_table_name, columns_dict=table_info.get(songsplay_table_name))
postgre.copy_dataframe_to_table(songplay_df, songsplay_table_name)

Table songplays dropped: True
Table songplays created: True
Inserted 8056 Records to Table songplays


# Close Connection to Sparkify Database

In [37]:
postgre.conn.close()

# Implement `etl.py`
Use what you've completed in this notebook to implement `etl.py`.

In [None]:
from etl import main
main()