In [2]:
from sklearn.svm import OneClassSVM
from sklearn.neighbors import NearestNeighbors
from sklearn.metrics import accuracy_score, confusion_matrix, roc_auc_score
from torchvision import transforms as T
from torch.utils.data import Dataset, DataLoader, TensorDataset
from PIL import Image
from pytorch_msssim import ssim
import os
import numpy as np
import torch
import matplotlib.pyplot as plt
import torch.nn as nn
import csv
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim


#### Data Path

In [None]:
# Root folder of the dataset. The loading cell lists this folder and treats
# every sub-directory as one object class. Paths are built by plain string
# concatenation, so the trailing slash is required.
dataset_path = './mvtec_anomaly_detection/'

In [None]:
# print('Training Set: \t\t' + ', '.join(train_set))
# print('Testing Folders: \t' + ', '.join(test_folders))
# print('Ground Truth Folders \t' + ', '.join(ground_truth_folders))

# Pre-processing & Data Augmentation

In [None]:
# Random flips followed by tensor conversion.
# torchvision.transforms is imported under the alias `T` in the import cell;
# there is no `transforms` name in scope, so the pipeline is built from `T`.
transform_pipeline = T.Compose([
    T.RandomHorizontalFlip(p=0.5),
    T.RandomVerticalFlip(p=0.5),
    T.ToTensor(),  # PIL image -> float tensor
])

#### Read images and convert them to grayscale

In [None]:
img_height, img_width, img_channels = 256, 256, 1
X, y_good, y_bad, ground_truth = [], [], [], []

# Test images and ground-truth masks are only converted to tensors. Randomly
# flipping them would make evaluation non-deterministic and would flip each
# mask independently of the test image it annotates.
eval_transform = T.ToTensor()


def load_gray_tensor(img_path, transform):
    """Load one image as a resized, single-channel tensor.

    Args:
        img_path: Path of the image file to read.
        transform: Callable applied to the resized grayscale PIL image
            (e.g. `transform_pipeline` or `eval_transform`).

    Returns:
        Whatever `transform` returns for the processed image.
    """
    # The context manager releases the file handle as soon as we are done.
    with Image.open(img_path) as img:
        # PIL's resize expects (width, height).
        img_gray = img.resize((img_width, img_height)).convert('L')
        return transform(img_gray)


def stacked_shape(tensors):
    """Shape the list would have if stacked, without converting it to numpy."""
    if not tensors:
        return (0,)
    return (len(tensors), *tensors[0].shape)


all_items = os.listdir(dataset_path)
# Sorted so that the sample order does not depend on filesystem listing order.
class_names = sorted(item for item in all_items
                     if os.path.isdir(os.path.join(dataset_path, item)))

for class_name in class_names:
    train_path = dataset_path + class_name + '/train/good/'
    test_path = dataset_path + class_name + '/test/'
    ground_truth_path = dataset_path + class_name + '/ground_truth/'

    train_set = sorted(os.listdir(train_path))
    test_folders = sorted(os.listdir(test_path))
    ground_truth_folders = sorted(os.listdir(ground_truth_path))

    # Defect-free training images (augmented with random flips).
    for img_name in train_set:
        X.append(load_gray_tensor(train_path + img_name, transform_pipeline))

    # Test images: the 'good' folder holds normal samples, every other folder
    # is treated as a defect category.
    for test_folder in test_folders:
        test_folder_path = test_path + test_folder
        target = y_good if test_folder == 'good' else y_bad

        for img_name in sorted(os.listdir(test_folder_path)):
            img_path = test_folder_path + '/' + img_name
            target.append(load_gray_tensor(img_path, eval_transform))

    # Ground-truth images, one folder per defect category.
    for ground_truth_folder in ground_truth_folders:
        ground_truth_folder_path = ground_truth_path + ground_truth_folder

        for img_name in sorted(os.listdir(ground_truth_folder_path)):
            img_path = ground_truth_folder_path + '/' + img_name
            ground_truth.append(load_gray_tensor(img_path, eval_transform))

print("Training: \t\t", stacked_shape(X))
print("Good Testing: \t\t", stacked_shape(y_good))
print("Bad Testing: \t\t", stacked_shape(y_bad))
print("Ground Truth: \t\t", stacked_shape(ground_truth))

In [None]:
# Sanity check: show the first training tensor collected in X.
print(X[0])

# SVM

In [None]:
# Stack the per-image tensors into one batch tensor per split.
X_tensor = torch.stack(X)
y_good_tensor = torch.stack(y_good)
y_bad_tensor = torch.stack(y_bad)

