# etl.py test notebook
The goal of this notebook is to test the functions from `etl.py` independently

In [4]:
import os
import glob
import psycopg2
import pandas as pd
from sql_queries import *
from etl import process_song_file,process_data

## 1. Testing `process_song_file`

In [5]:
conn = psycopg2.connect(
        "host=127.0.0.1 dbname=sparkifydb user=student password=student"
    )
cur = conn.cursor()

In [6]:
filepath='data/song_data'

In [7]:
all_files = []
for root, dirs, files in os.walk(filepath):
    files = glob.glob(os.path.join(root, '*.json'))
    for f in files:
        all_files.append(os.path.abspath(f))

In [8]:
all_files[:3]

['/home/gabriel/Documents/Repos/udacity_data_engineering_project1/data/song_data/A/A/A/TRAAAVG12903CFA543.json',
 '/home/gabriel/Documents/Repos/udacity_data_engineering_project1/data/song_data/A/A/A/TRAAABD128F429CF47.json',
 '/home/gabriel/Documents/Repos/udacity_data_engineering_project1/data/song_data/A/A/A/TRAAAAW128F429D538.json']

In [9]:
df = pd.read_json(all_files[0], lines=True)

# insert song record
song_data = (
    df.loc[0, ['song_id', 'title', 'artist_id', 'year', 'duration']]
    .astype(str)
    .tolist()
)
cur.execute(song_table_insert, song_data)

# insert artist record
artist_data = (
    df.loc[
        0,
        [
            'artist_id',
            'artist_name',
            'artist_location',
            'artist_latitude',
            'artist_longitude',
        ],
    ].values.tolist()
)

In [10]:
process_song_file(cur, all_files[0])

['SOUDSGM12AC9618304', 'Insatiable (Instrumental Version)', 'ARNTLGG11E2835DDB9', 0, 266.39628]


ProgrammingError: can't adapt type 'numpy.int64'

In [None]:
process_data(cur, conn, filepath='data/song_data', func=process_song_file)

## 2. Testing `create_database`

In [None]:
# connect to default database    
conn = psycopg2.connect(
    "host=127.0.0.1 dbname=studentdb user=student password=student"
)
conn.set_session(autocommit=True)
cur = conn.cursor()

In [None]:
# create sparkify database with UTF8 encoding
cur.execute("DROP DATABASE IF EXISTS sparkifydb")

## 3. Testing `process_log_file`

In [None]:
filepath = 'data/log_data'

In [None]:
all_files = []
for root, dirs, files in os.walk(filepath):
    files = glob.glob(os.path.join(root, '*.json'))
    for f in files:
        all_files.append(os.path.abspath(f))

In [None]:
all_files[:2]

In [None]:
df = pd.read_json(all_files[0], lines=True)
df.head(2)

In [None]:
# filter by NextSong action
df_log_time = df[df['page'] == 'NextSong']

# convert timestamp column to datetime
df_log_time['ts_dt'] = pd.to_datetime(df_log_time.ts, unit='ms')

In [1]:
df_log_time.loc[
        0].values.tolist()

NameError: name 'df_log_time' is not defined

In [2]:
df_log_time.head()

NameError: name 'df_log_time' is not defined

In [27]:
song_select = """
SELECT art.name,
       son.*
  FROM artists AS art
  JOIN songs as son
    ON art.artist_id = son.artist_id
 WHERE 1 = 1
   AND son.title = %s
   AND art.name = %s
   AND son.duration = %s
"""

In [19]:
row = df_log_time.iloc[0]
row

artist                                             matchbox twenty
auth                                                     Logged In
firstName                                                   Jayden
gender                                                           F
itemInSession                                                    0
lastName                                                     Duffy
length                                                     177.658
level                                                         free
location                               Seattle-Tacoma-Bellevue, WA
method                                                         PUT
page                                                      NextSong
registration                                           1.54015e+12
sessionId                                                      846
song                                            Argue (LP Version)
status                                                        

In [25]:
song_select.format(row.song, row.artist, str(row.length))

'\nSELECT art.name,\n       son.*\n  FROM artists AS art\n  JOIN songs as son\n    ON art.artist_id = son.artist_id\n WHERE 1 = 1\n   AND son.title = %s\n   AND art.name = %s\n   AND son.duration = %s\n'

In [24]:
row.song, row.artist, str(row.length)

('Argue (LP Version)', 'matchbox twenty', '177.65832')

In [23]:
cur.execute(song_select, (row.song, row.artist, row.length))
results = cur.fetchone()

ProgrammingError: relation "artists" does not exist
LINE 4:   FROM artists AS art
               ^


In [None]:
row.song, row.artist, row.length

In [25]:
def process_log_file(cur, filepath) -> None:
    """Extract and load data for song and artist from log files.

    Parameters
    ----------
    cur: psycopg2.connect
        Psycopg2 database cursor for inserting data.

    filepath: str
        Path for log file.

    """
    # open log file
    df = pd.read_json(filepath, lines=True)

    # filter by NextSong action
    df_log_time = df[df['page'] == 'NextSong']

    # convert timestamp column to datetime
    df_log_time['ts_dt'] = pd.to_datetime(df_log_time.ts, unit='ms')

    # insert time data records
    list_time_elements = ["hour",
                     "day",
                     "week",
                     "month",
                     "year",
                     "weekday"]

    for e in list_time_elements:
        df_log_time['start_time'] = df_log_time['ts_dt']
        df_log_time[e] = getattr(df_log_time['ts_dt'].dt, e)

    column_labels = ['start_time'] + list_time_elements

    time_df = df_log_time[column_labels]
    for i, row in time_df.iterrows():
        cur.execute(time_table_insert, list(row))

    # load user table
    user_df =  df[df['page'] == 'NextSong'][
    ['userId', 'firstName', 'lastName', 'gender', 'level']]

    # insert user records
    for i, row in user_df.iterrows():
        cur.execute(user_table_insert, row)

    # insert songplay records
    for index, row in df.iterrows():
        
        # get songid and artistid from song and artist tables
        cur.execute(song_select, (row.song, row.artist, row.length))
        results = cur.fetchone()
        
        if results:
            songid, artistid = results
        else:
            songid, artistid = None, None

        # insert songplay record
        songplay_data = (songid, artistid)
        cur.execute(songplay_table_insert, songplay_data)

In [26]:
process_log_file(cur, all_files[0])

A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: http://pandas.pydata.org/pandas-docs/stable/indexing.html#indexing-view-versus-copy
A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: http://pandas.pydata.org/pandas-docs/stable/indexing.html#indexing-view-versus-copy
A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: http://pandas.pydata.org/pandas-docs/stable/indexing.html#indexing-view-versus-copy


IndexError: tuple index out of range

In [None]:
    for i, row in time_df.iterrows():
        cur.execute(time_table_insert, list(row))

    # load user table
    user_df =  df_log[df_log['page'] == 'NextSong'][
    ['userId', 'firstName', 'lastName', 'gender', 'level']]

    # insert user records
    for i, row in user_df.iterrows():
        cur.execute(user_table_insert, row)

    # insert songplay records
    for index, row in df.iterrows():
        
        # get songid and artistid from song and artist tables
        cur.execute(song_select, (row.song, row.artist, row.length))
        results = cur.fetchone()
        
        if results:
            songid, artistid = results
        else:
            songid, artistid = None, None

        # insert songplay record
        songplay_data = (songid, artistid)
        cur.execute(songplay_table_insert, songplay_data)