In [1]:
from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession
from pyspark.sql.types import *
from pyspark.sql.functions import *

sc = SparkContext()
spark = SparkSession.builder.appName("LogAnalysis").getOrCreate()

In [3]:
import os
folder_path = "./DataSampleTest"  # Replace with the actual folder path
log_files = [f for f in os.listdir(folder_path) if f.startswith("log")]
print(log_files)

['logt21.txt', 'logt22.txt', 'logt23.txt', 'logt24.txt', 'logt25.txt', 'logt31.txt', 'logt32.txt']


# 1. Prepare data 

## 1.1. Read data

Bước này đọc & chuẩn bị data cho phân tích

In [8]:
# demo dùng hàm eval để parse json string thành map
s = "{u'ItemId': u'100052388', u'RealTimePlaying': u'570.3', u'SessionMainMenu': u'B046FCAC0DC1:2016:02:17:03:00:34:105'}"
eval(s)

{'ItemId': '100052388',
 'RealTimePlaying': '570.3',
 'SessionMainMenu': 'B046FCAC0DC1:2016:02:17:03:00:34:105'}

In [2]:
# read all text files into 
rdd = sc.textFile("./DataSampleTest/logt*.txt").map(lambda x: eval(x)) # x = x[0] (in case rdd 1-column): value of rdd 
rdd

PythonRDD[2] at RDD at PythonRDD.scala:53

In [81]:
df_input = rdd.toDF()
df_input.show()

+-----+-------+--------------------+---------+---------+----------+-------------+-----------+--------+------------------+------------+--------+------+------------+---------+--------------------+------------+---------+-----+------------+-----------+--------------+---------------+------------------+------------+--------------------+--------------------+--------------------+---------+-------------+--------------------+-------------+
|AppId|AppName|             BoxTime|ChapterID| Contract|CustomerID|DefaultGetway|  Directors|Duration|ElapsedTimePlaying|       Event|Firmware|Folder|          Ip|   ItemId|            ItemName|ListOnFolder|LocalType|LogId|         Mac| PrimaryDNS|PublishCountry|RealTimePlaying|            Screen|SecondaryDNS|             Session|     SessionMainMenu|      SessionSubMenu|SubMenuId|   SubnetMask|                 Url|       ip_wan|
+-----+-------+--------------------+---------+---------+----------+-------------+-----------+--------+------------------+-----------

In [82]:
df_input.count() # -> đỡ phải lặp qua từng files như cách 1 

914060

## 1.2. Preprocess

In [83]:
# lấy các cột đề cần
df_input = df_input.selectExpr("Mac as MAC", "SessionMainMenu", "AppName", "LogId as LogID", "Event", "ItemId as ItemID", "RealTimePlaying")

In [39]:
df_input.show(truncate=False)

+------------+------------------------------------+-------+-----+------------+---------+---------------+
|MAC         |SessionMainMenu                     |AppName|LogID|Event       |ItemID   |RealTimePlaying|
+------------+------------------------------------+-------+-----+------------+---------+---------------+
|B046FCAC0DC1|B046FCAC0DC1:2016:02:12:12:35:13:437|VOD    |52   |StopVOD     |100052388|570.3          |
|B046FCAC0DC1|B046FCAC0DC1:2016:02:11:01:01:56:838|IPTV   |40   |EnterIPTV   |null     |0.0            |
|B046FCAC0DC1|B046FCAC0DC1:2016:02:11:01:02:29:258|VOD    |55   |NextVOD     |100052388|0.0            |
|B046FCAC0DC1|B046FCAC0DC1:2016:02:12:04:44:59:143|IPTV   |18   |ChangeModule|null     |0.0            |
|B046FCAC0DC1|B046FCAC0DC1:2016:02:12:12:35:13:437|VOD    |54   |PlayVOD     |100052388|0.0            |
|B046FCAC0DC1|B046FCAC0DC1:2016:02:12:04:44:59:143|IPTV   |40   |EnterIPTV   |null     |0.0            |
|B046FCAC0DC1|B046FCAC0DC1:2016:02:12:12:35:13:437|VOD 

---

- Handle data types

In [21]:
df_input.schema # tất cả cột đang là string -> cần chuyển về đúng type của nó

StructType([StructField('MAC', StringType(), True), StructField('SessionMainMenu', StringType(), True), StructField('AppName', StringType(), True), StructField('LogID', StringType(), True), StructField('Event', StringType(), True), StructField('ItemID', StringType(), True), StructField('RealTimePlaying', StringType(), True)])

In [84]:
from pyspark.sql.types import *
from pyspark.sql.functions import *

df_input = df_input.withColumn("RealTimePlaying", col("RealTimePlaying").cast(FloatType()))

- Theo đề, SessionMainMenu là "thời điểm user bắt đầu sử dụng dịch vụ" nhưng trông chưa đúng format thời gian -> cần format lại theo timestamp

In [13]:
# test
df_input.withColumn("SessionStartTime", regexp_extract("SessionMainMenu", r":(.*)", 1)).show(truncate=False)

+------------+------------------------------------+-------+-----+------------+---------+---------------+-----------------------+
|MAC         |SessionMainMenu                     |AppName|LogID|Event       |ItemID   |RealTimePlaying|SessionStartTime       |
+------------+------------------------------------+-------+-----+------------+---------+---------------+-----------------------+
|B046FCAC0DC1|B046FCAC0DC1:2016:02:12:12:35:13:437|VOD    |52   |StopVOD     |100052388|570.3          |2016:02:12:12:35:13:437|
|B046FCAC0DC1|B046FCAC0DC1:2016:02:11:01:01:56:838|IPTV   |40   |EnterIPTV   |NULL     |NULL           |2016:02:11:01:01:56:838|
|B046FCAC0DC1|B046FCAC0DC1:2016:02:11:01:02:29:258|VOD    |55   |NextVOD     |100052388|NULL           |2016:02:11:01:02:29:258|
|B046FCAC0DC1|B046FCAC0DC1:2016:02:12:04:44:59:143|IPTV   |18   |ChangeModule|NULL     |NULL           |2016:02:12:04:44:59:143|
|B046FCAC0DC1|B046FCAC0DC1:2016:02:12:12:35:13:437|VOD    |54   |PlayVOD     |100052388|NULL     

