In [None]:
import os
from datetime import datetime, timedelta
from pyspark.sql.session import SparkSession
from pyspark.sql.functions import *

In [None]:
# Module-level Spark session shared by every helper below (read_data uses it directly).
# getOrCreate() returns an already-active session if there is one, otherwise it builds
# a new one with an 8 GB driver memory setting.
# NOTE(review): spark.driver.memory only takes effect when this call actually launches
# the driver JVM; if a session already exists, this setting is presumably ignored — confirm.
spark = (
    SparkSession.builder
    .config("spark.driver.memory", "8g")
    .getOrCreate()
)


def read_data(path):
    """Load the JSON data found at ``path`` into a Spark DataFrame.

    Uses the module-level ``spark`` session and Spark's default JSON reader options
    (schema is inferred from the data).
    """
    json_reader = spark.read
    return json_reader.json(path)

def select_fields(df):
    """Flatten the ``_source`` struct column.

    Every field nested inside ``_source`` becomes a top-level column; all other
    columns of ``df`` are dropped.
    """
    source_fields = "_source.*"
    return df.select(source_fields)

def calculate_devices(df):
    """Count the distinct devices (``Mac`` values) seen for each contract.

    Args:
        df: flattened log frame with at least ``Contract`` and ``Mac`` columns.

    Returns:
        DataFrame with columns ``Contract`` and ``TotalDevices``.
    """
    # countDistinct instead of a plain row count: the same Mac can appear on several
    # log rows (presumably once per AppName it used — rows carry AppName), so counting
    # rows overstates the number of devices. Null Macs are not counted as devices.
    return df.groupBy("Contract").agg(countDistinct("Mac").alias("TotalDevices"))

def transform_category(df):
    """Add a ``Type`` column mapping each ``AppName`` to its content category.

    Rules are evaluated in order; the first match wins. Any AppName that matches no
    rule (including null) is labelled "Error".
    """
    app = col("AppName")
    # (condition, category label) pairs, in priority order.
    rules = [
        (app.isin('CHANNEL', 'DSHD', 'KPLUS', 'KPlus'), "Truyền Hình"),
        (app.isin('VOD', 'FIMS_RES', 'BHD_RES', 'VOD_RES', 'FIMS', 'BHD', 'DANET'), "Phim Truyện"),
        (app == 'RELAX', "Giải Trí"),
        (app == 'CHILD', "Thiếu Nhi"),
        (app == 'SPORT', "Thể Thao"),
    ]
    first_condition, first_label = rules[0]
    category = when(first_condition, first_label)
    for condition, label in rules[1:]:
        category = category.when(condition, label)
    return df.withColumn("Type", category.otherwise("Error"))

def calculate_statistics(df, categories=None):
    """Sum ``TotalDuration`` per contract, with one column per content category.

    Args:
        df: frame with at least ``Contract``, ``Type`` and ``TotalDuration`` columns
            (``Type`` as produced by ``transform_category``).
        categories: optional list of ``Type`` values to pivot on. When given, exactly
            these columns are produced, in this order (a category absent from ``df``
            becomes an all-zero column). Spark then also skips the extra pass that
            discovers the distinct values. When None (default), every distinct ``Type``
            present in ``df`` becomes a column, as before.

    Returns:
        DataFrame with ``Contract`` plus one summed-duration column per category;
        contract/category combinations with no rows are filled with 0.
    """
    # One pivot-sum gives the same totals as the former groupBy(Contract, Type).sum()
    # followed by a second pivot-sum (summing partial sums), with one aggregation less.
    return (
        df.groupBy('Contract')
        .pivot('Type', categories)
        .sum('TotalDuration')
        .na.fill(0)
    )

def finalize_result(statistics, total_devices):
    """Attach each contract's ``TotalDevices`` to its per-category statistics.

    Inner join on ``Contract``: contracts present in only one of the two frames are
    dropped.
    """
    return statistics.join(total_devices, on='Contract', how='inner')

def save_data(result, save_path):
    """Write ``result`` as CSV (with a header row) under ``save_path``.

    The frame is repartitioned to a single partition, so the output is one part file;
    any existing output at ``save_path`` is overwritten.
    """
    csv_writer = (
        result.repartition(1)
        .write
        .mode('overwrite')
        .option("header", "true")
    )
    csv_writer.csv(save_path)
    print("Data Saved Successfully")

