In [None]:
import os
# All later cells use paths relative to this directory
# ('datasets_pseudo/...', 'Plots', 'Results').
# Set the AMS_PROJECT_DIR environment variable to run the notebook on another
# machine; without it the original author's local path is used.
PROJECT_DIR = os.environ.get(
    "AMS_PROJECT_DIR", "C:\\Users\\nathi\\Downloads\\3 Semester\\AMS\\"
)
os.chdir(PROJECT_DIR)

In [None]:
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
from sklearn.ensemble import IsolationForest
import numpy.typing as npt
from sklearn.metrics import precision_score, recall_score, f1_score, accuracy_score
import itertools
import datetime

In [None]:
def ts_train_test_split(
    ts: npt.ArrayLike, training_size: int
) -> tuple[pd.DataFrame, pd.DataFrame] | tuple[np.ndarray, np.ndarray]:
    """
    Time series train test split. Performs a single split of the series.

    Parameters:
    ----------
    ts: array-like
        univariate time series data set

    training_size: int
        Size of the training set. The test set length

    Returns:
    -------
    Tuple[pd.DataFrame, pd.DataFrame] | Tuple[np.ndarray, np.ndarray]
        A tuple containing the training and test sets,
        either as DataFrames or NumPy arrays
    """
    if training_size >= len(ts):
        raise ValueError("training_size must be < length of series")

    if isinstance(ts, pd.DataFrame):
        return ts.iloc[:training_size], ts.iloc[training_size:]

    return np.asarray(ts[:training_size]), np.asarray(ts[training_size:])

In [None]:
# Directory holding one CSV file per data set (relative to the working directory).
path = 'datasets_pseudo/Threshold/'

# Keep only CSV files and sort them: os.listdir returns entries in arbitrary
# order and may include non-data files.
names = sorted(entry for entry in os.listdir(path) if entry.endswith('.csv'))
if not names:
    raise FileNotFoundError(f"No CSV files found in {path!r}")

# Timestamp of this run; later cells embed it in output file/directory names.
date_str = datetime.datetime.now().strftime("%m%d%Y_%H%M%S")

# Maps data set name (file name without extension) -> loaded DataFrame.
dfs = {}

for name in names:
    # os.path.join instead of a hard-coded backslash so this also works outside
    # Windows; splitext removes only the trailing extension.
    dataset_name = os.path.splitext(name)[0]
    dfs[dataset_name] = pd.read_csv(os.path.join(path, name))

# All data sets stacked row-wise with a fresh 0..n-1 index.
df = pd.concat(dfs.values(), ignore_index=True)

In [None]:
# Target columns for peak detection; the evaluation cell pairs each one with a
# "<column>_Peak" pseudo-label column.
columns_to_predict = [f"CURRENT|{suffix}" for suffix in (1, 2, 3, 6)]

In [None]:
# Isolation Forest grid search, evaluated against the "<column>_Peak" pseudo labels.
# For every data set file, target column and feature set, the first half of the
# rows is used to fit the model and the second half is scored and plotted.

# Define parameter grid
contamination_values = [0.01, 0.05, 0.02]
n_estimators_list = [50, 100, 150]

# Columns that are never used as model inputs: the identifier columns and the
# pseudo-label columns themselves.
non_feature_columns = {
    'Machine', 'Material', 'Component',
    'CURRENT|1_Peak', 'CURRENT|2_Peak', 'CURRENT|3_Peak', 'CURRENT|6_Peak',
}
feature_sets = {
    # Keep the DataFrame's own column order. Building this list from a set gives
    # a hash-dependent order that changes between kernel restarts, and the
    # fitted forest depends on column order, so results would not be
    # reproducible even with a fixed random_state.
    'all_sensors': [col for col in df.columns if col not in non_feature_columns],
    'torques': ['TORQUE|1', 'TORQUE|2', 'TORQUE|3', 'TORQUE|6'],
}


