In [None]:
from pathlib import Path
from typing import Iterable

import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
from sklearn.metrics import mean_squared_error, classification_report
from sklearn.preprocessing import StandardScaler
from sklearn.ensemble import RandomForestRegressor
from lightgbm import LGBMRegressor

from common.data import load_data

# Apply seaborn's default theme so the matplotlib figures below share one look.
sns.set_theme()

In [None]:
# Directory that holds the battery aging .mat files, relative to the notebook.
base_path = Path("../data/5. Battery Data Set/1. BatteryAgingARC-FY08Q4")


def make_paths(names: Iterable[str]):
    """Resolve every file name in ``names`` against ``base_path``.

    Returns a list of paths in the same order as ``names``.
    """
    return [base_path / file_name for file_name in names]


# File-level split: two files for training, one for validation, one for test.
train_paths = make_paths(["B0005.mat", "B0006.mat"])
valid_paths = make_paths(["B0007.mat"])
test_paths = make_paths(["B0018.mat"])

In [None]:
def load_paths(paths: Iterable[Path]) -> pd.DataFrame:
    """Load the "discharge" data of every path and stack it into one frame.

    Each file is read with ``load_data(path, "discharge")`` and keyed by the
    file's stem. The stacked result gets the index level names
    ``["file", "index"]``.

    NOTE(review): naming exactly two levels assumes ``load_data`` returns
    frames with a single-level index — confirm against ``common.data``.
    """
    frames_by_file = {}
    for path in paths:
        frames_by_file[Path(path).stem] = load_data(path, "discharge")

    combined = pd.concat(frames_by_file)
    combined.index.names = ["file", "index"]
    return combined

In [None]:
# Load the discharge data of each split; every split is kept in its own frame.
train_data = load_paths(train_paths)
valid_data = load_paths(valid_paths)
test_data = load_paths(test_paths)

In [None]:
def window_dataframe(df: pd.DataFrame, size: int) -> pd.DataFrame:
    """Lay ``size`` consecutive rows of ``df`` side by side as one wide row.

    For every lag ``k`` in ``0 .. size - 1`` the frame is shifted down by ``k``
    rows and its columns are renamed to ``"<column>_b<k>"``; ``_b0`` is the
    current row and ``_b<k>`` is the row ``k`` steps earlier. The first
    ``size - 1`` rows, which do not have a full history, are dropped and the
    index is reset.
    """
    def lagged(lag: int) -> pd.DataFrame:
        # Copy of df moved `lag` rows down, with lag-tagged column names.
        frame = df.shift(lag)
        frame.columns = frame.columns.map(lambda name: f"{name}_b{lag}")
        return frame

    stacked = pd.concat([lagged(lag) for lag in range(size)], axis=1)
    return stacked.iloc[size - 1:].reset_index(drop=True)

def process_data(data: pd.DataFrame, window_size: int, rolling_size=20):
    """Build windowed features, next-step targets and RUL labels per file.

    For every group of ``data.groupby("file")`` the rows are aggregated per
    ``operation_id`` (max of ``Time``, first ``Capacity``). The targets are the
    change of the ``rolling_size``-operation rolling mean from one operation to
    the next. The remaining-useful-life label counts how many operations are
    left before the number of operations with capacity above 1.4 is used up.

    Windowing is done inside each file, so no window mixes operations that
    belong to two different files.

    Args:
        data: frame with a ``file`` group key and the columns
            ``operation_id``, ``Time`` and ``Capacity``.
        window_size: number of consecutive operations in one feature row.
        rolling_size: span of the rolling mean used for the targets.

    Returns:
        ``(X_win, y_win, etc_win)``: windowed features, targets with columns
        ``time`` and ``cap``, and a frame with the ``rul`` column. All three
        are row-aligned and use a fresh integer index.
    """
    X = []
    y = []
    etc = []

    for _, group_df in data.groupby("file"):
        operation_df = group_df.groupby("operation_id").agg({
            "Time": ["max"],
            "Capacity": ["first"],
        })
        operation_df.columns = operation_df.columns.map(lambda c: "_".join(c))

        # Change of the rolling mean between this operation and the next one.
        capacity_change = operation_df["Capacity_first"].rolling(rolling_size).mean().diff().shift(-1)
        time_change = operation_df["Time_max"].rolling(rolling_size).mean().diff().shift(-1)

        is_alive = operation_df.eval("Capacity_first > 1.4")
        alive_cycles = is_alive.sum()
        rul_cycles = -(np.arange(len(is_alive)) - alive_cycles)

        # The rolling mean is undefined for the first rolling_size - 1 rows and
        # the shifted diff is undefined for the last row, so both are dropped.
        usable = slice(rolling_size - 1, -1)
        features = operation_df.iloc[usable]
        targets = pd.DataFrame({
            "time": time_change.iloc[usable],
            "cap": capacity_change.iloc[usable],
        })
        extras = pd.DataFrame({
            "rul": rul_cycles[usable],
        })

        # Window within the file; the first window_size - 1 rows have no full
        # history, so the matching targets/labels are dropped as well.
        X.append(window_dataframe(features, window_size))
        y.append(targets.iloc[window_size - 1:])
        etc.append(extras.iloc[window_size - 1:])

    X_win = pd.concat(X, ignore_index=True)
    y_win = pd.concat(y, ignore_index=True)
    etc_win = pd.concat(etc, ignore_index=True)

    return X_win, y_win, etc_win

