# ETL process
develop the etl process before create the pipline to ensure that all process working with no error

In [2]:
from Sql_Queries import * 
import psycopg2
import pandas as pd
import os

In [3]:
try:
    conn = psycopg2.connect('host= 127.0.0.1 dbname=sparkify user=postgres password=123')
    cur= conn.cursor()
except psycopg2.Error as e:
    print(e)
    print('can not connect to the sparkify database')

In [4]:
def get_json_file_paths(folder_path):
    """
    Recursively retrieves the absolute paths of all JSON files in the specified folder and its subfolders.
    Args:
        folder_path (str): Path to the root folder containing JSON files.
    Returns:
        list: A list of absolute file paths.
    """
    json_file_paths = []
    for root, _, files in os.walk(folder_path):
        for filename in files:
            if filename.endswith(".json"):
                json_file_paths.append(os.path.join(root, filename))
    return json_file_paths

## Song Dataset
The first dataset is a subset of real data from the Million Song Dataset. Each file is in JSON format and contains metadata about a song and the artist of that song. The files are partitioned by the first three letters of each song's track ID
### process song dataset
use Song data set to perform ETL process and load the data into the **artists** and **songs** dimintion tables

In [28]:
songs_paths = get_json_file_paths("./data/song_data")

In [30]:
song_df = pd.read_json(songs_paths[0], lines=True)
song_df

Unnamed: 0,num_songs,artist_id,artist_latitude,artist_longitude,artist_location,artist_name,song_id,title,duration,year
0,1,ARD7TVE1187B99BFB1,,,California - LA,Casual,SOMZWCG12A8C13C480,I Didn't Mean To,218.93179,0


### #1: Songs Table
|FROM|--->|TO|
|---|---|---|
|song_id|--->|song_id|
title|--->|title
artist_id|--->|artist_id 
year|--->|year
duration|--->|duration 

In [31]:
songs = song_df[['song_id', 'title', 'artist_id', 'year', 'duration']].values[0]
songs

array(['SOMZWCG12A8C13C480', "I Didn't Mean To", 'ARD7TVE1187B99BFB1', 0,
       218.93179], dtype=object)

#### insert into songs table 
using `songs_table_insert` in `SqL_Queires.py` file transfer the data in the songs table

In [32]:
cur.execute(songs_table_insert, songs)
conn.commit()

### #2: artists Table 
|FROM|--->|TO|
|---|---|---|
|artist_id|--->|artist_id|
artist_name|--->|name
artist_location|--->|location 
artist_latitude|--->|latitude
artist_longitude|--->|longitude 

In [39]:
artists= song_df[['artist_id', 'artist_name', 'artist_location', 'artist_latitude', 'artist_longitude']].values[0]
artists

array(['ARD7TVE1187B99BFB1', 'Casual', 'California - LA', nan, nan],
      dtype=object)

#### insert into artists table 
using `artists_table_insert` in `SqL_Queires.py` file transfer the data in the songs table

In [40]:
cur.execute(artists_table_insert, artists)
conn.commit()

### process Log dataset
use Song data set to perform ETL process and load the data into the **users** and **time** dimintion tables

In [5]:
logs_paths = get_json_file_paths("./data/log_data/")

In [8]:
log_df = pd.read_json(logs_paths[0], lines=True)
log_df.head()

Unnamed: 0,artist,auth,firstName,gender,itemInSession,lastName,length,level,location,method,page,registration,sessionId,song,status,ts,userAgent,userId
0,,Logged In,Walter,M,0,Frye,,free,"San Francisco-Oakland-Hayward, CA",GET,Home,1540919166796,38,,200,1541105830796,"""Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_4...",39
1,,Logged In,Kaylee,F,0,Summers,,free,"Phoenix-Mesa-Scottsdale, AZ",GET,Home,1540344794796,139,,200,1541106106796,"""Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebK...",8
2,Des'ree,Logged In,Kaylee,F,1,Summers,246.30812,free,"Phoenix-Mesa-Scottsdale, AZ",PUT,NextSong,1540344794796,139,You Gotta Be,200,1541106106796,"""Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebK...",8
3,,Logged In,Kaylee,F,2,Summers,,free,"Phoenix-Mesa-Scottsdale, AZ",GET,Upgrade,1540344794796,139,,200,1541106132796,"""Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebK...",8
4,Mr Oizo,Logged In,Kaylee,F,3,Summers,144.03873,free,"Phoenix-Mesa-Scottsdale, AZ",PUT,NextSong,1540344794796,139,Flat 55,200,1541106352796,"""Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebK...",8


### #3: users Table 
|FROM|--->|TO|
|---|---|---|
|userId|--->|user_id|
|firstName|--->|first_name|
|lastName|--->|last_name|
|gender|--->|gender|
|level|--->|level|

#### insert into users table 
using `users_table_insert` in `SqL_Queires.py` file transfer the data in the songs table

In [14]:
users = log_df[['userId','firstName','lastName','gender','level']]

In [17]:
for i, row in users.iterrows():
    cur.execute(users_table_insert, row)
    conn.commit()

  cur.execute(users_table_insert, row)


### #4: time Table 
|FROM|--->|TO|
|---|---|---|
|ts|--->|start_time|
|ts|--->|hour|
|ts|--->|day|
|ts|--->|week|
|ts|--->|month|
|ts|--->|year|
|ts|--->|weekday|

In [24]:
time = log_df['ts']
time.head()

0    1541105830796
1    1541106106796
2    1541106106796
3    1541106132796
4    1541106352796
Name: ts, dtype: int64

In [25]:
time = pd.to_datetime(time, unit='ms')
time_df = pd.DataFrame(columns = ['start_time', 'hour', 'day', 'week', 'month', 'year', 'weekday'])

In [27]:
time_df['start_time'] = time
time_df['hour'] = time.dt.hour
time_df['day'] = time.dt.day
time_df['week'] = time.dt.isocalendar().week
time_df['month'] = time.dt.month
time_df['year'] = time.dt.year
time_df['weekday'] = time.dt.weekday
time_df.head()

Unnamed: 0,start_time,hour,day,week,month,year,weekday
0,2018-11-01 20:57:10.796,20,1,44,11,2018,3
1,2018-11-01 21:01:46.796,21,1,44,11,2018,3
2,2018-11-01 21:01:46.796,21,1,44,11,2018,3
3,2018-11-01 21:02:12.796,21,1,44,11,2018,3
4,2018-11-01 21:05:52.796,21,1,44,11,2018,3


#### insert into time table 
using `time_table_insert` in `SqL_Queires.py` file transfer the data in the songs table

In [29]:
for i, row in time_df.iterrows():
    cur.execute(time_table_insert, row)
    conn.commit()

  cur.execute(time_table_insert, row)


### #5: songplays Table 