In [None]:
import torch
from torch import nn, optim
from torch.utils.data import DataLoader
from statistics import mean

import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from matplotlib.gridspec import GridSpec
import seaborn as sns

from himodule.custom_classes import NasaDataset, LossAndMetric
from himodule.ae_metrics import MAPE
from himodule.normalisation import StandardScaler, MinMaxScaler
from himodule.secondary_funcs import save_object, load_object, check_path, split_dataset, \
    seed_everything, split_anomaly_normal, split_anomaly_normal23
from himodule.linear_regression import LinearRegression

from collections import defaultdict

import os
import glob

sns.set_theme(style='whitegrid', font_scale=1.2)

In [None]:
# Select the compute device once for the whole notebook: "cuda" when torch
# reports a usable GPU, otherwise "cpu". The f-string echoes the choice.

device = "cuda" if torch.cuda.is_available() else "cpu"
print(f'{device=}')

In [None]:
def get_targets(path: str):
    """Load every ``*.dat`` file found directly in ``path``.

    Each file is read with ``np.fromfile`` (default dtype) and stored under
    the integer obtained from its file name without the extension.

    Args:
        path: Directory that contains the ``<integer>.dat`` files.

    Returns:
        dict mapping the integer file stem to the loaded array, with keys
        inserted in ascending order. Empty when no ``*.dat`` file matches.

    Raises:
        ValueError: If a matched file name (without extension) is not an
            integer.
    """
    arrays = dict()
    for pth in glob.glob(os.path.join(path, '*.dat')):
        # Let os.path split the path with the platform's own rules instead of
        # assuming a backslash separator, which breaks on POSIX systems.
        stem = os.path.splitext(os.path.basename(pth))[0]
        arrays[int(stem)] = np.fromfile(pth)

    # glob order is arbitrary; rebuild the dict so iteration follows the keys.
    return {key: arrays[key] for key in sorted(arrays)}

def transform_targets(targets: dict):
    """Flatten a ``{machine_id: values}`` mapping into one 1-D array.

    For every entry a two-row array is built (values on the first row, the
    repeated machine id on the second); the pieces are joined along the
    column axis and only the first row, i.e. the values, is returned. Values
    therefore appear in the mapping's iteration order.
    """
    per_machine = []
    for machine_id, targs in targets.items():
        id_row = [machine_id] * len(targs)
        per_machine.append(np.array((targs, id_row)))

    stacked = np.concatenate(per_machine, axis=1)
    return stacked[0]

In [None]:
def sparse_train_dataset(dataset: NasaDataset, step: int = 10):
    """Build a new ``NasaDataset`` with part of the high-target samples removed.

    For every machine id in ``dataset`` the sample indices are split by target
    value: those with target ``<= 0.7`` are all kept, while among those with
    target ``> 0.7`` every ``step``-th one (positions 0, step, 2*step, ...)
    is dropped. Per machine, the retained high-target indices come first,
    followed by the low-target ones.

    NOTE(review): this removes one sample out of every ``step`` and keeps the
    rest; confirm that keeping only every ``step``-th sample was not the
    intent.

    Args:
        dataset: Source dataset; its ``machine_ids``, ``targets``, ``dataset``
            and ``ruls`` attributes and ``get_indeces`` method are used.
        step: Stride of the dropped high-target samples.

    Returns:
        A ``NasaDataset`` built from the selected rows.
    """
    kept_per_machine = list()
    for raw_id in dataset.machine_ids.unique():
        machine_indeces = dataset.get_indeces(int(raw_id.item()))
        machine_targets = dataset.targets[machine_indeces]

        low_hi = machine_indeces[(machine_targets <= 0.7).flatten()]
        high_hi = machine_indeces[(machine_targets > 0.7).flatten()]

        # Start from "keep everything" and knock out every step-th position.
        keep_mask = torch.ones(len(high_hi), dtype=torch.bool)
        keep_mask[::step] = False

        kept_per_machine.append(torch.concat((high_hi[keep_mask], low_hi)))

    selected = torch.concat(kept_per_machine)

    subset = {
        'sensors': dataset.dataset[selected],
        'rul': dataset.ruls[selected],
        'machine_id': dataset.machine_ids[selected]
    }
    return NasaDataset(dataset_dict=subset, targets=dataset.targets[selected])

In [None]:
# Load the target arrays stored as .dat files for the train and test sets.
# transform_targets returns a 1-D array; [:, None] appends an axis so each
# target array becomes a single column.
train_targets = transform_targets(get_targets('../Smoothed/cae/train'))[:, None]
test_targets = transform_targets(get_targets('../Smoothed/cae/test'))[:, None]

In [None]:
# Run configuration for the training section.
seed = 37
batch_size = 20
window_size = 5

# Whole dataset loading
train_dataset = NasaDataset('../datasets/clean_train_data.csv', targets=train_targets)

# Hold out 25% of the training data for validation, then thin out the
# remaining training split with sparse_train_dataset.
seed_everything(seed)
train_dataset, val_dataset = split_dataset(train_dataset, test_size=.25)
train_dataset = sparse_train_dataset(train_dataset, step=5)

