In [1]:
import pandas as pd
from sklearn.ensemble import RandomForestClassifier
from pathlib import Path

In [2]:
import numpy as np
import pandas as pd
from joblib import Parallel, delayed
from tqdm_joblib import tqdm_joblib
from tqdm import tqdm
from general_utils.constants import spectral_bands, indices


class TimeSeriesAggregator:
    DEFAULT_FEATURES = spectral_bands + indices

    def __init__(self, features=None, window=120, step=90, n_jobs=-1):
        self.features = features or self.DEFAULT_FEATURES
        self.window = window
        self.step = step
        self.n_jobs = n_jobs

    @staticmethod
    def calc_trend(x, y):
        if len(x) < 2:
            return 0
        xm, ym = np.mean(x), np.mean(y)
        denom = np.sum((x - xm) ** 2)
        return 0 if denom == 0 else np.sum((x - xm) * (y - ym)) / denom

    def process_id(self, id_, group):
        group = group.sort_values("time")
        species = group["species"].iloc[0]
        times = group["time_num"].values
        feats = []
        start = 0

        while start < times.max():
            end = start + self.window
            win = group[(group["time_num"] >= start) & (group["time_num"] < end)]
            if len(win) < 2:
                start += self.step
                continue

            f = {"id": id_, "species": species}
            for col in self.features:
                vals = win[col].values
                t = win["time_num"].values
                f[f"{col}_mean"] = vals.mean()
                f[f"{col}_std"] = vals.std()
                f[f"{col}_min"] = vals.min()
                f[f"{col}_max"] = vals.max()
                f[f"{col}_trend"] = self.calc_trend(t, vals)
            feats.append(f)
            start += self.step
        return feats

    def aggregate_time_series(self, df):
        df = df.copy()
        df["time"] = pd.to_datetime(df["time"])
        df["time_num"] = (df["time"] - df["time"].min()).dt.days
        groups = list(df.groupby("id"))

        with tqdm_joblib(
            tqdm(desc="Processing IDs", total=len(groups))
        ) as progress_bar:
            res = Parallel(n_jobs=self.n_jobs)(
                delayed(self.process_id)(i, g) for i, g in groups
            )

        return pd.DataFrame([f for sub in res for f in sub])

    def aggregate_to_single_row_keep_windows(self, df_windows):
        df_windows = df_windows.copy()
        df_windows["window_idx"] = df_windows.groupby("id").cumcount()
        df_pivot = df_windows.pivot_table(index=["id", "species"], columns="window_idx")
        df_pivot.columns = [
            f"{col[0]}_w{col[1]}" if isinstance(col, tuple) else col
            for col in df_pivot.columns
        ]
        return df_pivot.reset_index()

    def run(self, df):
        """Shortcut: vollständiger Lauf."""
        df_win = self.aggregate_time_series(df)
        return self.aggregate_to_single_row_keep_windows(df_win)

  from tqdm.autonotebook import tqdm


In [3]:
# ts_builder = TimeSeriesAggregator(window=120, step=90)
# train_df = ts_builder.run(df)

In [4]:
class DataLoader:
    def __init__(self):
        pass

    def load_transform(self, path):
        """load data and perform basic aggregation and sorting"""
        try:
            df = pd.read_csv(path)
        except FileNotFoundError:
            raise FileNotFoundError(f"File {path} not found.")
        except pd.errors.ParserError:
            raise ValueError(f"File {path} could not be read as CSV.")

        if not {"time", "id"}.issubset(df.columns):
            raise ValueError("CSV must contain 'time' and 'id' columns.")

        df["time"] = pd.to_datetime(df["time"], errors="coerce")
        if df["time"].isna().any():
            raise ValueError(
                "Some values in 'time' could not be converted to datetime."
            )

        agg_dict = {
            col: "mean" if pd.api.types.is_numeric_dtype(dtype) else "first"
            for col, dtype in df.dtypes.items()
            if col not in ["time", "id"]
        }

        df = df.groupby(["time", "id"], as_index=False).agg(agg_dict)
        df = df.sort_values(["id", "time"]).reset_index(drop=True)
        return df

In [5]:
TRAIN_PATH = "../../../data/processed/trainset.csv"
TEST_PATH = "../../../data/processed/testset.csv"
VAL_PATH = "../../../data/processed/valset.csv"


def load_data(*paths: Path):
    """
    Loads and transforms data from one or more file paths.

    Returns a DataFrame if one path is given, otherwise a list of DataFrames.
    """
    dataloader = DataLoader()
    dataframes = []

    for path in paths:
        df = dataloader.load_transform(path)
        print(f"Data import and transformation finished for: {path}")
        dataframes.append(df)

    if len(dataframes) == 1:
        return dataframes[0]
    return dataframes


