In [None]:
import os
import pandas as pd
from tqdm import tqdm
from typing import Optional, Tuple


# Directory tree that is scanned for .csv files, and the tree that receives
# the filtered copies.
INPUT_ROOT = (
    "/Users/kukkaii/Documents/DA PRoject/Clean_RoadData/Tambon"
    "/Extracted_bz2"
)
OUTPUT_ROOT = (
    "/Users/kukkaii/Documents/DA PRoject/Clean_RoadData/Tambon"
    "/Filtered_engine1_light0"
)

# Number of rows pandas loads per chunk while streaming a CSV.
CHUNK_SIZE = 500_000

# Column names assigned, in order, to files that have no header row.
STD_COLS = [
    "vehicle_id",
    "gps_valid",
    "lat",
    "lon",
    "timestamp",
    "speed_kmh",
    "heading_deg",
    "for_hire_light",
    "engine_acc",
]
# Columns copied to the filtered output.
KEEP_COLS = [
    "lat",
    "lon",
    "timestamp",
    "speed_kmh",
]
# Columns a file with a header row must contain to be processed.
REQUIRED = {
    "lat",
    "lon",
    "timestamp",
    "speed_kmh",
    "engine_acc",
    "for_hire_light",
}

def ensure_dir(path: str) -> None:
    """Create directory *path* (and any missing parents) if it does not exist.

    An empty *path* is ignored. ``os.path.dirname`` returns ``""`` for a bare
    file name, and ``os.makedirs("")`` raises ``FileNotFoundError`` even
    though the current directory is already there.
    """
    if not path:
        return
    os.makedirs(path, exist_ok=True)

def sniff_has_header(path: str) -> bool:
    """Return True when the first row of the CSV at *path* looks like a header.

    The first row counts as a header when, after trimming whitespace and
    lower-casing, its fields include "timestamp", "lat" and "lon". Any
    exception raised while reading the file is reported as False.
    """
    expected = {"timestamp", "lat", "lon"}
    try:
        preview = pd.read_csv(path, nrows=1, engine="python", on_bad_lines="skip")
        seen = {name.strip().lower() for name in preview.columns}
    except Exception:
        return False
    return expected.issubset(seen)

def process_with_header(in_path: str, out_path: str) -> Tuple[int, Optional[str]]:
    """Filter a CSV that has a header row and write the kept rows to *out_path*.

    The input is streamed in chunks of ``CHUNK_SIZE`` rows. A row is kept when
    ``engine_acc == 1`` and ``for_hire_light == 0``. The output holds the
    columns ``lat, lon, timestamp, speed_kmh`` plus a ``Date`` column
    (``YYYY-MM-DD``) derived from ``timestamp``; rows whose timestamp cannot
    be parsed are dropped.

    Args:
        in_path: CSV file to read.
        out_path: CSV file to (re)create. Any existing file is removed first.

    Returns:
        ``(rows_written, None)`` on success, or ``(0, message)`` when a
        required column is missing or an exception occurred. No output file
        is left behind when nothing was written or when the call failed.
    """
    written = 0
    wrote_header = False
    # Start from a clean slate so rows are never appended to a stale file.
    if os.path.exists(out_path):
        os.remove(out_path)
    try:
        for chunk in pd.read_csv(
            in_path,
            chunksize=CHUNK_SIZE,
            engine="python",
            on_bad_lines="skip"
        ):
            # Normalise header names exactly like sniff_has_header does
            # (trim + lower-case); otherwise a padded header such as " lat"
            # passes the sniff but is then reported as missing here.
            cols_lower = {str(c).strip().lower(): c for c in chunk.columns}
            missing = REQUIRED - set(cols_lower)
            if missing:
                return 0, f"missing required columns: {missing}"

            # Map canonical names back to the spelling used in this file.
            lat = cols_lower["lat"]
            lon = cols_lower["lon"]
            ts = cols_lower["timestamp"]
            spd = cols_lower["speed_kmh"]
            eng = cols_lower["engine_acc"]
            light = cols_lower["for_hire_light"]

            # Non-numeric flags become NaN and therefore never match the mask.
            chunk[eng] = pd.to_numeric(chunk[eng], errors="coerce")
            chunk[light] = pd.to_numeric(chunk[light], errors="coerce")

            mask = (chunk[eng] == 1) & (chunk[light] == 0)
            filtered = chunk.loc[mask, [lat, lon, ts, spd]].copy()
            if filtered.empty:
                continue

            filtered.columns = ["lat", "lon", "timestamp", "speed_kmh"]
            # Unparseable timestamps give a missing Date and are dropped.
            filtered["Date"] = pd.to_datetime(
                filtered["timestamp"], errors="coerce"
            ).dt.strftime("%Y-%m-%d")
            filtered = filtered.dropna(subset=["Date"])
            if filtered.empty:
                continue

            if not wrote_header:
                # Create the target directory once, right before the first
                # write; a bare file name has no directory part to create.
                out_dir = os.path.dirname(out_path)
                if out_dir:
                    ensure_dir(out_dir)
            filtered.to_csv(
                out_path,
                mode="a",
                index=False,
                header=not wrote_header,
                encoding="utf-8-sig",
            )
            wrote_header = True
            written += len(filtered)

        if written == 0 and os.path.exists(out_path):
            os.remove(out_path)
        return written, None
    except Exception as e:
        # The call reports zero rows on failure, so do not leave a truncated
        # file on disk that a later stage would treat as complete.
        if os.path.exists(out_path):
            os.remove(out_path)
        return 0, str(e)

