In [1]:
import glob
import os
import uuid
from datetime import datetime

import pandas as pd
import psycopg2
from sqlalchemy import text
# Turn off pandas' SettingWithCopyWarning for the whole kernel session.
pd.set_option("mode.chained_assignment", None)

In [2]:
def gather_all_data(log_dir="data/log_data/2018/11/",
                    song_pattern="data/song_data/**/**/**/*.json"):
    """
    Description: Gather the event logs and the song metadata into DataFrames
    and join them.

    Every ``*.json`` file directly inside ``log_dir`` is read as JSON-lines and
    concatenated. Only log entries whose ``page`` is "NextSong" are kept.
    Every file matching ``song_pattern`` is read the same way to build the
    songs library. The NextSong entries are then left-joined to the library
    on (artist, song) == (artist_name, title).

    Arguments:
        log_dir: directory that holds the log files
            (default "data/log_data/2018/11/").
        song_pattern: glob pattern for the song metadata files
            (default "data/song_data/**/**/**/*.json"). ``glob`` is called
            without ``recursive=True``, so each ``**`` matches exactly one
            directory level.

    Returns:
        logs_songs: the DataFrame obtained by the left join of the NextSong
            log entries with the songs library.
        df_songs_lib: the DataFrame that contains the songs metadata.

    Raises:
        FileNotFoundError: if no log file or no song file is found.
    """
    # Both lists are sorted so row order (and thus every downstream table)
    # does not depend on the filesystem's directory listing order.
    log_files = sorted(glob.glob(os.path.join(log_dir, "*.json")))
    song_files = sorted(glob.glob(song_pattern))
    if not log_files:
        raise FileNotFoundError("No log files found in {!r}".format(log_dir))
    if not song_files:
        raise FileNotFoundError("No song files match {!r}".format(song_pattern))

    df_logs = pd.concat((pd.read_json(path, lines=True) for path in log_files),
                        ignore_index=True)  # (8056, 18) on the project dataset
    songplays_raw = (df_logs.loc[df_logs["page"] == "NextSong"]
                     .reset_index(drop=True))  # (6820, 18) on the project dataset

    df_songs_lib = pd.concat((pd.read_json(path, lines=True) for path in song_files),
                             ignore_index=True)  # (71, 10) on the project dataset

    # Left join keeps every NextSong entry; unmatched songs get NaN song/artist ids.
    logs_songs = pd.merge(songplays_raw, df_songs_lib,
                          left_on=["artist", "song"],
                          right_on=["artist_name", "title"],
                          how="left")
    return logs_songs, df_songs_lib

In [3]:
def prepare_tables(logs_songs, df_songs_lib):
    """
    Description: This function further transforms the logs and songs library
    DataFrames to form 5 DataFrames, which are the prototypes of the fact and
    dimension tables of a star-schema database. The input DataFrames are not
    modified.

    Arguments:
        logs_songs: the DataFrame obtained by the left join of logs and songs.
        df_songs_lib: the DataFrame that contains the songs metadata.

    Returns:
        songplays_df_: the DataFrame which is the prototype of the fact table
        users_df_: the DataFrame which is the prototype of the users dimension table
        songs_df_: the DataFrame which is the prototype of the songs dimension table
        artists_df_: the DataFrame which is the prototype of the artists dimension table
        time_df_: the DataFrame which is the prototype of the time dimension table
    """
    # Work on a copy so the caller's DataFrame does not silently gain a column.
    plays = logs_songs.copy()
    # One random 32-char hex id per play.
    plays["songplay_id"] = [uuid.uuid4().hex for _ in range(len(plays))]

    songplays_df = plays[["songplay_id", "ts", "length", "userId", "level", "song_id",
                          "artist_id", "sessionId", "location", "userAgent"]].copy()
    songplays_df.columns = ["songplay_id", "start_time", "length_played", "user_id", "level",
                            "song_id", "artist_id", "session_id", "location", "user_agent"]
    songplays_df_ = songplays_df.where(pd.notnull(songplays_df), None)

    # A user whose level changed appears once per level, so user_id alone is not
    # unique here; user_unique_id gives each row its own key.
    users_df = (plays[["userId", "firstName", "lastName", "gender", "level"]]
                .drop_duplicates()
                .reset_index(drop=True))  # (129, 5) on the project dataset
    users_df.columns = ["user_id", "first_name", "last_name", "gender", "level"]
    users_df["user_unique_id"] = [uuid.uuid4().hex for _ in range(len(users_df))]
    users_df_ = users_df.where(pd.notnull(users_df), None)

    songs_df = (df_songs_lib[["song_id", "title", "artist_id", "year", "duration"]]
                .drop_duplicates()
                .reset_index(drop=True))  # (71, 5) on the project dataset
    songs_df_ = songs_df.where(pd.notnull(songs_df), None)

    artists_df = (df_songs_lib[["artist_id", "artist_name", "artist_location",
                                "artist_latitude", "artist_longitude"]]
                  .drop_duplicates()
                  .reset_index(drop=True))  # (69, 5) on the project dataset
    artists_df.columns = ["artist_id", "name", "location", "latitude", "longitude"]
    artists_df_ = artists_df.where(pd.notnull(artists_df), None)

    # start_time is epoch milliseconds. Convert it once, vectorized, instead of
    # building a Timestamp per row for every derived column.
    start = pd.to_datetime(songplays_df["start_time"], unit="ms")
    time_df = songplays_df[["songplay_id", "start_time"]].copy()  # explicit copy: no SettingWithCopy
    time_df["hour"] = start.dt.hour
    time_df["day"] = start.dt.day
    # Timestamp.week is used element-wise because Series.dt.week was removed in pandas 2.
    time_df["week"] = start.map(lambda t: t.week)
    time_df["month"] = start.dt.month
    time_df["year"] = start.dt.year
    time_df["weekday"] = start.dt.weekday  # Monday == 0 ... Sunday == 6
    time_df_ = time_df.where(pd.notnull(time_df), None)  # (6820, 8) on the project dataset

    return songplays_df_, users_df_, songs_df_, artists_df_, time_df_

