Utilizzo per la Manutenzione Predittiva
Una volta rilevate le anomalie, si può:

- Identificare le tendenze che precedono gli eventi anomali, per prevedere quelli futuri.
- Integrare il modello con sistemi di avviso, per notificare gli operatori in caso di anomalie.
- Ottimizzare le procedure di manutenzione basandosi sulle previsioni delle anomalie.

In [None]:
import numpy as np
import pandas as pd
import tensorflow as tf
from tensorflow.keras.layers import Dense, LeakyReLU, BatchNormalization
from tensorflow.keras.models import Sequential

# Load the measurements (semicolon-separated CSV).
data = pd.read_csv('measure_24_before_high_events_for_model_Indiium.csv', sep=';')

# Min-max normalisation: rescale every column to the [0, 1] interval.
# A constant column has max == min; dividing by that zero range would turn the
# whole column into NaN and propagate NaN into every loss computed from it.
# Such ranges are replaced by 1, so a constant column becomes all zeros.
col_min = data.min()
col_range = (data.max() - col_min).replace(0, 1)
data = (data - col_min) / col_range

# GAN hyper-parameters
latent_dim = 100   # length of the noise vector fed to the generator
epochs = 10000     # number of training steps (one mini-batch per step)
batch_size = 64    # rows drawn per training step

# Generator construction
def build_generator(latent_dim, output_dim=None):
    """Build the generator network that maps latent noise to synthetic samples.

    Args:
        latent_dim: Length of the noise vector the network takes as input.
        output_dim: Number of values in each generated sample. When omitted,
            the column count of the module-level ``data`` frame is used, which
            keeps existing ``build_generator(latent_dim)`` calls working as
            before.

    Returns:
        An uncompiled ``Sequential`` model made of two
        Dense -> LeakyReLU -> BatchNormalization stages (128 and 256 units)
        followed by a sigmoid Dense layer with ``output_dim`` units.
    """
    if output_dim is None:
        # Backward-compatible default: one output per column of the data set.
        output_dim = data.shape[1]

    return Sequential([
        Dense(128, input_dim=latent_dim),
        LeakyReLU(alpha=0.01),
        BatchNormalization(momentum=0.8),
        Dense(256),
        LeakyReLU(alpha=0.01),
        BatchNormalization(momentum=0.8),
        # Sigmoid keeps every output in (0, 1), the range of the min-max
        # normalised data.
        Dense(output_dim, activation='sigmoid'),
    ])

# Discriminator construction
def build_discriminator(input_shape):
    """Build the discriminator network that scores a sample with one sigmoid unit.

    Args:
        input_shape: Shape tuple of a single sample, passed to the first
            Dense layer as ``input_shape``.

    Returns:
        An uncompiled ``Sequential`` model: Dense(256) and Dense(128), each
        followed by LeakyReLU, then a single sigmoid output unit.
    """
    layers = [
        Dense(256, input_shape=input_shape),
        LeakyReLU(alpha=0.01),
        Dense(128),
        LeakyReLU(alpha=0.01),
        Dense(1, activation='sigmoid'),
    ]
    return Sequential(layers)

# Build the GAN
# The discriminator is compiled on its own first; it is trained directly
# through `discriminator.train_on_batch` in the training loop.
discriminator = build_discriminator((data.shape[1],))
discriminator.compile(loss='binary_crossentropy', optimizer='adam', metrics=['accuracy'])

# Chain generator -> discriminator into a single model that maps a noise
# vector to the discriminator's score for the generated sample.
generator = build_generator(latent_dim)
z = tf.keras.Input(shape=(latent_dim,))
img = generator(z)
# NOTE(review): `trainable` is switched off after the discriminator was
# compiled and before `combined` is compiled - presumably so that training
# `combined` only updates the generator. Confirm this holds on the installed
# Keras version.
discriminator.trainable = False
valid = discriminator(img)

combined = tf.keras.Model(z, valid)
combined.compile(loss='binary_crossentropy', optimizer='adam')

# GAN training
# The target labels are the same at every step, so build them once.
real_labels = np.ones((batch_size, 1))
fake_labels = np.zeros((batch_size, 1))

for epoch in range(epochs):
    # Random mini-batch of real rows (indices are drawn with replacement).
    idx = np.random.randint(0, data.shape[0], batch_size)
    real_data = data.iloc[idx].values

    # Synthetic mini-batch. The model is called directly in inference mode
    # instead of through `predict`: for a single small batch inside a loop
    # this avoids the per-call overhead and the progress bar that `predict`
    # would print on each of the `epochs` iterations.
    noise = np.random.normal(0, 1, (batch_size, latent_dim))
    gen_data = np.asarray(generator(noise, training=False))

    # Discriminator step: real rows are labelled 1, generated rows 0.
    # Each call returns [loss, accuracy] because the discriminator was
    # compiled with the accuracy metric; the two results are averaged.
    d_loss_real = discriminator.train_on_batch(real_data, real_labels)
    d_loss_fake = discriminator.train_on_batch(gen_data, fake_labels)
    d_loss = 0.5 * np.add(d_loss_real, d_loss_fake)

    # Generator step: fresh noise is pushed through the combined model with
    # target label 1, i.e. the generator is rewarded when its samples are
    # scored as real.
    noise = np.random.normal(0, 1, (batch_size, latent_dim))
    g_loss = combined.train_on_batch(noise, real_labels)

    # Progress report every 1000 steps.
    if epoch % 1000 == 0:
        print(f"Epoch: {epoch} | D Loss: {d_loss[0]} | D Acc.: {100 * d_loss[1]} | G Loss: {g_loss}")

# Anomaly detection
def detect_anomalies(data, generator, threshold=0.1):
    """Flag rows whose mean absolute distance to a generated sample is large.

    One noise vector is drawn per row, passed through ``generator.predict``,
    and the mean absolute difference between each row and its generated
    sample is compared with ``threshold``.

    NOTE(review): the generated samples come from independent random noise,
    so they are not reconstructions of the rows they are compared with. The
    score is the distance to an arbitrary synthetic sample and changes from
    call to call. Confirm this is the intended anomaly criterion.

    Args:
        data: DataFrame or 2-D array of samples, one row per sample, with as
            many columns as the generator produces.
        generator: Model exposing ``predict``. Its ``input_shape`` gives the
            noise length; if unavailable, the module-level ``latent_dim`` is
            used.
        threshold: Rows with a mean absolute difference strictly greater than
            this value are flagged.

    Returns:
        Boolean NumPy array with one entry per row (True = anomaly).
    """
    values = np.asarray(data)
    if len(values) == 0:
        # Nothing to score; skip calling the model on an empty batch.
        return np.zeros(0, dtype=bool)

    # Take the noise length from the generator itself so the function stays
    # correct for generators built with a different latent size.
    shape = getattr(generator, "input_shape", None)
    if shape and isinstance(shape[-1], int):
        noise_dim = shape[-1]
    else:
        noise_dim = latent_dim

    noise = np.random.normal(0, 1, (len(values), noise_dim))
    gen_data = generator.predict(noise)
    reconstruction_error = np.mean(np.abs(values - gen_data), axis=1)
    return reconstruction_error > threshold

# Score every row with the trained generator.
anomalies = detect_anomalies(data, generator)

# Show the anomalies: store the flags next to the data and print only the
# rows that were flagged.
data['anomaly'] = anomalies
flagged_rows = data[data['anomaly']]
print(flagged_rows)
