In [1]:
import base64
import html
import os
from io import BytesIO

import numpy as np
import pandas as pd
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from PIL import Image
from sklearn.metrics import confusion_matrix, recall_score
from torch.utils.data import DataLoader, random_split
from torchvision import datasets, transforms, models

In [None]:
# Path to image dataset.
# This folder is read with datasets.ImageFolder in the next cell, so it has to
# contain one sub-directory per class.
# NOTE(review): the path is relative to the working directory and starts with
# "../Dataset/", while the similarity section further down uses "../../Dataset/" —
# confirm that both resolve correctly from the same location.
data_dir = "../Dataset/augmented_images(Stef)/"

In [3]:
# Preprocessing applied to every image: resize, take the central 224x224 crop,
# convert to a tensor, then normalise each channel.
transform = transforms.Compose([
    transforms.Resize(size=256),
    transforms.CenterCrop(size=224),
    transforms.ToTensor(),
    # Normalization values for pretrained DenseNet121 on ImageNet
    transforms.Normalize(
        mean=[0.485, 0.456, 0.406],
        std=[0.229, 0.224, 0.225],
    ),
])

# ImageFolder derives one class per sub-directory of data_dir
dataset = datasets.ImageFolder(root=data_dir, transform=transform)
num_classes = len(dataset.classes)
print("Detected classes:", dataset.classes)

Detected classes: ['augmented_images(blisters)', 'augmented_images(bottles)', 'augmented_images(boxes)', 'augmented_images(boxes_blisters)']


In [4]:
# Split dataset: 70% train, 30% test.
# The split is drawn from an explicitly seeded generator so that the same images
# land in train/test after every kernel restart; without it the confusion
# matrices and recall values printed during training are not reproducible.
# NOTE(review): the folder name suggests these are augmented images. If several
# augmentations of one source image exist, a purely random split can place
# variants of the same image on both sides — confirm whether that matters here.
num_total = len(dataset)
num_train = int(0.7 * num_total)
num_test = num_total - num_train  # remainder, so the two sizes always sum to num_total
split_generator = torch.Generator().manual_seed(42)
train_dataset, test_dataset = random_split(
    dataset, [num_train, num_test], generator=split_generator
)

In [5]:
# Run-level hyperparameters
batch_size = 32
num_epochs = 10

# Batched iteration: the training split is reshuffled every epoch, the test split
# keeps a fixed order.
train_loader = DataLoader(dataset=train_dataset, batch_size=batch_size, shuffle=True)
test_loader = DataLoader(dataset=test_dataset, batch_size=batch_size, shuffle=False)

# Use the first CUDA device when one is available, otherwise fall back to the CPU
if torch.cuda.is_available():
    device = torch.device("cuda:0")
else:
    device = torch.device("cpu")

# DenseNet121 with ImageNet weights; its classifier is swapped for a fresh linear
# layer with one output per dataset class.
model = models.densenet121(pretrained=True)
model.classifier = nn.Linear(
    in_features=model.classifier.in_features,
    out_features=num_classes,
)

# Freeze the feature extraction layers so that only the new classifier receives
# gradient updates.
for param in model.features.parameters():
    param.requires_grad = False

model = model.to(device)

# Cross-entropy loss over the class logits, optimised with Adam.
# The optimizer is handed every parameter; the frozen ones never get a gradient.
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(params=model.parameters(), lr=1e-3)



In [6]:
# Show the model architecture: prints the module tree of the DenseNet built above,
# including the replaced classifier layer.
print(model)

