<a href="https://colab.research.google.com/github/Abdulmathin-shaik/Kaggle/blob/main/Casting_Data_AD.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
# IMPORTANT: RUN THIS CELL IN ORDER TO IMPORT YOUR KAGGLE DATA SOURCES,
# THEN FEEL FREE TO DELETE THIS CELL.
# NOTE: THIS NOTEBOOK ENVIRONMENT DIFFERS FROM KAGGLE'S PYTHON
# ENVIRONMENT SO THERE MAY BE MISSING LIBRARIES USED BY YOUR
# NOTEBOOK.
import kagglehub
# Fetch the casting-product dataset through kagglehub and keep whatever
# dataset_download() returns in a module-level variable.
# NOTE(review): nothing visible in this notebook reads this variable afterwards;
# IndustrialCastingDataset uses its own hard-coded "/kaggle/input/..." path, so
# confirm that both point at the same location in your environment.
ravirajsinh45_real_life_industrial_dataset_of_casting_product_path = kagglehub.dataset_download('ravirajsinh45/real-life-industrial-dataset-of-casting-product')

print('Data source import complete.')


In [None]:
# This Python 3 environment comes with many helpful analytics libraries installed
# It is defined by the kaggle/python Docker image: https://github.com/kaggle/docker-python
# For example, here are several helpful packages to load

import numpy as np # linear algebra
import pandas as pd # data processing, CSV file I/O (e.g. pd.read_csv)

# Input data files are available in the read-only "../input/" directory
# For example, running this (by clicking run or pressing Shift+Enter) will list all files under the input directory

import os
# Walk the Kaggle input tree and print the full path of every file found.
for root_dir, _subdirs, file_names in os.walk('/kaggle/input'):
    for file_name in file_names:
        full_path = os.path.join(root_dir, file_name)
        print(full_path)

# You can write up to 20GB to the current directory (/kaggle/working/) that gets preserved as output when you create a version using "Save & Run All"
# You can also write temporary files to /kaggle/temp/, but they won't be saved outside of the current session

In [None]:
pip install faiss-gpu-cu12

In [None]:

import torch
import torch.nn as nn
import torchvision.models as models
import torchvision.transforms as transforms
from torch.utils.data import Dataset, DataLoader
import numpy as np
import os
from PIL import Image
import faiss  # For efficient nearest-neighbor search
import matplotlib.pyplot as plt

# Fix the torch and NumPy RNG seeds so repeated runs draw the same numbers.
torch.manual_seed(42)
np.random.seed(42)

# Run on the GPU when one is visible to torch, otherwise fall back to the CPU.
if torch.cuda.is_available():
    device = torch.device("cuda")
else:
    device = torch.device("cpu")

In [None]:
import os
from PIL import Image
from torch.utils.data import Dataset, DataLoader
import torchvision.transforms as transforms

