In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.window import Window

In [2]:

# Local-mode Spark session that resolves paths on the local filesystem
# (fs.defaultFS = file:///) and enables Arrow for pandas conversion.
_spark_conf = {
    "spark.hadoop.fs.defaultFS": "file:///",
    "spark.hadoop.fs.file.impl": "org.apache.hadoop.fs.LocalFileSystem",
    "spark.sql.execution.arrow.pyspark.enabled": "true",
}
_builder = SparkSession.builder.appName("ReadLocalParquet").master("local[*]")
for _conf_key, _conf_value in _spark_conf.items():
    _builder = _builder.config(_conf_key, _conf_value)
spark = _builder.getOrCreate()

In [3]:
import os

# Relative directory holding the exported search-log parquet output;
# list its contents (part files, .crc checksums, _SUCCESS marker).
parquet_path = "log_search/20220601/"
print(os.listdir(path=parquet_path))



['.part-00000-a4016f8f-c573-42d0-aa92-3367f02f0c94-c000.snappy.parquet.crc', 'part-00000-a4016f8f-c573-42d0-aa92-3367f02f0c94-c000.snappy.parquet', '_SUCCESS']


In [3]:
import os

# Size on disk of the single snappy-compressed part file, in MiB.
part_file = "log_search/20220601/part-00000-a4016f8f-c573-42d0-aa92-3367f02f0c94-c000.snappy.parquet"
size_mb = os.path.getsize(part_file) / 1024 ** 2
print(f"File size: {size_mb:.2f} MB")


File size: 4.62 MB


In [4]:
# df = spark.read.parquet("file:///D:/study_de/Homework/log_search_etl/log_search/20220601/part-00000-a4016f8f-c573-42d0-aa92-3367f02f0c94-c000.snappy.parquet")
# df.printSchema()


In [4]:
import glob
import os

from pyspark.sql import SparkSession

# Reuse the session configured at the top of the notebook. getOrCreate()
# returns the already-running session, so a new appName here would be ignored.
spark = SparkSession.builder.getOrCreate()

# Find every .parquet part file in the partition directory. Build the pattern
# from parquet_path (defined when listing the directory) instead of a
# hard-coded absolute Windows path; abspath keeps the paths handed to Spark absolute.
files = sorted(glob.glob(os.path.join(os.path.abspath(parquet_path), "*.parquet")))
if not files:
    # Without this guard spark.read.parquet() would be called with no paths
    # and fail with a much less obvious Spark error.
    raise FileNotFoundError(f"No .parquet files found in {parquet_path!r}")

# Read all part files into a single DataFrame.
df = spark.read.parquet(*files)
df.printSchema()


root
 |-- eventID: string (nullable = true)
 |-- datetime: string (nullable = true)
 |-- user_id: string (nullable = true)
 |-- keyword: string (nullable = true)
 |-- category: string (nullable = true)
 |-- proxy_isp: string (nullable = true)
 |-- platform: string (nullable = true)
 |-- networkType: string (nullable = true)
 |-- action: string (nullable = true)
 |-- userPlansMap: array (nullable = true)
 |    |-- element: string (containsNull = true)



In [5]:
# Distinct values of `category`, sorted so the displayed order is stable
# across runs (distinct() alone gives shuffle-dependent ordering).
df.select("category").distinct().orderBy("category").show(truncate=False)

+--------+
|category|
+--------+
|enter   |
|quit    |
+--------+



In [None]:
# Quick look at the first few raw rows.
df.show(n=5)