In [85]:
# create SessionStartTime col from SessionMainMenu col
df_input = df_input.withColumn("SessionStartTime", 
            to_timestamp(
                regexp_extract("SessionMainMenu", r":(.*)", 1),
                'yyyy:MM:dd:HH:mm:ss:SSS'
            ))

In [27]:
df_input.printSchema()

root
 |-- MAC: string (nullable = true)
 |-- SessionMainMenu: string (nullable = true)
 |-- AppName: string (nullable = true)
 |-- LogID: string (nullable = true)
 |-- Event: string (nullable = true)
 |-- ItemID: string (nullable = true)
 |-- RealTimePlaying: float (nullable = true)
 |-- SessionStartTime: timestamp (nullable = true)



---

- Handle nulls

In [15]:
df_input.filter("RealTimePlaying is null").count()

618523

In [86]:
df_input = df_input.fillna(0, subset = ['RealTimePlaying'])

In [30]:
df_input.filter("SessionMainMenu is null").count()

28

---

In [60]:
# quan sát lần cuối
df_input.show(truncate=False)

+------------+------------------------------------+-------+-----+------------+---------+---------------+-----------------------+
|MAC         |SessionMainMenu                     |AppName|LogID|Event       |ItemID   |RealTimePlaying|SessionStartTime       |
+------------+------------------------------------+-------+-----+------------+---------+---------------+-----------------------+
|B046FCAC0DC1|B046FCAC0DC1:2016:02:12:12:35:13:437|VOD    |52   |StopVOD     |100052388|570.3          |2016-02-12 12:35:13.437|
|B046FCAC0DC1|B046FCAC0DC1:2016:02:11:01:01:56:838|IPTV   |40   |EnterIPTV   |null     |0.0            |2016-02-11 01:01:56.838|
|B046FCAC0DC1|B046FCAC0DC1:2016:02:11:01:02:29:258|VOD    |55   |NextVOD     |100052388|0.0            |2016-02-11 01:02:29.258|
|B046FCAC0DC1|B046FCAC0DC1:2016:02:12:04:44:59:143|IPTV   |18   |ChangeModule|null     |0.0            |2016-02-12 04:44:59.143|
|B046FCAC0DC1|B046FCAC0DC1:2016:02:12:12:35:13:437|VOD    |54   |PlayVOD     |100052388|0.0      

# 2. Analysis

In [77]:
# read user_info.txt 
df_user = spark.read \
    .option("header", "true").option("sep", "\t") \
    .csv("./DataSampleTest/user_info.txt")
    
df_user.show()

+----------------+---------+
|             MAC|# of days|
+----------------+---------+
|FBOXB046FCB79E0B|       20|
|FBOXB046FCB3528B|      181|
|FBOXB046FCAAFB73|      426|
|FBOXB046FCAAFB72|      426|
|FBOXB046FCAA2085|      429|
|FBOXB046FCAA0669|      380|
|FBOXB046FCB343BF|      376|
|FBOXB046FCAC0CFB|      376|
|FBOXB046FCABED45|      378|
|FBOXB046FCAD80FC|      305|
|FBOXB046FCB1E3FE|      255|
|FBOXB046FCB27666|      210|
|FBOXB046FCB42341|      142|
|FBOXB046FCB6D6B2|       46|
|FBOXB046FCB6D4BC|       46|
|FBOXB046FCB6D4B6|       46|
|FBOXB046FCA6A3F4|      583|
|FBOXB046FCA86BD5|      493|
|FBOXB046FCABE3BC|      425|
|FBOXB046FCAC125F|      374|
+----------------+---------+
only showing top 20 rows



In [78]:
# preprocess: remove 'FBOX' prefix 
from pyspark.sql.functions import *

df_user = df_user.withColumnRenamed("# of days", "no_days") \
    .withColumn("MAC", 
        when(col("MAC").startswith("FBOX"), regexp_replace(col("MAC"), "FBOX", ""))
        .otherwise(col("MAC"))
    )
    
df_user.show()

+------------+-------+
|         MAC|no_days|
+------------+-------+
|B046FCB79E0B|     20|
|B046FCB3528B|    181|
|B046FCAAFB73|    426|
|B046FCAAFB72|    426|
|B046FCAA2085|    429|
|B046FCAA0669|    380|
|B046FCB343BF|    376|
|B046FCAC0CFB|    376|
|B046FCABED45|    378|
|B046FCAD80FC|    305|
|B046FCB1E3FE|    255|
|B046FCB27666|    210|
|B046FCB42341|    142|
|B046FCB6D6B2|     46|
|B046FCB6D4BC|     46|
|B046FCB6D4B6|     46|
|B046FCA6A3F4|    583|
|B046FCA86BD5|    493|
|B046FCABE3BC|    425|
|B046FCAC125F|    374|
+------------+-------+
only showing top 20 rows



In [87]:
df_user_events = df_input.join(df_user, ["MAC"])
df_user_events.show(truncate=False)

+------------+------------------------------------+-------+-----+------------+---------+---------------+-----------------------+-------+
|MAC         |SessionMainMenu                     |AppName|LogID|Event       |ItemID   |RealTimePlaying|SessionStartTime       |no_days|
+------------+------------------------------------+-------+-----+------------+---------+---------------+-----------------------+-------+
|B046FCAC0DC1|B046FCAC0DC1:2016:02:12:12:35:13:437|VOD    |52   |StopVOD     |100052388|570.3          |2016-02-12 12:35:13.437|375    |
|B046FCAC0DC1|B046FCAC0DC1:2016:02:11:01:01:56:838|IPTV   |40   |EnterIPTV   |NULL     |0.0            |2016-02-11 01:01:56.838|375    |
|B046FCAC0DC1|B046FCAC0DC1:2016:02:11:01:02:29:258|VOD    |55   |NextVOD     |100052388|0.0            |2016-02-11 01:02:29.258|375    |
|B046FCAC0DC1|B046FCAC0DC1:2016:02:12:04:44:59:143|IPTV   |18   |ChangeModule|NULL     |0.0            |2016-02-12 04:44:59.143|375    |
|B046FCAC0DC1|B046FCAC0DC1:2016:02:12:12:

