In [1]:
import getpass
import os

from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.window import Window

In [2]:
# Local SparkSession with the MySQL JDBC connector on the driver classpath.
# The connector jar location can be overridden with the MYSQL_JDBC_JAR
# environment variable so the notebook is not tied to one machine's layout;
# the default is the Homebrew Spark 3.5.0 path this notebook was built with.
# NOTE: getOrCreate() returns an already-running session unchanged, so these
# driver settings only take effect on a fresh kernel.
MYSQL_JDBC_JAR = os.environ.get(
    "MYSQL_JDBC_JAR",
    "/opt/homebrew/Cellar/apache-spark/3.5.0/libexec/jars/mysql-connector-j-8.2.0.jar",
)

spark = (
    SparkSession.builder
    .config("spark.driver.memory", "15g")
    .config("spark.executor.cores", 16)
    .config("spark.driver.extraClassPath", MYSQL_JDBC_JAR)
    .getOrCreate()
)

24/02/27 10:28:29 WARN Utils: Your hostname, Marcus-MacBook.local resolves to a loopback address: 127.0.0.1; using 192.168.1.6 instead (on interface en0)
24/02/27 10:28:29 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/02/27 10:28:30 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [3]:
# Load every parquet sub-folder of the log_search directory into one DataFrame.
directory = "/Users/marcusle02/Documents/Learning/study_de/BD/data/log_search"

# Sorted so the union order is the same on every run (os.listdir order is
# arbitrary). Entries starting with "." or "_" are skipped, matching Spark's
# own hidden-file convention (e.g. ".ipynb_checkpoints", "_temporary").
folders = sorted(
    f for f in os.listdir(directory)
    if os.path.isdir(os.path.join(directory, f)) and not f.startswith((".", "_"))
)
if not folders:
    raise FileNotFoundError(f"No parquet sub-folders found under {directory!r}")

merged_df = None

for folder in folders:
    parquet_path = os.path.join(directory, folder)
    folder_df = spark.read.parquet(parquet_path)
    # unionByName matches columns by name; plain union() is positional and
    # would silently mix up columns if a folder stores them in another order.
    merged_df = folder_df if merged_df is None else merged_df.unionByName(folder_df)

                                                                                

In [4]:
# Parse the raw datetime column into a timestamp, then derive the calendar
# month from it for the per-month aggregation further down.
merged_df = (
    merged_df
    .withColumn("datetime", to_timestamp(col("datetime")))
    .withColumn("month", month(col("datetime")))
)

In [23]:
# Inspect the schema after the datetime transform (datetime should now be a timestamp and month an integer).
merged_df.printSchema()

root
 |-- eventID: string (nullable = true)
 |-- datetime: timestamp (nullable = true)
 |-- user_id: string (nullable = true)
 |-- keyword: string (nullable = true)
 |-- category: string (nullable = true)
 |-- proxy_isp: string (nullable = true)
 |-- platform: string (nullable = true)
 |-- networkType: string (nullable = true)
 |-- action: string (nullable = true)
 |-- userPlansMap: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- month: integer (nullable = true)



In [5]:
# Preview the first 20 rows of the combined log; long values are truncated.
merged_df.show(n=20, truncate=True)

                                                                                

