In [None]:
#!pip install pyspark



## Import Library

In [2]:
from pyspark import SparkContext
from pyspark.sql.session import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.window import Window
import os
import pandas as pd

Start a new Spark Session

In [3]:
# Reuse the active Spark session if one exists; otherwise create a new one.
spark = (
    SparkSession
    .builder
    .getOrCreate()
)

## Read Data

The data is stored in daily files. Let's read them in and concatenate them into a single DataFrame.
- Firstly, set the parent path where all subfolders containing files are stored.
- Then loop through each subfolder inside the parent directory
- Check that each entry is a valid directory
- Read each `.parquet` file into a Spark Dataframe
- Add a new column "Month" extracted from the `datetime` column

In [None]:
# Root folder that holds one sub-folder per day, each containing .parquet files.
parent_path = "/content/drive/MyDrive/Colab Notebooks/data"
folder_names = sorted(os.listdir(parent_path))
all_dfs = []

for folder in folder_names:
    full_path = os.path.join(parent_path, folder)
    # Ignore anything in the parent folder that is not a directory.
    if not os.path.isdir(full_path):
        continue

    for entry in os.listdir(full_path):
        # Only parquet files are loaded; every other file is skipped.
        if not entry.endswith(".parquet"):
            continue

        # Tag each row with the two-digit month ("MM") taken from `datetime`.
        month_col = date_format(col("datetime"), "MM")
        all_dfs.append(
            spark.read
            .parquet(os.path.join(full_path, entry))
            .withColumn("Month", month_col)
        )

Merge all DataFrames from different days/months into a single DataFrame using `unionByName`. We will analyze both the `user_id` and `keyword` columns, so we need to make sure neither contains missing values. We also keep only the rows where `action` is `search`.

In [5]:
# Stack the per-file DataFrames into one, matching columns by name.
df = all_dfs[0]
for df_next in all_dfs[1:]:
    df = df.unionByName(df_next)

# Filter once, after the union. Doing it inside the loop re-applied the same
# predicate on every iteration and skipped it entirely when only a single
# file had been loaded.
# Keep only search events that have both a user and a keyword.
df = df.filter(
    (col('action') == 'search')
    & col('user_id').isNotNull()
    & col('keyword').isNotNull()
)

## Inspecting and Transforming Log Search Data

Let's visually inspect the beginning of the dataset

In [6]:
# Print the first 10 rows of the filtered search log.
df.show(n=10)

