In [49]:
import torch
from PIL import Image
import os
from tqdm import tqdm
from torch.utils.data import Dataset
import torchvision.transforms as T
from tqdm import tqdm
import torch.nn as nn
import torch.optim as optim

In [51]:
class FolderDataset(Dataset):
    def __init__(self, main_dir, transform=None):
        self.main_dir = main_dir
        self.transform = transform
        self.all_imgs = os.listdir(main_dir)

    def __len__(self):
        return len(self.all_imgs)

    def __getitem__(self, idx):
        img_loc = os.path.join(self.main_dir, self.all_imgs[idx])
        image = Image.open(img_loc).convert("RGB")
        if self.transform is not None:
            tensor_image = self.transform(image)
        return tensor_image, tensor_image

In [67]:
__all__ = ["ConvEncoder", "ConvDecoder"]

import torch
import torch.nn as nn

# import config


class ConvEncoder(nn.Module):
    """
    A simple Convolutional Encoder Model
    """

    def __init__(self):
        super().__init__()
        # self.img_size = img_size
        self.conv1 = nn.Conv2d(3, 16, (3, 3), padding=(1, 1))
        self.relu1 = nn.ReLU(inplace=True)
        self.maxpool1 = nn.MaxPool2d((2, 2))

        self.conv2 = nn.Conv2d(16, 32, (3, 3), padding=(1, 1))
        self.relu2 = nn.ReLU(inplace=True)
        self.maxpool2 = nn.MaxPool2d((2, 2))

        self.conv3 = nn.Conv2d(32, 64, (3, 3), padding=(1, 1))
        self.relu3 = nn.ReLU(inplace=True)
        self.maxpool3 = nn.MaxPool2d((2, 2))

        self.conv4 = nn.Conv2d(64, 128, (3, 3), padding=(1, 1))
        self.relu4 = nn.ReLU(inplace=True)
        self.maxpool4 = nn.MaxPool2d((2, 2))

        self.conv5 = nn.Conv2d(128, 256, (3, 3), padding=(1, 1))
        self.relu5 = nn.ReLU(inplace=True)
        self.maxpool5 = nn.MaxPool2d((2, 2))

    def forward(self, x):
        x = self.conv1(x)
        x = self.relu1(x)
        x = self.maxpool1(x)
        x = self.conv2(x)
        x = self.relu2(x)
        x = self.maxpool2(x)
        x = self.conv3(x)
        x = self.relu3(x)
        x = self.maxpool3(x)
        x = self.conv4(x)
        x = self.relu4(x)
        x = self.maxpool4(x)
        x = self.conv5(x)
        x = self.relu5(x)
        x = self.maxpool5(x)
        # print(x.shape)
        return x


class ConvDecoder(nn.Module):
    """
    A simple Convolutional Decoder Model
    """

    def __init__(self):
        super().__init__()
        self.deconv1 = nn.ConvTranspose2d(256, 128, (2, 2), stride=(2, 2))
        self.relu1 = nn.ReLU(inplace=True)
        self.deconv2 = nn.ConvTranspose2d(128, 64, (2, 2), stride=(2, 2))
        self.relu2 = nn.ReLU(inplace=True)
        self.deconv3 = nn.ConvTranspose2d(64, 32, (2, 2), stride=(2, 2))
        self.relu3 = nn.ReLU(inplace=True)
        self.deconv4 = nn.ConvTranspose2d(32, 16, (2, 2), stride=(2, 2))
        self.relu4 = nn.ReLU(inplace=True)
        self.deconv5 = nn.ConvTranspose2d(16, 3, (2, 2), stride=(2, 2))
        self.relu5 = nn.ReLU(inplace=True)

    def forward(self, x):
        x = self.deconv1(x)
        x = self.relu1(x)
        x = self.deconv2(x)
        x = self.relu2(x)
        x = self.deconv3(x)
        x = self.relu3(x)
        x = self.deconv4(x)
        x = self.relu4(x)
        x = self.deconv5(x)
        x = self.relu5(x)
        return x

In [54]:
import os
import shutil

def rename_and_copy_images(source_dir, dest_dir):
    if not os.path.exists(dest_dir):
        os.makedirs(dest_dir)
    for label in os.listdir(source_dir):
        label_dir = os.path.join(source_dir, label)
        if os.path.isdir(label_dir):
            for file in os.listdir(label_dir):
                if any(file.lower().endswith(ext) for ext in ['.jpg', '.jpeg', '.png']):
                    new_name = f"{label}_{file}"
                    source_file = os.path.join(label_dir, file)
                    dest_file = os.path.join(dest_dir, new_name)
                    shutil.copy(source_file, dest_file)

if __name__ == "__main__":
    source_directory = "ImageData/MonumentsData/train"
    destination_directory = "ImageData/CleanMonumentsData/train"
    if not os.path.exists(destination_directory):
        os.makedirs(destination_directory)
    rename_and_copy_images(source_directory, destination_directory)


In [75]:
IMG_PATH = "ImageData/CleanMonumentsData/train/"
IMG_HEIGHT = 512  
IMG_WIDTH = 512  

SEED = 42
TRAIN_RATIO = 0.75
VAL_RATIO = 1 - TRAIN_RATIO
SHUFFLE_BUFFER_SIZE = 100

LEARNING_RATE = 1e-3
EPOCHS = 3
TRAIN_BATCH_SIZE = 32  
TEST_BATCH_SIZE = 32  
FULL_BATCH_SIZE = 32

