In [36]:
import os
import glob
import pandas as pd
import psycopg2
import configparser
from sql_queries import *


In [37]:
def get_files(filepath):
    all_files = []
    for root, dirs, files in os.walk(filepath):
        files = glob.glob(os.path.join(root,'*.json'))
        for f in files :
            all_files.append(os.path.abspath(f))
    
    return all_files

In [38]:
def read_config():
    # Create a ConfigParser object
    config = configparser.ConfigParser()
 
    # Read the configuration file
    config.read('config.ini')
 
    # Access values from the configuration file
    db_host = config.get('Database', 'hostname')
    db_default = config.get('Database', 'default_database')
    db_name = config.get('Database', 'database')
    db_username = config.get('Database', 'username')
    db_pwd = config.get('Database', 'pwd')
    db_port = config.get('Database', 'port_id')
 
    # Return a dictionary with the retrieved values
    config_values = {
        'db_name': db_name,
        'db_default': db_default,
        'db_host': db_host,
        'db_username': db_username,
        'db_pwd': db_pwd,
        'db_port': db_port
    }
 
    return config_values

In [40]:
#connect to sparkify database
config_data = read_config()
conn = psycopg2.connect(
    host= config_data['db_host'],
    dbname = config_data['db_name'],
    user = config_data['db_username'],
    password = config_data['db_pwd'],
    port = config_data['db_port']
)
cur = conn.cursor()

# Process `song_data`
In the initial phase of the ETL process, the `song_data` dataset is utilized to populate the `songs` and `artists` dimensional tables.

In [41]:
song_files = get_files('data/song_data')

In [42]:
song_files[0]

'/home/div/Data-Engineering-projects/postgres_ETL/data/song_data/A/B/B/TRABBBV128F42967D7.json'

In [60]:
df = pd.read_json(song_files[0], lines=True)
df.head()

Unnamed: 0,num_songs,artist_id,artist_latitude,artist_longitude,artist_location,artist_name,song_id,title,duration,year
0,1,AR7SMBG1187B9B9066,,,,Los Manolos,SOBCOSW12A8C13D398,Rumba De Barcelona,218.38322,0


# 1. `songs` Table

Extract the necessary data for the `songs` table by selecting the columns `song_id`, `title`, `artist_id`, `year`, and `duration`. Use `df.values` to retrieve only the values from these columns in the DataFrame.

In [61]:
song_data = list(df[['song_id', 'title', 'artist_id', 'year', 'duration']].values[0])
song_data

['SOBCOSW12A8C13D398',
 'Rumba De Barcelona',
 'AR7SMBG1187B9B9066',
 0,
 218.38322]

In [62]:
songs_df = df[['song_id', 'title', 'artist_id', 'year', 'duration']]
songs_df.head()

Unnamed: 0,song_id,title,artist_id,year,duration
0,SOBCOSW12A8C13D398,Rumba De Barcelona,AR7SMBG1187B9B9066,0,218.38322


## Insert Record into Song Table

Implement the `song_table_insert` query in the `sql_queries.py` file and use it to insert the record into the `songs` table.

In [63]:
cur.execute(song_table_insert, song_data)
conn.commit()

# 2. `artists` Table

Extract the data required for the `artists` table by selecting the columns `artist_id`, `artist_name`, and `artist_location`. Use `df.values` to obtain only the values from these specified columns in the DataFrame. Additionally, select only the first record from the DataFrame by indexing appropriately.

In [47]:
artist_data = list(df[['artist_id', 'artist_name', 'artist_location',
                       'artist_latitude', 'artist_longitude']].values[0])
artist_data

['AR7SMBG1187B9B9066', 'Los Manolos', '', nan, nan]

In [48]:
artists_df = df[['artist_id', 'artist_name', 'artist_location',
                       'artist_latitude', 'artist_longitude']]
artists_df.head()

Unnamed: 0,artist_id,artist_name,artist_location,artist_latitude,artist_longitude
0,AR7SMBG1187B9B9066,Los Manolos,,,


## Insert Record into Artist Table

Implement the `artist_table_insert` query in the `sql_queries.py` file and use it to insert the record into the `artists` table.

In [64]:
cur.execute(artist_table_insert, artist_data)
conn.commit()

# Process `log_data`

In this section, the ETL process is applied to the second dataset, `log_data`, to populate the `time` and `users` dimensional tables, as well as the `songplays` fact table.

The ETL is executed on a single log file, and a single record is inserted into each table. Utilize the `get_files` function detailed earlier to retrieve a list of all JSON log files from the `data/log_data` directory.

In [65]:
log_files = get_files('data/log_data')

In [66]:
log_files[0]

'/home/div/Data-Engineering-projects/postgres_ETL/data/log_data/2018/11/2018-11-23-events.json'

