In [1]:
import findspark
findspark.init()
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
import os 
from datetime import datetime, timedelta


In [2]:
spark = SparkSession.builder.config("spark.driver.memory", "4g").getOrCreate()

In [3]:
#đọc data
def read_data(path):
    df = spark.read.json(path)
    return df

In [8]:
path = 'D:\\Hoc_DE\\Data\\Dataset\\log_content\\20220401.json'

data = read_data(path)
data.printSchema()
data.show(truncate=False)

root
 |-- _id: string (nullable = true)
 |-- _index: string (nullable = true)
 |-- _score: long (nullable = true)
 |-- _source: struct (nullable = true)
 |    |-- AppName: string (nullable = true)
 |    |-- Contract: string (nullable = true)
 |    |-- Mac: string (nullable = true)
 |    |-- TotalDuration: long (nullable = true)
 |-- _type: string (nullable = true)

+--------------------+-------+------+--------------------------------------+-----+
|_id                 |_index |_score|_source                               |_type|
+--------------------+-------+------+--------------------------------------+-----+
|AX_momhia1FFivsGrn9o|history|0     |{KPLUS, HNH579912, 0C96E62FC55C, 254} |kplus|
|AX_momhca1FFivsGrnvg|history|0     |{KPLUS, HUFD40665, CCEDDC333614, 1457}|kplus|
|AX_momhaa1FFivsGrnny|history|0     |{KPLUS, HNH572635, B068E6A1C5F6, 2318}|kplus|
|AX_momhca1FFivsGrnvv|history|0     |{KPLUS, HND141717, 08674EE8D2C2, 1452}|kplus|
|AX_momhia1FFivsGrn98|history|0     |{KPLUS, HNH743

In [168]:
def tranfrom_data(df) :
    df = df.select('_source.*')
    df = df.drop('Mac')
    df.select('AppName').distinct()
    df = df.withColumn("Category",
        when((col("AppName") == 'CHANNEL') | 
            (col("AppName") =='KPLUS'), "Truyền Hình")
        .when((col("AppName") == 'VOD') | 
            (col("AppName") =='FIMS'), "Phim Truyện")
        .when((col("AppName") == 'RELAX'), "Giải Trí")
        .when((col("AppName") == 'CHILD'), "Thiếu Nhi")
        .when((col("AppName") == 'SPORT'), "Thể Thao")
        .otherwise("Error"))
    df = df.select('Contract','Category','TotalDuration')
    return df

In [169]:
def summarize_and_pivot_data(df):
    df = df.groupBy('Contract','Category').sum('TotalDuration').withColumnRenamed('sum(TotalDuration)','TotalDuration')
    result =  df.groupBy('Contract').pivot('Category').sum('TotalDuration').fillna(0)
    return result

In [170]:
def process(path):
    print('------------------------')
    print('Starting read data')
    data = read_data(path)
    print('------------------------')
    print('Starting tranfrom data')
    data = tranfrom_data(data)
    print('------------------------')
    print('Starting summarize and pivot data')
    result = summarize_and_pivot_data(data)
    print('------------------------')
    print('Task successfull')
    return result

In [171]:
# path = 'D:\\Hoc_DE\\Data\\Dataset\\log_content\\20220401.json'
# date = '20-13-13'
# final = process(path,date)
# final.show()

In [195]:
def convert_to_datevalue(value):
    date_value = datetime.strptime(value, "%Y%m%d").date()
    return date_value

def date_range(start_date, end_date):
    date_list = []
    current_date = start_date
    while current_date <= end_date:
        date_list.append(current_date.strftime("%Y%m%d"))  # Trả về định dạng chuỗi YYYYMMDD
        current_date += timedelta(days=1)  # Sử dụng timedelta
    return date_list

def generate_date_range(from_date, to_date):
    from_date = convert_to_datevalue(from_date)
    to_date = convert_to_datevalue(to_date)
    date_list = date_range(from_date, to_date)
    return date_list

def convert_filename_to_date(filename):
    # Lấy phần chuỗi chứa ngày từ tên file
    date_str = filename.split('.')[0]  # Lấy chuỗi trước dấu chấm
    # Chuyển đổi định dạng từ YYYYMMDD thành YYYY-MM-DD
    date_obj = datetime.strptime(date_str, '%Y%m%d').date()  # Chuyển đổi thành đối tượng datetime
    return date_obj

In [173]:
path = 'D:\\Hoc_DE\\Data\\Dataset\\log_content'
list_file = os.listdir(path)
list_file
read_data(path + '\\' + list_file[0])

DataFrame[_id: string, _index: string, _score: bigint, _source: struct<AppName:string,Contract:string,Mac:string,TotalDuration:bigint>, _type: string]

In [174]:
def read_listfile(list_file,path):
    final_result = None
    a = 0
    for file in list_file:
        # print(f"==> Processing file {file} <==")
        path_new = path + '\\' + list_file[a]
        data_now = read_data(path_new)
        date_data =convert_filename_to_date(file)
        data_now = data_now.withColumn("date", lit(date_data))  # Thêm cột 'date'


    if final_result is None:
        final_result = data_now  # Gán giá trị lần đầu
    else:
        final_result = final_result.union(data_now)  # Nối với DataFrame hiện tại
    a = a + 1
    return final_result

In [175]:
list_file = os.listdir(path)
path = 'D:\\Hoc_DE\\Data\\Dataset\\log_content'
df = read_listfile(list_file, path)

In [178]:
def process_30days(df):
    print('------------------------')
    print('Starting tranfrom data')
    data = tranfrom_data(df)
    print('------------------------')
    print('Starting summarize and pivot data')
    result = summarize_and_pivot_data(data)
    print('------------------------')
    print('Task successfull')
    return result
    

In [179]:
final_result = process_30days(df)
final_result.show()

------------------------
Starting tranfrom data
------------------------
Starting summarize and pivot data
------------------------
Task successfull
+---------+--------+-----------+---------+--------+-----------+
| Contract|Giải Trí|Phim Truyện|Thiếu Nhi|Thể Thao|Truyền Hình|
+---------+--------+-----------+---------+--------+-----------+
|HTFD11598|       0|       2884|        0|       0|        707|
|HPFD48556|      69|          0|        0|       0|      92976|
|NBFD10014|       0|          0|        0|       0|      84628|
|HNH619088|       0|       8456|      234|       0|      65210|
|HNH036174|       0|          0|        0|       0|       6049|
|DNH067877|       0|          0|        0|       0|       5760|
|SGH806190|       0|          0|        0|       0|       1131|
|HDFD42710|       0|          0|        0|       0|      12096|
|SGH674576|       0|       1535|        0|       0|       9910|
|NDFD32943|       0|          0|        0|       0|       6269|
|TNFD30439|       0

In [197]:
list_file = generate_date_range('20220401', '20220403')
list_file

['20220401', '20220402', '20220403']