# Defect Classification (良品/不良品分類)

このノートブックは、良品と不良品を識別する2値分類モデル（ResNet50, EfficientNet-B4）を学習・評価・推論します。

## 0. 環境設定 (共通)

In [None]:
# from google.colab import drive
# drive.mount('/content/drive')

In [None]:
!pip install wandb

import os
import random
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns # 混同行列の描画用
from pathlib import Path

from PIL import Image
from sklearn.metrics import confusion_matrix, classification_report, roc_auc_score, f1_score
from tqdm.notebook import tqdm

import wandb

import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader
import torchvision.transforms.v2 as transforms

In [None]:
def seed_everything(seed=42):
    """Seed every random number generator used by this notebook.

    Seeds Python's ``random``, NumPy, and PyTorch (CPU and every CUDA device)
    and configures cuDNN for deterministic behaviour.

    Args:
        seed: Integer seed applied to all generators.
    """
    random.seed(seed)
    # Takes effect only for Python processes launched after this point; the
    # hash seed of the running interpreter was fixed at start-up.
    os.environ['PYTHONHASHSEED'] = str(seed)
    np.random.seed(seed)
    torch.manual_seed(seed)
    # Seed all visible GPUs, not just the current one (safe without CUDA).
    torch.cuda.manual_seed_all(seed)
    torch.backends.cudnn.deterministic = True
    # Benchmark mode may pick different convolution algorithms between runs,
    # which defeats the deterministic flag above.
    torch.backends.cudnn.benchmark = False

# Fix all RNG seeds, then pick the GPU when one is available (CPU otherwise).
seed_everything(42)
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(f"Using device: {device}")

データの解凍

In [None]:
import zipfile

# Dataset archive; upload it to the Colab runtime before running this cell.
ZIP_FILE_PATH = Path("/content/phase1_dataset.zip")

# Extraction target directory (Colab's fast local storage).
EXTRACT_TO_DIR = Path("/content/temp_data")

# ------------------------------------

def extract_dataset(zip_path, extract_to):
    """Unpack a ZIP archive and print an indented summary of the result.

    Args:
        zip_path: ``Path`` of the archive to extract.
        extract_to: ``Path`` of the destination directory (created if missing).

    Returns:
        None. When the archive does not exist, an error line is printed and
        nothing is extracted.
    """
    if zip_path.exists():
        print(f"Extracting {zip_path} to {extract_to} ...")
        extract_to.mkdir(parents=True, exist_ok=True)

        with zipfile.ZipFile(zip_path, 'r') as archive:
            archive.extractall(extract_to)

        print("Done!")

        # Show the extracted directory tree.
        print("\nDataset structure:")
        base = str(extract_to)
        pad = ' ' * 4
        for current_dir, _subdirs, file_names in os.walk(extract_to):
            # Depth below the extraction root = number of separators left
            # once the root prefix is stripped.
            depth = current_dir.replace(base, '').count(os.sep)
            print(f'{pad * depth}{os.path.basename(current_dir)}/')
            # Directories can hold many files, so print only the count.
            if file_names:
                print(f'{pad * (depth + 1)}({len(file_names)} files)')
    else:
        print(f"Error: ZIPファイルが見つかりません: {zip_path}")

# Run the extraction of the uploaded archive into the local working directory.
extract_dataset(ZIP_FILE_PATH, EXTRACT_TO_DIR)

## 1. Config

In [None]:
class Config:
    """Central settings: data/result paths, training hyperparameters, W&B names.

    Every setting is a class attribute, so instances carry no per-instance
    state of their own.
    """
    # --- Paths ---
    # Root of the extracted dataset.
    DATA_DIR = Path("/content/temp_data/phase1_dataset")

    # Where results are written.
    # NOTE(review): the original comment said results stay on Google Drive,
    # but this path is not under /content/drive — confirm the intended location.
    BASE_DIR = Path("/content/2WINS_AD_Test_Results")
    OUTPUT_DIR = BASE_DIR / "results"
    # Executed when the class body runs, i.e. as soon as this cell is executed.
    OUTPUT_DIR.mkdir(parents=True, exist_ok=True)

    # --- Data loading ---
    IMG_SIZE = 512      # images are resized to IMG_SIZE x IMG_SIZE
    BATCH_SIZE = 8
    NUM_WORKERS = 2     # DataLoader worker processes

    # --- Training ---
    EPOCHS = 30
    LR = 1e-3           # learning rate passed to the optimizer

    # --- Weights & Biases ---
    WANDB_PROJECT = "2wins_defect_detection"
    # NOTE(review): the run name mentions EfficientNet-B4, while the main
    # training cell instantiates ResNet50Classifier — confirm before logging.
    WANDB_RUN_NAME = "EFFICIENTNETB4_IMG512_V1" # Updated run name