def define_models():
    return [
        (RandomForestClassifier, {"n_estimators": 5, "max_depth": 2}),
        # (lgb.LGBMClassifier, {"n_estimators": 100, "learning_rate": 0.1}),
        # (xgb.XGBClassifier, {"n_estimators": 100, "learning_rate": 0.1, "max_depth": 6}),
    ]


def train_models(train_df, test_df, models, target_col="species"):
    pipeline = GenericPipeline(target_col=target_col)
    pipeline.run(train_df, test_df, models)
    print("Training finished!")

In [6]:
train_df, test_df, val_df = load_data(TRAIN_PATH, TEST_PATH, VAL_PATH)
models = define_models()
train_df

Data import and transformation finished for: ../../../data/processed/trainset.csv
Data import and transformation finished for: ../../../data/processed/testset.csv
Data import and transformation finished for: ../../../data/processed/valset.csv


Unnamed: 0,time,id,disturbance_year,doy,b2,b3,b4,b5,b6,b7,b8,b8a,b11,b12,species
0,2017-01-13,4,0.0,13.0,176.0,229.0,205.0,405.0,1192.0,1308.0,1316.0,1529.0,396.0,180.0,Scots_pine
1,2017-01-27,4,0.0,27.0,382.0,429.0,205.0,405.0,1192.0,1308.0,1316.0,1529.0,396.0,180.0,Scots_pine
2,2017-02-19,4,0.0,50.0,240.0,382.0,259.0,544.0,1387.0,1609.0,1825.0,1762.0,732.0,338.0,Scots_pine
3,2017-03-27,4,0.0,86.0,244.0,375.0,287.0,579.0,1536.0,1776.0,1878.0,1884.0,1013.0,488.0,Scots_pine
4,2017-04-27,4,0.0,117.0,215.0,327.0,274.0,548.0,1499.0,1803.0,1766.0,1957.0,1003.0,540.0,Scots_pine
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
9709,2022-06-27,99,0.0,178.0,285.0,474.0,271.0,767.0,2062.5,2542.5,2341.0,2829.0,1158.0,569.0,Norway_spruce_mixed
9710,2022-07-23,99,0.0,204.0,235.0,403.0,247.0,627.0,1936.0,2454.0,2211.0,2648.0,1162.0,547.0,Norway_spruce_mixed
9711,2022-07-24,99,0.0,205.0,172.0,315.0,157.0,540.0,1818.0,2214.0,2166.0,2620.0,1100.0,494.0,Norway_spruce_mixed
9712,2022-08-01,99,0.0,213.0,240.0,417.0,244.0,638.0,1956.0,2235.0,2376.0,2617.0,1184.0,518.0,Norway_spruce_mixed


In [7]:
import pandas as pd
from sklearn.preprocessing import LabelEncoder, StandardScaler


class EnsemblePipeline:
    def __init__(self, target_col):
        self.target_col = target_col
        self.label_encoder = LabelEncoder()
        self.scaler = StandardScaler()
        self.categorical_cols = []
        self.fitted = False

    def drop_columns(self, df):
        """Remove unnecessary columns."""
        return df.drop(
            columns=["time", "id", "disturbed", "disturbance_year"], errors="ignore"
        )

    def _encode_features(self, df):
        """One-hot encode categorical features."""
        return pd.get_dummies(df, columns=self.categorical_cols, drop_first=True)

    def fit(self, train_df):
        ts_builder = TimeSeriesAggregator(window=120, step=90)
        feature_df = ts_builder.run(train_df)

        feature_df[self.target_col] = (
            train_df.groupby("id")[self.target_col].first().values
        )

        df = self.drop_columns(feature_df.copy())

        df[self.target_col] = self.label_encoder.fit_transform(df[self.target_col])

        self.categorical_cols = [
            c
            for c in df.select_dtypes(include=["object", "category"]).columns
            if c != self.target_col
        ]

        df = self._encode_features(df)
        X, y = df.drop(columns=[self.target_col]), df[self.target_col]
        X_scaled = self.scaler.fit_transform(X)
        self.fitted = True

        return pd.DataFrame(X_scaled, columns=X.columns, index=X.index), y

    def transform(self, df):
        if not self.fitted:
            raise RuntimeError("Pipeline must be fitted before transform().")

        ts_builder = TimeSeriesAggregator(window=120, step=90)
        feature_df = ts_builder.run(df)

        if self.target_col in df.columns:
            feature_df[self.target_col] = (
                df.groupby("id")[self.target_col].first().values
            )

        df = self.drop_columns(feature_df.copy())

        if self.target_col in df.columns:
            df[self.target_col] = self.label_encoder.transform(df[self.target_col])

        df = self._encode_features(df)
        X = df.drop(columns=[self.target_col]) if self.target_col in df.columns else df
        X = X.reindex(columns=self.scaler.feature_names_in_, fill_value=0)

        X_scaled = self.scaler.transform(X)
        y = df[self.target_col] if self.target_col in df.columns else None

        X_scaled_df = pd.DataFrame(X_scaled, columns=X.columns, index=X.index)
        return (X_scaled_df, y) if y is not None else X_scaled_df

