In [1]:
from dmp import PostgresConnector
import pandas as pd
import os
from tqdm import tqdm
from dmp.sql_queries import *
from collections import Counter
import datetime

### Helper Functions

I'm using these 2 helper functions to append all the jsons on a dataframe.

In [2]:
def load_data_from_json(df, path):
    return df.append(pd.read_json(path, lines=True))

def load_dataset(df, path):
    df = pd.DataFrame()
    curdir = os.getcwd()
    for(path, dirnames, filenames) in os.walk(path):
        filepaths = [filename for filename in filenames if ".json" in filename]
        if filepaths:
            for filepath in filepaths:
                # print(df.shape)
                df = load_data_from_json(df, path+"/"+filepath)
    return df

### ETL for the songs DataFrame

In [3]:
df_songs = pd.DataFrame()
df_songs = load_dataset(df_songs, "../data/song_data/")
df_songs.shape

(71, 10)

In [4]:
df_songs.head()

Unnamed: 0,num_songs,artist_id,artist_latitude,artist_longitude,artist_location,artist_name,song_id,title,duration,year
0,1,AR7G5I41187FB4CE6C,,,"London, England",Adam Ant,SONHOTT12A8C13493C,Something Girls,233.40363,1982
0,1,AR8ZCNI1187B9A069B,,,,Planet P Project,SOIAZJW12AB01853F1,Pink World,269.81832,1984
0,1,ARXR32B1187FB57099,,,,Gob,SOFSOCN12A8C143F5D,Face the Ashes,209.60608,2007
0,1,AR10USD1187B99F3F1,,,"Burlington, Ontario, Canada",Tweeterfriendly Music,SOHKNRJ12A6701D1F8,Drop of Rain,189.57016,0
0,1,ARGSJW91187B9B1D6B,35.21962,-80.01955,North Carolina,JennyAnyKind,SOQHXMF12AB0182363,Young Boy Blues,218.77506,0


As we can see, we have information about both the songs and the artists in this df. Let's work with that info to fill the psql database.

In [5]:
songs_data = df_songs[["song_id", "title", "artist_id", "year", "duration"]]
songs_data.reset_index(inplace=True, drop=True)
songs_data.head()

Unnamed: 0,song_id,title,artist_id,year,duration
0,SONHOTT12A8C13493C,Something Girls,AR7G5I41187FB4CE6C,1982,233.40363
1,SOIAZJW12AB01853F1,Pink World,AR8ZCNI1187B9A069B,1984,269.81832
2,SOFSOCN12A8C143F5D,Face the Ashes,ARXR32B1187FB57099,2007,209.60608
3,SOHKNRJ12A6701D1F8,Drop of Rain,AR10USD1187B99F3F1,0,189.57016
4,SOQHXMF12AB0182363,Young Boy Blues,ARGSJW91187B9B1D6B,0,218.77506


In [6]:
artist_data = df_songs[["artist_id", "artist_name", "artist_location", "artist_latitude", "artist_longitude"]]
artist_data.reset_index(inplace=True, drop=True)
artist_data.drop_duplicates(subset="artist_id", inplace=True)
artist_data.head()

A value is trying to be set on a copy of a slice from a DataFrame

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  return func(*args, **kwargs)


Unnamed: 0,artist_id,artist_name,artist_location,artist_latitude,artist_longitude
0,AR7G5I41187FB4CE6C,Adam Ant,"London, England",,
1,AR8ZCNI1187B9A069B,Planet P Project,,,
2,ARXR32B1187FB57099,Gob,,,
3,AR10USD1187B99F3F1,Tweeterfriendly Music,"Burlington, Ontario, Canada",,
4,ARGSJW91187B9B1D6B,JennyAnyKind,North Carolina,35.21962,-80.01955


In [7]:
artist_data.shape

(69, 5)

In [8]:
artist_data[artist_data["artist_id"] == "ARGSJW91187B9B1D6B"]