In [67]:
df = pd.read_json(log_files[0], lines=True)
df.head()

Unnamed: 0,artist,auth,firstName,gender,itemInSession,lastName,length,level,location,method,page,registration,sessionId,song,status,ts,userAgent,userId
0,Great Lake Swimmers,Logged In,Kevin,M,0,Arellano,215.11791,free,"Harrisburg-Carlisle, PA",PUT,NextSong,1540007000000.0,815,Your Rocky Spine,200,1542931645796,"""Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_4...",66
1,Soziedad Alkoholika,Logged In,Kevin,M,1,Arellano,204.7473,free,"Harrisburg-Carlisle, PA",PUT,NextSong,1540007000000.0,815,Va Bien,200,1542931860796,"""Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_4...",66
2,Franz Ferdinand,Logged In,Kevin,M,2,Arellano,172.01587,free,"Harrisburg-Carlisle, PA",PUT,NextSong,1540007000000.0,815,Eleanor Put Your Boots On,200,1542932064796,"""Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_4...",66
3,Modest Mouse,Logged In,Kevin,M,3,Arellano,209.52771,free,"Harrisburg-Carlisle, PA",PUT,NextSong,1540007000000.0,815,Float On,200,1542932236796,"""Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_4...",66
4,Adam Lambert,Logged In,Kevin,M,4,Arellano,266.44853,free,"Harrisburg-Carlisle, PA",PUT,NextSong,1540007000000.0,815,Aftermath,200,1542932445796,"""Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_4...",66


# 3. `time` table

To prepare data for the `time` table, begin by filtering the records to include only those with the `NextSong` action. Then, convert the `ts` column, which contains timestamps, to datetime format. From this datetime-converted `ts` column, extract the following attributes: `timestamp`, `hour`, `day`, `week`, `month`, `year`, and `weekday`. These attributes will be used to populate the `time` table.

In [68]:
df = df[df['page'] == 'NextSong']
df.head()

Unnamed: 0,artist,auth,firstName,gender,itemInSession,lastName,length,level,location,method,page,registration,sessionId,song,status,ts,userAgent,userId
0,Great Lake Swimmers,Logged In,Kevin,M,0,Arellano,215.11791,free,"Harrisburg-Carlisle, PA",PUT,NextSong,1540007000000.0,815,Your Rocky Spine,200,1542931645796,"""Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_4...",66
1,Soziedad Alkoholika,Logged In,Kevin,M,1,Arellano,204.7473,free,"Harrisburg-Carlisle, PA",PUT,NextSong,1540007000000.0,815,Va Bien,200,1542931860796,"""Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_4...",66
2,Franz Ferdinand,Logged In,Kevin,M,2,Arellano,172.01587,free,"Harrisburg-Carlisle, PA",PUT,NextSong,1540007000000.0,815,Eleanor Put Your Boots On,200,1542932064796,"""Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_4...",66
3,Modest Mouse,Logged In,Kevin,M,3,Arellano,209.52771,free,"Harrisburg-Carlisle, PA",PUT,NextSong,1540007000000.0,815,Float On,200,1542932236796,"""Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_4...",66
4,Adam Lambert,Logged In,Kevin,M,4,Arellano,266.44853,free,"Harrisburg-Carlisle, PA",PUT,NextSong,1540007000000.0,815,Aftermath,200,1542932445796,"""Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_4...",66


In [69]:
t = pd.to_datetime(df['ts'])
t.head()

0   1970-01-01 00:25:42.931645796
1   1970-01-01 00:25:42.931860796
2   1970-01-01 00:25:42.932064796
3   1970-01-01 00:25:42.932236796
4   1970-01-01 00:25:42.932445796
Name: ts, dtype: datetime64[ns]

In [70]:
time_data = [(tt.value, tt.hour, tt.day, tt.week, tt.month, tt.year, tt.weekday()) for tt in t]
column_labels = ('timestamp', 'hour', 'day', 'week', 'month', 'year', 'weekday')

In [71]:
time_df = pd.DataFrame(data=time_data, columns=column_labels)
time_df.head()

Unnamed: 0,timestamp,hour,day,week,month,year,weekday
0,1542931645796,0,1,1,1,1970,3
1,1542931860796,0,1,1,1,1970,3
2,1542932064796,0,1,1,1,1970,3
3,1542932236796,0,1,1,1,1970,3
4,1542932445796,0,1,1,1,1970,3


## Insert Records into Time Table

Implement the `time_table_insert` query in 

In [72]:
for i, row in time_df.iterrows():
    cur.execute(time_table_insert, list(row))
    conn.commit()

# 4. `users` table

To compile data for the `users` table, select the following columns from the dataset: `userId`, `firstName`, `lastName`, `gender`, and `level`. These selected attributes will be used to populate the `users` table.

In [56]:
user_df = df[['userId', 'firstName', 'lastName', 'gender', 'level']]
user_df.head()

