In [1]:
import scipy.io as sio
import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim
from sklearn.decomposition import PCA
from sklearn.svm import SVC
from sklearn.metrics import accuracy_score, classification_report, confusion_matrix
import matplotlib.pyplot as plt
import seaborn as sns
from sklearn.preprocessing import MinMaxScaler
from sklearn.model_selection import train_test_split
from torch.utils.data import DataLoader, TensorDataset
import os

In [2]:
from sklearn.manifold import TSNE

In [3]:
# ================== Dataset Selector ==================
def load_dataset(name):
    if name == 'pavia':
        data = sio.loadmat('/content/PaviaU.mat')['paviaU']
        gt = sio.loadmat('/content/PaviaU_gt.mat')['paviaU_gt']
    elif name == 'salinas':
        data = sio.loadmat('/content/Salinas_corrected.mat')['salinas_corrected']
        gt = sio.loadmat('/content/Salinas_gt.mat')['salinas_gt']
    elif name == 'indian':
        data = sio.loadmat('/content/Indian_pines_corrected.mat')['indian_pines_corrected']
        gt = sio.loadmat('/content/Indian_pines_gt.mat')['indian_pines_gt']
    else:
        raise ValueError("Unsupported dataset name")
    return data, gt

In [4]:
# ================== Preprocessing ==================
def preprocess(data, gt, dataset_name):
    h, w, bands = data.shape
    if dataset_name == 'indian':
       noisy_bands = [b for b in (list(range(104, 109)) + list(range(150, 164)) + [220]) if b < data.shape[-1]]
       data = np.delete(data, noisy_bands, axis=2)
    scaler = MinMaxScaler()
    data_reshaped = data.reshape(-1, data.shape[2])
    data_scaled = scaler.fit_transform(data_reshaped)
    pca_components = 30 if dataset_name != 'indian' else 40
    pca = PCA(n_components=pca_components)
    data_pca = pca.fit_transform(data_scaled)
    data_pca = data_pca.reshape(h, w, -1)
    return data_pca, gt, h, w, pca_components

In [5]:
# ================== Patch Extraction ==================
def extract_patches(data, gt, patch_size):
    h, w, _ = data.shape
    margin = patch_size // 2
    padded_data = np.pad(data, ((margin, margin), (margin, margin), (0, 0)), mode='reflect')
    padded_gt = np.pad(gt, ((margin, margin), (margin, margin)), mode='reflect')
    patches, labels, coords = [], [], []
    for i in range(margin, margin + h):
        for j in range(margin, margin + w):
            patch = padded_data[i - margin:i + margin, j - margin:j + margin, :]
            label = padded_gt[i, j]
            if label != 0:
                patches.append(patch)
                labels.append(label)
                coords.append((i - margin, j - margin))
    return np.array(patches), np.array(labels), np.array(coords), h, w

In [6]:
# ================== Autoencoder ==================
class PatchAutoencoder(nn.Module):
    def __init__(self, latent_dim, input_dim):
        super().__init__()
        self.encoder = nn.Sequential(
            nn.Flatten(),
            nn.Linear(input_dim, 256),
            nn.ReLU(),
            nn.Linear(256, latent_dim)
        )
        self.decoder = nn.Sequential(
            nn.Linear(latent_dim, 256),
            nn.ReLU(),
            nn.Linear(256, input_dim)
        )

    def forward(self, x):
        z = self.encoder(x)
        out = self.decoder(z)
        return out, z

In [7]:
class SimpleTransformer(nn.Module):
    def __init__(self, dim=32, heads=4):
        super().__init__()
        self.attn = nn.MultiheadAttention(embed_dim=dim, num_heads=heads, batch_first=True)
        self.linear = nn.Sequential(
            nn.Linear(dim, 64),
            nn.ReLU(),
            nn.Linear(64, 1)
        )

    def forward(self, z):
        # z should be [batch_size, latent_dim] → [batch_size, 1, latent_dim]
        z = z.unsqueeze(1)
        if z.shape[-1] != self.attn.embed_dim:
            print(f"[ERROR] Input dim mismatch: z.shape = {z.shape}, expected embed_dim = {self.attn.embed_dim}")
            raise ValueError("Latent dim mismatch with transformer input.")

        attn_out, _ = self.attn(z, z, z)
        # Should be [batch_size, 1, dim], squeeze to [batch_size, dim]
        squeezed = attn_out.squeeze(1)
        if squeezed.shape[-1] != self.linear[0].in_features:
            print(f"[ERROR] Linear layer input mismatch: {squeezed.shape}")
            raise ValueError("Transformer output mismatch with linear layer.")

        scores = self.linear(squeezed).squeeze()
        return scores


In [None]:
# # ================== Dynamic Pipeline ==================
# def run_pipeline(dataset_name, patch_size=24, latent_dim=32):
#     torch.cuda.empty_cache()  # Free GPU memory

#     data, gt = load_dataset(dataset_name)
#     data_pca, gt, h, w, pca_dim = preprocess(data, gt, dataset_name)  # h, w, pca_dim returned here
#     input_dim = patch_size * patch_size * pca_dim