# Shared settings object used by the rest of the notebook.
config = Config()

## 2. Dataset

In [None]:
# Training pipeline (with data augmentation).
train_transform = transforms.Compose([
    # 1. Resize, then pad by 4 px and crop back to IMG_SIZE (small random shift).
    transforms.Resize((config.IMG_SIZE, config.IMG_SIZE)),
    transforms.RandomCrop(size=(config.IMG_SIZE, config.IMG_SIZE), padding=4),

    # 2. Geometric transforms (to cope with positional offset / part orientation).
    transforms.RandomHorizontalFlip(p=0.5),
    transforms.RandomVerticalFlip(p=0.5),
    transforms.RandomRotation(degrees=10),
    transforms.RandomAffine(degrees=0, translate=(0.1, 0.1)),

    # 3. Photometric transforms (to cope with reflections typical of metal).
    # Saturation and hue are enabled as well because images are loaded as RGB.
    transforms.ColorJitter(brightness=0.2, contrast=0.2, saturation=0.2, hue=0.1),

    # 4. Texture emphasis.
    transforms.RandomAdjustSharpness(sharpness_factor=2, p=0.5),
    transforms.RandomAutocontrast(p=0.5),

    # 5. Convert to a float32 tensor scaled to [0, 1], then normalise with the
    #    mean/std commonly used for ImageNet-pretrained torchvision models.
    transforms.ToImage(),
    transforms.ToDtype(torch.float32, scale=True),
    transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),

    # 6. Robustness: randomly erase a small patch (applied after normalisation).
    transforms.RandomErasing(p=0.2, scale=(0.02, 0.1), ratio=(0.3, 3.3)),
])

# Validation / test pipeline: resize and normalise only (no augmentation).
val_transform = transforms.Compose([
    transforms.Resize((config.IMG_SIZE, config.IMG_SIZE)),
    transforms.ToImage(),
    transforms.ToDtype(torch.float32, scale=True),
    transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])
])

class ClassificationDataset(Dataset):
    """Binary good/bad image dataset read from ``<root_dir>/<split>/<class>/``.

    Class folders named ``good``/``Good`` get label 0 and ``bad``/``Bad`` get
    label 1. Image paths are collected once at construction time and stored
    in sorted order so that sample indices are reproducible between runs.

    Args:
        root_dir: Dataset root (``str`` or ``Path``).
        split: Sub-directory to read. ``"valid"`` falls back to ``"val"`` when
            only the latter exists.
        transform: Optional callable applied to each PIL image.
        extensions: Glob pattern(s) of the files to load; defaults to PNG only.

    Attributes:
        image_paths: List of image ``Path`` objects.
        labels: List of integer labels aligned with ``image_paths``.
    """

    def __init__(self, root_dir, split="train", transform=None, extensions=("*.png",)):
        root_dir = Path(root_dir)  # accept plain strings as well as Path objects
        if isinstance(extensions, str):
            extensions = (extensions,)

        self.transform = transform
        self.image_paths = []
        self.labels = []

        # Absorb naming variation: "valid" also matches a folder called "val".
        if split == "valid":
            if not (root_dir / "valid").exists() and (root_dir / "val").exists():
                split = "val"

        target_dir = root_dir / split

        # Missing split: warn and leave the dataset empty (best effort).
        if not target_dir.exists():
            print(f"⚠️ Warning: Directory not found: {target_dir}")
            return

        label_map = [
            (["good", "Good"], 0),  # 0: good part
            (["bad",  "Bad"],  1),  # 1: defective part
        ]

        for folder_names, label_idx in label_map:
            visited_dirs = []
            found = set()
            for folder_name in folder_names:
                class_dir = target_dir / folder_name
                if not class_dir.exists():
                    continue
                # On case-insensitive filesystems "good" and "Good" are the
                # same directory; read it only once.
                if any(class_dir.samefile(seen) for seen in visited_dirs):
                    continue
                visited_dirs.append(class_dir)

                for ext in extensions:
                    found.update(class_dir.glob(ext))

            # glob() order is filesystem dependent; sort for reproducibility.
            class_paths = sorted(found)
            self.image_paths.extend(class_paths)
            self.labels.extend([label_idx] * len(class_paths))

            label_str = "bad (1)" if label_idx == 1 else "good (0)"
            print(f"[{split}] {label_str}: {len(class_paths)} images loaded.")

    def __len__(self):
        """Return the number of images found."""
        return len(self.image_paths)

    def __getitem__(self, idx):
        """Return ``(image, label)`` for sample ``idx``; the image is loaded as RGB."""
        img_path = self.image_paths[idx]
        label = self.labels[idx]

        # convert() yields an independent in-memory copy, so the file handle
        # can be released immediately.
        with Image.open(img_path) as img:
            image = img.convert("RGB")
        if self.transform:
            image = self.transform(image)

        return image, label

