# Data Cleaning

In [1]:
DATA_PATH = "../output/raw-csv"
OUTPUT_PATH = "../output/csv"
OUTPUT_STAT_PATH = "../output/csv-stat"

OUTLIER_THRESHOLDS = {
    "pH": (0, 14),
    "EC": (0, 2000),
    "Temp": (0, 50),
    "DO": (0, 20)
}

In [2]:

from pathlib import Path
from pandas import DataFrame, read_csv
from logging import basicConfig, INFO
from dataclasses import dataclass
from IPython.display import display

basicConfig(level=INFO)

from mp import mp_print, mp_exec

INFO:mp:Set start method to fork


In [3]:
DATA_PATH = Path(DATA_PATH).resolve()
OUTPUT_PATH = Path(OUTPUT_PATH).resolve()
OUTPUT_STAT_PATH = Path(OUTPUT_STAT_PATH).resolve()

OUTPUT_PATH.mkdir(parents=True, exist_ok=True)
OUTPUT_STAT_PATH.mkdir(parents=True, exist_ok=True)

In [4]:
def iter_files():
    for file in filter(lambda x: x.is_file() and x.is_file(), DATA_PATH.iterdir()):
        yield file

In [5]:
@dataclass
class Stat:
    station: str
    total: int
    duplicate: int
    missing: int
    outliers: int
    valid: int
    threshold_outliers: int
    iqr_outliers: int
    ph_iqr: float
    ph_lb: float
    ph_up: float
    ec_iqr: float
    ec_lb: float
    ec_up: float
    temp_iqr: float
    temp_lb: float
    temp_up: float
    do_iqr: float
    do_lb: float
    do_up: float

def task(file: Path):
    station = file.name.split(".")[0]
    mp_print(f"Processing : {station}")
    
    stat = Stat(station,0,0,0,0,0,0,0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0)
    df = read_csv(file, parse_dates=["Datetime"])
    stat.total = len(df)

    df = df[["Datetime", "pH", "EC", "Temp", "DO"]]

    size = len(df)
    df.dropna(inplace=True)
    stat.missing = size - len(df)

    size = len(df)
    df.drop_duplicates(inplace=True)
    df.drop_duplicates(subset=["pH", "EC", "Temp", "DO"], inplace=True)
    stat.duplicate = size - len(df)

    size = len(df)
    for column, (low, high) in OUTLIER_THRESHOLDS.items():
        df = df[(df[column] >= low) & (df[column] <= high)]
        stat.threshold_outliers += size - len(df)
        size = len(df)

    Q1 = df[["pH", "EC", "Temp", "DO"]].quantile(0.25)
    Q3 = df[["pH", "EC", "Temp", "DO"]].quantile(0.75)
    IQR = Q3 - Q1
    lower_bound = Q1 - 1.5 * IQR
    upper_bound = Q3 + 1.5 * IQR

    df = df[
        (df["pH"] >= lower_bound["pH"]) & (df["pH"] <= upper_bound["pH"]) &
        (df["EC"] >= lower_bound["EC"]) & (df["EC"] <= upper_bound["EC"]) &
        (df["Temp"] >= lower_bound["Temp"]) & (df["Temp"] <= upper_bound["Temp"]) &
        (df["DO"] >= lower_bound["DO"]) & (df["DO"] <= upper_bound["DO"])
    ]
    
    for column in ["pH", "EC", "Temp", "DO"]:
        setattr(stat, f"{column.lower()}_iqr", IQR[column])
        setattr(stat, f"{column.lower()}_lb", lower_bound[column])
        setattr(stat, f"{column.lower()}_ub", upper_bound[column])

    stat.iqr_outliers = size - len(df)
    stat.outliers = stat.threshold_outliers + stat.iqr_outliers
    stat.valid = len(df)

    assert stat.total == stat.missing + stat.duplicate + stat.outliers + stat.valid

    return (station, df, stat)

In [6]:
data: dict[str, DataFrame] = {x: (y, z) for x, y, z in mp_exec(task, iter_files(), unorder=True)}

stats: list[Stat] = []

for station, values in data.items():
    df, stat = values
    print(f"Station : {station}, data : {df.shape}")
    stats.append(stat)
    df.to_csv(OUTPUT_PATH / f"{station}.csv", index=False)

Processing : ป่าสัก3สถานี แก่งคอย 2558-2563
Processing : ยม สามง่าม 2558-2563
Processing : ยม โพทะเล 2558-2563
Processing : ยม สุโขทัย 2558-2563
Processing : แม่น้ำท่าจีน บางเลน 2558-2563
Processing : แม่น้ำท่าจีน กระทุ่มแบน 2558-2563
Processing : ป่าสัก3สถานี นครหลวง 2558-2563
Processing : ป่าสัก3สถานี เสาไห้ 2558-2563
Processing : วัง เกาะคา 2558-2563
Processing : แม่น้ำท่าจีน สองพี่น้อง 2558-2563
Processing : แม่น้ำท่าจีน นครชัยศรี 2558-2563
Processing : แม่น้ำท่าจีน หันคา 2558-2563
Processing : แม่น้ำท่าจีน สุพรรณบุรี 2558-2563
Processing : แม่น้ำท่าจีน สามชุก 2558-2563
Processing : ปิง เชียงใหม่ 2558-2563
Processing : ปิง กำแพงเพชร 2558-2563
Processing : น่าน อุตรดิตถ์ 2558-2563


  df = read_csv(file, parse_dates=["Datetime"])


