In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.window import Window
from pyspark.sql.functions import col, when, desc
from pyspark.sql.functions import row_number
import pyspark.sql.functions as sf
from datetime import datetime
import os

Start spark session

In [2]:
def get_spark_session():
    # return SparkSession.builder.config("spark.driver.memory", "8g").config(
    #     "spark.executor.cores", 8).getOrCreate()
    return SparkSession.builder \
        .appName("Write to MySQL") \
        .config("spark.jars", "/Users/habaokhanh/.ivy2/jars/mysql-connector-java-8.0.30.jar")\
        .getOrCreate()

Read json data

In [3]:
def read_json_data(spark, path, file_name):
    data = spark.read.json(path + file_name)
    return data.select('_source.*')

Functions preparation

In [4]:
def add_type_column(df):
    tv_condition_list = ['CHANNEL', 'DSHD', 'KPLUS', 'KPlus']
    movie_condition_list = ['VOD', 'FIMS_RES', 'BHD_RES', 'VOD_RES', 'FIMS', 'BHD', 'DANET']
    df = df.withColumn("Type",
                       when(col("AppName").isin(*tv_condition_list), "Truyền Hình")
                       .when(col("AppName").isin(*movie_condition_list),"Phim Truyện")
                       .when(col("AppName") == 'RELAX', "Giải Trí")
                       .when(col("AppName") == 'CHILD', "Thiếu Nhi")
                       .when(col("AppName") == 'SPORT', "Thể Thao")
                       .otherwise("Error"))
    return df

def rename_column(df):
    df = df.withColumnRenamed('Giải Trí', 'RelaxDuration') \
        .withColumnRenamed('Phim Truyện', 'MovieDuration') \
        .withColumnRenamed('Thiếu Nhi', 'ChildDuration') \
        .withColumnRenamed('Thể Thao', 'SportDuration') \
        .withColumnRenamed('Truyền Hình', 'TVDuration')
    return df

In [5]:
def add_date_filter_non_error(df, file_date):
    df = df.withColumn('Date', sf.lit(file_date))
    df = df.filter(df.Type != 'Error').select('Contract', 'Type', 'TotalDuration', 'Date').dropDuplicates()
    return df

In [6]:
def calculate_most_watched(df):
    windowSpec = Window.partitionBy("Contract").orderBy(desc("TotalDuration"))
    most_watched = df.withColumn("rank", row_number().over(windowSpec))
    most_watched = most_watched.filter(most_watched.rank == 1)
    return most_watched.select("Contract", "Type").withColumnRenamed("Type", "MostWatch")

In [7]:
def calculate_customer_taste(df):
    for column in df.columns:
        if 'Duration' in column:
            df = df.withColumn(column, when(col(column).isNotNull(), column[:-8]))

    taste = df.withColumn('CustomerTaste',
                           sf.concat_ws("-", *[col(i) for i in df.columns if 'Duration' in i]))

    taste = taste.select('Contract', 'CustomerTaste')
    return taste

In [8]:
def calculate_activeness(df, start_date, end_date):
    total_days = (end_date - start_date).days + 1
    
    active = df.groupby('Contract', 'Date').agg((sf.sum('TotalDuration').alias('TotalDurationPerDay')))
    active = active.withColumn("IsActive", sf.when(active.TotalDurationPerDay > 0, 1).otherwise(0))

    activeness = active.groupBy("Contract").agg(sf.sum("IsActive").alias("ActiveDays"))
    activeness = activeness.withColumn("ActiveRate", (sf.col("ActiveDays") / sf.lit(total_days)))
    return activeness

In [9]:
def join_dataframes(final_df, most_watched_df, customer_taste_df, activeness_df):
    final_df = final_df.join(most_watched_df, on='Contract', how='left')
    final_df = final_df.join(customer_taste_df, on='Contract', how='left')
    final_df = final_df.join(activeness_df, on='Contract', how='left')
    return final_df

In [10]:
def write_data(df):
    # df.repartition(1).write.csv('/Users/habaokhanh/Study_BigData_Dataset/log_content/clean/df_clean1', header=True)
    jdbc_url = "jdbc:mysql://localhost:3306"
    db_name = "movie"
    table_name = "log_movie_olap"
    mysql_url = f"{jdbc_url}/{db_name}"
    db_properties = {
        "user": "root",
        "password": "h@b@0kh@nh", 
        "driver": "com.mysql.cj.jdbc.Driver"
    }
    df.write\
    .mode("overwrite") \
    .jdbc(url=mysql_url, table=table_name, mode="overwrite", properties=db_properties)

Test & modify main task