In [None]:
# Build one dataset per split. Only the training split uses augmentation;
# validation and test share the deterministic val_transform.
train_ds = ClassificationDataset(config.DATA_DIR, split="train", transform=train_transform)
val_ds = ClassificationDataset(config.DATA_DIR, split="valid", transform=val_transform)
test_ds = ClassificationDataset(config.DATA_DIR, split="test", transform=val_transform)

# Only the training loader shuffles; evaluation loaders keep dataset order.
train_loader = DataLoader(train_ds, batch_size=config.BATCH_SIZE, shuffle=True, num_workers=config.NUM_WORKERS)
val_loader = DataLoader(val_ds, batch_size=config.BATCH_SIZE, shuffle=False, num_workers=config.NUM_WORKERS)
test_loader = DataLoader(test_ds, batch_size=config.BATCH_SIZE, shuffle=False, num_workers=config.NUM_WORKERS)

## 3. Model Efficient Net B4
https://arxiv.org/pdf/1905.11946


In [None]:
import torchvision.models as models

# EfficientNetB4
class EfficientNetB4Classifier(nn.Module):
    """EfficientNet-B4 with its classification head swapped for ``num_classes`` outputs.

    Args:
        num_classes: Number of output logits (2 for good/bad).
    """

    def __init__(self, num_classes=2):
        super().__init__()

        # Start from the torchvision EfficientNet-B4 with the 'IMAGENET1K_V1'
        # pretrained weights.
        net = models.efficientnet_b4(weights='IMAGENET1K_V1')

        # Width of the features feeding the stock Linear layer (the original
        # note states this is 1792 for EfficientNet-B4).
        head_in = net.classifier[1].in_features

        # Replace the head: same Dropout + Linear layout, new output size.
        net.classifier = nn.Sequential(
            nn.Dropout(p=0.4, inplace=True),
            nn.Linear(in_features=head_in, out_features=num_classes),
        )
        self.backbone = net

    def forward(self, x):
        """Return the raw class logits for a batch of images."""
        logits = self.backbone(x)
        return logits

# Instantiate the EfficientNet-B4 classifier on the selected device.
# NOTE(review): `model` is reassigned to a ResNet50Classifier in the main
# training cell, so this instance goes unused when the notebook is run top
# to bottom — confirm which architecture is intended.
model = EfficientNetB4Classifier(num_classes=2).to(device)

## Model RESNET50

In [None]:
import torchvision.models as models

#Model ResNet50
class ResNet50Classifier(nn.Module):
    """ResNet50 with its final ``fc`` layer swapped for a Dropout + Linear head.

    Args:
        num_classes: Number of output logits (2 for good/bad).
    """

    def __init__(self, num_classes=2):
        super().__init__()

        # Start from the torchvision ResNet50 with the 'IMAGENET1K_V1'
        # pretrained weights.
        net = models.resnet50(weights='IMAGENET1K_V1')

        # Width of the features feeding the stock fc layer (the original note
        # states this is 2048 for ResNet50).
        head_in = net.fc.in_features

        # New classification head.
        net.fc = nn.Sequential(
            nn.Dropout(p=0.2),
            nn.Linear(in_features=head_in, out_features=num_classes)
        )
        self.backbone = net

    # Optional helper: freeze everything except the classification head.
    def _freeze_layers(self, model):
        """Disable gradients for every parameter of ``model`` whose name lacks 'fc'."""
        for param_name, param in model.named_parameters():
            if 'fc' in param_name:
                continue  # keep the head trainable
            param.requires_grad = False

    def forward(self, x):
        """Return the raw class logits for a batch of images."""
        logits = self.backbone(x)
        return logits