+--------------------+--------------------+--------+--------------------+--------+---------+--------------------+-----------+------+--------------------+-----+
|             eventID|            datetime| user_id|             keyword|category|proxy_isp|            platform|networkType|action|        userPlansMap|month|
+--------------------+--------------------+--------+--------------------+--------+---------+--------------------+-----------+------+--------------------+-----+
|0a07f729-d126-4e4...|2022-06-11 20:00:...|93232266|           full moon|   enter|     vnpt|                 ios|       WIFI|search|                  []|    6|
|f49475d9-08e5-484...|2022-06-11 20:00:...|42179752|     hồng kông kỳ án|   enter|     vnpt|   fplay-ottbox-2019|   ethernet|search|[Kênh Gia Đình:pr...|    6|
|612bfd7d-a744-4be...|2022-06-11 20:00:...| 0778150|mãi mãi trong tim em|   enter|     vnpt| smarttv-ceb-nextgen|       NULL|search|                  []|    6|
|f105bce4-0c64-463...|2022-06-11 20:00:.

In [7]:
# Total number of search events across all loaded folders (an action: scans every input file).
merged_df.count()

                                                                                

2366972

In [5]:
def process_log_search(data):
    """Return each user's most-searched keyword per month.

    Parameters
    ----------
    data : pyspark.sql.DataFrame
        Search log containing at least ``user_id``, ``month`` and ``keyword``.

    Returns
    -------
    pyspark.sql.DataFrame
        One row per (``user_id``, ``month``) with columns ``user_id``,
        ``month`` and ``Most_Search``. ``Most_Search`` is the keyword that user
        searched most often in that month. Ties are broken by the
        alphabetically smallest keyword so the result is deterministic.
    """
    keyword_counts = (
        data.select('user_id', 'month', 'keyword')
        # A null keyword is not a search term. Left in, it could be ranked
        # first and hide the user's real top keyword for that month.
        .filter(col('keyword').isNotNull())
        .groupBy('user_id', 'month', 'keyword')
        .count()
        .withColumnRenamed('count', 'TotalSearch')
    )
    # Ordering by TotalSearch alone lets row_number() pick arbitrarily among
    # equally frequent keywords, so results could differ between runs. The
    # keyword is used as a tie-breaker. (A global orderBy before the window
    # is unnecessary: the window repartitions and sorts by itself.)
    window = Window.partitionBy('user_id', 'month').orderBy(
        col('TotalSearch').desc(), col('keyword').asc()
    )
    return (
        keyword_counts
        .withColumn('Rank', row_number().over(window))
        .filter(col('Rank') == 1)
        .select('user_id', 'month', col('keyword').alias('Most_Search'))
    )

In [6]:
# Reduce the raw log to one row per (user_id, month) holding that user's most frequent keyword.
data = process_log_search(merged_df)

In [10]:
# Number of (user_id, month) pairs that have a top keyword.
data.count()

                                                                                

706704

In [7]:
# Preview the first 20 per-user, per-month top keywords; long values are truncated.
data.show(n=20, truncate=True)

                                                                                

+--------+-----+--------------------+
| user_id|month|         Most_Search|
+--------+-----+--------------------+
|95438351|    6|              zombie|
| 9543862|    6|       vườn sao băng|
|95438662|    7|the sex lives of ...|
|95438665|    7|            CHUNG TA|
|95438692|    7|  record of ragnarok|
|95438750|    6|                0900|
|95438787|    7|     tinh hà xán lạn|
|95438807|    6|                soma|
|95438862|    6|    kimetsu no yaiba|
|95438877|    7|tại sao boss muốn...|
|95439036|    6|       bắt ma phá án|
|95439036|    7|hạnh phúc đến vạn...|
|95439148|    7|              xe đua|
|95439214|    7|định mệnh anh yêu em|
|95439541|    7|                JOJO|
|95439543|    6|trực tiếp việt na...|
|95440167|    6|           onep pice|
|95440168|    7|          JOHN WICK4|
|95440254|    6|              BORUTO|
|95440311|    6|     chạm vào tim em|
+--------+-----+--------------------+
only showing top 20 rows



In [35]:
# Put each user's June and July top keyword side by side, keeping only users
# who have a top keyword in both months.
pivoted_data = (
    data.groupBy("user_id")
    # Listing the pivot values skips the extra Spark job that would otherwise
    # compute the distinct months. It also guarantees that columns "6" and "7"
    # exist even if one month is missing from the data.
    .pivot("month", [6, 7])
    # `max` here is pyspark.sql.functions.max (from the star import), not the
    # builtin. process_log_search leaves one row per (user_id, month), so it
    # simply picks that single keyword.
    .agg(max(col("Most_Search")))
    .withColumnRenamed("6", "most_search_june")
    .withColumnRenamed("7", "most_search_july")
    .filter(col("most_search_june").isNotNull() & col("most_search_july").isNotNull())
)

pivoted_data.show()

[Stage 386:>                                                        (0 + 8) / 9]

+--------+--------------------+--------------------+
| user_id|    most_search_june|    most_search_july|
+--------+--------------------+--------------------+
|95397995|          sky castle|     hunter x hunter|
|95398085|cuộc chiến thượng...|    hậu duệ mặt trời|
|95398732|tokyo ghoul:re se...|             21 days|
|95399499|         hội pháp sư|fairy tail nhiệm ...|
|95399914|học viện quân sự ...|                 xem|
|95399978|        keep running|          thần biển |
|95400118|hay goi toi la gi...|cong to vien tai ...|
| 9540484|         hoa cua quy|  tam biet vancouver|
|95405240|buổi tập thứ hai ...|           phim ngắn|
|95405382|     cặp mắt sát thủ|               alice|
|95406019|   muốn được bên anh|     khu vườn bí mật|
| 9540704|    fight for my way| thương ngày nặng về|
|95407077|     phi vụ triệu đô|            bão ngầm|
|95407546|           one piece|             as roma|
| 9540759|thanh gươm diệt q...|vua bếp soma (phầ...|
|95409529|            HELLSING|         overlo

                                                                                

In [22]:
# Number of users with a non-null top keyword in both June and July.
pivoted_data.count()

                                                                                

76720

In [33]:
# Keyword -> category mapping (columns Most_Search, Category) used to classify
# each user's top search term.
key_search_path = "/Users/marcusle02/Documents/Learning/study_de/BD/data/key_search.csv"
key_search_df = spark.read.csv(key_search_path, header=True)
key_search_df.show()

+--------------------+---------+
|         Most_Search| Category|
+--------------------+---------+
|    2 FAST 2 FURIOUS|   Action|
|              BORUTO|    Anime|
|            DORAEMON|    Anime|
|             GAP GO |  C-DRAMA|
|              GOBLIN|  K-DRAMA|
|             HAIKYUU|    Anime|
|             KENSHIN|    Anime|
|                KHIE|undefined|
|Liên Minh Công Lý...|   Action|
|               MINIO|  Comedy |
|            ONEPIECE|    Anime|
|       YOU ARE MY GL|  C-DRAMA|
|                anna|  Horror |
|attack on titan (...|    Anime|
|              bac si|  K-DRAMA|
|              boruto|    Anime|
|      bác sĩ luật sư|  K-DRAMA|
|  bí mật nơi góc tối|  C-DRAMA|
|bạn gái tôi là ng...| Romantic|
|       bắt ma phá án|  Horror |
+--------------------+---------+
only showing top 20 rows



In [43]:
# Classify each user's June and July top keyword with the keyword -> category
# mapping, then record whether (and how) the category changed between months.

# Normalise the lookup once:
# - Trim stray whitespace in Category. The CSV contains values such as
#   "Horror " and "Comedy "; untrimmed, they compare unequal to
#   "Horror"/"Comedy" and would be reported as a change.
# - Keep one row per keyword, so a keyword listed twice in the CSV cannot
#   duplicate user rows in the join.
category_lookup = (
    key_search_df
    .select(col("Most_Search"), trim(col("Category")).alias("Category"))
    .dropDuplicates(["Most_Search"])
)
# Give each month its own, distinctly named copy of the lookup. Joining the
# same DataFrame twice leaves Spark with ambiguous self-join column references.
june_lookup = category_lookup.select(
    col("Most_Search").alias("most_search_june"),
    col("Category").alias("category_june"),
)
july_lookup = category_lookup.select(
    col("Most_Search").alias("most_search_july"),
    col("Category").alias("category_july"),
)

# Users whose keyword is missing from the mapping (or maps to a null category)
# in either month cannot be classified and are dropped. The final select
# restores the original column order, since joining on a column name moves
# that key to the front.
mapped_data = (
    pivoted_data
    .join(june_lookup, on="most_search_june", how="inner")
    .join(july_lookup, on="most_search_july", how="inner")
    .filter(col("category_june").isNotNull() & col("category_july").isNotNull())
    .select("user_id", "most_search_june", "most_search_july", "category_june", "category_july")
)

# trending: whether the category stayed the same.
# changes: the "<june>-<july>" transition when the category changed.
is_unchanged = col("category_june") == col("category_july")
mapped_data = (
    mapped_data
    .withColumn("trending", when(is_unchanged, "unchanged").otherwise("changed"))
    .withColumn(
        "changes",
        when(is_unchanged, "unchanged")
        .otherwise(concat(col("category_june"), lit("-"), col("category_july"))),
    )
)

mapped_data.show()

[Stage 404:>                                                        (0 + 8) / 9]

+--------+--------------------+--------------------+-------------+-------------+---------+----------------+
| user_id|    most_search_june|    most_search_july|category_june|category_july| trending|         changes|
+--------+--------------------+--------------------+-------------+-------------+---------+----------------+
|95499666|        mộng hoa lục|            why her?|      C-DRAMA|     Romantic|  changed|C-DRAMA-Romantic|
|95515064|         running man|         running man|    RealityTV|    RealityTV|unchanged|       unchanged|
|95545171|yêu trong đau thương|  thiên nga bóng đêm|      V-DRAMA|      K-DRAMA|  changed| V-DRAMA-K-DRAMA|
|95566492|            trữ tình|           siêu nhân|        Music|        Child|  changed|     Music-Child|
|95580863|           siêu nhân|           siêu nhân|        Child|        Child|unchanged|       unchanged|
|95585283|           one piece|           one piece|        Anime|        Anime|unchanged|       unchanged|
| 9561405|        mộng hoa l

                                                                                

In [None]:
# Connection settings for the local MySQL database `log_db`.
# Credentials come from the environment (MYSQL_USER / MYSQL_PASSWORD) or an
# interactive prompt instead of being hard-coded, so they do not leak when
# the notebook is shared. MYSQL_URL overrides the default JDBC URL.
url = os.environ.get("MYSQL_URL", "jdbc:mysql://localhost/log_db")
connection_properties = {
    "user": os.environ.get("MYSQL_USER", "root"),
    "password": os.environ.get("MYSQL_PASSWORD") or getpass.getpass("MySQL password: "),
    "driver": "com.mysql.cj.jdbc.Driver"
}

# Existing summary table, loaded so it can be joined with the search results if needed.
dbtable = "log_summary"
mysql_df = spark.read.jdbc(url=url, table=dbtable, properties=connection_properties)

In [44]:
# Confirm the output schema before it is written to MySQL.
mapped_data.printSchema()

root
 |-- user_id: string (nullable = true)
 |-- most_search_june: string (nullable = true)
 |-- most_search_july: string (nullable = true)
 |-- category_june: string (nullable = true)
 |-- category_july: string (nullable = true)
 |-- trending: string (nullable = false)
 |-- changes: string (nullable = true)



In [47]:
# Persist the per-user June/July category comparison to MySQL, replacing the
# previous contents of the target table.
dbtable2 = "log_search_summary"
(
    mapped_data.write
    .mode("overwrite")
    .jdbc(url, dbtable2, properties=connection_properties)
)
print("Output saved to MySQL.")



Output saved to MySQL.


                                                                                