In [22]:
import os

import geopandas as gpd
from tqdm.autonotebook import tqdm

In [34]:
AGGREGATE_RESULT_FILE_PATH = "outputs/manitoba_rcm_ard/collection.parquet"
AGGREGATE_SEARCH_DIR_PATH = "./outputs/manitoba_rcm_ard/IGEO7"

# distinct dataframes with similar variables to aggregate together
AGGREGATE_ZONE_VARIABLES = ["rr", "rl"]
# columns used as ID to merge corresponding zones
# (note: if datetime-aware, should include a temporal component as well)
AGGREGATE_ZONE_ID_COLUMNS = ["dggrid_ISEA7H", "day"]
# any column renaming (replace) to perform prior to aggregation and merging
# (note: below extended columns are affected by this change applied before)
AGGREGATE_RENAME_COLUMNS = {
    "cell_": "",
}
# columns to extend with the relevant above variable prefix (others merged as is / duplicates)
AGGREGATE_EXTEND_COLUMNS = [
    "minimum",
    "maximum",
    "mean",
    "median",
    "stddev",
]

agg_zone_data = []
agg_walk_progress = tqdm(
    os.walk(AGGREGATE_SEARCH_DIR_PATH),
    desc="Aggregating zone data",
)
for root_dir, sub_dirs, _ in agg_walk_progress:
    if sub_dirs != AGGREGATE_ZONE_VARIABLES:
        continue

    merge_zone_data = None
    for sub_dir in sub_dirs:
        cur = root_dir.replace(AGGREGATE_SEARCH_DIR_PATH, "").strip("/")
        agg_walk_progress.set_postfix(current=cur)
        file_names = os.listdir(str(os.path.join(root_dir, sub_dir)))
        for file_name in file_names:
            if not file_name.endswith(".parquet"):
                continue
            try:
                file_path = os.path.join(root_dir, sub_dir, file_name)
                zone_data_var = gpd.read_parquet(file_path)
                zone_data_col_rename = {
                    col: col.replace(old, new)
                    for old, new in AGGREGATE_RENAME_COLUMNS.items()
                    for col in zone_data_var.columns
                }
                zone_data_var = zone_data_var.rename(columns=zone_data_col_rename)
                zone_data_col_merge = {
                    col: f"{sub_dir}_{col}"
                    for col in AGGREGATE_EXTEND_COLUMNS
                }
                zone_data_var = zone_data_var.rename(columns=zone_data_col_merge)
                if merge_zone_data is None:
                    merge_zone_data = zone_data_var
                else:
                    merge_zone_cols = AGGREGATE_ZONE_ID_COLUMNS + list(zone_data_col_merge.values())
                    merge_zone_data = merge_zone_data.merge(
                        zone_data_var[merge_zone_cols],
                        on=AGGREGATE_ZONE_ID_COLUMNS,
                        how="outer",
                        suffixes=("", ""),  # raise if something went wrong, don't do silent fixes
                    )
                break  # in case many were found, ignore others (cannot merge anyway / no priority)
            except Exception as exc:
                err_msg = f"Error while processing [{file_path}]: {exc}"
                raise Exception(err_msg) from exc

    agg_zone_data.append(merge_zone_data)
    sub_dirs[:] = []  # don't recurse further

agg_zone_data = gpd.pd.concat(agg_zone_data, ignore_index=True)
agg_zone_data = agg_zone_data.sort_values(
    by=["resolution", "datetime"],
    ascending=[True, False]
)
agg_zone_data.to_parquet(AGGREGATE_RESULT_FILE_PATH)

Aggregating zone data: 0it [00:00, ?it/s]

Exception: Error while processing [./outputs/manitoba_rcm_ard/IGEO7/L7/2025-09-01/rl/RCM1_OK3555385_PK3778047_1_SC30MCPA_20250901_000600_RL.parquet]: "['day'] not in index"