In [88]:
# run once: Save somewhere for later analysis
df_input.coalesce(1).write.csv("user_events.csv", header=True)

In [76]:
df_user_events = spark.read.csv("user_events.csv", header=True)
df_user_events.show()

+------------+--------------------+-------+-----+---------------+---------+---------------+--------------------+
|         MAC|     SessionMainMenu|AppName|LogID|          Event|   ItemID|RealTimePlaying|    SessionStartTime|
+------------+--------------------+-------+-----+---------------+---------+---------------+--------------------+
|B046FCB2DEF0|B046FCB2DEF0:2016...|   IPTV|   42|    StopChannel|      181|         55.437|2016-02-23T19:23:...|
|B046FCB2DEF0|B046FCB2DEF0:2016...|   IPTV|   41|   StartChannel|        3|            0.0|2016-02-23T19:23:...|
|B046FCB2DEF0|B046FCB2DEF0:2016...|   IPTV|  451|ExitChannelList|       11|         50.738|2016-02-23T08:47:...|
|B046FCB2DEF0|B046FCB2DEF0:2016...|   IPTV|   42|    StopChannel|      526|          1.258|2016-02-23T08:47:...|
|B046FCB2DEF0|B046FCB2DEF0:2016...|   IPTV|   41|   StartChannel|      184|            0.0|2016-02-23T08:47:...|
|B046FCB2DEF0|B046FCB2DEF0:2016...|   IPTV|   41|   StartChannel|      175|            0.0|2016-

In [89]:
# LOCKED
df_user_events.createOrReplaceTempView('user_events')

#### **Quan sát cơ bản**

In [None]:
spark.sql("""
    select Event, RealTimePlaying, SessionStartTime, no_days, AppName, ItemID
    from user_events
    where MAC = 'B046FCAC0DC1' and AppName = 'VOD' --and RealTimePlaying != 0
    order by SessionStartTime 
""").show(truncate=False)

# standby 

+--------------+---------------+-----------------------+-------+-------+---------+
|Event         |RealTimePlaying|SessionStartTime       |no_days|AppName|ItemID   |
+--------------+---------------+-----------------------+-------+-------+---------+
|Standby       |0.0            |2016-02-11 01:02:29.258|375    |VOD    |NULL     |
|PlayVOD       |0.0            |2016-02-11 01:02:29.258|375    |VOD    |100052388|
|StartVOD      |0.0            |2016-02-11 01:02:29.258|375    |VOD    |100052388|
|PlayVOD       |0.0            |2016-02-11 01:02:29.258|375    |VOD    |100052388|
|NextVOD       |0.0            |2016-02-11 01:02:29.258|375    |VOD    |100052388|
|EnterDetailVOD|0.0            |2016-02-11 01:02:29.258|375    |VOD    |100052388|
|PauseVOD      |0.0            |2016-02-11 01:02:29.258|375    |VOD    |100052388|
|PlayVOD       |0.0            |2016-02-11 01:02:29.258|375    |VOD    |100052388|
|StopVOD       |966.6          |2016-02-11 01:02:29.258|375    |VOD    |100052388|
|Sto

In [33]:
# app IPTV có những event nào
spark.sql("""
    select distinct(Event)
    from user_events
    where AppName = 'IPTV'
""").show(100, truncate=False)

+--------------------+
|Event               |
+--------------------+
|ChangeModule        |
|ExitPIP             |
|PiPSwitch           |
|StartTimeshift      |
|ShowChannelList     |
|EnterIPTV           |
|ExitChannelList     |
|StopChannel         |
|SetAlarm            |
|StopTimeshift       |
|Standby             |
|Show schedule       |
|ShowEPG             |
|ExitEPG             |
|GetDRMKeySuccessful |
|GetDRMKeyFailed     |
|StartChannel        |
|ExitSchedule        |
|Turn on from Stanby |
|FavouriteChannelList|
|RemoveFavorite      |
|SearchBS            |
+--------------------+



In [34]:
# app VOD có những event nào
spark.sql("""
    select distinct(Event)
    from user_events
    where AppName = 'VOD'
""").show(100, truncate=False)

+-------------------+
|Event              |
+-------------------+
|StartVOD           |
|ChangeModule       |
|EnterSearchVOD     |
|PlayVOD            |
|NextVOD            |
|EnterDetailVOD     |
|RemoveFavorite     |
|StopVOD            |
|EnterFolderVOD     |
|Standby            |
|InsertFavorite     |
|EnterVOD           |
|SearchVOD          |
|PreviousVOD        |
|PauseVOD           |
|Turn on from Stanby|
+-------------------+



In [35]:
spark.sql("""
    (select *
    from user_events
    where Event = 'StartChannel'
    limit 10)
    union all
    (select *
    from user_events
    where Event = 'PlayVOD'
    limit 10)
    
""").show(100, truncate=False)

+------------+------------------------------------+-------+-----+------------+---------+---------------+-----------------------+-------+
|MAC         |SessionMainMenu                     |AppName|LogID|Event       |ItemID   |RealTimePlaying|SessionStartTime       |no_days|
+------------+------------------------------------+-------+-----+------------+---------+---------------+-----------------------+-------+
|B046FCAC0DC1|B046FCAC0DC1:2016:02:17:03:00:34:105|IPTV   |41   |StartChannel|125      |0.0            |2016-02-17 03:00:34.105|375    |
|B046FCAC0DC1|B046FCAC0DC1:2016:02:21:22:11:17:200|IPTV   |41   |StartChannel|181      |0.0            |2016-02-21 22:11:17.2  |375    |
|B046FCAC0DC1|B046FCAC0DC1:2016:02:21:22:22:08:526|IPTV   |41   |StartChannel|16       |0.0            |2016-02-21 22:22:08.526|375    |
|B046FCAC0DC1|B046FCAC0DC1:2016:02:26:03:49:11:766|IPTV   |41   |StartChannel|3        |0.0            |2016-02-26 03:49:11.766|375    |
|B046FCAC0DC1|B046FCAC0DC1:2016:02:12:12:

