## Imports

In [None]:
import os
import re
from functools import reduce
from typing import List, Dict, Iterable, Optional, Tuple

import numpy as np
import pandas as pd

## Functions

### Helper functions for data preparation

In [None]:
# ===== 1. Read and format Excel files ===== #
def read_excel_file(input_path:str, input_file:str, usecols:Optional[List[str]] = None) -> pd.DataFrame:

    """
    Reads an Excel file.

    Optional: usecols allows you to specify which columns to keep from the Excel file.
    It accepts up to 3 columns.
    These columns must correspond to Rainfall, Effective rainfall, and Groundwater level values.
    """

    df = pd.read_excel(os.path.join(input_path, input_file))
    
    if 'Date' in df.columns:
        df['Date'] = pd.to_datetime(df['Date'], errors='coerce')
        df = df.sort_values('Date').reset_index(drop=True)

    if usecols:
        keep = ['Date'] + [c for c in usecols if c in df.columns]
        df = df[keep]

    return df

# ===== 2. Data import ===== #
def data_importation(input_path, input_file, input_usecols):

    """
    Uses the read_excel_file function.
    Returns the DataFrame corresponding to the imported Excel file, while renaming the columns defined in usecols.

    Output DataFrame -> Values of the predictive external time series.
    """

    hm_df = read_excel_file(input_path, input_file, input_usecols)

    # Rename historical columns to standard names
    rename_map = {old: new for old, new in zip(input_usecols, ['R', 'ER', 'GWL'][:len(input_usecols)])}
    hm_df = hm_df.rename(columns=rename_map)

    return hm_df

# ===== 2BIS. Data import for Séchilienne ===== #
def data_importation_Sech(input_path, input_file, input_usecols):

    hm_df = read_excel_file(input_path, input_file, input_usecols)

    # Rename historical columns to standard names
    rename_map = {old: new for old, new in zip(input_usecols, ['R', 'ER', 'WLI', 'WLM'][:len(input_usecols)])}
    hm_df = hm_df.rename(columns=rename_map)

    return hm_df

### Helper functions to compute rainfall (R) and effective rainfall (ER) features

In [None]:
# ===== FEATURES : Rolling features ===== #
def _rolling_features(df, cols, T_periods):

    out = {}
    for col in cols:
        s = df[col]
        for T in T_periods:
            out[f'{col}_{T}'] = s.rolling(window=T).sum()
            out[f'{col}_max{T}'] = s.rolling(window=T).max()

    return out


# ===== FEATURES : Saturation indices ===== #
def _staturation_indices(df, cols, short_terms=(3, 5, 10, 20, 30, 40, 60), long_terms=(30, 60, 90),
                        rain_threshold=3.0, eps=1e-6):

    out = {}
    for col in cols:
        filtered = df[col].where(df[col] >= rain_threshold, 0.0)
        
        for sT in short_terms:
            short_sum = filtered.rolling(window=sT, min_periods=1).sum()
            for lT in long_terms:
                long_sum = filtered.shift(sT).rolling(window=lT, min_periods=1).sum()

                ratio = short_sum / long_sum.mask(long_sum.abs() < eps, 1.0)
                out[f'{col}_sat{sT}/{lT}'] = ratio

    return out


# ===== FEATURES : Anomaly indices ===== #
def _anomaly_indices(df, cols, T_periods):

    out = {}
    for col in cols:
        clim_mean = df.loc[df[col] > 0, col].mean()

        for T in T_periods:
            out[f'{col}_anomaly{T}'] = df[col].rolling(window=T).mean() / clim_mean

    return out


# ===== Event ID definition ===== #
def _compute_event_ids(series, dry_tolerance=2):

    """
    Event series with a tolerance of 'dry_tolerance' internal dry days.
    Returns a np.ndarray of IDs (0 = no event).
    """

    ids = np.zeros(len(series), dtype=int)
    ev_id = 0
    dry = 0

    for i, v in enumerate(series.values):
        if v > 0:
            if ev_id == 0:
                ev_id = (ids.max() + 1) if ids.max() > 0 else 1
            dry = 0
            ids[i] = ev_id
        else:
            if ev_id == 0:
                ids[i] = ev_id
            else:
                dry += 1
                if dry <= dry_tolerance:
                    ids[i] = ev_id
                else:
                    ev_id = 0
                    dry = 0
                    ids[i] = 0

    return ids


