In [None]:
pip install torchsummary

In [None]:
from pathlib import Path
import numpy as np
import os, shutil
import matplotlib.pyplot as plt 

from PIL import Image


from tqdm.auto import tqdm

import torch
import torchvision
from torch.utils.data import DataLoader
from torchvision.datasets import ImageFolder
from torchvision.transforms import transforms
from torchsummary import summary
from torch.utils.data.dataset import Subset
from torch import nn
import torch.optim as optim
from torchvision.models import resnet50, ResNet50_Weights
from sklearn.metrics import accuracy_score, roc_auc_score, roc_curve, confusion_matrix, ConfusionMatrixDisplay, f1_score
import seaborn as sns


# Visualize some samples

In [None]:
# Open one sample from the "opened" test folder and print its PIL size.
# `image` is reused (and rebound to a tensor) by the following cell.
image_path = '/kaggle/input/goodsad/cigarette_box/cigarette_box/test/opened/000_004.jpg'
image = Image.open(image_path)
print(image.size)

In [None]:
# Preprocessing pipeline shared by the autoencoder section:
# resize to a fixed 224x224 resolution, then convert to a float tensor.
transform = transforms.Compose([
    transforms.Resize((224,224)),  # Resize the image to 224x224 pixels
    transforms.ToTensor()          # Convert the PIL image to a (C, H, W) tensor scaled to [0, 1]
])

# Apply the pipeline to the PIL image loaded in the previous cell.
# NOTE: this rebinds `image` from a PIL image to a tensor, so the cell is not
# idempotent -- re-run the image-loading cell before running this one again.
image = transform(image)

# Print the shape of the transformed image tensor
print(image.shape)

# Plot the transformed image
# Permute the dimensions to (height, width, channels) as matplotlib expects
plt.imshow(image.permute(1, 2, 0))
plt.show()

## Memory consumed by the tensor

In [None]:
# Size in bytes of the single transformed image tensor.
NUM_DATASET_SAMPLES = 279  # number of samples we have in the dataset
bytes_per_element = image.element_size()
memory_usage = image.numel() * bytes_per_element

# Report the footprint scaled up to the whole dataset, in KB.
print(f"Memory usage of the tensor: {memory_usage * NUM_DATASET_SAMPLES // 1024} KB")

In [None]:

# Root folder of the training images.
train_image_path = '/kaggle/input/goodsad/cigarette_box/cigarette_box/train'

# ImageFolder indexes the images found in the sub-directories of the root and
# applies `transform` to each image when it is accessed.
good_dataset = ImageFolder(train_image_path, transform)

# Peek at the first sample: x is the preprocessed image tensor, y its class index.
first_sample = good_dataset[0]
x, y = first_sample

print("Image Shape:", x.shape)
print("Label:", y)

# AUTOENCODER

## Train test split for the autoencoder

In [None]:

# Split the dataset into non-overlapping training (80%) and testing (20%) subsets.
# A seeded generator makes the split reproducible across kernel restarts; without it
# every run trains and validates on a different partition.
SPLIT_SEED = 42
split_generator = torch.Generator().manual_seed(SPLIT_SEED)
train_dataset, test_dataset = torch.utils.data.random_split(
    good_dataset, [0.8, 0.2], generator=split_generator
)

# Print the lengths of the original dataset, training subset, and testing subset
print("Total number of samples in the original dataset:", len(good_dataset))
print("Number of samples in the training subset:", len(train_dataset))
print("Number of samples in the testing subset:", len(test_dataset))

## Using dataloader for efficient data loading during training

In [None]:

# Set the batch size
BS = 16

# Create data loaders for training and testing datasets.
# Only the training loader is shuffled; the held-out loader is iterated in a fixed
# order so that validation runs are deterministic.
train_loader = DataLoader(train_dataset, batch_size=BS, shuffle=True)
test_loader = DataLoader(test_dataset, batch_size=BS, shuffle=False)

# Get a batch of images and labels from the training loader
image_batch, label_batch = next(iter(train_loader))

# Print the shape of the input images and labels
print(f'Shape of input images: {image_batch.shape}')
print(f'Shape of labels: {label_batch.shape}')

# Memory footprint of one batch of images, in bytes
memory_usage = image_batch.numel() * image_batch.element_size()

# Print the memory usage
print(f"Memory usage of the tensor: {memory_usage//1024} KB")

In [None]:
# Set the figure size. figsize is (width, height) in inches; the grid below is a
# single row of four images, so the figure must be wide and short. The previous
# value (48, 192) was transposed and allocated a huge, mostly empty canvas.
plt.figure(figsize=(12*4, 3*4))

# Create a grid of images from the image batch and visualize it
grid = torchvision.utils.make_grid(image_batch[0:4], padding=5, nrow=4)
plt.imshow(grid.permute(1, 2, 0))  # Permute dimensions to (height, width, channels) for visualization
plt.title('Good Samples')  # Set the title of the plot
plt.show()  # Show the plot

## Train autoencoder model

