In [None]:
import pandas as pd
import numpy as np
import torch
from torch.utils.data import Dataset
import torch.nn as nn
import matplotlib.pyplot as plt
import os
import random
from sklearn.metrics import mean_absolute_error, mean_squared_error
import copy
from torch.optim.lr_scheduler import ReduceLROnPlateau
from scipy.signal import butter, filtfilt

all_file_paths = [f"modified_data_Tip_Position_-2mm/corrected_2000RPMfeedspeed30mmmin_IL_bone{i}.csv" for i in range(1, 269)]

random.seed(42)
random.shuffle(all_file_paths)

total_files = len(all_file_paths)
num_train = int(total_files * 0.6)
num_val = int(total_files * 0.2)
num_test = total_files - num_train - num_val

train_paths = all_file_paths[:num_train]
val_paths = all_file_paths[num_train : num_train + num_val]
test_paths = all_file_paths[num_train + num_val :]

def butter_lowpass_filter(data, cutoff, fs, order=4):
    nyq = 0.5 * fs
    normal_cutoff = cutoff / nyq
    b, a = butter(order, normal_cutoff, btype='low', analog=False)
    return filtfilt(b, a, data)

class TipLevelDataset(Dataset):
    def __init__(self, file_paths, feature_columns=['Fz'], target_column='Tip_Position', sampling_rate=100, fs=10000, cutoff=50):
        self.inputs = []
        self.targets = []
        self.feature_columns = feature_columns
        self.target_column = target_column

        for path in file_paths:
            df = pd.read_csv(path)[feature_columns + [target_column]].dropna()

            for col in feature_columns:
                filtered = butter_lowpass_filter(df[col].values, cutoff=cutoff, fs=fs)
                df[col] = (filtered - np.mean(filtered)) / np.std(filtered)

            df_downsampled  = df.iloc[::100, :].copy()

            input_tensor = torch.tensor(df_downsampled [feature_columns].values, dtype=torch.float32)  # shape: (seq_len, num_features)
            target_tensor = torch.tensor(df_downsampled [target_column].values, dtype=torch.float32)   # shape: (seq_len,)

            self.inputs.append(input_tensor)
            self.targets.append(target_tensor)

    def __len__(self):
        return len(self.inputs)

    def __getitem__(self, idx):
        return self.inputs[idx], self.targets[idx]

class LSTM(nn.Module):
    def __init__(self, input_size=1, hidden_size=64, num_layers=1):
        super().__init__()
        self.lstm = nn.LSTM(input_size, hidden_size, num_layers, batch_first=True)
        self.output_layer = nn.Linear(hidden_size, 1)

    def forward(self, x):
        lstm_out, _ = self.lstm(x)
        output = self.output_layer(lstm_out).squeeze(-1)
        return output

def train_lstm(model, train_dataset, val_dataset, epochs=300, lr=0.01):
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    model = model.to(device)
    criterion = nn.MSELoss()
    optimizer = torch.optim.Adam(model.parameters(), lr=lr)
    scheduler = ReduceLROnPlateau(optimizer, mode='min', factor=0.5, patience=10, verbose=True)

    best_val_loss = float('inf')
    best_model_state = None

    train_loss_history = []
    val_loss_history = []

    for epoch in range(epochs):
        model.train()
        total_train_loss = 0

        for x_train, y_train in train_dataset:
            x_train = x_train.unsqueeze(0).to(device)
            y_train = y_train.unsqueeze(0).to(device)

            optimizer.zero_grad()
            y_pred = model(x_train)
            loss = criterion(y_pred, y_train)
            loss.backward()
            optimizer.step()
            total_train_loss += loss.item()

        train_loss_history.append(total_train_loss)

        model.eval()
        total_val_loss = 0
        with torch.no_grad():
            for x_val, y_val in val_dataset:
                x_val = x_val.unsqueeze(0).to(device)
                y_val = y_val.unsqueeze(0).to(device)
                y_pred = model(x_val)
                loss = criterion(y_pred, y_val)
                total_val_loss += loss.item()

        val_loss_history.append(total_val_loss)
        scheduler.step(total_val_loss)

        if total_val_loss < best_val_loss:
            best_val_loss = total_val_loss
            best_model_state = copy.deepcopy(model.state_dict())
            print(f"Epoch {epoch+1}: Train Loss = {total_train_loss:.4f}, Validation Loss = {total_val_loss:.4f} (Best model saved!)")
        else:
            print(f"Epoch {epoch+1}: Train Loss = {total_train_loss:.4f}, Validation Loss = {total_val_loss:.4f}")

    return model, best_model_state, train_loss_history, val_loss_history

