<a href="https://colab.research.google.com/github/Hamouda-Yasmine/PFE-X-ray_image_search_using_CBIR_deepL/blob/main/AutoencodeurCBIR.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
!nvidia-smi

In [None]:
import glob
from itertools import chain
import os
import random
import zipfile
from tqdm.notebook import tqdm

import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
from PIL import Image
from sklearn.model_selection import train_test_split
import cv2

import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torch.optim.lr_scheduler import StepLR, CosineAnnealingLR, ReduceLROnPlateau
from torch.utils.data import DataLoader, Dataset
from torchvision import datasets, transforms, models
import torch

import torchvision.transforms as T

In [None]:
class FolderDataset(Dataset):
    """
    Creates a PyTorch dataset from a folder, returning every image twice
    (as input and as reconstruction target).
    Args:
    main_dir : directory where images are stored.
    transform (optional) : torchvision transforms to be applied to each image.
        When omitted, the RGB PIL image itself is returned.
    """

    def __init__(self, main_dir, transform=None):
        self.main_dir = main_dir
        self.transform = transform
        # os.listdir gives entries in arbitrary order; sorting makes the
        # index -> file mapping reproducible across runs and machines.
        self.all_imgs = sorted(os.listdir(main_dir))

    def __len__(self):
        return len(self.all_imgs)

    def __getitem__(self, idx):
        img_loc = os.path.join(self.main_dir, self.all_imgs[idx])
        # convert() produces a fully loaded copy, so the file handle can be
        # released as soon as the with-block ends.
        with Image.open(img_loc) as raw_image:
            image = raw_image.convert("RGB")

        # Previously an absent transform left the result unbound and raised
        # UnboundLocalError; fall back to the untransformed image instead.
        sample = image if self.transform is None else self.transform(image)

        return sample, sample

In [None]:
class ConvEncoder(nn.Module):
    """
    Convolutional encoder made of five identical stages.

    Every stage is a 3x3 convolution (padding 1), an in-place ReLU and a 2x2
    max-pool, so each stage halves height and width. Channel widths go
    3 -> 16 -> 32 -> 64 -> 128 -> 256.
    """

    def __init__(self):
        super().__init__()
        widths = (3, 16, 32, 64, 128, 256)
        # Register conv<N>, relu<N>, maxpool<N> for N = 1..5, in that order.
        for stage, (c_in, c_out) in enumerate(zip(widths, widths[1:]), start=1):
            setattr(self, f"conv{stage}", nn.Conv2d(c_in, c_out, (3, 3), padding=(1, 1)))
            setattr(self, f"relu{stage}", nn.ReLU(inplace=True))
            setattr(self, f"maxpool{stage}", nn.MaxPool2d((2, 2)))
        #self.dropout=nn.Dropout(0.5)

    def forward(self, x):
        # Each line is one downscaling stage: conv -> ReLU -> max-pool.
        x = self.maxpool1(self.relu1(self.conv1(x)))
        x = self.maxpool2(self.relu2(self.conv2(x)))
        x = self.maxpool3(self.relu3(self.conv3(x)))
        x = self.maxpool4(self.relu4(self.conv4(x)))
        x = self.maxpool5(self.relu5(self.conv5(x)))
        #x =self.dropout(x)
        return x

In [None]:
class ConvDecoder(nn.Module):
    """
    Convolutional decoder made of five upscaling stages.

    Every stage is a 2x2 transposed convolution with stride 2 followed by an
    in-place ReLU, so each stage doubles height and width. Channel widths go
    256 -> 128 -> 64 -> 32 -> 16 -> 3.
    """

    def __init__(self):
        super().__init__()
        widths = (256, 128, 64, 32, 16, 3)
        # Register deconv<N>, relu<N> for N = 1..5, in that order.
        for stage, (c_in, c_out) in enumerate(zip(widths, widths[1:]), start=1):
            setattr(self, f"deconv{stage}", nn.ConvTranspose2d(c_in, c_out, (2, 2), stride=(2, 2)))
            setattr(self, f"relu{stage}", nn.ReLU(inplace=True))

    def forward(self, x):
        # Each line is one upscaling stage: transposed conv -> ReLU.
        x = self.relu1(self.deconv1(x))
        x = self.relu2(self.deconv2(x))
        x = self.relu3(self.deconv3(x))
        x = self.relu4(self.deconv4(x))
        return self.relu5(self.deconv5(x))