=> Từ 2 cell trên, đoán rằng event 'StartChannel' & 'PlayVOD' là event thực sự xem chương trình gì đó

---

Rule:
- RealTimePlaying chỉ ghi nhận thời lượng xem từ lúc bắt đầu tới kết thúc session. Nên chỉ khác 0 khi rơi vào event 'Stop...' 
- Đoán nghĩa một số event: 
  - Enter{AppName} (chọn appname) -> Standby (bật để đó, chưa coi gì) -> Start (khởi động app) -> Play -> Next
  - ChangeModule (chuyển kênh)
  - PiPSwitch: picture in picture

In [None]:
spark.sql("""
    select Event, RealTimePlaying, SessionStartTime, no_days, AppName, ItemID
    from user_events
    where MAC = 'B046FCAC0DC1' and AppName = 'VOD' --and RealTimePlaying != 0
    order by SessionStartTime 
""").show(truncate=False)

# standby 

+--------------+---------------+-----------------------+-------+-------+---------+
|Event         |RealTimePlaying|SessionStartTime       |no_days|AppName|ItemID   |
+--------------+---------------+-----------------------+-------+-------+---------+
|Standby       |0.0            |2016-02-11 01:02:29.258|375    |VOD    |NULL     |
|PlayVOD       |0.0            |2016-02-11 01:02:29.258|375    |VOD    |100052388|
|StartVOD      |0.0            |2016-02-11 01:02:29.258|375    |VOD    |100052388|
|PlayVOD       |0.0            |2016-02-11 01:02:29.258|375    |VOD    |100052388|
|NextVOD       |0.0            |2016-02-11 01:02:29.258|375    |VOD    |100052388|
|EnterDetailVOD|0.0            |2016-02-11 01:02:29.258|375    |VOD    |100052388|
|PauseVOD      |0.0            |2016-02-11 01:02:29.258|375    |VOD    |100052388|
|PlayVOD       |0.0            |2016-02-11 01:02:29.258|375    |VOD    |100052388|
|StopVOD       |966.6          |2016-02-11 01:02:29.258|375    |VOD    |100052388|
|Sto

---

In [29]:
# quan sát timeframe của data
spark.sql("""
    select min(SessionStartTime) as min, max(SessionStartTime) as max
    from user_events        
""").show(truncate=False) # khoảng 2 năm

# số records mỗi năm
spark.sql("""
    select year(SessionStartTime) as year, count(1) as no_records
    from user_events
    group by year(SessionStartTime)        
""").show(truncate=False)

+-----------------------+----------------------+
|min                    |max                   |
+-----------------------+----------------------+
|2015-08-11 00:04:13.108|2017-02-03 10:19:19.11|
+-----------------------+----------------------+

+----+----------+
|year|no_records|
+----+----------+
|2015|2248      |
|2016|1026513   |
|NULL|28        |
|2017|2         |
+----+----------+



=> Năm 2016 có nhiều records nhất, đáng để phân tích

#### **User thường coi phim trên app nào**

In [None]:
# check distinct app names
spark.sql("select distinct(AppName) as appname from user_events").show() 

+-------+
|appname|
+-------+
|   IPTV|
|    VOD|
+-------+



In [None]:
df_app_usage = spark.sql("""
    select AppName, count(distinct(MAC)) as no_users
    from user_events
    group by AppName
""")

df_app_usage.show()

+-------+--------+
|AppName|no_users|
+-------+--------+
|   IPTV|    1693|
|    VOD|    1299|
+-------+--------+



=> IPTV nhiều hơn

- Follow-up: Hành vi nào dc thực hiện nhiều nhất bởi user theo từng App

In [75]:
spark.sql("""
    select *
    from (
        select AppName, Event, count(1) as event_count,
            row_number() over(partition by AppName order by count(1) desc) as seq
        from user_events
        group by AppName, Event      
        order by AppName, event_count desc    
    ) t
    where seq = 1
""").show()

+-------+--------------+-----------+---+
|AppName|         Event|event_count|seq|
+-------+--------------+-----------+---+
|   IPTV|  StartChannel|     269942|  1|
|    VOD|EnterFolderVOD|      44359|  1|
+-------+--------------+-----------+---+



- Follow up: với mỗi loại app, user session thường kéo dài bao lâu?

In [24]:
spark.sql("""
    select AppName, MAC, round(avg(RealTimePlaying), 2) as avg_session_time, count(1) as session_count
    from user_events
    where RealTimePlaying != 0
    group by AppName, MAC      
    order by AppName, avg_session_time desc    
""").show()

+-------+------------+----------------+-------------+
|AppName|         MAC|avg_session_time|session_count|
+-------+------------+----------------+-------------+
|   IPTV|B046FCB209E8|        972361.0|            6|
|   IPTV|B046FCAA1C30|       679540.81|            1|
|   IPTV|B046FCAD1490|       365302.82|            2|
|   IPTV|B046FCA7BCB5|       297794.48|           12|
|   IPTV|B046FCAD8739|       269270.79|            6|
|   IPTV|B046FCAD1EEA|       205780.11|          152|
|   IPTV|B046FCB28BCA|       186265.67|           10|
|   IPTV|B046FCB2CB2B|       117000.34|            8|
|   IPTV|B046FCAE382D|        83202.57|          212|
|   IPTV|B046FCA988C7|        70766.88|           10|
|   IPTV|B046FCB889C2|         54506.5|           29|
|   IPTV|B046FCAC1AF2|        37990.41|           19|
|   IPTV|B046FCAEDDC2|        36482.12|           33|
|   IPTV|B046FCB6B37D|        36306.78|          431|
|   IPTV|B046FCB77EA4|        32561.46|            9|
|   IPTV|B046FCABE1C2|      