# Every image becomes one flat row of img_height * img_width pixel values,
# converted to a numpy array for scikit-learn.
n_features = img_height * img_width

X_train_features = X_tensor.reshape(-1, n_features).detach().cpu().numpy()
y_good_features = y_good_tensor.reshape(-1, n_features).detach().cpu().numpy()
y_bad_features = y_bad_tensor.reshape(-1, n_features).detach().cpu().numpy()

for caption, feature_matrix in (
    ("Training feature shape: \t", X_train_features),
    ("Good test feature shape: \t", y_good_features),
    ("Bad test feature shape: \t", y_bad_features),
):
    print(caption, feature_matrix.shape)

In [None]:
print("Training One-Class SVM...")

# fit() returns the estimator itself, so construction and fitting are chained.
svm = OneClassSVM(gamma='auto', nu=0.1, kernel='rbf').fit(X_train_features)

print("SVM training complete")

In [None]:
# Label convention used for evaluation: 0 = good sample, 1 = bad sample.
labels_good = np.zeros(y_good_features.shape[0])
labels_bad = np.ones(y_bad_features.shape[0])

# Good samples come first, bad samples second, in both features and labels.
X_test_features = np.vstack([y_good_features, y_bad_features])
y_true_labels = np.hstack([labels_good, labels_bad])

print("Total test features shape: \t", X_test_features.shape)
print("Total test labels shape: \t", y_true_labels.shape)

In [None]:
svm_predictions = svm.predict(X_test_features)

# OneClassSVM.predict returns +1 for inliers and -1 for outliers; map that to
# the notebook's label convention (0 = good, 1 = bad).
y_pred_labels = np.where(svm_predictions == 1, 0, 1)

# ROC-AUC needs a continuous ranking score, not thresholded 0/1 labels.
# decision_function is positive for inliers and negative for outliers, so its
# negation is an anomaly score (higher = more anomalous).
svm_anomaly_scores = -svm.decision_function(X_test_features)

accuracy = accuracy_score(y_true_labels, y_pred_labels)
auc = roc_auc_score(y_true_labels, svm_anomaly_scores)

print("--- Evaluation Results ---")
print(f"Accuracy: {accuracy * 100:.2f}%")
print(f"AUC Score: {auc:.4f}")

print("\nConfusion Matrix:")
print(confusion_matrix(y_true_labels, y_pred_labels))

In [None]:
# Write the One-Class SVM predictions to a submission file.
# `csv` is already imported in the notebook's import cell.
# Label polarity follows the evaluation cell and the other submission files in
# this notebook: 1 = anomaly (SVM outlier, -1), 0 = normal (SVM inlier, +1).
# NOTE(review): confirm the expected polarity against the submission format.
with open('submit_SVM.csv', mode='w', newline='') as submit_file:
    csv_writer = csv.writer(submit_file)
    header = ['id', 'prediction']
    print(header)
    csv_writer.writerow(header)
    for i, svm_label in enumerate(svm_predictions):
        row = [str(i), 1 if svm_label == -1 else 0]
        csv_writer.writerow(row)

# One summary line instead of echoing every row into the cell output.
print(f"Wrote {len(svm_predictions)} rows to submit_SVM.csv")

# KNN

In [None]:
# Number of neighbours whose mean distance is used as the anomaly score.
k = 5

print("Fitting KNN model...")
# n_jobs=-1 lets scikit-learn use all available cores for neighbour queries.
knn = NearestNeighbors(n_neighbors=k, n_jobs=-1)
knn.fit(X_train_features)
# Report the shape only; dumping the full feature matrix was debug output.
print("Training feature matrix shape:", X_train_features.shape)
print("Model fitting complete")

In [None]:
# For every test sample: distances to, and indices of, its nearest training
# samples in the fitted index.
distances, indices = knn.kneighbors(X_test_features)

# Anomaly score = average distance to those neighbours (one value per sample).
anomaly_scores = distances.mean(axis=1)

print(f"Calculated {len(anomaly_scores)} anomaly scores")

In [None]:
# Score the training samples against the index they were fitted on.
# Calling kneighbors() with no argument excludes each point from its own
# neighbour list. Passing X_train_features explicitly would return every point
# as its own nearest neighbour at distance 0, dragging the "normal" scores
# (and therefore the threshold) down relative to the test scores.
train_distances, _ = knn.kneighbors()
train_anomaly_scores = np.mean(train_distances, axis=1)

