In [None]:
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from datetime import datetime, timedelta
import os

spark = (
    SparkSession.builder
    .appName("ETL Example")
    # MySQL JDBC driver jar, needed by load_data() to write over JDBC.
    .config("spark.jars.packages", "com.mysql:mysql-connector-j:8.4.0")
    .getOrCreate()
)



# --- ETL functions -----------------------------------------------------------

def _daily_json_paths(base_path, start_date, end_date):
    """List the daily ``<yyyymmdd>.json`` files that exist under ``base_path``.

    Args:
        base_path: Directory holding one JSON file per day.
        start_date: First day to include, as a ``"%Y%m%d"`` string.
        end_date: Last day to include (inclusive), as a ``"%Y%m%d"`` string.

    Returns:
        Paths of the files that exist, in date order. Days without a file are
        skipped silently.

    Raises:
        ValueError: If either date does not match ``"%Y%m%d"``.
    """
    day = datetime.strptime(start_date, "%Y%m%d")
    last_day = datetime.strptime(end_date, "%Y%m%d")
    paths = []
    while day <= last_day:
        candidate = os.path.join(base_path, day.strftime("%Y%m%d") + ".json")
        if os.path.exists(candidate):
            paths.append(candidate)
        day += timedelta(days=1)
    return paths


def read_data(path, start_date, end_date):
    """Extract step: load the daily JSON logs for an inclusive date range.

    Args:
        path: Directory containing the ``<yyyymmdd>.json`` files.
        start_date: First day to read, ``"%Y%m%d"``.
        end_date: Last day to read (inclusive), ``"%Y%m%d"``.

    Returns:
        A DataFrame with ``Appname``, ``Contract`` and ``TotalDuration``
        (taken from the nested ``_source`` struct) plus ``file_date``, the
        ``yyyymmdd`` string parsed from the name of the file each row came
        from (used later to count active days).

    Raises:
        FileNotFoundError: If no file exists for any day in the range. Reading
            zero files gives Spark nothing to infer a schema from, so fail
            early with a clear message instead.
        ValueError: If a date is not in ``"%Y%m%d"`` format.
    """
    files = _daily_json_paths(path, start_date, end_date)
    if not files:
        raise FileNotFoundError(
            f"No daily JSON files found in {path!r} between {start_date} and {end_date}"
        )

    df = spark.read.json(files)

    # input_file_name() is the file each row was read from; its 8-digit stem is the day.
    df = df.withColumn("file_date", regexp_extract(input_file_name(), r"(\d{8})\.json", 1))

    # Flatten the needed fields out of the nested _source struct.
    return df.select("_source.Appname", "_source.Contract", "_source.TotalDuration", "file_date")

def pivot_data(df):
    """Transform step: total watch time per contract, one column per category.

    Each ``AppName`` is mapped to a content category. Rows whose app is not in
    the mapping are dropped. ``TotalDuration`` is then summed per contract and
    category and pivoted into the columns ``RelaxDuration``, ``MovieDuration``,
    ``ChildDuration``, ``SportDuration`` and ``TVDuration``.

    Args:
        df: DataFrame with at least ``AppName``, ``Contract`` and
            ``TotalDuration`` (as produced by ``read_data``).

    Returns:
        One row per contract. A category the contract never watched is null.
    """
    # (Type label, output column). Passing the labels to pivot() explicitly
    # spares Spark an extra pass to discover the distinct values. It also
    # guarantees all five columns exist even when a category is absent from
    # the input; the downstream steps reference every one of them by name.
    # The list is in the order Spark itself sorted the labels, so the column
    # order of the result does not change.
    type_columns = [
        ("Giải Trí", "RelaxDuration"),
        ("Phim Truyện", "MovieDuration"),
        ("Thiếu Nhi", "ChildDuration"),
        ("Thể Thao", "SportDuration"),
        ("Truyền Hình", "TVDuration"),
    ]

    df = df.withColumn("Type",
           when(col("AppName").isin('CHANNEL','KPLUS','KPlus'), "Truyền Hình")
          .when(col("AppName").isin('FIMS_RES','FIMS'), "Phim Truyện")
          .when(col("AppName") == 'RELAX', "Giải Trí")
          .when(col("AppName") == 'CHILD', "Thiếu Nhi")
          .when(col("AppName") == 'SPORT', "Thể Thao")
          .otherwise("Error"))

    # Keep only what the aggregation needs and drop apps outside the mapping.
    df = df.select("Contract", "Type", "TotalDuration").filter(col("Type") != "Error")

    # The pivot's own sum already aggregates per (Contract, Type) in one step.
    df = (
        df.groupBy("Contract")
        .pivot("Type", [label for label, _ in type_columns])
        .sum("TotalDuration")
    )

    for label, column_name in type_columns:
        df = df.withColumnRenamed(label, column_name)
    return df