ENCODER_MODEL_PATH = "baseline_encoder.pt"
DECODER_MODEL_PATH = "baseline_decoder.pt"
EMBEDDING_SHAPE = (1, 256, 16, 16)
# TEST_RATIO = 0.2

In [68]:
import torch
import torch.nn as nn

def train_step(encoder, decoder, train_loader, loss_fn, optimizer, device):
    encoder.train()
    decoder.train()
    for batch_idx, (train_img, target_img) in enumerate(train_loader):
        train_img = train_img.to(device)
        target_img = target_img.to(device)
        optimizer.zero_grad()
        enc_output = encoder(train_img)
        dec_output = decoder(enc_output)
        loss = loss_fn(dec_output, target_img)
        loss.backward()
        optimizer.step()

    return loss.item()


def val_step(encoder, decoder, val_loader, loss_fn, device):
    encoder.eval()
    decoder.eval()

    with torch.no_grad():
        for batch_idx, (train_img, target_img) in enumerate(val_loader):
            train_img = train_img.to(device)
            target_img = target_img.to(device)
            enc_output = encoder(train_img)
            dec_output = decoder(enc_output)
            loss = loss_fn(dec_output, target_img)

    return loss.item()

In [69]:
if torch.cuda.is_available():
    device = "cuda"
else:
    device = "cpu"

transforms = T.Compose([T.Resize((IMG_HEIGHT, IMG_WIDTH)), T.ToTensor()])

print("------------ Creating Dataset ------------")
full_dataset = FolderDataset(IMG_PATH, transforms)

train_size = int(TRAIN_RATIO * len(full_dataset))
val_size = len(full_dataset) - train_size

train_dataset, val_dataset = torch.utils.data.random_split(
    full_dataset, [train_size, val_size]
)

print("------------ Dataset Created ------------")
print("------------ Creating DataLoader ------------")
train_loader = torch.utils.data.DataLoader(
    train_dataset, batch_size=TRAIN_BATCH_SIZE, shuffle=True, drop_last=True
)
val_loader = torch.utils.data.DataLoader(
    val_dataset, batch_size=TEST_BATCH_SIZE
)

full_loader = torch.utils.data.DataLoader(
    full_dataset, batch_size=FULL_BATCH_SIZE
)

------------ Creating Dataset ------------
------------ Dataset Created ------------
------------ Creating DataLoader ------------


In [70]:
print("------------ Dataloader Created ------------")

loss_fn = nn.MSELoss()
encoder = ConvEncoder()
decoder = ConvDecoder()

if torch.cuda.is_available():
    print("GPU Availaible moving models to GPU")
else:
    print("Moving models to CPU")

encoder.to(device)
decoder.to(device)

autoencoder_params = list(encoder.parameters()) + list(decoder.parameters())
optimizer = optim.AdamW(autoencoder_params, lr=LEARNING_RATE)
max_loss = 9999

print("------------ Training started ------------")

for epoch in tqdm(range(EPOCHS)):
    train_loss = train_step(
        encoder, decoder, train_loader, loss_fn, optimizer, device=device
    )
    print(f"Epochs = {epoch}, Training Loss : {train_loss}")
    val_loss = val_step(
        encoder, decoder, val_loader, loss_fn, device=device
    )

    if val_loss < max_loss:
        print("Validation Loss decreased, saving new best model")
        torch.save(encoder.state_dict(), ENCODER_MODEL_PATH)
        torch.save(decoder.state_dict(), DECODER_MODEL_PATH)

    print(f"Epochs = {epoch}, Validation Loss : {val_loss}")

print("Training Done")

------------ Dataloader Created ------------
Moving models to CPU
------------ Training started ------------


  0%|          | 0/3 [00:00<?, ?it/s]

Epochs = 0, Training Loss : 0.031207827851176262


 33%|███▎      | 1/3 [06:25<12:51, 385.91s/it]

Validation Loss decreased, saving new best model
Epochs = 0, Validation Loss : 0.03336744010448456
Epochs = 1, Training Loss : 0.0245977733284235


 67%|██████▋   | 2/3 [12:40<06:19, 379.51s/it]

Validation Loss decreased, saving new best model
Epochs = 1, Validation Loss : 0.02556123584508896
Epochs = 2, Training Loss : 0.022636428475379944


100%|██████████| 3/3 [18:43<00:00, 374.55s/it]

Validation Loss decreased, saving new best model
Epochs = 2, Validation Loss : 0.019655724987387657
Training Done





In [71]:
def create_embedding(encoder, full_loader, embedding_dim, device):
    encoder.eval()
    embedding = torch.randn(embedding_dim)
    with torch.no_grad():
        for batch_idx, (train_img, target_img) in enumerate(full_loader):
            train_img = train_img.to(device)
            enc_output = encoder(train_img).cpu()
            embedding = torch.cat((embedding, enc_output), 0)

    return embedding

In [76]:
embedding = create_embedding(encoder, full_loader, EMBEDDING_SHAPE, device)

In [77]:
numpy_embedding = embedding.cpu().detach().numpy()
print(numpy_embedding.shape)
num_images = numpy_embedding.shape[0]
flattened_embedding = numpy_embedding.reshape((num_images, -1))
print(flattened_embedding.shape)
import numpy as np
np.save("data_embedding_f.npy", flattened_embedding)

(3667, 256, 16, 16)
(3667, 65536)