# Samples scoring above the 95th percentile of normal scores are anomalies.
threshold = np.percentile(train_anomaly_scores, 95)
print(f"Anomaly Threshold set to: {threshold:.4f}")

y_pred_labels = (anomaly_scores > threshold).astype(int)

accuracy = accuracy_score(y_true_labels, y_pred_labels)
# AUC is threshold-free, so it is computed on the raw scores.
auc = roc_auc_score(y_true_labels, anomaly_scores)
print("\n--- Evaluation result ---")
print(f"Accuracy: {accuracy * 100:.2f}%")
print(f"AUC Score: {auc:.4f}")
print("\nConfusion Matrix:")
print(confusion_matrix(y_true_labels, y_pred_labels))

In [None]:
# Write the thresholded KNN predictions (1 = anomaly, 0 = normal).
# The loop runs over the KNN predictions themselves rather than borrowing its
# length from the unrelated `svm_predictions` array, so this cell no longer
# depends on the SVM section having been run.
with open('submit_KNN.csv', mode='w', newline='') as submit_file:
    csv_writer = csv.writer(submit_file)
    header = ['id', 'prediction']
    print(header)
    csv_writer.writerow(header)
    for i, label in enumerate(y_pred_labels):
        row = [str(i), int(label)]
        csv_writer.writerow(row)

# One summary line instead of echoing every row into the cell output.
print(f"Wrote {len(y_pred_labels)} rows to submit_KNN.csv")

# AutoEncoder

In [None]:
class AutoEncoder(nn.Module):
    """Fully convolutional autoencoder for single-channel images.

    The encoder halves the spatial size five times with stride-2 3x3
    convolutions while widening the channels 1 -> 16 -> 32 -> 64 -> 128 -> 256
    (e.g. [N, 1, 256, 256] -> [N, 256, 8, 8]). The decoder mirrors it with
    stride-2 transposed convolutions and ends in a sigmoid.
    """

    def __init__(self):
        super().__init__()

        channels = [1, 16, 32, 64, 128, 256]

        # Encoder: Conv2d + in-place ReLU per stage.
        encoder_layers = []
        for c_in, c_out in zip(channels[:-1], channels[1:]):
            encoder_layers.append(
                nn.Conv2d(c_in, c_out, kernel_size=3, stride=2, padding=1))
            encoder_layers.append(nn.ReLU(inplace=True))
        self.encoder = nn.Sequential(*encoder_layers)

        # Decoder: the same channel ladder walked backwards; output_padding=1
        # makes every stage exactly double the spatial size.
        reversed_channels = channels[::-1]
        decoder_layers = []
        for c_in, c_out in zip(reversed_channels[:-1], reversed_channels[1:]):
            decoder_layers.append(
                nn.ConvTranspose2d(c_in, c_out, kernel_size=3, stride=2,
                                   padding=1, output_padding=1))
            decoder_layers.append(nn.ReLU(inplace=True))
        # The final activation is a sigmoid instead of a ReLU.
        decoder_layers[-1] = nn.Sigmoid()
        self.decoder = nn.Sequential(*decoder_layers)

    def forward(self, x):
        """Encode `x` and return its reconstruction."""
        return self.decoder(self.encoder(x))

In [None]:
# Training hyperparameters.
num_epochs = 50
batch_size = 64
learning_rate = 1e-3

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(f"Using device {device}")

# The stacked training tensor is wrapped so it can be served in shuffled batches.
train_dataset = TensorDataset(X_tensor)
train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)

model = AutoEncoder().to(device)
criterion = nn.MSELoss()
optimizer = torch.optim.Adam(model.parameters(), lr=learning_rate)

print("--- Starting Training ---")

n_train_samples = len(train_dataset)
for epoch in range(num_epochs):
    running_loss = 0.0

    for batch in train_loader:
        images = batch[0].to(device)

        # The autoencoder is trained to reproduce its own input.
        loss = criterion(model(images), images)

        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

        # criterion returns a batch mean; weight it by the batch size so the
        # epoch figure is a true per-sample average.
        running_loss += loss.item() * images.size(0)

    avg_epoch_loss = running_loss / n_train_samples
    print(f"Epoch [{epoch+1}/{num_epochs}], Loss: {avg_epoch_loss:.6f}")

print("--- Training Complete ---")

In [None]:
model.eval()

# Both evaluation sets must live on the same device as the model.
y_good_tensor = y_good_tensor.to(device)
y_bad_tensor = y_bad_tensor.to(device)