def activeness(df, total_days=2.0):
    """Transform step: percentage of days in the window a contract was active.

    Args:
        df: DataFrame with ``Contract`` and ``file_date`` columns (as produced
            by ``read_data``).
        total_days: Number of days in the reporting window. The default of 2
            matches the two-day range ``main_task`` reads. Pass the real window
            length when reading a different range, otherwise the percentage is
            wrong (and can exceed 100).

    Returns:
        DataFrame with ``Contract``, ``ActiveDays`` (number of distinct
        ``file_date`` values per contract) and
        ``Activeness = ActiveDays * 100 / total_days``.

    Raises:
        ValueError: If ``total_days`` is not positive.
    """
    if total_days <= 0:
        raise ValueError(f"total_days must be positive, got {total_days!r}")

    # countDistinct already collapses repeated (Contract, file_date) pairs, so
    # a separate dropDuplicates() shuffle is unnecessary.
    active = (
        df.select("Contract", "file_date")
        .groupBy("Contract")
        .agg(countDistinct("file_date").alias("ActiveDays"))
    )

    return active.withColumn(
        "Activeness",
        col("ActiveDays") * lit(100) / lit(float(total_days)),
    )


def most_watch(df):
    """Label each contract with the category it spent the most time on.

    Null durations are replaced by 0 first, so the comparisons against the
    maximum are never null. On a tie, the first category in Child, Movie,
    Relax, Sport, TV order wins. A contract whose durations are all 0 is
    therefore labelled Child.

    Returns:
        ``df`` with nulls in the five duration columns set to 0 and a new
        ``Most_Watch`` column.
    """
    ranked = [
        ("Child", "ChildDuration"),
        ("Movie", "MovieDuration"),
        ("Relax", "RelaxDuration"),
        ("Sport", "SportDuration"),
        ("TV", "TVDuration"),
    ]
    duration_names = [name for _, name in ranked]

    filled = df.fillna(0, subset=duration_names)
    top = greatest(*[col(name) for name in duration_names])

    # Chain when(...).when(...) in ranking order so the first match wins.
    label = None
    for category, name in ranked:
        is_top = top == col(name)
        label = when(is_top, category) if label is None else label.when(is_top, category)

    return filled.withColumn("Most_Watch", label)
def customer_state(df):
    """Transform step: add ``Customer_Taste``, the categories a contract watched.

    The value joins with ``-`` the label of every category whose duration is
    greater than 0, in Child, Movie, Relax, Sport, TV order. Null or 0
    durations are left out, and the value is an empty string if none qualify.
    A contract that watched all five categories gets ``all``.

    Returns:
        ``df`` with a new ``Customer_Taste`` column.
    """
    taste = concat_ws("-",
        when(col("ChildDuration") > 0, "Child"),
        when(col("MovieDuration") > 0, "Movie"),
        when(col("RelaxDuration") > 0, "Relax"),
        when(col("SportDuration") > 0, "Sport"),
        when(col("TVDuration") > 0, "TV"),
    )

    df = df.withColumn("Customer_Taste", taste)

    # Restrict the rewrite to Customer_Taste. Without `subset`, replace() is
    # applied to every string column of the DataFrame.
    return df.replace(['Child-Movie-Relax-Sport-TV'], ['all'], subset=["Customer_Taste"])