DenseNet(
  (features): Sequential(
    (conv0): Conv2d(3, 64, kernel_size=(7, 7), stride=(2, 2), padding=(3, 3), bias=False)
    (norm0): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
    (relu0): ReLU(inplace=True)
    (pool0): MaxPool2d(kernel_size=3, stride=2, padding=1, dilation=1, ceil_mode=False)
    (denseblock1): _DenseBlock(
      (denselayer1): _DenseLayer(
        (norm1): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
        (relu1): ReLU(inplace=True)
        (conv1): Conv2d(64, 128, kernel_size=(1, 1), stride=(1, 1), bias=False)
        (norm2): BatchNorm2d(128, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
        (relu2): ReLU(inplace=True)
        (conv2): Conv2d(128, 32, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False)
      )
      (denselayer2): _DenseLayer(
        (norm1): BatchNorm2d(96, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
        (relu

In [7]:
# Train for num_epochs epochs; after each epoch, score the model on the test split
for epoch in range(num_epochs):
    # ---- training pass ----
    # NOTE(review): model.train() also puts the BatchNorm layers of the frozen
    # feature extractor into training mode, so their running statistics presumably
    # keep changing even though the weights are frozen — confirm this is intended.
    model.train()
    running_loss = 0.0

    for inputs, labels in train_loader:
        inputs = inputs.to(device)
        labels = labels.to(device)

        optimizer.zero_grad()  # drop gradients left over from the previous step

        outputs = model(inputs)
        loss = criterion(outputs, labels)
        loss.backward()
        optimizer.step()

        # The criterion yields a per-batch mean, so weight it by the batch size
        # to accumulate a per-sample sum.
        running_loss += inputs.size(0) * loss.item()

    epoch_loss = running_loss / num_train
    print(f"Epoch {epoch+1}/{num_epochs}, Loss: {epoch_loss:.4f}")

    # ---- evaluation pass on the test set ----
    model.eval()
    all_preds, all_labels = [], []

    with torch.no_grad():
        for inputs, labels in test_loader:
            inputs = inputs.to(device)
            outputs = model(inputs)
            # Predicted class = index of the largest logit in each row
            preds = torch.max(outputs, 1).indices
            all_preds.extend(preds.cpu().numpy())
            all_labels.extend(labels.numpy())

    # Metrics: confusion matrix (rows = true class, columns = predicted class)
    # and recall averaged with equal weight per class.
    cm = confusion_matrix(all_labels, all_preds)
    recall = recall_score(all_labels, all_preds, average='macro')

    print(f"Confusion Matrix:\n{cm}")
    print(f"Test Recall (macro): {recall:.4f}")

Epoch 1/10, Loss: 0.3034
Confusion Matrix:
[[4893   38   65   56]
 [   6 1275   38    4]
 [  22   52 1801   18]
 [ 123   14   29  605]]
Test Recall (macro): 0.9171
Epoch 2/10, Loss: 0.1718
Confusion Matrix:
[[4929   36   49   38]
 [   4 1290   28    1]
 [  24   53 1804   12]
 [ 131   16   28  596]]
Test Recall (macro): 0.9192
Epoch 3/10, Loss: 0.1541
Confusion Matrix:
[[4929   29   48   46]
 [   3 1284   32    4]
 [  22   37 1814   20]
 [  95    5   27  644]]
Test Recall (macro): 0.9349
Epoch 4/10, Loss: 0.1455
Confusion Matrix:
[[4898   25   43   86]
 [   4 1290   24    5]
 [  22   31 1815   25]
 [  51    4   18  698]]
Test Recall (macro): 0.9522
Epoch 5/10, Loss: 0.1359
Confusion Matrix:
[[4934   25   50   43]
 [   5 1286   29    3]
 [  18   27 1838   10]
 [  89    5   28  649]]
Test Recall (macro): 0.9403
Epoch 6/10, Loss: 0.1248
Confusion Matrix:
[[4925   15   58   54]
 [   3 1263   53    4]
 [  15   16 1850   12]
 [  60    1   33  677]]
Test Recall (macro): 0.9462
Epoch 7/10, Loss

In [8]:
# Persist the trained weights together with the optimiser state, the number of
# epochs that were run and the training loss of the final epoch.
torch.save(
    obj={
        "epoch": num_epochs,
        "model_state_dict": model.state_dict(),
        "optimizer_state_dict": optimizer.state_dict(),
        "loss": epoch_loss,
    },
    f="densenet121_full_checkpoint_iter1_new_dataset.pth",
)

### Getting Top N Similar Images


In [9]:
# ============
# 1. Define a Feature Extractor Class
# ============
class DenseNetFeatureExtractor(nn.Module):
    """Wrap a DenseNet-style classifier so it returns pooled features instead of logits.

    The wrapper reuses (does not copy) ``original_model.features`` and follows it
    with a ReLU, a global average pool to 1x1 and a flatten, producing one
    feature vector per input image.

    Args:
        original_model: a model exposing ``features`` (the convolutional stack)
            and ``classifier.in_features`` (the width of the pooled feature vector).

    Attributes:
        features: the shared convolutional stack of ``original_model``.
        avgpool: adaptive average pooling down to a 1x1 spatial size.
        out_features: ``original_model.classifier.in_features``.
    """

    def __init__(self, original_model):
        super().__init__()
        # Same module object as in the original model, so weights stay shared
        self.features = original_model.features
        # Collapses each feature map to a single value
        self.avgpool = nn.AdaptiveAvgPool2d((1, 1))
        # Input width of the classifier that this wrapper leaves out
        self.out_features = original_model.classifier.in_features

    def forward(self, x):
        """Return a 2-D tensor of pooled, ReLU-activated features, one row per input."""
        feature_maps = self.features(x)
        pooled = self.avgpool(F.relu(feature_maps))
        return torch.flatten(pooled, 1)

In [11]:
# ============
# 2. Load the Saved Classification Model and Convert It
# ============
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# Load the checkpoint written by the training section above
checkpoint = torch.load("densenet121_full_checkpoint_iter1_new_dataset.pth", map_location=device)
state_dict = checkpoint["model_state_dict"]

# Read the number of classes from the saved classifier instead of hard-coding it:
# a Linear layer stores its weight as (out_features, in_features), so the first
# dimension is the class count the checkpoint was trained with.
num_classes = state_dict["classifier.weight"].shape[0]

# Rebuild the same architecture (no pretrained download needed) and restore the weights
model = models.densenet121(pretrained=False)
model.classifier = nn.Linear(model.classifier.in_features, num_classes)
model.load_state_dict(state_dict)
model.eval()

# Create a feature extractor by removing the classifier (using our wrapper)
feature_extractor = DenseNetFeatureExtractor(model)
feature_extractor.to(device)
# Inference mode; as the last expression this also displays the module tree
feature_extractor.eval()


DenseNetFeatureExtractor(
  (features): Sequential(
    (conv0): Conv2d(3, 64, kernel_size=(7, 7), stride=(2, 2), padding=(3, 3), bias=False)
    (norm0): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
    (relu0): ReLU(inplace=True)
    (pool0): MaxPool2d(kernel_size=3, stride=2, padding=1, dilation=1, ceil_mode=False)
    (denseblock1): _DenseBlock(
      (denselayer1): _DenseLayer(
        (norm1): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
        (relu1): ReLU(inplace=True)
        (conv1): Conv2d(64, 128, kernel_size=(1, 1), stride=(1, 1), bias=False)
        (norm2): BatchNorm2d(128, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
        (relu2): ReLU(inplace=True)
        (conv2): Conv2d(128, 32, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False)
      )
      (denselayer2): _DenseLayer(
        (norm1): BatchNorm2d(96, eps=1e-05, momentum=0.1, affine=True, track_running_stats=Tru

In [12]:
# ============
# 3. Define Image Transformations (same as used during training)
# ============
# Same preprocessing as in the training section: resize, central 224x224 crop,
# tensor conversion and per-channel normalisation.
transform = transforms.Compose([
    transforms.Resize(size=256),
    transforms.CenterCrop(size=224),
    transforms.ToTensor(),
    transforms.Normalize(
        mean=[0.485, 0.456, 0.406],
        std=[0.229, 0.224, 0.225],
    ),
])

In [None]:
# ============
# 4. Compute and Store Embeddings for the Entire Dataset
# ============
# Gallery images, laid out in the one-folder-per-class structure ImageFolder expects.
dataset_dir = "../../Dataset/Labelled_Images_blisterPriority/"  # Update as needed
dataset = datasets.ImageFolder(root=dataset_dir, transform=transform)
# shuffle=False keeps the batches in dataset order, so row i of the embedding
# matrix corresponds to dataset.samples[i].
dataloader = DataLoader(dataset=dataset, batch_size=32, shuffle=False)

# Embed every batch without tracking gradients, move each result to the CPU and
# stack all batches into a single 2-D tensor.
with torch.no_grad():
    all_embeddings = torch.cat(
        [feature_extractor(inputs.to(device)).cpu() for inputs, _ in dataloader],
        dim=0,
    )

# File path of each embedded image (first element of every ImageFolder sample)
all_image_paths = [image_path for image_path, _ in dataset.samples]

FileNotFoundError: [Errno 2] No such file or directory: '../../Dataset/augmented_images(stef)/'

In [None]:
# ============
# 5. Define a Function to Retrieve Top N Similar Images
# ============
def get_top_n_similar(query_image_path, top_n=5):
    """Return the dataset images most similar to a query image.

    The query is preprocessed with the module-level ``transform``, embedded with
    ``feature_extractor`` and compared by cosine similarity against every row of
    ``all_embeddings``.

    Args:
        query_image_path: path of the image to search with.
        top_n: maximum number of matches to return. If it exceeds the number of
            stored embeddings, all of them are returned instead of raising.

    Returns:
        A list of ``(file_path, similarity)`` tuples, best match first, where
        ``file_path`` comes from ``all_image_paths``.
    """
    # Image.open is lazy; the context manager closes the file once the pixels
    # have been copied into the RGB image.
    with Image.open(query_image_path) as source:
        query_image = source.convert("RGB")
    query_tensor = transform(query_image).unsqueeze(0).to(device)
    with torch.no_grad():
        query_embedding = feature_extractor(query_tensor).cpu()
    # The single query row is broadcast against every dataset embedding
    similarities = F.cosine_similarity(query_embedding, all_embeddings)
    # torch.topk raises when k is larger than the number of candidates
    k = min(top_n, similarities.numel())
    top = torch.topk(similarities, k=k)
    top_n_similarities = top.values.numpy()
    top_n_paths = [all_image_paths[i] for i in top.indices.tolist()]
    return list(zip(top_n_paths, top_n_similarities))

In [None]:
# Images used as search queries for the similarity report: two files from each of
# the Blisters, Bottle and Box folders of the labelled dataset.
# The parent folder name of each path is later used as the query's category.
query_image_paths = [
    "../../Dataset/Labelled_Images_blisterPriority/Blisters/Abacavir_,_Lamivudine_0 - 600mg_300mg - v1 - aug0 - Blisters.jpg",
    "../../Dataset/Labelled_Images_blisterPriority/Blisters/Zopiclone_Tab_0 - 7.5mg - v1 - aug0 - Blisters.jpg",
    "../../Dataset/Labelled_Images_blisterPriority/Bottle/Abacavir_,_Lamivudine_Tab_0 - 600mg_300mg - v1 - aug0 - Bottle.jpg",
    "../../Dataset/Labelled_Images_blisterPriority/Bottle/Zuclopenthixol_Tab_0 - 10mg - v1 - aug3 - Bottle.jpg",
    "../../Dataset/Labelled_Images_blisterPriority/Box/Abacavir_,_Lamivudine_0 - 600mg_300mg - v2 - aug0 - Box.jpg",
    "../../Dataset/Labelled_Images_blisterPriority/Box/Vosevi_Tab_28_s_1 - 400mg_100mg_100mg - v1 - aug3 - Box.jpg"
]

In [None]:
# ============
# 6. Run the Queries and Save All Matches Above the Similarity Threshold to an HTML Report
# ============

def get_filename_without_extension(path):
    """Return the last component of ``path`` with its final extension stripped."""
    stem, _extension = os.path.splitext(os.path.basename(path))
    return stem

def get_category_from_path(path):
    """Return the name of the folder that directly contains ``path`` (the image class)."""
    parent_dir, _file_name = os.path.split(path)
    return os.path.split(parent_dir)[1]

def image_to_data_uri(image_path, max_size=(150, 150), quality=50):
    """Return a small JPEG version of an image as a ``data:`` URI.

    The image is converted to RGB, shrunk in place to fit within ``max_size``
    (aspect ratio preserved), re-encoded as JPEG and base64-encoded so that it
    can be embedded directly in an HTML ``<img src=...>`` attribute.

    Args:
        image_path: path of the image file to embed.
        max_size: ``(width, height)`` bounding box for the thumbnail.
        quality: JPEG quality passed to the encoder.

    Returns:
        The ``data:image/jpeg;base64,...`` string, or ``""`` when the image
        cannot be read or encoded. Failures are reported on stdout rather than
        raised, so one bad file does not abort the whole report.
    """
    try:
        # Image.open is lazy; the context manager closes the underlying file as
        # soon as the pixels have been copied into the RGB image, instead of
        # leaving the handle to the garbage collector.
        with Image.open(image_path) as source:
            im = source.convert("RGB")
        im.thumbnail(max_size)
        buffer = BytesIO()
        im.save(buffer, format="JPEG", quality=quality)
        encoded_string = base64.b64encode(buffer.getvalue()).decode("utf-8")
        return f"data:image/jpeg;base64,{encoded_string}"
    except Exception as e:
        # Deliberate best-effort: log and fall back to an empty src
        print(f"Error converting {image_path}: {e}")
        return ""

# --- New Similarity Function Based on a Threshold ---
def get_similar_images_above_threshold(query_image_path, threshold):
    """Find every dataset image whose cosine similarity to the query is at or above ``threshold``.

    The query image is preprocessed with the module-level ``transform``, embedded
    with ``feature_extractor`` and compared against all rows of ``all_embeddings``.

    Args:
        query_image_path: path of the image to search with.
        threshold: minimum cosine similarity (inclusive) for a match.

    Returns:
        A list of ``(file_path, similarity_score)`` tuples in dataset order,
        with ``similarity_score`` as a Python float.
    """
    # Prepare the query as a one-image batch on the compute device
    rgb_query = Image.open(query_image_path).convert("RGB")
    query_batch = transform(rgb_query).unsqueeze(0).to(device)

    # Embed without tracking gradients; compare on the CPU
    with torch.no_grad():
        query_embedding = feature_extractor(query_batch).cpu()

    # One similarity value per stored embedding
    similarities = F.cosine_similarity(query_embedding, all_embeddings)

    # Positions of the embeddings that reach the threshold
    keep_mask = similarities >= threshold
    matching_indices = torch.nonzero(keep_mask, as_tuple=True)[0].tolist()

    return [
        (all_image_paths[idx], similarities[idx].item())
        for idx in matching_indices
    ]

# --- Settings and Global Variables ---
# This section relies on transform, device, feature_extractor, all_embeddings and
# all_image_paths having been created by the earlier cells.
threshold = 0.7  # Adjust the threshold as needed

# One entry per query image: its path, its own category and its matches grouped
# by the category of each matched image.
all_results_by_query = []

for query_img in query_image_paths:
    # Every dataset image whose similarity reaches the threshold
    similar_results = get_similar_images_above_threshold(query_img, threshold)

    # The three expected categories are always present (possibly empty);
    # any other folder name gets its own list on first use.
    grouped = {"Bottle": [], "Box": [], "Blisters": []}
    for result_path, similarity in similar_results:
        result_category = get_category_from_path(result_path)
        grouped.setdefault(result_category, []).append((result_path, similarity))

    all_results_by_query.append(
        {
            "query_image": query_img,
            "query_category": get_category_from_path(query_img),
            "grouped_results": grouped,
        }
    )

# Bucket the query entries by the category of the query image itself.
grouped_queries = {}  # key: query category, value: list of query info dicts
for query_info in all_results_by_query:
    grouped_queries.setdefault(query_info["query_category"], []).append(query_info)

# --- HTML Report Generation with Embedded Images ---
# Opening part of the report: the <head> with the inline stylesheet, the page
# heading and a banner showing the similarity threshold in use.
# This is an f-string, so the literal CSS braces are doubled ({{ and }}) and
# {threshold} is the only interpolated value.
# The document is intentionally left open here; the per-category sections and the
# closing </body></html> tags are appended to html_content further down.
html_content = f"""
<html>
<head>
    <meta charset="utf-8">
    <title>Similar Images Report</title>
    <style>
        body {{
            font-family: Arial, sans-serif;
            margin: 20px;
        }}
        .threshold-info {{
            font-size: 18px;
            font-weight: bold;
            margin-bottom: 20px;
            text-align: center;
        }}
        .category-section {{
            margin-bottom: 40px;
            border: 2px solid #aaa;
            padding: 10px;
        }}
        .category-title {{
            font-size: 20px;
            font-weight: bold;
            margin-bottom: 20px;
            text-align: center;
        }}
        .row {{
            display: flex;
            align-items: flex-start;
            margin-bottom: 30px;
            border: 1px solid #ddd;
            padding: 10px;
        }}
        .image-container {{
            margin: 5px;
            text-align: center;
        }}
        .image-container img {{
            max-width: 150px;
            height: auto;
            display: block;
            margin-bottom: 5px;
        }}
        .filename {{
            font-size: 12px;
            word-break: break-all;
        }}
        .query-label {{
            font-weight: bold;
            margin-bottom: 5px;
        }}
        .category-container {{
            margin-left: 20px;
        }}
        .category-header {{
            font-size: 14px;
            font-weight: bold;
            margin-bottom: 10px;
            text-align: center;
        }}
    </style>
</head>
<body>
    <h2>Similar Images Report</h2>
    <div class="threshold-info">Current Cosine Similarity Threshold: {threshold}</div>
"""

# Create a section for each query category.
# Fragments are collected in a list and joined once at the end; growing the
# string with += would copy it on every step, and the embedded base64 images
# make it large.
# File and folder names are passed through html.escape so that characters such
# as '&' or '<' in a name cannot break the markup.
# NOTE(review): only the Bottle, Box and Blisters groups are rendered; matches
# whose parent folder has any other name are collected earlier but not shown.
html_parts = [html_content]

for cat, queries in grouped_queries.items():
    html_parts.append('<div class="category-section">\n')
    html_parts.append(f'  <div class="category-title">{html.escape(cat)} Query Images</div>\n')

    for query_info in queries:
        query_img = query_info["query_image"]
        query_filename = get_filename_without_extension(query_img)

        # Convert query image to data URI (empty string if the file cannot be read)
        query_data_uri = image_to_data_uri(query_img)

        html_parts.append('<div class="row">\n')
        # Query image container with embedded image
        html_parts.append('  <div class="image-container">\n')
        html_parts.append(f'    <div class="query-label">Query:<br>{html.escape(query_filename)}</div>\n')
        html_parts.append(f'    <img src="{query_data_uri}" alt="Query Image">\n')
        html_parts.append('  </div>\n')

        # Display results by their own category (Bottle, Box, Blisters);
        # categories without any match are skipped entirely.
        for result_cat in ["Bottle", "Box", "Blisters"]:
            results = query_info["grouped_results"].get(result_cat, [])
            if results:
                html_parts.append('  <div class="category-container">\n')
                html_parts.append(f'    <div class="category-header">{html.escape(result_cat)} Results</div>\n')
                for result_path, similarity in results:
                    result_filename = get_filename_without_extension(result_path)
                    # Convert result image to data URI
                    result_data_uri = image_to_data_uri(result_path)
                    html_parts.append('    <div class="image-container">\n')
                    html_parts.append(f'      <div class="filename">{html.escape(result_filename)}<br>Sim: {similarity:.4f}</div>\n')
                    html_parts.append(f'      <img src="{result_data_uri}" alt="Result Image">\n')
                    html_parts.append('    </div>\n')
                html_parts.append('  </div>\n')

        html_parts.append('</div>\n')  # end of row
    html_parts.append('</div>\n')  # end of category section

# Close the document opened by the header template
html_parts.append("""
</body>
</html>
""")

html_content = "".join(html_parts)

# Save the HTML report to a file.
# The file name embeds the threshold; it is built once so that the confirmation
# message reports the file that was actually written (it previously printed a
# fixed, different name).
report_path = f"new_dataset_similar_images_report_threshold{threshold}.html"
with open(report_path, "w", encoding="utf-8") as f:
    f.write(html_content)

print(f"HTML report saved: {report_path}")

HTML report saved: similar_images_report.html