In [None]:
class Autoencoder(nn.Module):
    """Convolutional autoencoder for 3-channel images.

    The encoder stacks three ``Conv2d -> ReLU -> AvgPool2d`` stages; the decoder
    mirrors them with three stride-2 transposed convolutions and ends with a
    ``Sigmoid``, so outputs lie in [0, 1].
    """

    def __init__(self):
        super().__init__()

        # (in_channels, out_channels, kernel_size) of each encoder stage.
        encoder_spec = [(3, 128, 4), (128, 256, 4), (256, 256, 3)]
        encoder_layers = []
        for c_in, c_out, k in encoder_spec:
            encoder_layers.append(nn.Conv2d(c_in, c_out, kernel_size=k))
            encoder_layers.append(nn.ReLU())
            encoder_layers.append(nn.AvgPool2d(kernel_size=2, stride=2))
        self.encoder = nn.Sequential(*encoder_layers)

        # (in_channels, out_channels, kernel_size, activation class) of each decoder stage.
        decoder_spec = [
            (256, 256, 4, nn.ReLU),
            (256, 128, 5, nn.ReLU),
            (128, 3, 5, nn.Sigmoid),
        ]
        decoder_layers = []
        for c_in, c_out, k, activation in decoder_spec:
            decoder_layers.append(
                nn.ConvTranspose2d(c_in, c_out, kernel_size=k, stride=2, output_padding=1)
            )
            decoder_layers.append(activation())
        self.decoder = nn.Sequential(*decoder_layers)

    def forward(self, x):
        """Encode ``x`` and return its reconstruction."""
        return self.decoder(self.encoder(x))

# Smoke-test the architecture: instantiate the model (this `model` object is the one
# the following cells load weights into) and push one random 224x224 RGB-shaped
# tensor through it to check the output shape.
model = Autoencoder()  
input_image = torch.randn(1, 3, 224, 224)  # Sample input image
output_image = model(input_image)
print(output_image.shape)  # Print the shape of the output image

In [None]:
# Use the GPU when CUDA is available, otherwise fall back to the CPU.
device = 'cuda' if torch.cuda.is_available() else 'cpu'

In [None]:
# Load the trained autoencoder weights. `map_location` remaps tensors that were saved
# from a GPU session onto the current device, so the checkpoint also loads on a
# CPU-only machine instead of raising a CUDA deserialization error.
ckpoints = torch.load('/kaggle/working/AE_GoodsAD_cigarette_box.pth', map_location=device)
model.load_state_dict(ckpoints)
model.to(device)

In [None]:
# model.to(device)# Move the model to the GPU
# criterion = torch.nn.MSELoss()
# optimizer = torch.optim.Adam(model.parameters(), lr= 0.001)

## Training loop

In [None]:
# # Define a list to store training loss and validation loss
# Loss = []
# Validation_Loss = []


# num_epochs = 100
# for epoch in tqdm(range(num_epochs)):
#     model.train()  # Set model to training mode
#     for img, _ in train_loader:
#         img = img.to(device)
        
#         output = model(img)
#         loss = criterion(output, img)

#         optimizer.zero_grad() #clears the gradients of all optimized tensors.  This step is necessary because gradients are accumulated by default in PyTorch, and we want to compute fresh gradients for the current batch of data.
#         loss.backward() # This line computes the gradients of the loss function with respect to the model parameters. These gradients are used to update the model parameters during optimization.
#         optimizer.step() # This line updates the model parameters using the computed gradients. 
#     Loss.append(loss.item())
       

#     # Calculate validation loss
#     model.eval()  # Set model to evaluation mode
#     with torch.no_grad():
#         val_loss_sum = 0.0
#         num_batches = 0
#         for img, _ in test_loader:
#             img = img.to(device)
#             output = model(img)
#             val_loss = criterion(output, img)
#             val_loss_sum += val_loss.item()
#             num_batches += 1
#         val_loss_avg = val_loss_sum / num_batches
#         Validation_Loss.append(val_loss_avg)
    
#     if epoch % 5 == 0:
#         print('Epoch [{}/{}], Loss: {:.4f}, Validation Loss: {:.4f}'.format(epoch + 1, num_epochs, loss.item(), val_loss_avg))

# plt.plot(Loss, label='Training Loss')
# plt.plot(Validation_Loss, label='Validation Loss')
# plt.xlabel('Epoch')
# plt.ylabel('Loss')
# plt.legend()
# # plt.savefig('AE_food_package.png')
# plt.show()
        

In [None]:
# # Save the model
# torch.save(model.state_dict(), 'test.pth')
# model.eval()

# ckpoints = torch.load('test.pth')
# model.load_state_dict(ckpoints)

## Reconstruction of good images

In [None]:
# Reconstruct the first batch of training images on the configured device
# (previously hard-coded to .cuda(), which crashed on CPU-only machines).
with torch.no_grad():
    data, _ = next(iter(train_loader))
    data = data.to(device)
    recon = model(data)

# Per-pixel reconstruction error: squared difference averaged over the channel axis.
recon_error = ((data - recon) ** 2).mean(axis=1)
print(recon_error.shape)

# Rows: input image, reconstruction, error heat-map.
# (The stray `plt.figure(dpi=250)` call was dropped: it only created an extra empty figure.)
fig, ax = plt.subplots(3, 3, figsize=(5*4, 4*4))
for i in range(3):
    ax[0, i].imshow(data[i].cpu().numpy().transpose((1, 2, 0)))
    ax[1, i].imshow(recon[i].cpu().numpy().transpose((1, 2, 0)))
    # The last 10 rows/columns of the error map are cropped before display;
    # the colour scale is capped at the maximum of the full (uncropped) map.
    ax[2, i].imshow(
        recon_error[i][0:-10, 0:-10].cpu().numpy(),
        cmap='jet',
        vmax=recon_error[i].max().item(),
    )
    ax[0, i].axis('OFF')
    ax[1, i].axis('OFF')
    ax[2, i].axis('OFF')
plt.show()

## Reconstruction of bad images

## Obtain the fault detection HEATMAP using AE

In [None]:
# test_image_1 = transform(Image.open(r'/kaggle/input/goodsad/cigarette_box/cigarette_box/test/opened/000_005.jpg'))
# test_image_2 = transform(Image.open(r'/kaggle/input/goodsad/cigarette_box/cigarette_box/test/opened/000_004.jpg'))
# test_image_3 = transform(Image.open(r'/kaggle/input/goodsad/cigarette_box/cigarette_box/test/opened/000_007.jpg'))