test_dataset = NasaDataset('../datasets/clean_test_data.csv', targets=test_targets)

# Load the previously saved scaler; its class name is derived from repr().
scaler_path = '../scalers/MinMaxScaler.pkl'
scaler = load_object(scaler_path)
try:
    norm_name = repr(scaler).split(' ', maxsplit=2)[0].split('.')[-1]
except IndexError:
    norm_name = 'no_scaling'

# Move every split to the selected device and scale its sensor data.
for dataset in (train_dataset, val_dataset, test_dataset):
    dataset.to(device)
    dataset.dataset = scaler.transform(dataset.dataset)

# NOTE(review): shuffle=False serves batches in dataset order for all three
# loaders, including training; confirm this is intended.
seed_everything(seed)
g = torch.Generator()
g.manual_seed(seed)
train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=False, generator=g)

# The validation loader must wrap the held-out split, not the training data.
seed_everything(seed)
val_loader = DataLoader(val_dataset, batch_size=batch_size, shuffle=False, generator=g)

seed_everything(seed)
test_loader = DataLoader(test_dataset, batch_size=batch_size, shuffle=False, generator=g)

print(f'Train: {len(train_dataset)}\nValidation: {len(val_dataset)}\nTest: {len(test_dataset)}')

input_shape = train_dataset.get_input_shape()

# Model, loss, metric and optimiser used by the training cell.
seed_everything(seed)
linear_model = LinearRegression(input_shape).to(device)
loss_func = nn.MSELoss()
metric_func = MAPE()
optimiser = optim.AdamW(linear_model.parameters(),
                       lr=1e-3)
optimiser_name = repr(optimiser).split(' ', maxsplit=1)[0]

In [None]:
epochs = 100
history = list()


def _evaluate(loader):
    """Return the per-batch losses and metrics of ``linear_model`` on ``loader``.

    Runs under ``torch.no_grad()`` and performs no optimiser step.
    """
    losses = list()
    metrics = list()
    with torch.no_grad():
        for dta in loader:
            sample = dta['sensors'].to(device)
            hi_target = dta['targets'].to(device)
            hi = linear_model(sample)

            loss = loss_func(hi, hi_target)
            metric = metric_func(hi, hi_target)

            losses.append(loss.item())
            metrics.append(metric.item())
    return losses, metrics


# Fit the linear model on the training loader; after every epoch the
# validation loader is scored so the test set stays untouched until the end.

for epoch in range(epochs):
    train_losses = list()
    train_metrics = list()
    for dta in train_loader:
        sample = dta['sensors'].to(device)
        hi_target = dta['targets'].to(device)
        hi = linear_model(sample)

        loss = loss_func(hi, hi_target)
        metric = metric_func(hi, hi_target)

        optimiser.zero_grad()
        loss.backward()
        optimiser.step()

        train_losses.append(loss.item())
        train_metrics.append(metric.item())

    # Validation uses val_loader (previously the test loader was used here,
    # which reported test-set numbers as validation results).
    val_losses, val_metrics = _evaluate(val_loader)

    train_loss, val_loss = mean(train_losses), mean(val_losses)
    train_metrics, val_metrics = mean(train_metrics), mean(val_metrics)
    history.append((epoch, train_loss, val_loss, train_metrics, val_metrics))
    if (epoch + 1) % 10 == 0 or epoch == epochs - 1:
        print(f'{epoch+1:>3}/{epochs:>3}: {train_loss=:.4f}, {val_loss=:.4f}, {train_metrics=:.4f}%, {val_metrics=:.4f}%')

# Final, one-off evaluation on the test loader.
test_losses, test_metrics = _evaluate(test_loader)
print(f'\nTest: {mean(test_losses)=:.4f}, {mean(test_metrics)=:.4f}%')

#--------------------------------#
model_path = '../LinearRegression'

# Set to False to run the cell without overwriting the saved weights.
save_model = True
if save_model:
    check_path(model_path)
    torch.save(linear_model.state_dict(), os.path.join(model_path, 'regression_cae.pth'))

## Plots

In [None]:
# Configuration for the plotting section; the data, scaler and model are
# rebuilt here from the files on disk.
seed = 37
batch_size = 20
window_size = 5

train_targets = transform_targets(get_targets('../Smoothed/cae/train'))[:, None]
test_targets = transform_targets(get_targets('../Smoothed/cae/test'))[:, None]

# Only the validation part of the split is needed for plotting.
train_dataset = NasaDataset('../datasets/clean_train_data.csv', targets=train_targets)
seed_everything(seed)
_, val_dataset = split_dataset(train_dataset, .25)

test_dataset = NasaDataset('../datasets/clean_test_data.csv', targets=test_targets)

scaler_path = '../scalers/MinMaxScaler.pkl'
scaler = load_object(scaler_path)
try:
    norm_name = repr(scaler).split(' ', maxsplit=2)[0].split('.')[-1]
except IndexError:
    norm_name = 'no_scaling'

# Move both datasets to the selected device and scale their sensor data.
for dataset in (test_dataset, val_dataset):
    dataset.to(device)
    dataset.dataset = scaler.transform(dataset.dataset)