In [25]:
# lấy TB cho mỗi appname
spark.sql("""
    select AppName, round(avg(RealTimePlaying), 2) as avg_session_time, count(1) as session_count
    from user_events
    where RealTimePlaying != 0
    group by AppName   
    order by AppName 
""").show()

+-------+----------------+-------------+
|AppName|avg_session_time|session_count|
+-------+----------------+-------------+
|   IPTV|         1271.01|       282950|
|    VOD|         1215.21|        43021|
+-------+----------------+-------------+



#### **Xét riêng năm 2016, thống kê lượng truy cập app theo tháng**

In [30]:
spark.sql("""
    select month(SessionStartTime) as month_2016, 
        count(distinct MAC) as no_distinct_users,
        count(distinct ItemID) as no_distinct_programs
    from user_events
    where Event like 'Enter%' and year(SessionStartTime) = 2016
    group by month(SessionStartTime)
    order by no_distinct_users desc 
""").show()

+----------+-----------------+--------------------+
|month_2016|no_distinct_users|no_distinct_programs|
+----------+-----------------+--------------------+
|         2|             1612|                3839|
|         3|              513|                1299|
|         8|               41|                 116|
|         9|               32|                  30|
|        10|                9|                   1|
|         7|                5|                   9|
+----------+-----------------+--------------------+



=> User active nhất vào tháng 2 & 3, chắc là do Tết nên nghỉ nhiều, nhu cầu giải trí nhiều \
=> Cũng 2 tháng này có user coi nhiều loại chương trình nhất

#### **Mỗi user có bao nhiêu session xem phim mỗi tháng & trung bình tháng đó có bao nhiêu lượt xem**

- Run either this cell or next cell

In [27]:
df_active_session_by_user = spark.read.csv("active_session_by_user.csv") \
    .withColumnRenamed("_c0", "MAC") \
    .withColumnRenamed("_c1", "session_start_month") \
    .withColumnRenamed("_c2", "no_active_session") 
    
df_active_session_by_user.show()

+------------+-------------------+-----------------+
|         MAC|session_start_month|no_active_session|
+------------+-------------------+-----------------+
|B046FCA69EA6|            2016-02|               91|
|B046FCA6A074|            2016-02|              206|
|B046FCA6A27E|            2016-02|              131|
|B046FCA6A450|            2016-02|              162|
|B046FCA6A47D|            2016-02|               14|
|B046FCA6A4F6|            2016-02|              149|
|B046FCA6A58E|            2016-02|              257|
|B046FCA6A61F|            2016-02|               90|
|B046FCA6A675|            2016-02|              126|
|B046FCA6A684|            2016-02|              450|
|B046FCA6A684|            2016-03|              186|
|B046FCA6A6F0|            2016-02|                1|
|B046FCA6A7D7|            2016-02|              286|
|B046FCA6A800|            2016-02|              198|
|B046FCA6A800|            2016-03|               26|
|B046FCA6A8AE|            2015-08|            

In [33]:
df_active_session_by_user = spark.sql("""
    select MAC, date_format(SessionStartTime, 'yyyy-MM') as session_start_month,
        count(1) as no_active_session
    from (
        select *
        from user_events
        where Event = 'StartVOD' or Event = 'StartChannel'
    ) t
    group by MAC, session_start_month
    order by MAC, session_start_month, no_active_session desc
""")

df_active_session_by_user.show()

+------------+-------------------+-----------------+
|         MAC|session_start_month|no_active_session|
+------------+-------------------+-----------------+
|B046FCA69EA6|            2016-02|               91|
|B046FCA6A074|            2016-02|              206|
|B046FCA6A27E|            2016-02|              131|
|B046FCA6A450|            2016-02|              162|
|B046FCA6A47D|            2016-02|               14|
|B046FCA6A4F6|            2016-02|              149|
|B046FCA6A58E|            2016-02|              257|
|B046FCA6A61F|            2016-02|               90|
|B046FCA6A675|            2016-02|              126|
|B046FCA6A684|            2016-02|              225|
|B046FCA6A684|            2016-03|               93|
|B046FCA6A6F0|            2016-02|                1|
|B046FCA6A7D7|            2016-02|              286|
|B046FCA6A800|            2016-02|              198|
|B046FCA6A800|            2016-03|               26|
|B046FCA6A8AE|            2015-08|            

In [34]:
# run once: save for later analysis
df_active_session_by_user.coalesce(1).write.csv("active_session_by_user.csv", header=True)

In [35]:
df_active_session_by_user.createOrReplaceTempView("active_session_by_user")

In [36]:
percentiles_df = spark.sql("""
    SELECT 
        session_start_month,
        percentile_approx(no_active_session, 0.25) AS low_threshold,
        percentile_approx(no_active_session, 0.5) AS med_threshold,
        percentile_approx(no_active_session, 0.75) AS high_threshold
    FROM active_session_by_user
    GROUP BY session_start_month
    ORDER BY session_start_month
""")

percentiles_df.show()

+-------------------+-------------+-------------+--------------+
|session_start_month|low_threshold|med_threshold|high_threshold|
+-------------------+-------------+-------------+--------------+
|               NULL|            1|            1|             1|
|            2015-08|            1|            3|             8|
|            2016-02|           28|          114|           237|
|            2016-03|            8|           29|            67|
|            2016-07|            3|            3|            14|
|            2016-08|            1|            5|            22|
|            2016-09|            1|            2|             9|
|            2016-10|            1|            2|             5|
+-------------------+-------------+-------------+--------------+



In [37]:
df_user_label = percentiles_df.join(df_active_session_by_user, ['session_start_month']) \
    .withColumn('label', 
        when((col('no_active_session') >= col('low_threshold')) & (col('no_active_session') < col('med_threshold')), 'low') \
        .when((col('no_active_session') >= col('med_threshold')) & (col('no_active_session') < col('high_threshold')), 'medium') \
        .otherwise('high')
    )
    
df_user_label.show()