# data = torch.stack([test_image_1,test_image_2, test_image_3])

# with torch.no_grad():
#     data = data.cuda()
#     recon = model(data)
    
# recon_error =  ((data-recon)**2).mean(axis=1)
    
# plt.figure(dpi=250)
# fig, ax = plt.subplots(3, 3, figsize=(5*4, 4*4))
# for i in range(3):
#     ax[0, i].imshow(data[i].cpu().numpy().transpose((1, 2, 0)))
#     ax[1, i].imshow(recon[i].cpu().numpy().transpose((1, 2, 0)))
#     ax[2, i].imshow(recon_error[i][0:-10,0:-10].cpu().numpy(), cmap='jet',vmax= torch.max(recon_error[i]))
#     ax[0, i].axis('OFF')
#     ax[1, i].axis('OFF')
#     ax[2, i].axis('OFF')
# plt.show()

## Finding threshold

In [None]:
# RECON_ERROR=[]
# with torch.no_grad():
#     for data, _ in train_loader:
#         data = data.cuda()
#         recon = model(data)
#         data_recon_squared_mean =  ((data-recon)**2).mean(axis=(1))[:,0:-10,0:-10].mean(axis=(1,2))
        
#         RECON_ERROR.append(data_recon_squared_mean)
        
# RECON_ERROR = torch.cat(RECON_ERROR).cpu().numpy()

In [None]:
# best_threshold = np.mean(RECON_ERROR) + 3 * np.std(RECON_ERROR)

# plt.hist(RECON_ERROR,bins=50)
# plt.vlines(x=best_threshold,ymin=0,ymax=30,color='r') 
# plt.show()

In [None]:
# class_labels = []
# y_true = []
# y_pred = []
# y_score = []


# test_path = Path(r'/kaggle/input/goodsad/cigarette_box/cigarette_box/test')
# class_dirs = [d.name for d in test_path.iterdir() if d.is_dir()]

# model.eval()

# for class_name in class_dirs:
#     folder_path = test_path / class_name

#     for pth in tqdm(folder_path.iterdir(),leave=False):

#         class_label = pth.parts[-2]
#         with torch.no_grad():
#             test_image = transform(Image.open(pth)).cuda().unsqueeze(0)
#             # test_image = test_image.repeat(1, 3, 1, 1)
            
#             recon_image = model(test_image)
    
            
#             # y_score_image = 
#             y_score_image =  ((test_image - recon_image)**2).mean(axis=(1))[:,0:-10,0:-10].mean()
            
#             y_score.append(y_score_image.cpu())
#             y_true.append(0 if class_label == 'good' else 1)
            

#         class_labels.append(class_label)
        

In [None]:
# plt.hist(y_score,bins=50)
# plt.vlines(x=best_threshold,ymin=0,ymax=30,color='r')
# plt.show()

In [None]:
# # Calculate AUC-ROC score
# auc_roc_score = roc_auc_score(y_true, y_score)
# print("AUC-ROC Score:", auc_roc_score)

# # Plot ROC curve
# fpr, tpr, thresholds = roc_curve(y_true, y_score)
# plt.figure()
# plt.plot(fpr, tpr, color='darkorange', lw=2, label='ROC curve (area = %0.2f)' % auc_roc_score)
# plt.plot([0, 1], [0, 1], color='navy', lw=2, linestyle='--')
# plt.xlabel('False Positive Rate')
# plt.ylabel('True Positive Rate')
# plt.title('Receiver Operating Characteristic (ROC) Curve')
# plt.legend(loc="lower right")
# plt.savefig('Result_AE_food_package.png')
# plt.show()


# NOTE: disabled together with the rest of the autoencoder evaluation. These lines need
# `y_true`, `y_score` and `thresholds`, which are only produced by code that is currently
# commented out, so on a fresh kernel (Restart & Run All) they raised NameError.
# Re-enable them together with the scoring loop and the roc_curve call, and convert
# `y_score` to a NumPy array before comparing it with a threshold.
# f1_scores = [f1_score(y_true, y_score >= threshold) for threshold in thresholds]
# # Select the best threshold based on F1 score
# best_threshold = thresholds[np.argmax(f1_scores)]

# print(f'best_threshold = {best_threshold}')

# accuracy = accuracy_score(y_true, (y_score >= best_threshold).astype(int))
# print("Accuracy:", accuracy)

# f1 = f1_score(y_true, (y_score >= best_threshold).astype(int))
# print("F1 Score:", f1)

# # Generate confusion matrix
# cm = confusion_matrix(y_true, y_pred)
# disp = ConfusionMatrixDisplay(confusion_matrix=cm,display_labels=['OK','NOK'])
# disp.plot()
# plt.show()

In [None]:
# import matplotlib.pyplot as plt

# import numpy as np


# y_predict = (y_score >= best_threshold).astype(int)
# # Convert list_1 to binary labels
# binary_labels = [0 if label == 'good' else 1 for label in class_labels]

# # Calculate accuracy for each label
# unique_labels = np.unique(class_labels)
# accuracies = []
# for label in unique_labels:
#     label_mask = np.array(class_labels) == label
#     label_accuracy = accuracy_score(np.array(binary_labels)[label_mask], np.array(y_predict)[label_mask])
#     accuracies.append(label_accuracy)

# # Plot accuracy for each label
# plt.figure()
# plt.bar(unique_labels, accuracies, color='skyblue')
# plt.xlabel('Labels')
# plt.ylabel('Accuracy')
# plt.title('Accuracy for Each Label')
# plt.ylim(0, 1)
# plt.show()

# KNN+RESNET

In [None]:
# Use the GPU when CUDA is available, otherwise fall back to the CPU.
device = 'cuda' if torch.cuda.is_available() else 'cpu'