# ===== Event strength calculation ===== #
def _event_strength_for_col(df, col, dry_tolerance=2):

    """
    Computes, for a given column, an event-strength index:
    (event cumulative sum) / (mean cumulative sum of events with the same duration).
    """
    
    s = df[col]
    ev_ids = _compute_event_ids(s, dry_tolerance=dry_tolerance)

    # Cumulative sum & duration per event
    ev_df = pd.DataFrame({'id': ev_ids, 'val': s.values})
    ev_df = ev_df[ev_df['id'] != 0]
    if ev_df.empty:
        return pd.Series(np.zeros(len(df)), index=df.index)

    grp = ev_df.groupby('id')['val']
    cumuls = grp.sum()
    durations = ev_df.groupby('id').size()

    # Mean cumulative sums by duration
    tmp = pd.DataFrame({'Duration': durations, 'Cumul': cumuls})
    mean_cumul_by_duration = tmp.groupby('Duration')['Cumul'].mean().to_dict()

    # Map back to the original index
    ev_cumul = pd.Series(cumuls, name='cumul')
    ev_dur = pd.Series(durations, name='dur')

    ev_map_cumul = pd.Series(0.0, index=np.unique(ev_ids[ev_ids != 0]))
    ev_map_dur = pd.Series(0, index=np.unique(ev_ids[ev_ids != 0]))
    ev_map_cumul.loc[ev_cumul.index] = ev_cumul.values
    ev_map_dur.loc[ev_dur.index] = ev_dur.values

    # Build the index series
    idx_vals = []
    for eid in ev_ids:
        if eid == 0:
            idx_vals.append(np.nan)
        else:
            dur = int(ev_map_dur.loc[eid])
            cumul = float(ev_map_cumul.loc[eid])
            denom = mean_cumul_by_duration.get(dur, np.nan)
            idx_vals.append(cumul / denom if denom not in (0, np.nan) else np.nan)

    return pd.Series(idx_vals, index=df.index)


# ===== FEATURES : Event strength ===== #
def _event_strength_pipeline(df, cols, dry_tolerance=2):

    out = {}
    for col in cols:
        out[f'{col}_event_strength'] = _event_strength_for_col(df, col, dry_tolerance=dry_tolerance)

    return out

### Functions to compute R and ER features

In [None]:
# ===== Compute R/ER features =====
def rainfall_and_effective_rainfall_features_calcul(df, meteo_col_list, T_periods=(1, 2, 5, 10, 20, 30, 60, 90)):

    base_df = df[['Date'] + meteo_col_list].copy()
    
    # == 1. Rolling sums & max == #
    roll_dict = _rolling_features(df, meteo_col_list, T_periods)

    # == 2. Local saturation index == #
    sat_dict = _staturation_indices(df, meteo_col_list, short_terms=(3,5,10,20,30,40,60), long_terms=(30,60,90), rain_threshold=3.0)

    # == 3. Anomalies (recent mean / mean over rainy days) == #
    anom_dict = _anomaly_indices(df, meteo_col_list, T_periods)

    # == 4. Event-based indices == #
    evt_dict = _event_strength_pipeline(df, meteo_col_list, dry_tolerance=2.0)

    # == 5. Concatenate features == #
    feats = pd.concat([base_df, pd.DataFrame(roll_dict), pd.DataFrame(sat_dict), 
                       pd.DataFrame(anom_dict), pd.DataFrame(evt_dict)], axis=1)

    # == 6. Drop unnecessary columns == #
    # i. Remove counter-type features (rainy_days and dry_days)
    drop_cols = [c for c in feats.columns if ('rainy_days' in c or 'dry_days' in c)]
    if drop_cols:
        feats = feats.drop(columns=drop_cols)

    # ii. NaNs in saturation indices => 0 (neutral)
    sat_cols = [c for c in feats.columns if 'sat' in c]
    if sat_cols:
        feats[sat_cols] = feats[sat_cols].fillna(0)

    # iii. NaNs in event_strength => 0 (no event = neutral)
    evt_cols = [c for c in feats.columns if 'event_strength' in c]
    if evt_cols:
        feats[evt_cols] = feats[evt_cols].fillna(0)

    # iv. Remove raw meteorological columns
    feats = feats.drop(columns=meteo_col_list)
    R_ER_features_df = feats.copy()

    return R_ER_features_df

### Helper functions to compute groundwater level (GWL) features 

In [None]:
# ===== Helper: Lagged series ===== #
def _lag_blocks(s, shift):
    
    """Lagged series (shifted by 'shift' steps)."""
    return s.shift(shift)