class IndustrialCastingDataset(Dataset):
    """Image dataset over the ``ok_front`` / ``def_front`` casting folders.

    Images under ``<base_path>/<phase>/ok_front`` get label ``0`` (normal) and
    images under ``<base_path>/<phase>/def_front`` get label ``1`` (anomaly).
    Files are listed in sorted order so that indices are reproducible between
    runs, and files without a known image extension are skipped.

    Args:
        category: Kept for backward compatibility; it is stored on the
            instance but is not used to locate the data.
        phase: Sub-directory of ``base_path`` to read (this notebook uses
            ``"train"`` and ``"test"``).
        img_size: Target size handed to ``transforms.Resize``.
        base_path: Dataset root. ``None`` (the default) selects
            ``DEFAULT_BASE_PATH``.

    Raises:
        FileNotFoundError: If one of the class folders does not exist
            (propagated from ``os.listdir``).
    """

    # Location of the dataset when it is mounted under /kaggle/input.
    DEFAULT_BASE_PATH = r"/kaggle/input/real-life-industrial-dataset-of-casting-product/casting_data/casting_data"
    # (folder name, label) pairs; 0: normal, 1: anomaly.
    CLASS_LABELS = (("ok_front", 0), ("def_front", 1))
    # Only files ending in one of these suffixes are treated as images.
    IMAGE_EXTENSIONS = (".jpg", ".jpeg", ".png", ".bmp", ".tif", ".tiff")

    def __init__(self, category="casting", phase="train", img_size=(224, 224), base_path=None):
        self.category = category
        self.phase = phase
        self.transform = transforms.Compose([
            transforms.Resize(img_size),
            transforms.ToTensor(),
            transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),  # ImageNet stats
        ])
        if base_path is None:
            base_path = self.DEFAULT_BASE_PATH
        self.images, self.labels = self._scan_samples(os.path.join(base_path, phase))

    @classmethod
    def _scan_samples(cls, data_path):
        """Return ``(image_paths, labels)`` for the class folders under *data_path*."""
        images = []
        labels = []
        for folder, label in cls.CLASS_LABELS:
            folder_path = os.path.join(data_path, folder)
            # os.listdir() returns entries in arbitrary order; sort for determinism.
            for img_name in sorted(os.listdir(folder_path)):
                if not img_name.lower().endswith(cls.IMAGE_EXTENSIONS):
                    continue  # e.g. stray metadata files that PIL cannot open
                images.append(os.path.join(folder_path, img_name))
                labels.append(label)
        return images, labels

    def __len__(self):
        """Return the number of images found."""
        return len(self.images)

    def __getitem__(self, idx):
        """Return ``(transformed_image, label)`` for sample *idx*."""
        img_path = self.images[idx]
        label = self.labels[idx]
        # The context manager closes the underlying file once the pixels have
        # been converted into a fresh RGB image.
        with Image.open(img_path) as raw:
            img = raw.convert("RGB")
        return self.transform(img), label

# Build one dataset per split, then wrap each in a DataLoader:
# shuffled batches of 32 for training, one unshuffled image at a time for testing.
train_dataset = IndustrialCastingDataset(phase="train", category="casting")
test_dataset = IndustrialCastingDataset(phase="test", category="casting")
train_loader = DataLoader(dataset=train_dataset, shuffle=True, batch_size=32)
test_loader = DataLoader(dataset=test_dataset, shuffle=False, batch_size=1)