In [None]:
# Pretrained ResNet-50 used purely as a feature extractor: keep every child module
# except the last one and wrap the rest in a Sequential.
# NOTE: this rebinds the global `model`, replacing the autoencoder defined earlier.
resnet_model = resnet50(weights=ResNet50_Weights.DEFAULT)

model = nn.Sequential(*list(resnet_model.children())[:-1])
model = model.to(device)
model.eval()

# Freeze all weights; the backbone is never trained.
for frozen_param in model.parameters():
    frozen_param.requires_grad = False

del resnet_model

In [None]:
# Use the preprocessing bundled with the pretrained weights.
# NOTE: this rebinds the global `transform` used by the autoencoder section.
transform = ResNet50_Weights.DEFAULT.transforms()

## Create Memory Bank 

In [None]:
# Build the memory bank: one backbone feature vector per defect-free training image.
memory_bank = []

folder_path = Path(r'/kaggle/input/goodsad/drink_bottle/drink_bottle/train/good')

for pth in tqdm(folder_path.iterdir(), leave=False):

    with torch.no_grad():
        # convert('RGB') guarantees 3 channels (as ImageFolder's default loader does);
        # .to(device) replaces the hard-coded .cuda() so the cell also runs on CPU.
        data = transform(Image.open(pth).convert('RGB')).to(device).unsqueeze(0)
        features = model(data)
        # Keep the per-image vectors on the CPU while collecting them.
        memory_bank.append(features.squeeze().cpu())

# Stack into one (num_images, num_features) tensor on the compute device.
memory_bank = torch.stack(memory_bank).to(device)


In [None]:
# Standard deviation of every feature across the memory bank, sorted ascending.
feature_std = memory_bank.std(dim=0)
values, indices = torch.sort(feature_std)

# Plot the stds from largest to smallest; the red line marks the 500-feature cut-off.
fig, ax = plt.subplots()
ax.plot(values.cpu().numpy()[::-1])
ax.vlines(x=500, ymin=0, ymax=0.5, colors='red')
ax.set_ylim([0, 0.5])
ax.set_ylabel("std of the features")
ax.set_xlabel("Number of features")
plt.show()

In [None]:
# Keep only the 500 features with the largest standard deviation across the bank.
# The guard makes the cell idempotent: without it, re-running the cell re-sorted the
# already reduced bank and overwrote `selected_indices` with positions inside the
# reduced tensor, so later `features[selected_indices]` lookups no longer matched
# the memory-bank columns.
NUM_SELECTED_FEATURES = 500
if memory_bank.shape[1] > NUM_SELECTED_FEATURES:
    values, indices = torch.sort(memory_bank.std(dim=0))
    selected_indices = indices[-NUM_SELECTED_FEATURES:]
    memory_bank = memory_bank[:, selected_indices]

In [None]:
# Display the memory-bank shape after feature selection.
memory_bank.shape

## Finding Threshold to classify

In [None]:
# Mean distance of every training image to its k nearest memory-bank entries.
dist_error = []

k = 50

folder_path = Path(r'/kaggle/input/goodsad/drink_bottle/drink_bottle/train/good')

for pth in tqdm(folder_path.iterdir(), leave=False):
    # .to(device) replaces the hard-coded .cuda() so the cell also runs on CPU.
    data = transform(Image.open(pth).convert('RGB')).to(device).unsqueeze(0)
    with torch.no_grad():
        features = model(data).squeeze()
    # Euclidean distance between this sample and every memory-bank row, sorted ascending.
    dist, _ = torch.sort(torch.norm(memory_bank - features[selected_indices], dim=1))
    # NOTE(review): these images are themselves in the memory bank, so the nearest
    # neighbour is presumably the sample itself (distance ~0) -- confirm whether a
    # leave-one-out slice such as dist[1:k+1] is intended.
    dist = dist[:k].mean()  # mean over the k nearest neighbours
    dist_error.append(dist.cpu().numpy())

In [None]:
# Initial threshold: mean + 1.5 standard deviations of the training-set kNN distances.
best_threshold = np.mean(dist_error) + 1.5 * np.std(dist_error)

# Plot the distribution the threshold was derived from. The original plotted `y_score`,
# which is not defined until the test-set cell below has run (NameError on a fresh kernel).
plt.hist(dist_error, bins=50)
plt.vlines(x=best_threshold, ymin=0, ymax=30, color='r')
plt.show()

## Classify the test dataset

In [None]:
# Score every test image by its mean distance to the k nearest memory-bank entries.
class_labels = []
y_true = []
resnet_features = []
y_score = []

parent_dir = Path(r'/kaggle/input/goodsad/drink_bottle/drink_bottle/test')
class_dirs = [d.name for d in parent_dir.iterdir() if d.is_dir()]

for class_name in class_dirs:
    folder_path = parent_dir / class_name

    for pth in tqdm(folder_path.iterdir(), leave=False):

        # The parent folder name is the class / defect type.
        class_label = pth.parts[-2]
        with torch.no_grad():
            # .to(device) replaces the hard-coded .cuda() so the cell also runs on CPU.
            test_image = transform(Image.open(pth).convert('RGB')).to(device).unsqueeze(0)
            features = model(test_image).squeeze()
            dist, _ = torch.sort(torch.norm(memory_bank - features[selected_indices], dim=1))
            dist = dist[:k].mean()
            y_score.append(dist.cpu().numpy())

        class_labels.append(class_label)
        # Binary ground truth: 0 for 'good', 1 for any defect folder.
        y_true.append(0 if class_label == 'good' else 1)

# Store the results as flat NumPy arrays (as the ResNet + AutoEncoder section does) so
# that the threshold comparisons in the evaluation cells are plain array operations.
y_true = np.array(y_true)
y_score = np.array(y_score).ravel()