def engagement_type(df, low_threshold=3*60*60, high_threshold=3*180*60):
    """Transform step: bucket contracts by their total watch time.

    Adds ``TotalDuration_All`` (the sum of the five duration columns, with
    nulls counted as 0) and ``Engagement_Type``:

    * ``Low engagement``    -- total < ``low_threshold``
    * ``Normal engagement`` -- ``low_threshold`` <= total <= ``high_threshold``
    * ``High engagement``   -- total > ``high_threshold``

    Args:
        df: DataFrame with the ``TVDuration``, ``MovieDuration``,
            ``ChildDuration``, ``RelaxDuration`` and ``SportDuration`` columns.
        low_threshold: Exclusive upper bound of the low bucket. Defaults to
            10800, which is 3 hours if durations are in seconds. The unit is
            not verified here, so adjust it to the data.
        high_threshold: Inclusive upper bound of the normal bucket. Defaults
            to 32400, which is 9 hours if durations are in seconds.

    Raises:
        ValueError: If ``low_threshold`` is greater than ``high_threshold``.
    """
    if low_threshold > high_threshold:
        raise ValueError(
            f"low_threshold ({low_threshold}) must not exceed high_threshold ({high_threshold})"
        )

    duration_cols = ["TVDuration", "MovieDuration", "ChildDuration", "RelaxDuration", "SportDuration"]

    # A plain `+` is null as soon as one operand is null, and a null total
    # would fall through to "High engagement". Count missing categories as 0.
    total = coalesce(col(duration_cols[0]), lit(0))
    for name in duration_cols[1:]:
        total = total + coalesce(col(name), lit(0))

    df = df.withColumn("TotalDuration_All", total)

    df = df.withColumn(
        "Engagement_Type",
        when(col("TotalDuration_All") < lit(low_threshold), lit("Low engagement"))
        .when(col("TotalDuration_All") <= lit(high_threshold), lit("Normal engagement"))
        .otherwise(lit("High engagement"))
    )

    return df
def load_data(df, jdbc_url="jdbc:mysql://localhost:3306/mydb", table="customer_profile",
              user=None, password=None):
    """Load step: overwrite a MySQL table with ``df`` over JDBC.

    Credentials are not stored in the notebook. ``user`` and ``password``
    fall back to the ``MYSQL_USER`` (default ``root``) and ``MYSQL_PASSWORD``
    environment variables.

    Args:
        df: DataFrame to write.
        jdbc_url: JDBC URL of the target database.
        table: Target table. Its contents are replaced (save mode ``overwrite``).
        user: Database user. Defaults to ``$MYSQL_USER`` or ``root``.
        password: Database password. Defaults to ``$MYSQL_PASSWORD``.

    Raises:
        ValueError: If no password is given and ``MYSQL_PASSWORD`` is unset.
    """
    if user is None:
        user = os.environ.get("MYSQL_USER", "root")
    if password is None:
        password = os.environ.get("MYSQL_PASSWORD")
        if password is None:
            raise ValueError("Set MYSQL_PASSWORD (or pass password=...) before loading data")

    connection_properties = {
        "user": user,
        "password": password,
        # The Spark session loads Connector/J 8.x, whose driver class is
        # com.mysql.cj.jdbc.Driver. The old com.mysql.jdbc.Driver name is deprecated.
        "driver": "com.mysql.cj.jdbc.Driver",
    }
    df.write.jdbc(url=jdbc_url, table=table, mode="overwrite", properties=connection_properties)


def main_task():
    """Run the customer-profile ETL end to end and report the elapsed time.

    Extract the daily logs for 20220401-20220402, derive activeness and the
    per-category viewing profile, write the result with load_data(), then
    print the processing time and a success message. Returns None.
    """
    started_at = datetime.now()

    # --- Extract ---
    raw = read_data('D:\\BA\\INDA\\Spark\\pyspark\\Practice\\data', '20220401', '20220402')

    # --- Transform ---
    activity = activeness(raw)
    viewing = pivot_data(raw)
    profile = engagement_type(customer_state(most_watch(viewing)))

    result = (
        profile.join(activity, "Contract", "left")
        .select("Contract", "Most_Watch", "Customer_Taste", "Engagement_Type", "Activeness")
    )

    # --- Load ---
    load_data(result)

    elapsed = (datetime.now() - started_at).total_seconds()
    print(f'It took {elapsed} to process the data')
    print('Data Saved Successfully')