In [None]:

def train_step(encoder, decoder, train_loader, loss_fn, optimizer, device):
    """
    Trains the autoencoder for one full pass over train_loader.
    Args:
    encoder: A convolutional Encoder. E.g. torch_model ConvEncoder
    decoder: A convolutional Decoder. E.g. torch_model ConvDecoder
    train_loader: PyTorch dataloader, containing (images, images).
    loss_fn: PyTorch loss_fn, computes loss between 2 images.
    optimizer: PyTorch optimizer.
    device: "cuda" or "cpu"
    Returns: Train Loss, averaged over all batches of the pass (float).
    Raises: ValueError if train_loader yields no batches.
    """
    #  Set networks to train mode.
    encoder.train()
    decoder.train()

    # Accumulate every batch loss: reporting only the final batch gives a
    # noisy figure that says little about the epoch as a whole.
    running_loss = 0.0
    num_batches = 0

    for train_img, target_img in train_loader:
        # Move images to device
        train_img = train_img.to(device)
        target_img = target_img.to(device)

        # Zero grad the optimizer
        optimizer.zero_grad()
        # Encoder output feeds the decoder, which reconstructs the image.
        dec_output = decoder(encoder(train_img))

        # Compare the reconstruction with the original (target) image.
        loss = loss_fn(dec_output, target_img)
        # Backpropagate
        loss.backward()
        # Apply the optimizer to network by calling step.
        optimizer.step()

        running_loss += loss.item()
        num_batches += 1

    if num_batches == 0:
        raise ValueError("train_loader produced no batches")
    # Return the mean loss
    return running_loss / num_batches

def val_step(encoder, decoder, val_loader, loss_fn, device):
    """
    Evaluates the autoencoder on one full pass over val_loader.
    Args:
    encoder: A convolutional Encoder. E.g. torch_model ConvEncoder
    decoder: A convolutional Decoder. E.g. torch_model ConvDecoder
    val_loader: PyTorch dataloader, containing (images, images).
    loss_fn: PyTorch loss_fn, computes loss between 2 images.
    device: "cuda" or "cpu"
    Returns: Validation Loss, averaged over all batches of the pass (float).
    Raises: ValueError if val_loader yields no batches.
    """

    # Set to eval mode.
    encoder.eval()
    decoder.eval()

    # Average over every batch: the caller picks checkpoints from this value,
    # so it must reflect the whole validation set, not just the last batch.
    running_loss = 0.0
    num_batches = 0

    # We don't need to compute gradients while validating.
    with torch.no_grad():
        for val_img, target_img in val_loader:
            # Move to device
            val_img = val_img.to(device)
            target_img = target_img.to(device)

            # Encode, then let the decoder reconstruct the image.
            dec_output = decoder(encoder(val_img))

            # Validation loss for encoder and decoder.
            running_loss += loss_fn(dec_output, target_img).item()
            num_batches += 1

    if num_batches == 0:
        raise ValueError("val_loader produced no batches")
    # Return the mean loss
    return running_loss / num_batches

In [None]:
# ToTensor turns each PIL image into a float tensor with values scaled to [0, 1].
# NOTE(review): this name shadows the `transforms` module imported from torchvision.
transforms = T.Compose([T.ToTensor()])
"""transforms = T.Compose([
    T.Resize((224,224)),
    T.RandomHorizontalFlip(),
    T.ToTensor(),
    T.Normalize((0.4914, 0.4822, 0.4465), (0.2023, 0.1994, 0.2010)),
])
transforms = T.Compose([
    T.Resize(256),
    T.CenterCrop(224),
    T.ToTensor(),
    T.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
])"""

# Dataset over the whole IRMA training folder.
full_dataset = FolderDataset("/content/drive/MyDrive/IRMA/train/", transforms)
print(len(full_dataset))

# 80 % of the images are used for training, the remainder for validation.
train_size = int(0.80 * len(full_dataset))
val_size = len(full_dataset) - train_size

# Random, non-overlapping train/validation split.
train_dataset, val_dataset = torch.utils.data.random_split(
    full_dataset, [train_size, val_size]
)
print(len(train_dataset) + len(val_dataset))