def _plot_detected_peaks(idx_train, y_train, idx_test, y_test, y_pred, title, out_file):
    """Plot train/test series, mark predicted peaks, then save, show and close the figure.

    Parameters
    ----------
    idx_train, idx_test : array-like
        x positions (original row index) of the training and test samples.
    y_train, y_test : array-like
        Values of the target column for the training and test samples.
    y_pred : array-like of bool
        True for every test sample the model flagged as a peak.
    title : str
        Figure title.
    out_file : str
        Path of the image file to write.
    """
    fig, ax = plt.subplots(figsize=(14, 6))
    ax.plot(idx_train, y_train, label='Train Data', alpha=0.5, color='blue')
    ax.plot(idx_test, y_test, label='Test Data', alpha=0.8, color='green')

    # One vertical line per flagged test sample.
    flagged_positions = np.asarray(idx_test)[np.asarray(y_pred, dtype=bool)]
    for position in flagged_positions:
        ax.axvline(position, color='red', alpha=0.8)

    ax.set_title(title)
    ax.set_xlabel("Index")
    ax.set_ylabel("Current")
    ax.legend()
    fig.tight_layout()
    fig.savefig(out_file)
    plt.show()
    # Many figures are created in the grid loop; release each one explicitly.
    plt.close(fig)


results = []
# To debug on a single file, override the file list first, e.g.:
# names = ['threshold_CMX1_AL_CP1.csv']

for name in names:
    dataset = name.replace(".csv", "")
    df_filtered = pd.read_csv(os.path.join(path, name))

    for column in columns_to_predict:  # adjust if needed
        pseudo_label = f"{column}_Peak"

        for feature_label, features in feature_sets.items():
            # Copy so the shared feature_sets entry is not modified.
            feature_set = list(features)
            if column not in feature_set:
                feature_set.append(column)

            # Drop rows with a missing feature, target or label value.
            data = df_filtered[feature_set + [pseudo_label]].dropna()
            # The target column itself is not a model input.
            X = data[feature_set].drop(columns=[column])
            y_full = data[column].to_numpy()
            y_true = data[pseudo_label].astype(bool)

            # Train-test split: first half trains, second half is evaluated.
            train_size = round(0.5 * data.shape[0])
            X_train, X_test = ts_train_test_split(X, train_size)
            y_train, y_test = ts_train_test_split(y_full, train_size)
            y_pseudo = y_true.iloc[train_size:].to_numpy()
            idx_train, idx_test = ts_train_test_split(data.index, train_size)

            for contamination, n_estimators in itertools.product(contamination_values, n_estimators_list):

                # One directory per (run, contamination, n_estimators, data set).
                plot_dir = os.path.join('Plots', f'IT_{date_str}_{contamination}_{n_estimators}_{dataset}')
                os.makedirs(plot_dir, exist_ok=True)

                model = IsolationForest(contamination=contamination, n_estimators=n_estimators, random_state=42)
                model.fit(X_train)

                # IsolationForest.predict returns -1 for outliers; treat those as peaks.
                y_pred = model.predict(X_test) == -1

                # Evaluation against the pseudo labels of the test half
                results.append({
                    'Dataset': dataset,
                    'Column': column,
                    'Features': feature_label,
                    'Contamination': contamination,
                    'n_estimators': n_estimators,
                    'Accuracy': accuracy_score(y_pseudo, y_pred),
                    'Precision': precision_score(y_pseudo, y_pred, zero_division=0),
                    'Recall': recall_score(y_pseudo, y_pred, zero_division=0),
                    'F1': f1_score(y_pseudo, y_pred, zero_division=0),
                    'Detected_Peaks': int(y_pred.sum()),
                })

                # Save into the directory created above. The feature set label is
                # part of the file name so the two feature sets do not overwrite
                # each other's plot.
                plot_file = os.path.join(plot_dir, f'{column.replace("|", "")}_{feature_label}.png')
                _plot_detected_peaks(
                    idx_train, y_train, idx_test, y_test, y_pred,
                    title=f"Detected Peaks in {column} using Isolation Forest cont: {contamination} and est: {n_estimators}",
                    out_file=plot_file,
                )

# Convert to DataFrame and sort (best F1 first)
results_df = pd.DataFrame(results).sort_values(by='F1', ascending=False)
display(results_df)

# Make sure the output directory exists before writing the summary.
os.makedirs('Results', exist_ok=True)
results_df.to_csv(f"Results/IT_results_{date_str}_all.csv")



In [None]:
# TODO / next steps:
# - Run a sensitivity analysis.
# - Use regression to identify peaks and use those as pseudo labels.
# - Use secondary variables to help identify the peaks.