In [None]:
class PatchCore:
    """Patch-feature memory-bank anomaly detector on a torchvision backbone.

    A forward hook captures the ``layer2`` activation of the backbone. The
    activation is cut into non-overlapping ``patch_size`` x ``patch_size``
    patches, each patch is averaged into one feature vector, and a random
    subset of the vectors from normal images is stored in a FAISS
    ``IndexFlatL2``. An input's anomaly score is the largest
    nearest-neighbour distance of any of its patches to that memory bank.

    Args:
        backbone: Name of a constructor in ``torchvision.models`` whose model
            exposes a ``layer2`` attribute.
        patch_size: Side length (and stride) of the patches cut from the
            feature map.
        sample_ratio: Fraction of the collected patch vectors kept in the
            memory bank.
    """

    def __init__(self, backbone="wide_resnet50_2", patch_size=3, sample_ratio=0.1):
        # Remember the device picked at construction time so later calls do not
        # depend on the module-level name still pointing at the same device.
        self.device = device
        # NOTE(review): newer torchvision releases prefer the `weights=` argument
        # over `pretrained=True` - confirm against the installed version.
        self.model = getattr(models, backbone)(pretrained=True).to(self.device)
        self.model.eval()
        self.patch_size = patch_size
        self.sample_ratio = sample_ratio
        self.memory_bank = None
        self.index = None  # FAISS index; created by build_memory_bank()

        # Hook to extract intermediate features (layer2 of the backbone)
        self.features = []

        def hook(module, input, output):
            self.features.append(output)

        self.model.layer2.register_forward_hook(hook)

    def extract_patches(self, x):
        """Return the hooked feature map of *x* cut into patches.

        Args:
            x: Image batch fed to the backbone.

        Returns:
            Tensor of shape ``(B, N_patches, C, patch_size, patch_size)``.

        Raises:
            RuntimeError: If the forward pass did not trigger the hook.
        """
        with torch.no_grad():
            self.features = []
            _ = self.model(x)
            if not self.features:
                raise RuntimeError("The forward hook captured no features.")
            features = self.features[0]  # Shape: (B, C, H, W)
            self.features = []  # do not keep the activation alive between calls
            B, C, H, W = features.shape
            unfold = nn.Unfold(kernel_size=self.patch_size, stride=self.patch_size)
            patches = unfold(features).transpose(1, 2)  # Shape: (B, N_patches, C*patch_size^2)
            return patches.reshape(B, -1, C, self.patch_size, self.patch_size)

    def _collect_patch_features(self, loader, normal_label=0):
        """Return an ``(N, C)`` float array of patch vectors from *loader*.

        Only images whose label equals *normal_label* contribute; pass
        ``None`` to use every image.
        """
        collected = []
        for images, labels in loader:
            if normal_label is not None:
                keep = torch.as_tensor(labels) == normal_label
                if not bool(keep.any()):
                    continue
                images = images[keep]  # drop anomalies before the forward pass
            images = images.to(self.device)
            patches = self.extract_patches(images)  # (B, N_patches, C, P, P)
            patches = patches.mean(dim=(3, 4))  # Average over spatial dimensions
            collected.append(patches.cpu().numpy())

        if not collected:
            raise ValueError("The loader yielded no images to build the memory bank from.")
        stacked = np.concatenate(collected, axis=0)  # (N_total, N_patches, C)
        return stacked.reshape(-1, stacked.shape[-1])  # (N_total * N_patches, C)

    def build_memory_bank(self, loader, normal_label=0):
        """Fill the memory bank and FAISS index from the images in *loader*.

        Args:
            loader: Iterable of ``(images, labels)`` batches.
            normal_label: Label value that marks a normal image. Images with
                any other label are ignored so that defective samples cannot
                end up in the bank. ``None`` disables the filter.

        Raises:
            ValueError: If no image is left after filtering.
        """
        all_patches = self._collect_patch_features(loader, normal_label)

        # Subsample patches for efficiency, always keeping at least one vector
        # and never asking for more than are available.
        n_samples = min(len(all_patches), max(1, int(self.sample_ratio * len(all_patches))))
        indices = np.random.choice(len(all_patches), n_samples, replace=False)
        self.memory_bank = np.ascontiguousarray(all_patches[indices], dtype=np.float32)

        # Build FAISS index for fast nearest-neighbor search
        self.index = faiss.IndexFlatL2(self.memory_bank.shape[1])
        self.index.add(self.memory_bank)

    def detect_anomaly(self, image):
        """Return one anomaly score for *image*.

        The score is the maximum, over all patches of everything in *image*,
        of the distance reported by the FAISS index for the nearest
        memory-bank vector. A batch of several images therefore still yields
        a single score.

        Raises:
            RuntimeError: If ``build_memory_bank`` has not been called yet.
        """
        if self.index is None:
            raise RuntimeError("Call build_memory_bank() before detect_anomaly().")
        image = image.to(self.device)
        patches = self.extract_patches(image)  # (B, N_patches, C, P, P)
        patches = patches.mean(dim=(3, 4)).cpu().numpy()  # (B, N_patches, C)
        patches = np.ascontiguousarray(
            patches.reshape(-1, patches.shape[-1]), dtype=np.float32
        )  # (B * N_patches, C)

        # Compute anomaly score (max distance to nearest neighbor in memory bank)
        distances, _ = self.index.search(patches, 1)
        return distances.max()

# Create the detector, spelling out the constructor's default settings.
patchcore = PatchCore(backbone="wide_resnet50_2", patch_size=3, sample_ratio=0.1)

In [None]:
# Populate the detector's memory bank from the batches served by train_loader.
print("Building memory bank...")
patchcore.build_memory_bank(loader=train_loader)

In [None]:
# Evaluate on test set
threshold = None  # None -> calibrate on the normal training images below
scores = []
labels = []
for image, label in test_loader:
    scores.append(patchcore.detect_anomaly(image))
    labels.append(label.item())