## 4. 学習関数(WandB & F1閾値探索 実装)

In [None]:
def find_best_threshold(y_true, y_probs):
    """Find the decision threshold that maximises the F1 score of class 1.

    Candidate thresholds are 0.001, 0.002, ..., 0.999 and a sample is
    predicted positive when ``prob >= threshold``. Among thresholds with the
    same F1 the lowest one wins.

    Args:
        y_true: Sequence or array of 0/1 ground-truth labels.
        y_probs: Sequence or array of predicted probabilities for class 1.

    Returns:
        ``(best_threshold, best_f1)`` as Python floats. When no threshold
        reaches F1 > 0 (for example no positive labels, or empty input) the
        neutral pair ``(0.5, 0.0)`` is returned.

    Raises:
        ValueError: If the two inputs differ in length.
    """
    y_true = np.asarray(y_true).ravel()
    y_probs = np.asarray(y_probs, dtype=float).ravel()
    if y_true.shape != y_probs.shape:
        raise ValueError(
            f"y_true and y_probs must have the same length, "
            f"got {y_true.size} and {y_probs.size}"
        )

    # Same neutral default the rest of the notebook uses for thresholds.
    default_threshold = 0.5
    if y_probs.size == 0:
        return default_threshold, 0.0

    # Integer grid divided once: each value is the closest float to k/1000,
    # without the accumulated drift of a float-step arange.
    thresholds = np.arange(1, 1000) / 1000.0

    # Sort once so that every threshold is answered with a binary search.
    order = np.argsort(y_probs, kind="stable")
    sorted_probs = y_probs[order]
    is_positive = y_true[order] == 1
    # positives_below[i] = number of positives among the i smallest probs.
    positives_below = np.concatenate(([0], np.cumsum(is_positive)))
    total_positives = positives_below[-1]

    # Number of samples with prob < t, i.e. predicted negative at threshold t.
    n_below = np.searchsorted(sorted_probs, thresholds, side="left")
    true_pos = total_positives - positives_below[n_below]
    predicted_pos = y_probs.size - n_below

    # F1 = 2TP / (2TP + FP + FN) = 2TP / (predicted_pos + total_positives);
    # defined as 0 when the denominator is 0.
    denom = predicted_pos + total_positives
    f1_scores = np.zeros(thresholds.shape, dtype=float)
    np.divide(2.0 * true_pos, denom, out=f1_scores, where=denom > 0)

    best_idx = int(np.argmax(f1_scores))  # first maximum -> lowest threshold
    best_f1 = float(f1_scores[best_idx])
    if best_f1 <= 0.0:
        return default_threshold, 0.0
    return float(thresholds[best_idx]), best_f1

def _config_to_dict(cfg):
    """Collect the upper-case settings of ``cfg`` (class and instance level) as a plain dict."""
    # vars(cfg) alone only sees instance attributes; settings declared in the
    # class body live on the class, so merge both (instance values win).
    merged = {**vars(type(cfg)), **vars(cfg)}
    return {
        key: (str(value) if isinstance(value, Path) else value)
        for key, value in merged.items()
        if key.isupper()
    }