main_task()

It took 9.421431 to process the data
Data Saved Successfully


In [9]:
# NOTE(review): nothing above defines a global `df` in a fresh kernel.
# main_task() keeps its DataFrames local and returns None, so this cell only
# worked thanks to leftover kernel state. The displayed output is the final
# customer profile. TODO: display a DataFrame an earlier cell actually
# defines, or remove this cell.
df.show()

+---------+----------+--------------+---------------+----------+
| Contract|Most_Watch|Customer_Taste|Engagement_Type|Activeness|
+---------+----------+--------------+---------------+----------+
|HNH572635|        TV|           all|High engagement|     100.0|
|HND083642|        TV|           all|High engagement|     100.0|
|LDFD05747|        TV|           all|High engagement|     100.0|
|HNH960439|        TV|           all|High engagement|     100.0|
|HNJ035736|        TV|           all|High engagement|     100.0|
|HNH579912|        TV|           all|High engagement|     100.0|
|HNH893773|        TV|           all|High engagement|     100.0|
|HNH743103|        TV|           all|High engagement|     100.0|
|DTFD21200|        TV|           all|High engagement|     100.0|
|HNH063566|        TV|           all|High engagement|     100.0|
|DNFD74404|        TV|           all|High engagement|     100.0|
|HND141717|        TV|           all|High engagement|     100.0|
|HNH866786|        TV|   

In [19]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
# Reuse (or start) the Spark session, then load every JSON file in the data
# folder and tag each row with the yyyymmdd date taken from its file name.
spark = SparkSession.builder.appName("ETL Example").getOrCreate()

df = (
    spark.read.json('D:\\BA\\INDA\\Spark\\pyspark\\Practice\\data\\*.json')
    .withColumn("file_date", regexp_extract(input_file_name(), r"(\d{8})\.json", 1))
)
df.show()