In [None]:
# Histogram of the anomaly scores of defective (label 1) test samples only.
y_score_nok = []
for score, true in zip(y_score, y_true):
    if true == 1:
        y_score_nok.append(score)

fig, ax = plt.subplots()
ax.hist(y_score_nok, bins=50)
ax.vlines(x=best_threshold, ymin=0, ymax=30, color='r')
plt.show()

## Evaluation

In [None]:
# Work on flat NumPy arrays. The comparisons below used to operate on Python lists and
# only worked through NumPy's reflected comparison with a NumPy scalar.
y_true = np.asarray(y_true)
y_score = np.asarray(y_score).ravel()

# Calculate AUC-ROC score
auc_roc_score = roc_auc_score(y_true, y_score)
print("AUC-ROC Score:", auc_roc_score)

# Plot ROC curve
fpr, tpr, thresholds = roc_curve(y_true, y_score)
plt.figure()
plt.plot(fpr, tpr, color='darkorange', lw=2, label='ROC curve (area = %0.2f)' % auc_roc_score)
plt.plot([0, 1], [0, 1], color='navy', lw=2, linestyle='--')
plt.xlabel('False Positive Rate')
plt.ylabel('True Positive Rate')
plt.title('Receiver Operating Characteristic (ROC) Curve')
plt.legend(loc="lower right")
plt.savefig('Result_KNN-ResNet_drink_bottle.png')
plt.show()

# Select the ROC threshold that maximises the F1 score.
# NOTE(review): the threshold is tuned on the test labels themselves, so the accuracy
# and F1 reported below are optimistic.
f1_scores = [f1_score(y_true, y_score >= threshold) for threshold in thresholds]
best_threshold = thresholds[np.argmax(f1_scores)]

print(f'best_threshold = {best_threshold}')

# Binary predictions at the selected threshold, computed once and reused.
y_pred_best = (y_score >= best_threshold).astype(int)

accuracy = accuracy_score(y_true, y_pred_best)
print("Accuracy:", accuracy)

f1 = f1_score(y_true, y_pred_best)
print("F1 Score:", f1)

# Generate confusion matrix
cm = confusion_matrix(y_true, y_pred_best)
disp = ConfusionMatrixDisplay(confusion_matrix=cm, display_labels=['OK', 'NOK'])
disp.plot()
plt.show()

In [None]:
# Binary predictions at the selected threshold and binary ground truth per sample.
y_predict = (y_score >= best_threshold).astype(int)
binary_labels = [0 if label == 'good' else 1 for label in class_labels]

# Array views for boolean masking.
labels_arr = np.array(class_labels)
targets_arr = np.array(binary_labels)
preds_arr = np.array(y_predict)

# Accuracy restricted to the samples of each test folder (defect type).
unique_labels = np.unique(class_labels)
accuracies = [
    accuracy_score(targets_arr[labels_arr == label], preds_arr[labels_arr == label])
    for label in unique_labels
]

# Bar chart of the per-label accuracies.
fig, ax = plt.subplots()
ax.bar(unique_labels, accuracies, color='skyblue')
ax.set_xlabel('Labels')
ax.set_ylabel('Accuracy')
ax.set_title('Accuracy for Each Label')
ax.set_ylim(0, 1)
plt.show()

# Resnet + AutoEncoder

### Setup + Visualize

In [None]:
# Use the GPU when CUDA is available, otherwise fall back to the CPU.
device = 'cuda' if torch.cuda.is_available() else 'cpu'

In [None]:
class FeatCAE(nn.Module):
    """Autoencoder built from 1x1 convolutions over feature maps.

    Channel widths go ``in_channels -> (in_channels + 2*latent_dim)//2 ->
    2*latent_dim -> latent_dim`` in the encoder and back in reverse in the decoder.
    Every convolution except the last one of each half is followed by an optional
    BatchNorm2d (``is_bn``) and a ReLU. Spatial size is unchanged.
    """

    def __init__(self, in_channels=1000, latent_dim=50, is_bn=True):
        super().__init__()

        mid_channels = (in_channels + 2 * latent_dim) // 2
        wide_latent = 2 * latent_dim

        def pointwise(c_in, c_out):
            """1x1 convolution mapping c_in channels to c_out channels."""
            return nn.Conv2d(c_in, c_out, kernel_size=1, stride=1, padding=0)

        def stage(c_in, c_out):
            """1x1 conv, optional batch norm, then ReLU, as a list of layers."""
            block = [pointwise(c_in, c_out)]
            if is_bn:
                block.append(nn.BatchNorm2d(num_features=c_out))
            block.append(nn.ReLU())
            return block

        self.encoder = nn.Sequential(
            *stage(in_channels, mid_channels),
            *stage(mid_channels, wide_latent),
            pointwise(wide_latent, latent_dim),
        )

        # The decoder mirrors the encoder; its final 1x1 conv has no activation.
        self.decoder = nn.Sequential(
            *stage(latent_dim, wide_latent),
            *stage(wide_latent, mid_channels),
            pointwise(mid_channels, in_channels),
        )

    def forward(self, x):
        """Return the reconstruction of ``x`` (same shape as the input)."""
        latent = self.encoder(x)
        return self.decoder(latent)