+-------------------+-------------+-------------+--------------+------------+-----------------+------+
|session_start_month|low_threshold|med_threshold|high_threshold|         MAC|no_active_session| label|
+-------------------+-------------+-------------+--------------+------------+-----------------+------+
|            2016-02|           28|          114|           237|B046FCB5E9FD|              147|medium|
|            2016-02|           28|          114|           237|B046FCB25A46|              238|  high|
|            2016-02|           28|          114|           237|B046FCADBF9A|              192|medium|
|            2016-02|           28|          114|           237|B046FCA7C243|              368|  high|
|            2016-02|           28|          114|           237|B046FCB88C02|              134|medium|
|            2016-02|           28|          114|           237|B046FCADC4AF|               10|  high|
|            2016-02|           28|          114|           237|B046FCADC

In [38]:
df_user_label.groupBy('label').count().show()

+------+-----+
| label|count|
+------+-----+
|   low|  590|
|  high| 1104|
|medium|  575|
+------+-----+



In [45]:
df_user_label.createOrReplaceTempView('user_label')

In [46]:
# lọc HIGH & MEDIUM- users ra khỏi LOW-users
df_low_active_user = df_user_label.where("MAC not in (select MAC from user_label where label = 'high' or label = 'medium')")
df_low_active_user.show()

+-------------------+-------------+-------------+--------------+------------+-----------------+-----+
|session_start_month|low_threshold|med_threshold|high_threshold|         MAC|no_active_session|label|
+-------------------+-------------+-------------+--------------+------------+-----------------+-----+
|            2016-02|           28|          114|           237|B046FCACDF75|               90|  low|
|            2016-02|           28|          114|           237|B046FCAF1699|               84|  low|
|            2016-02|           28|          114|           237|B046FCAC8131|               46|  low|
|            2016-02|           28|          114|           237|B046FCAEEDEA|               77|  low|
|            2016-02|           28|          114|           237|B046FCACB8EF|               64|  low|
|            2016-02|           28|          114|           237|B046FCA98FF2|               49|  low|
|            2016-02|           28|          114|           237|B046FCB795EF|     

In [47]:
# check sau khi loại đi thì low user từ 590 còn 371 người thuần low
df_low_active_user.count()

371

In [62]:
# do ngày tháng
df_low_active_user = df_user_label \
    .where("MAC not in (select MAC from user_label where label = 'high' or label = 'medium')") \
    .groupBy(col("MAC")) \
    .agg(avg(col("no_active_session")).alias("avg_active_session_per_month")) \
    .withColumn("active_session_label", lit("low")) \
    .orderBy(col("MAC")) 
    
df_low_active_user.show()

+------------+----------------------------+--------------------+
|         MAC|avg_active_session_per_month|active_session_label|
+------------+----------------------------+--------------------+
|B046FCA69EA6|                        91.0|                 low|
|B046FCA6A61F|                        90.0|                 low|
|B046FCA6B12C|                        50.0|                 low|
|B046FCA7BAE6|                        35.0|                 low|
|B046FCA7BD3A|                        40.0|                 low|
|B046FCA7C34C|                        77.0|                 low|
|B046FCA7C574|                        50.0|                 low|
|B046FCA7C823|                        35.5|                 low|
|B046FCA7CA69|                        37.0|                 low|
|B046FCA85F41|                        44.0|                 low|
|B046FCA85F7E|                        39.0|                 low|
|B046FCA85F8F|                        74.0|                 low|
|B046FCA86066|           

In [63]:
df_low_active_user.count()

320

In [64]:
df_low_active_user \
    .coalesce(1) \
    .write.csv("churn_user/low_active_user.csv", header=True)

#### **Các lần xem phim giãn bao nhiêu ngày**


User trở lại xem phim khi nào kể từ lần cuối xem phim

In [106]:
# Quan sát bảng user_events
spark.sql("select * from user_events").show()

+------------+--------------------+-------+-----+------------+---------+---------------+--------------------+-------+
|         MAC|     SessionMainMenu|AppName|LogID|       Event|   ItemID|RealTimePlaying|    SessionStartTime|no_days|
+------------+--------------------+-------+-----+------------+---------+---------------+--------------------+-------+
|B046FCAC0DC1|B046FCAC0DC1:2016...|    VOD|   52|     StopVOD|100052388|          570.3|2016-02-12 12:35:...|    375|
|B046FCAC0DC1|B046FCAC0DC1:2016...|   IPTV|   40|   EnterIPTV|     NULL|            0.0|2016-02-11 01:01:...|    375|
|B046FCAC0DC1|B046FCAC0DC1:2016...|    VOD|   55|     NextVOD|100052388|            0.0|2016-02-11 01:02:...|    375|
|B046FCAC0DC1|B046FCAC0DC1:2016...|   IPTV|   18|ChangeModule|     NULL|            0.0|2016-02-12 04:44:...|    375|
|B046FCAC0DC1|B046FCAC0DC1:2016...|    VOD|   54|     PlayVOD|100052388|            0.0|2016-02-12 12:35:...|    375|
|B046FCAC0DC1|B046FCAC0DC1:2016...|   IPTV|   40|   Ente

In [20]:
# tính day gap giữa những lần sdụng app của mỗi user 
df_recency = spark.sql("""
    select MAC, session_start_date, AppName,
        lag(session_start_date) over (partition by MAC, AppName order by session_start_date) as prev_session,
        date_diff(
            session_start_date, 
            lag(session_start_date) over (partition by MAC, AppName order by session_start_date)
        ) as day_diff
    from ( 
        select MAC, SessionStartTime, date_format(SessionStartTime, 'yyyy-MM-dd') as session_start_date, AppName
        from user_events
        --where Event = 'StartVOD' or Event = 'StartChannel'
        --group by MAC, SessionStartTime, Event --make each day distinct
    ) t
    group by MAC, AppName, session_start_date
    order by MAC, session_start_date
""")

df_recency.show()