def train_phase1(model, train_loader, val_loader, num_epochs):
    """Train ``model``, track metrics in W&B, and checkpoint the best epoch.

    After every epoch the validation set is scored, the F1-optimal threshold
    is searched, and the model is saved whenever that F1 beats the best seen
    so far.

    Args:
        model: Network producing two logits per sample (index 1 = bad).
        train_loader: DataLoader yielding ``(inputs, targets)`` training batches.
        val_loader: DataLoader yielding ``(inputs, targets)`` validation batches.
        num_epochs: Number of passes over the training data.

    Returns:
        The threshold belonging to the best validation F1 (0.5 if no epoch
        ever achieved F1 > 0, in which case no checkpoint is written).
    """
    # Log the actual settings; the W&B run is closed in `finally` so that a
    # failed training run does not leave it open.
    wandb.init(project=config.WANDB_PROJECT, name=config.WANDB_RUN_NAME, config=_config_to_dict(config))

    try:
        criterion = nn.CrossEntropyLoss()
        optimizer = optim.Adam(model.parameters(), lr=config.LR)

        # Model selection is based on the best validation F1.
        best_val_f1 = 0.0
        best_threshold_global = 0.5  # threshold of the best epoch so far

        print(f"\n=== Starting Phase 1 Training ===")

        for epoch in range(num_epochs):
            # --- Training ---
            model.train()
            train_loss = 0.0

            for inputs, targets in tqdm(train_loader, desc=f"Epoch {epoch+1}/{num_epochs}", leave=False):
                inputs, targets = inputs.to(device), targets.to(device)
                optimizer.zero_grad()
                outputs = model(inputs)
                loss = criterion(outputs, targets)
                loss.backward()
                optimizer.step()
                # Weight by batch size so the epoch loss is a per-sample mean.
                train_loss += loss.item() * inputs.size(0)

            train_loss /= len(train_loader.dataset)

            # --- Validation ---
            model.eval()
            val_targets = []
            val_probs = []  # probability of class 1 (bad)

            with torch.no_grad():
                for inputs, targets in val_loader:
                    inputs, targets = inputs.to(device), targets.to(device)
                    outputs = model(inputs)

                    # Softmax over the logits, keep the class-1 probability.
                    probs = torch.softmax(outputs, dim=1)[:, 1]

                    val_targets.extend(targets.cpu().numpy())
                    val_probs.extend(probs.cpu().numpy())

            val_targets = np.array(val_targets)
            val_probs = np.array(val_probs)

            # ROC-AUC is undefined when only one class is present.
            try:
                val_roc_auc = roc_auc_score(val_targets, val_probs)
            except ValueError:
                val_roc_auc = 0.0

            # F1-optimal threshold for this epoch.
            current_best_thresh, current_best_f1 = find_best_threshold(val_targets, val_probs)

            print(f"Epoch {epoch+1}: Train Loss={train_loss:.4f}, Val AUC={val_roc_auc:.4f}, Val Max F1={current_best_f1:.4f} (at {current_best_thresh:.2f})")

            wandb.log({
                "epoch": epoch + 1,
                "train_loss": train_loss,
                "val_roc_auc": val_roc_auc,
                "val_f1_score": current_best_f1,
                "best_threshold": current_best_thresh
            })

            # Keep the checkpoint with the best validation F1.
            if current_best_f1 > best_val_f1:
                best_val_f1 = current_best_f1
                # Plain float so the checkpoint holds only tensors and
                # built-in types.
                best_threshold_global = float(current_best_thresh)
                torch.save({
                    'model_state_dict': model.state_dict(),
                    'best_threshold': best_threshold_global
                }, config.OUTPUT_DIR / "phase1_best_model.pth")
                print(f"Saved Best Model (F1: {best_val_f1:.4f})")
    finally:
        wandb.finish()

    return best_threshold_global

## 5. メイン処理 (学習実行)

In [None]:
# Model init: the network trained in this run is ResNet50.
# NOTE(review): Config.WANDB_RUN_NAME still says "EFFICIENTNETB4_IMG512_V1";
# confirm the run name matches the model being trained.
model = ResNet50Classifier().to(device)

# Training (the return value is the F1-optimal threshold of the best epoch).
best_threshold = train_phase1(model, train_loader, val_loader, config.EPOCHS)

## 6. 評価関数

In [None]:
def evaluate_phase1(model, test_loader, threshold=0.5):
    """Score ``model`` on ``test_loader`` and print the evaluation metrics.

    Args:
        model: Network producing two logits per sample (index 1 = bad).
        test_loader: Iterable of ``(inputs, targets)`` batches.
        threshold: A sample is predicted bad when its class-1 probability is
            greater than or equal to this value.

    Returns:
        None; ROC-AUC, F1, the classification report and the confusion matrix
        are printed.
    """
    model.eval()
    y_true, y_probs = [], []

    with torch.no_grad():
        for inputs, targets in test_loader:
            inputs = inputs.to(device)
            outputs = model(inputs)
            probs = torch.softmax(outputs, dim=1)[:, 1]

            y_true.extend(targets.cpu().numpy())
            y_probs.extend(probs.cpu().numpy())

    y_true = np.array(y_true)
    y_probs = np.array(y_probs)

    if y_true.size == 0:
        print("No test samples to evaluate.")
        return

    # Binarise with the given threshold.
    y_pred = (y_probs >= threshold).astype(int)

    # ROC-AUC is undefined when the labels contain a single class; report NaN
    # instead of aborting the whole evaluation.
    try:
        roc_auc = roc_auc_score(y_true, y_probs)
    except ValueError:
        roc_auc = float("nan")

    print(f"\n=== Test Evaluation (Threshold: {threshold:.2f}) ===")
    print(f"ROC-AUC: {roc_auc:.4f}")
    print(f"F1-Score: {f1_score(y_true, y_pred, zero_division=0):.4f}")
    print("\nClassification Report:")
    # Fix the label set so both rows appear even if one class is absent.
    print(classification_report(
        y_true, y_pred, labels=[0, 1], target_names=["Good", "Bad"], zero_division=0
    ))
    print("Confusion Matrix:")
    print(confusion_matrix(y_true, y_pred, labels=[0, 1]))