def plot_learning_curve(train_loss, val_loss):
    plt.figure(figsize=(10, 5))
    plt.plot(train_loss, label='Training Loss')
    plt.plot(val_loss, label='Validation Loss')
    plt.xlabel("Epoch")
    plt.ylabel("Loss (MSE)")
    plt.title("Learning Curve")
    plt.legend()
    plt.grid(False)
    plt.tight_layout()
    plt.show()

def evaluate_critical_region(
    model, test_dataset, feature_names=['Fz'], feature_list=None,
    best_model_state=None, device=torch.device("cuda" if torch.cuda.is_available() else "cpu"), n_plot=5, center_threshold=-0, window=400
):
    if best_model_state:
        model.load_state_dict(best_model_state)
    model.eval()

    mae_total, rmse_total = 0, 0
    count = 0 
    all_maes = []

    for i, (x, y_true) in enumerate(test_dataset):
        with torch.no_grad():
            x_input = x.unsqueeze(0).to(device)
            y_pred = model(x_input)
            y_pred = y_pred.squeeze(0).cpu().numpy()
            x_np = x.numpy()
            y_true_np = y_true.numpy()

        idx_center = np.argmax(y_true_np >= center_threshold)
        idx_start = idx_center - window
        idx_end = idx_center + window

        y_true_slice = y_true_np[idx_start:idx_end]
        y_pred_slice = y_pred[idx_start:idx_end]
        x_slice = x_np[idx_start:idx_end, :]

        mae = mean_absolute_error(y_true_slice, y_pred_slice)
        rmse = np.sqrt(mean_squared_error(y_true_slice, y_pred_slice))
        all_maes.append(mae)
        mae_total += mae
        rmse_total += rmse
        count += 1

        if count <= n_plot:
            plt.figure(figsize=(14, 6))

            plt.subplot(2, 1, 1)
            plt.plot(y_true_slice, label='True Tip_Position')
            plt.plot(y_pred_slice, label='Predicted Tip_Position', linestyle='--')
            plt.title(f"Sample {i+1} Focused - MAE: {mae:.3f}, RMSE: {rmse:.3f}")
            plt.ylabel("Tip_Position")
            plt.legend()
            plt.grid(True)

            ax1 = plt.subplot(2, 1, 2)
            for name in feature_names:
                idx = feature_list.index(name)
                ax1.plot(x_slice[:, idx], label=name, alpha=0.4)

            ax1.set_ylabel("Input Features")
            plt.tight_layout()
            plt.ylim(-3, 3)
            plt.show()

    focus_avg_mae = mae_total / count
    focus_avg_rmse = rmse_total / count    

    print("\n=== Test Summary ===")
    print(f"Average MAE  : {focus_avg_mae:.4f}")
    print(f"Average RMSE : {focus_avg_rmse:.4f}")

    return all_maes, focus_avg_mae, focus_avg_rmse

# Thrust Force, Torque → Tip_Position

In [None]:
feature_columns_zt = ['Fz', 'Torque']

train_dataset_zt = TipLevelDataset(train_paths, feature_columns=feature_columns_zt)
val_dataset_zt   = TipLevelDataset(val_paths, feature_columns=feature_columns_zt)
test_dataset_zt  = TipLevelDataset(test_paths, feature_columns=feature_columns_zt)