Unnamed: 0,artist_id,artist_name,artist_location,artist_latitude,artist_longitude
4,ARGSJW91187B9B1D6B,JennyAnyKind,North Carolina,35.21962,-80.01955


#### Populating song and artist table

In [9]:
pc = PostgresConnector(host="localhost", port=5432, user="aleon", password="psqludacity", dbname="sparkify")

In [10]:
pc.drop_tables([song_table_drop])
pc.create_tables([song_table_create])
pc.insert_data(song_table_insert, songs_data)

cannot drop table song because other objects depend on it
DETAIL:  constraint songplays_song_id_fkey on table songplays depends on table song
HINT:  Use DROP ... CASCADE to drop the dependent objects too.

Error when dropping table. Try again.
Successfully created table.


In [11]:
pc.drop_tables([artist_table_drop])
pc.create_tables([artist_table_create])
pc.insert_data(artist_table_insert, artist_data)

cannot drop table artist because other objects depend on it
DETAIL:  constraint song_artist_id_fkey on table song depends on table artist
constraint songplays_artist_id_fkey on table songplays depends on table artist
HINT:  Use DROP ... CASCADE to drop the dependent objects too.

Error when dropping table. Try again.
Successfully created table.
None
duplicate key value violates unique constraint "artist_pkey"
DETAIL:  Key (artist_id)=(AR7G5I41187FB4CE6C) already exists.

Error when inserting data. Rolling back.


### ETL for Logs DataFrame

In [12]:
df_logs = pd.DataFrame()
df_logs = load_dataset(df_logs, "../data/log_data/")
df_logs.shape

(8056, 18)

In [13]:
df_logs.head()