In [11]:
def main_task(start_date_str, end_date_str):
    start_time = datetime.now()

    # Extract
    path = '/Users/habaokhanh/Study_BigData_Dataset/log_content/'
    list_file = sorted([file for file in os.listdir(path) if file != '.DS_Store'])
    spark = get_spark_session()
    start_date = datetime.strptime(start_date_str, "%Y%m%d").date()
    end_date = datetime.strptime(end_date_str, "%Y%m%d").date()

    result_df = None
    for file_name in list_file:
        file_date_str = file_name.split('_')[-1].split('.')[0]
        file_date = datetime.strptime(file_date_str, "%Y%m%d").date()
        if start_date <= file_date <= end_date:
            df = read_json_data(spark, path, file_name)
            df = add_type_column(df)
            df = add_date_filter_non_error(df, file_date)

            if result_df is None:
                result_df = df
            else:
                result_df = result_df.union(df)
    
    return result_df

In [12]:
result_df = main_task('20220401', '20220403')

23/11/08 17:45:20 WARN Utils: Your hostname, MacBook-Air.local resolves to a loopback address: 127.0.0.1; using 192.168.1.9 instead (on interface en0)
23/11/08 17:45:20 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
23/11/08 17:45:20 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
23/11/08 17:45:21 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.
23/11/08 17:45:21 WARN Utils: Service 'SparkUI' could not bind on port 4041. Attempting port 4042.
                                                                                

In [13]:
result_df.show()



+---------+-----------+-------------+----------+
| Contract|       Type|TotalDuration|      Date|
+---------+-----------+-------------+----------+
|BGFD29928|Truyền Hình|          669|2022-04-01|
|NDD011998|Truyền Hình|        18941|2022-04-01|
|HND486882|Phim Truyện|         5545|2022-04-01|
|QAFD05596|Truyền Hình|        83495|2022-04-01|
|HNH910024|Phim Truyện|         2847|2022-04-01|
|TGD020392|Phim Truyện|        17534|2022-04-01|
|TBFD09573|Phim Truyện|        14544|2022-04-01|
|TIFD29139|Phim Truyện|        25726|2022-04-01|
|BID025030|Truyền Hình|          170|2022-04-01|
|SGH069784|Truyền Hình|          151|2022-04-01|
|SGH936402|Truyền Hình|           61|2022-04-01|
|CBFD00028|Truyền Hình|        82141|2022-04-01|
|HPFD25462|Truyền Hình|        82775|2022-04-01|
|HND204270|Truyền Hình|        18770|2022-04-01|
|DAFD39835|Phim Truyện|         6728|2022-04-01|
|SGH661929|Phim Truyện|         7847|2022-04-01|
|BDAAA4088|Truyền Hình|          587|2022-04-01|
|HYFD17846|Truyền Hì

                                                                                

In [14]:
result_df_pivot = result_df.groupBy('Contract','Date').pivot("Type").sum('TotalDuration')
result_df_pivot = rename_column(result_df_pivot)
result_df_pivot.show()

23/11/08 17:45:49 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
23/11/08 17:45:49 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
23/11/08 17:45:49 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
23/11/08 17:45:49 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
23/11/08 17:45:49 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
23/11/08 17:45:49 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
23/11/08 17:45:49 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
23/11/08 17:45:49 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
23/11/08 17:45:49 WARN RowBasedKeyValueBatch: Calling spill() on

+---------+----------+-------------+-------------+-------------+-------------+----------+
| Contract|      Date|RelaxDuration|MovieDuration|ChildDuration|SportDuration|TVDuration|
+---------+----------+-------------+-------------+-------------+-------------+----------+
|AGAAA1203|2022-04-03|         null|         null|         null|         null|      9162|
|AGAAA1415|2022-04-03|         null|         null|         null|         null|      2383|
|AGAAA1461|2022-04-03|         null|         null|         null|         null|     52712|
|AGD031770|2022-04-02|         null|         null|         null|         null|      8926|
|AGFD00099|2022-04-01|         null|         null|         null|         null|        87|
|AGFD04698|2022-04-02|         null|         null|         null|         null|     38488|
|AGFD09595|2022-04-01|         null|         null|         null|         null|      4612|
|AGFD10782|2022-04-01|         null|         7676|         1521|         null|       104|
|AGFD11789

                                                                                

Most watched df

In [15]:
most_watched_df = calculate_most_watched(result_df)
most_watched_df.show()



+--------------+-----------+
|      Contract|  MostWatch|
+--------------+-----------+
|113.182.209.48|   Giải Trí|
|     AGAAA0338|Truyền Hình|
|     AGAAA0342|Truyền Hình|
|     AGAAA0346|Truyền Hình|
|     AGAAA0353|Phim Truyện|
|     AGAAA0372|Truyền Hình|
|     AGAAA0391|Truyền Hình|
|     AGAAA0504|Truyền Hình|
|     AGAAA0544|Truyền Hình|
|     AGAAA0550|Truyền Hình|
|     AGAAA0555|Truyền Hình|
|     AGAAA0576|Truyền Hình|
|     AGAAA0613|Truyền Hình|
|     AGAAA0638|Truyền Hình|
|     AGAAA0663|Truyền Hình|
|     AGAAA0693|Truyền Hình|
|     AGAAA0718|Truyền Hình|
|     AGAAA0723|Truyền Hình|
|     AGAAA0732|Truyền Hình|
|     AGAAA0750|Truyền Hình|
+--------------+-----------+
only showing top 20 rows



                                                                                

