In [16]:
import findspark
from pyspark.context import SparkContext
from pyspark.sql.session import SparkSession
from pyspark.sql.functions import *
import pandas as pd
from pyspark.sql.window import Window
import pyspark.sql.functions as sf
from pyspark.sql.functions import concat_ws
from datetime import datetime, timedelta
from pyspark.sql import functions as F
import os

import json
from openai import OpenAI

from pyspark.sql.functions import coalesce, lit, col, when, concat

In [3]:
# Build (or reuse) the notebook-wide Spark session:
# 8 GB of driver memory and 8 cores per executor.
spark = (
    SparkSession.builder
    .config("spark.driver.memory", "8g")
    .config("spark.executor.cores", 8)
    .getOrCreate()
)

In [None]:
# NOTE(review): `data` is not defined in any cell visible in this notebook, and this
# cell has no execution count. On a fresh "Restart Kernel -> Run All" it would raise
# NameError -- confirm which DataFrame was meant, or remove the cell.
data.printSchema()

In [24]:
from pyspark.sql.functions import col, month, to_timestamp

def ETL_all_day(path):
    """Load every daily search-log folder under ``path`` into one cached DataFrame.

    Each immediate sub-directory of ``path`` is read through the glob
    ``<path>/<folder>/*.parquet``. Plain files and hidden entries (names
    starting with ``.`` or ``_``, e.g. ``.ipynb_checkpoints``) are skipped,
    because they would turn into globs that match nothing and make the whole
    read fail.

    Args:
        path: Directory whose sub-directories contain the parquet files.

    Returns:
        A cached Spark DataFrame restricted to rows where ``action`` equals
        ``"search"`` and both ``user_id`` and ``keyword`` are non-null, with
        two added columns: ``datetime_ts`` (``datetime`` parsed by
        ``to_timestamp``) and ``Month`` (month number of ``datetime_ts``).

    Raises:
        FileNotFoundError: If ``path`` contains no readable sub-directory.
    """
    # Sorted so the list of input paths is the same on every run and platform.
    folder_list = sorted(
        folder
        for folder in os.listdir(path)
        if not folder.startswith((".", "_"))
        and os.path.isdir(os.path.join(path, folder))
    )
    if not folder_list:
        raise FileNotFoundError(f"No log folders found under {path!r}")

    all_paths = [f"{path}/{folder}/*.parquet" for folder in folder_list]
    df = spark.read.parquet(*all_paths)
    df = df.withColumn("datetime_ts", to_timestamp(col("datetime")))
    df = df.withColumn("Month", month(col("datetime_ts")))
    df = df.filter((col("action") == "search") & (col("user_id").isNotNull()) & (col("keyword").isNotNull()))
    # Cached because the result is reused once per month by the caller.
    return df.cache()

def most_search(df, month):
    """Return each user's single most-searched keyword for one month.

    Args:
        df: DataFrame with at least ``user_id``, ``keyword`` and ``Month``
            columns (as produced by ``ETL_all_day``).
        month: Month number to keep; compared against the ``Month`` column.
            Note that inside this function the name shadows the ``month``
            function imported from ``pyspark.sql.functions``.

    Returns:
        A DataFrame with columns ``user_id``, ``Most_Search`` and ``Month``,
        one row per user.
    """
    monthly = df.filter(col("Month") == month)
    counts = (
        monthly.groupBy("user_id", "keyword", "Month")
        .count()
        .withColumnRenamed("count", "Total_search")
    )

    # row_number() picks an arbitrary row among ties; the secondary sort on
    # keyword makes the winner stable across runs and partitionings.
    window = Window.partitionBy("user_id").orderBy(
        col("Total_search").desc(), col("keyword").asc()
    )
    ranked = counts.withColumn("Rank", row_number().over(window))

    return (
        ranked.filter(col("Rank") == 1)
        .withColumnRenamed("keyword", "Most_Search")
        .select("user_id", "Most_Search", "Month")
    )