# ===== Helper: Differences and rolling means ===== #
def _diff_mean_pipeline(s, lag_name, periods):

    """Computes differences and rolling means over multiple periods P."""
    out = {}
    for P in periods:
        out[f'{lag_name}_diff{P}'] = s.diff(P)
        out[f'{lag_name}_mean{P}'] = s.rolling(window=P).mean()

    return out


# ===== Helper: Rolling extrema ===== #
def _extrema_pipeline(s, lag_name, windows):

    """Computes rolling max/min over multiple window sizes."""
    out = {}
    for w in windows:
        out[f'{lag_name}_max{w}'] = s.rolling(window=w, min_periods=1).max()
        out[f'{lag_name}_min{w}'] = s.rolling(window=w, min_periods=1).min()

    return out


# ===== Helper: Deviation from global mean ===== #
def _global_mean_diff(s, lag_name, global_mean):

    """Deviation from the (constant) global mean."""
    out = {}
    out[f'{lag_name}_diff_Gmean'] = s - global_mean

    return out

### Function to compute GWL features

In [None]:
# ===== Compute GWL features =====
def gwl_features_calcul(df, hydro_col_list, shift_list=(0, 1, 5, 8, 20, 30), 
                        P_periods=(2, 5, 10, 20, 30, 60), W_periods=(5, 10, 20, 30, 60, 90)):

    base_df = [df[['Date']].copy()]

    for GWL_col in hydro_col_list:
        s = df[GWL_col]
        gmean = s.mean()

        feat_dict = {}
        for sh in shift_list:
            lag_s = _lag_blocks(s, sh)
            lag_name = f'{GWL_col}_lag{sh}'

            # The lagged series 
            feat_dict[lag_name] = lag_s

            # Feature blocks
            feat_dict.update(_diff_mean_pipeline(lag_s, lag_name, P_periods))
            feat_dict.update(_extrema_pipeline(lag_s, lag_name, W_periods))
            feat_dict.update(_global_mean_diff(lag_s, lag_name, gmean))

        # Concatenate
        base_df.append(pd.DataFrame(feat_dict, index=df.index))

    GWL_features_df = pd.concat(base_df, axis=1)

    return GWL_features_df

### Helper functions for the pipeline

In [None]:
# ===== Compute features over the full time series ===== #
def compute_features(hm_df, externe_col_list, meteo_col_list=('R', 'ER'), hydro_col_list=('GWL',)):

    """
    Computes features over the full time series (including the forecast period).

    The purpose is to compute these features only once, and then reuse the features derived from
    the true observed values within the XGBoost workflow.
    """
    
    if hm_df.empty:
        features_hm_df = pd.DataFrame({"Date": []})
    else:
        R_ER_features_df = rainfall_and_effective_rainfall_features_calcul(hm_df, list(meteo_col_list))
        GWL_features_df = gwl_features_calcul(hm_df, list(hydro_col_list))
        features_hm_df = reduce(lambda L, R: pd.merge(L, R, on='Date', how='outer'), [R_ER_features_df, GWL_features_df])

    if externe_col_list:
        extern_part = hm_df[['Date'] + [c for c in externe_col_list if c in hm_df.columns]].copy()
        R_ER_features_df = R_ER_features_df.merge(extern_part, on='Date', how='left')
        GWL_features_df = GWL_features_df.merge(extern_part, on='Date', how='left')
        features_hm_df = features_hm_df.merge(extern_part, on='Date', how='left')

    return features_hm_df, R_ER_features_df, GWL_features_df


# ===== Compute features over the full time series (Séchilienne) ===== #
def compute_features_Sech(hm_df, externe_col_list, meteo_col_list=('R', 'ER'), hydro_col_list=('WLI', 'WLM')):

    """
    Same as compute_features, but adapted to Séchilienne and its two water-level columns.
    """
    if hm_df.empty:
        features_hm_df = pd.DataFrame({"Date": []})
    else:
        R_ER_features_df = rainfall_and_effective_rainfall_features_calcul(hm_df, list(meteo_col_list))
        GWL_features_df = gwl_features_calcul(hm_df, list(hydro_col_list))
        features_hm_df = reduce(lambda L, R: pd.merge(L, R, on='Date', how='outer'), [R_ER_features_df, GWL_features_df])

    if externe_col_list:
        extern_part = hm_df[['Date'] + [c for c in externe_col_list if c in hm_df.columns]].copy()
        R_ER_features_df = R_ER_features_df.merge(extern_part, on='Date', how='left')
        GWL_features_df = GWL_features_df.merge(extern_part, on='Date', how='left')
        features_hm_df = features_hm_df.merge(extern_part, on='Date', how='left')

    return features_hm_df, R_ER_features_df, GWL_features_df

