
<div align="center">

#### Lab 3

# National Tsing Hua University

#### Spring 2025

#### 11320IEEM 513600

#### Deep Learning and Industrial Applications
    
## Lab 3: Anomaly Detection in Industrial Applications

</div>

### Introduction

In today's industrial landscape, the ability to detect anomalies in manufacturing processes and products is critical for maintaining quality, efficiency, and safety. This lab focuses on leveraging deep learning techniques for anomaly detection in various industrial applications, using the MVTec Anomaly Detection Dataset. By employing ImageNet-pretrained models available in torchvision, students will gain hands-on experience in classifying defects and irregularities across different types of industrial products.

Throughout this lab, you'll be involved in the following key activities:
- Explore and process the MVTec Anomaly Detection Dataset.
- Apply ImageNet-pretrained models from [Torchvision](https://pytorch.org/vision/stable/models.html) to detect anomalies in industrial products.
- Evaluate the performance of the models to understand their effectiveness in real-world industrial applications.

### Objectives

- Understand the principles of anomaly detection in the context of industrial applications.
- Learn how to implement and utilize ImageNet-pretrained models for detecting anomalies.
- Analyze and interpret the results of the anomaly detection models to assess their practicality in industrial settings.

### Dataset

The MVTec AD Dataset is a comprehensive collection of high-resolution images across different categories of industrial products, such as bottles, cables, and metal nuts, each with various types of defects. This dataset is pivotal for developing and benchmarking anomaly detection algorithms. You can download our lab's dataset [here](https://drive.google.com/file/d/19600hUOpx0hl78TdpdH0oyy-gGTk_F_o/view?usp=share_link). After downloading, either upload the data directly to Colab or place it in your Google Drive.
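
The folder names in the dataset already encode the labels, so they can be read from each file path instead of relying on file order. The sketch below assumes the lab's data follows the standard MVTec AD layout (`<category>/train/good/` and `<category>/test/<class>/`), which the glob patterns used later in this notebook suggest. The helper names `label_from_path` and `labels_for` are hypothetical and not part of the lab code.

```python
import os

def label_from_path(path):
    """Return 0 for an image stored in a 'good' folder, 1 for any defect folder."""
    class_name = os.path.basename(os.path.dirname(path))
    return 0 if class_name == "good" else 1

def labels_for(paths):
    """Map a list of image paths to binary labels (0: good, 1: defect)."""
    return [label_from_path(p) for p in paths]

# Illustrative paths only; real paths would come from glob.glob(...)
example_paths = [
    "bottle/train/good/000.png",
    "bottle/test/good/001.png",
    "bottle/test/broken_large/002.png",
    "bottle/test/contamination/003.png",
]
print(labels_for(example_paths))  # [0, 0, 1, 1]
```

Deriving labels this way keeps each label attached to its image even if the file listing comes back in a different order.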

### References
- [MVTec AD Dataset](https://www.kaggle.com/datasets/ipythonx/mvtec-ad/data) for the dataset used in this lab.
- [Torchvision Models](https://pytorch.org/vision/stable/models.html) for accessing ImageNet-pretrained models to be used in anomaly detection tasks.
- [State-of-the-Art Anomaly Detection on MVTec AD](https://paperswithcode.com/sota/anomaly-detection-on-mvtec-ad) for insights into the latest benchmarks and methodologies in anomaly detection applied to the MVTec AD dataset.
- CVPR 2019: "MVTec AD -- A Comprehensive Real-World Dataset for Unsupervised Anomaly Detection", the original paper introducing the MVTec AD dataset.

In [None]:
import glob
import matplotlib.pyplot as plt
import random
from tqdm.auto import tqdm
import cv2
import numpy as np
import os

In [None]:
# Mount Google Drive
from google.colab import drive
drive.mount('/content/drive')

# Set the category to extract (e.g., bottle, cable, etc.)
category = "bottle"

# Define root paths for the chosen category
drive_root = f"/content/drive/MyDrive/Colab Notebooks/{category}"
train_dir = os.path.join(drive_root, "train")
test_dir = os.path.join(drive_root, "test")

# Get train and test image paths
train_images = glob.glob(f"{train_dir}/good/*.png")
test_images = glob.glob(f"{test_dir}/**/*.png", recursive=True)

# Count classes (excluding 'good')
defect_classes = [d for d in os.listdir(test_dir) if d != 'good']
num_defect_classes = len(defect_classes)

# Example image to get dimensions
sample_img = cv2.imread(train_images[0])
height, width, channels = sample_img.shape

# Output
print(f"Dataset Summary for '{category}':")
print(f"Number of defect classes: {num_defect_classes}")
print(f"Types of defect classes: {defect_classes}")
print(f"Total images used: {len(train_images) + len(test_images)}")
print(f"  - Training images (only 'good'): {len(train_images)}")
print(f"  - Test images (good + defective): {len(test_images)}")
print(f"Image dimensions: {width} x {height} x {channels}")

In [None]:
file_paths = glob.glob("/content/drive/MyDrive/Colab Notebooks/bottle/**/*/*.png", recursive=True)

In [None]:
all_data = []

for path in tqdm(file_paths):
    img = cv2.imread(path)   # OpenCV loads images in BGR channel order
    img = img[..., ::-1]     # reverse the channel axis: BGR -> RGB
    all_data.append(img)

all_data = np.stack(all_data)
print(all_data.shape)


In [None]:
import matplotlib.pyplot as plt
import numpy as np

# Define your test directory
test_dir = "/content/drive/MyDrive/Colab Notebooks/bottle/test"

# Get class names from the test folder (e.g., good, broken_large, etc.)
classes = sorted([d for d in os.listdir(test_dir) if os.path.isdir(os.path.join(test_dir, d))])
print(f'Classes: {classes}')

# Show 2 images from each class
fig, axs = plt.subplots(len(classes), 2, figsize=(6, 4 * len(classes)))

for i, class_name in enumerate(classes):
    class_folder = os.path.join(test_dir, class_name)
    images = sorted(glob.glob(f"{class_folder}/*.png"))[:2]  # get first 2 images

    for j, img_path in enumerate(images):
        img = cv2.imread(img_path)
        img = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)
        axs[i, j].imshow(img)
        axs[i, j].axis('off')
        axs[i, j].set_title(f'{class_name}')

plt.tight_layout()
plt.show()

## A. Data Loading and Preprocessing

In [None]:
import numpy as np
from sklearn.model_selection import train_test_split
from torchvision import transforms
from torch.utils.data import Dataset, DataLoader
from PIL import Image
import torch

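# Assume all_data has shape (355, H, W, C) and that the first 60 images are 'good'.
# Note: glob.glob() does not guarantee file order, so verify this assumption before relying on it.
num_good = 60
num_defect = len(all_data) - num_good
all_labels = np.concatenate([np.zeros(num_good), np.ones(num_defect)]).astype(int)  # 0: good, 1: defect

# ✅ Stratified train/validation split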
x_train, x_val, y_train, y_val = train_test_split(
    all_data, all_labels, test_size=0.2, stratify=all_labels, random_state=42
)

# ✅ Convert NHWC to NCHW
x_train = np.transpose(x_train, (0, 3, 1, 2))
x_val = np.transpose(x_val, (0, 3, 1, 2))

# ✅ Custom Dataset class
class MyDataset(Dataset):
    def __init__(self, x, y, transform=None):
        self.x = x
        self.y = torch.tensor(y, dtype=torch.long)
        self.transform = transform

    def __len__(self):
        return len(self.x)

    def __getitem__(self, idx):
        image = Image.fromarray(np.transpose(self.x[idx], (1, 2, 0)).astype(np.uint8))
        if self.transform:
            image = self.transform(image)
        return image, self.y[idx]

# ✅ Print shapes and label distributions as a sanity check
print(f"x_train: {x_train.shape}, y_train: {y_train.shape}")
print(f"x_val:   {x_val.shape}, y_val:   {y_val.shape}")
print("Train label dist:", np.bincount(y_train))
print("Val label dist:  ", np.bincount(y_val))
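
The label counts printed above show how imbalanced the two classes are. One common way to compensate, and the one the training cell later in this notebook uses, is to weight each class in the loss by the inverse of its count. The following is a minimal sketch of that arithmetic; the function name `inverse_frequency_weights` and the label counts are made up for illustration and are not the lab's actual numbers.

```python
from collections import Counter

def inverse_frequency_weights(labels, num_classes=2):
    """Weight each class by 1 / (number of samples in that class).

    Raises ZeroDivisionError if a class has no samples at all.
    """
    counts = Counter(labels)
    return [1.0 / counts[c] for c in range(num_classes)]

# Illustrative labels only: 2 'good' (0) and 8 'defect' (1) samples
y_example = [0] * 2 + [1] * 8
weights = inverse_frequency_weights(y_example)
print(weights)  # [0.5, 0.125]
```

The rarer class receives the larger weight, so each of its mistakes contributes more to the loss. These weights can then be passed to `nn.CrossEntropyLoss(weight=...)` as a float tensor.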


In [None]:
for experiment_id in range(1, 5):
    print(f"\n🔁 Running Experiment {experiment_id}...")

    # === Hyperparameter settings ===
    if experiment_id == 1:
        input_size = 128; batch_size = 32; lr = 1e-3; epochs = 30; model_type = "resnet18"
    elif experiment_id == 2:
        input_size = 128; batch_size = 32; lr = 1e-3; epochs = 30; model_type = "resnet50"
    elif experiment_id == 3:
        input_size = 128; batch_size = 16; lr = 1e-3; epochs = 50; model_type = "resnet18"
    elif experiment_id == 4:
        input_size = 128; batch_size = 16; lr = 1e-4; epochs = 50; model_type = "resnet18"

    # === Rebuild the transforms for each experiment (they depend on input_size) ===
    train_transforms = transforms.Compose([
        transforms.Resize((input_size, input_size)),
        transforms.AutoAugment(),
        transforms.RandomHorizontalFlip(),
        transforms.ToTensor(),
    ])
    val_transforms = transforms.Compose([
        transforms.Resize((input_size, input_size)),
        transforms.ToTensor(),
    ])

    # === Rebuild the Dataset and DataLoader for each experiment ===
    train_dataset = MyDataset(x_train, y_train, transform=train_transforms)
    val_dataset = MyDataset(x_val, y_val, transform=val_transforms)

    train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True,
                              num_workers=2, pin_memory=True, persistent_workers=True)
    val_loader   = DataLoader(val_dataset, batch_size=batch_size, shuffle=False,
                              num_workers=1, pin_memory=True, persistent_workers=True)

    # ✅ Model initialization and training continue in the following sections.


## B. Defining Neural Networks

In [None]:
import torch.nn as nn
from torchvision import models

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
results = {}

# --- Run the four experiments ---
for experiment_id in range(1, 5):
    print(f"\n\n Running Experiment {experiment_id}...")

    # === Hyperparameter settings ===
    if experiment_id == 1:
        input_size = 128; batch_size = 32; lr = 1e-3; epochs = 30
    elif experiment_id == 2:
        input_size = 128; batch_size = 32; lr = 1e-3; epochs = 30
    elif experiment_id == 3:
        input_size = 128; batch_size = 16; lr = 1e-3; epochs = 50
    elif experiment_id == 4:
        input_size = 128; batch_size = 16; lr = 1e-4; epochs = 50

    train_transforms = transforms.Compose([
        transforms.Resize((input_size, input_size)),
        transforms.AutoAugment(),
        transforms.RandomHorizontalFlip(),
        transforms.ToTensor(),
    ])
    val_transforms = transforms.Compose([
        transforms.Resize((input_size, input_size)),
        transforms.ToTensor(),
    ])

    train_dataset = MyDataset(x_train, y_train, transform=train_transforms)
    val_dataset = MyDataset(x_val, y_val, transform=val_transforms)
    train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
    val_loader = DataLoader(val_dataset, batch_size=batch_size, shuffle=False)

    # === Model architecture ===
    if experiment_id == 2:
        model = models.resnet50(weights='IMAGENET1K_V1')
    else:
        model = models.resnet18(weights='IMAGENET1K_V1')

    for param in model.parameters():
        param.requires_grad = False
    for name, param in model.named_parameters():
        if "layer4" in name:
            param.requires_grad = True

    model.fc = nn.Linear(model.fc.in_features, 2)
    model = model.to(device)

## C. Training the Neural Network

In [None]:
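import pickle
from collections import Counter
from torch.optim import Adam
from torch.optim.lr_scheduler import CosineAnnealingLR

# ✅ 2. Dataset definition (in case it has not been defined yet)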
class MyDataset(Dataset):
    def __init__(self, x, y, transform=None):
        self.x = x
        self.y = torch.from_numpy(y).long()
        self.transform = transform

    def __len__(self):
        return len(self.x)

    def __getitem__(self, idx):
        new_x = np.transpose(self.x[idx], (1, 2, 0)).astype(np.uint8)
        img = Image.fromarray(new_x)
        if self.transform:
            img = self.transform(img)
        return img, self.y[idx]

# ✅ 3. Train and save the results (re-run when needed)
results = {}
label_counter = Counter(y_train)
class_weights = [1.0 / label_counter[0], 1.0 / label_counter[1]]
class_weights_tensor = torch.tensor(class_weights, dtype=torch.float).to(device)

for experiment_id in range(1, 5):
    print(f"\n🔁 Running Experiment {experiment_id}...")

    # === Hyperparameter settings ===
    if experiment_id == 1:
        input_size = 128; batch_size = 32; lr = 1e-3; epochs = 30
    elif experiment_id == 2:
        input_size = 128; batch_size = 32; lr = 1e-3; epochs = 30
    elif experiment_id == 3:
        input_size = 128; batch_size = 16; lr = 1e-3; epochs = 50
    elif experiment_id == 4:
        input_size = 128; batch_size = 16; lr = 1e-4; epochs = 50

    # === transforms ===
    train_transforms = transforms.Compose([
        transforms.Resize((input_size, input_size)),
        transforms.AutoAugment(),
        transforms.ToTensor(),
    ])
    val_transforms = transforms.Compose([
        transforms.Resize((input_size, input_size)),
        transforms.ToTensor(),
    ])

    # === Dataloader ===
    train_dataset = MyDataset(x_train, y_train, transform=train_transforms)
    val_dataset = MyDataset(x_val, y_val, transform=val_transforms)
    train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
    val_loader = DataLoader(val_dataset, batch_size=batch_size, shuffle=False)

    # === Model definition ===
    if experiment_id == 2:
        model = models.resnet50(weights='IMAGENET1K_V1')
    else:
        model = models.resnet18(weights='IMAGENET1K_V1')
    for param in model.parameters():
        param.requires_grad = False
    for name, param in model.named_parameters():
        if "layer4" in name:
            param.requires_grad = True
    model.fc = nn.Linear(model.fc.in_features, 2)
    model = model.to(device)

    # === Loss, Optimizer, Scheduler ===
    criterion = nn.CrossEntropyLoss(weight=class_weights_tensor)
    optimizer = Adam(model.parameters(), lr=lr)
    # scheduler.step() is called once per epoch below, so T_max counts epochs
    scheduler = CosineAnnealingLR(optimizer, T_max=epochs, eta_min=0)

    best_val_acc = -1
    train_losses, val_losses = [], []
    train_accuracies, val_accuracies = [], []

    for epoch in tqdm(range(epochs)):
        model.train()
        total_loss, correct, total = 0.0, 0, 0
        for images, labels in train_loader:
            images, labels = images.to(device), labels.to(device)
            optimizer.zero_grad()
            outputs = model(images)
            loss = criterion(outputs, labels)
            loss.backward()
            optimizer.step()
            total_loss += loss.item()
            preds = outputs.argmax(dim=1)
            correct += (preds == labels).sum().item()
            total += labels.size(0)
        train_losses.append(total_loss / len(train_loader))
        train_accuracies.append(100. * correct / total)

        model.eval()
        val_loss, val_correct, val_total = 0.0, 0, 0
        with torch.no_grad():
            for images, labels in val_loader:
                images, labels = images.to(device), labels.to(device)
                outputs = model(images)
                loss = criterion(outputs, labels)
                val_loss += loss.item()
                val_correct += (outputs.argmax(1) == labels).sum().item()
                val_total += labels.size(0)
        val_losses.append(val_loss / len(val_loader))
        val_acc = 100. * val_correct / val_total
        val_accuracies.append(val_acc)
        scheduler.step()

        if val_acc > best_val_acc:
            best_val_acc = val_acc
            torch.save(model.state_dict(), f"model_exp{experiment_id}.pth")

        print(f"Epoch {epoch+1}/{epochs} - Train Acc: {train_accuracies[-1]:.2f}% - Val Acc: {val_acc:.2f}%")

    results[experiment_id] = {
        "train_acc": train_accuracies,
        "val_acc": val_accuracies,
        "train_loss": train_losses,
        "val_loss": val_losses,
    }

# ✅ Save the results dictionary
with open("results.pkl", "wb") as f:
    pickle.dump(results, f)
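
The learning-rate schedule depends on how `T_max` relates to how often `scheduler.step()` is called. With cosine annealing, the learning rate after `t` scheduler steps follows the closed form `eta_min + 0.5 * (base_lr - eta_min) * (1 + cos(pi * t / T_max))`. The sketch below evaluates that formula directly, without PyTorch, for a scheduler stepped once per epoch as in the loop above. The function name `cosine_annealing_lr` and the value of `steps_per_epoch` are assumptions for illustration.

```python
import math

def cosine_annealing_lr(step, base_lr, t_max, eta_min=0.0):
    """Closed-form cosine annealing: learning rate after `step` scheduler steps."""
    return eta_min + 0.5 * (base_lr - eta_min) * (1 + math.cos(math.pi * step / t_max))

base_lr, epochs = 1e-3, 30
steps_per_epoch = 8  # hypothetical number of batches per epoch

# Scheduler stepped once per epoch, with T_max counted in epochs:
# the learning rate decays all the way to eta_min by the last epoch.
lr_end_epochs = cosine_annealing_lr(epochs, base_lr, t_max=epochs)

# Scheduler stepped once per epoch, but T_max counted in batches:
# only a small part of the cosine curve is traversed.
lr_end_batches = cosine_annealing_lr(epochs, base_lr, t_max=steps_per_epoch * epochs)

print(f"T_max = epochs:           final lr = {lr_end_epochs:.2e}")
print(f"T_max = batches * epochs: final lr = {lr_end_batches:.2e}")
```

In the second case the learning rate stays close to its starting value for the whole run, so the unit of `T_max` should match the unit in which the scheduler is stepped.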

### Visualizing model performance

In [None]:
# ✅ 4. Load the results and plot them (this cell can be run on its own)
with open("results.pkl", "rb") as f:
    results = pickle.load(f)

for i in range(1, 5):
    if i not in results:
        print(f"❌ Results for Experiment {i} not found")
        continue
    train_acc = results[i]["train_acc"]
    val_acc = results[i]["val_acc"]
    train_loss = results[i]["train_loss"]
    val_loss = results[i]["val_loss"]

    plt.figure(figsize=(12, 4))
    plt.subplot(1, 2, 1)
    plt.plot(train_acc, label="Train Accuracy")
    plt.plot(val_acc, label="Validation Accuracy")
    plt.title(f"Experiment {i} - Accuracy")
    plt.xlabel("Epoch"); plt.ylabel("Accuracy (%)")
    plt.grid(True); plt.legend()

    plt.subplot(1, 2, 2)
    plt.plot(train_loss, label="Train Loss")
    plt.plot(val_loss, label="Validation Loss")
    plt.title(f"Experiment {i} - Loss")
    plt.xlabel("Epoch"); plt.ylabel("Loss")
    plt.grid(True); plt.legend()
    plt.tight_layout()
    plt.show()


## D. Evaluating Your Trained Model

### Load Trained Model and Evaluate

In [None]:
import torch
import torch.nn as nn
from torchvision import models

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

for i in range(1, 5):
    print(f"\n🔍 Evaluating Experiment {i}...")

    # ✅ Select the architecture that matches the saved checkpoint
    if i == 2:
        model = models.resnet50(weights='IMAGENET1K_V1')
        num_ftrs = model.fc.in_features  # ResNet50: 2048
    else:
        model = models.resnet18(weights='IMAGENET1K_V1')
        num_ftrs = model.fc.in_features  # ResNet18: 512

    model.fc = nn.Linear(num_ftrs, 2)
    model.load_state_dict(torch.load(f"model_exp{i}.pth", map_location=device))
    model = model.to(device)
    model.eval()

    # === Evaluate accuracy (computed on the validation split via val_loader) ===
    test_correct = 0
    test_total = 0

    with torch.no_grad():
        for images, labels in val_loader:
            images, labels = images.to(device), labels.to(device).long()
            outputs = model(images)
            preds = outputs.argmax(-1)
            test_correct += (preds == labels).sum().item()
            test_total += labels.size(0)

    test_acc = 100. * test_correct / test_total
    print(f"✅ Final Test Accuracy for Experiment {i}: {test_acc:.2f}%")
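
Accuracy alone can be misleading when one class dominates, because a model that predicts "defect" for every image still scores well. A confusion-matrix breakdown makes this visible. The sketch below is a plain-Python illustration, not part of the lab's evaluation code; `binary_metrics` is a hypothetical helper and the labels are made up.

```python
def binary_metrics(y_true, y_pred):
    """Compute accuracy, precision, recall, specificity and balanced accuracy.

    Label convention: 0 = good, 1 = defect (the positive class).
    """
    tp = sum(1 for t, p in zip(y_true, y_pred) if t == 1 and p == 1)
    tn = sum(1 for t, p in zip(y_true, y_pred) if t == 0 and p == 0)
    fp = sum(1 for t, p in zip(y_true, y_pred) if t == 0 and p == 1)
    fn = sum(1 for t, p in zip(y_true, y_pred) if t == 1 and p == 0)

    recall = tp / (tp + fn) if (tp + fn) else 0.0
    specificity = tn / (tn + fp) if (tn + fp) else 0.0
    precision = tp / (tp + fp) if (tp + fp) else 0.0
    return {
        "accuracy": (tp + tn) / len(y_true),
        "precision": precision,
        "recall": recall,
        "specificity": specificity,
        "balanced_accuracy": (recall + specificity) / 2,
    }

# Illustrative case: 2 good and 8 defective images, model always predicts 'defect'
y_true = [0, 0, 1, 1, 1, 1, 1, 1, 1, 1]
y_pred = [1] * 10
metrics = binary_metrics(y_true, y_pred)
print(metrics["accuracy"], metrics["balanced_accuracy"])  # 0.8 0.5
```

Here the accuracy is 80% even though no good part is ever recognized, while balanced accuracy drops to 50%. In the evaluation loop above, the same function could be applied to predictions and labels collected across batches.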
