In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, split, to_timestamp, unix_timestamp, lit, lower, coalesce, expr, percentile_approx

# Start (or reuse) the Spark session for this notebook.
spark = (
    SparkSession.builder
    .appName("FootpathPhase1")
    .getOrCreate()
)

# Load the raw Bangkok Traffy export: first row is the header, column
# types are inferred from the data.
df = (
    spark.read
    .option("header", True)
    .option("inferSchema", True)
    .csv("../data/raw/bangkok_traffy.csv")
)

from functools import reduce
from operator import or_

# 1) Select footpath-related reports.
# A row is kept when its `type` or `comment` text contains any entry of
# `keywords` as a literal substring (nulls are treated as "" and never match).
# The filter is generated from `keywords` so the list -- and the regex
# `pattern` derived from it for the pandas comparison -- cannot drift from
# what is actually filtered.
# NOTE(review): the previous hand-written filter used the bare word "เดิน"
# ("walk") instead of "ทางเดิน" ("walkway"). That also matched unrelated text
# (e.g. "เดินสายไฟ", running electric wires) as well as "ทางคนเดิน"
# (pedestrian path), which "ทางเดิน" does not match. If the broader match was
# intended, add the extra term to `keywords` rather than hard-coding it.
keywords = ["ทางเท้า", "ฟุตบาท", "ทางเดิน"]
pattern = "|".join(keywords)

_text_cols = [
    lower(coalesce(col(name).cast("string"), lit("")))
    for name in ("type", "comment")
]
# Keywords are lower-cased too, so a Latin keyword with capitals still matches
# the lower-cased text (a no-op for Thai script). lit(False) seeds the OR so
# an empty keyword list keeps no rows instead of crashing reduce().
df_fp = df.filter(
    reduce(
        or_,
        (text.contains(kw.lower()) for text in _text_cols for kw in keywords),
        lit(False),
    )
)

# 2) Split the "coords" string on "," into numeric columns: the first field
#    becomes `lon`, the second `lat`. This step only adds columns; no rows
#    are filtered out here.
coord_parts = split(col("coords").cast("string"), ",")
df_fp = (
    df_fp
    .withColumn("lon", coord_parts[0].cast("double"))
    .withColumn("lat", coord_parts[1].cast("double"))
)

# 3) Parse both time columns in place into Spark timestamps (no explicit
#    format pattern is supplied, so Spark's default parsing applies).
for ts_col in ("timestamp", "last_activity"):
    df_fp = df_fp.withColumn(ts_col, to_timestamp(ts_col))

# 4) Elapsed time from `timestamp` to `last_activity`, in hours.
#    unix_timestamp works in whole seconds, so sub-second parts are dropped.
seconds_elapsed = unix_timestamp("last_activity") - unix_timestamp("timestamp")
df_fp = df_fp.withColumn("duration_hours", seconds_elapsed / 3600)

# 5) + 6) Keep only rows with a strictly positive duration whose state is
#    "เสร็จสิ้น" ("completed"). Rows where either test is null are dropped.
df_fp = df_fp.filter(
    (col("duration_hours") > 0) & (col("state") == "เสร็จสิ้น")
)

from pyspark.sql.functions import when

# 7) Clip (winsorize) duration_hours to its approximate 1st..99th percentile
#    range: values below the 1% bound are raised to it, values above the 99%
#    bound are lowered to it, everything else is unchanged.
bounds = df_fp.agg(
    percentile_approx("duration_hours", 0.01).alias("lower"),
    percentile_approx("duration_hours", 0.99).alias("upper"),
).collect()[0]

# percentile_approx yields null when no rows survived the earlier filters;
# fail with an explicit message instead of float(None)'s opaque TypeError.
if bounds["lower"] is None or bounds["upper"] is None:
    raise ValueError(
        "No completed footpath reports with a positive duration remain; "
        "cannot compute 1%/99% clipping bounds."
    )

lower_bound = float(bounds["lower"])
upper_bound = float(bounds["upper"])

# Built with Column expressions rather than an f-string SQL CASE, so the
# bounds are passed as double literals instead of being formatted into text.
_duration = col("duration_hours")
df_fp = df_fp.withColumn(
    "duration_hours",
    when(_duration < lit(lower_bound), lit(lower_bound))
    .when(_duration > lit(upper_bound), lit(upper_bound))
    .otherwise(_duration),
)
#----------------------------------------------------------------------

# Collapse to a single partition so Spark writes the CSV as one part file
# (the downstream count/show also run on this coalesced frame).
df_fp = df_fp.coalesce(1)

# Write to a temporary folder with a header row, replacing any previous output.
(
    df_fp.write
    .mode("overwrite")
    .option("header", True)
    .csv("../data/processed/footpath_phase1_temp")
)

import os
import shutil

src = "../data/processed/footpath_phase1_temp"
dst = "../data/processed/footpath_phase1.csv"

# Find the CSV part file Spark wrote into the temp folder. The name test skips
# anything else in there (e.g. a _SUCCESS marker or hidden .crc checksums).
part_files = sorted(
    f for f in os.listdir(src) if f.startswith("part-") and f.endswith(".csv")
)
# After coalesce(1) there must be exactly one. With zero, `dst` would silently
# keep a stale file from an earlier run; with several, each move would
# overwrite the previous one. The temp folder is kept for inspection on error.
if len(part_files) != 1:
    raise RuntimeError(
        f"Expected exactly one part-*.csv in {src!r}, found {part_files}"
    )
shutil.move(os.path.join(src, part_files[0]), dst)

# Remove the temporary Spark output folder.
shutil.rmtree(src)

# Nothing above is cached, so these actions recompute the pipeline from the
# raw CSV (not from the temp folder that was just deleted).
print("Rows:", df_fp.count())
df_fp.show(5)
