In [1]:
import numpy as np
import os
import re
import scipy.io as scio

import numpy as np
import pandas as pd

import pickle


In [2]:
from sklearn.preprocessing import MinMaxScaler
import torch

exp = '12DriveEndFault'

# NOTE: pickle.load can execute arbitrary code while deserializing, so
# 'cwru_data.pkl' must come from a trusted source.
with open('cwru_data.pkl', 'rb') as file:
    loaded_dict = pickle.load(file)

# Each entry of the dictionary unpacks into a (data, labels) pair.
data, labels = loaded_dict[exp]
normal_data, normal_labels = loaded_dict['Normal']

# Append the 'Normal' samples after the samples of the selected experiment,
# then insert a new axis at position 2 of the stacked data array.
data = np.expand_dims(np.concatenate((data, normal_data)), axis=2)
labels = np.concatenate((labels, normal_labels))

# scaler = MinMaxScaler()
# data = scaler.fit_transform(data.reshape(-1, data.shape[1])).reshape(data.shape)

# data = data[:, :, np.newaxis]

# input_dim = data.shape[2]
# hidden_dim = 128
# num_layers = 3
# batch_size = 64
# num_epochs = 200
# base_path = 'GAN/GAN_'
# device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

In [3]:
## Network parameters
# Settings dictionary that is later handed to timegan().
parameters = {
    'module': 'lstm',
    'hidden_dim': 24,
    'num_layer': 3,
    'iterations': 5000,
    'batch_size': 128,
}

In [4]:
# Print "cuda" when PyTorch reports an available CUDA device in this kernel, otherwise "cpu".
print("cuda" if torch.cuda.is_available() else "cpu")

cuda


In [5]:
from sklearn.model_selection import train_test_split
# Hold out 20% of the samples for testing; the fixed random_state keeps the
# split identical from one run to the next.
X_train, X_test, y_train, y_test = train_test_split(
    data,
    labels,
    test_size=0.2,
    random_state=42,
)

In [6]:
from timegan import timegan
from cwru import faults_idx, exps_idx

In [7]:
# Fit timegan separately on the training samples of each class and collect the
# data it returns, tagging every generated sample with the class it came from.
X_augmented = []
y_augmented = []
histories = {}  # label -> history object returned by timegan for that class
for label in np.unique(y_train):
    # Convert only this class's slice to a float32 tensor. X_train itself is
    # left untouched so the cell can be re-run safely and later cells that
    # treat X_train as a NumPy array keep working. Labels come from
    # np.unique(y_train), so every slice holds at least one sample.
    X_label = torch.as_tensor(X_train[y_train == label], dtype=torch.float32)
    generated_data, history = timegan(X_label, parameters)
    # Store the history under its label (previously this wrote into the
    # returned history object itself and left `histories` empty).
    histories[label] = history
    X_augmented.append(generated_data)
    y_augmented.extend([label] * generated_data.shape[0])

# Stack the per-class results into single arrays aligned sample by sample.
X_augmented = np.concatenate(X_augmented, axis=0)
y_augmented = np.array(y_augmented)

  X_mb = torch.tensor(X_mb, dtype=torch.float32)


Step: 0/5000, e_loss_t0: 0.0281
Step: 1000/5000, e_loss_t0: 0.0001
Step: 2000/5000, e_loss_t0: 0.0000
Step: 3000/5000, e_loss_t0: 0.0000
Step: 4000/5000, e_loss_t0: 0.0000
Finish Embedding Network Training


  Z_mb = torch.tensor(random_generator(batch_size, z_dim, T_mb, max_seq_len), dtype=torch.float32)


Step: 0/5000, G_loss_S: 0.0028
Step: 1000/5000, G_loss_S: 0.0000
Step: 2000/5000, G_loss_S: 0.0000
Step: 3000/5000, G_loss_S: 0.0000
Step: 4000/5000, G_loss_S: 0.0000
Finish Training with Supervised Loss Only


  Z_mb = torch.tensor(random_generator(batch_size, z_dim, T_mb, max_seq_len), dtype=torch.float32)


Step: 0/5000, D_loss: 1.3954, G_loss: 0.9874


KeyboardInterrupt: 

In [None]:
# Number of classes, taken over both splits so that a class that only shows up
# in the test split cannot index past the one-hot matrix below.
n_classes = int(max(np.max(y_train), np.max(y_test))) + 1

# Step 1: prepare the data.
# Combine the original training samples with their generated augmentations.
X_train_combined = np.concatenate([X_train, X_augmented], axis=0)
y_train_combined = np.concatenate([y_train, y_augmented], axis=0)


def _to_tensor_dataset(features, targets, num_classes):
    """Wrap samples and integer class labels in a TensorDataset.

    Parameters:
        features: samples, as a NumPy array or a CPU tensor.
        targets: integer class labels, one per sample.
        num_classes (int): width of the one-hot label encoding.

    Returns:
        torch.utils.data.TensorDataset holding float32 features and one-hot
        labels stored with dtype torch.long.
    """
    return torch.utils.data.TensorDataset(
        # np.asarray lets the same path handle arrays and CPU tensors alike.
        torch.tensor(np.asarray(features), dtype=torch.float32),
        torch.tensor(np.eye(num_classes)[targets], dtype=torch.long),
    )


