In [2]:
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# refine_iot_occupancy.py — RAW → REFINED (15 min, UTC, QA flags) + upload MinIO

from pathlib import Path
import argparse, os, sys, json
import numpy as np
import pandas as pd

# ===== MinIO/S3 =====
def get_s3_client(endpoint, access, secret, secure):
    """Build a boto3 S3 client for an S3-compatible endpoint such as MinIO.

    Args:
        endpoint: Full endpoint URL, passed through as ``endpoint_url``.
        access: Access key id.
        secret: Secret access key.
        secure: Truthy enables both SSL and certificate verification; falsy
            disables both. NOTE(review): with an ``https://`` endpoint and
            a falsy *secure*, certificate checks are turned off — confirm
            that is intended.

    Returns:
        A boto3 ``s3`` client using SigV4 signing and region ``us-east-1``.
    """
    import boto3
    from botocore.config import Config

    tls_enabled = bool(secure)
    client_kwargs = dict(
        endpoint_url=endpoint,
        aws_access_key_id=access,
        aws_secret_access_key=secret,
        use_ssl=tls_enabled,
        verify=tls_enabled,
        # A fixed region is supplied because SigV4 signatures include one.
        region_name="us-east-1",
        config=Config(signature_version="s3v4"),
    )
    return boto3.client("s3", **client_kwargs)

def ensure_bucket(s3, bucket):
    """Make sure *bucket* exists on the S3 endpoint, creating it if missing.

    Only a "not found" answer from ``head_bucket`` triggers creation. Any
    other client error (e.g. 403 Forbidden, bad credentials) is re-raised,
    so the real cause surfaces instead of a confusing ``create_bucket``
    failure.

    Args:
        s3: boto3 S3 client.
        bucket: Bucket name.

    Raises:
        botocore.exceptions.ClientError: if the bucket cannot be probed, or
            cannot be created, for a reason other than it being absent.
    """
    from botocore.exceptions import ClientError

    try:
        s3.head_bucket(Bucket=bucket)
    except ClientError as err:
        code = str(err.response.get("Error", {}).get("Code", ""))
        # HEAD responses carry no body, so a missing bucket reports "404".
        if code not in ("404", "NoSuchBucket", "NotFound"):
            raise
    else:
        return

    try:
        s3.create_bucket(Bucket=bucket)
    except ClientError as err:
        # Another process may have created it between the probe and here.
        if err.response.get("Error", {}).get("Code") != "BucketAlreadyOwnedByYou":
            raise

def s3_upload_file(s3, bucket, local_path:Path, key:str):
    """Upload the local file *local_path* to ``s3://<bucket>/<key>``.

    Args:
        s3: S3 client exposing ``upload_file(filename, bucket, key)``.
        bucket: Destination bucket.
        local_path: File to upload; converted to ``str`` for the client.
        key: Destination object key.
    """
    source = str(local_path)
    s3.upload_file(source, bucket, key)

# ===== Helpers =====
def _read_csv(p:Path):
    if not p.exists(): raise FileNotFoundError(p)
    return pd.read_csv(p)

def _std_index(df, tcol="ts", freq="15min"):
    df = df.copy()
    df[tcol] = pd.to_datetime(df[tcol], utc=True)
    df = df.drop_duplicates(subset=[tcol]).set_index(tcol).sort_index()
    rng = pd.date_range(df.index.min(), df.index.max(), freq=freq, tz="UTC")
    return df.reindex(rng)

def _clip_series(s, lo, hi):
    return s.clip(lower=lo, upper=hi)

def _qa_fill(s, limit=4):
    x = s.astype("float64")
    x = x.interpolate("time", limit=limit)
    x = x.ffill(limit=limit).bfill(limit=limit)
    return x

