In [2]:
import pandas as pd
import pyarrow.parquet as pq
import yaml
from pathlib import Path
import glob
import logging
import os


# Logging: configure the root logger to emit INFO and above,
# formatted as "<timestamp> - <level> - <message>".
logging.basicConfig(level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s")


# Load YAML config
def load_tonnage_config(yaml_path):
    """Load the ``case_def_vars`` section of a YAML config file.

    Parameters
    ----------
    yaml_path : str or path-like
        Path of the YAML file to read.

    Returns
    -------
    dict
        The mapping stored under the top-level ``case_def_vars`` key, or an
        empty dict when the file is empty, the key is absent, or its value
        is null/empty.

    Raises
    ------
    ValueError
        If the top level of the YAML document is not a mapping.
    """
    with open(yaml_path, "r") as f:
        cfg = yaml.safe_load(f)

    # safe_load yields None for an empty document; treat that as "no settings"
    # rather than failing later on ``None.get``.
    if cfg is None:
        return {}

    if not isinstance(cfg, dict):
        raise ValueError(
            f"Expected a mapping at the top level of {yaml_path}, "
            f"got {type(cfg).__name__}"
        )

    # A bare ``case_def_vars:`` key parses to None; normalise it to a dict so
    # callers can keep using ``.get`` on the result.
    return cfg.get("case_def_vars") or {}


# Build daily parquet directory based on config
def build_daily_input_dir(cfg, project_root=None):
    """Resolve the directory holding the daily parquet files for a config.

    The directory is
    ``<project_root>/data/curated/enriched/daily/<folder_type>/<tonnage>``.

    Parameters
    ----------
    cfg : dict
        Config mapping providing the ``folder_type`` and ``tonnage`` values.
    project_root : str or path-like, optional
        Root of the project tree. Defaults to two levels above the current
        working directory.

    Returns
    -------
    pathlib.Path
        The existing daily input directory.

    Raises
    ------
    ValueError
        If ``folder_type`` or ``tonnage`` is missing or empty in ``cfg``.
    FileNotFoundError
        If the resolved directory does not exist.
    """
    folder_type = cfg.get("folder_type")
    tonnage = cfg.get("tonnage")

    # Fail early with a readable message: a None value would otherwise raise an
    # opaque TypeError from the path join, and "" would silently drop a level.
    missing = [
        key
        for key, value in (("folder_type", folder_type), ("tonnage", tonnage))
        if not value
    ]
    if missing:
        raise ValueError(f"Config is missing required value(s): {', '.join(missing)}")

    root = Path(project_root) if project_root is not None else Path.cwd().parent.parent

    # str() keeps the join working when YAML parsed a value as a number.
    daily_dir = (
        root / "data" / "curated" / "enriched" / "daily" / str(folder_type) / str(tonnage)
    )

    if not daily_dir.exists():
        raise FileNotFoundError(f"Daily parquet directory not found: {daily_dir}")

    return daily_dir


# Build monthly output directory
def build_monthly_output_dir(cfg, project_root=None):
    """Resolve (and create if needed) the monthly output directory for a config.

    The directory is
    ``<project_root>/data/curated/enriched/monthly/<folder_type>/<tonnage>``.

    Parameters
    ----------
    cfg : dict
        Config mapping providing the ``folder_type`` and ``tonnage`` values.
    project_root : str or path-like, optional
        Root of the project tree. Defaults to two levels above the current
        working directory.

    Returns
    -------
    pathlib.Path
        The monthly output directory, guaranteed to exist on return.

    Raises
    ------
    ValueError
        If ``folder_type`` or ``tonnage`` is missing or empty in ``cfg``.
    """
    folder_type = cfg.get("folder_type")
    tonnage = cfg.get("tonnage")

    # Validate before touching the filesystem: an empty string would otherwise
    # collapse a path level and create the directory in the wrong place.
    missing = [
        key
        for key, value in (("folder_type", folder_type), ("tonnage", tonnage))
        if not value
    ]
    if missing:
        raise ValueError(f"Config is missing required value(s): {', '.join(missing)}")

    root = Path(project_root) if project_root is not None else Path.cwd().parent.parent

    # str() keeps the join working when YAML parsed a value as a number.
    monthly_dir = (
        root / "data" / "curated" / "enriched" / "monthly" / str(folder_type) / str(tonnage)
    )
    monthly_dir.mkdir(parents=True, exist_ok=True)

    return monthly_dir


# Process daily → monthly aggregation
def _year_month_key(fname):
    """Return ``"YYYY_MM"`` for a file name ending in ``_YYYY_MM_DD.parquet``, else None."""
    parts = Path(fname).stem.split("_")
    if len(parts) < 3:
        return None
    year, month, day = parts[-3:]
    if not (year.isdigit() and month.isdigit() and day.isdigit()):
        return None
    return f"{year}_{month}"


def combine_daily_into_monthly(daily_dir, monthly_dir, tonnage, folder_type):
    """Concatenate daily parquet files into one parquet file per calendar month.

    Daily files are grouped by the year and month taken from the last three
    underscore-separated fields of their name (``..._YYYY_MM_DD.parquet``).
    Each group is read, concatenated with a fresh index, and written to
    ``<monthly_dir>/<tonnage>_<folder_type>_appended_<YYYY>_<MM>.parquet``.

    Parameters
    ----------
    daily_dir : str or pathlib.Path
        Directory containing the daily ``*.parquet`` files.
    monthly_dir : str or pathlib.Path
        Directory the monthly files are written to.
    tonnage, folder_type : str
        Used only to build the output file names.

    Raises
    ------
    FileNotFoundError
        If ``daily_dir`` has no ``*.parquet`` files, or none of them follow the
        ``_YYYY_MM_DD`` naming scheme.
    """
    daily_dir = Path(daily_dir)
    monthly_dir = Path(monthly_dir)

    # Escape the directory part so characters such as "[" in the path are
    # matched literally instead of being read as glob syntax.
    pattern = os.path.join(glob.escape(str(daily_dir)), "*.parquet")
    parquet_files = sorted(glob.glob(pattern))

    if not parquet_files:
        raise FileNotFoundError(f"No daily parquet files found in {daily_dir}")

    logging.info(f"Found {len(parquet_files)} daily parquet files.")

    # Group files by Year-Month
    monthly_groups = {}

    for fpath in parquet_files:
        ym = _year_month_key(Path(fpath).name)   # e.g., mav28_can_2022_10_31.parquet -> 2022_10
        if ym is None:
            # A stray file must not abort the run or create a bogus month group.
            logging.warning(
                f"Skipping file with unexpected name (expected *_YYYY_MM_DD.parquet): {fpath}"
            )
            continue
        monthly_groups.setdefault(ym, []).append(fpath)

    if not monthly_groups:
        raise FileNotFoundError(
            f"No daily parquet files named *_YYYY_MM_DD.parquet found in {daily_dir}"
        )

    logging.info(f"Monthly groups detected: {list(monthly_groups.keys())}")

    # For each month → append and write
    for ym, files in monthly_groups.items():
        final_df = pd.concat([pd.read_parquet(fp) for fp in files], ignore_index=True)

        out_name = f"{tonnage}_{folder_type}_appended_{ym}.parquet"
        out_path = monthly_dir / out_name

        final_df.to_parquet(out_path, index=False)
        logging.info(f"Saved monthly parquet → {out_path}")


# Process one config file
def process_monthly_aggregation(yaml_path):
    """Run the daily-to-monthly aggregation described by one YAML config.

    Loads the config, resolves the daily input directory and the monthly
    output directory from it, then combines the daily files into monthly
    ones. Start and completion are logged at INFO level.

    Parameters
    ----------
    yaml_path : str or path-like
        Path of the YAML config to process.
    """
    settings = load_tonnage_config(yaml_path)
    folder_type = settings.get("folder_type")
    tonnage = settings.get("tonnage")

    logging.info(f"Starting MONTHLY aggregation for {tonnage}/{folder_type}")

    # Resolve the input first so a missing daily directory fails before the
    # output directory is created.
    source_dir = build_daily_input_dir(settings)
    target_dir = build_monthly_output_dir(settings)
    combine_daily_into_monthly(source_dir, target_dir, tonnage, folder_type)

    logging.info(f"Completed MONTHLY aggregation for {tonnage}/{folder_type}\n")


# Run for all YAML configs
if __name__ == "__main__":
    # Config location is relative to the current working directory.
    configs_glob = "../../configs/enrich_data_monthly_configs/*.yml"
    yaml_files = sorted(glob.glob(configs_glob))

    # An empty match usually means the working directory is not where the
    # relative path expects; say so instead of finishing silently.
    if not yaml_files:
        logging.warning(f"No YAML configs matched {configs_glob} (cwd: {os.getcwd()})")

    for y in yaml_files:
        try:
            process_monthly_aggregation(y)
        except Exception as e:
            # One failing config must not stop the rest; logging.exception
            # keeps the same message and adds the traceback for diagnosis.
            logging.exception(f"Failed to process config {y}: {e}")

2025-11-15 23:36:18,394 - INFO - Starting MONTHLY aggregation for const/can
2025-11-15 23:36:18,443 - INFO - Found 1034 daily parquet files.
2025-11-15 23:36:18,448 - INFO - Monthly groups detected: ['2023_01', '2023_02', '2023_03', '2023_04', '2023_05', '2023_06', '2023_07', '2023_08', '2023_09', '2023_10', '2023_11', '2023_12', '2024_01', '2024_02', '2024_03', '2024_04', '2024_05', '2024_06', '2024_07', '2024_08', '2024_09', '2024_10', '2024_11', '2024_12', '2025_01', '2025_02', '2025_03', '2025_04', '2025_05', '2025_06', '2025_07', '2025_08', '2025_09', '2025_10']
2025-11-15 23:36:19,335 - INFO - Saved monthly parquet → d:\Data Science Projects\Machine Learning\fuel-cost-optimization\data\curated\enriched\monthly\can\const\const_can_appended_2023_01.parquet
2025-11-15 23:36:19,967 - INFO - Saved monthly parquet → d:\Data Science Projects\Machine Learning\fuel-cost-optimization\data\curated\enriched\monthly\can\const\const_can_appended_2023_02.parquet
2025-11-15 23:36:20,666 - INFO -

In [3]:
import pandas as pd

# Load one monthly parquet file for inspection (path is relative to the
# current working directory).
p_df = pd.read_parquet("../../data/curated/enriched/monthly/can/mav28/mav28_can_appended_2023_01.parquet")

In [4]:
# Preview the first rows of the loaded monthly frame.
p_df.head()

Unnamed: 0,timestamp,vehicleId,reg_no,vin_no,model,vehicle_type,tonnage,tripId,EngineRPM,Torque,...,DPF_LampCommand,OBDLampCommand,EngineAmberLampCommand,AirFilterClogging,WaterInFuelIndicator,EnginePerformanceBiasLevel,EngineBrakeActiveLampCommand,EngineCoolantLevelLowLampCommand,EngineCoolantTempHighLampCommand,EngineRedStopLampCommand
0,2023-01-01 09:34:00,V00151,MH96PU4691,TUB74ST7FW1ENW6TN,Tata Signa 2825.TK,Truck,MAV 28,V00151_TR000001,2012,192,...,Off,Normal,Normal,Normal,NotDetected,Normal,OFF,Normal,Normal,Normal
1,2023-01-01 09:35:00,V00151,MH96PU4691,TUB74ST7FW1ENW6TN,Tata Signa 2825.TK,Truck,MAV 28,V00151_TR000001,1353,104,...,Off,Normal,Normal,Normal,NotDetected,Normal,OFF,Normal,Normal,Normal
2,2023-01-01 09:36:00,V00151,MH96PU4691,TUB74ST7FW1ENW6TN,Tata Signa 2825.TK,Truck,MAV 28,V00151_TR000001,2047,185,...,Off,Normal,Normal,Normal,NotDetected,Economy Mode,OFF,Normal,Normal,Normal
3,2023-01-01 09:37:00,V00151,MH96PU4691,TUB74ST7FW1ENW6TN,Tata Signa 2825.TK,Truck,MAV 28,V00151_TR000001,2096,188,...,Off,Normal,Normal,Normal,NotDetected,Economy Mode,OFF,Normal,Normal,Normal
4,2023-01-01 09:38:00,V00151,MH96PU4691,TUB74ST7FW1ENW6TN,Tata Signa 2825.TK,Truck,MAV 28,V00151_TR000001,1721,122,...,Off,Normal,Normal,Normal,NotDetected,Normal,OFF,Normal,Normal,Normal


In [5]:
# Preview the last rows of the loaded monthly frame.
p_df.tail()

Unnamed: 0,timestamp,vehicleId,reg_no,vin_no,model,vehicle_type,tonnage,tripId,EngineRPM,Torque,...,DPF_LampCommand,OBDLampCommand,EngineAmberLampCommand,AirFilterClogging,WaterInFuelIndicator,EnginePerformanceBiasLevel,EngineBrakeActiveLampCommand,EngineCoolantLevelLowLampCommand,EngineCoolantTempHighLampCommand,EngineRedStopLampCommand
119645,2023-01-31 08:55:00,V00158,MH57NN4263,CD92UXGXHZ4G451R3,Tata Prima 2830.K,Truck,MAV 28,V00158_TR000095,1521,126,...,Off,Normal,Normal,Normal,NotDetected,Economy Mode,OFF,Normal,Normal,Normal
119646,2023-01-31 08:56:00,V00158,MH57NN4263,CD92UXGXHZ4G451R3,Tata Prima 2830.K,Truck,MAV 28,V00158_TR000095,1918,186,...,Off,Normal,Normal,Normal,NotDetected,Power Mode,OFF,Normal,Normal,Normal
119647,2023-01-31 08:57:00,V00158,MH57NN4263,CD92UXGXHZ4G451R3,Tata Prima 2830.K,Truck,MAV 28,V00158_TR000095,1833,129,...,Off,Normal,Normal,Normal,NotDetected,Normal,ON,Normal,Normal,Normal
119648,2023-01-31 08:58:00,V00158,MH57NN4263,CD92UXGXHZ4G451R3,Tata Prima 2830.K,Truck,MAV 28,V00158_TR000095,1395,145,...,Off,Normal,Normal,Normal,NotDetected,Economy Mode,OFF,Normal,Normal,Normal
119649,2023-01-31 08:59:00,V00158,MH57NN4263,CD92UXGXHZ4G451R3,Tata Prima 2830.K,Truck,MAV 28,V00158_TR000095,1767,125,...,Off,Normal,Normal,Normal,NotDetected,Economy Mode,OFF,Normal,Normal,Normal


In [6]:
# (row count, column count) of the loaded monthly frame.
p_df.shape

(119650, 30)