# Build the PyTorch datasets: original training set, augmented training set,
# and the held-out test set.
train_dataset = _to_tensor_dataset(X_train, y_train, n_classes)
train_augmented_dataset = _to_tensor_dataset(X_train_combined, y_train_combined, n_classes)
test_dataset = _to_tensor_dataset(X_test, y_test, n_classes)


In [None]:
from evaluation import kl_divergence

# torch.as_tensor accepts NumPy arrays and existing tensors alike, whereas
# torch.from_numpy raises TypeError when X_train has already been converted
# to a tensor by an earlier cell.
kl_divergence(torch.as_tensor(X_train), torch.as_tensor(X_augmented))

In [None]:
from evaluation import visualization

# Hand the original training samples and the generated samples to the project's
# visualization helper.
# NOTE(review): what gets plotted is defined in evaluation.visualization, which is not visible here.
visualization(X_train, X_augmented)

In [None]:
from evaluation import model_evaluation

# Size of axis 1 of X_train, forwarded as `input_size` (the original comment
# called this the feature dimension).
# NOTE(review): the loading cell appends a trailing axis, so axis 1 is not the
# last axis of the data; confirm which one model_evaluation expects.
input_size = X_train.shape[1]

# Fall back to the CPU when no GPU is present instead of hard-coding 'cuda'.
eval_device = 'cuda' if torch.cuda.is_available() else 'cpu'

# Settings shared by both runs, so the two evaluations differ only in the
# training set they receive.
eval_kwargs = dict(
    test_dataset=test_dataset,
    input_size=input_size,
    num_classes=n_classes,
    num_epochs=30,
    device=eval_device,
)

# Evaluate with the original training data.
print("\n### Évaluation sur le jeu d'entraînement d'origine ###")
results_original = model_evaluation(train_dataset=train_dataset, **eval_kwargs)

# Evaluate with the augmented training data.
print("\n### Évaluation sur le jeu d'entraînement augmenté ###")
results_augmented = model_evaluation(train_dataset=train_augmented_dataset, **eval_kwargs)

In [None]:
import matplotlib.pyplot as plt

# Gather the test accuracy of every model, in the same model order for both runs.
models = list(results_original.keys())
test_accuracies_original = [results_original[name]['test_accuracy'] for name in models]
test_accuracies_augmented = [results_augmented[name]['test_accuracy'] for name in models]

# Grouped bar chart: one pair of bars (original, augmented) per model.
bar_width = 0.35
index = range(len(models))
augmented_positions = [pos + bar_width for pos in index]   # shifted right by one bar
tick_positions = [pos + bar_width / 2 for pos in index]    # centred between the pair

fig, ax = plt.subplots(figsize=(10, 6))

bar1 = ax.bar(index, test_accuracies_original, width=bar_width, label='Original')
bar2 = ax.bar(augmented_positions, test_accuracies_augmented, width=bar_width, label='Augmented')

# Axis labels, title, ticks and legend.
ax.set_xlabel('Model', fontsize=12)
ax.set_ylabel('Test Accuracy', fontsize=12)
ax.set_title('Comparison of Test Accuracy for Different Models', fontsize=14)
ax.set_xticks(tick_positions)
ax.set_xticklabels(models)
ax.legend()

plt.tight_layout()
plt.show()


In [None]:
import seaborn as sns
from sklearn.metrics import confusion_matrix
def plot_confusion_matrices(results_dict, y_test, title_prefix):
    """
    Plot one confusion-matrix heatmap per model, side by side in a single row.

    Parameters:
        results_dict (dict): Maps a model name to that model's results dict;
            predictions are read from its 'preds' key.
        y_test (array-like): Ground truth labels.
        title_prefix (str): Prefix for the title of each plot.

    Returns:
        None. The figure is shown with plt.show(). If `results_dict` is empty
        a message is printed and nothing is drawn. A model whose 'preds' entry
        is missing, None or empty is reported and its panel is hidden.
    """
    num_models = len(results_dict)
    if num_models == 0:
        # plt.subplots cannot build a grid with zero columns.
        print(f"No models to plot for {title_prefix}.")
        return

    # squeeze=False always returns a 2-D array of axes, so the single-model
    # case needs no special handling.
    fig, axes = plt.subplots(1, num_models, figsize=(5 * num_models, 5), squeeze=False)

    for ax, (model_name, model_results) in zip(axes[0], results_dict.items()):
        preds = model_results.get('preds')
        if preds is None or len(preds) == 0:
            print(f"No predictions found for model {model_name}. Skipping...")
            ax.set_axis_off()  # hide the panel instead of leaving an empty frame
            continue
        cm = confusion_matrix(y_test, preds)
        sns.heatmap(cm, annot=True, fmt="d", cmap="Blues", cbar=False, ax=ax)
        ax.set_title(f"{title_prefix} - {model_name}")
        ax.set_xlabel("Predicted")
        ax.set_ylabel("True")

    plt.tight_layout()
    plt.show()

# Example usage
# Replace `results_original` and `results_augmented` with your own result dictionaries and `y_test` with the true labels.
plot_confusion_matrices(results_original, y_test, "Original")
plot_confusion_matrices(results_augmented, y_test, "Augmented")