## 7. 推論関数

In [None]:
import time
import cv2
import numpy as np
import torch
import torch.nn.functional as F
import matplotlib.pyplot as plt
from sklearn.metrics import roc_auc_score, f1_score, classification_report, confusion_matrix

# ==========================================
# 1. Grad-CAM クラスの実装
# ==========================================
class GradCAM:
    """Grad-CAM heat-map generator for the "bad" class (output index 1).

    Forward and backward hooks on ``target_layer`` capture its activations and
    the gradient of the class score with respect to them. Call
    ``remove_hooks()`` when the instance is no longer needed so the hooks do
    not stay attached to the layer.

    Args:
        model: Network whose output has at least two columns.
        target_layer: Module inside ``model`` producing a (N, C, H, W) feature map.
    """

    def __init__(self, model, target_layer):
        self.model = model
        self.target_layer = target_layer
        self.gradients = None
        self.activations = None

        # Keep the handles so the hooks can be detached again.
        self._hook_handles = [
            self.target_layer.register_forward_hook(self.save_activation),
            self.target_layer.register_full_backward_hook(self.save_gradient),
        ]

    def remove_hooks(self):
        """Detach the hooks registered by this instance (safe to call twice)."""
        for handle in self._hook_handles:
            handle.remove()
        self._hook_handles = []

    def save_activation(self, module, input, output):
        """Forward hook: remember the target layer's output."""
        # Detached copy: only the values are needed, not the autograd graph.
        self.activations = output.detach()

    def save_gradient(self, module, grad_input, grad_output):
        """Backward hook: remember the gradient w.r.t. the layer's output."""
        self.gradients = grad_output[0].detach()

    def __call__(self, x):
        """Compute the Grad-CAM map for the first sample of ``x``.

        Args:
            x: Input batch of shape (N, C, H, W); the map is computed for
                sample 0 (batch size 1 is the intended use).

        Returns:
            ``(cam, output)``: ``cam`` is a float array of shape (H, W) scaled
            to [0, 1]; ``output`` is the model output for ``x``.

        Raises:
            RuntimeError: If the hooks did not capture activations/gradients.
        """
        self.gradients = None

        # Gradients are required even if the caller is inside torch.no_grad().
        with torch.enable_grad():
            # 1. Forward pass.
            output = self.model(x)

            # 2. Backward pass from the class-1 (bad) score. Summing keeps the
            #    target a scalar for any batch size.
            score = output[:, 1].sum()
            self.model.zero_grad()
            score.backward(retain_graph=True)

        if self.gradients is None or self.activations is None:
            raise RuntimeError(
                "Grad-CAM hooks captured nothing; check that target_layer is "
                "part of the model's forward pass and that hooks are attached."
            )

        # 3. Grad-CAM for sample 0.
        gradients = self.gradients.cpu().numpy()[0]
        activations = self.activations.cpu().numpy()[0]

        # Channel weights = spatially averaged gradients.
        weights = gradients.mean(axis=(1, 2))

        # Weighted sum of the activation maps over the channel axis.
        cam = np.tensordot(weights, activations, axes=1).astype(np.float32, copy=False)

        # ReLU, resize to the input resolution, then scale to [0, 1].
        cam = np.maximum(cam, 0)
        cam = cv2.resize(cam, (x.shape[3], x.shape[2]))
        cam = cam - np.min(cam)
        cam = cam / (np.max(cam) + 1e-7)

        return cam, output