#     patches, labels, coords, h, w = extract_patches(data_pca, gt, patch_size=patch_size)

#     device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
#     model = PatchAutoencoder(latent_dim=latent_dim, input_dim=input_dim).to(device)
#     patches_tensor = torch.tensor(patches).permute(0, 3, 1, 2).float().to(device)
#     patches_tensor = patches_tensor.reshape(-1, input_dim)

#     criterion = nn.MSELoss()
#     optimizer = optim.Adam(model.parameters(), lr=0.001)

#     print("Training Autoencoder:")
#     for epoch in range(5):  # Reduce epochs for memory safety
#         model.train()
#         optimizer.zero_grad()
#         output, _ = model(patches_tensor)
#         loss = criterion(output, patches_tensor)
#         loss.backward()
#         optimizer.step()
#         print(f"Epoch {epoch+1}/5, Training Loss: {loss.item():.4f}")

#     model.eval()
#     with torch.no_grad():
#         _, latent_z = model(patches_tensor)

#     transformer = SimpleTransformer(dim=latent_dim).to(device)
#     trans_scores = []
#     batch_size = 1024
#     for i in range(0, latent_z.shape[0], batch_size):
#         batch = latent_z[i:i+batch_size]
#         with torch.no_grad():
#             scores = transformer(batch.to(device)).cpu().numpy()
#         trans_scores.extend(scores)
#     trans_scores = np.array(trans_scores)

In [None]:
# # Visualize Anomaly Scores on Ground Truth
# anomaly_map = np.zeros((h, w))
# for idx, (x, y) in enumerate(coords):
#     anomaly_map[x, y] = trans_scores[idx]

# plt.figure(figsize=(12, 6))
# plt.subplot(1, 3, 1)
# plt.title("Ground Truth")
# plt.imshow(gt, cmap='gray')
# plt.axis('off')

# plt.subplot(1, 3, 2)
# plt.title("Anomaly Heatmap")
# plt.imshow(anomaly_map, cmap='hot')
# plt.axis('off')

# plt.subplot(1, 3, 3)
# plt.title("Overlay")
# plt.imshow(gt, cmap='gray')
# plt.imshow(anomaly_map, cmap='hot', alpha=0.5)
# plt.axis('off')

# plt.tight_layout()
# plt.show()

# X_train, X_test, y_train, y_test = train_test_split(latent_z.cpu().numpy(), labels, test_size=0.3, random_state=42)
# svm = SVC(kernel='rbf', class_weight='balanced')
# svm.fit(X_train, y_train)
# y_pred = svm.predict(X_test)

# print("SVM Classification Report:")
# print(classification_report(y_test, y_pred))
# conf = confusion_matrix(y_test, y_pred)
# sns.heatmap(conf, annot=True, fmt='d', cmap='Blues')
# plt.title("Confusion Matrix")
# plt.xlabel("Predicted")
# plt.ylabel("True")
# plt.show()

# plt.figure(figsize=(10, 6))
# sns.histplot(trans_scores, bins=100, kde=True, color='orange')
# plt.title(f"Transformer Anomaly Score Distribution - {dataset_name.upper()}")
# plt.xlabel("Anomaly Score")
# plt.ylabel("Frequency")
# plt.grid()
# plt.show()


NameError: name 'h' is not defined