# Element-wise squared error, so it can be averaged per image below.
loss_fn = nn.MSELoss(reduction='none')

with torch.no_grad():
    reconstructed_good = model(y_good_tensor)
    reconstructed_bad = model(y_bad_tensor)

    # Mean reconstruction loss over the good test samples only.
    test_loss = criterion(reconstructed_good, y_good_tensor)

    # Per-image anomaly score = mean squared reconstruction error over the
    # channel, height and width dimensions.
    error_good = loss_fn(reconstructed_good, y_good_tensor)
    error_bad = loss_fn(reconstructed_bad, y_bad_tensor)
    scores_good = error_good.mean(dim=(1, 2, 3))
    scores_bad = error_bad.mean(dim=(1, 2, 3))

# Good scores first, bad scores second -- the same order as y_true_labels.
anomaly_scores_tensor = torch.cat((scores_good, scores_bad))
anomaly_scores_numpy = anomaly_scores_tensor.cpu().numpy()

auc = roc_auc_score(y_true_labels, anomaly_scores_numpy)

print("--- AutoEncoder Evaluation ---")
print(f"AUC Score: {auc:.4f}")
print(f"Testing Loss: {test_loss.item():.6f}")

# AE + KNN

In [4]:
# Batch size read by the training DataLoader built in the next cell.
batch_size = 32

In [8]:
# --- 1. Define your transformations ---
# Both pipelines start with the same deterministic steps (resize to 256x256,
# collapse to one grayscale channel) and finish with tensor conversion.
# Only the training pipeline inserts random augmentations in between.
def _resize_and_gray():
    """Return fresh Resize + Grayscale steps shared by both pipelines."""
    return [
        T.Resize((256, 256)),
        T.Grayscale(num_output_channels=1),
    ]


_train_augmentations = [
    T.RandomHorizontalFlip(p=0.5),                # mirror left/right half the time
    T.RandomVerticalFlip(p=0.5),                  # mirror up/down half the time
    T.RandomRotation(20),                         # random rotation, 20-degree setting
    T.ColorJitter(brightness=0.2, contrast=0.2),  # mild brightness/contrast jitter
]

# Normalisation (e.g. T.Normalize(mean=[0.5], std=[0.5])) is intentionally not
# applied; if it is ever added it has to be added to BOTH pipelines.
train_transform = T.Compose(_resize_and_gray() + _train_augmentations + [T.ToTensor()])

# --- IMPORTANT ---
# The test pipeline carries no augmentations.
test_transform = T.Compose(_resize_and_gray() + [T.ToTensor()])

# --- 2. Create your custom Dataset class ---
class MVTecDataset(Dataset):
    """Dataset that lazily opens images from a list of file paths.

    Args:
        file_paths: Sequence of image file paths.
        transform: Optional callable applied to each opened PIL image.
    """

    def __init__(self, file_paths, transform=None):
        self.file_paths = file_paths
        self.transform = transform

    def __len__(self):
        """Number of image paths held by the dataset."""
        return len(self.file_paths)

    def __getitem__(self, idx):
        """Open the image at position `idx` and return it as a 1-tuple."""
        image = Image.open(self.file_paths[idx])

        if not self.transform:
            # No transform configured: hand back the PIL image untouched.
            return (image,)

        # Only the image is returned (no label), wrapped in a tuple so that
        # consumers can index element 0 of each batch.
        return (self.transform(image),)

# --- 3. Collect image paths and build the datasets / loaders ---
image_extensions = ('.png', '.jpg', '.jpeg', '.tif', '.bmp')


def find_image_files(root_dir, extensions=image_extensions):
    """Recursively collect image file paths under `root_dir`.

    Args:
        root_dir: Directory to walk (all sub-folders are visited).
        extensions: Tuple of lower-case file suffixes accepted as images.

    Returns:
        A lexicographically sorted list of paths. os.walk yields entries in
        arbitrary filesystem order, and the submission cell uses the position
        of each test image as its id, so the order must be reproducible.
        NOTE(review): if ids are expected to follow some other file order
        (e.g. numeric file names), confirm that this ordering matches.
    """
    found = []
    for root, dirs, files in os.walk(root_dir):
        for file in files:
            if file.lower().endswith(extensions):
                found.append(os.path.join(root, file))
    return sorted(found)