In [None]:
# ==========================================
# 2. 推論・評価関数の定義
# ==========================================
def run_inference_analysis(model, test_loader, device, model_path):
    """Load a checkpoint, run inference on ``test_loader`` and print metrics.

    Args:
        model: Network instance matching the checkpoint's architecture.
        test_loader: Iterable of ``(inputs, targets)`` batches.
        device: ``torch.device`` (or device string) the model lives on.
        model_path: Path of a checkpoint holding ``'model_state_dict'`` and,
            optionally, ``'best_threshold'`` (0.5 is used when absent).

    Returns:
        ``(y_true, y_probs, y_pred, best_threshold)`` where the first three
        are NumPy arrays over all test samples.
    """
    print(f"Loading model from {model_path} ...")
    device = torch.device(device)  # also accept plain strings such as "cuda"

    # SECURITY: weights_only=False unpickles arbitrary Python objects. Only
    # load checkpoints from a trusted source (e.g. written by this notebook).
    checkpoint = torch.load(model_path, map_location=device, weights_only=False)
    model.load_state_dict(checkpoint['model_state_dict'])
    best_threshold = checkpoint.get('best_threshold', 0.5)

    model.eval()

    y_true = []
    y_probs = []
    total_time_ms = 0.0
    total_images = 0
    use_cuda = device.type == 'cuda'

    print(f"Running Inference (Threshold: {best_threshold:.4f})...")

    with torch.no_grad():  # no gradients needed for plain inference
        for inputs, targets in test_loader:
            inputs = inputs.to(device)
            targets = targets.numpy()

            # --- Latency measurement ---
            # Synchronise before and after so that only the forward pass of
            # this batch falls inside the timed window.
            if use_cuda:
                torch.cuda.synchronize()
            start_time = time.perf_counter()
            outputs = model(inputs)
            if use_cuda:
                torch.cuda.synchronize()
            total_time_ms += (time.perf_counter() - start_time) * 1000
            total_images += inputs.size(0)

            # Class-1 (bad) probability.
            probs = torch.softmax(outputs, dim=1)[:, 1].cpu().numpy()

            y_true.extend(targets)
            y_probs.extend(probs)

    y_true = np.array(y_true)
    y_probs = np.array(y_probs)
    y_pred = (y_probs >= best_threshold).astype(int)

    # --- Metrics ---
    # ROC-AUC is undefined for an empty or single-class label set.
    try:
        auc = roc_auc_score(y_true, y_probs)
    except ValueError:
        auc = float("nan")
    f1 = f1_score(y_true, y_pred, zero_division=0)
    # Total time over total images, so a smaller final batch is not over-weighted.
    mean_latency = total_time_ms / total_images if total_images else float("nan")

    print("\n" + "="*30)
    print(f" Evaluation Results")
    print("="*30)
    print(f"ROC-AUC      : {auc:.4f}")
    print(f"F1-Score     : {f1:.4f} (Threshold: {best_threshold:.3f})")
    print(f"Mean Latency : {mean_latency:.2f} ms/image")
    print("-" * 30)
    print("\nConfusion Matrix:")
    print(confusion_matrix(y_true, y_pred, labels=[0, 1]))
    print("\nClassification Report:")
    # Fix the label set so both rows appear even if one class is absent.
    print(classification_report(
        y_true, y_pred, labels=[0, 1], target_names=["Good", "Bad"], zero_division=0
    ))

    return y_true, y_probs, y_pred, best_threshold