In [None]:
class resnet_feature_extractor(torch.nn.Module):
    """Frozen pretrained ResNet-50 that returns merged intermediate feature maps.

    Forward hooks capture the outputs of the last blocks of ``layer2`` and ``layer3``.
    Each captured map is smoothed with a 3x3 average pool (stride 1), resized to the
    spatial size of the first captured map, and the results are concatenated along the
    channel axis. The captured raw maps of the latest call stay available in
    ``self.features``.
    """

    def __init__(self):
        super(resnet_feature_extractor, self).__init__()
        self.model = resnet50(weights=ResNet50_Weights.DEFAULT)

        # The backbone is used for inference only: eval mode, no gradients.
        self.model.eval()
        for param in self.model.parameters():
            param.requires_grad = False

        # Defined up front so the attribute exists before the first forward pass.
        self.features = []

        # Parameter-free smoothing layer, built once instead of on every forward call.
        self.avg = torch.nn.AvgPool2d(3, stride=1)

        # Hook to extract feature maps
        def hook(module, input, output) -> None:
            """Append the hooked layer's output to self.features."""
            self.features.append(output)

        self.model.layer2[-1].register_forward_hook(hook)
        self.model.layer3[-1].register_forward_hook(hook)

    def train(self, mode=True):
        """Switch modes like nn.Module.train, but keep the frozen backbone in eval mode.

        Without this, calling ``.train()`` on the extractor would put the backbone's
        BatchNorm layers into training mode and let them update their running statistics.
        """
        super().train(mode)
        self.model.eval()
        return self

    def forward(self, input):
        """Return the concatenated, resized feature maps for a batch of images.

        Args:
            input: image batch accepted by the wrapped ResNet-50.

        Returns:
            Tensor with the hooked feature maps concatenated along dim 1.
        """
        # Reset the buffer; the hooks refill it during the backbone pass.
        self.features = []
        with torch.no_grad():
            _ = self.model(input)

        # Target spatial size (h, w) = size of the first hooked map. Using both
        # dimensions also handles non-square inputs (previously only h was used).
        fmap_size = tuple(self.features[0].shape[-2:])

        resized_maps = [
            torch.nn.functional.adaptive_avg_pool2d(self.avg(fmap), fmap_size)
            for fmap in self.features
        ]
        patch = torch.cat(resized_maps, 1)            # Merge the resized feature maps

        return patch


In [None]:
# Preprocessing for this section: fixed 224x224 resolution, float tensor in [0, 1].
transform = transforms.Compose([
    transforms.Resize((224,224)),
    transforms.ToTensor()
])

train_image_path = Path('/kaggle/input/goodsad/food_package/food_package/train')

good_dataset = ImageFolder(root=train_image_path, transform=transform)

# Seeded 80/20 split so the train/validation partition is reproducible across runs.
SPLIT_SEED = 42
split_generator = torch.Generator().manual_seed(SPLIT_SEED)
train_dataset, test_dataset = torch.utils.data.random_split(
    good_dataset, [0.8, 0.2], generator=split_generator
)

# Set the batch size
BS = 16

# Create data loaders for training and testing datasets.
# Only the training loader is shuffled; validation is iterated in a fixed order.
train_loader = DataLoader(train_dataset, batch_size=BS, shuffle=True)
test_loader = DataLoader(test_dataset, batch_size=BS, shuffle=False)


In [None]:
# Run one sample image through a freshly built feature extractor.
# NOTE(review): the sample comes from cigarette_box while this section's dataset is
# food_package -- confirm this is intended.
sample_path = r'/kaggle/input/goodsad/cigarette_box/cigarette_box/test/good/000_001.jpg'
image = transform(Image.open(sample_path)).unsqueeze(0)

backbone = resnet_feature_extractor()
feature = backbone(image)

# Shapes of the two hooked feature maps, then of the merged map.
for hooked_map in backbone.features[:2]:
    print(hooked_map.shape)

print(feature.shape)

plt.imshow(image[0].permute(1,2,0))

In [None]:
# Select 10 random channels of the merged feature map. The channel count is read from
# the tensor instead of being hard-coded to 1536.
num_channels = feature.shape[1]
indices = torch.randperm(num_channels)[:10]

# Plot the selected feature maps on a 2x5 grid
fig, axes = plt.subplots(2, 5, figsize=(15, 6))
for i, idx in enumerate(indices.tolist()):
    row, col = divmod(i, 5)
    axes[row, col].imshow(feature[0, idx].detach().cpu(), cmap='gray')
    # idx is a plain int here, so the title reads "Feature Map 12" rather than
    # "Feature Map tensor(12)".
    axes[row, col].set_title(f'Feature Map {idx}')
    axes[row, col].axis('off')
plt.tight_layout()
plt.show()

### Training

In [None]:
# Feature autoencoder over the backbone's merged feature map
# (1536 input channels, 100-dimensional bottleneck).
model = FeatCAE(in_channels=1536, latent_dim=100)
model = model.to(device)
backbone.to(device)

# Reconstruction loss and optimizer; only the autoencoder parameters are optimized.
criterion = nn.MSELoss()
optimizer = torch.optim.Adam(model.parameters(), lr=0.001)

In [None]:
# Define a list to store training loss and validation loss
Loss = []
Validation_Loss = []


num_epochs = 70
for epoch in tqdm(range(num_epochs)):
    model.train()
    for data,_ in train_loader:
        with torch.no_grad():
            features = backbone(data.to(device))
        # Forward pass
        output = model(features)
        # Compute the loss
        loss = criterion(output, features)
        # Backpropagation and optimization step
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()
    Loss.append(loss.item())

    # Calculate validation loss
    model.eval()  # Set model to evaluation mode
    with torch.no_grad():
        val_loss_sum = 0.0
        num_batches = 0
        for data, _ in test_loader:
            features = backbone(data.to(device))
            output = model(features)
            val_loss = criterion(output, features)
            val_loss_sum += val_loss.item()
            num_batches += 1
        val_loss_avg = val_loss_sum / num_batches
        Validation_Loss.append(val_loss_avg)
    
    if epoch % 5 == 0:
        print('Epoch [{}/{}], Loss: {:.4f}, Validation Loss: {:.4f}'.format(epoch + 1, num_epochs, loss.item(), val_loss_avg))