Processing : น่าน น่าน 2558-2563
Processing : น่าน พิษณุโลก 2558-2563
Station : ยม สุโขทัย 2558-2563, data : (5736, 5)
Station : แม่น้ำท่าจีน นครชัยศรี 2558-2563, data : (38656, 5)
Station : วัง เกาะคา 2558-2563, data : (39386, 5)
Station : แม่น้ำท่าจีน หันคา 2558-2563, data : (41256, 5)
Station : ยม โพทะเล 2558-2563, data : (30122, 5)
Station : ป่าสัก3สถานี เสาไห้ 2558-2563, data : (52813, 5)
Station : แม่น้ำท่าจีน สองพี่น้อง 2558-2563, data : (52552, 5)
Station : ป่าสัก3สถานี นครหลวง 2558-2563, data : (64976, 5)
Station : แม่น้ำท่าจีน กระทุ่มแบน 2558-2563, data : (28329, 5)
Station : แม่น้ำท่าจีน สุพรรณบุรี 2558-2563, data : (64990, 5)
Station : แม่น้ำท่าจีน สามชุก 2558-2563, data : (6793, 5)
Station : ยม สามง่าม 2558-2563, data : (49549, 5)
Station : ป่าสัก3สถานี แก่งคอย 2558-2563, data : (45771, 5)
Station : ปิง เชียงใหม่ 2558-2563, data : (38819, 5)
Station : ปิง กำแพงเพชร 2558-2563, data : (50768, 5)
Station : แม่น้ำท่าจีน บางเลน 2558-2563, data : (63101, 5)
Station : น่าน อุตรดิ

In [7]:
stats_df = DataFrame([s.__dict__ for s in stats])
stats_df.set_index("station", inplace=True)
stats_df.sort_values("valid", ascending=False, inplace=True)
stats_df.to_csv(OUTPUT_STAT_PATH / "stat.csv", index=True)

display(stats_df[["total", "missing", "duplicate", "outliers", "threshold_outliers", "iqr_outliers", "valid"]])

Unnamed: 0_level_0,total,missing,duplicate,outliers,threshold_outliers,iqr_outliers,valid
station,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1
แม่น้ำท่าจีน สุพรรณบุรี 2558-2563,77994,8473,1408,3123,0,3123,64990
ป่าสัก3สถานี นครหลวง 2558-2563,85184,10056,502,9650,0,9650,64976
แม่น้ำท่าจีน บางเลน 2558-2563,98746,20740,1020,13885,0,13885,63101
น่าน พิษณุโลก 2558-2563,88265,8125,6383,13399,0,13399,60358
ป่าสัก3สถานี เสาไห้ 2558-2563,70819,2343,5285,10378,1991,8387,52813
แม่น้ำท่าจีน สองพี่น้อง 2558-2563,74837,6647,3460,12178,161,12017,52552
น่าน อุตรดิตถ์ 2558-2563,96165,29075,1685,13231,1100,12131,52174
น่าน น่าน 2558-2563,94835,32221,1218,10073,0,10073,51323
ปิง กำแพงเพชร 2558-2563,82788,20834,2119,9067,2,9065,50768
ยม สามง่าม 2558-2563,77859,3830,6076,18404,1150,17254,49549


In [8]:
display(stats_df[[f"{x}_{y}" for y in ["iqr", "lb", "up"] for x in ["ph", "ec", "temp", "do"]]].round(3))

Unnamed: 0_level_0,ph_iqr,ec_iqr,temp_iqr,do_iqr,ph_lb,ec_lb,temp_lb,do_lb,ph_up,ec_up,temp_up,do_up
station,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1,Unnamed: 8_level_1,Unnamed: 9_level_1,Unnamed: 10_level_1,Unnamed: 11_level_1,Unnamed: 12_level_1
แม่น้ำท่าจีน สุพรรณบุรี 2558-2563,2.7,127.2,2.632,2.7,0.95,63.9,25.52,-1.75,0.0,0.0,0.0,0.0
ป่าสัก3สถานี นครหลวง 2558-2563,0.8,107.375,2.6,2.0,5.3,122.062,25.5,-0.2,0.0,0.0,0.0,0.0
แม่น้ำท่าจีน บางเลน 2558-2563,0.7,193.1,6.89,2.3,5.15,-70.05,14.376,-2.15,0.0,0.0,0.0,0.0
น่าน พิษณุโลก 2558-2563,1.3,37.0,2.6,1.7,3.95,99.1,22.7,2.55,0.0,0.0,0.0,0.0
ป่าสัก3สถานี เสาไห้ 2558-2563,1.29,129.7,2.5,2.78,5.865,152.55,25.45,-0.35,0.0,0.0,0.0,0.0
แม่น้ำท่าจีน สองพี่น้อง 2558-2563,1.08,127.9,2.1,1.1,4.18,166.85,26.35,-0.25,0.0,0.0,0.0,0.0
น่าน อุตรดิตถ์ 2558-2563,2.148,89.5,3.6,1.3,1.93,45.15,20.6,3.55,0.0,0.0,0.0,0.0
น่าน น่าน 2558-2563,0.7,59.525,4.2,1.1,6.15,113.312,20.4,4.85,0.0,0.0,0.0,0.0
ปิง กำแพงเพชร 2558-2563,1.1,40.9,2.4,0.9,4.85,161.75,24.6,4.55,0.0,0.0,0.0,0.0
ยม สามง่าม 2558-2563,0.6,67.039,4.4,2.4,5.5,118.241,13.8,-0.6,0.0,0.0,0.0,0.0