def import_to_postgresql(result, table="final_project_bigdata", mode="append"):
    """Write a Spark DataFrame to PostgreSQL over JDBC.

    Connection settings default to the notebook's original local values and
    can be overridden without editing code through the environment variables
    ``POSTGRES_JDBC_URL``, ``POSTGRES_USER`` and ``POSTGRES_PASSWORD``.

    Args:
        result: DataFrame to persist (anything exposing the Spark
            ``DataFrame.write`` interface).
        table: Destination table name.
        mode: Spark save mode. With the default ``"append"``, calling this
            function twice with the same data inserts the rows twice.

    Returns:
        None. Prints a confirmation line after the write completes.
    """
    url = os.environ.get("POSTGRES_JDBC_URL", "jdbc:postgresql://localhost:5432/test_etl")
    user = os.environ.get("POSTGRES_USER", "postgres")
    # NOTE(review): the fallback exists only so the local dev database keeps
    # working; set POSTGRES_PASSWORD for any real deployment.
    password = os.environ.get("POSTGRES_PASSWORD", "1")

    (
        result.write.format("jdbc")
        .option("url", url)
        .option("dbtable", table)
        .option("user", user)
        .option("password", password)
        .option("driver", "org.postgresql.Driver")
        .mode(mode)
        .save()
    )

    print("Data import successfully!")



In [5]:
# Load the keyword list that needs a category label.
label = spark.read.csv(
    "D:/study/output/most_search_values_inner.csv",
    header=True,
    inferSchema=True
)

# Bring the rows to the driver as a pandas DataFrame for the OpenAI calls below.
pdf = label.toPandas()

# Read the API key from the environment so it is never committed with the
# notebook; a missing variable fails fast with a KeyError naming it.
client = OpenAI(api_key=os.environ["OPENAI_API_KEY"])

def _normalize_title(title):
    """Return a whitespace-collapsed, case-folded key for tolerant title matching."""
    return " ".join(str(title).split()).casefold()


def classify_batch(movie_list):
    """Ask the chat model for a category for every title in ``movie_list``.

    The model is instructed to answer with a single JSON object mapping each
    title to a category. The reply is searched for the outermost ``{...}``
    span, parsed, and mapped back onto the input titles.

    Lookup is exact first; if the model echoed a title with different
    surrounding whitespace or letter case, a normalized match is used instead.
    Missing titles, and values that are not non-empty strings, become
    ``"Other"``.

    Args:
        movie_list: List of title strings. An empty list returns ``{}``
            without calling the API.

    Returns:
        dict mapping every input title (unchanged) to a category string.
        On any API or parsing error the error is printed and every title is
        mapped to ``"Other"`` (deliberate best-effort behavior).
    """
    if not movie_list:
        return {}
    prompt = f"""
    B·∫°n l√† h·ªá th·ªëng ph√¢n lo·∫°i phim, show truy·ªÅn h√¨nh v√† n·ªôi dung gi·∫£i tr√≠.  

    D·ªØ li·ªáu ƒë·∫ßu v√†o: danh s√°ch t√™n (c√≥ th·ªÉ sai ch√≠nh t·∫£, vi·∫øt t·∫Øt, thi·∫øu ch·ªØ).  

    Nhi·ªám v·ª• c·ªßa b·∫°n:
    1. Nh·∫≠n di·ªán t√™n g·∫ßn ƒë√∫ng nh·∫•t.  
    2. X√°c ƒë·ªãnh th·ªÉ lo·∫°i ch√≠nh. N·∫øu th·ªÉ lo·∫°i ch∆∞a c√≥ trong danh s√°ch, b·∫°n ƒë∆∞·ª£c ph√©p t·ª± t·∫°o ra m·ªôt nh√£n th·ªÉ lo·∫°i ng·∫Øn g·ªçn, d·ªÖ hi·ªÉu.  

    M·ªôt s·ªë nh√≥m g·ª£i √Ω:  
    - Action  
    - Romance  
    - Comedy  
    - Horror  
    - Animation  
    - Drama  
    - C Drama  
    - K Drama  
    - Sports  
    - Music  
    - Reality Show  
    - TV Channel  
    - News  
    - Other  

    ‚ö†Ô∏è Tr·∫£ l·ªùi DUY NH·∫§T 1 JSON object h·ª£p l·ªá.  
    Kh√¥ng th√™m gi·∫£i th√≠ch, kh√¥ng th√™m ch·ªØ n√†o kh√°c ngo√†i JSON.  

    V√≠ d·ª•:  
    {{"Titanic": "Romance", "VTV6": "TV Channel", "Tuy·ªÉn Vi·ªát Nam": "Sports"}}  

    Danh s√°ch:  
    {movie_list}
    """

    try:
        resp = client.chat.completions.create(
            model="gpt-4o-mini",
            messages=[{"role":"user","content":prompt}],
            temperature=0
        )
        text = resp.choices[0].message.content.strip()

        # Extract the JSON object from the reply (ignore any surrounding text).
        start, end = text.find("{"), text.rfind("}")
        if start == -1 or end == -1:
            return {m: "Other" for m in movie_list}
        parsed = json.loads(text[start:end+1])

        # Secondary index for titles the model echoed with different
        # whitespace or case; the first occurrence of a normalized key wins.
        normalized = {}
        for key, value in parsed.items():
            normalized.setdefault(_normalize_title(key), value)

        # Map results back onto the caller's original titles.
        result = {}
        for title in movie_list:
            category = parsed.get(title)
            if category is None:
                category = normalized.get(_normalize_title(title))
            # Guard against null / nested values ending up in the category column.
            if isinstance(category, str) and category.strip():
                result[title] = category
            else:
                result[title] = "Other"
        return result

    except Exception as e:
        print("Error:", e)
        return {m: "Other" for m in movie_list}