plt.plot(Loss, label='Training Loss')
plt.plot(Validation_Loss, label='Validation Loss')
plt.xlabel('Epoch')
plt.ylabel('Loss')
plt.legend()
plt.savefig('AE-Resnet_food_package.png')
plt.show()

In [None]:
# Save the model's state_dict to a file in the current working directory.
torch.save(model.state_dict(), 'AE-Resnet_GoodsAD_food_package.pth')

In [None]:
# Reload the checkpoint written by the save cell above. The original loaded the
# drink_bottle checkpoint here, silently replacing the model that had just been trained
# on food_package (and failing when that file is absent). `map_location` lets a
# GPU-saved checkpoint load on a CPU-only machine.
ckpoints = torch.load('AE-Resnet_GoodsAD_food_package.pth', map_location=device)
model.load_state_dict(ckpoints)

In [None]:
# Anomaly heat-map for a single image.
# NOTE(review): the sample comes from cigarette_box while the model in this section is
# trained on food_package -- confirm this is intended.
image = Image.open(r'/kaggle/input/goodsad/cigarette_box/cigarette_box/test/good/000_001.jpg')
image = transform(image).unsqueeze(0)

with torch.no_grad():
    # .to(device) replaces the hard-coded .cuda() so the cell also runs on CPU.
    features = backbone(image.to(device))
    recon = model(features)

# Squared reconstruction error averaged over the channel axis. keepdim=True keeps the
# (batch, 1, h, w) layout that interpolate expects, for any batch size.
recon_error = ((features - recon) ** 2).mean(axis=1, keepdim=True)

segm_map = torch.nn.functional.interpolate(     # Upscale by bilinear interpolation to match the original input resolution
                recon_error,
                size=(224, 224),
                mode='bilinear'
            )

plt.imshow(segm_map.squeeze().cpu().numpy(), cmap='jet')
plt.show()

### Finding Threshold

In [None]:
def decision_function(segm_map, top_k=10):
    """Reduce each anomaly map to one score: the mean of its ``top_k`` largest values.

    Args:
        segm_map: iterable of anomaly maps (e.g. a ``(batch, h, w)`` tensor); each
            map is flattened before its largest values are selected.
        top_k: number of largest values averaged per map (default 10). A map with
            fewer elements is averaged over all of its elements.

    Returns:
        1-D tensor holding one score per map.
    """
    scores = []

    # `anomaly_map` instead of `map`, which shadowed the builtin.
    for anomaly_map in segm_map:
        flat = anomaly_map.reshape(-1)

        # topk avoids sorting the whole map; clamp k so that small maps do not raise.
        k = min(top_k, flat.numel())
        top_values = torch.topk(flat, k).values

        scores.append(top_values.mean())

    return torch.stack(scores)

In [None]:
# Anomaly scores of the defect-free training images, used to derive a threshold.
model.eval()

ANOMALY_ERROR = []
for data, _ in train_loader:

    with torch.no_grad():
        # No .squeeze() here: it removed the batch dimension whenever the last batch
        # held a single sample, which broke the model call and the cropping below.
        # .to(device) replaces the hard-coded .cuda().
        features = backbone(data.to(device))
        # Forward pass
        recon = model(features)
    # Channel-averaged squared error, with a 3-pixel border cropped from each side
    segm_map = ((features - recon) ** 2).mean(axis=(1))[:, 3:-3, 3:-3]
    anomaly_score = decision_function(segm_map)
    # anomaly_score = segm_map.mean(axis=(1,2))

    ANOMALY_ERROR.append(anomaly_score)

ANOMALY_ERROR = torch.cat(ANOMALY_ERROR).cpu().numpy()

In [None]:
# Threshold = mean + 3 standard deviations of the training anomaly scores.
anomaly_mean = np.mean(ANOMALY_ERROR)
anomaly_std = np.std(ANOMALY_ERROR)
best_threshold = anomaly_mean + 3 * anomaly_std

# Score range on the training data.
heat_map_min = np.min(ANOMALY_ERROR)
heat_map_max = np.max(ANOMALY_ERROR)

# Distribution of the training scores with the threshold marked in red.
fig, ax = plt.subplots()
ax.hist(ANOMALY_ERROR, bins=50)
ax.vlines(x=best_threshold, ymin=0, ymax=30, color='r')
plt.show()

### Classify the Test Set

In [None]:
# Score every test image with the feature autoencoder.
class_labels = []
y_true = []
y_pred = []
y_score = []

model.eval()
backbone.eval()

test_path = Path(r'/kaggle/input/goodsad/food_package/food_package/test')
class_dirs = [d.name for d in test_path.iterdir() if d.is_dir()]

for class_name in class_dirs:
    folder_path = test_path / class_name

    for pth in tqdm(folder_path.iterdir(), leave=False):

        # The parent folder name is the class / defect type.
        class_label = pth.parts[-2]
        with torch.no_grad():
            # convert('RGB') guarantees 3 channels (as ImageFolder's default loader
            # does); .to(device) replaces the hard-coded .cuda().
            test_image = transform(Image.open(pth).convert('RGB')).to(device).unsqueeze(0)
            features = backbone(test_image)
            # Forward pass
            recon = model(features)
            # Channel-averaged squared error, with a 3-pixel border cropped from each side
            segm_map = ((features - recon) ** 2).mean(axis=(1))[:, 3:-3, 3:-3]
            y_score_image = decision_function(segm_map=segm_map)
            # y_score_image = segm_map.mean(axis=(1,2))

            y_score.append(y_score_image.cpu().numpy())
            # Binary ground truth: 0 for 'good', 1 for any defect folder.
            y_true.append(0 if class_label == 'good' else 1)

        class_labels.append(class_label)