In [8]:
import pandas as pd
from tabulate import tabulate
import matplotlib.pyplot as plt
from models.ensemble_models.ensemble_utils.ensemble_model_manager import (
    EnsembleModelManager,
)
from sklearn.model_selection import GridSearchCV


class GenericPipeline:
    def __init__(self, target_col="species"):
        self.pipeline = EnsemblePipeline(target_col=target_col)
        self.ensemble = EnsembleModelManager()

    def _print_metrics(self, model_name, metrics, top_n_features=5):
        """
        Nicely prints the model metrics and top feature importances.
        """
        print(f"\n{'=' * 40}\nModel: {model_name}\n{'=' * 40}")

        # Key performance metrics
        metrics_table = [
            ["Accuracy", metrics.get("accuracy")],
            ["Precision (Macro)", metrics.get("precision_macro")],
            ["Recall (Macro)", metrics.get("recall_macro")],
            ["F1 (Macro)", metrics.get("f1_macro")],
        ]
        print("\nPerformance Metrics:")
        print(
            tabulate(
                metrics_table,
                headers=["Metric", "Score"],
                tablefmt="fancy_grid",
                floatfmt=".4f",
            )
        )

        feat_imp = metrics.get("feature_importances")
        if feat_imp:
            feat_imp_df = (
                pd.DataFrame(feat_imp)
                .sort_values(by="importance", ascending=False)
                .head(top_n_features)
            )
            print(f"\nTop {top_n_features} Feature Importances:")
            print(
                tabulate(
                    feat_imp_df, headers="keys", tablefmt="fancy_grid", floatfmt=".4f"
                )
            )

    def run(self, train_df, test_df, model_defs, val_df=None):
        """
        Trains and evaluates all models defined in model_defs.
        Optionally evaluates a validation set.
        """
        X_train, y_train = self.pipeline.fit(train_df)
        X_test, y_test = self.pipeline.transform(test_df)
        X_val, y_val = self.pipeline.transform(val_df)

        feature_names = (
            X_train.columns
            if hasattr(X_train, "columns")
            else [f"f{i}" for i in range(X_train.shape[1])]
        )

        results_summary = []

        for model_class, params in model_defs:
            model_name = model_class.__name__
            print(f"\n{'-' * 30}\nTraining {model_name}...\n{'-' * 30}")

            if any(isinstance(v, (list, tuple)) for v in params.values()):
                grid = GridSearchCV(
                    model_class(), params, cv=5, n_jobs=-1, scoring="accuracy"
                )
                grid.fit(X_train, y_train)
                hyperparams = grid.best_params_
                print(f"Best hyperparameters: {hyperparams}")
            else:
                hyperparams = params

            model, train_metrics = self.ensemble.run_training(
                model_class,
                hyperparams,
                X_train,
                y_train,
                X_train,
                y_train,
                feature_names,
            )

            _, test_metrics = self.ensemble.run_training(
                model_class,
                hyperparams,
                X_train,
                y_train,
                X_test,
                y_test,
                feature_names,
            )

            _, val_metrics = self.ensemble.run_training(
                model_class,
                hyperparams,
                X_train,
                y_train,
                X_val,
                y_val,
                feature_names,
            )

            results_summary.append(
                {
                    "model": model_name,
                    "train_acc": train_metrics["accuracy"],
                    "test_acc": test_metrics["accuracy"],
                    "val_acc": val_metrics["accuracy"],
                }
            )

            self._print_metrics(model_name, test_metrics)
        return self.ensemble.load_models(), X_train

    def _plot_performance_comparison(self, results_summary):
        df = pd.DataFrame(results_summary)
        df.set_index("model", inplace=True)
        df.plot(kind="bar", figsize=(10, 6))
        plt.ylabel("Accuracy")
        plt.title("Train / Test / Validation Performance Comparison")
        plt.xticks(rotation=45)
        plt.grid(axis="y")
        plt.tight_layout()
        plt.show()

In [9]:
results, x_t = GenericPipeline(target_col="species").run(
    train_df, test_df, models, val_df=val_df
)

 11%|█▏        | 8/70 [00:00<00:07,  8.53it/s], ?it/s]


KeyError: 'ndvi'

In [None]:
x_t

In [10]:
DEFAULT_FEATURES = spectral_bands + indices
DEFAULT_FEATURES

['b2',
 'b3',
 'b4',
 'b5',
 'b6',
 'b7',
 'b8',
 'b8a',
 'b11',
 'b12',
 'ndvi',
 'gndvi',
 'wdvi',
 'tndvi',
 'savi',
 'ipvi',
 'mcari',
 'reip',
 'masvi2',
 'dvi']