# ETL Processes
Use this notebook to develop the ETL process for each of your tables before completing the `etl.py` file to load the whole datasets.

In [1]:
import os
import glob
import psycopg2
import pandas as pd
from sql_queries import *

In [2]:
%run create_tables.py

Table dropped successfully!!
Table created successfully!!


In [3]:
conn = psycopg2.connect("host=127.0.0.1 dbname=sparkifydb user=postgres password=")
cur = conn.cursor()

In [4]:
def get_files(filepath):
    # Initialize an empty list, `all_files`, to store the paths of every JSON file found.
    all_files = []
    # os.walk() generates the file names in a directory tree, walking it top-down or bottom-up.
    # For each directory in the tree rooted at `filepath`,
    # it yields a 3-tuple `(dirpath, dirnames, filenames)`.
    for root, dirs, files in os.walk(filepath):
        # Use the glob module to find every file with a `.json` extension in the current directory (`root`).
        # os.path.join(root, '*.json') builds the search pattern by joining the current directory with '*.json'.
        files = glob.glob(os.path.join(root, '*.json'))
        # The nested loop iterates over each JSON file found in the current directory.
        for f in files:
            # Resolve each file's absolute path with os.path.abspath(f) and append it to `all_files`.
            all_files.append(os.path.abspath(f))
    # After exploring all directories and subdirectories, return the list of absolute JSON file paths.
    return all_files
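`get_files` combines `os.walk` with a per-directory `glob`. For comparison, here is a roughly equivalent sketch built on `pathlib.Path.rglob`, which recurses into subdirectories by itself. `get_json_files` is a hypothetical name, results are sorted because walk order is not guaranteed, and the demo builds a throwaway directory tree with `tempfile` instead of touching `data/`.

```python
import os
import tempfile
from pathlib import Path


def get_json_files(filepath):
    """Hypothetical pathlib-based alternative to get_files: absolute paths of all *.json files under filepath."""
    # rglob('*.json') searches filepath and every subdirectory below it
    return sorted(os.path.abspath(p) for p in Path(filepath).rglob('*.json'))


# Demo on a throwaway tree shaped like data/song_data/A/A/A
with tempfile.TemporaryDirectory() as tmp:
    nested = Path(tmp, 'A', 'A', 'A')
    nested.mkdir(parents=True)
    (nested / 'TRAAAAW128F429D538.json').write_text('{}')
    (Path(tmp) / 'notes.txt').write_text('not JSON, so it is skipped')
    found = get_json_files(tmp)

print(found)
```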

# Process `song_data`
In this first part, you'll perform ETL on the first dataset, `song_data`, to create the `songs` and `artists` dimensional tables.

Let's perform ETL on a single song file and load a single record into each table to start.
- Use the `get_files` function provided above to get a list of all song JSON files in `data/song_data`
- Select the first song in this list
- Read the song file and view the data
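Song files hold one JSON object per line, which is why the cells below pass `lines=True` to `pd.read_json`. A self-contained sketch of that read, using `io.StringIO` in place of a file path and a record reconstructed from the `df.head()` output for `TRAAAAW128F429D538.json` further down (field order and values come from that output, not from the raw file):

```python
import io

import pandas as pd

# One song record, reconstructed from the displayed dataframe (one JSON object per line)
sample_song = (
    '{"num_songs": 1, "artist_id": "ARD7TVE1187B99BFB1", "artist_latitude": null, '
    '"artist_longitude": null, "artist_location": "California - LA", '
    '"artist_name": "Casual", "song_id": "SOMZWCG12A8C13C480", '
    '"title": "I Didn\'t Mean To", "duration": 218.93179, "year": 0}\n'
)

# lines=True parses each line as a separate JSON record, one row per record
song_df = pd.read_json(io.StringIO(sample_song), lines=True)
print(song_df.shape)
print(song_df.loc[0, 'title'])
```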

In [98]:
song_files = get_files("data/song_data")

In [6]:
filepath = song_files[0]
filepath

'C:\\Users\\27117\\Road_To_Data_Engineering\\Data_Modeling_with_Postgres\\data\\song_data\\A\\A\\A\\TRAAAAW128F429D538.json'

In [7]:
df = pd.read_json(filepath, lines=True)
df.head()

|   | num_songs | artist_id | artist_latitude | artist_longitude | artist_location | artist_name | song_id | title | duration | year |
|---|---|---|---|---|---|---|---|---|---|---|
| 0 | 1 | ARD7TVE1187B99BFB1 |  |  | California - LA | Casual | SOMZWCG12A8C13C480 | I Didn't Mean To | 218.93179 | 0 |


## #1: Songs Table
#### Extract Data for Songs Table
- Select columns for song ID, title, artist ID, year, and duration
- Use `df.values` to select just the values from the dataframe
- Index to select the first (only) record in the dataframe
- Convert the array to a list and set it to `song_data`

In [8]:
# Select the song ID, title, artist ID, year, and duration columns
selected_columns = df[['song_id', 'title', 'artist_id', 'year', 'duration']]
# Use df.values to select just the values from the dataframe, then take the first (only) record
record = selected_columns.values[0]
# Convert the array to a list and set it to song_data
song_data = list(record)

In [9]:
song_data

['SOMZWCG12A8C13C480', "I Didn't Mean To", 'ARD7TVE1187B99BFB1', 0, 218.93179]
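A note on the list conversion step: when every selected column shares one numeric dtype, `.values` returns a plain NumPy array and `list()` leaves each element as a NumPy scalar such as `numpy.int64`, which psycopg2 cannot adapt by default (it raises "can't adapt type 'numpy.int64'"). `ndarray.tolist()` converts the elements to built-in Python types. A minimal sketch with a hypothetical all-integer frame:

```python
import numpy as np
import pandas as pd

# Hypothetical all-integer selection: both columns share dtype int64
ints = pd.DataFrame({'year': [0], 'num_songs': [1]})
int_record = ints.values[0]

as_list = list(int_record)       # elements stay NumPy integer scalars
as_tolist = int_record.tolist()  # elements become plain Python int

print(type(as_list[0]), type(as_tolist[0]))
```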

#### Insert Record into Song Table
Implement the `song_table_insert` query in `sql_queries.py` and run the cell below to insert a record for this song into the `songs` table. Remember to run `create_tables.py` before running the cell below to ensure you've created or reset the `songs` table in the sparkify database.

Be careful to insert the artist record into the `artists` table before inserting the song; otherwise the foreign key constraint on the `songs` table will be violated.
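The ordering rule can be checked without a Postgres server. This sketch uses Python's built-in `sqlite3` with an in-memory database as a stand-in; the two simplified table definitions are assumptions, not the project's schema in `sql_queries.py`, and SQLite only enforces foreign keys after `PRAGMA foreign_keys = ON`. The connection is named `demo_conn` so it does not overwrite the notebook's `conn`.

```python
import sqlite3

# In-memory SQLite as a stand-in for Postgres; simplified, assumed schemas
demo_conn = sqlite3.connect(':memory:')
demo_conn.execute('PRAGMA foreign_keys = ON')  # SQLite enforces foreign keys only when enabled
demo_conn.execute('CREATE TABLE artists (artist_id TEXT PRIMARY KEY, name TEXT)')
demo_conn.execute(
    'CREATE TABLE songs (song_id TEXT PRIMARY KEY, title TEXT, '
    'artist_id TEXT REFERENCES artists (artist_id))'
)

song = ('SOMZWCG12A8C13C480', "I Didn't Mean To", 'ARD7TVE1187B99BFB1')

# Inserting the song before its artist violates the foreign key
try:
    demo_conn.execute('INSERT INTO songs VALUES (?, ?, ?)', song)
    song_first_ok = True
except sqlite3.IntegrityError as e:
    song_first_ok = False
    print('rejected:', e)

# Insert the artist first; now the song insert succeeds
demo_conn.execute('INSERT INTO artists VALUES (?, ?)', ('ARD7TVE1187B99BFB1', 'Casual'))
demo_conn.execute('INSERT INTO songs VALUES (?, ?, ?)', song)
demo_conn.commit()
print(demo_conn.execute('SELECT COUNT(*) FROM songs').fetchone()[0])
```

In Postgres a failed statement also aborts the current transaction, so the notebook connection needs `conn.rollback()` before further commands succeed.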

In [23]:
cur.execute(song_table_insert, song_data)
conn.commit()

Run `test.ipynb` to see if you've successfully added a record to this table.

## #2: `artists` Table
#### Extract Data for Artists Table
- Select columns for artist ID, name, location, latitude, and longitude
- Use `df.values` to select just the values from the dataframe
- Index to select the first (only) record in the dataframe
- Convert the array to a list and set it to `artist_data`

In [19]:
# Select the artist ID, name, location, latitude, and longitude columns
selected_columns = df[['artist_id', 'artist_name', 'artist_location', 'artist_latitude', 'artist_longitude']]
# Use df.values to select just the values from the dataframe, then take the first (only) record
record = selected_columns.values[0]
# Convert the array to a list and set it to artist_data
artist_data = list(record)
artist_data

['ARD7TVE1187B99BFB1', 'Casual', 'California - LA', nan, nan]
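psycopg2 adapts a Python float NaN to `'NaN'::float`, so the `nan` latitude and longitude above would be stored as NaN rather than NULL. If NULL is preferred, one option is to map NaN to `None` before inserting. `nan_to_none` is a hypothetical helper, and the sample list is renamed so it does not overwrite the notebook's `artist_data`:

```python
import math

# Same values as artist_data above; latitude and longitude are NaN
sample_artist_data = ['ARD7TVE1187B99BFB1', 'Casual', 'California - LA', float('nan'), float('nan')]


def nan_to_none(values):
    """Hypothetical helper: replace float NaN with None so psycopg2 sends SQL NULL."""
    # numpy.float64 subclasses float, so NumPy NaNs are caught as well
    return [None if isinstance(v, float) and math.isnan(v) else v for v in values]


clean_artist_data = nan_to_none(sample_artist_data)
print(clean_artist_data)
```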

#### Insert Record into Artist Table
Implement the `artist_table_insert` query in `sql_queries.py` and run the cell below to insert a record for this song's artist into the `artists` table. Remember to run `create_tables.py` before running the cell below to ensure you've created or reset the `artists` table in the sparkify database.
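Several songs can share an artist, so when every song file is loaded the same `artist_id` may be inserted more than once. Assuming `artist_id` is the primary key of `artists`, one common way to handle this is `ON CONFLICT ... DO NOTHING`. The query below is only a sketch of what `artist_table_insert` might look like (column names are assumptions). It is checked against in-memory SQLite, which accepts the same clause from version 3.24 but uses `?` placeholders instead of psycopg2's `%s`:

```python
import sqlite3

# Assumed shape of artist_table_insert; the real query lives in sql_queries.py.
# %s is psycopg2's placeholder style.
artist_table_insert_sketch = """
    INSERT INTO artists (artist_id, name, location, latitude, longitude)
    VALUES (%s, %s, %s, %s, %s)
    ON CONFLICT (artist_id) DO NOTHING
"""

demo_conn = sqlite3.connect(':memory:')
demo_conn.execute(
    'CREATE TABLE artists (artist_id TEXT PRIMARY KEY, name TEXT, '
    'location TEXT, latitude REAL, longitude REAL)'
)
artist_row = ('ARD7TVE1187B99BFB1', 'Casual', 'California - LA', None, None)
for _ in range(2):  # the second insert hits the primary key and is skipped
    demo_conn.execute(artist_table_insert_sketch.replace('%s', '?'), artist_row)
print(demo_conn.execute('SELECT COUNT(*) FROM artists').fetchone()[0])  # 1
```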

In [22]:
try:
    cur.execute(artist_table_insert, artist_data)
    conn.commit()
except Exception as e:
    print(e)
    conn.rollback()

Run `test.ipynb` to see if you've successfully added a record to this table.

# Process `log_data`
In this part, you'll perform ETL on the second dataset, `log_data`, to create the `time` and `users` dimensional tables, as well as the `songplays` fact table.

Let's perform ETL on a single log file and load a single record into each table.
- Use the `get_files` function provided above to get a list of all log JSON files in `data/log_data`
- Select the first log file in this list
- Read the log file and view the data

In [90]:
log_files = get_files(r"C:\Users\27117\Road_To_Data_Engineering\Data_Modeling_with_Postgres\data\log_data")

In [91]:
filepath = log_files[0]

In [92]:
df = pd.read_json(filepath, lines=True)
df.head(3)

|   | artist | auth | firstName | gender | itemInSession | lastName | length | level | location | method | page | registration | sessionId | song | status | ts | userAgent | userId |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 |  | Logged In | Walter | M | 0 | Frye |  | free | San Francisco-Oakland-Hayward, CA | GET | Home | 1540919166796 | 38 |  | 200 | 1541105830796 | "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_4... | 39 |
| 1 |  | Logged In | Kaylee | F | 0 | Summers |  | free | Phoenix-Mesa-Scottsdale, AZ | GET | Home | 1540344794796 | 139 |  | 200 | 1541106106796 | "Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebK... | 8 |
| 2 | Des'ree | Logged In | Kaylee | F | 1 | Summers | 246.30812 | free | Phoenix-Mesa-Scottsdale, AZ | PUT | NextSong | 1540344794796 | 139 | You Gotta Be | 200 | 1541106106796 | "Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebK... | 8 |


## #3: `time` Table
#### Extract Data for Time Table
- Filter records by `NextSong` action
- Convert the `ts` timestamp column to datetime
  - Hint: the current timestamp is in milliseconds
- Extract the timestamp, hour, day, week of year, month, year, and weekday from the `ts` column and set `time_data` to a list containing these values in order
  - Hint: use pandas' [`dt` attribute](https://pandas.pydata.org/pandas-docs/stable/reference/api/pandas.Series.dt.html) to easily access datetime-like properties.
- Specify labels for these columns and set to `column_labels`
- Create a dataframe, `time_df`, containing the time data for this file by combining `column_labels` and `time_data` into a dictionary and converting this into a dataframe
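The steps above can be sketched on three `ts` values from the sample log data (milliseconds since the Unix epoch), using the same `dt` accessors as the cells below. Names are prefixed with `sample_` so the notebook's `t` and `time_df` are not overwritten; `Series.dt.isocalendar()` requires pandas 1.1 or later.

```python
import pandas as pd

# ts values from three NextSong events, in milliseconds since the epoch
sample_ts = pd.Series([1541106106796, 1541106352796, 1541106496796])
sample_t = pd.to_datetime(sample_ts, unit='ms')  # unit='ms' interprets the integers as milliseconds

labels = ('start_time', 'hour', 'day', 'week', 'month', 'year', 'weekday')
parts = (
    sample_t,
    sample_t.dt.hour,
    sample_t.dt.day,
    sample_t.dt.isocalendar().week,  # ISO week number
    sample_t.dt.month,
    sample_t.dt.year,
    sample_t.dt.weekday,  # Monday=0 ... Sunday=6
)
sample_time_df = pd.DataFrame(dict(zip(labels, parts)))
print(sample_time_df)
```

For 1541106106796 this gives 2018-11-01 21:01:46.796, hour 21, ISO week 44 and weekday 3 (Thursday).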

In [33]:
# Filter records by NextSong action
df = df[df['page'] == 'NextSong']
df.head(3)

|   | artist | auth | firstName | gender | itemInSession | lastName | length | level | location | method | page | registration | sessionId | song | status | ts | userAgent | userId |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 2 | Des'ree | Logged In | Kaylee | F | 1 | Summers | 246.30812 | free | Phoenix-Mesa-Scottsdale, AZ | PUT | NextSong | 1540344794796 | 139 | You Gotta Be | 200 | 1541106106796 | "Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebK... | 8 |
| 4 | Mr Oizo | Logged In | Kaylee | F | 3 | Summers | 144.03873 | free | Phoenix-Mesa-Scottsdale, AZ | PUT | NextSong | 1540344794796 | 139 | Flat 55 | 200 | 1541106352796 | "Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebK... | 8 |
| 5 | Tamba Trio | Logged In | Kaylee | F | 4 | Summers | 177.18812 | free | Phoenix-Mesa-Scottsdale, AZ | PUT | NextSong | 1540344794796 | 139 | Quem Quiser Encontrar O Amor | 200 | 1541106496796 | "Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebK... | 8 |


In [38]:
# Convert the ts timestamp column to datetime
# Note: ts is in milliseconds; unit='ms' tells pandas to interpret it that way, so no manual division by 1000 is needed
t = pd.to_datetime(df['ts'], unit='ms')
t.head()

0   2018-11-01 20:57:10.796
1   2018-11-01 21:01:46.796
2   2018-11-01 21:01:46.796
3   2018-11-01 21:02:12.796
4   2018-11-01 21:05:52.796
Name: ts, dtype: datetime64[ns]

In [65]:
# Extract the timestamp, hour, day, week of year, month, year, and weekday from the ts column
time_data = (t, t.dt.hour, t.dt.day, t.dt.isocalendar().week, t.dt.month, t.dt.year, t.dt.weekday)
column_labels = ('start_time', 'hour', 'day', 'week', 'month', 'year', 'weekday')

In [66]:
# Create a dataframe, time_df, containing the time data for this file
time_df = pd.DataFrame(dict(zip(column_labels, time_data)))
time_df.head()

|   | start_time | hour | day | week | month | year | weekday |
|---|---|---|---|---|---|---|---|
| 0 | 2018-11-01 20:57:10.796 | 20 | 1 | 44 | 11 | 2018 | 3 |
| 1 | 2018-11-01 21:01:46.796 | 21 | 1 | 44 | 11 | 2018 | 3 |
| 2 | 2018-11-01 21:01:46.796 | 21 | 1 | 44 | 11 | 2018 | 3 |
| 3 | 2018-11-01 21:02:12.796 | 21 | 1 | 44 | 11 | 2018 | 3 |
| 4 | 2018-11-01 21:05:52.796 | 21 | 1 | 44 | 11 | 2018 | 3 |


#### Insert Records into Time Table
Implement the `time_table_insert` query in `sql_queries.py` and run the cell below to insert records for the timestamps in this log file into the `time` table. Remember to run `create_tables.py` before running the cell below to ensure you've created or reset the `time` table in the sparkify database.
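In the output above, rows 1 and 2 of `time_df` share the timestamp 2018-11-01 21:01:46.796. If `start_time` is the primary key of `time` (an assumption about the schema), the duplicate would conflict unless `time_table_insert` handles it, for example with `ON CONFLICT DO NOTHING`. Dropping duplicates in pandas before the insert loop is one alternative, sketched on a hypothetical frame:

```python
import pandas as pd

# Hypothetical frame with a repeated timestamp, like rows 1 and 2 of time_df
events = pd.DataFrame({
    'start_time': pd.to_datetime([1541105830796, 1541106106796, 1541106106796], unit='ms'),
})
events['hour'] = events['start_time'].dt.hour

# Keep only the first row for each distinct start_time
deduped = events.drop_duplicates(subset='start_time')
print(len(events), '->', len(deduped))  # 3 -> 2
```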

In [67]:
for i, row in time_df.iterrows():
    cur.execute(time_table_insert, list(row))
    conn.commit()

Run `test.ipynb` to see if you've successfully added records to this table.

## #4: `users` Table
#### Extract Data for Users Table
- Select columns for user ID, first name, last name, gender and level and set to `user_df`
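The extraction cell below keeps only the first record. To load every user in a file, one option is to select the columns for all rows and drop duplicates, since the same user appears once per event. A sketch on a hypothetical frame built from the sample NextSong rows (user 8, Kaylee Summers); note that a user whose `level` changes within a file keeps one row per level, so `user_table_insert` would still need a conflict rule for that case:

```python
import pandas as pd

# Hypothetical log rows: the same user appears once per event
log_sample = pd.DataFrame({
    'userId': [8, 8, 8],
    'firstName': ['Kaylee'] * 3,
    'lastName': ['Summers'] * 3,
    'gender': ['F'] * 3,
    'level': ['free'] * 3,
})

# Select, de-duplicate and rename to the users-table column names in one chain
users_sketch = (
    log_sample[['userId', 'firstName', 'lastName', 'gender', 'level']]
    .drop_duplicates()
    .rename(columns={'userId': 'user_id', 'firstName': 'first_name', 'lastName': 'last_name'})
)
print(users_sketch)
```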

In [69]:
print(df.columns)

Index(['artist', 'auth', 'firstName', 'gender', 'itemInSession', 'lastName',
       'length', 'level', 'location', 'method', 'page', 'registration',
       'sessionId', 'song', 'status', 'ts', 'userAgent', 'userId'],
      dtype='object')


In [83]:
selected_columns = df[['userId','firstName','lastName','gender','level']] 
record = selected_columns.values[0]
user_df = pd.DataFrame([record], columns=['user_id', 'first_name', 'last_name', 'gender', 'level'])
user_df

|   | user_id | first_name | last_name | gender | level |
|---|---|---|---|---|---|
| 0 | 39 | Walter | Frye | M | free |


#### Insert Records into Users Table
Implement the `user_table_insert` query in `sql_queries.py` and run the cell below to insert records for the users in this log file into the `users` table. Remember to run `create_tables.py` before running the cell below to ensure you've created or reset the `users` table in the sparkify database.

In [84]:
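for i, row in user_df.iterrows():
    # list(row) passes the values positionally, matching the %s placeholders in user_table_insert
    cur.execute(user_table_insert, list(row))
    conn.commit()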

Run `test.ipynb` to see if you've successfully added records to this table.

## #5: `songplays` Table
#### Extract Data for Songplays Table
This one is a little more complicated since information from the songs table, artists table, and original log file are all needed for the `songplays` table. Since the log file does not specify an ID for either the song or the artist, you'll need to get the song ID and artist ID by querying the songs and artists tables to find matches based on song title, artist name, and song duration time.
- Implement the `song_select` query in `sql_queries.py` to find the song ID and artist ID based on the title, artist name, and duration of a song.
- Select the timestamp, user ID, level, song ID, artist ID, session ID, location, and user agent and set to `songplay_data`
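A sketch of one way to write `song_select`: join `songs` to `artists` and match on title, artist name and duration. The column names (`title`, `duration`, `name`) are assumptions about the schema in `sql_queries.py`; the check runs against in-memory SQLite with `?` placeholders. A log event with no matching song (here "You Gotta Be", which is not in the demo tables) returns `None`, which is why `process_log_file` falls back to NULL IDs.

```python
import sqlite3

# Assumed form of song_select; %s is psycopg2's placeholder style
song_select_sketch = """
    SELECT s.song_id, a.artist_id
    FROM songs AS s
    JOIN artists AS a ON s.artist_id = a.artist_id
    WHERE s.title = %s AND a.name = %s AND s.duration = %s
"""

demo_conn = sqlite3.connect(':memory:')
demo_conn.execute('CREATE TABLE artists (artist_id TEXT PRIMARY KEY, name TEXT)')
demo_conn.execute(
    'CREATE TABLE songs (song_id TEXT PRIMARY KEY, title TEXT, artist_id TEXT, duration REAL)'
)
demo_conn.execute("INSERT INTO artists VALUES ('ARD7TVE1187B99BFB1', 'Casual')")
demo_conn.execute(
    "INSERT INTO songs VALUES ('SOMZWCG12A8C13C480', 'I Didn''t Mean To', 'ARD7TVE1187B99BFB1', 218.93179)"
)

query = song_select_sketch.replace('%s', '?')  # SQLite placeholder style
hit = demo_conn.execute(query, ("I Didn't Mean To", 'Casual', 218.93179)).fetchone()
miss = demo_conn.execute(query, ('You Gotta Be', "Des'ree", 246.30812)).fetchone()
print(hit)   # ('SOMZWCG12A8C13C480', 'ARD7TVE1187B99BFB1')
print(miss)  # None
```

Matching on `duration` with `=` works here because the stored and queried values are the same literal; durations that went through different float conversions might need a tolerance instead.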

#### Insert Records into Songplays Table
- Implement the `songplay_table_insert` query and run the cell below to insert records for the songplay actions in this log file into the `songplays` table. Remember to run `create_tables.py` before running the cell below to ensure you've created or reset the `songplays` table in the sparkify database.

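### Some Preparatory Work
- The steps above only processed a single sample record. Now let's insert all of the relevant data into the PostgreSQL database in one pass.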

In [103]:
def process_song_file(cur, filepath):
    """
    Process song files and insert records into the Postgres database.
    :param cur: cursor reference to the PostgreSQL database, which lets the function execute SQL commands.
    :param filepath: complete file path of the song file to process.
    """
    # process_song_file processes a single song file and inserts the relevant data into PostgreSQL.

    # Read the JSON song file as a pandas Series (typ='series'), since each file is expected to hold
    # a single record, then wrap the Series in a list to turn it into a one-row DataFrame.
    # open song file
    df = pd.DataFrame([pd.read_json(filepath, typ='series', convert_dates=False)])

    # Loop over the DataFrame's records; with one row per song file, this runs once.
    for value in df.values:
        # Unpack the individual fields of the row into variables (relies on the key order in the JSON file).
        num_songs, artist_id, artist_latitude, artist_longitude, artist_location, artist_name, song_id, title, duration, year = value

        # insert artist record
        # Build the artist data as a tuple and insert it with the artist_table_insert query.
        # The artist goes in first so the foreign key on songs is satisfied.
        artist_data = (artist_id, artist_name, artist_location, artist_latitude, artist_longitude)
        cur.execute(artist_table_insert, artist_data)

        # insert song record
        # Likewise, build the song data as a tuple and insert it with the song_table_insert query.
        song_data = (song_id, title, artist_id, year, duration)
        cur.execute(song_table_insert, song_data)

    print(f"Records inserted for file {filepath}")

In [118]:
def process_log_file(cur, filepath):
    """
    Process event log files and insert records into the Postgres database.
    :param cur: cursor reference
    :param filepath: complete file path for the file to load
    """
    # open log file
    # Read the JSON log file into a pandas DataFrame;
    # lines=True means the file holds one JSON object per line.
    df = pd.read_json(filepath, lines=True)

    # filter by NextSong action
    # Keep only the records whose `page` value is "NextSong" (copy to avoid SettingWithCopyWarning).
    df = df[df['page'] == 'NextSong'].copy()

    # convert timestamp column to datetime
    # ts holds milliseconds since the epoch, so tell pandas the unit explicitly.
    df['ts'] = pd.to_datetime(df['ts'], unit='ms')
    t = df['ts']

    # insert time data records
    # Break each timestamp into its components (hour, day, week of year, and so on)
    # and collect them in a new DataFrame, time_df, with the same labels as the time table cell above.
    column_labels = ["start_time", "hour", "day", "week", "month", "year", "weekday"]
    time_data = (t, t.dt.hour, t.dt.day, t.dt.isocalendar().week, t.dt.month, t.dt.year, t.dt.weekday)

    time_df = pd.DataFrame(dict(zip(column_labels, time_data)))

    # Then insert these rows into the time table.
    for i, row in time_df.iterrows():
        cur.execute(time_table_insert, list(row))

    # load user table
    # Create a new DataFrame, user_df, containing the user-related columns.
    user_df = df[['userId', 'firstName', 'lastName', 'gender', 'level']]

    # insert user records
    # Iterate over user_df and insert each user into the users table;
    # list(row) passes the values positionally, matching the %s placeholders.
    for i, row in user_df.iterrows():
        cur.execute(user_table_insert, list(row))

    # insert songplay records
    # Iterate over the filtered DataFrame df; for each row:
    for index, row in df.iterrows():

        # get songid and artistid from song and artist tables
        # Look up a matching song and artist in the database by song title, artist name, and duration.
        cur.execute(song_select, (row.song, row.artist, row.length))
        results = cur.fetchone()

        # If a match is found, retrieve the song ID and artist ID; otherwise use None (stored as NULL).
        if results:
            songid, artistid = results
        else:
            songid, artistid = None, None

        # insert songplay record
        # Build the songplay data and insert it into the songplays table.
        songplay_data = (row.ts, row.userId, row.level, songid, artistid, row.sessionId, row.location, row.userAgent)
        cur.execute(songplay_table_insert, songplay_data)

In [114]:
def process_data(cur, conn, filepath, func):
    """
    Driver function to load data from song and event log files into the Postgres database.
    :param cur: a database cursor reference, which lets the function execute SQL commands.
    :param conn: database connection reference, used to commit transactions to the database.
    :param filepath: parent directory where the JSON files exist.
    :param func: function to call on each file; depending on context, process_song_file or process_log_file.
    """
    # process_data is a driver that loads JSON files (song files or event log files) into PostgreSQL.

    # get all files matching extension from directory
    # Walk every directory and subdirectory under `filepath` and collect the absolute path
    # of each JSON file found (the same logic as get_files above).
    all_files = []
    for root, dirs, files in os.walk(filepath):
        files = glob.glob(os.path.join(root, '*.json'))
        for f in files:
            all_files.append(os.path.abspath(f))

    # get total number of files found
    # Count and print the total number of JSON files found in the directory.
    num_files = len(all_files)
    print('{} files found in {}'.format(num_files, filepath))

    # iterate over files and process
    # For each file: call `func` (process_song_file or process_log_file) to process it,
    # commit the transaction with conn.commit(), and print a progress message.
    for i, datafile in enumerate(all_files, 1):
        func(cur, datafile)
        conn.commit()
        print('{}/{} files processed.'.format(i, num_files))
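In the log-loading cell below, `try/except` wraps the whole `process_data` call, so an error in one file rolls back that file's partial work but also stops the remaining files from being processed. `process_files_safely` is a hypothetical variant that moves the handling inside the loop and reports failures instead. Because it only needs the DB-API `commit` and `rollback` methods, it is demonstrated here with in-memory SQLite and a fake loader rather than Postgres.

```python
import sqlite3


def process_files_safely(cur, conn, files, func):
    """Hypothetical variant of process_data: commit each file on success, roll it back on failure.

    Returns the list of files that failed instead of aborting the whole run.
    """
    failed = []
    for i, datafile in enumerate(files, 1):
        try:
            func(cur, datafile)
            conn.commit()
        except Exception as e:  # keep going; one bad file should not block the rest
            conn.rollback()
            failed.append(datafile)
            print(f'{datafile} failed: {e}')
        print(f'{i}/{len(files)} files processed.')
    return failed


# Demo with in-memory SQLite and a fake per-file loader
demo_conn = sqlite3.connect(':memory:')
demo_cur = demo_conn.cursor()
demo_cur.execute('CREATE TABLE loaded (name TEXT PRIMARY KEY)')


def fake_loader(cur, datafile):
    # Insert first, then fail on the bad file so the rollback has something to undo
    cur.execute('INSERT INTO loaded VALUES (?)', (datafile,))
    if datafile == 'bad.json':
        raise ValueError('malformed record')


failed = process_files_safely(demo_cur, demo_conn, ['a.json', 'bad.json', 'b.json'], fake_loader)
print(failed)
print(demo_cur.execute('SELECT name FROM loaded ORDER BY name').fetchall())
```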

In [106]:
process_data(cur, conn, filepath=r"C:\Users\27117\Road_To_Data_Engineering\Data_Modeling_with_Postgres\data\song_data", func=process_song_file)

60 files found in C:\Users\27117\Road_To_Data_Engineering\Data_Modeling_with_Postgres\data\song_data
Records inserted for file C:\Users\27117\Road_To_Data_Engineering\Data_Modeling_with_Postgres\data\song_data\A\A\A\TRAAAAW128F429D538.json
1/60 files processed.
Records inserted for file C:\Users\27117\Road_To_Data_Engineering\Data_Modeling_with_Postgres\data\song_data\A\A\A\TRAAABD128F429CF47.json
2/60 files processed.
Records inserted for file C:\Users\27117\Road_To_Data_Engineering\Data_Modeling_with_Postgres\data\song_data\A\A\A\TRAAADZ128F9348C2E.json
3/60 files processed.
Records inserted for file C:\Users\27117\Road_To_Data_Engineering\Data_Modeling_with_Postgres\data\song_data\A\A\A\TRAAAEF128F4273421.json
4/60 files processed.
Records inserted for file C:\Users\27117\Road_To_Data_Engineering\Data_Modeling_with_Postgres\data\song_data\A\A\A\TRAAAFD128F92F423A.json
5/60 files processed.
Records inserted for file C:\Users\27117\Road_To_Data_Engineering\Data_Modeling_with_Postgres\

Records inserted for file C:\Users\27117\Road_To_Data_Engineering\Data_Modeling_with_Postgres\data\song_data\A\B\C\TRABCRU128F423F449.json
58/60 files processed.
Records inserted for file C:\Users\27117\Road_To_Data_Engineering\Data_Modeling_with_Postgres\data\song_data\A\B\C\TRABCTK128F934B224.json
59/60 files processed.
Records inserted for file C:\Users\27117\Road_To_Data_Engineering\Data_Modeling_with_Postgres\data\song_data\A\B\C\TRABCUQ128E0783E2B.json
60/60 files processed.


In [121]:
try:
    process_data(cur, conn, filepath=r"C:\Users\27117\Road_To_Data_Engineering\Data_Modeling_with_Postgres\data\log_data", func=process_log_file)
except Exception as e:
    print(e)
    conn.rollback()

30 files found in C:\Users\27117\Road_To_Data_Engineering\Data_Modeling_with_Postgres\data\log_data
1/30 files processed.
2/30 files processed.
3/30 files processed.
4/30 files processed.
5/30 files processed.
6/30 files processed.
7/30 files processed.
8/30 files processed.
9/30 files processed.
10/30 files processed.
11/30 files processed.
12/30 files processed.
13/30 files processed.
14/30 files processed.
15/30 files processed.
16/30 files processed.
17/30 files processed.
18/30 files processed.
19/30 files processed.
20/30 files processed.
21/30 files processed.
22/30 files processed.
23/30 files processed.
24/30 files processed.
25/30 files processed.
26/30 files processed.
27/30 files processed.
28/30 files processed.
29/30 files processed.
30/30 files processed.


Run `test.ipynb` to see if you've successfully added records to these tables.

# Close Connection to Sparkify Database

In [122]:
conn.close()

# Implement `etl.py`
Use what you've completed in this notebook to implement `etl.py`.