+------------+------------------+-------+------------+--------+
|         MAC|session_start_date|AppName|prev_session|day_diff|
+------------+------------------+-------+------------+--------+
|B046FCA69EA6|        2016-02-05|   IPTV|        NULL|    NULL|
|B046FCA69EA6|        2016-02-10|   IPTV|  2016-02-05|       5|
|B046FCA69EA6|        2016-02-12|   IPTV|  2016-02-10|       2|
|B046FCA69EA6|        2016-02-13|   IPTV|  2016-02-12|       1|
|B046FCA69EA6|        2016-02-14|   IPTV|  2016-02-13|       1|
|B046FCA69EA6|        2016-02-17|   IPTV|  2016-02-14|       3|
|B046FCA69EA6|        2016-02-21|   IPTV|  2016-02-17|       4|
|B046FCA69EA6|        2016-02-22|   IPTV|  2016-02-21|       1|
|B046FCA6A074|        2016-02-04|   IPTV|        NULL|    NULL|
|B046FCA6A074|        2016-02-05|   IPTV|  2016-02-04|       1|
|B046FCA6A074|        2016-02-13|   IPTV|  2016-02-05|       8|
|B046FCA6A074|        2016-02-14|   IPTV|  2016-02-13|       1|
|B046FCA6A074|        2016-02-15|   IPTV

In [21]:
df_recency.createOrReplaceTempView("user_recency")

- Để lấy độ giãn ngày thường xuyên, nghĩa là cứ thường xuyên bao lâu user mới coi phim, ta dùng median. Theo ChatGPT: *"The median is often used as a robust measure of central tendency, particularly when the data contains outliers or extreme values that could skew the mean. **It's also commonly used to describe the "typical" value of a dataset**"*

In [22]:
df_day_diff_by_user = spark.sql("""
    select MAC, median_day_diff,
        case 
            when median_day_diff <= 7 then 'week' 
            when median_day_diff <= 14 then 'half_month'
            when median_day_diff <= 30 then 'month'
            when median_day_diff <= 30 * 3 then 'quarter'
            when median_day_diff <= 30 * 6 then 'half_year'
            when median_day_diff <= 30 * 9 then 'third_quarter'
            when median_day_diff <= 365 then 'one_year'
            else 'long_ago'
        end as typical_day_diff_label
    from (
        select MAC, percentile_approx(day_diff, 0.5) AS median_day_diff
        from user_recency
        where day_diff is not null
        group by MAC
    ) t
    order by median_day_diff desc
""")

df_day_diff_by_user.show()

+------------+---------------+----------------------+
|         MAC|median_day_diff|typical_day_diff_label|
+------------+---------------+----------------------+
|B046FCAF1E7E|            216|         third_quarter|
|B046FCB2C70F|             24|                 month|
|B046FCB79EFD|             24|                 month|
|B046FCA86246|             23|                 month|
|B046FCB16317|             20|                 month|
|B046FCB8104B|             20|                 month|
|B046FCA6B1C2|             19|                 month|
|B046FCB4EF62|             19|                 month|
|B046FCB79E0B|             19|                 month|
|B046FCB53D5F|             17|                 month|
|B046FCB5E995|             17|                 month|
|B046FCB73448|             15|                 month|
|B046FCAA2175|             14|            half_month|
|B046FCB424AA|             14|            half_month|
|B046FCB5881B|             13|            half_month|
|B046FCB7124F|             1

In [23]:
df_day_diff_by_user.groupBy("typical_day_diff_label") \
    .count() \
    .show()

+----------------------+-----+
|typical_day_diff_label|count|
+----------------------+-----+
|            half_month|   27|
|                 month|   11|
|                  week| 1498|
|         third_quarter|    1|
+----------------------+-----+



=> Tần suất thấp nhất user sdụng lại app sau 1 khoảng thời gian là khoảng 6-9 tháng (có 1 user), kế đến là khoảng từ 3-6 tháng (11 user). Những user rơi vào 2 category này có xác suất rời dịch vụ cao hơn những người còn lại do ko thường xuyên sdụng app 


In [25]:
df_infrequent_usage = df_day_diff_by_user.where("typical_day_diff_label = 'third_quarter' or typical_day_diff_label = 'month'") \
    .select("MAC", "median_day_diff") 
    
df_infrequent_usage.show()

+------------+
|         MAC|
+------------+
|B046FCAF1E7E|
|B046FCB2C70F|
|B046FCB79EFD|
|B046FCA86246|
|B046FCB16317|
|B046FCB8104B|
|B046FCA6B1C2|
|B046FCB4EF62|
|B046FCB79E0B|
|B046FCB53D5F|
|B046FCB5E995|
|B046FCB73448|
+------------+



In [26]:
df_infrequent_usage.coalesce(1).write.csv("churn_user/user_infrequent_usage.csv", header=True)

#### **Tính từ ngày mới nhất trong dữ liệu, đã bao nhiêu ngày user chưa sdụng app**

In [75]:
# Quan sát bảng user_events
spark.sql("select * from user_events").show()

+------------+--------------------+-------+-----+---------------+---------+---------------+--------------------+
|         MAC|     SessionMainMenu|AppName|LogID|          Event|   ItemID|RealTimePlaying|    SessionStartTime|
+------------+--------------------+-------+-----+---------------+---------+---------------+--------------------+
|B046FCB2DEF0|B046FCB2DEF0:2016...|   IPTV|   42|    StopChannel|      181|         55.437|2016-02-23T19:23:...|
|B046FCB2DEF0|B046FCB2DEF0:2016...|   IPTV|   41|   StartChannel|        3|            0.0|2016-02-23T19:23:...|
|B046FCB2DEF0|B046FCB2DEF0:2016...|   IPTV|  451|ExitChannelList|       11|         50.738|2016-02-23T08:47:...|
|B046FCB2DEF0|B046FCB2DEF0:2016...|   IPTV|   42|    StopChannel|      526|          1.258|2016-02-23T08:47:...|
|B046FCB2DEF0|B046FCB2DEF0:2016...|   IPTV|   41|   StartChannel|      184|            0.0|2016-02-23T08:47:...|
|B046FCB2DEF0|B046FCB2DEF0:2016...|   IPTV|   41|   StartChannel|      175|            0.0|2016-