+--------------------+--------------------+--------+--------------------+--------+---------+--------------------+-----------+------+--------------------+
|             eventID|            datetime| user_id|             keyword|category|proxy_isp|            platform|networkType|action|        userPlansMap|
+--------------------+--------------------+--------+--------------------+--------+---------+--------------------+-----------+------+--------------------+
|61804316-6b89-4cf...|2022-06-01 18:59:...|    NULL|            trữ tình|   enter|     vnpt|   fplay-ottbox-2019|   ethernet|search|                NULL|
|22c35287-9fe1-487...|2022-06-01 18:59:...|44887906|            trữ tình|   enter|     vnpt|   fplay-ottbox-2019|   ethernet|search|                  []|
|f9af5a95-1f72-486...|2022-06-01 18:59:...| 2719170|              bolero|   enter|  viettel|   fplay-ottbox-2019|   ethernet|search|[Kênh Gia Đình:pr...|
|fd53edee-132d-4ac...|2022-06-01 15:00:...|    NULL|amy schumer: trực...|   

In [8]:
from pyspark.sql import DataFrame
from pyspark.sql.functions import col, count, row_number
from pyspark.sql.window import Window

def get_top_keywords_per_user(
    df: DataFrame,
    top_n: int = 1,
    drop_nulls: bool = False,
) -> DataFrame:
    """Return each user's most frequently searched keyword(s).

    Args:
        df: Search-log DataFrame with at least ``user_id`` and ``keyword`` columns.
        top_n: Number of keywords to keep per user (default 1, i.e. only the
            single most-searched keyword). Must be >= 1.
        drop_nulls: When True, rows whose ``user_id`` or ``keyword`` is NULL are
            removed before counting. Defaults to False to keep the original
            behaviour, in which all NULL user_ids are grouped together as if
            they were one user.

    Returns:
        DataFrame with columns ``user_id``, ``keyword`` and ``search_count``,
        sorted by ``search_count`` descending (ties broken by ``user_id``).

    Raises:
        ValueError: If ``top_n`` is less than 1.
    """
    if top_n < 1:
        raise ValueError(f"top_n must be >= 1, got {top_n}")

    source = df
    if drop_nulls:
        source = source.filter(col("user_id").isNotNull() & col("keyword").isNotNull())

    # Step 1: count how many times each user searched each keyword.
    keyword_count = (
        source.groupBy("user_id", "keyword")
        .agg(count("*").alias("search_count"))
    )

    # Step 2: rank keywords within each user. The keyword is a secondary sort
    # key so ties in search_count resolve deterministically; ordering only by
    # search_count lets row_number() pick an arbitrary row, which can differ
    # between runs. NULL keywords lose ties against real ones.
    window_spec = Window.partitionBy("user_id").orderBy(
        col("search_count").desc(),
        col("keyword").asc_nulls_last(),
    )

    return (
        keyword_count
        .withColumn("rank", row_number().over(window_spec))
        .filter(col("rank") <= top_n)
        .select("user_id", "keyword", "search_count")
        .orderBy(col("search_count").desc(), col("user_id").asc())
    )


In [10]:
# Most-searched keyword per user, highest search counts first.
result = get_top_keywords_per_user(df=df)
result.show(n=20, truncate=False)

+--------+--------------------+------------+
|user_id |keyword             |search_count|
+--------+--------------------+------------+
|NULL    |NULL                |681         |
|96076212|ZUMBA               |70          |
|48115586|HOOWOO              |54          |
|95436495|FPT                 |41          |
|5608949 |JOJO                |33          |
|3765556 |DOREMON             |31          |
|49513555|MINECRAFT           |28          |
|1863834 |COIDABANHTRUCTIEP   |27          |
|43849602|BOBOIBOY            |25          |
|49866211|paw                 |23          |
|5737883 |mr. queen           |21          |
|49201339|KADAOKE             |20          |
|2033606 |DMSS                |19          |
|43587350|LUFF                |19          |
|40851705|HOPE                |17          |
|49948786|NULL                |17          |
|49317752|GONJIAM             |15          |
|49628072|TRUCTIEPBONGDAHOMNAY|15          |
|92990723|yêu nhầm chị dâu    |15          |
|06480429|

In [11]:
# Overall keyword popularity across all users: the 10 most frequent keywords.
(
    df.groupBy("keyword")
    .count()
    .orderBy(col("count").desc())
    .show(10, truncate=False)
)

+--------------------------------------------+-----+
|keyword                                     |count|
+--------------------------------------------+-----+
|NULL                                        |5048 |
|Liên Minh Công Lý: Phiên bản của Zack Snyder|724  |
|fairy tail                                  |517  |
|cuộc chiến thượng lưu                       |416  |
|yêu nhầm chị dâu                            |335  |
|Thử Thách Thần Tượng                        |294  |
|running man                                 |292  |
|siêu nhân                                   |291  |
|kẻ quyến rũ đại tài                         |284  |
|sao băng                                    |282  |
+--------------------------------------------+-----+
only showing top 10 rows