+--------------------+--------------------+--------+--------------------+--------+---------+--------------------+-----------+------+--------------------+-----+
|             eventID|            datetime| user_id|             keyword|category|proxy_isp|            platform|networkType|action|        userPlansMap|Month|
+--------------------+--------------------+--------+--------------------+--------+---------+--------------------+-----------+------+--------------------+-----+
|22c35287-9fe1-487...|2022-06-01 18:59:...|44887906|            trữ tình|   enter|     vnpt|   fplay-ottbox-2019|   ethernet|search|                  []|   06|
|f9af5a95-1f72-486...|2022-06-01 18:59:...| 2719170|              bolero|   enter|  viettel|   fplay-ottbox-2019|   ethernet|search|[Kênh Gia Đình:pr...|   06|
|d51e6e6c-2765-4a8...|2022-06-01 19:00:...| 8830996|cậu mang à sĩ hanako|   enter|     vnpt|smarttv-sony-android|       wifi|search|[Kênh Gia Đình:pr...|   06|
|3948ea18-8c86-451...|2022-06-01 19:00:.

We need to identify the most frequently searched keyword for each `user_id` during a specified month
- Filters the DataFrame to include only rows from the specified month
- Groups the data by `user_id`, `keyword`, and `Month` to count how many times each keyword was searched by each user during that month
- Defines a window function that partitions the data by `user_id` and orders rows within each user by `Total_search` in descending order
- Assigns a rank to each keyword per user based on the number of times it was searched and filters to keep only the top-ranked keyword per user

In [7]:
def most_search(df, month):
    """Return each user's single most-searched keyword for one month.

    Parameters
    ----------
    df : pyspark.sql.DataFrame
        Search log containing at least the `user_id`, `keyword` and `Month`
        columns.
    month : int or str
        Month number to keep, e.g. ``6``, ``"6"`` or ``"06"``.

    Returns
    -------
    pyspark.sql.DataFrame
        One row per user with the columns `user_id`, `Most_Search` and
        `Month`.
    """
    # `Month` may be a zero-padded string such as "06" (it is when produced by
    # date_format(..., "MM")), so compare numerically instead of relying on
    # implicit string/int coercion.
    monthly = df.filter(col('Month').cast('int') == int(month))

    # Count how many times each user searched each keyword in that month.
    counts = (
        monthly
        .groupBy('user_id', 'keyword', 'Month')
        .count()
        .withColumnRenamed('count', 'Total_search')
    )

    # Rank keywords within each user by search count. No global sort is done
    # beforehand because the window orders rows inside each partition anyway.
    # `keyword` is a tie-breaker so that equal counts always resolve to the
    # same row instead of an arbitrary one.
    window = Window.partitionBy('user_id').orderBy(
        col('Total_search').desc(),
        col('keyword').asc(),
    )
    ranked = counts.withColumn('Rank', row_number().over(window))

    # Keep only the top-ranked keyword and expose it as `Most_Search`.
    return (
        ranked
        .filter(col('Rank') == 1)
        .withColumnRenamed('keyword', 'Most_Search')
        .select('user_id', 'Most_Search', 'Month')
    )

In [8]:
# Top keyword per user for June (T6) and July (T7).
df_t6, df_t7 = (most_search(df, month=m) for m in (6, 7))

- Read the file that maps each keyword to a category
- Convert the Pandas DataFrame to a Spark DataFrame (`key_category`)

In [None]:
# Load the keyword -> category mapping with pandas, then hand it to Spark so
# it can be joined with the search results.
key_category_pd = pd.read_excel(
    '/content/drive/MyDrive/Colab Notebooks/mapping-keywords-with-category.xlsx'
)
key_category = spark.createDataFrame(data=key_category_pd)

- Join June (T6) and July (T7) search results with category info
- Inner join the two DataFrames on `user_id` to align T6 and T7 search data for the same user

In [10]:
# Attach the category of each user's top June keyword and suffix the
# resulting columns with _T6.
df_t6 = (
    df_t6
    .join(key_category, on='Most_Search', how='inner')
    .select(
        'user_id',
        col('Most_Search').alias('Most_Search_T6'),
        col('Category').alias('Category_T6'),
    )
)

In [11]:
# Attach the category of each user's top July keyword and suffix the
# resulting columns with _T7.
df_t7 = (
    df_t7
    .join(key_category, on='Most_Search', how='inner')
    .select(
        'user_id',
        col('Most_Search').alias('Most_Search_T7'),
        col('Category').alias('Category_T7'),
    )
)

In [12]:
# Keep only users that have a categorized top keyword in both months.
merge_df = df_t6.join(df_t7, on='user_id', how='inner')

Detecting category changes - this is useful for user behavior analysis:
- Tracking how users shift their interests over time by comparing the user's top category from June and July

In [13]:
# True when the user's top category is the same in June and July.
condition = col('Category_T6') == col('Category_T7')

# Label shown when the category differs: "<June category> - <July category>".
transition = concat(col('Category_T6'), lit(' - '), col('Category_T7'))

merge_df = merge_df.withColumn(
    'Category_Change',
    when(condition, 'No Change').otherwise(transition),
)
merge_df.show()

+-------+--------------------+--------------------+--------------------+--------------------+--------------------+
|user_id|      Most_Search_T6|         Category_T6|      Most_Search_T7|         Category_T7|     Category_Change|
+-------+--------------------+--------------------+--------------------+--------------------+--------------------+
|0014111|         running man|        Reality Show|        cẩm y chi hạ|Historical / Costume|Reality Show - Hi...|
|0015590|tấm cám: chuyện c...|Historical / Costume|    taxi, em tên gì?|              Comedy|Historical / Cost...|
|0019920|           thiếu nhi|     Anime / Cartoon|              bolero|               Music|Anime / Cartoon -...|
|0026325|               mẹ ma|Horror / Supernat...|               mẹ ma|Horror / Supernat...|           No Change|
|0036165|            trữ tình|               Music|            trữ tình|               Music|           No Change|
|0048616|         penthouse 2|               Drama|      shooting stars|        