+--------------------+-------+------+--------------------+-----+---------+
|                 _id| _index|_score|             _source|_type|file_date|
+--------------------+-------+------+--------------------+-----+---------+
|450909db-542b-409...|history|     0|{VOD, DTFD21200, ...|kplus| 20220401|
|f1b3a321-0037-437...|history|     0|{SPORT, HND083642...|kplus| 20220401|
|bd08416c-61ef-405...|history|     0|{RELAX, HNH579912...|kplus| 20220401|
|1978eb60-cf44-47d...|history|     0|{FIMS, HNH572635,...|kplus| 20220401|
|f1953eb6-7fbb-418...|history|     0|{SPORT, HNH579912...|kplus| 20220401|
|f1a408c4-9cfd-43c...|history|     0|{KPlus, DTFD21200...|kplus| 20220401|
|159e3e01-3c06-403...|history|     0|{CHANNEL, HNH8937...|kplus| 20220401|
|5f726668-2f0f-461...|history|     0|{BHD_RES, HNH9604...|kplus| 20220401|
|fccf4ad9-7449-4ce...|history|     0|{KPlus, HNH893773...|kplus| 20220401|
|687fd62c-bfb5-496...|history|     0|{DSHD, LDFD05747,...|kplus| 20220401|
|c953e180-5ad3-41b...|his

In [20]:
# Keep only the fields the profile needs: app, contract, watch time and file date.
df1 = df.select(
    "_source.Appname",
    "_source.Contract",
    "_source.TotalDuration",
    "file_date",
)
df1.show(5)

+-------+---------+-------------+---------+
|Appname| Contract|TotalDuration|file_date|
+-------+---------+-------------+---------+
|    VOD|DTFD21200|         2915| 20220401|
|  SPORT|HND083642|         1187| 20220401|
|  RELAX|HNH579912|         1741| 20220401|
|   FIMS|HNH572635|         1174| 20220401|
|  SPORT|HNH579912|         1365| 20220401|
+-------+---------+-------------+---------+
only showing top 5 rows



In [21]:
# Map each AppName to a content category; apps outside the mapping become "Error".
df2 = df1.withColumn("Type",
           when(col("AppName").isin('CHANNEL','KPLUS','KPlus'), "Truyền Hình")
          .when(col("AppName").isin('FIMS_RES','FIMS'), "Phim Truyện")
          .when(col("AppName") == 'RELAX', "Giải Trí")
          .when(col("AppName") == 'CHILD', "Thiếu Nhi")
          .when(col("AppName") == 'SPORT', "Thể Thao")
          .otherwise("Error"))

# Keep the needed columns and drop the unmapped ("Error") rows.
df3 = df2.select("Contract", "Type", "TotalDuration").filter(col("Type") != "Error")

# Total watch time per (contract, category).
df4 = df3.groupBy("Contract", "Type").agg(sum("TotalDuration").alias("TotalDuration"))

# One row per contract, one column per category. Listing the category labels
# in pivot() spares Spark a pass to discover them. It also guarantees all five
# columns exist for the following cells even if a category is missing from the
# data. The list is in the order Spark sorted the labels, so the column order
# is unchanged.
df5 = df4.groupBy('Contract') \
   .pivot("Type", ["Giải Trí", "Phim Truyện", "Thiếu Nhi", "Thể Thao", "Truyền Hình"]) \
   .sum("TotalDuration") \
   .withColumnRenamed("Truyền Hình", "TVDuration") \
   .withColumnRenamed("Phim Truyện", "MovieDuration") \
   .withColumnRenamed("Thể Thao", "SportDuration") \
   .withColumnRenamed("Giải Trí", "RelaxDuration") \
   .withColumnRenamed("Thiếu Nhi", "ChildDuration")

df5.show(5)

+---------+-------------+-------------+-------------+-------------+----------+
| Contract|RelaxDuration|MovieDuration|ChildDuration|SportDuration|TVDuration|
+---------+-------------+-------------+-------------+-------------+----------+
|HNH572635|     82309195|    165271114|     82928514|     82679061| 247958067|
|HND083642|     81894794|    164821187|     82367173|     83515635| 248476926|
|LDFD05747|     82985750|    164240134|     82592008|     81977390| 247311041|
|HNH960439|     83049290|    165314562|     81744306|     81894543| 246243326|
|HNJ035736|     83158460|    164108348|     82811254|     82574431| 248232218|
+---------+-------------+-------------+-------------+-------------+----------+
only showing top 5 rows



In [24]:
# Treat missing category totals as 0 before comparing them.
duration_cols = ["ChildDuration", "MovieDuration", "RelaxDuration", "SportDuration", "TVDuration"]
df6 = df5.fillna(0, subset=duration_cols)

# Category with the largest total; ties go to the earliest one in the list above.
max_value = greatest(*[col(c) for c in duration_cols])
df7 = df6.withColumn(
    "Most_Watch",
    when(max_value == col("ChildDuration"), "Child")
    .when(max_value == col("MovieDuration"), "Movie")
    .when(max_value == col("RelaxDuration"), "Relax")
    .when(max_value == col("SportDuration"), "Sport")
    .when(max_value == col("TVDuration"), "TV"),
)
df7.select("Contract", "Most_Watch").show(5)

+---------+----------+
| Contract|Most_Watch|
+---------+----------+
|HNH572635|        TV|
|HND083642|        TV|
|LDFD05747|        TV|
|HNH960439|        TV|
|HNJ035736|        TV|
+---------+----------+
only showing top 5 rows



In [26]:
# Customer_Taste: "-"-joined categories with a positive total, in
# Child, Movie, Relax, Sport, TV order. Contracts that watched all five get "all".
# The expression is named customer_taste_expr, not customer_state. Assigning a
# global named customer_state would overwrite the customer_state() function
# from the first cell, and main_task() would then call this Column instead.
customer_taste_expr = concat_ws("-",
    when(col("ChildDuration") > 0, "Child"),
    when(col("MovieDuration") > 0, "Movie"),
    when(col("RelaxDuration") > 0, "Relax"),
    when(col("SportDuration") > 0, "Sport"),
    when(col("TVDuration") > 0, "TV")
    )

df8 = df7.withColumn("Customer_Taste", customer_taste_expr)

# Limit the replacement to Customer_Taste instead of every string column.
df9 = df8.replace(['Child-Movie-Relax-Sport-TV'], ['all'], subset=["Customer_Taste"])
df9.select("Contract", "Customer_Taste").show(5)
    

+---------+--------------+
| Contract|Customer_Taste|
+---------+--------------+
|HNH572635|           all|
|HND083642|           all|
|LDFD05747|           all|
|HNH960439|           all|
|HNJ035736|           all|
+---------+--------------+
only showing top 5 rows



In [28]:
# Total watch time across the five categories (nulls were filled with 0 in df6).
df10 = df9.withColumn(
    "TotalDuration_All",
    col("TVDuration") + col("MovieDuration") + col("ChildDuration")
    + col("RelaxDuration") + col("SportDuration"),
)

# Example thresholds; adjust them to the unit of TotalDuration
# (3 h and 9 h if it is in seconds).
LOW_ENGAGEMENT_MAX = 3 * 60 * 60       # exclusive bound of "Low"
NORMAL_ENGAGEMENT_MAX = 3 * 180 * 60   # inclusive bound of "Normal"

df11 = df10.withColumn(
    "Engagement_Type",
    when(col("TotalDuration_All") < lit(LOW_ENGAGEMENT_MAX), lit("Low engagement"))
    .when(col("TotalDuration_All") <= lit(NORMAL_ENGAGEMENT_MAX), lit("Normal engagement"))
    .otherwise(lit("High engagement")),
)

df11.select("Contract", "Engagement_Type").show(5)

+---------+---------------+
| Contract|Engagement_Type|
+---------+---------------+
|HNH572635|High engagement|
|HND083642|High engagement|
|LDFD05747|High engagement|
|HNH960439|High engagement|
|HNJ035736|High engagement|
+---------+---------------+
only showing top 5 rows



In [30]:
# Activeness = percentage of the loaded days on which a contract appears.
df12 = df1.select("Contract", "file_date")

# Number of days covered by the loaded files, derived from the data rather
# than hard-coded. A fixed 5.0 here disagreed with the pipeline's 2.0 and only
# stays correct while the folder holds exactly five daily files.
total_days = df12.select("file_date").distinct().count()
assert total_days > 0, "df1 has no file_date values"

# countDistinct already ignores repeated (Contract, file_date) pairs.
df14 = df12.groupBy("Contract").agg(countDistinct("file_date").alias("ActiveDays"))

df15 = df14.withColumn(
    "Activeness",
    col("ActiveDays") * lit(100) / lit(float(total_days)),
)

df15.select("Contract", "Activeness").show(5)


+---------+----------+
| Contract|Activeness|
+---------+----------+
|HNH572635|     100.0|
|HND083642|     100.0|
|LDFD05747|     100.0|
|HNH960439|     100.0|
|HNJ035736|     100.0|
+---------+----------+
only showing top 5 rows



In [31]:
# Attach activeness to the viewing profile and keep the output columns.
result = (
    df11.join(df15, "Contract", "left")
    .select("Contract", "Most_Watch", "Customer_Taste", "Engagement_Type", "Activeness")
)
result.show()

+---------+----------+--------------+---------------+----------+
| Contract|Most_Watch|Customer_Taste|Engagement_Type|Activeness|
+---------+----------+--------------+---------------+----------+
|HNH572635|        TV|           all|High engagement|     100.0|
|HND083642|        TV|           all|High engagement|     100.0|
|LDFD05747|        TV|           all|High engagement|     100.0|
|HNH960439|        TV|           all|High engagement|     100.0|
|HNJ035736|        TV|           all|High engagement|     100.0|
|HNH579912|        TV|           all|High engagement|     100.0|
|HNH893773|        TV|           all|High engagement|     100.0|
|HNH743103|        TV|           all|High engagement|     100.0|
|DTFD21200|        TV|           all|High engagement|     100.0|
|HNH063566|        TV|           all|High engagement|     100.0|
|DNFD74404|        TV|           all|High engagement|     100.0|
|HND141717|        TV|           all|High engagement|     100.0|
|HNH866786|        TV|   