In [1]:
from pyspark.sql import SparkSession
import pyspark.sql.functions as F
from pyspark.sql.functions import col
from pyspark.sql.window import Window
import os
from datetime import datetime

In [2]:
# Local-mode Spark session used to exercise the ETL end to end on this machine.
_builder = SparkSession.builder.appName("Local-ETL-Test").master("local[*]")
_builder = _builder.config("spark.driver.memory", "2g")
# Upper bound on bytes packed into a single input partition when reading files (256 MiB).
_builder = _builder.config("spark.sql.files.maxPartitionBytes", 256 * 1024 * 1024)
# Partition count used by shuffles such as the groupBy / window in most_watch.
_builder = _builder.config("spark.sql.shuffle.partitions", "200")
spark = _builder.getOrCreate()
spark.sparkContext.setLogLevel("ERROR")

In [3]:
# Input root: one sub-folder per day, named YYYYMMDD, each holding Parquet files.
folder_path = "/data/raw/log_search/"

# os.listdir returns entries in arbitrary order, but control_flow flushes a month
# as soon as it sees a different month, so days must be visited chronologically:
# sort the names. Plain files (e.g. .DS_Store) are skipped because read_data
# expects a directory of Parquet files.
date_file = sorted(
    name
    for name in os.listdir(folder_path)
    if os.path.isdir(os.path.join(folder_path, name))
)

# Output root; one month=<M> sub-folder is written per month.
save_path = "/data/destination/log_search/"

In [4]:
def read_data(path, sub_folder):
    """
    Load every Parquet file of one daily sub-folder into a DataFrame.

    Args:
        path: input root directory.
        sub_folder: name of the day's sub-directory inside ``path``.

    Returns:
        A Spark DataFrame read from ``<path>/<sub_folder>/*.parquet``.
    """
    return spark.read.parquet(os.path.join(path, sub_folder, "*.parquet"))

In [5]:
def datetime_transform(df, date_value):
    """
    Stamp a daily DataFrame with the date parsed from its sub-folder name.

    The existing ``datetime`` column is dropped and a constant ``date`` column
    is added. The column may not be needed downstream, but is kept just in case.

    Args:
        df: daily DataFrame.
        date_value: sub-folder name in ``YYYYMMDD`` form.

    Returns:
        ``(df, month)``: the transformed DataFrame and the month number (1-12).

    Raises:
        ValueError: if ``date_value`` does not match ``YYYYMMDD``.
    """
    parsed = datetime.strptime(date_value, "%Y%m%d")
    # NOTE(review): only the month is returned, not the year, so the same month of
    # different years is grouped together by the caller — confirm data never spans years.
    stamped = df.drop("datetime").withColumn("date", F.lit(parsed.date()))
    return stamped, parsed.month


In [6]:
def save_data(df, path=None, num_partitions=4):
    """
    Write a DataFrame as CSV with a header row, overwriting any existing output.

    Args:
        df: Spark DataFrame to write.
        path: destination directory. Defaults to the module-level ``save_path``,
            looked up at call time (not at definition time) so re-running the
            config cell takes effect without re-running this cell.
        num_partitions: number of partitions to ``repartition`` into before
            writing, which controls how many CSV part files are produced.
            Defaults to 4.
    """
    if path is None:
        path = save_path
    (
        df
        .repartition(num_partitions)
        .write
        .mode("overwrite")
        .option("header", "true")
        .csv(path)
    )
    print(f"Data saved to {path}")

In [7]:
def most_watch(final_df):
    """
    Find each user's most frequent keyword.

    Steps:
      1. drop rows whose ``user_id`` is null;
      2. count rows per ``(user_id, keyword)``;
      3. within each user, rank keywords by count (descending) and keep rank 1;
      4. rename ``keyword`` to ``mostWatch``.

    Ties on count are broken by ``keyword`` ascending (nulls last), so the
    result is deterministic across runs. Ordering by ``count`` alone lets
    ``row_number`` pick any of the tied keywords.

    Args:
        final_df: DataFrame with at least ``user_id`` and ``keyword`` columns.

    Returns:
        DataFrame with columns ``user_id`` and ``mostWatch``, one row per user.
    """
    keyword_counts = (
        final_df
        .where(col("user_id").isNotNull())
        .groupBy("user_id", "keyword")
        .count()
    )

    # NOTE(review): null keywords are still counted, so a user whose only keyword
    # is null gets mostWatch = null — confirm whether nulls should be dropped.
    by_user = Window.partitionBy("user_id").orderBy(
        col("count").desc(),
        col("keyword").asc_nulls_last(),
    )

    return (
        keyword_counts
        .withColumn("rn", F.row_number().over(by_user))
        .where(col("rn") == 1)
        .select("user_id", col("keyword").alias("mostWatch"))
    )


In [None]:
def _flush_month(month_df, month):
    """Aggregate one month of daily data with most_watch and write it under month=<month>."""
    print(f"Flushing month {month}")
    # Hive-style "key=value" directory naming lets Spark discover `month` as a
    # partition column when the output root is read back.
    path = os.path.join(save_path, f"month={month}")
    save_data(most_watch(month_df), path)


def control_flow():
    """
    Run the ETL over every daily sub-folder, one month at a time.

    Daily folders are visited in sorted order, which is chronological for
    ``YYYYMMDD`` names. Entries that are not 8-digit names are skipped. Days of
    the same month are unioned. When the month changes, the finished month is
    aggregated with ``most_watch`` and written to ``<save_path>/month=<M>``. The
    last month is flushed after the loop.

    NOTE(review): months are keyed by month number only, so data spanning
    several years would merge into (and overwrite) the same ``month=<M>`` folder.
    """
    month_df = None
    current_month = None

    # os.listdir order is arbitrary; the flush-on-month-change logic below needs
    # each month's days to arrive contiguously, so iterate in sorted order.
    for file in sorted(date_file):
        if not (len(file) == 8 and file.isdigit()):
            print(f"Skipping non-date entry: {file}")
            continue

        print(f"Processing daily file: {file}")

        df = read_data(folder_path, file)
        df, month_value = datetime_transform(df, file)

        if month_df is None:
            # first day seen
            month_df, current_month = df, month_value
        elif month_value == current_month:
            month_df = month_df.unionByName(df)
        else:
            # month changed: write out the finished month and start a new one
            _flush_month(month_df, current_month)
            month_df, current_month = df, month_value

    if month_df is not None:
        _flush_month(month_df, current_month)

    print("ETL task complete")

In [9]:
# Entry point. Inside a notebook kernel __name__ is "__main__", so running this cell runs the ETL.
if "__main__" == __name__:
    control_flow()

Processing daily file: 20220601
Processing daily file: 20220602
Processing daily file: 20220603
Processing daily file: 20220604
Processing daily file: 20220605
Processing daily file: 20220606
Processing daily file: 20220607
Processing daily file: 20220608
Processing daily file: 20220609
Processing daily file: 20220610
Processing daily file: 20220611
Processing daily file: 20220612
Processing daily file: 20220613
Processing daily file: 20220614
Processing daily file: 20220701
Flushing month 6
Data saved to /data/destination/log_search//month=6
Processing daily file: 20220702
Processing daily file: 20220703
Processing daily file: 20220704
Processing daily file: 20220705
Processing daily file: 20220706
Processing daily file: 20220707
Processing daily file: 20220708
Processing daily file: 20220709
Processing daily file: 20220710
Processing daily file: 20220711
Processing daily file: 20220712
Processing daily file: 20220713
Processing daily file: 20220714
Flushing month 7
Data saved to /dat