# shuffle=False keeps batches in dataset order; get_predictions relies on
# that when it lines predictions up with dataset.machine_ids / targets.
g = torch.Generator()
g.manual_seed(seed)
seed_everything(seed)
test_loader = DataLoader(test_dataset, batch_size=batch_size, shuffle=False, generator=g)

seed_everything(seed)
val_loader = DataLoader(val_dataset, batch_size=batch_size, shuffle=False, generator=g)

print(f'Test: {len(test_dataset)}\nValidation: {len(val_dataset)}')

input_shape = test_dataset.get_input_shape()

model_path = '../LinearRegression'
seed_everything(seed)
linear_model = LinearRegression(input_shape).to(device)
# map_location lets weights saved on a GPU be restored on a CPU-only machine
# (and vice versa) instead of failing while deserialising.
state_dict = torch.load(os.path.join(model_path, 'regression_cae.pth'), map_location=device)
linear_model.load_state_dict(state_dict)

In [None]:
def get_predictions(loader: DataLoader, dataset: NasaDataset, linear_model: LinearRegression):
    """Run ``linear_model`` over ``loader`` and pair outputs with dataset labels.

    Every batch's ``'sensors'`` entry is moved to the module-level ``device``
    and passed through the model with gradients disabled. The stacked outputs
    are joined column-wise with ``dataset.machine_ids`` (as a column) and
    ``dataset.targets``.

    The rows of ``loader`` must come in the same order as ``dataset`` for the
    columns to line up (assumes an unshuffled loader over ``dataset`` —
    verify at the call site).

    Returns:
        Tensor whose columns are: machine id, target(s), prediction(s).
    """
    with torch.no_grad():
        batch_outputs = [linear_model(batch['sensors'].to(device)) for batch in loader]

    stacked = torch.vstack(batch_outputs)
    id_column = dataset.machine_ids[:, None]

    return torch.concat((id_column, dataset.targets, stacked), dim=1)

In [None]:
def get_plot_df(plot_arr: torch.Tensor, machine_id: int) -> pd.DataFrame:
    """Return one machine's rows of ``plot_arr`` as a long-format DataFrame.

    Rows whose first column equals ``machine_id`` are selected and the first
    column is dropped; the remaining two columns are labelled ``'true'`` and
    ``'predicted'`` and melted into ``'variable'`` / ``'value'`` columns. The
    original row position is kept as the index, so each index value appears
    once per variable.
    """
    belongs_to_machine = plot_arr[:, 0] == machine_id
    values = plot_arr[belongs_to_machine][:, 1:]

    wide_df = pd.DataFrame(values.cpu(), columns=('true', 'predicted'))
    return wide_df.melt(ignore_index=False)

def make_plot(plot_df: pd.DataFrame, machine_id: int = None, save_path: str = None):
    """Draw the value curves of ``plot_df`` on a fresh 10x5 inch figure.

    The current pyplot figure is closed first, so calling this in a loop
    leaves only the most recent figure open. Lines are drawn from the
    ``'value'`` column against the frame's index, one line per ``'variable'``
    level. The figure is written to ``save_path`` when one is given; it is
    never shown explicitly.

    Args:
        plot_df: Long-format frame with ``'variable'`` and ``'value'`` columns.
        machine_id: Identifier placed in the plot title.
        save_path: Destination file; nothing is saved when falsy.
    """
    plt.close()
    fig, ax = plt.subplots()
    fig.set_size_inches(10, 5)

    sns.lineplot(x=plot_df.index,
                 y='value',
                 hue='variable',
                 data=plot_df,
                 ax=ax)

    ax.set(xlabel='Cycle',
           ylabel='Health Index',
           title=f'Machine id: {machine_id}')
    ax.legend(title=None)

    plt.tight_layout()
    if not save_path:
        return
    plt.savefig(save_path)

In [None]:
# Predict on the test loader and save one true-vs-predicted plot per machine.
plot_arr = get_predictions(test_loader, test_dataset, linear_model)

plots_path = '../Plots/HIs/cae/test'
check_path(plots_path)

for machine_id in test_dataset.machine_ids.unique():
    # unique() yields tensor elements; convert to a plain int for the
    # row filter, the plot title and the file name.
    machine_id = int(machine_id.item())
    plot_df = get_plot_df(plot_arr, machine_id)
    make_plot(plot_df, machine_id, save_path=os.path.join(plots_path, f'{machine_id}.png'))

In [None]:
# Predict on the validation loader and save one true-vs-predicted plot per
# machine. Note that plot_arr and plots_path are reassigned here.
plot_arr = get_predictions(val_loader, val_dataset, linear_model)

plots_path = '../Plots/HIs/cae/validation'
check_path(plots_path)

for machine_id in val_dataset.machine_ids.unique():
    # unique() yields tensor elements; convert to a plain int for the
    # row filter, the plot title and the file name.
    machine_id = int(machine_id.item())
    plot_df = get_plot_df(plot_arr, machine_id)
    make_plot(plot_df, machine_id, save_path=os.path.join(plots_path, f'{machine_id}.png'))