# Simple threshold-based evaluation (tune threshold for best AUC)
scores = np.array(scores)
labels = np.array(labels)
if threshold is None:  # Example: set threshold as 95th percentile of training scores
    # detect_anomaly() collapses whatever it receives into ONE score, so feeding
    # it the 32-image training batches would give one (maximum) score per batch.
    # Score each image separately, and use only normal images (label 0) so that
    # defective samples cannot push the threshold up.
    train_scores = [
        patchcore.detect_anomaly(train_image.unsqueeze(0))
        for train_images, train_labels in train_loader
        for train_image, train_label in zip(train_images, train_labels)
        if int(train_label) == 0
    ]
    if not train_scores:
        raise ValueError("No normal (label 0) training images to calibrate the threshold.")
    threshold = np.percentile(train_scores, 95)

predictions = (scores > threshold).astype(int)
accuracy = np.mean(predictions == labels)
print(f"Accuracy: {accuracy:.4f}")

# Optional: Compute AUC for better evaluation (threshold independent)
from sklearn.metrics import roc_auc_score
auc = roc_auc_score(labels, scores)
print(f"AUC: {auc:.4f}")

In [None]:
import matplotlib.pyplot as plt
import random
import torch
import numpy as np

# Decision threshold for the plots. Reuse the threshold calibrated by the
# evaluation cell when there is one; the scores are FAISS L2 distances, not
# probabilities, so a fixed 0.5 is only a last-resort fallback.
if globals().get("threshold") is None:
    threshold = 0.5  # Adjust this threshold based on your specific use case and dataset

# Visualize random images from the test set with original and prediction labels
num_images_to_show = 5  # Adjust as needed
num_images_to_show = min(num_images_to_show, len(test_dataset))  # cannot sample more than exist
random_indices = random.sample(range(len(test_dataset)), num_images_to_show)

# squeeze=False keeps `axes` two-dimensional even when a single image is shown.
fig, axes = plt.subplots(1, num_images_to_show, figsize=(15, 5), squeeze=False)
axes = axes[0]

# ImageNet statistics used by the dataset's Normalize step; built once, not per image.
norm_mean = torch.tensor([0.485, 0.456, 0.406])
norm_std = torch.tensor([0.229, 0.224, 0.225])

for i, idx in enumerate(random_indices):
    image, original_label = test_dataset[idx]
    image = image.unsqueeze(0)  # Add batch dimension
    prediction_score = patchcore.detect_anomaly(image)
    prediction_label = 1 if prediction_score > threshold else 0

    image = image.squeeze(0).permute(1, 2, 0)  # (C, H, W) -> (H, W, C) for display
    image = image * norm_std + norm_mean  # Denormalize
    image = np.clip(image.numpy(), 0, 1)  # Clip values to be within the valid range

    axes[i].imshow(image)
    axes[i].set_title(f"Original: {'Good' if original_label == 0 else 'Anomalous'}\nPrediction: {'Good' if prediction_label == 0 else 'Anomalous'}")
    axes[i].axis('off')

plt.show()

Other backbones

In [None]:
import torch
import torch.nn as nn
import torchvision.models as models
import numpy as np
import faiss