In [None]:
model_zt = LSTM(input_size=len(feature_columns_zt))

model_zt, best_model_zt_state, train_loss_history_zt, val_loss_history_zt = train_lstm(
    model_zt, train_dataset_zt, val_dataset_zt
)

In [None]:
plot_learning_curve(train_loss_history_zt, val_loss_history_zt)

In [None]:
feature_columns_zt = ['Fz', 'Torque']

zt_maes, critical_region_mae_zt, critical_region_rmse_zt = evaluate_critical_region(
    model = model_zt,
    test_dataset = test_dataset_zt,
    feature_names=feature_columns_zt,
    feature_list=feature_columns_zt,
    best_model_state=best_model_zt_state,
    n_plot=5
)

# Thrust Force → Tip_Position

In [None]:
feature_columns_z = ['Fz']

train_dataset_z = TipLevelDataset(train_paths, feature_columns=feature_columns_z)
val_dataset_z   = TipLevelDataset(val_paths, feature_columns=feature_columns_z)
test_dataset_z  = TipLevelDataset(test_paths, feature_columns=feature_columns_z)

model_z = LSTM(input_size=len(feature_columns_z))
model_z, best_model_z_state, train_loss_history_z, val_loss_history_z = train_lstm(
    model_z, train_dataset_z, val_dataset_z
)

In [None]:
plot_learning_curve(train_loss_history_z, val_loss_history_z)

In [None]:
z_maes, critical_region_mae_z, critical_region_rmse_z = evaluate_critical_region(
    model = model_z,
    test_dataset = test_dataset_z,
    feature_names=feature_columns_z,
    feature_list=feature_columns_z,
    best_model_state=best_model_z_state,
    n_plot=5
)

# Torque → Tip_Position

In [None]:
feature_columns_T = ['Torque']

train_dataset_T = TipLevelDataset(train_paths, feature_columns=feature_columns_T)
val_dataset_T   = TipLevelDataset(val_paths, feature_columns=feature_columns_T)
test_dataset_T  = TipLevelDataset(test_paths, feature_columns=feature_columns_T)

model_T = LSTM(input_size=len(feature_columns_T))

model_T, best_model_T_state, train_loss_history_T, val_loss_history_T = train_lstm(
    model_T, train_dataset_T, val_dataset_T
)

In [None]:
plot_learning_curve(train_loss_history_T, val_loss_history_T)

In [None]:
t_maes, critical_region_mae_t, critical_region_rmse_t = evaluate_critical_region(
    model = model_T,
    test_dataset = test_dataset_T,
    feature_names=feature_columns_T,
    feature_list=feature_columns_T,
    best_model_state=best_model_T_state,
    n_plot=5
)

In [None]:
mae_df = pd.DataFrame({'zt': zt_maes, 'z': z_maes, 't': t_maes})
print(mae_df)
filename = f"mae_data.csv"
# mae_df.to_csv(filename, index=False)

In [None]:
mae_data = ['Fz, Torque', 'Fz', 'Torque']
mae = [critical_region_mae_zt, critical_region_mae_z, critical_region_mae_t]

plt.figure(figsize=(8, 5))
bars = plt.bar(mae_data, mae, width=0.3, color='black')
plt.ylim(0.2, 0.8)
plt.xticks(fontsize=10)
plt.yticks(fontsize=12)
plt.xlabel("Features", fontsize=18)
plt.ylabel("MAE", fontsize=18)
plt.title("comparison of MAE", fontsize=20)

for bar, value in zip(bars, mae):
    plt.text(bar.get_x() + bar.get_width() / 2, bar.get_height() + 0.08, f"{value:.3f}",
             ha='center', va='top', color="black", fontsize=18)
# plt.show()
plt.savefig("MAE comparison", dpi=330)