def process_no_header(in_path: str, out_path: str) -> Tuple[int, Optional[str]]:
    """Filter a header-less CSV and write the kept rows to *out_path*.

    Columns are named positionally from ``STD_COLS`` and the input is
    streamed in chunks of ``CHUNK_SIZE`` rows. A row is kept when
    ``engine_acc == 1`` and ``for_hire_light == 0``. The output holds the
    ``KEEP_COLS`` columns plus a ``Date`` column (``YYYY-MM-DD``) derived
    from ``timestamp``; rows whose timestamp cannot be parsed are dropped.

    Args:
        in_path: CSV file to read.
        out_path: CSV file to (re)create. Any existing file is removed first.

    Returns:
        ``(rows_written, None)`` on success, or ``(0, message)`` when an
        exception occurred. No output file is left behind when nothing was
        written or when the call failed.
    """
    written = 0
    wrote_header = False
    # Start from a clean slate so rows are never appended to a stale file.
    if os.path.exists(out_path):
        os.remove(out_path)
    try:
        for chunk in pd.read_csv(
            in_path,
            header=None,
            names=STD_COLS,
            chunksize=CHUNK_SIZE,
            engine="python",
            on_bad_lines="skip"
        ):
            # Non-numeric flags become NaN and therefore never match the mask.
            chunk["engine_acc"] = pd.to_numeric(chunk["engine_acc"], errors="coerce")
            chunk["for_hire_light"] = pd.to_numeric(chunk["for_hire_light"], errors="coerce")

            mask = (chunk["engine_acc"] == 1) & (chunk["for_hire_light"] == 0)
            filtered = chunk.loc[mask, KEEP_COLS].copy()
            if filtered.empty:
                continue

            # Unparseable timestamps give a missing Date and are dropped.
            filtered["Date"] = pd.to_datetime(
                filtered["timestamp"], errors="coerce"
            ).dt.strftime("%Y-%m-%d")
            filtered = filtered.dropna(subset=["Date"])
            if filtered.empty:
                continue

            if not wrote_header:
                # Create the target directory once, right before the first
                # write; a bare file name has no directory part to create.
                out_dir = os.path.dirname(out_path)
                if out_dir:
                    ensure_dir(out_dir)
            filtered.to_csv(
                out_path,
                mode="a",
                index=False,
                header=not wrote_header,
                encoding="utf-8-sig",
            )
            wrote_header = True
            written += len(filtered)

        if written == 0 and os.path.exists(out_path):
            os.remove(out_path)
        return written, None
    except Exception as e:
        # The call reports zero rows on failure, so do not leave a truncated
        # file on disk that a later stage would treat as complete.
        if os.path.exists(out_path):
            os.remove(out_path)
        return 0, str(e)

def process_one_csv(in_path: str, out_path: str) -> Tuple[int, Optional[str]]:
    """Filter one CSV, choosing the reader by whether the file has a header.

    Returns whatever the selected processor returns: a
    ``(rows_written, error_message_or_None)`` pair.
    """
    handler = process_with_header if sniff_has_header(in_path) else process_no_header
    return handler(in_path, out_path)

def main():
    """Filter every .csv under INPUT_ROOT into the same relative path under OUTPUT_ROOT.

    Prints a progress bar, the total number of rows written, and up to ten
    of the files that reported an error.
    """
    csv_files = [
        os.path.join(folder, name)
        for folder, _, names in os.walk(INPUT_ROOT)
        for name in names
        if name.lower().endswith(".csv")
    ]

    if not csv_files:
        print("⚠️ ไม่พบไฟล์ .csv ในโฟลเดอร์ต้นทาง")
        return

    print(f"พบไฟล์ .csv ทั้งหมด {len(csv_files)} ไฟล์")
    grand_total = 0
    failures = []

    for src in tqdm(csv_files, desc="Processing CSVs"):
        # Mirror the input layout below OUTPUT_ROOT.
        dst = os.path.join(OUTPUT_ROOT, os.path.relpath(src, INPUT_ROOT))
        ensure_dir(os.path.dirname(dst))

        count, problem = process_one_csv(src, dst)
        if not problem:
            # Nothing passed the filter: do not keep an output file.
            if count == 0 and os.path.exists(dst):
                os.remove(dst)
            grand_total += count
            continue

        print(f"⚠️ Error: {src} -> {problem}")
        failures.append((src, problem))
        # Only a zero-byte output is cleaned up after a failure.
        if os.path.exists(dst) and os.path.getsize(dst) == 0:
            os.remove(dst)

    print("\n✅ finished")
    print(f" number of pass row condition: {grand_total:,}")
    if not failures:
        return

    print(f"⚠️ number of file that contain the error {len(failures)} files (maximum display 10 files):")
    for path, message in failures[:10]:
        print(f"   - {path} | {message}")
    if len(failures) > 10:
        print("   ...")