In [10]:
# ================== Run Pipeline ==================
def run_pipeline(dataset_name, patch_size=16, latent_dim=32, num_epochs=10):
    os.makedirs("outputs", exist_ok=True)
    torch.cuda.empty_cache()
    print("Loading dataset and preprocessing...")
    data, gt = load_dataset(dataset_name)
    data_pca, gt, h, w, pca_dim = preprocess(data, gt, dataset_name)
    input_dim = patch_size * patch_size * pca_dim
    patches, labels, coords, h, w = extract_patches(data_pca, gt, patch_size=patch_size)

    print("Initializing Autoencoder...")
    device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
    model = PatchAutoencoder(latent_dim=latent_dim, input_dim=input_dim).to(device)

    patches_tensor = torch.tensor(patches).permute(0, 3, 1, 2).float()
    patches_tensor = patches_tensor.reshape(-1, input_dim)
    train_loader = DataLoader(TensorDataset(patches_tensor), batch_size=256, shuffle=True)

    criterion = nn.MSELoss()
    optimizer = optim.Adam(model.parameters(), lr=0.001)

    epoch_losses = []
    print("Training Autoencoder:")
    for epoch in range(1, num_epochs + 1):
        model.train()
        epoch_loss = 0
        for batch in train_loader:
            batch = batch[0].to(device)
            optimizer.zero_grad()
            output, _ = model(batch)
            loss = criterion(output, batch)
            loss.backward()
            optimizer.step()
            epoch_loss += loss.item()
        avg_loss = epoch_loss / len(train_loader)
        epoch_losses.append(avg_loss)
        print(f"Epoch {epoch}/{num_epochs} - Loss: {avg_loss:.6f}")

    plt.figure()
    plt.plot(range(1, num_epochs+1), epoch_losses, marker='o')
    plt.title("Autoencoder Training Loss")
    plt.xlabel("Epoch")
    plt.ylabel("Loss")
    plt.grid()
    plt.savefig(f"outputs/{dataset_name}_ae_loss_curve.png")
    plt.close()

    print("Extracting latent features...")
    model.eval()
    with torch.no_grad():
        _, latent_z = model(patches_tensor.to(device))

    print("Visualizing Latent Space...")
    visualize_latent_space(latent_z.cpu().numpy(), labels, dataset_name)

    print("Running Transformer on latent features...")
    transformer = SimpleTransformer(dim=latent_dim).to(device)
    trans_scores = []
    batch_size = 256
    for i in range(0, latent_z.shape[0], batch_size):
        batch = latent_z[i:i+batch_size]
        with torch.no_grad():
            scores = transformer(batch.to(device)).cpu().numpy()
        trans_scores.extend(scores)
    trans_scores = np.array(trans_scores)

    print("Generating Anomaly Map...")
    anomaly_map = np.zeros((h, w))
    for idx, (x, y) in enumerate(coords):
        anomaly_map[x, y] = trans_scores[idx]

    plt.figure(figsize=(12, 6))
    plt.subplot(1, 3, 1)
    plt.title("Ground Truth")
    plt.imshow(gt, cmap='gray')
    plt.axis('off')

    plt.subplot(1, 3, 2)
    plt.title("Anomaly Heatmap")
    plt.imshow(anomaly_map, cmap='hot')
    plt.axis('off')

    plt.subplot(1, 3, 3)
    plt.title("Overlay")
    plt.imshow(gt, cmap='gray')
    plt.imshow(anomaly_map, cmap='hot', alpha=0.5)
    plt.axis('off')
    plt.tight_layout()
    plt.savefig(f"outputs/{dataset_name}_anomaly_map.png")
    plt.close()

    print("Training SVM Classifier on Latent Features...")
    X = latent_z.cpu().numpy()
    y = np.array(labels)


    # Sanity check for NaN/Inf
    if np.isnan(X).any() or np.isinf(X).any():
        print("[WARNING] Found NaNs or Infs in latent features. Cleaning up...")
        X = np.nan_to_num(X, nan=0.0, posinf=1e6, neginf=-1e6) # Correctly call np.nan_to_num

    # Optional: remove duplicate entries if any
    #unique_rows = ~np.all(np.isclose(X, 0.0), axis=1) #This line was incomplete and causing the original error
    #X = X[unique_rows]
    #y = y[unique_rows]

    # SVM Training
    try:
        X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.3, random_state=42)
        svm = SVC(kernel='rbf', class_weight='balanced')
        svm.fit(X_train, y_train)
        y_pred = svm.predict(X_test)

        print("SVM Classification Report:")
        print(classification_report(y_test, y_pred))
        conf = confusion_matrix(y_test, y_pred)
        plt.figure()
        sns.heatmap(conf, annot=True, fmt='d', cmap='Blues')
        plt.title("Confusion Matrix")
        plt.xlabel("Predicted")
        plt.ylabel("True")
        plt.savefig(f"outputs/{dataset_name}_conf_matrix_dynamic.png")
        plt.close()
    except Exception as e:
        print(f"[ERROR] SVM training failed: {e}")

In [11]:
# ================== Visualizing Latent Space (t-SNE) ==================
def visualize_latent_space(z, labels, dataset_name):
    tsne = TSNE(n_components=2, random_state=42)
    z_2d = tsne.fit_transform(z)
    plt.figure(figsize=(8, 6))
    scatter = plt.scatter(z_2d[:, 0], z_2d[:, 1], c=labels, cmap='tab20', s=5)
    plt.title(f"Latent Space t-SNE Visualization - {dataset_name.upper()}")
    plt.colorbar(scatter)
    plt.tight_layout()
    plt.savefig(f"outputs/{dataset_name}_latent_tsne.png")
    plt.close()

In [13]:
# ================== Run ==================
run_pipeline('pavia')

Loading dataset and preprocessing...
Initializing Autoencoder...
Training Autoencoder:
Epoch 1/10 - Loss: 0.010470
Epoch 2/10 - Loss: 0.004926
Epoch 3/10 - Loss: 0.004067
Epoch 4/10 - Loss: 0.003738
Epoch 5/10 - Loss: 0.003597
Epoch 6/10 - Loss: 0.003503
Epoch 7/10 - Loss: 0.003431
Epoch 8/10 - Loss: 0.003362
Epoch 9/10 - Loss: 0.003331
Epoch 10/10 - Loss: 0.003284
Extracting latent features...
Visualizing Latent Space...
Running Transformer on latent features...
Generating Anomaly Map...
Training SVM Classifier on Latent Features...
SVM Classification Report:
              precision    recall  f1-score   support

           1       0.99      0.93      0.96      1994
           2       0.98      0.86      0.91      5617
           3       0.89      0.97      0.93       648
           4       1.00      0.99      0.99       894
           5       1.00      1.00      1.00       414
           6       0.64      0.94      0.76      1508
           7       0.82      0.99      0.90       395