Customer taste

In [16]:
customer_taste_df = calculate_customer_taste(result_df_pivot)
customer_taste_df.show()

23/11/08 17:46:12 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
23/11/08 17:46:12 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
23/11/08 17:46:12 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
23/11/08 17:46:12 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
23/11/08 17:46:12 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
23/11/08 17:46:12 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
23/11/08 17:46:12 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
23/11/08 17:46:12 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
23/11/08 17:46:12 WARN RowBasedKeyValueBatch: Calling spill() on

+---------+--------------------+
| Contract|       CustomerTaste|
+---------+--------------------+
|AGAAA1203|                  TV|
|AGAAA1415|                  TV|
|AGAAA1461|                  TV|
|AGD031770|                  TV|
|AGFD00099|                  TV|
|AGFD04698|                  TV|
|AGFD09595|                  TV|
|AGFD10782|      Movie-Child-TV|
|AGFD11789|                  TV|
|AGFD12029|                  TV|
|AGFD13638|                  TV|
|AGFD15028|            Child-TV|
|AGFD17589|Relax-Movie-Child...|
|AGFD26512|                  TV|
|AGFD30875|                  TV|
|AGFD31724|                  TV|
|AGFD35499|                  TV|
|AGFD35884|                  TV|
|AGFD36965|                  TV|
|AGFD41051|                  TV|
+---------+--------------------+
only showing top 20 rows



                                                                                

Active rate

In [19]:
start_date_str = '20220401'
end_date_str = '20220403'
start_date = datetime.strptime(start_date_str, "%Y%m%d").date()
end_date = datetime.strptime(end_date_str, "%Y%m%d").date()

activeness_df = calculate_activeness(result_df, start_date, end_date)
activeness_df.show()

23/11/08 17:50:24 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
23/11/08 17:50:24 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
23/11/08 17:50:24 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
23/11/08 17:50:24 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
23/11/08 17:50:24 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
23/11/08 17:50:24 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
23/11/08 17:50:24 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
23/11/08 17:50:24 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
23/11/08 17:50:24 WARN RowBasedKeyValueBatch: Calling spill() on

+---------+----------+------------------+
| Contract|ActiveDays|        ActiveRate|
+---------+----------+------------------+
|HNH332181|         3|               1.0|
|HNH794192|         1|0.3333333333333333|
|HNH870395|         3|               1.0|
|HYFD47097|         3|               1.0|
|HYFD55120|         3|               1.0|
|KGFD09873|         3|               1.0|
|QNFD34621|         3|               1.0|
|QNFD67022|         3|               1.0|
|SGD162987|         3|               1.0|
|SGH093032|         3|               1.0|
|SGH494322|         3|               1.0|
|STFD21925|         3|               1.0|
|TNFD17016|         3|               1.0|
|HNH871147|         3|               1.0|
|HUFD05523|         3|               1.0|
|HYFD43307|         3|               1.0|
|NDFD19838|         3|               1.0|
|QNFD76024|         3|               1.0|
|SGD487844|         3|               1.0|
|SGJ005892|         3|               1.0|
+---------+----------+------------

                                                                                

Final df to write to db

In [20]:
final_df = join_dataframes(result_df_pivot, most_watched_df, customer_taste_df, activeness_df)
final_df.show()

23/11/08 17:50:33 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
23/11/08 17:50:33 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
23/11/08 17:50:33 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
23/11/08 17:50:33 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
23/11/08 17:50:33 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
23/11/08 17:50:33 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
23/11/08 17:50:33 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
23/11/08 17:50:33 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
23/11/08 17:50:33 WARN RowBasedKeyValueBatch: Calling spill() on



23/11/08 17:50:42 WARN TaskMemoryManager: Failed to allocate a page (33554432 bytes), try again.
23/11/08 17:50:42 WARN TaskMemoryManager: Failed to allocate a page (33554432 bytes), try again.

+--------------+----------+-------------+-------------+-------------+-------------+----------+-----------+-------------+----------+------------------+
|      Contract|      Date|RelaxDuration|MovieDuration|ChildDuration|SportDuration|TVDuration|  MostWatch|CustomerTaste|ActiveDays|        ActiveRate|
+--------------+----------+-------------+-------------+-------------+-------------+----------+-----------+-------------+----------+------------------+
|113.182.209.48|2022-04-01|           89|         null|         null|         null|        63|   Giải Trí|     Relax-TV|         1|0.3333333333333333|
|     AGAAA0338|2022-04-03|         null|         null|         null|         null|      8894|Truyền Hình|           TV|         3|               1.0|
|     AGAAA0338|2022-04-03|         null|         null|         null|         null|      8894|Truyền Hình|           TV|         3|               1.0|
|     AGAAA0338|2022-04-03|         null|         null|         null|         null|      8894|

                                                                                