In [93]:
df_idle_day = spark.sql("""
    select *, case 
        when idle_days <= 7 then 'week' 
        when idle_days <= 14 then 'half_month'
        when idle_days <= 30 then 'month'
        when idle_days <= 30 * 3 then 'quarter'
        when idle_days <= 30 * 6 then 'half_year'
        when idle_days <= 30 * 9 then 'third_quarter'
        when idle_days <= 365 then 'one_year'
        else 'long_ago'
    end as idle_days_label
    from (
        select MAC, date_diff(max_date, session_start_date) as idle_days
        from (
            select MAC, date_format(SessionStartTime, 'yyyy-MM-dd') as session_start_date,
                row_number() over (partition by MAC order by SessionStartTime desc) as seq,
                date_format(max(SessionStartTime) over(), 'yyyy-MM-dd') as max_date
            from user_events
        ) t
        where seq = 1
        order by 2 desc
    )
""")

df_idle_day.show()

+------------+---------+---------------+
|         MAC|idle_days|idle_days_label|
+------------+---------+---------------+
|B046FCAC7D75|      368|       long_ago|
|B046FCA97F7E|      366|       long_ago|
|B046FCB2927A|      366|       long_ago|
|B046FCB30992|      366|       long_ago|
|B046FCB36175|      366|       long_ago|
|B046FCB52912|      366|       long_ago|
|B046FCB7A227|      366|       long_ago|
|B046FCAD9B70|      365|       one_year|
|B046FCAE2B23|      365|       one_year|
|B046FCACC5D9|      365|       one_year|
|B046FCB1B553|      365|       one_year|
|B046FCAA15D7|      365|       one_year|
|B046FCB1BB23|      365|       one_year|
|B046FCAB09D8|      365|       one_year|
|B046FCB233C4|      365|       one_year|
|B046FCB2DD4E|      365|       one_year|
|B046FCAA1A43|      365|       one_year|
|B046FCB7A479|      365|       one_year|
|B046FCA980B7|      365|       one_year|
|B046FCAD869B|      365|       one_year|
+------------+---------+---------------+
only showing top

In [95]:
df_long_idle_day = df_idle_day.where("idle_days_label in ('long_ago', 'one_year', 'third_quarter', 'half_year')")
df_long_idle_day.count()

1692

In [96]:
df_long_idle_day.coalesce(1).write.csv("churn_user/long_idle_days.csv", header=True)

#### Free temp views


In [None]:
spark.catalog.dropTempView("active_session_by_user")

In [None]:
spark.catalog.dropTempView("user_label")

In [64]:
# LOCKED
spark.catalog.dropTempView("user_events")

True

# 3. Tỷ lệ khách hàng rời bỏ dịch vụ

In [3]:
df_long_idle_days = spark.read.csv("churn_user/long_idle_days.csv", header=True)
df_low_active_user = spark.read.csv("churn_user/low_active_user.csv", header=True)
df_user_infrequent_usage = spark.read.csv("churn_user/user_infrequent_usage.csv", header=True)

In [7]:
print('df_long_idle_days count:', df_long_idle_days.count())
print('df_low_active_user count:', df_low_active_user.count())
print('df_user_infrequent_usage count:', df_user_infrequent_usage.count())

df_long_idle_days count: 1692
df_low_active_user count: 320
df_user_infrequent_usage count: 12


- Phân user vào 1 t trong 3 nhóm rời bỏ: HIGH (khả năng cao rời bỏ), MODERATE (khả năng rời bỏ mức vừa) & LOW (khả năng rời bỏ mức thấp) dựa vào số giá trị null của 3 cột 'idle_days_label', 'avg_active_session_per_month', 'active_session_label'. Nếu join cả 3 dataframe & một user nào đó đều có gtrị khác null ở cả 3 field này chứng tỏ user này có khả năng cao rời bỏ (HIGH), tương tự cho 2 gtrị khá null (MODERATE) & 1 gtrị khác null (LOW)

In [15]:
from pyspark.sql.functions import *

def count_non_null_values(idle_days_label, avg_active_session_per_month, active_session_label):
    list_values = [idle_days_label, avg_active_session_per_month, active_session_label]
    list_nulls = [v for v in list_values if v is not None]
    
    return len(list_nulls)


udf_count_non_null_values = udf(count_non_null_values, IntegerType())

df_churn_user = df_long_idle_days.join(df_low_active_user, ['MAC'], 'outer') \
    .join(df_user_infrequent_usage, ['MAC'], 'outer') \
    .select('MAC', 'idle_days_label', 'avg_active_session_per_month', 'active_session_label') \
    .withColumn('churn_rank', udf_count_non_null_values(col('idle_days_label'), col('avg_active_session_per_month'), col('active_session_label'))) \
    .orderBy(col('churn_rank').desc()) \
    .withColumn('churn_possibility', 
        when(col('churn_rank') == 3, 'HIGH')
        .when(col('churn_rank') == 2, 'MODERATE')
        .otherwise('LOW')
    )

df_churn_user.show()

+------------+---------------+----------------------------+--------------------+----------+-----------------+
|         MAC|idle_days_label|avg_active_session_per_month|active_session_label|churn_rank|churn_possibility|
+------------+---------------+----------------------------+--------------------+----------+-----------------+
|B046FCA6B12C|       one_year|                        50.0|                 low|         3|             HIGH|
|B046FCA86AE2|       one_year|                        93.0|                 low|         3|             HIGH|
|B046FCA7BAE6|      half_year|                        35.0|                 low|         3|             HIGH|
|B046FCA86066|       one_year|                        51.0|                 low|         3|             HIGH|
|B046FCA86A56|       one_year|                        38.0|                 low|         3|             HIGH|
|B046FCA860BC|       one_year|                        51.0|                 low|         3|             HIGH|
|B046FCA69

In [20]:
df_churn_user.where('idle_days_label is null').count()

0

In [17]:
# thống kê 
df_churn_user.groupBy('churn_possibility').count().show()

+-----------------+-----+
|churn_possibility|count|
+-----------------+-----+
|             HIGH|  320|
|              LOW| 1372|
+-----------------+-----+



=> chỉ có nhóm HIGH & LOW. Lọc ra nhóm HIGH để lấy ra những người khả năng cao rời bỏ dịch vụ

In [None]:
spark.stop()