def refine_iot(raw_csv:Path):
    """Refine the raw IoT sensor CSV onto the 15-minute UTC grid.

    Each known sensor column present in the file is clipped to its accepted
    range and gap-filled with ``_qa_fill``; unknown columns are dropped and
    absent ones skipped. ``pir_bin`` is finally rounded back to a nullable
    0/1 integer.

    Args:
        raw_csv: Path to the raw sensor CSV; it must contain a ``ts`` column.

    Returns:
        (refined, qa): ``refined`` has a ``ts`` column, ``zone_id`` and the
        refined sensor columns. ``qa`` maps each processed column to its
        count of raw nulls, out-of-range raw values, and nulls remaining
        after filling.
    """
    grid = _std_index(_read_csv(raw_csv), "ts", freq="15min")

    # Accepted (low, high) range per sensor column; values outside are clipped.
    bounds = {
        "temp_int_c": (-5, 50),
        "rh_int_pct": (0, 100),
        "co2_ppm": (350, 5000),
        "pir_bin": (0, 1),
        "power_total_kw": (0, 50),
    }

    refined = pd.DataFrame(index=grid.index)
    # Fallback zone identifier when the raw file has no zone_id column.
    refined["zone_id"] = (
        grid["zone_id"].ffill().bfill()
        if "zone_id" in grid.columns
        else "brick:Room_101"
    )

    qa = {}
    present = [name for name in bounds if name in grid.columns]
    for name in present:
        lo, hi = bounds[name]
        raw = grid[name]
        refined[name] = _qa_fill(_clip_series(raw, lo, hi))
        out_of_range = raw.lt(lo) | raw.gt(hi)
        qa[name] = {
            "n_raw_null": int(raw.isna().sum()),
            "n_clipped": int(out_of_range.sum()),
            "n_final_null": int(refined[name].isna().sum()),
        }

    # The motion sensor is binary: snap filled/interpolated values back to 0/1.
    if "pir_bin" in qa:
        refined["pir_bin"] = refined["pir_bin"].round().clip(0, 1).astype("Int64")

    return refined.rename_axis("ts").reset_index(), qa

def refine_occ(raw_csv:Path):
    """Refine the raw occupancy CSV onto the 15-minute UTC grid.

    ``presence`` is gap-filled with ``_qa_fill`` and rounded/clipped to a
    nullable 0/1 integer; it is all-NA when the column is absent. ``level``
    is forward-filled, and any remaining gaps are set to ``"low"`` (the whole
    column is ``"low"`` when absent).

    Args:
        raw_csv: Path to the raw occupancy CSV; it must contain a ``ts`` column.

    Returns:
        (refined, qa): ``refined`` has columns ``ts``, ``presence`` and
        ``level``; ``qa`` counts the nulls left in ``presence`` and ``level``.
    """
    grid = _std_index(_read_csv(raw_csv), "ts", freq="15min")
    slots = grid.index
    available = grid.columns

    if "presence" in available:
        presence_raw = grid["presence"]
    else:
        presence_raw = pd.Series(index=slots, dtype="float64")
    presence = _qa_fill(presence_raw).round().clip(0, 1).astype("Int64")

    level = (
        grid["level"].ffill().fillna("low")
        if "level" in available
        else pd.Series(["low"] * len(slots), index=slots, dtype=object)
    )

    refined = pd.DataFrame(index=slots).assign(presence=presence, level=level)
    refined = refined.rename_axis("ts").reset_index()

    qa = {
        "presence_null_final": int(refined["presence"].isna().sum()),
        "level_null_final": int(refined["level"].isna().sum()),
    }
    return refined, qa

def fuse_iot_occ(iot_df, occ_df):
    """Left-join the occupancy frame onto the IoT frame by their ``ts`` column.

    Every IoT row is kept; occupancy columns are NaN where no occupancy row
    has the same timestamp.
    """
    return iot_df.merge(occ_df, how="left", on="ts")

