<a href="https://colab.research.google.com/github/Samay-jain622/Predictive-Maintenance-with-Markov-Chains-and-Anomaly-Detection-using-Variational-Autoencoders/blob/main/vae.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

vae

In [1]:
import pandas as pd

In [2]:
pip install optuna

Collecting optuna
  Downloading optuna-4.3.0-py3-none-any.whl.metadata (17 kB)
Collecting alembic>=1.5.0 (from optuna)
  Downloading alembic-1.15.2-py3-none-any.whl.metadata (7.3 kB)
Collecting colorlog (from optuna)
  Downloading colorlog-6.9.0-py3-none-any.whl.metadata (10 kB)
Downloading optuna-4.3.0-py3-none-any.whl (386 kB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m386.6/386.6 kB[0m [31m14.2 MB/s[0m eta [36m0:00:00[0m
[?25hDownloading alembic-1.15.2-py3-none-any.whl (231 kB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m231.9/231.9 kB[0m [31m19.5 MB/s[0m eta [36m0:00:00[0m
[?25hDownloading colorlog-6.9.0-py3-none-any.whl (11 kB)
Installing collected packages: colorlog, alembic, optuna
Successfully installed alembic-1.15.2 colorlog-6.9.0 optuna-4.3.0


In [3]:
import torch
import torch.nn as nn
import torch.optim as optim
import numpy as np
import optuna
from torch.utils.data import DataLoader, Dataset

In [4]:
train_data=pd.read_csv('/content/som_train.csv')
test_data=pd.read_csv('/content/som_test.csv')

In [5]:
train_data.isnull().sum()

Unnamed: 0,0
A,4
B,4
C,4
L1,4
L2,4
L3,4
L4,4
L5,4


In [6]:
train_data.dropna(inplace=True)

In [7]:
test_data.dropna(inplace=True)

In [8]:
test_data.isnull().sum()

Unnamed: 0,0
A,0
B,0
C,0
L1,0
L2,0
L3,0
L4,0
L5,0


In [9]:
import torch
import torch.nn as nn
import torch.optim as optim
import numpy as np
import optuna
from sklearn.preprocessing import MinMaxScaler
from torch.utils.data import DataLoader,Dataset

In [10]:
scaler = MinMaxScaler()
train_data = scaler.fit_transform(train_data)
test_data = scaler.transform(test_data)

In [11]:
train_data=torch.tensor(train_data,dtype=torch.float32)
test_data=torch.tensor(test_data,dtype=torch.float32)

In [12]:
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(f"Using device: {device}")

Using device: cuda


In [13]:
class VAE(nn.Module):

  def __init__(self,input_dim,latent_dim,hidden_units,dropout_rate):
    super().__init__()
    encoded_layers=[]
    prev_dim=input_dim
    for h in hidden_units:
      encoded_layers.append(nn.Linear(prev_dim,h))
      encoded_layers.append(nn.BatchNorm1d(h))
      encoded_layers.append(nn.ReLU())
      encoded_layers.append(nn.Dropout(dropout_rate))
      prev_dim=h

    self.encoder=nn.Sequential(*encoded_layers)
    self.fc_mu=nn.Linear(prev_dim,latent_dim)
    self.fc_var=nn.Linear(prev_dim,latent_dim)

    decoded_layers=[]
    prev_dim=latent_dim
    for h in hidden_units[::-1]:
      decoded_layers.append(nn.Linear(prev_dim,h))
      decoded_layers.append(nn.BatchNorm1d(h))
      decoded_layers.append(nn.ReLU())
      decoded_layers.append(nn.Dropout(dropout_rate))
      prev_dim=h
    decoded_layers.append(nn.Linear(prev_dim,input_dim))
    decoded_layers.append(nn.Sigmoid())
    self.decoder=nn.Sequential(*decoded_layers)

    self.initialize_weights()
  def initialize_weights(self):
        for m in self.modules():
            if isinstance(m, nn.Linear):
                nn.init.xavier_uniform_(m.weight)
                if m.bias is not None:
                    nn.init.zeros_(m.bias)

  def reparameterize(self,mu,logvar):
    std=torch.exp(0.5*logvar)
    eps=torch.randn_like(std,device=device)
    return mu+std*eps

  def forward(self,x):
    encoded=self.encoder(x)
    mu,logvar=self.fc_mu(encoded),self.fc_var(encoded)
    z=self.reparameterize(mu,logvar)
    decoded=self.decoder(z)
    return decoded,mu,logvar

In [14]:
def vae_loss_function(reconstructed, original, mu, logvar):

    recon_loss = nn.functional.mse_loss(reconstructed, original, reduction='sum')

    # KL Divergence Loss
    kl_divergence = -0.5 * torch.sum(1 + logvar - mu.pow(2) - logvar.exp())

    return recon_loss + kl_divergence

In [15]:
train_data = train_data.to(device)
test_data = test_data.to(device)

In [None]:
def objective(trial):
    # Hyperparameter tuning
    latent_dim = trial.suggest_int("latent_dim", 1, 4)
    num_layers = trial.suggest_int("num_layers", 1,3
                                   )
    hidden_units = [trial.suggest_int(f"hidden_{i}", 16, 128, step=16) for i in range(num_layers)]
    batch_size = trial.suggest_categorical("batch_size", [32, 64, 128])
    learning_rate = trial.suggest_float("learning_rate", 1e-5, 1e-1, log=True)
    dropout_rate = trial.suggest_float("dropout_rate", 0.1, 0.5, step=0.10)
    epochs = trial.suggest_int("epochs", 60, 100, step=20)
        # Data loaders
    train_loader = DataLoader(train_data, batch_size=batch_size, shuffle=True)
    test_loader = DataLoader(test_data, batch_size=batch_size, shuffle=False)

    input_dim=train_data.shape[1]
    model = VAE(input_dim, latent_dim, hidden_units,dropout_rate).to(device)
    optimizer = optim.Adam(model.parameters(), lr=learning_rate)
    for epoch in range(epochs):
      model.train()
      for batch in train_loader:
        batch_features=batch.to(device)
        optimizer.zero_grad()
        reconstructed,mu,logvar=model(batch_features)
        loss = vae_loss_function(reconstructed, batch_features, mu, logvar)
        loss.backward()
        optimizer.step()

    model.eval()
    with torch.no_grad():
        reconstructed, mu, logvar = model(test_data)
        reconstruction_error = ((reconstructed - test_data) ** 2).mean().item()

    return reconstruction_error


In [None]:
import optuna

study = optuna.create_study(direction='minimize')

[I 2025-04-11 17:38:00,892] A new study created in memory with name: no-name-2da0bd83-03ca-4e41-bc36-852d62d87d92


In [None]:
study.optimize(objective, n_trials=10)

[I 2025-04-11 17:43:03,682] Trial 0 finished with value: 0.04148967191576958 and parameters: {'latent_dim': 4, 'num_layers': 2, 'hidden_0': 64, 'hidden_1': 96, 'batch_size': 128, 'learning_rate': 0.00300944047506664, 'dropout_rate': 0.5, 'epochs': 80}. Best is trial 0 with value: 0.04148967191576958.
[I 2025-04-11 17:47:01,478] Trial 1 finished with value: 0.04149436578154564 and parameters: {'latent_dim': 2, 'num_layers': 1, 'hidden_0': 80, 'batch_size': 128, 'learning_rate': 1.998007897287504e-05, 'dropout_rate': 0.4, 'epochs': 80}. Best is trial 0 with value: 0.04148967191576958.
[I 2025-04-11 17:56:36,071] Trial 2 finished with value: 0.04147666320204735 and parameters: {'latent_dim': 4, 'num_layers': 2, 'hidden_0': 80, 'hidden_1': 64, 'batch_size': 64, 'learning_rate': 0.00024242558952901753, 'dropout_rate': 0.2, 'epochs': 80}. Best is trial 2 with value: 0.04147666320204735.
[I 2025-04-11 18:15:12,328] Trial 3 finished with value: 0.04178283363580704 and parameters: {'latent_dim'

In [None]:
print("Best hyperparameters:", study.best_params)
print("Best accuracy:", study.best_value)


Best hyperparameters: {'latent_dim': 3, 'num_layers': 1, 'hidden_0': 48, 'batch_size': 128, 'learning_rate': 0.00012234213687243047, 'dropout_rate': 0.1, 'epochs': 80}
Best accuracy: 0.0414409413933754


In [None]:
import optuna.visualization as vis

vis.plot_optimization_history(study).show()  # Shows accuracy improvement over trials
vis.plot_param_importances(study).show()  # Shows which hyperparameters are most important


In [16]:
best_params = {
    'latent_dim': 3,
    'num_layers': 1,
    'hidden_units': [48],
    'batch_size': 128,
    'learning_rate': 0.00012234213687243047,
    'dropout_rate': 0.1,
    'epochs': 80
}


In [17]:
train_loader = DataLoader(train_data, batch_size=best_params['batch_size'], shuffle=True)
test_loader = DataLoader(test_data, batch_size=best_params['batch_size'], shuffle=False)

In [18]:
input_dim = train_data.shape[1]
model = VAE(input_dim, best_params['latent_dim'], best_params['hidden_units'], best_params['dropout_rate']).to(device)
optimizer = optim.Adam(model.parameters(), lr=best_params['learning_rate'])

In [19]:
for epoch in range(best_params['epochs']):
    model.train()
    train_loss = 0
    for batch in train_loader:
        batch = batch.to(device)
        optimizer.zero_grad()
        recon_batch, mu, logvar = model(batch)
        loss = vae_loss_function(recon_batch, batch, mu, logvar)
        loss.backward()
        train_loss += loss.item()
        optimizer.step()

In [20]:
model.eval()
reconstruction_errors = []

with torch.no_grad():
    for batch in test_loader:
        batch = batch.to(device)
        recon_batch, mu, logvar = model(batch)
        error = torch.mean((batch - recon_batch) ** 2, dim=1)  # MSE per sample
        reconstruction_errors.extend(error.cpu().numpy())

# Convert to numpy
reconstruction_errors = np.array(reconstruction_errors)

In [21]:
reconstruction_errors

array([0.19982538, 0.20390469, 0.37022972, ..., 0.37035823, 0.3730516 ,
       0.43214378], dtype=float32)

In [None]:
reconstruction_errors

In [25]:

if isinstance(reconstruction_errors, torch.Tensor):
    reconstruction_errors = reconstruction_errors.detach().cpu().numpy()


In [26]:

threshold = np.percentile(reconstruction_errors, 95)  # Top 5% as anomalies




In [28]:

threshold = reconstruction_errors.mean() + 3 * reconstruction_errors.std()
anomalies = reconstruction_errors > threshold


In [29]:
anomalies

array([False, False, False, ..., False, False,  True])

In [30]:
print(f"Threshold: {threshold}")
print(f"Number of anomalies detected: {np.sum(anomalies)} / {len(reconstruction_errors)}")

Threshold: 0.3843795657157898
Number of anomalies detected: 990 / 19749


In [32]:
print(type(reconstructed))


<class 'tuple'>


In [33]:

if isinstance(reconstructed, tuple):
    reconstructed = reconstructed[0]  # or the appropriate index

In [34]:
print(type(reconstructed))

<class 'torch.Tensor'>


In [36]:
test_data

tensor([[0.1369, 0.1912, 0.1982,  ..., 0.2459, 0.7010, 0.6046],
        [0.2200, 0.2246, 0.2128,  ..., 0.2270, 0.6064, 0.4745],
        [0.2214, 0.2218, 0.2144,  ..., 0.1240, 0.2932, 0.1403],
        ...,
        [0.2622, 0.2630, 0.2524,  ..., 0.1146, 0.2613, 0.0932],
        [0.2625, 0.2614, 0.2507,  ..., 0.1146, 0.2552, 0.0888],
        [0.1263, 0.1138, 0.1334,  ..., 0.1146, 0.2633, 0.1314]],
       device='cuda:0')

In [38]:
faulty_components = torch.abs(reconstructed - test_data.to(device)).mean(dim=0)


In [39]:
faulty_components

tensor([0.1147, 0.1149, 0.1146, 0.0989, 0.1619, 0.0687, 0.1252, 0.1268],
       device='cuda:0')

In [40]:
print(type(anomalies))

<class 'numpy.ndarray'>


In [45]:
anomalous_indices = anomalies.nonzero()[0]
print("Anomalous Samples:", anomalous_indices)

Anomalous Samples: [    3     4     5  1135  1136  1137  1138  1139  1140  1141  1142  1143
  1144  1145  1146  1147  1148  1149  1150  1151  1152  1153  1154  1155
  1156  1157  1158  1159  1160  1161  1162  1163  1164  1165  1166  1167
  1168  1169  1170  1171  1172  1173  1174  1175  1176  1177  1178  1179
  1180  1181  1182  1183  1184  1185  1186  1187  1188  1189  1190  1191
  1192  1193  1194  1195  1196  1197  1198  1199  1200  1201  1202  1203
  1204  1205  1206  1207  1208  1209  1210  1211  1212  1213  1214  1215
  1216  1217  1218  1219  1220  1221  1222  1223  1224  1225  1226  1227
  1228  1229  1230  1231  1232  1233  1234  1235  1236  1237  1238  1239
  1240  1241  1242  1243  1244  1245  1246  1247  1248  1249  1250  1251
  1252  1253  1254  1255  1256  1257  1258  1259  1260  1261  1262  1263
  1264  1265  1266  1267  1268  1269  1270  1271  1272  1273  1274  1275
  1276  1277  1278  1279  1280  1281  1282  1283  1284  1285  1286  1287
  1288  1289  1290  1291  1292  

In [46]:
anomalous_indices = anomalies.nonzero()[0].flatten()


In [47]:
anomalous_indices

array([    3,     4,     5,  1135,  1136,  1137,  1138,  1139,  1140,
        1141,  1142,  1143,  1144,  1145,  1146,  1147,  1148,  1149,
        1150,  1151,  1152,  1153,  1154,  1155,  1156,  1157,  1158,
        1159,  1160,  1161,  1162,  1163,  1164,  1165,  1166,  1167,
        1168,  1169,  1170,  1171,  1172,  1173,  1174,  1175,  1176,
        1177,  1178,  1179,  1180,  1181,  1182,  1183,  1184,  1185,
        1186,  1187,  1188,  1189,  1190,  1191,  1192,  1193,  1194,
        1195,  1196,  1197,  1198,  1199,  1200,  1201,  1202,  1203,
        1204,  1205,  1206,  1207,  1208,  1209,  1210,  1211,  1212,
        1213,  1214,  1215,  1216,  1217,  1218,  1219,  1220,  1221,
        1222,  1223,  1224,  1225,  1226,  1227,  1228,  1229,  1230,
        1231,  1232,  1233,  1234,  1235,  1236,  1237,  1238,  1239,
        1240,  1241,  1242,  1243,  1244,  1245,  1246,  1247,  1248,
        1249,  1250,  1251,  1252,  1253,  1254,  1255,  1256,  1257,
        1258,  1259,

In [48]:

print("Fault Contribution Per Component:", faulty_components)

Fault Contribution Per Component: tensor([0.1147, 0.1149, 0.1146, 0.0989, 0.1619, 0.0687, 0.1252, 0.1268],
       device='cuda:0')


In [51]:
rounded_components = np.round(faulty_components, 3)  # round to 3 decimal places
print("Fault Contribution Per Component:", rounded_components)

Fault Contribution Per Component: [0.115 0.115 0.115 0.099 0.162 0.069 0.125 0.127]


In [54]:
rounded_components

array([0.115, 0.115, 0.115, 0.099, 0.162, 0.069, 0.125, 0.127],
      dtype=float32)

In [61]:
import plotly.graph_objects as go
import torch
import numpy as np

component_names = ["A", "B", "C", "L1", "L2", "L3", "L4", "L5"]

if isinstance(rounded_components, torch.Tensor):
    rounded_components = rounded_components.detach().cpu().numpy()

text_labels = [f"{v:.3f}" for v in rounded_components]

fig = go.Figure(data=[
    go.Bar(
        x=component_names,
        y=rounded_components,
        marker_color='indianred',
        text=text_labels,
        textposition='outside'
    )
])

fig.update_layout(
    title="Anomaly Contribution per Component",
    xaxis_title="Component",
    yaxis_title="Anomaly Score",
    yaxis=dict(showgrid=True),
    template="plotly_white",
    height=500,
    width=800
)

fig.show()