In [4]:
%%time
# Read the raw files and join logs to songs, then split the joined frame into
# the one fact frame and four dimension frames that are loaded below.
logs_songs, df_songs_lib = gather_all_data()
songplays_df, users_df, songs_df, artists_df, time_df = prepare_tables(logs_songs, df_songs_lib)

CPU times: user 1.51 s, sys: 76.7 ms, total: 1.59 s
Wall time: 1.61 s


In [5]:
from sqlalchemy import create_engine
from create_tables import create_database, drop_tables

# The SQLAlchemy URL can be supplied via SPARKIFY_DB_URL so credentials need not
# be committed with the notebook. The fallback is the original local dev URL.
# NOTE(review): create_database() opens its own connection (defined in
# create_tables, not shown here); keep both pointed at the same database.
DB_URL = os.environ.get("SPARKIFY_DB_URL", "postgresql://student:student@127.0.0.1:5432/sparkifydb")

cur, conn = create_database()
engine = create_engine(DB_URL)
drop_tables(cur, conn)

# INSERT ALL RECORDS. to_sql creates each table from the frame's dtypes.
tables_to_load = [
    ("songplays", songplays_df),
    ("users", users_df),
    ("songs", songs_df),
    ("artists", artists_df),
    ("time", time_df),
]
for table_name, frame in tables_to_load:
    frame.to_sql(table_name, engine, index=False)

In [6]:
# to_sql(index=False) creates the tables without primary keys, so add them here.
# engine.begin() commits on exit (works on SQLAlchemy 1.3 through 2.x), and raw
# SQL must be wrapped in text() since SQLAlchemy 2.0.
primary_keys = [
    ("songplays", "songplay_id"),
    ("users", "user_unique_id"),
    ("songs", "song_id"),
    ("artists", "artist_id"),
    ("time", "songplay_id"),
]
with engine.begin() as con:
    for table_name, key_column in primary_keys:
        # Identifiers come from the literal list above, not from user input.
        con.execute(text("ALTER TABLE {} ADD PRIMARY KEY ({});".format(table_name, key_column)))

![Entity-Relationship Diagram](ERD_sparkifydb.jpg)

In [7]:
# Songplays that matched a song in the library, with song title and artist name.
# The inner JOIN on song_id already excludes plays whose song_id is NULL.
matched_query = text("""
    SELECT songplay_id, user_id, level, songplays.song_id, title, artists.name
    FROM songplays
    JOIN songs   ON songplays.song_id = songs.song_id
    JOIN artists ON songs.artist_id = artists.artist_id
""")
# Engine.execute was removed in SQLAlchemy 2.0; go through a Connection instead.
with engine.connect() as con:
    matched_plays = con.execute(matched_query).fetchall()
matched_plays

[('78a38e81749b4c3397ea9612e890c9ba', '15', 'paid', 'SOZCTXZ12AB0182364', 'Setanta matins', 'Elena')]

In [8]:
# First 10 songplays via SQLAlchemy (Engine.execute was removed in 2.0).
# NOTE(review): there is no ORDER BY, so which 10 rows come back is up to Postgres.
with engine.connect() as con:
    sample_songplays = con.execute(
        text("SELECT songplay_id, start_time, song_id FROM songplays LIMIT 10")
    ).fetchall()
sample_songplays

[('5188e369b3284795a32645f4cb9f4b20', 1541106106796, None),
 ('c7a78f0aec904d6b90e614f72543d45e', 1541106352796, None),
 ('bc158dfa0da04c15a6a368e07fa4e408', 1541106496796, None),
 ('2e0f3483caec4a989effd268915119d7', 1541106673796, None),
 ('fbe67666aca34607bc46c9991114d8f1', 1541107053796, None),
 ('261be0f297354375a963bee96134a7fb', 1541107493796, None),
 ('efe5a5c3aaab4dad8835187b034a4689', 1541107734796, None),
 ('82722065a1884d8e839da716a901dbb2', 1541108520796, None),
 ('dc2d8fdb14664148bbf68f6a69cf1efe', 1541109125796, None),
 ('9f2b069023324629bd0f09297fb89fdc', 1541109325796, None)]

In [9]:
# Same query through the raw psycopg2 cursor returned by create_database().
# Rows are only fetched when the SELECT succeeded; the original fetched
# unconditionally, which raised again right after a caught error.
try:
    cur.execute("SELECT songplay_id, start_time, song_id FROM songplays LIMIT 10")
except psycopg2.Error as e:
    print("Error: SELECT from songplays failed")
    print(e)
else:
    for row in cur.fetchall():
        print(row)

('5188e369b3284795a32645f4cb9f4b20', 1541106106796, None)
('c7a78f0aec904d6b90e614f72543d45e', 1541106352796, None)
('bc158dfa0da04c15a6a368e07fa4e408', 1541106496796, None)
('2e0f3483caec4a989effd268915119d7', 1541106673796, None)
('fbe67666aca34607bc46c9991114d8f1', 1541107053796, None)
('261be0f297354375a963bee96134a7fb', 1541107493796, None)
('efe5a5c3aaab4dad8835187b034a4689', 1541107734796, None)
('82722065a1884d8e839da716a901dbb2', 1541108520796, None)
('dc2d8fdb14664148bbf68f6a69cf1efe', 1541109125796, None)
('9f2b069023324629bd0f09297fb89fdc', 1541109325796, None)
