In [None]:
import csv 
import json
import uuid  
from  datetime import datetime 
from pathlib import Path



In [1]:
# Raw CSV exports are read from INPUT_DIR; cleaned copies are written to
# OUTPUT_DIR under the same file name. The two directories must differ:
# file_clean opens its output with mode "w", so if both pointed at the same
# place the raw file would be truncated before it was read.
INPUT_DIR = Path("data/stream")
OUTPUT_DIR = Path("data/clean")

# Columns every cleaned row must carry, in output order.
REQUIRED_COLUMNS = [
    "event_id",
    "sector",
    "entity_id",
    "activity_value",
    "activity_unit",
    "timestamp",
    "metadata",
]

# Per-sector validation rules:
#   allowed_units -- exact unit strings accepted (membership is case-sensitive,
#                    so every accepted spelling is listed explicitly)
#   min_value / max_value -- inclusive range activity_value must fall in
# NOTE(review): the upper bounds look like per-event sanity limits -- confirm.
SECTOR_RULES = {
    "transport": {
        "allowed_units": {"km"},
        "min_value": 0.0,
        "max_value": 5.0,
    },
    "electricity": {
        # Same unit as "industry"; both spellings accepted in both sectors so
        # one sector does not silently reject the other's spelling.
        "allowed_units": {"kWh", "kwh"},
        "min_value": 0.0,
        "max_value": 50.0,
    },
    "industry": {
        "allowed_units": {"kWh", "kwh"},
        "min_value": 0.0,
        "max_value": 500.0,
    },
}

OUTPUT_DIR.mkdir(parents=True, exist_ok=True)


NameError: name 'Path' is not defined

In [None]:
def is_valid_id(value: str) -> bool:
    """Report whether ``value`` can be parsed by ``uuid.UUID``.

    Any failure while parsing (malformed text, None, non-string input) yields
    False instead of propagating.
    """
    try:
        uuid.UUID(value)
    except Exception:
        return False
    return True
def is_valid_timestamp(value: str) -> bool:
    """Report whether ``value`` is an ISO-8601 timestamp ``datetime`` accepts.

    A "Z" suffix is rewritten to "+00:00" first, because
    ``datetime.fromisoformat`` on older Python versions does not accept "Z".
    Any parsing failure yields False.
    """
    try:
        normalized = value.replace("Z", "+00:00")
        datetime.fromisoformat(normalized)
    except Exception:
        return False
    return True
def is_valid_json(value: str) -> bool:
    """Report whether ``value`` decodes with ``json.loads``.

    Any JSON document is accepted (object, array, scalar); any decoding
    failure, including a non-string argument, yields False.
    """
    try:
        json.loads(value)
    except Exception:
        return False
    else:
        return True
    

In [2]:
def _validate_row(row: dict):
    """Check one raw CSV row against the schema and SECTOR_RULES.

    Returns a dict keyed by REQUIRED_COLUMNS (with ``activity_value`` converted
    to float) when every check passes, or None when the row must be dropped.
    Deduplication is left to the caller.
    """
    # csv.DictReader fills the fields of a short row with None (its default
    # ``restval``), so a key being present does not mean a value was supplied.
    if any(row.get(col) is None for col in REQUIRED_COLUMNS):
        return None
    # entity_id is part of the dedup key; a blank id would merge unrelated rows.
    if not row["entity_id"].strip():
        return None
    if not is_valid_id(row["event_id"]):
        return None
    if not is_valid_timestamp(row["timestamp"]):
        return None
    try:
        value = float(row["activity_value"])
    except ValueError:
        return None
    if value < 0:
        return None
    if not is_valid_json(row["metadata"]):
        return None
    rules = SECTOR_RULES.get(row["sector"])
    if not rules:
        return None
    if row["activity_unit"] not in rules["allowed_units"]:
        return None
    # NaN compares False against both bounds, so it is dropped here as well.
    if not (rules["min_value"] <= value <= rules["max_value"]):
        return None
    return {
        "event_id": row["event_id"],
        "sector": row["sector"],
        "entity_id": row["entity_id"],
        "activity_value": value,
        "activity_unit": row["activity_unit"],
        "timestamp": row["timestamp"],
        "metadata": row["metadata"],
    }


def file_clean(input_path: "str | Path") -> None:
    """Write a validated, deduplicated copy of one CSV file into OUTPUT_DIR.

    Rows that miss a required column or fail any check in ``_validate_row`` are
    dropped. Among rows sharing the same (sector, entity_id, timestamp), only
    the first is kept. The output file keeps the input's file name.

    Args:
        input_path: Path (or path string) of the raw CSV file to clean.

    Raises:
        ValueError: if the output file would be the input file itself. Opening
            it for writing would truncate the data before it is read.
    """
    input_path = Path(input_path)
    output_path = OUTPUT_DIR / input_path.name
    if output_path.resolve() == input_path.resolve():
        raise ValueError(
            f"Refusing to clean {input_path} in place: the output path is the "
            "same file. Point OUTPUT_DIR at a different directory."
        )

    seen_keys = set()
    # "utf-8-sig" strips a leading byte-order mark (e.g. from spreadsheet
    # exports) that would otherwise be glued onto the first header name;
    # plain UTF-8 input is read unchanged.
    with open(input_path, newline="", encoding="utf-8-sig") as fin, \
         open(output_path, "w", newline="", encoding="utf-8") as fout:
        reader = csv.DictReader(fin)
        writer = csv.DictWriter(fout, fieldnames=REQUIRED_COLUMNS)
        writer.writeheader()

        for row in reader:
            record = _validate_row(row)
            if record is None:
                continue
            dedup_key = (record["sector"], record["entity_id"], record["timestamp"])
            if dedup_key in seen_keys:
                continue
            seen_keys.add(dedup_key)
            writer.writerow(record)


In [4]:
def main() -> None:
    """Clean every ``*.csv`` file in INPUT_DIR into OUTPUT_DIR, in sorted order.

    Raises:
        ValueError: if INPUT_DIR and OUTPUT_DIR resolve to the same directory.
            Cleaning would then open each raw file for writing and truncate it.
    """
    if INPUT_DIR.resolve() == OUTPUT_DIR.resolve():
        raise ValueError(
            f"INPUT_DIR and OUTPUT_DIR both point to {INPUT_DIR}; "
            "set OUTPUT_DIR to a separate directory before cleaning."
        )
    # Sorted so runs process files in a deterministic order.
    csv_files = sorted(INPUT_DIR.glob("*.csv"))
    if not csv_files:
        print(f"No CSV files found in {INPUT_DIR}")
    for csv_file in csv_files:
        print(f"Cleaning: {csv_file.name}")
        file_clean(csv_file)

    print("Cleaning Complete!")


# Inside a notebook kernel __name__ is "__main__", so running this cell runs
# the pipeline; the comparison string must match exactly (no stray spaces).
if __name__ == "__main__":
    main()