def classify_all(movies, batch_size=50):
    """Classify all titles by sending them to ``classify_batch`` in chunks.

    Duplicate titles are collapsed (first-seen order is kept) before
    batching, so a title that appears on many rows is sent to the API once.
    The returned mapping still contains every distinct input title.

    Args:
        movies: Iterable of hashable titles (strings in this notebook).
        batch_size: Maximum number of titles per ``classify_batch`` call;
            must be at least 1.

    Returns:
        dict mapping each distinct title to its category.

    Raises:
        ValueError: If ``batch_size`` is smaller than 1. Without this check a
            negative size would yield an empty range and silently return ``{}``.
    """
    if batch_size < 1:
        raise ValueError("batch_size must be a positive integer")

    unique_movies = list(dict.fromkeys(movies))

    all_mapping = {}
    for i in range(0, len(unique_movies), batch_size):
        batch = unique_movies[i:i+batch_size]
        print(f"üîπ ƒêang x·ª≠ l√Ω batch {i//batch_size+1} ({len(batch)} items)...")
        all_mapping.update(classify_batch(batch))
    return all_mapping


# Locate the keyword column: the first column whose name contains "Most_Search".
col_candidates = list(filter(lambda name: "Most_Search" in name, pdf.columns))
if len(col_candidates) == 0:
    raise ValueError("Kh√¥ng t√¨m th·∫•y c·ªôt ch·ª©a 'Most_Search'")
col_name = col_candidates[0]

# Collect the non-null keywords as plain strings.
keyword_series = pdf[col_name].dropna()
movies = keyword_series.astype(str).tolist()

# Classify the keywords in batches of 50.
mapping = classify_all(movies, batch_size=50)

# Add the category column; keywords without a mapping fall back to "Other".
pdf["Category"] = pdf[col_name].map(lambda title: mapping.get(title, "Other"))

# Persist the labelled table as CSV.
output_path = "D:/study/output/category.csv"
pdf.to_csv(output_path, encoding="utf-8-sig", index=False)

print(f"ƒê√£ xu·∫•t file ph√¢n lo·∫°i t·∫°i: {output_path}")

üîπ ƒêang x·ª≠ l√Ω batch 1 (50 items)...
üîπ ƒêang x·ª≠ l√Ω batch 2 (50 items)...
üîπ ƒêang x·ª≠ l√Ω batch 3 (50 items)...
üîπ ƒêang x·ª≠ l√Ω batch 4 (50 items)...
üîπ ƒêang x·ª≠ l√Ω batch 5 (50 items)...
üîπ ƒêang x·ª≠ l√Ω batch 6 (50 items)...
üîπ ƒêang x·ª≠ l√Ω batch 7 (50 items)...
üîπ ƒêang x·ª≠ l√Ω batch 8 (50 items)...
üîπ ƒêang x·ª≠ l√Ω batch 9 (50 items)...
üîπ ƒêang x·ª≠ l√Ω batch 10 (50 items)...
üîπ ƒêang x·ª≠ l√Ω batch 11 (50 items)...
üîπ ƒêang x·ª≠ l√Ω batch 12 (50 items)...
üîπ ƒêang x·ª≠ l√Ω batch 13 (50 items)...
üîπ ƒêang x·ª≠ l√Ω batch 14 (50 items)...
üîπ ƒêang x·ª≠ l√Ω batch 15 (50 items)...
üîπ ƒêang x·ª≠ l√Ω batch 16 (50 items)...
üîπ ƒêang x·ª≠ l√Ω batch 17 (50 items)...
üîπ ƒêang x·ª≠ l√Ω batch 18 (50 items)...
üîπ ƒêang x·ª≠ l√Ω batch 19 (50 items)...
üîπ ƒêang x·ª≠ l√Ω batch 20 (50 items)...
üîπ ƒêang x·ª≠ l√Ω batch 21 (50 items)...
üîπ ƒêang x·ª≠ l√Ω batch 22 (50 items)...
üîπ ƒêang x·ª≠ l√Ω batch 23 (50 items)...
üîπ ƒêang x·ª≠ l√Ω 

