In [None]:
import pandas as pd
from pathlib import Path
from IPython.display import display
from sklearn.preprocessing import OneHotEncoder

from pyspark.sql import SparkSession
from pyspark.sql import functions as F


In [None]:
def inspect_dataframe(df, name="df", n_head=5):
    print(f"DataFrame: {name}")
    print(f"Shape: {df.shape[0]} rows x {df.shape[1]} columns\n")
    print("Columns:")
    print(df.dtypes)
    print("\nHead:")
    display(df.head(n_head))


In [None]:
# สร้าง SparkSession
spark = SparkSession.builder.appName("CleanScrapingData").getOrCreate()

# ระบุ path ไฟล์ดิบ
raw_path = Path("../data/scraping_data.csv")

if not raw_path.exists():
    raise FileNotFoundError(f"{raw_path} not found in {Path.cwd()}")

# โหลดไฟล์ CSV ด้วย Spark
spark_df_raw = (
    spark.read
    .option("header", True)       # แถวแรกเป็น header
    .option("inferSchema", True)  # ให้เดา type อัตโนมัติ
    .csv(str(raw_path))
)

# สร้าง temp view เพื่อใช้ Spark SQL
spark_df_raw.createOrReplaceTempView("scraping_raw")

print("จำนวน row ดิบ (Spark):", spark_df_raw.count())
spark_df_raw.printSchema()


In [None]:
# ระบุคอลัมน์ที่ต้องไม่เป็น null หรือ "unknown"
cols_to_check = [
    "date",
    "day_of_week",
    "start",
    "end",
    "name",
    "location",
    "district",
    "province",
    "temp",
    "rain",
    "wind_gust",
]

# สร้างเงื่อนไข WHERE แบบ dynamic ด้วย Python
conditions = " AND ".join(
    [
        f"{c} IS NOT NULL AND lower(trim({c})) <> 'unknown'"
        for c in cols_to_check
    ]
)

query_step1 = f"""
    SELECT *
    FROM scraping_raw
    WHERE {conditions}
"""

spark_df_step1 = spark.sql(query_step1)
spark_df_step1.createOrReplaceTempView("scraping_clean_step1")

print("จำนวน row หลังลบ null / 'unknown':", spark_df_step1.count())
spark_df_step1.printSchema()


In [None]:
# แปลง start/end เป็น timestamp ก่อน
spark_df_with_times = spark.sql("""
    SELECT
        *,
        to_timestamp(start, 'HH:mm') AS _start_dt,
        to_timestamp(end,   'HH:mm') AS _end_dt
    FROM scraping_clean_step1
""")

spark_df_with_times.createOrReplaceTempView("scraping_with_times")

# filter แถวที่เวลาไม่สมบูรณ์ + คำนวณ time_range และ duration
spark_df_step2 = spark.sql("""
    SELECT
        *,
        date_format(_start_dt, 'HH:mm') || ' - ' || date_format(_end_dt, 'HH:mm') AS time_range,
        (unix_timestamp(_end_dt) - unix_timestamp(_start_dt)) / 60.0 AS duration_minutes
    FROM scraping_with_times
    WHERE _start_dt IS NOT NULL
      AND _end_dt IS NOT NULL
      AND _end_dt >= _start_dt
""")

spark_df_step2.createOrReplaceTempView("scraping_clean_step2")

print("จำนวน row หลังจัดการเวลา:", spark_df_step2.count())


In [None]:
spark_df_final = spark.sql("""
    SELECT
        date,
        day_of_week,
        start,
        end,
        name,
        district,
        province,
        temp,
        rain,
        wind_gust,
        time_range,
        duration_minutes
    FROM scraping_clean_step2
    WHERE lower(province) = 'bangkok'
""")

print("จำนวน row หลัง filter province = 'bangkok':", spark_df_final.count())
spark_df_final.printSchema()


In [None]:
# แปลง Spark DataFrame → pandas DataFrame
df = spark_df_final.toPandas()

# ดูข้อมูลหลัง clean แล้ว
inspect_dataframe(df, name="cleaned_df", n_head=10)

# บันทึกไฟล์ผลลัพธ์
clean_path = Path("../data/clean_scraping_data.csv")
df.to_csv(clean_path, index=False, encoding="utf-8-sig")

print(f"บันทึกไฟล์เรียบร้อยที่: {clean_path.resolve()}")