Unnamed: 0,userId,firstName,lastName,gender,level
0,66,Kevin,Arellano,M,free
1,66,Kevin,Arellano,M,free
2,66,Kevin,Arellano,M,free
3,66,Kevin,Arellano,M,free
4,66,Kevin,Arellano,M,free


## Insert Records into Users Table

Implemetn the `user_table_insert` query

In [73]:
for i, row in user_df.iterrows():
    cur.execute(user_table_insert, row)
    conn.commit()

  cur.execute(user_table_insert, row)


# 5. `songsplay` Table

The construction of the `songplays` table is complex as it requires integrating data from the `songs`, `artists`, and original log files. The absence of direct song and artist IDs in the log files necessitates querying the `songs` and `artists` tables to match song title, artist name, and duration for the correct IDs.

`song_select` query is used to retrieve the necessary song and artist IDs. Subsequent details such as timestamp, user ID, level, session ID, location, and user agent are then compiled into songplay_data for populating the songplays table, ensuring each record is detailed and accurately linked to its song and artist.

In [74]:
df = df.merge(songs_df, how='left', left_on=['song', 'artist', 'length'], right_on=['title', 'artist_id', 'duration'])

In [75]:
df.head()

Unnamed: 0,artist,auth,firstName,gender,itemInSession,lastName,length,level,location,method,...,song,status,ts,userAgent,userId,song_id,title,artist_id,year,duration
0,Great Lake Swimmers,Logged In,Kevin,M,0,Arellano,215.11791,free,"Harrisburg-Carlisle, PA",PUT,...,Your Rocky Spine,200,1542931645796,"""Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_4...",66,,,,,
1,Soziedad Alkoholika,Logged In,Kevin,M,1,Arellano,204.7473,free,"Harrisburg-Carlisle, PA",PUT,...,Va Bien,200,1542931860796,"""Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_4...",66,,,,,
2,Franz Ferdinand,Logged In,Kevin,M,2,Arellano,172.01587,free,"Harrisburg-Carlisle, PA",PUT,...,Eleanor Put Your Boots On,200,1542932064796,"""Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_4...",66,,,,,
3,Modest Mouse,Logged In,Kevin,M,3,Arellano,209.52771,free,"Harrisburg-Carlisle, PA",PUT,...,Float On,200,1542932236796,"""Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_4...",66,,,,,
4,Adam Lambert,Logged In,Kevin,M,4,Arellano,266.44853,free,"Harrisburg-Carlisle, PA",PUT,...,Aftermath,200,1542932445796,"""Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_4...",66,,,,,


## Creating songplays DataFrame

In [76]:
# Including all relevant fields and possibly adding a 'songplay_id'
df['songplay_id'] = range(1, len(df) + 1)  # Generating songplay IDs
songplays_df = df[['songplay_id', 'ts', 'userId', 'level', 'song_id', 'artist_id', 'sessionId', 'location', 'userAgent']].copy()

# Rename columns to match the expected format for songplays table
songplays_df.rename(columns={
    'ts': 'start_time',
    'userId': 'user_id',
    'sessionId': 'session_id',
    'userAgent': 'user_agent'
}, inplace=True)

# Convert timestamp to datetime
songplays_df.loc[:, 'start_time'] = pd.to_datetime(songplays_df['start_time'], unit='ms')

# Show the head of the DataFrame to verify
songplays_df.head()

Unnamed: 0,songplay_id,start_time,user_id,level,song_id,artist_id,session_id,location,user_agent
0,1,2018-11-23 00:07:25.796,66,free,,,815,"Harrisburg-Carlisle, PA","""Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_4..."
1,2,2018-11-23 00:11:00.796,66,free,,,815,"Harrisburg-Carlisle, PA","""Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_4..."
2,3,2018-11-23 00:14:24.796,66,free,,,815,"Harrisburg-Carlisle, PA","""Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_4..."
3,4,2018-11-23 00:17:16.796,66,free,,,815,"Harrisburg-Carlisle, PA","""Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_4..."
4,5,2018-11-23 00:20:45.796,66,free,,,815,"Harrisburg-Carlisle, PA","""Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_4..."


## Insert Records into Songplays Table

Implement the `songplay_table_insert` query

In [77]:
for index, row in df.iterrows():

    # get songid and artistid from song and artist tables
    cur.execute(song_select, (row.song, row.artist, row.length))
    results = cur.fetchone()
    
    if results:
        songid, artistid = results
    else:
        songid, artistid = None, None

    # insert songplay record
    songplay_data = (index, row['ts'], row['userId'], row['level'], songid, artistid, row['sessionId'],
                     row['location'], row['userAgent'])
    cur.execute(songplay_table_insert, songplay_data)
    conn.commit()

# Close Connection to Sparkify Database

In [78]:
conn.close()