# Training images. Every image under train_dir is used; no folder-name
# filtering is applied.
train_dir = './Dataset/train/'
print(f"Walking through {train_dir} to find 'good' training images...")
train_files = find_image_files(train_dir)
print(f"Found {len(train_files)} normal training images.")

# --- 4. Create your REAL Dataset and DataLoader ---
train_dataset = MVTecDataset(train_files, transform=train_transform)
train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)

# Test images. The log messages now describe the test set instead of
# repeating the training-set wording.
test_dir = './Dataset/test/'
print(f"Walking through {test_dir} to find test images...")
test_files = find_image_files(test_dir)
print(f"Found {len(test_files)} test images.")

# shuffle=False keeps loader order identical to test_files order.
test_dataset = MVTecDataset(test_files, transform=test_transform)
test_loader = DataLoader(test_dataset, batch_size=batch_size, shuffle=False)

Walking through ./Dataset/train/ to find 'good' training images...
Found 3629 normal training images.
Walking through ./Dataset/test/ to find 'good' training images...
Found 1725 normal training images.


In [9]:
# --- 0. Model & Helper Function Definitions ---

class AutoEncoder(nn.Module):
    """Convolutional autoencoder with a dense bottleneck.

    Built for 1-channel 256x256 inputs: four stride-2 convolutions reduce the
    image to 256x16x16, which is flattened and projected to `latent_dims`
    values. The decoder inverts those steps and ends in a sigmoid.

    Args:
        latent_dims: Size of the latent vector produced by the encoder.
    """

    def __init__(self, latent_dims=128):
        super().__init__()

        # kernel 4 / stride 2 / padding 1 halves (Conv2d) or doubles
        # (ConvTranspose2d) the spatial size exactly.
        conv_kwargs = dict(kernel_size=4, stride=2, padding=1)
        widths = (1, 32, 64, 128, 256)
        bottleneck_shape = (256, 16, 16)
        flat_features = 256 * 16 * 16

        # --- Encoder ---
        encoder_layers = []
        for c_in, c_out in zip(widths, widths[1:]):
            encoder_layers += [nn.Conv2d(c_in, c_out, **conv_kwargs), nn.ReLU()]
        encoder_layers += [nn.Flatten(), nn.Linear(flat_features, latent_dims)]
        self.encoder = nn.Sequential(*encoder_layers)

        # --- Decoder ---
        decoder_layers = [
            nn.Linear(latent_dims, flat_features),
            nn.ReLU(),
            nn.Unflatten(1, bottleneck_shape),
        ]
        reversed_widths = widths[::-1]
        for c_in, c_out in zip(reversed_widths, reversed_widths[1:]):
            decoder_layers += [nn.ConvTranspose2d(c_in, c_out, **conv_kwargs), nn.ReLU()]
        # The last activation is a sigmoid, keeping outputs between 0 and 1.
        decoder_layers[-1] = nn.Sigmoid()
        self.decoder = nn.Sequential(*decoder_layers)

    def forward(self, x):
        """Return the reconstruction of `x`."""
        latent = self.encoder(x)
        return self.decoder(latent)

def get_features(model, dataloader, device):
    """Run every batch of `dataloader` through `model.encoder`.

    The model is switched to eval mode and gradients are disabled.

    Args:
        model: Module exposing an `encoder` attribute.
        dataloader: Iterable of batches whose element 0 is the image tensor.
        device: Device the images are moved to before encoding.

    Returns:
        A numpy array with the encoder outputs of all batches concatenated
        along axis 0, in iteration order.
    """
    model.eval()
    with torch.no_grad():
        per_batch = [
            model.encoder(batch[0].to(device)).cpu().numpy()
            for batch in dataloader
        ]
    return np.concatenate(per_batch, axis=0)

# --- 1. Setup & Train the Autoencoder ---

# Hyperparameters
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
num_epochs = 50 # Increase this for real data
batch_size = 32
learning_rate = 1e-3
latent_dims = 128

model = AutoEncoder(latent_dims=latent_dims).to(device)
criterion = nn.MSELoss() # Plain reconstruction loss for training the feature extractor
optimizer = torch.optim.Adam(model.parameters(), lr=learning_rate)

print(f"Using device: {device}")
print("--- Starting AE Training ---")

model.train()
for epoch in range(num_epochs):
    epoch_loss = 0.0
    for data in train_loader:
        images = data[0].to(device)

        # The autoencoder is trained to reproduce its (augmented) input.
        outputs = model(images)
        loss = criterion(outputs, images)

        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

        # criterion returns a batch mean; weight by batch size so the epoch
        # figure is a per-sample average.
        epoch_loss += loss.item() * images.size(0)

    avg_epoch_loss = epoch_loss / len(train_dataset)
    print(f"Epoch [{epoch+1}/{num_epochs}], Loss: {avg_epoch_loss:.6f}")