In [None]:
# ==========================================
# 3. Grad-CAM 可視化実行関数
# ==========================================
def visualize_gradcam_samples(model, dataset, device, num_samples=5):
    """Plot Grad-CAM overlays for randomly chosen "bad" samples of ``dataset``.

    Args:
        model: Classifier wrapping a ``backbone``; the Grad-CAM target layer is
            chosen from the model's class name.
        dataset: Dataset exposing ``labels`` and returning ``(tensor, label)``
            items normalised with the mean/std undone below.
        device: Device the model lives on.
        num_samples: Maximum number of bad samples to display.

    Returns:
        None; the figure is shown with matplotlib.
    """
    print("\n" + "="*30)
    print(f" Grad-CAM Visualization (Top {num_samples} Bad samples)")
    print("="*30)

    # Pick the target layer from the model's class name.
    model_name = model.__class__.__name__

    if "ResNet" in model_name:
        # Last block of layer4.
        target_layer = model.backbone.layer4[-1]
        print(f"Target Layer: {model_name} (layer4[-1])")
    elif "EfficientNet" in model_name:
        # features[8] of the EfficientNet feature extractor.
        target_layer = model.backbone.features[8]
        print(f"Target Layer: {model_name} (features[8])")
    else:
        # Fallback: try the last child module.
        print(f"Warning: Unknown model type {model_name}. Using last child of backbone.")
        try:
            target_layer = list(model.backbone.children())[-1]
        except (AttributeError, IndexError):
            # No `backbone` attribute, or a backbone without child modules.
            target_layer = list(model.children())[-1]

    grad_cam = GradCAM(model, target_layer)

    model.eval()  # eval mode, but gradients are still computed by Grad-CAM

    # Indices of the bad (label == 1) samples.
    indices = np.where(np.array(dataset.labels) == 1)[0]

    if len(indices) == 0:
        print("No bad samples found in dataset.")
        return

    # Random selection (uses NumPy's global RNG).
    np.random.shuffle(indices)

    # Never display more samples than exist.
    n_display = min(num_samples, len(indices))

    cols = 3
    rows = (n_display + cols - 1) // cols
    plt.figure(figsize=(15, 5 * rows))

    # Inverse of Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]);
    # identical for every sample, so it is built once.
    inv_normalize = transforms.Normalize(
        mean=[-0.485/0.229, -0.456/0.224, -0.406/0.225],
        std=[1/0.229, 1/0.224, 1/0.225]
    )

    for i, idx in enumerate(indices[:n_display]):
        img_tensor, label = dataset[idx]
        img_input = img_tensor.unsqueeze(0).to(device)  # add the batch dimension

        # Grad-CAM mask and model output for this image.
        mask, output = grad_cam(img_input)

        # Predicted probability of the bad class.
        prob = torch.softmax(output, dim=1)[0, 1].item()

        # Undo the normalisation for display.
        img_disp = inv_normalize(img_tensor).permute(1, 2, 0).numpy()
        img_disp = np.clip(img_disp, 0, 1)

        # Colour-map the mask (OpenCV returns BGR) and scale to [0, 1].
        heatmap = cv2.applyColorMap(np.uint8(255 * mask), cv2.COLORMAP_JET)
        heatmap = cv2.cvtColor(heatmap, cv2.COLOR_BGR2RGB)
        heatmap = np.float32(heatmap) / 255

        # Blend: 0.6 original image + 0.4 heat map.
        cam_img = heatmap * 0.4 + img_disp * 0.6
        cam_img = np.clip(cam_img, 0, 1)

        ax = plt.subplot(rows, cols, i + 1)
        ax.imshow(cam_img)
        ax.set_title(f"Label: Bad(1)\nPred: {prob:.4f} (Bad)\nIdx: {idx}")
        ax.axis('off')

    plt.tight_layout()
    plt.show()

## 推論処理

In [None]:
# ==========================================
# 4. Grad-CAM visualization (ResNet50)
# ==========================================
# Fresh model instance for visualization.
model_viz = ResNet50Classifier(num_classes=2).to(device)

# Checkpoint written by train_phase1. Defined in this cell so that it runs in
# top-to-bottom order without relying on a later cell to set MODEL_PATH.
MODEL_PATH = config.OUTPUT_DIR / "phase1_best_model.pth"

# Load the weights.
# SECURITY: weights_only=False unpickles arbitrary objects; use it only for
# checkpoints from a trusted source.
checkpoint = torch.load(MODEL_PATH, map_location=device, weights_only=False)
model_viz.load_state_dict(checkpoint['model_state_dict'])

# Run the visualization on bad samples of the test set.
visualize_gradcam_samples(model_viz, test_ds, device, num_samples=6)

In [None]:
# ==========================================
# Execution block (inference only)
# ==========================================

# 1. Re-create the model so the saved weights can be loaded into it (ResNet50).
model_inference = ResNet50Classifier(num_classes=2).to(device)

# 2. Path of the saved best model.
# Points at the checkpoint written by train_phase1; an empty file name would
# resolve to a directory and make torch.load fail. Change this if the weights
# were moved elsewhere.
MODEL_PATH = config.OUTPUT_DIR / "phase1_best_model.pth"

# 3. Run inference and quantitative evaluation.
# NOTE: run_inference_analysis loads the checkpoint with weights_only=False;
# use it only with checkpoints from a trusted source.
y_true, y_probs, y_pred, threshold = run_inference_analysis(
    model_inference,
    test_loader,
    device,
    MODEL_PATH
)