class PatchCore:
    """Patch-feature memory-bank anomaly detector with a selectable backbone.

    A forward hook captures one intermediate activation of the backbone
    (``layer2`` for ResNet-style models, ``features[6]`` for EfficientNet
    models). The activation is cut into non-overlapping ``patch_size`` x
    ``patch_size`` patches, each patch is averaged into one feature vector,
    and a random subset of the vectors from normal images is stored in a
    FAISS ``IndexFlatL2``. An input's anomaly score is the largest
    nearest-neighbour distance of any of its patches to that memory bank.

    Args:
        backbone: Name of a constructor in ``torchvision.models``.
        patch_size: Side length (and stride) of the patches cut from the
            feature map.
        sample_ratio: Fraction of the collected patch vectors kept in the
            memory bank.

    Raises:
        ValueError: If no feature layer is known for *backbone*.
    """

    def __init__(self, backbone="resnet101", patch_size=3, sample_ratio=0.1):
        # Remember the device picked at construction time so later calls do not
        # depend on the module-level name still pointing at the same device.
        self.device = device
        # NOTE(review): newer torchvision releases prefer the `weights=` argument
        # over `pretrained=True` - confirm against the installed version.
        self.model = getattr(models, backbone)(pretrained=True).to(self.device)
        self.model.eval()
        self.patch_size = patch_size
        self.sample_ratio = sample_ratio
        self.memory_bank = None
        self.index = None  # FAISS index; created by build_memory_bank()

        # Hook to extract intermediate features
        self.features = []

        def hook(module, input, output):
            self.features.append(output)

        # Register hook based on the backbone
        self._select_feature_layer(self.model, backbone).register_forward_hook(hook)

    @staticmethod
    def _select_feature_layer(model, backbone):
        """Return the sub-module of *model* whose output is used as features.

        Failing here, instead of silently registering no hook, surfaces an
        unsupported backbone at construction time rather than as an
        ``IndexError`` on the first forward pass.
        """
        if backbone.startswith("efficientnet"):
            return model.features[6]  # Adjust layer index as needed
        if hasattr(model, "layer2"):
            # Covers "resnet*" and any other model exposing a layer2 stage,
            # e.g. "wide_resnet50_2".
            return model.layer2
        raise ValueError(f"No feature layer is defined for backbone {backbone!r}.")

    def extract_patches(self, x):
        """Return the hooked feature map of *x* cut into patches.

        Args:
            x: Image batch fed to the backbone.

        Returns:
            Tensor of shape ``(B, N_patches, C, patch_size, patch_size)``.

        Raises:
            RuntimeError: If the forward pass did not trigger the hook.
        """
        with torch.no_grad():
            self.features = []
            _ = self.model(x)
            if not self.features:
                raise RuntimeError("The forward hook captured no features.")
            features = self.features[0]  # Shape: (B, C, H, W)
            self.features = []  # do not keep the activation alive between calls
            B, C, H, W = features.shape
            unfold = nn.Unfold(kernel_size=self.patch_size, stride=self.patch_size)
            patches = unfold(features).transpose(1, 2)  # Shape: (B, N_patches, C*patch_size^2)
            return patches.reshape(B, -1, C, self.patch_size, self.patch_size)

    def _collect_patch_features(self, loader, normal_label=0):
        """Return an ``(N, C)`` float array of patch vectors from *loader*.

        Only images whose label equals *normal_label* contribute; pass
        ``None`` to use every image.
        """
        collected = []
        for images, labels in loader:
            if normal_label is not None:
                keep = torch.as_tensor(labels) == normal_label
                if not bool(keep.any()):
                    continue
                images = images[keep]  # drop anomalies before the forward pass
            images = images.to(self.device)
            patches = self.extract_patches(images)  # (B, N_patches, C, P, P)
            patches = patches.mean(dim=(3, 4))  # Average over spatial dimensions
            collected.append(patches.cpu().numpy())

        if not collected:
            raise ValueError("The loader yielded no images to build the memory bank from.")
        stacked = np.concatenate(collected, axis=0)  # (N_total, N_patches, C)
        return stacked.reshape(-1, stacked.shape[-1])  # (N_total * N_patches, C)

    def build_memory_bank(self, loader, normal_label=0):
        """Fill the memory bank and FAISS index from the images in *loader*.

        Args:
            loader: Iterable of ``(images, labels)`` batches.
            normal_label: Label value that marks a normal image. Images with
                any other label are ignored so that defective samples cannot
                end up in the bank. ``None`` disables the filter.

        Raises:
            ValueError: If no image is left after filtering.
        """
        all_patches = self._collect_patch_features(loader, normal_label)

        # Subsample patches for efficiency, always keeping at least one vector
        # and never asking for more than are available.
        n_samples = min(len(all_patches), max(1, int(self.sample_ratio * len(all_patches))))
        indices = np.random.choice(len(all_patches), n_samples, replace=False)
        self.memory_bank = np.ascontiguousarray(all_patches[indices], dtype=np.float32)

        # Build FAISS index for fast nearest-neighbor search
        self.index = faiss.IndexFlatL2(self.memory_bank.shape[1])
        self.index.add(self.memory_bank)

    def detect_anomaly(self, image):
        """Return one anomaly score for *image*.

        The score is the maximum, over all patches of everything in *image*,
        of the distance reported by the FAISS index for the nearest
        memory-bank vector. A batch of several images therefore still yields
        a single score.

        Raises:
            RuntimeError: If ``build_memory_bank`` has not been called yet.
        """
        if self.index is None:
            raise RuntimeError("Call build_memory_bank() before detect_anomaly().")
        image = image.to(self.device)
        patches = self.extract_patches(image)  # (B, N_patches, C, P, P)
        patches = patches.mean(dim=(3, 4)).cpu().numpy()  # (B, N_patches, C)
        patches = np.ascontiguousarray(
            patches.reshape(-1, patches.shape[-1]), dtype=np.float32
        )  # (B * N_patches, C)

        # Compute anomaly score (max distance to nearest neighbor in memory bank)
        distances, _ = self.index.search(patches, 1)
        return distances.max()