# Training batches are reshuffled on every pass.
train_loader = DataLoader(train_dataset, batch_size=22, shuffle=True)
print(len(train_dataset))

# Validation batches keep a fixed order.
val_loader = DataLoader(val_dataset, batch_size=22)
print(len(val_dataset))

# Loader over every image, used later to compute embeddings for the full set.
full_loader = DataLoader(full_dataset, batch_size=20)



12941
12941
10352
2589


In [None]:
# Reconstruction objective: mean squared error between decoder output and input image.
loss_fn = nn.MSELoss()
encoder = ConvEncoder()  # Our encoder model
decoder = ConvDecoder()  # Our decoder model
# Use the GPU when present, otherwise fall back to the CPU instead of
# failing inside .to("cuda").
device = "cuda" if torch.cuda.is_available() else "cpu"
encoder.to(device)
decoder.to(device)
# One optimizer updates the parameters of both halves of the autoencoder.
autoencoder_params = list(encoder.parameters()) + list(decoder.parameters())
optimizer = torch.optim.SGD(autoencoder_params, lr=0.1, momentum=0.9)  # SGD Optimizer

In [None]:
# Print a per-layer table (output shape and parameter count) of the encoder
# for an input of 3 channels and spatial size 512 x 256.
from torchsummary import summary
summary(encoder,(3, 512, 256))

