# Spark를 활용한 ecommerce 유저 행동 데이터 parquet 파일 변환 및 저장
- [kaggle 데이터셋 링크](https://www.kaggle.com/datasets/mkechinov/ecommerce-behavior-data-from-multi-category-store)

In [7]:
import numpy as np
import pandas as pd
import os
import shutil
import copy
from tqdm import tqdm
import re
import pickle
from joblib import dump, load

import pyarrow as pa
import pyarrow.parquet as pq

from pyspark.sql import SparkSession
from pyspark.sql.functions import to_timestamp, date_format, col
from pyspark.sql.dataframe import DataFrame as SparkDataFrame
from pyspark.sql import functions as F

import warnings
# Suppress every Python warning raised for the rest of the session.
warnings.filterwarnings(action='ignore')
# Remove the limit on how many columns pandas shows when displaying a DataFrame.
pd.set_option("display.max_columns", None)

## 1. 데이터 불러오기

In [2]:
# List the entries of the raw-data folder (one CSV file per month).
raw_files = os.listdir("data/raw_data")
raw_files

['2019-Dec.csv',
 '2019-Nov.csv',
 '2019-Oct.csv',
 '2020-Apr.csv',
 '2020-Feb.csv',
 '2020-Jan.csv',
 '2020-Mar.csv']

### 1-1. CSV 파일을 불러와 DataFrame dictionary로 저장

In [2]:
def load_csv_files_to_dict(spark, folder_path):
    """
    Read every CSV file in a folder into a dictionary of Spark DataFrames.

    Only entries whose name ends with ``.csv`` (case-insensitive) are read, so
    unrelated entries in the folder (notebook checkpoints, OS metadata files,
    ...) are never handed to the CSV reader. Files are processed in sorted
    name order so the key order of the result is deterministic.

    Args:
        spark: SparkSession used to read the files.
        folder_path: Directory that contains the CSV files.

    Returns:
        dict mapping a normalized file name (extension removed, "-" replaced
        by "_", lower-cased; e.g. "2019-Oct.csv" -> "2019_oct") to the
        DataFrame read from that file with a header row and inferred schema.
    """
    csv_files = sorted(
        name for name in os.listdir(folder_path)
        if name.lower().endswith(".csv")
    )

    csv_dict = {}
    for file_name in tqdm(csv_files, desc="Processing CSV files"):
        file_path = os.path.join(folder_path, file_name)
        df = (
            spark.read
            .option("header", True)
            .option("inferSchema", True)
            .csv(file_path)
        )
        # Drop only the trailing extension; str.replace(".csv", "") would also
        # remove ".csv" from the middle of a name and misses ".CSV".
        stem = os.path.splitext(file_name)[0]
        raw_df_name = stem.replace("-", "_").lower()
        csv_dict[raw_df_name] = df

    return csv_dict

In [41]:
# Create (or reuse) the Spark session used to read the raw CSV files.
spark = (
    SparkSession.builder
    .appName("MultiCSVReader")
    .getOrCreate()
)

# Load the raw-data folder into a {normalized file name: DataFrame} dictionary.
csv_dict = load_csv_files_to_dict(spark, "data/raw_data")

# Print the dictionary keys derived from the file names.
print("\nCSV 파일 Dictionary keys:", list(csv_dict))

Processing CSV files: 100%|██████████| 7/7 [04:51<00:00, 41.64s/it]


CSV 파일 Dictionary keys: ['2019_dec', '2019_nov', '2019_oct', '2020_apr', '2020_feb', '2020_jan', '2020_mar']





### 1-2. 불러온 데이터셋 확인

In [4]:
# Inspect the column types Spark inferred for the October 2019 file.
csv_dict["2019_oct"].printSchema()

root
 |-- event_time: timestamp (nullable = true)
 |-- event_type: string (nullable = true)
 |-- product_id: integer (nullable = true)
 |-- category_id: long (nullable = true)
 |-- category_code: string (nullable = true)
 |-- brand: string (nullable = true)
 |-- price: double (nullable = true)
 |-- user_id: integer (nullable = true)
 |-- user_session: string (nullable = true)



In [42]:
# Preview the first three rows of the October 2019 data.
csv_dict["2019_oct"].show(3)

+-------------------+----------+----------+-------------------+--------------------+--------+-----+---------+--------------------+
|         event_time|event_type|product_id|        category_id|       category_code|   brand|price|  user_id|        user_session|
+-------------------+----------+----------+-------------------+--------------------+--------+-----+---------+--------------------+
|2019-10-01 09:00:00|      view|  44600062|2103807459595387724|                NULL|shiseido|35.79|541312140|72d76fde-8bb3-4e0...|
|2019-10-01 09:00:00|      view|   3900821|2053013552326770905|appliances.enviro...|    aqua| 33.2|554748717|9333dfbd-b87a-470...|
|2019-10-01 09:00:01|      view|  17200506|2053013559792632471|furniture.living_...|    NULL|543.1|519107250|566511c2-e2e3-422...|
+-------------------+----------+----------+-------------------+--------------------+--------+-----+---------+--------------------+
only showing top 3 rows



In [11]:
# Count the NULL values of every column in the October 2019 data:
# F.when(...) yields a non-null value only for NULL cells, and F.count
# counts the non-null results.
null_counts_oct = csv_dict["2019_oct"].select(
    [
        F.count(F.when(F.col(c).isNull(), c)).alias(c)
        for c in csv_dict["2019_oct"].columns
    ]
)
null_counts_oct.show()

+----------+----------+----------+-----------+-------------+-------+-----+-------+------------+
|event_time|event_type|product_id|category_id|category_code|  brand|price|user_id|user_session|
+----------+----------+----------+-----------+-------------+-------+-----+-------+------------+
|         0|         0|         0|          0|     13515609|6113008|    0|      0|           2|
+----------+----------+----------+-----------+-------------+-------+-----+-------+------------+



In [13]:
# Shape of the October 2019 DataFrame: (row count, column count).
# The column count is the length of the DataFrame's column-name list.
num_rows_oct, num_columns_oct = (
    csv_dict["2019_oct"].count(),
    len(csv_dict["2019_oct"].columns),
)

print("DataFrame shape:", (num_rows_oct, num_columns_oct))


DataFrame shape: (42448764, 9)


In [12]:
# Count the NULL values of every column in the April 2020 data:
# F.when(...) yields a non-null value only for NULL cells, and F.count
# counts the non-null results.
null_counts_apr = csv_dict["2020_apr"].select(
    [
        F.count(F.when(F.col(c).isNull(), c)).alias(c)
        for c in csv_dict["2020_apr"].columns
    ]
)
null_counts_apr.show()

+----------+----------+----------+-----------+-------------+-------+-----+-------+------------+
|event_time|event_type|product_id|category_id|category_code|  brand|price|user_id|user_session|
+----------+----------+----------+-----------+-------------+-------+-----+-------+------------+
|         0|         0|         0|          0|      6755873|8985057|    0|      0|         109|
+----------+----------+----------+-----------+-------------+-------+-----+-------+------------+



In [14]:
# Shape of the April 2020 DataFrame: (row count, column count).
# The column count is the length of the DataFrame's column-name list.
num_rows_apr, num_columns_apr = (
    csv_dict["2020_apr"].count(),
    len(csv_dict["2020_apr"].columns),
)

print("DataFrame shape:", (num_rows_apr, num_columns_apr))


DataFrame shape: (66589268, 9)


## 2. 파생 컬럼 생성
- `event_time` 컬럼의 형태: `2019-12-01 09:00:00`
- 이 컬럼을 날짜와 시간 컬럼으로 각각 분리

In [56]:
def separate_event_time(csv_dict: dict, event_time_col: str = "event_time") -> dict:
    """
    Add date and time-of-day string columns derived from the event timestamp.

    For every DataFrame in ``csv_dict`` the ``event_time_col`` column is
    converted with ``to_timestamp`` and two columns are appended:
    ``event_time_ymd`` (formatted "yyyy-MM-dd") and ``event_time_hms``
    (formatted "HH:mm:ss").

    The dictionary is updated in place and the same dictionary object is
    returned.

    NOTE(review): ``date_format`` presumably renders values in the Spark
    session time zone, so the derived columns may differ between
    environments -- confirm the intended time zone.
    """
    # Snapshot the keys up front; each entry is reassigned inside the loop.
    for name in tqdm(list(csv_dict), desc="Seperating Columns"):
        ts = col(event_time_col)
        csv_dict[name] = (
            csv_dict[name]
            # Make sure the source column is a timestamp.
            .withColumn(event_time_col, to_timestamp(ts))
            # Date part of the timestamp.
            .withColumn("event_time_ymd", date_format(ts, "yyyy-MM-dd"))
            # Time-of-day part of the timestamp.
            .withColumn("event_time_hms", date_format(ts, "HH:mm:ss"))
        )

    return csv_dict

In [57]:
# Add the event_time_ymd / event_time_hms columns to every DataFrame.
# separate_event_time updates csv_dict in place and returns it, so
# csv_dict_seperated and csv_dict refer to the same dictionary object.
csv_dict_seperated = separate_event_time(csv_dict)

Seperating Columns: 100%|██████████| 7/7 [00:00<00:00, 55.95it/s]


In [58]:
# Check which datasets the dictionary holds after the transformation.
csv_dict_seperated.keys()

dict_keys(['2019_dec', '2019_nov', '2019_oct', '2020_apr', '2020_feb', '2020_jan', '2020_mar'])

In [60]:
# Preview the December 2019 data, including the two derived columns.
csv_dict_seperated["2019_dec"].show(3)

+-------------------+----------+----------+-------------------+--------------------+-----+-------+---------+--------------------+--------------+--------------+
|         event_time|event_type|product_id|        category_id|       category_code|brand|  price|  user_id|        user_session|event_time_ymd|event_time_hms|
+-------------------+----------+----------+-------------------+--------------------+-----+-------+---------+--------------------+--------------+--------------+
|2019-12-01 09:00:00|      view|   1005105|2232732093077520756|construction.tool...|apple|1302.48|556695836|ca5eefc5-11f9-450...|    2019-12-01|      09:00:00|
|2019-12-01 09:00:00|      view|  22700068|2232732091643068746|                NULL|force| 102.96|577702456|de33debe-c7bf-44e...|    2019-12-01|      09:00:00|
|2019-12-01 09:00:01|      view|   2402273|2232732100769874463|appliances.person...|bosch| 313.52|539453785|5ee185a7-0689-4a3...|    2019-12-01|      09:00:01|
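+-------------------+----------+----------+-------------------+--------------------+-----+-------+---------+--------------------+--------------+--------------+
only showing top 3 rows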

In [39]:
# Preview the April 2020 data, including the two derived columns.
csv_dict_seperated["2020_apr"].show(3)

+-------------------+----------+----------+-------------------+--------------------+-------+-------+---------+--------------------+--------------+--------------+
|         event_time|event_type|product_id|        category_id|       category_code|  brand|  price|  user_id|        user_session|event_time_ymd|event_time_hms|
+-------------------+----------+----------+-------------------+--------------------+-------+-------+---------+--------------------+--------------+--------------+
|2020-04-01 09:00:00|      view|   1201465|2232732101407408685|apparel.shoes.sli...|samsung| 230.38|568984877|e2456cef-2d4f-42b...|    2020-04-01|      09:00:00|
|2020-04-01 09:00:01|      view|   1307156|2053013554658804075|electronics.audio...|  apple|1352.67|514955500|38f43134-de83-471...|    2020-04-01|      09:00:01|
|2020-04-01 09:00:01|      view|   1480477|2053013563835941749|appliances.kitche...|  apple|1184.05|633645770|16aba270-b3c2-4b2...|    2020-04-01|      09:00:01|
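+-------------------+----------+----------+-------------------+--------------------+-------+-------+---------+--------------------+--------------+--------------+
only showing top 3 rows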

## 3. 데이터셋 parquet 저장

In [61]:
def save_csv_dict_to_parquet(csv_dict: dict, output_folder: str) -> None:
    """
    Write every Spark DataFrame in the dictionary to parquet.

    Each DataFrame is written to ``<output_folder>/<key>.parquet`` in
    overwrite mode, and a confirmation line is printed after each write.

    Args:
        csv_dict: Mapping of dataset name to Spark DataFrame.
        output_folder: Destination directory; created (including parents)
            when it does not exist yet.
    """
    # exist_ok=True replaces the separate exists() check, so the call is safe
    # even if the directory appears between a check and the creation.
    os.makedirs(output_folder, exist_ok=True)

    for key, df in tqdm(csv_dict.items(), desc="Saving Parquet Files"):
        output_path = os.path.join(output_folder, f"{key}.parquet")

        # Overwrite any output left at this path by a previous run.
        df.write.mode("overwrite").parquet(output_path)

        print(f"{key} 데이터프레임이 {output_path}로 저장되었습니다.")

In [62]:
# Write every transformed DataFrame under data/parquet_data.
output_folder = os.path.join("data", "parquet_data")
save_csv_dict_to_parquet(csv_dict_seperated, output_folder)

Saving Parquet Files:  14%|█▍        | 1/7 [01:40<10:05, 100.97s/it]

2019_dec 데이터프레임이 data\parquet_data\2019_dec.parquet로 저장되었습니다.


Saving Parquet Files:  29%|██▊       | 2/7 [03:25<08:34, 102.90s/it]

2019_nov 데이터프레임이 data\parquet_data\2019_nov.parquet로 저장되었습니다.


Saving Parquet Files:  43%|████▎     | 3/7 [04:28<05:40, 85.03s/it] 

2019_oct 데이터프레임이 data\parquet_data\2019_oct.parquet로 저장되었습니다.


Saving Parquet Files:  57%|█████▋    | 4/7 [06:10<04:34, 91.44s/it]

2020_apr 데이터프레임이 data\parquet_data\2020_apr.parquet로 저장되었습니다.


Saving Parquet Files:  71%|███████▏  | 5/7 [07:34<02:57, 88.98s/it]

2020_feb 데이터프레임이 data\parquet_data\2020_feb.parquet로 저장되었습니다.


Saving Parquet Files:  86%|████████▌ | 6/7 [09:01<01:28, 88.07s/it]

2020_jan 데이터프레임이 data\parquet_data\2020_jan.parquet로 저장되었습니다.


Saving Parquet Files: 100%|██████████| 7/7 [10:26<00:00, 89.48s/it]

2020_mar 데이터프레임이 data\parquet_data\2020_mar.parquet로 저장되었습니다.





In [63]:
# Stop the SparkSession.
spark.stop()

In [64]:
# Create a new SparkSession to verify the saved parquet files.
spark = (
    SparkSession.builder
    .appName("ParquetReader")
    .getOrCreate()
)

In [72]:
# Read the October 2019 parquet output back and preview five rows.
parquet_2019_oct = spark.read.parquet(os.path.join(output_folder, "2019_oct.parquet"))
parquet_2019_oct.show(5)

+-------------------+----------+----------+-------------------+--------------------+--------+-------+---------+--------------------+--------------+--------------+
|         event_time|event_type|product_id|        category_id|       category_code|   brand|  price|  user_id|        user_session|event_time_ymd|event_time_hms|
+-------------------+----------+----------+-------------------+--------------------+--------+-------+---------+--------------------+--------------+--------------+
|2019-10-13 15:25:46|      view|   1002544|2053013555631882655|electronics.smart...|   apple| 460.51|518958788|e7e27c5c-1e78-481...|    2019-10-13|      15:25:46|
|2019-10-13 15:25:46|      view|   3700301|2053013565983425517|appliances.enviro...|   vitek| 120.93|557977070|7afc206c-7259-4be...|    2019-10-13|      15:25:46|
|2019-10-13 15:25:46|      view|  49100004|2127425375913902544|                NULL|    NULL|  45.05|514456508|9d6837a5-40df-49d...|    2019-10-13|      15:25:46|
|2019-10-13 15:25:46| 

In [75]:
# Shape of the original DataFrame, measured before the derived columns were added.
print("원본 2019_oct DataFrame shape:", (num_rows_oct, num_columns_oct))


# Shape of the DataFrame read back from parquet: (row count, column count).
num_rows_parquet_2019_oct, num_columns_parquet_2019_oct = (
    parquet_2019_oct.count(),
    len(parquet_2019_oct.columns),
)

print("Parquet으로 저장된 2019_oct DataFrame shape:", (num_rows_parquet_2019_oct, num_columns_parquet_2019_oct))

원본 2019_oct DataFrame shape: (42448764, 9)
Parquet으로 저장된 2019_oct DataFrame shape: (42448764, 11)


In [76]:
# Stop the SparkSession.
spark.stop()