def etl_1_day(path):
    """Run the single-day pipeline for one JSON file.

    Steps: read -> flatten ``_source`` -> count devices per contract; in parallel,
    categorise apps -> pivot durations per category; finally join both on ``Contract``.

    Returns:
        DataFrame with ``Contract``, one column per category present that day, and
        ``TotalDevices``.
    """
    print(f'--- ETL for {path} ---')
    flat_logs = select_fields(read_data(path))
    devices_per_contract = calculate_devices(flat_logs)
    usage_per_contract = calculate_statistics(transform_category(flat_logs))
    return finalize_result(usage_per_contract, devices_per_contract)

In [None]:
# Columns every per-day frame is normalised to before the union. The pivot columns
# are data-dependent: a category with no rows on a given day produces no column, and
# AppNames that map to "Error" produce an extra "Error" column. Each day is therefore
# projected onto this exact list, and the "Error" bucket is deliberately left out.
_DAILY_COLUMNS = ['Contract', 'Giải Trí', 'Phim Truyện', 'Thiếu Nhi', 'Thể Thao', 'Truyền Hình', 'TotalDevices']


def _date_range(start_date, to_date):
    """Return every day from ``start_date`` to ``to_date`` inclusive as 'yyyymmdd' strings.

    Raises:
        ValueError: if a date is not in yyyymmdd form, or start_date is after to_date.
    """
    first = datetime.strptime(start_date, '%Y%m%d').date()
    last = datetime.strptime(to_date, '%Y%m%d').date()
    if first > last:
        raise ValueError(f"start_date {start_date} is after to_date {to_date}")
    return [(first + timedelta(days=offset)).strftime('%Y%m%d')
            for offset in range((last - first).days + 1)]


def _project_day(day_df, columns):
    """Select ``columns`` from one day's output, adding any missing one as a 0 column."""
    present = set(day_df.columns)
    return day_df.select([
        col(name) if name in present else lit(0).cast('long').alias(name)
        for name in columns
    ])


def main(input_path, output_path, start_date, to_date):
    """Run the daily ETL for every date in [start_date, to_date] and save the totals.

    Args:
        input_path: directory containing one ``<yyyymmdd>.json`` file per day.
        output_path: directory the final CSV is written to (overwritten).
        start_date: first day to process, 'yyyymmdd'.
        to_date: last day to process (inclusive), 'yyyymmdd'.

    Raises:
        ValueError: on a malformed date or when start_date is after to_date.
    """
    date_list = _date_range(start_date, to_date)
    print("Processing dates:", date_list)

    start_time = datetime.now()
    combined = None
    for day in date_list:
        print(f"--- ETL task for {day} ---")
        day_df = _project_day(etl_1_day(os.path.join(input_path, day + '.json')), _DAILY_COLUMNS)
        # Every day now has identical columns; unionByName additionally guards the order.
        combined = day_df if combined is None else combined.unionByName(day_df)

    print("Calculation on final output")
    # NOTE(review): summing per-day TotalDevices counts a device once per day it was
    # seen; confirm this is the intended metric for the multi-day total.
    result = combined.groupBy("Contract").sum()
    # Cached because show() and save_data() would otherwise each recompute the whole
    # multi-day lineage.
    result.cache()
    result.show()
    save_data(result, output_path)
    result.unpersist()
    end_time = datetime.now()
    print("Total time (s):", (end_time - start_time).total_seconds())
    

In [None]:
if __name__ == "__main__":
    # Defaults keep the original local layout; the environment variables let the
    # notebook run on another machine without editing code.
    input_path = os.environ.get('LOG_CONTENT_INPUT_PATH', 'D:/study/dataset/log_content/')
    output_path = os.environ.get('LOG_CONTENT_OUTPUT_PATH', 'D:/study/dataset/output/')
    # strip(): stray whitespace typed at the prompt would otherwise make strptime fail.
    start_date = input('Please input start_date format yyyymmdd: ').strip()
    to_date = input('Please input to_date format yyyymmdd: ').strip()
    main(input_path, output_path, start_date, to_date)