----------------------------------------------------------------
        Layer (type)               Output Shape         Param #
            Conv2d-1         [-1, 16, 512, 256]             448
              ReLU-2         [-1, 16, 512, 256]               0
         MaxPool2d-3         [-1, 16, 256, 128]               0
            Conv2d-4         [-1, 32, 256, 128]           4,640
              ReLU-5         [-1, 32, 256, 128]               0
         MaxPool2d-6          [-1, 32, 128, 64]               0
            Conv2d-7          [-1, 64, 128, 64]          18,496
              ReLU-8          [-1, 64, 128, 64]               0
         MaxPool2d-9           [-1, 64, 64, 32]               0
           Conv2d-10          [-1, 128, 64, 32]          73,856
             ReLU-11          [-1, 128, 64, 32]               0
        MaxPool2d-12          [-1, 128, 32, 16]               0
           Conv2d-13          [-1, 256, 32, 16]         295,168
             ReLU-14          [-1, 256,

In [None]:
EPOCHS = 6
# Lowest validation loss seen so far. Starting from +inf guarantees the first
# epoch is always checkpointed, whatever the scale of the loss.
max_loss = float("inf")
for epoch in tqdm(range(EPOCHS)):
    train_loss = train_step(encoder, decoder, train_loader, loss_fn, optimizer, device=device)
    print(f"Epochs = {epoch}, Training Loss : {train_loss}")
    val_loss = val_step(encoder, decoder, val_loader, loss_fn, device=device)
    print(f"Epochs = {epoch}, Validation Loss : {val_loss}")
    # Keep only the weights of the best epoch on the validation set.
    if val_loss < max_loss:
        print("Validation Loss decreased, saving new best model")
        torch.save(encoder.state_dict(), "encoder_model_irmaASGD1.pt")
        torch.save(decoder.state_dict(), "decoder_model_irmaASGD1.pt")
        max_loss = val_loss

  0%|          | 0/6 [00:00<?, ?it/s]

Epochs = 0, Training Loss : 0.17872221767902374
Epochs = 0, Validation Loss : 0.16518794000148773
Validation Loss decreased, saving new best model
dkhaal
Epochs = 1, Training Loss : 0.18129834532737732
Epochs = 1, Validation Loss : 0.16518782079219818
Validation Loss decreased, saving new best model
dkhaal
Epochs = 2, Training Loss : 0.185751810669899
Epochs = 2, Validation Loss : 0.16508479416370392
Validation Loss decreased, saving new best model
dkhaal
Epochs = 3, Training Loss : 0.20731687545776367
Epochs = 3, Validation Loss : 0.1647532880306244
Validation Loss decreased, saving new best model
dkhaal
Epochs = 4, Training Loss : 0.1762389987707138
Epochs = 4, Validation Loss : 0.16652119159698486
Epochs = 5, Training Loss : 0.19068633019924164
Epochs = 5, Validation Loss : 0.1650882512331009


In [None]:
#full_dataset=FolderDataset("/content/drive/MyDrive/PFE/alldata/",transforms)
# Restore a previously saved DataLoader object from Drive, replacing the
# full_loader built earlier in the notebook.
# NOTE(review): torch.load unpickles arbitrary Python objects — only load
# files you trust. Recent PyTorch releases default to weights_only=True and
# may refuse a pickled DataLoader; verify against the installed version.
full_loader=torch.load('/content/drive/MyDrive/3e-4 sgd/fullloaderirma3E-4.pth')
#torch.save(full_loader, 'fulldataloader.pth')
# Number of image file names held by the loader's dataset.
print(len(full_loader.dataset.all_imgs))

12941


In [None]:
# Pickle the whole DataLoader object (dataset and file list included) into the
# current working directory.
torch.save(full_loader, 'fullloaderirma1.pth')

In [None]:
def create_embedding(encoder, full_loader, embedding_dim, device):
    """
    Creates embedding using encoder from dataloader.
    encoder: A convolutional Encoder. E.g. torch_model ConvEncoder
    full_loader: PyTorch dataloader, containing (images, images) over entire dataset.
    embedding_dim: Tuple (1, c, h, w) shape of the placeholder row; c, h, w must
        match the encoder output so the rows can be concatenated.
    device: "cuda" or "cpu"
    Returns: Embedding of size (num_images_in_loader + 1, c, h, w). Row 0 is a
        random placeholder, so image i of the loader is stored at row i + 1.
    """
    # Set encoder to eval mode.
    encoder.eval()
    # Placeholder kept at row 0; consumers offset their indices by one.
    chunks = [torch.randn(embedding_dim)]

    # Again we do not compute loss here so. No gradients.
    with torch.no_grad():
        for batch_idx, (train_img, _) in enumerate(full_loader):
            # We can compute this on GPU. be faster
            train_img = train_img.to(device)
            print(batch_idx)

            # Get encoder outputs and move them to cpu before storing them.
            chunks.append(encoder(train_img).cpu())

    # Concatenate once at the end: growing the tensor with torch.cat on every
    # batch re-copies all earlier rows each time (quadratic in dataset size).
    return torch.cat(chunks, 0)

In [None]:
# Save the feature representations.
EMBEDDING_SHAPE = (1, 256, 8, 16) # This we know from our encoder

# We need feature representations for complete dataset not just train and validation.
# Hence we use full loader here.
ENCODER_MODEL_PATH = "/content/encoder_model_irmaASGD6.pt"
device = torch.device("cuda")
encoder =ConvEncoder()
print (device)
# Load the state dict of encoder
encoder.load_state_dict(torch.load(ENCODER_MODEL_PATH, map_location=device))
encoder.eval()
encoder.to(device)
embedding = create_embedding(encoder, full_loader, EMBEDDING_SHAPE, device)

# Convert embedding to numpy and save them
numpy_embedding = embedding.cpu().detach().numpy()
num_images = numpy_embedding.shape[0]

# Save the embeddings for complete dataset, not just train
flattened_embedding = numpy_embedding.reshape((num_images, -1))
np.save("data_embedding_SGD-IRMA-6.npy", flattened_embedding)


In [None]:
# Re-run the embedding pass over the full loader and overwrite the saved
# feature file with one flattened row per embedding.
embedding = create_embedding(encoder, full_loader, EMBEDDING_SHAPE, device)
numpy_embedding = embedding.detach().cpu().numpy()
num_images = numpy_embedding.shape[0]
flattened_embedding = numpy_embedding.reshape(num_images, -1)
np.save("data_embedding_SGD-IRMA-6.npy", flattened_embedding)

In [None]:
import torch
import numpy as np

from sklearn.neighbors import NearestNeighbors
import torchvision.transforms as T
import os
from PIL import Image
import matplotlib.pyplot as plt
%matplotlib inline
def compute_similar_images(image_path, num_images, embedding, device):
    """
    Finds the stored embeddings closest to a query image.
    Args:
    image_path: path of the query image.
    num_images: number of nearest neighbours to return.
    embedding: 2-D array of flattened embeddings, one row per stored entry.
    device: device the encoder lives on; the query tensor is moved there.
    Returns: nested list of row indices into embedding, one inner list per query.

    Uses the module-level `encoder` and `load_image_tensor`.
    """
    # The query must sit on the same device as the encoder weights, otherwise
    # the forward pass fails with a device-mismatch error.
    image_tensor = load_image_tensor(image_path, device).to(device)

    with torch.no_grad():
        image_embedding = encoder(image_tensor).cpu().detach().numpy()

    #print(image_embedding.shape)

    # Flatten (1, c, h, w) to (1, c*h*w) to match the rows of `embedding`.
    flattened_embedding = image_embedding.reshape((image_embedding.shape[0], -1))
    print(flattened_embedding.shape)

    # Nearest neighbours under the correlation distance.
    knn = NearestNeighbors(n_neighbors=num_images, metric="correlation")
    knn.fit(embedding)

    # Distances are not needed, only the neighbour indices.
    _, indices = knn.kneighbors(flattened_embedding)
    indices_list = indices.tolist()
    print(indices_list)
    return indices_list

In [None]:
# Turn the embedding tensor into a NumPy array and flatten every entry into a
# single feature row, printing the shapes along the way.
numpy_embedding = embedding.detach().cpu().numpy()
print(numpy_embedding.shape)
num_images = numpy_embedding.shape[0]
print(num_images)
flattened_embedding = numpy_embedding.reshape(num_images, -1)
print(flattened_embedding.shape)

(12942, 256, 8, 16)
12942
(12942, 32768)


In [None]:
import numpy as np
# NOTE: np.save appends ".npy" when the file name does not already end with
# it, so this writes "data_embedding_f.npy_sgd_irma.npy".
np.save("data_embedding_f.npy_sgd_irma", flattened_embedding)

In [None]:
# Use the GPU when one is available, otherwise run on the CPU.
device = torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu")
print(device)
def load_image_tensor(image_path, device):
    """
    Loads an image file as a single-image batch for the encoder.
    Args:
    image_path: path of the image to read.
    device: device the returned tensor is placed on.
    Returns: tensor of shape (1, 3, H, W) located on `device`.
    """
    # Release the file handle as soon as the pixels have been converted.
    with Image.open(image_path) as raw_image:
        image_tensor = T.ToTensor()(raw_image.convert('RGB'))
    # Add the leading batch dimension expected by the network.
    image_tensor = image_tensor.unsqueeze(0)
    print(image_tensor.shape)
    # Honour the `device` argument, which was previously ignored.
    return image_tensor.to(device)

cuda


In [None]:

def plot_similar_images(indices_list, image_dir="/content/dataset/"):
    """
    Displays the images referenced by a nearest-neighbour result.
    Args:
    indices_list: nested list of embedding row indices (as returned by
        compute_similar_images); only the first inner list is used.
    image_dir: folder holding the images; files are looked up as "<n>.png".
    Embedding row i is shown as file "<i - 1>.png", because row 0 of the
    embedding built by create_embedding is a random placeholder, not an image.
    """
    indices = indices_list[0]
    for index in indices:
        if index == 0:
            # The placeholder row has no image behind it ("-1.png" does not
            # exist), so skip it instead of failing on a missing file.
            continue
        img_name = str(index - 1) + ".png"
        img_path = os.path.join(image_dir, img_name)
        print(img_path)
        # Release the file handle as soon as the pixels have been converted.
        with Image.open(img_path) as raw_image:
            img = raw_image.convert("RGB")
        plt.imshow(img)
        plt.show()

In [None]:
# End-to-end retrieval: rebuild the encoder, load the saved embeddings, then
# look up and display the nearest neighbours of one test image.
# NOTE(review): EMBEDDING_PATH, TEST_IMAGE_PATH and NUM_IMAGES are not defined
# in the visible part of the notebook — they must be set before running this cell.
device = torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu")
encoder =ConvEncoder()

# Load the state dict of encoder
encoder.load_state_dict(torch.load(ENCODER_MODEL_PATH, map_location=device))
encoder.eval()
encoder.to(device)

# Loads the embedding
embedding = np.load(EMBEDDING_PATH)

# Find the nearest stored embeddings of the test image and display them.
indices_list = compute_similar_images(TEST_IMAGE_PATH, NUM_IMAGES, embedding, device)
plot_similar_images(indices_list)