Unnamed: 0,artist,auth,firstName,gender,itemInSession,lastName,length,level,location,method,page,registration,sessionId,song,status,ts,userAgent,userId
0,Frumpies,Logged In,Anabelle,F,0,Simpson,134.47791,free,"Philadelphia-Camden-Wilmington, PA-NJ-DE-MD",PUT,NextSong,1541044000000.0,455,Fuck Kitty,200,1541903636796,"""Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_4...",69
1,Kenny G with Peabo Bryson,Logged In,Anabelle,F,1,Simpson,264.75057,free,"Philadelphia-Camden-Wilmington, PA-NJ-DE-MD",PUT,NextSong,1541044000000.0,455,By The Time This Night Is Over,200,1541903770796,"""Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_4...",69
2,Biffy Clyro,Logged In,Anabelle,F,2,Simpson,189.83138,free,"Philadelphia-Camden-Wilmington, PA-NJ-DE-MD",PUT,NextSong,1541044000000.0,455,God & Satan,200,1541904034796,"""Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_4...",69
3,,Logged In,Lily,F,0,Burns,,free,"New York-Newark-Jersey City, NY-NJ-PA",GET,Home,1540621000000.0,456,,200,1541910841796,"""Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebK...",32
4,HIM,Logged In,Lily,F,1,Burns,212.06159,free,"New York-Newark-Jersey City, NY-NJ-PA",PUT,NextSong,1540621000000.0,456,Beautiful,200,1541910973796,"""Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebK...",32


In [14]:
df_logs["userId"] = df_logs["userId"].replace("", -1).astype(int)

#### Populating the rest of the tables

In [15]:
# Create the user table
user_data = df_logs[["userId", "firstName", "lastName", "gender", "level"]]
user_data["userId"] = pd.to_numeric(user_data["userId"], downcast="integer")
user_data.drop_duplicates("userId", inplace=True)
user_data.dropna(inplace=True, subset=["userId"])
user_data = user_data[user_data["userId"] != ""]
user_data.reset_index(inplace=True, drop=True)

A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  self._set_item(key, value)
A value is trying to be set on a copy of a slice from a DataFrame

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  return func(*args, **kwargs)


In [16]:
pc.drop_tables([user_table_drop])
pc.create_tables([user_table_create])
pc.insert_data(user_table_insert, user_data)

cannot drop table users because other objects depend on it
DETAIL:  constraint songplays_user_id_fkey on table songplays depends on table users
HINT:  Use DROP ... CASCADE to drop the dependent objects too.

Error when dropping table. Try again.
Successfully created table.
None
duplicate key value violates unique constraint "users_pkey"
DETAIL:  Key (user_id)=(69) already exists.

Error when inserting data. Rolling back.


In [17]:
print(datetime.datetime.fromtimestamp(1541903636796/1000))

2018-11-10 20:33:56.796000


In [18]:
# Create the time table
time_data = df_logs[["ts"]]
time_data.drop_duplicates("ts", inplace=True)
time_data["start_time"] = time_data["ts"].astype(str)
time_data["ts"] = time_data["ts"].apply(lambda x: datetime.datetime.fromtimestamp(x/1000))
time_data["hour"] = pd.DatetimeIndex(time_data["ts"]).hour
time_data["day"] = pd.DatetimeIndex(time_data["ts"]).day
time_data["week"] = pd.DatetimeIndex(time_data["ts"]).weekofyear
time_data["month"] = pd.DatetimeIndex(time_data["ts"]).month
time_data["year"] = pd.DatetimeIndex(time_data["ts"]).year
time_data["weekday"] = pd.DatetimeIndex(time_data["ts"]).weekday
time_data.drop(columns="ts", inplace=True)
time_data.head()

  
A value is trying to be set on a copy of a slice from a DataFrame

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  errors=errors,


Unnamed: 0,start_time,hour,day,week,month,year,weekday
0,1541903636796,20,10,45,11,2018,5
1,1541903770796,20,10,45,11,2018,5
2,1541904034796,20,10,45,11,2018,5
3,1541910841796,22,10,45,11,2018,5
4,1541910973796,22,10,45,11,2018,5


In [19]:
pc.drop_tables([time_table_drop])
pc.create_tables([time_table_create])
pc.insert_data(time_table_insert, time_data)

cannot drop table "time" because other objects depend on it
DETAIL:  constraint songplays_start_time_fkey on table songplays depends on table "time"
HINT:  Use DROP ... CASCADE to drop the dependent objects too.

Error when dropping table. Try again.
Successfully created table.
None
duplicate key value violates unique constraint "time_pkey"
DETAIL:  Key (start_time)=(1541903636796) already exists.

Error when inserting data. Rolling back.


### ETL for the songplays table

In [20]:
pc.drop_tables([songplay_table_drop])
pc.create_tables([songplay_table_create])

Successfully dropped table.
Successfully created table.


In [21]:
# Test the song_select query
pc.get_cur().execute(song_select, ("Young Boy Blues", "JennyAnyKind", 218.77506))
print(pc.get_cur().fetchall())

[('SOQHXMF12AB0182363', 'ARGSJW91187B9B1D6B')]


In [22]:
# Parse through the songs table and look for data on the logs.
def query_song(pc: PostgresConnector, query, data):
    try:
        pc.get_cur().execute(query, data)
        results = pc.get_cur().fetchone()
        if results:
            return results
        else:
            return None, None
    except psycopg2.Error as e:
        print(e) 

In [23]:
for index, row in tqdm(df_logs.iterrows(), total=df_logs.shape[0]):
    song_id, artist_id = query_song(pc, song_select, (row["song"], row["artist"], row["length"]))
    #print(song_id, artist_id)
    songplay_data = pd.DataFrame(columns=["start_time", "user_id", "level", "song_id", "artist_id", "session_id", 
                                        "location", "user_agent"])
    songplay_data = songplay_data.append({"start_time": row["ts"], "user_id": row["userId"], "level": row["level"], 
                          "song_id": song_id, "artist_id": artist_id, "session_id": row["sessionId"], 
                          "location": row["location"], "user_agent": row["userAgent"]}, ignore_index=True)
    pc.insert_data(songplay_table_insert, songplay_data)

100%|███████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████| 8056/8056 [01:15<00:00, 107.26it/s]