print("--- AE Training Complete ---")


# --- 2. k-NN Setup (Build the "Map of Normal") ---

print("\n--- Building k-NN Feature Map ---")
# The reference features are extracted from the training images WITHOUT the
# random augmentations. `train_dataset` applies `train_transform` (random
# flips, rotation, colour jitter), which would make the map non-reproducible
# and would preprocess it differently from the test features, which are
# extracted through `test_transform`.
train_dataset_full = MVTecDataset(train_files, transform=test_transform)
train_loader_full = DataLoader(train_dataset_full, batch_size=batch_size, shuffle=False)

# 1. Get all features from the "normal" training data
X_train_features = get_features(model, train_loader_full, device)
print(f"Created training feature map with shape: {X_train_features.shape}")

# 2. Fit the k-NN model on these "normal" features
knn = NearestNeighbors(n_neighbors=5, n_jobs=-1)
knn.fit(X_train_features)
print("k-NN model fitted on training features.")

Using device: cuda
--- Starting AE Training ---
Epoch [1/50], Loss: 0.038701
Epoch [2/50], Loss: 0.015337


KeyboardInterrupt: 

In [None]:
print("\n--- Evaluating Test Set ---")
# 1. Get encoder features for all test images
test_features = get_features(model, test_loader, device)
print(f"Created test feature map with shape: {test_features.shape}")

# 2. Get distances to nearest neighbors in the "normal" map
distances, _ = knn.kneighbors(test_features)

# 3. Anomaly score = mean distance to neighbors
anomaly_scores = np.mean(distances, axis=1)
print(f"Calculated {len(anomaly_scores)} anomaly scores.")

# --- 4. Set Threshold & Get Results ---

# 1. Score the *training* data to find a "normal" threshold.
# kneighbors() with no argument excludes each point from its own neighbour
# list. Querying with X_train_features would return every point as its own
# nearest neighbour at distance 0, biasing the threshold low compared with
# the test scores above.
train_distances, _ = knn.kneighbors()
train_scores = np.mean(train_distances, axis=1)

# 2. Set the threshold at the 95th percentile of "normal" scores
threshold = np.percentile(train_scores, 95)
print(f"Anomaly threshold set to: {threshold:.6f}")

# 3. Make predictions (1 = anomaly, 0 = normal)
predictions = (anomaly_scores > threshold).astype(int)

# --- 5. Print Results ---
print("\n--- Results ---")
print(f"Predictions (first 20): {predictions[:20]}")
print(f"Total anomalies predicted: {np.sum(predictions)} / {len(predictions)}")

# If labels are available, AUC should be computed from the raw
# `anomaly_scores`, not from the 0/1 predictions.

# Visualize a test image and its reconstruction
print("Visualizing a test image...")
# Clamp the example index so a smaller test set does not raise IndexError.
sample_index = min(150, len(test_dataset) - 1)
model.eval()
with torch.no_grad():
    # test_dataset[i] returns a 1-tuple (image_tensor,); add a batch dimension.
    img_tensor = test_dataset[sample_index][0].unsqueeze(0).to(device)

    recon = model(img_tensor)

    # Drop the batch dimension and move channels last for imshow.
    img = img_tensor.squeeze(0).cpu().permute(1, 2, 0).numpy()
    rec_img = recon.squeeze(0).cpu().permute(1, 2, 0).numpy()

    fig, (ax1, ax2) = plt.subplots(1, 2, figsize=(10, 5))
    ax1.imshow(img, cmap='gray')
    ax1.set_title(f"Original Image (Index {sample_index})")
    ax1.axis('off')

    ax2.imshow(rec_img, cmap='gray')
    ax2.set_title("Reconstructed")
    ax2.axis('off')
    plt.show()

In [None]:
# Persist the thresholded predictions: one row per test sample, with the
# sample's position in `predictions` as its id.
submission_rows = ([str(idx), label] for idx, label in enumerate(predictions))

with open('submit_AutoEncoder.csv', mode='w', newline='') as submit_file:
    csv_writer = csv.writer(submit_file)
    csv_writer.writerow(['id', 'prediction'])
    csv_writer.writerows(submission_rows)

print("--- Submission file 'submit_AutoEncoder.csv' created. ---")