### Saving functions

In [None]:
# ===== Save the file containing the features ===== #
def save_features_df(features_hm_df, R_ER_features_df, GWL_features_df, output_folder_path):
    
    out_dir = os.path.join(output_folder_path, '1_Features_ER_R_GWL')
    os.makedirs(out_dir, exist_ok=True)

    features_hm_df.to_excel(os.path.join(out_dir, 'Features_HM.xlsx'), index=False)
    R_ER_features_df.to_excel(os.path.join(out_dir, 'Features_METEO.xlsx'), index=False)
    GWL_features_df.to_excel(os.path.join(out_dir, 'Features_HYDRO.xlsx'), index=False)

    print(f"Files successfully saved in {out_dir}")

## End-to-end pipeline

In [None]:
def features_pipeline_totale(input_path, input_file, input_usecols, externe_col_list, output_folder_path):
    """Viella and Villerville version"""

    # ===== 1. Data import ===== #
    hm_df = data_importation(input_path, input_file, input_usecols)

    # ===== 2. Feature computation ===== #
    features_hm_df, R_ER_features_df, GWL_features_df = compute_features(hm_df, externe_col_list, meteo_col_list=('R', 'ER'), 
                                                                         hydro_col_list=('GWL',))

    # ===== 3. Saving ===== #
    save_features_df(features_hm_df, R_ER_features_df, GWL_features_df, output_folder_path)

    return features_hm_df, R_ER_features_df, GWL_features_df


def features_pipeline_totale_Sech(input_path, input_file, input_usecols, externe_col_list, output_folder_path):
    """Version Séchilienne"""

    # ===== 1. Data import ===== #
    hm_df = data_importation_Sech(input_path, input_file, input_usecols)

    # ===== 2. Features computation ===== #
    features_hm_df, R_ER_features_df, GWL_features_df = compute_features(hm_df, externe_col_list, meteo_col_list=('R', 'ER'), 
                                                                         hydro_col_list=('WLI', 'WLM'))

    # ===== 3. Saving ===== #
    save_features_df(features_hm_df, R_ER_features_df, GWL_features_df, output_folder_path)

    return features_hm_df, R_ER_features_df, GWL_features_df

## Example usage

### Viella and Villerville

In [None]:
# ===== 1. Parameters ===== #
from pathlib import Path

SITE = "Villerville"

# Repo-friendly paths (relative to the project root)
PROJECT_ROOT = Path.cwd()  # If running from /notebooks, you may want Path.cwd().parent
input_path = PROJECT_ROOT / "data" / SITE / "0_Input_dataset"
input_file = "Hydro_Meteo_traitées.xlsx"  # Adapt to the site / dataset
input_usecols = ['R', 'ER', 'PZ3']  # Columns to read from the Excel file (excluding 'Date')
externe_col_list = ['R', 'ER', 'GWL']  # External series to keep in the output

output_folder_path = PROJECT_ROOT / "outputs" / SITE

# ===== 2. Run ===== #
features_hm_df, R_ER_features_df, GWL_features_df = features_pipeline_totale(
    input_path, input_file, input_usecols,
    externe_col_list, output_folder_path
)

features_hm_df.head()


### Séchilienne

In [None]:
# ===== 1. Parameters ===== #
from pathlib import Path

SITE = "Séchilienne"

# Repo-friendly paths (relative to the project root)
PROJECT_ROOT = Path.cwd()  # If running from /notebooks, you may want Path.cwd().parent
input_path = PROJECT_ROOT / "data" / SITE / "0_Input_dataset"
input_file = "Hydro_Meteo_traitées.xlsx"  # Adapt to the site / dataset
input_usecols = ['R', 'ER', 'WLI', 'WLM']  # Columns to read from the Excel file (excluding 'Date')
externe_col_list = ['R', 'ER', 'WLI', 'WLM']  # External series to keep in the output

output_folder_path = PROJECT_ROOT / "outputs" / SITE

# ===== 2. Run ===== #
features_hm_df, R_ER_features_df, GWL_features_df = features_pipeline_totale_Sech(
    input_path, input_file, input_usecols,
    externe_col_list, output_folder_path
)

features_hm_df.head()