# ===== Main =====
def main():
    """Command-line entry point: refine one month of IoT and occupancy data.

    Reads ``<raw_base>/sensors/<month>/zone_101_sensors.csv`` and
    ``<raw_base>/occupancy/<month>/occupancy.csv``. Writes the refined,
    fused and manifest files under
    ``<ref_base>/{iot,occupancy,fused,meta}/<month>/``. Unless
    ``--no-upload`` is given, it uploads them to
    ``s3://<bucket>/<prefix>/...`` with the same relative layout.

    Exits with status 3 if the MinIO upload fails.
    """
    ap = argparse.ArgumentParser()
    ap.add_argument("--month", type=str, default="2025-03")
    ap.add_argument("--raw_base", type=str, default="~/DTE/jne_project/raw")
    ap.add_argument("--ref_base", type=str, default="~/DTE/jne_project/refined")
    # MinIO
    # NOTE(review): when MINIO_* env vars are unset this falls back to the stock
    # "minioadmin" credentials and a hard-coded LAN endpoint.
    ap.add_argument("--endpoint", type=str, default=os.environ.get("MINIO_ENDPOINT", "http://192.168.0.173:9000"))
    ap.add_argument("--access",   type=str, default=os.environ.get("MINIO_ROOT_USER", "minioadmin"))
    ap.add_argument("--secret",   type=str, default=os.environ.get("MINIO_ROOT_PASSWORD", "minioadmin"))
    ap.add_argument("--bucket",   type=str, default="refined")
    ap.add_argument("--prefix",   type=str, default="jne_project/refined")
    ap.add_argument("--secure",   action="store_true")
    ap.add_argument("--no-upload", action="store_true")
    # parse_known_args ignores unrecognised extra arguments (presumably those a
    # Jupyter kernel injects when this runs as a notebook cell).
    args, _ = ap.parse_known_args()

    raw_base = Path(args.raw_base).expanduser().resolve()
    ref_base = Path(args.ref_base).expanduser().resolve()
    month = args.month

    raw_iot = raw_base/"sensors"/month/"zone_101_sensors.csv"
    raw_occ = raw_base/"occupancy"/month/"occupancy.csv"

    out_iot_dir = ref_base/"iot"/month
    out_occ_dir = ref_base/"occupancy"/month
    out_fus_dir = ref_base/"fused"/month
    out_meta    = ref_base/"meta"/month
    for d in (out_iot_dir, out_occ_dir, out_fus_dir, out_meta):
        d.mkdir(parents=True, exist_ok=True)

    iot_df, iot_qa = refine_iot(raw_iot)
    occ_df, occ_qa = refine_occ(raw_occ)
    fused = fuse_iot_occ(iot_df, occ_df)

    p_iot = out_iot_dir/"sensors_refined.csv"
    p_occ = out_occ_dir/"occupancy_refined.csv"
    p_fus = out_fus_dir/"iot_occupancy_15min.csv"
    iot_df.to_csv(p_iot, index=False)
    occ_df.to_csv(p_occ, index=False)
    fused.to_csv(p_fus, index=False)

    manifest = {
        "version": "1.0",
        "month": month,
        "inputs": {"iot_raw": str(raw_iot), "occupancy_raw": str(raw_occ)},
        "outputs": {
            "iot_refined_csv": str(p_iot),
            "occupancy_refined_csv": str(p_occ),
            "fused_csv": str(p_fus),
        },
        "qa": {"iot": iot_qa, "occupancy": occ_qa},
        "transform": {
            "tz": "UTC", "freq": "15min", "interpolation_limit": 4,
            "clips": {"temp_int_c":[-5,50],"rh_int_pct":[0,100],"co2_ppm":[350,5000],"pir_bin":[0,1],"power_total_kw":[0,50]}
        }
    }
    p_man = out_meta/"refined_manifest_iot_occupancy.json"
    p_man.write_text(json.dumps(manifest, indent=2), encoding="utf-8")

    # Normalise the prefix once and use it for both the object keys and the
    # summary line. An empty prefix must not produce keys with a leading "/",
    # and a prefix with surrounding slashes must not be printed differently
    # from where the objects actually went.
    root = args.prefix.strip("/")

    if not args.no_upload:
        uploads = [
            (p_iot, f"iot/{month}/sensors_refined.csv"),
            (p_occ, f"occupancy/{month}/occupancy_refined.csv"),
            (p_fus, f"fused/{month}/iot_occupancy_15min.csv"),
            (p_man, f"meta/{month}/refined_manifest_iot_occupancy.json"),
        ]
        try:
            s3 = get_s3_client(args.endpoint, args.access, args.secret, args.secure)
            ensure_bucket(s3, args.bucket)
            for lp, rel in uploads:
                key = f"{root}/{rel}" if root else rel
                s3_upload_file(s3, args.bucket, lp, key)
        except Exception as e:
            # Top-level boundary: report any upload failure and exit non-zero.
            print("ERREUR: upload MinIO:", e, file=sys.stderr)
            sys.exit(3)

    print("OK — refined")
    print("local:", p_iot, "|", p_occ, "|", p_fus, "|", p_man)
    if not args.no_upload:
        base = f"s3://{args.bucket}/{root}/" if root else f"s3://{args.bucket}/"
        print("minio:", f"{base}{{iot,occupancy,fused,meta}}/{month}/...")

# Run the pipeline when this module is executed directly rather than imported.
if __name__ == "__main__":
    main()


OK — refined
local: /home/amina/DTE/jne_project/refined/iot/2025-03/sensors_refined.csv | /home/amina/DTE/jne_project/refined/occupancy/2025-03/occupancy_refined.csv | /home/amina/DTE/jne_project/refined/fused/2025-03/iot_occupancy_15min.csv | /home/amina/DTE/jne_project/refined/meta/2025-03/refined_manifest_iot_occupancy.json
minio: s3://refined/jne_project/refined/{iot,occupancy,fused,meta}/2025-03/...