In [7]:
# Load and pre-filter all daily search logs once, then derive each user's
# top keyword for June (6) and July (7) from the same cached DataFrame.
folder_path = "D:/study/dataset/log_search"
df = ETL_all_day(folder_path)
data6, data7 = (most_search(df, month=month_number) for month_number in (6, 7))

664031

In [14]:
def _attach_category(top_search, mapping, suffix):
    """Label one month's top keywords with a category and suffix the columns.

    Args:
        top_search: DataFrame with ``user_id`` and ``Most_Search`` columns.
        mapping: DataFrame with ``Most_Search`` and ``Category`` columns.
        suffix: Month tag appended to the output column names (e.g. ``"T6"``).

    Returns:
        DataFrame with columns ``user_id``, ``Most_Search_<suffix>`` and
        ``Category_<suffix>``; users whose keyword has no mapping are dropped
        (inner join).
    """
    return (
        top_search.join(mapping, on="Most_Search", how="inner")
        .select("user_id", "Most_Search", "Category")
        .withColumnRenamed("Most_Search", f"Most_Search_{suffix}")
        .withColumnRenamed("Category", f"Category_{suffix}")
    )


# Keep exactly one (keyword, category) row per keyword: a keyword repeated in
# the CSV would otherwise multiply the user rows produced by the joins below.
mapping_df = (
    spark.read.csv("D:/study/output/category.csv", header=True, inferSchema=True)
    .select("Most_Search", "Category")
    .dropDuplicates(["Most_Search"])
)

# NOTE: this cell rebinds data6/data7 with renamed columns, so it must not be
# re-run without first re-running the cell that builds them with most_search().
data6 = _attach_category(data6, mapping_df, "T6")
data7 = _attach_category(data7, mapping_df, "T7")

In [18]:
# Pair each user's June and July results; only users present in both months
# survive the inner join.
condition = col("Category_T6") == col("Category_T7")
df_all = (
    data6.join(data7, on="user_id", how="inner")
    .withColumn(
        "Category_change",
        # Same category in both months -> "NoChange";
        # otherwise describe the move as "<June> - <July>".
        when(condition, lit("NoChange")).otherwise(
            concat(col("Category_T6"), lit(" - "), col("Category_T7"))
        ),
    )
)


In [25]:
# Preview the first 20 rows, with long cell values truncated.
df_all.show(n=20, truncate=True)

+--------+--------------------+-----------+--------------------+-----------+------------------+
| user_id|      Most_Search_T6|Category_T6|      Most_Search_T7|Category_T7|   Category_change|
+--------+--------------------+-----------+--------------------+-----------+------------------+
| 0017684|ph√°p y t·∫ßn minh: ...|    Mystery|       b√°c sƒ© yo han|      Drama|   Mystery - Drama|
| 0019920|           thi·∫øu nhi|  Animation|              bolero|      Music| Animation - Music|
| 0099596|          cu·ªôc chi·∫øn|     Action|     h∆°n c·∫£ t√¨nh b·∫°n|    Romance|  Action - Romance|
| 0153643|y√™u em t·ª´ c√°i nh√¨...|    Romance|10 nƒÉm 3 th√°ng 30...|      Other|   Romance - Other|
|  016508|            Bigfoot |      Other|    taxi, em t√™n g√¨?|      Other|          NoChange|
| 0166772|          kh·ªßng long|  Animation|           si√™u nh√¢n|     Action|Animation - Action|
| 0177631|            tr·ªØ t√¨nh|      Music|            tr·ªØ t√¨nh|      Music|          NoChange|
| 01

In [26]:
# Persist the month-over-month comparison table to PostgreSQL.
import_to_postgresql(result=df_all)

Data import successfully!
