## Import libraries


In [None]:
import os
from glob import glob

import pandas as pd
from joblib import Parallel, delayed
from tqdm import tqdm

# Absolute path of the project root; the data paths used later in this file are built from it.
WORK_DIR = "/beegfs/halder/GITHUB/RESEARCH/crop-yield-forecasting-germany/"
# Make the project root the current working directory for the rest of the session.
os.chdir(WORK_DIR)

In [None]:
# Crop to process; used as a sub-directory name when building the input and output paths.
CROP = "silage_maize"

## Define the paths


In [None]:
# Destination of the merged per-(NUTS_ID, year) parquet files; created up front
# so that the writers can assume it exists.
out_dir = os.path.join(WORK_DIR, "data", "processed", CROP, "timeseries")
os.makedirs(out_dir, exist_ok=True)

# Interim inputs: per-crop climate tables and the combined remote-sensing tables.
data_dir = os.path.join(WORK_DIR, "data", "interim")
rs_dir = os.path.join(data_dir, "remote_sensing", CROP, "combined")
climate_dir = os.path.join(data_dir, "climate", CROP)

## Load the valid indices


In [None]:
# Read the train/test/validation split table for the selected crop.
split_df = pd.read_csv(
    os.path.join(WORK_DIR, "data", "processed", CROP, "train_test_val_split.csv")
)

# Map every (NUTS_ID, year) pair to the list of its rows, each row as a dict.
# Iterating the groupby (instead of ``groupby.apply``) always hands over the
# full group including the NUTS_ID/year columns; ``apply`` on the grouping
# columns is deprecated in recent pandas and would drop them from the records.
valid_indices = {
    key: group.to_dict("records")
    for key, group in split_df.groupby(["NUTS_ID", "year"])
}

print("Number of valid indices:", len(valid_indices))

## Process the timeseries


In [None]:
def process_grouped_files(nuts_id, year, climate_dir, rs_dir, out_dir):
    """
    Merge the climate and remote-sensing tables of one (NUTS_ID, year) group.

    Steps:
        - Read the climate CSV ``<climate_dir>/<nuts_id>_<year>.csv``
        - Read the RS CSV ``<rs_dir>/<nuts_id>.csv`` (one file per NUTS_ID)
        - Inner-join both on 'date'
        - Drop Feb 29 rows
        - Downcast float64 columns to float32
        - Save as ``<out_dir>/<nuts_id>_<year>.parquet``

    Args:
        nuts_id: Region identifier used in the input/output file names.
        year: Year used in the climate and output file names.
        climate_dir: Directory holding the climate CSVs.
        rs_dir: Directory holding the remote-sensing CSVs.
        out_dir: Directory the parquet file is written to.

    Returns:
        A status string ("Saved: ...", "Missing ... file: ...",
        "No data for ..." or "Error in group ..."). Exceptions are caught and
        reported in the string so one failing group does not abort a
        parallel run.
    """

    try:

        # ---- Build input paths ----
        c_path = os.path.join(climate_dir, f"{nuts_id}_{year}.csv")
        r_path = os.path.join(rs_dir, f"{nuts_id}.csv")

        if not os.path.exists(c_path):
            return f"Missing climate file: {c_path}"

        if not os.path.exists(r_path):
            return f"Missing RS file: {r_path}"

        # ---- Read climate ----
        c_df = pd.read_csv(c_path, parse_dates=["date"])

        # ---- Read RS ----
        r_df = pd.read_csv(r_path, parse_dates=["date"])

        # ---- Merge ----
        df_merged = pd.merge(c_df, r_df, on="date", how="inner")

        if df_merged.empty:
            return f"No data for {nuts_id}_{year}"

        # ---- Remove leap day (Feb 29) ----
        is_leap_day = (df_merged["date"].dt.month == 2) & (
            df_merged["date"].dt.day == 29
        )
        df_merged = df_merged[~is_leap_day]

        # Re-check: a merge that contained only Feb 29 rows is empty now and
        # must not produce an empty parquet file.
        if df_merged.empty:
            return f"No data for {nuts_id}_{year}"

        # ---- Downcast float64 -> float32 ----
        # astype() returns a new frame, so nothing is assigned into the
        # filtered slice above (avoids pandas' SettingWithCopyWarning).
        float_cols = df_merged.select_dtypes(include=["float64"]).columns
        df_merged = df_merged.astype({col: "float32" for col in float_cols})

        # ---- Save parquet ----
        out_path = os.path.join(out_dir, f"{nuts_id}_{year}.parquet")
        df_merged.to_parquet(out_path, index=False)

        return f"Saved: {nuts_id}_{year}"

    except Exception as e:
        # Deliberate catch-all: report the failure as a status string.
        return f"Error in group {nuts_id}_{year}: {e}"


def convert_grouped_to_parquet_parallel(climate_dir, rs_dir, out_dir, n_jobs=-1):
    """
    Run process_grouped_files() for every (NUTS_ID, year) group in parallel.

    The groups are taken from the keys of the module-level ``valid_indices``
    mapping; only the keys are used here, the values are ignored.

    Args:
        climate_dir: Directory with the climate CSVs, forwarded to the worker.
        rs_dir: Directory with the remote-sensing CSVs, forwarded to the worker.
        out_dir: Directory the parquet files are written to.
        n_jobs: Number of joblib workers (joblib semantics; -1 by default).

    Prints one status line per group once all workers have returned.
    """

    # Snapshot the (NUTS_ID, year) keys so the work list is fixed.
    group_keys = [*valid_indices.keys()]

    print(f"ðŸš€ Starting parallel processing on {len(group_keys)} NUTS-year groups...")

    # Lazily build one joblib task per group; tqdm tracks task dispatch.
    worker = delayed(process_grouped_files)
    tasks = (
        worker(region, season, climate_dir, rs_dir, out_dir)
        for region, season in tqdm(group_keys)
    )
    statuses = Parallel(n_jobs=n_jobs)(tasks)

    # Print summary
    for status in statuses:
        print(status)

    print("âœ… Done!")

In [None]:
# Convert every valid (NUTS_ID, year) group to parquet using 70 workers.
convert_grouped_to_parquet_parallel(climate_dir, rs_dir, out_dir, n_jobs=70)

## Check that all files have the same number of rows


In [None]:
# Collect the written parquet files; sorted so the reference file (the first
# one) is the same on every run instead of depending on directory order.
parquet_file_paths = sorted(glob(os.path.join(out_dir, "*.parquet")))

# Names of files whose row count differs from the reference file.
error_files = []
n_rows = None

if not parquet_file_paths:
    # Nothing to compare; avoids an IndexError on the reference lookup.
    print("No parquet files found in:", out_dir)
else:
    # Row count of the first file is the reference every other file must match.
    n_rows = len(pd.read_parquet(parquet_file_paths[0]))

    for path in tqdm(parquet_file_paths):
        f_name = os.path.basename(path)
        df_file = pd.read_parquet(path)
        if len(df_file) != n_rows:
            error_files.append(f_name)
            # Report the frame's shape (the file name is a str and has none).
            print("Error files:", f_name, df_file.shape)