In [None]:
# Number of consecutive operations that make up one feature row.
window_size = 5
# Build features (X), targets (y) and RUL labels (etc) for each split.
train_X, train_y, train_etc = process_data(train_data, window_size=window_size)
valid_X, valid_y, valid_etc = process_data(valid_data, window_size=window_size)
test_X, test_y, test_etc = process_data(test_data, window_size=window_size)

In [None]:
# Fit the scaler on the training features only, then reuse it for every split.
norm = StandardScaler().fit(train_X)
train_X_norm = norm.transform(train_X)
valid_X_norm = norm.transform(valid_X)
test_X_norm = norm.transform(test_X)

In [None]:
# One LightGBM regressor per target column of train_y, keyed by the column name.
# The normalized validation split is handed to fit() as the evaluation set.
models = {
    target: LGBMRegressor(verbose=-1).fit(
        train_X_norm, train_y[target],
        eval_set=(valid_X_norm, valid_y[target])
    )
    for target in train_y.columns
}

In [None]:
def plot_prediction(y, pred):
    """Scatter predictions against real values and draw the identity line.

    Args:
        y: real target values; must support ``.min()`` and ``.max()``.
        pred: predicted values; must support ``.min()`` and ``.max()``.

    The dashed orange diagonal marks where prediction equals the real value.
    It spans from the smallest to the largest value found in either input, so
    it covers every plotted point.
    """
    plot_min = min(pred.min(), y.min())
    # max, not min: the diagonal has to reach the largest value of both series.
    plot_max = max(pred.max(), y.max())

    plt.scatter(pred, y, alpha=0.2)
    plt.plot(
        [plot_min, plot_max], [plot_min, plot_max],
        color="orange",
        linestyle="dashed",
    )
    plt.title("Prediction vs Real")
    plt.xlabel("Predictions")
    plt.ylabel("Real")
    plt.show()


def regression_report(model, X, y):
    """Print the RMSE of ``model`` on ``(X, y)`` and plot predictions vs. real values.

    Args:
        model: fitted estimator exposing ``predict``.
        X: feature matrix passed to ``model.predict``.
        y: real target values for the rows of ``X``.
    """
    pred = model.predict(X)

    # mean_squared_error returns the MSE; take the root so the printed number
    # matches its "rmse" label.
    rmse = np.sqrt(mean_squared_error(y, pred))
    print(f"rmse: {rmse:0.4f}")
    plot_prediction(y, pred)


In [None]:
# Training split: print the error metric and plot predictions for each target.
for target, model in models.items():
    print(target)
    regression_report(model, train_X_norm, train_y[target]) 

In [None]:
# Validation split: print the error metric and plot predictions for each target.
for target, model in models.items():
    print(target)
    regression_report(model, valid_X_norm, valid_y[target]) 

In [None]:
# Test split: print the error metric and plot predictions for each target.
for target, model in models.items():
    print(target)
    regression_report(model, test_X_norm, test_y[target]) 

In [None]:
def run_simulation(models, norm, X: pd.DataFrame, life_window: int):
    """Roll the windowed features forward ``life_window`` steps using the models.

    On every step the current features are scaled with ``norm`` and each model
    in ``models`` (in dict order) predicts one delta per row. The deltas are
    added to the first two columns to form the newest values, which are put in
    front, while the last two columns are dropped so the window slides by one.

    Args:
        models: mapping whose values expose ``predict``.
        norm: fitted scaler exposing ``transform``.
        X: windowed feature frame; its column names are reused for the result.
        life_window: number of steps to simulate.

    Returns:
        The feature frame after ``life_window`` steps (``X`` itself when
        ``life_window`` is 0).
    """
    feature_names = X.columns
    state = X

    for _step in range(life_window):
        scaled = norm.transform(state)
        deltas = np.column_stack([estimator.predict(scaled) for estimator in models.values()])

        newest = state.iloc[:, :2] + deltas
        carried_over = state.iloc[:, :-2].values
        state = pd.DataFrame(
            np.column_stack((newest, carried_over)),
            columns=feature_names,
        )

    return state

In [None]:
def evaluate_simulation(models, norm, X, etc, life_window):
    """Compare simulated end-of-life against the RUL labels and print a report.

    The features are simulated ``life_window`` steps ahead. A row is predicted
    dead when its simulated ``Capacity_first_b0`` is below 1.4, and it is
    labelled dead when its ``rul`` is below ``life_window``.

    Args:
        models: mapping of fitted models handed to ``run_simulation``.
        norm: fitted scaler handed to ``run_simulation``.
        X: windowed feature frame containing ``Capacity_first_b0``.
        etc: frame with the ``rul`` column, row-aligned with ``X``.
        life_window: simulation horizon, also used as the RUL threshold.
    """
    # Use the caller's horizon for both sides so prediction and label agree.
    sim_res = run_simulation(models, norm, X, life_window)
    dead_sim = sim_res["Capacity_first_b0"] < 1.4
    dead_rul = etc["rul"] < life_window
    print(classification_report(dead_rul, dead_sim))

In [None]:
# Classification report of the simulation on the training split (horizon argument: 30).
evaluate_simulation(models, norm, train_X, train_etc, 30)

In [None]:
# Classification report of the simulation on the validation split (horizon argument: 30).
evaluate_simulation(models, norm, valid_X, valid_etc, 30)

In [None]:
# Classification report of the simulation on the test split (horizon argument: 30).
evaluate_simulation(models, norm, test_X, test_etc, 30)