y_true = np.array(y_true)
# Each appended score has shape (1,); flatten to (N,) so the metrics and threshold
# comparisons receive a 1-D score vector rather than an (N, 1) column.
y_score = np.array(y_score).ravel()
    

In [None]:
# Distribution of the test anomaly scores with the current threshold marked in red.
fig, ax = plt.subplots()
ax.hist(y_score, bins=50)
ax.vlines(x=best_threshold, ymin=0, ymax=30, color='r')
plt.show()

In [None]:
# Area under the ROC curve of the anomaly scores.
auc_roc_score = roc_auc_score(y_true, y_score)
print("AUC-ROC Score:", auc_roc_score)

# ROC curve with the chance diagonal for reference; the figure is also saved to disk.
fpr, tpr, thresholds = roc_curve(y_true, y_score)
fig, ax = plt.subplots()
ax.plot(fpr, tpr, color='darkorange', lw=2, label='ROC curve (area = %0.2f)' % auc_roc_score)
ax.plot([0, 1], [0, 1], color='navy', lw=2, linestyle='--')
ax.set_xlabel('False Positive Rate')
ax.set_ylabel('True Positive Rate')
ax.set_title('Receiver Operating Characteristic (ROC) Curve')
ax.legend(loc="lower right")
fig.savefig('Result_AE-Resnet_food_package.png')
plt.show()

# F1 score at every ROC threshold; keep the threshold with the highest F1.
f1_scores = []
for threshold in thresholds:
    f1_scores.append(f1_score(y_true, y_score >= threshold))
best_threshold = thresholds[np.argmax(f1_scores)]

print(f'best_threshold = {best_threshold}')

# Binary predictions at the selected threshold, computed once and reused below.
y_pred_at_best = (y_score >= best_threshold).astype(int)

accuracy = accuracy_score(y_true, y_pred_at_best)
print("Accuracy:", accuracy)

f1 = f1_score(y_true, y_pred_at_best)
print("F1 Score:", f1)

# Confusion matrix at the selected threshold.
cm = confusion_matrix(y_true, y_pred_at_best)
disp = ConfusionMatrixDisplay(confusion_matrix=cm, display_labels=['OK', 'NOK'])
disp.plot()
plt.show()


In [None]:
# Binary predictions at the selected threshold and binary ground truth per sample.
y_predict = (y_score >= best_threshold).astype(int)
binary_labels = [0 if label == 'good' else 1 for label in class_labels]

# Array views for boolean masking.
labels_arr = np.array(class_labels)
targets_arr = np.array(binary_labels)
preds_arr = np.array(y_predict)

# Accuracy restricted to the samples of each test folder (defect type).
unique_labels = np.unique(class_labels)
accuracies = [
    accuracy_score(targets_arr[labels_arr == label], preds_arr[labels_arr == label])
    for label in unique_labels
]

# Bar chart of the per-label accuracies.
fig, ax = plt.subplots()
ax.bar(unique_labels, accuracies, color='skyblue')
ax.set_xlabel('Labels')
ax.set_ylabel('Accuracy')
ax.set_title('Accuracy for Each Label')
ax.set_ylim(0, 1)
plt.show()

# Visualize Prediction

In [None]:
# import cv2, time
# from IPython.display import clear_output

# model.eval()
# backbone.eval()

# test_path = Path('/kaggle/input/goodsad/food_package/food_package/test')

# for path in test_path.glob('*/*.png'):
#     fault_type = path.parts[-2]
#     if fault_type == 'good':
#         continue
#     test_image = transform(Image.open(path)).cuda().unsqueeze(0)
#     true_path = str(path).replace('/test/', '/ground_truth/').replace('.png', '_mask.png')
#     true_image = transform(Image.open(true_path)).cuda().unsqueeze(0)
#     with torch.no_grad():
#         features = backbone(test_image)
#         # Forward pass
#         recon = model(features)
    
#     segm_map = ((features - recon)**2).mean(axis=(1))
#     y_score_image = decision_function(segm_map=segm_map)
#     # y_score_image = segm_map.mean(axis=(1,2))
    
#     y_pred_image = 1*(y_score_image >= best_threshold)
#     class_label = ['OK','NOK']

#     plt.figure(figsize=(20,5))

#     plt.subplot(1,4,1)
#     plt.imshow(test_image.squeeze().permute(1,2,0).cpu().numpy())
#     plt.title(f'fault type: {fault_type}')

#     plt.subplot(1,4,2)
#     heat_map = segm_map.squeeze().cpu().numpy()
#     heat_map = heat_map
#     heat_map = cv2.resize(heat_map, (128,128))
#     plt.imshow(heat_map, cmap='jet', vmin=heat_map_min, vmax=heat_map_max*10) # Here I am cheating by multiplying by 10 (obtained using trail error)
#     plt.title(f'Anomaly score: {y_score_image[0].cpu().numpy() / best_threshold:0.4f} || {class_label[y_pred_image]}')

#     plt.subplot(1,4,3)
#     plt.imshow((heat_map > best_threshold * 10), cmap='gray')
#     plt.title(f'segmentation map')

#     plt.subplot(1,4,4)
#     plt.imshow(true_image.squeeze().cpu().numpy())
#     plt.title(f'groundtruth')
    
#     plt.show()


    


In [None]:
# Delete a previously saved plot from the working directory, if it is there.
file_path = "/kaggle/working/AE-Resnet_cigarette_box.png"
stale_file = Path(file_path)

if not stale_file.exists():
    print(f"File '{file_path}' does not exist.")
else:
    stale_file.unlink()
    print(f"File '{file_path}' has been removed.")