# Run the filtering pass only when this code is executed directly,
# not when it is imported as a module.
if __name__ == "__main__":
    main()

พบไฟล์ .csv ทั้งหมด 276 ไฟล์


Processing CSVs: 100%|██████████| 276/276 [32:07<00:00,  6.98s/it]


✅ เสร็จสิ้น
➡️ จำนวนแถวที่ผ่านเงื่อนไขทั้งหมด: 151,066,892





In [None]:
import os
import pandas as pd
from tqdm import tqdm

# Directory tree that is scanned for .csv files, and the tree that receives
# the weekend-only copies.
INPUT_ROOT = (
    "/Users/kukkaii/Documents/DA PRoject/Clean_RoadData/Tambon"
    "/Filtered_engine1_light0"
)
OUTPUT_ROOT = (
    "/Users/kukkaii/Documents/DA PRoject/Clean_RoadData/Tambon"
    "/Weekend_3_10_11"
)

def ensure_dir(path):
    """Create directory *path* (and any missing parents) if it does not exist.

    An empty *path* is ignored. ``os.path.dirname`` returns ``""`` for a bare
    file name, and ``os.makedirs("")`` raises ``FileNotFoundError`` even
    though the current directory is already there.
    """
    if not path:
        return
    os.makedirs(path, exist_ok=True)

def process_one_csv(in_path, out_path):
    """Read a CSV, keep Saturday/Sunday rows from months 3, 10 and 11, and save them.

    The ``Date`` column is parsed as a date and the output holds the columns
    ``lat, lon, Date, speed_kmh``. Nothing is written when no row matches.

    Returns:
        ``(rows_written, None)`` on success (``rows_written`` may be 0), or
        ``(0, message)`` when any exception was raised.
    """
    wanted_months = [3, 10, 11]
    weekend_days = [5, 6]  # dayofweek: 0=Mon ... 5=Sat, 6=Sun
    try:
        frame = pd.read_csv(in_path, parse_dates=["Date"])
        if frame.empty:
            return 0, None

        dates = frame["Date"]
        in_month = dates.dt.month.isin(wanted_months)
        on_weekend = dates.dt.dayofweek.isin(weekend_days)
        selected = frame.loc[in_month & on_weekend, ["lat", "lon", "Date", "speed_kmh"]]

        if selected.empty:
            return 0, None

        ensure_dir(os.path.dirname(out_path))
        selected.to_csv(out_path, index=False, encoding="utf-8-sig")
        return len(selected), None
    except Exception as exc:
        return 0, str(exc)

def main():
    """Apply the weekend filter to every .csv under INPUT_ROOT.

    Each result is written to the same relative path under OUTPUT_ROOT.
    Prints a progress bar, the total number of rows kept, and up to ten of
    the files that reported an error.
    """
    csv_files = [
        os.path.join(folder, name)
        for folder, _, names in os.walk(INPUT_ROOT)
        for name in names
        if name.lower().endswith(".csv")
    ]

    if not csv_files:
        print("not found the .csv at that filder")
        return

    print(f"found .csv {len(csv_files)} files")
    grand_total = 0
    failures = []

    for src in tqdm(csv_files, desc="Processing CSVs"):
        # Mirror the input layout below OUTPUT_ROOT.
        dst = os.path.join(OUTPUT_ROOT, os.path.relpath(src, INPUT_ROOT))

        kept, problem = process_one_csv(src, dst)
        grand_total += kept
        if problem:
            failures.append((src, problem))

    print("\n✅ finished")
    print(f"➡️ row that pass the condition: {grand_total:,}")
    if not failures:
        return

    print(f" file have error {len(failures)} files:")
    for path, message in failures[:10]:
        print(f"   - {path} | {message}")
    if len(failures) > 10:
        print("   ...")

# Run the weekend filter only when this code is executed directly,
# not when it is imported as a module.
if __name__ == "__main__":
    main()

พบไฟล์ .csv ทั้งหมด 276 ไฟล์


Processing CSVs: 100%|██████████| 276/276 [01:42<00:00,  2.68it/s]


✅ เสร็จสิ้น
➡️ แถวทั้งหมดที่ผ่านเงื่อนไข: 40,710,538