# One detector per backbone under comparison: ResNet-101 first ...
patchcore_resnet101 = PatchCore("resnet101")

# ... then EfficientNet-B4.
patchcore_efficientnet_b4 = PatchCore("efficientnet_b4")

In [None]:
# Populate the ResNet-101 detector's memory bank from the batches served by train_loader.
print("Building memory bank...")
patchcore_resnet101.build_memory_bank(loader=train_loader)

In [None]:
# Populate the EfficientNet-B4 detector's memory bank from the batches served by train_loader.
print("Building memory bank...")
patchcore_efficientnet_b4.build_memory_bank(loader=train_loader)

In [None]:
# Evaluate on test set
threshold = None  # None -> calibrate on the normal training images below
scores = []
labels = []
for image, label in test_loader:
    scores.append(patchcore_resnet101.detect_anomaly(image))
    labels.append(label.item())

# Simple threshold-based evaluation (tune threshold for best AUC)
scores = np.array(scores)
labels = np.array(labels)
if threshold is None:  # Example: set threshold as 95th percentile of training scores
    # detect_anomaly() collapses whatever it receives into ONE score, so feeding
    # it the 32-image training batches would give one (maximum) score per batch.
    # Score each image separately, and use only normal images (label 0) so that
    # defective samples cannot push the threshold up.
    train_scores = [
        patchcore_resnet101.detect_anomaly(train_image.unsqueeze(0))
        for train_images, train_labels in train_loader
        for train_image, train_label in zip(train_images, train_labels)
        if int(train_label) == 0
    ]
    if not train_scores:
        raise ValueError("No normal (label 0) training images to calibrate the threshold.")
    threshold = np.percentile(train_scores, 95)

predictions = (scores > threshold).astype(int)
accuracy = np.mean(predictions == labels)
print(f"Accuracy: {accuracy:.4f}")

# Optional: Compute AUC for better evaluation (threshold independent)
from sklearn.metrics import roc_auc_score
auc = roc_auc_score(labels, scores)
print(f"AUC: {auc:.4f}")

In [None]:
# Evaluate on test set
threshold = None  # None -> calibrate on the normal training images below
scores = []
labels = []
for image, label in test_loader:
    scores.append(patchcore_efficientnet_b4.detect_anomaly(image))
    labels.append(label.item())

# Simple threshold-based evaluation (tune threshold for best AUC)
scores = np.array(scores)
labels = np.array(labels)
if threshold is None:  # Example: set threshold as 95th percentile of training scores
    # detect_anomaly() collapses whatever it receives into ONE score, so feeding
    # it the 32-image training batches would give one (maximum) score per batch.
    # Score each image separately, and use only normal images (label 0) so that
    # defective samples cannot push the threshold up.
    train_scores = [
        patchcore_efficientnet_b4.detect_anomaly(train_image.unsqueeze(0))
        for train_images, train_labels in train_loader
        for train_image, train_label in zip(train_images, train_labels)
        if int(train_label) == 0
    ]
    if not train_scores:
        raise ValueError("No normal (label 0) training images to calibrate the threshold.")
    threshold = np.percentile(train_scores, 95)

predictions = (scores > threshold).astype(int)
accuracy = np.mean(predictions == labels)
print(f"Accuracy: {accuracy:.4f}")

# Optional: Compute AUC for better evaluation (threshold independent)
from sklearn.metrics import roc_auc_score
auc = roc_auc_score(labels, scores)
print(f"AUC: {auc:.4f}")