In [1]:
import numpy as np
import matplotlib.pyplot as plt
import cv2 as cv
from patchify import patchify
import PIL
from PIL import Image
PIL.Image.MAX_IMAGE_PIXELS = 933120000
import os
import shutil
import random
%matplotlib inline
import torchvision
import torch
from torch.utils.data import Dataset,DataLoader
from torchvision import datasets, transforms
from torch import nn
from torch import optim
import torch.nn.functional as F
from torch.autograd import Variable
import torch.onnx

In [2]:
class PatchDataset(Dataset):
    def __init__(self, root, train=True, transforms=None):
        super(PatchDataset, self).__init__()
        self.root = root
        self.image_path = [os.path.join(root, x) for x in os.listdir(root)]
        random.shuffle(self.image_path)        

        if transform is not None:
            self.transform = transform

        if train:
            self.images = self.image_path[: int(.8 * len(self.image_path))]
        else:
            self.images = self.image_path[int(.8 * len(self.image_path)):]

    def __len__(self):
        return len(self.images)

    def __getitem__(self, item):
        return self.transform(self.images[item])  

In [3]:
transform = transforms.Compose([
    lambda x: Image.open(x).convert('RGB'),
    transforms.ToTensor(),
])

In [4]:
class Encoder(nn.Module):
    def __init__(self):
        super(Encoder, self).__init__()
        
        self.encoder_ = nn.Sequential(
            nn.Conv2d(3, 32, kernel_size=3, stride=1, padding=1),
            nn.BatchNorm2d(32),
            nn.LeakyReLU(.2),
            nn.MaxPool2d(kernel_size=2, stride=2, padding=0),

            nn.Conv2d(32, 64, kernel_size=3, stride=1, padding=1),
            nn.BatchNorm2d(64),
            nn.LeakyReLU(.2),
            nn.MaxPool2d(kernel_size=2, stride=2, padding=0),

            nn.Conv2d(64, 128, kernel_size=3, stride=1, padding=1),
            nn.BatchNorm2d(128),
            nn.LeakyReLU(.2),
            nn.MaxPool2d(kernel_size=2, stride=2, padding=0),
        )

    def forward(self, x):
        return self.encoder_(x)

In [5]:
class Decoder(nn.Module):
    def __init__(self):
        super(Decoder, self).__init__()
        
        self.decoder_ = nn.Sequential(
            nn.ConvTranspose2d(128, 64, kernel_size=2, stride=2, padding=0),
            nn.BatchNorm2d(64),
            nn.LeakyReLU(.2),

            nn.ConvTranspose2d(64, 32, kernel_size=2, stride=2, padding=0),
            nn.BatchNorm2d(32),
            nn.LeakyReLU(.2),

            nn.ConvTranspose2d(32, 3, kernel_size=2, stride=2, padding=0),
            nn.Sigmoid()
        )

    def forward(self, x):
        return self.decoder_(x)

In [6]:
class CAE(nn.Module):
    def __init__(self):
        super(CAE, self).__init__()
        self.encoder = Encoder()
        self.decoder = Decoder()

    def forward(self, x):
        x = self.encoder(x)
        x = self.decoder(x)
        return x

In [7]:
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model = CAE().to(device)
# Set loss function and optimizer
criterion = nn.MSELoss().to(device)
optimizer = optim.Adam(model.parameters(), lr=0.001)


train_dataset = PatchDataset("Batho_90721368(64)",train=True, transforms=transform)
test_dataset = PatchDataset("Batho_90721368(64)",train=False, transforms=transform)

train_loader = DataLoader(train_dataset, batch_size=64, shuffle=False)
test_loader = DataLoader(test_dataset, batch_size=64, shuffle=False)

# print(device)
print(model.eval())

CAE(
  (encoder): Encoder(
    (encoder_): Sequential(
      (0): Conv2d(3, 32, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
      (1): BatchNorm2d(32, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
      (2): ReLU()
      (3): MaxPool2d(kernel_size=2, stride=2, padding=0, dilation=1, ceil_mode=False)
      (4): Conv2d(32, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
      (5): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
      (6): ReLU()
      (7): MaxPool2d(kernel_size=2, stride=2, padding=0, dilation=1, ceil_mode=False)
      (8): Conv2d(64, 128, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
      (9): BatchNorm2d(128, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
      (10): ReLU()
      (11): MaxPool2d(kernel_size=2, stride=2, padding=0, dilation=1, ceil_mode=False)
    )
  )
  (decoder): Decoder(
    (decoder_): Sequential(
      (0): ConvTranspose2d(128, 64, kernel_size=(2, 2), stride=(2,

In [8]:
unloader = transforms.ToPILImage()
def tensor_to_PIL(tensor):
    image = tensor.cpu().clone()
    image = image.squeeze(0)
    image = unloader(image)
    return image

In [10]:
train_losses = []
test_losses = []
# Training loop
num_epochs = 30
for epoch in range(num_epochs):
    for data in train_loader:
        img = data
        img = img.to(device)
        optimizer.zero_grad()

        # Forward pass
        outputs = model(img)

        # Compute loss
        loss = criterion(outputs, img).to(device)

        # Backward pass
        loss.backward()
        optimizer.step()
    
    with torch.no_grad():
        for data in test_loader:
            img = data
            img = img.to(device)
            outputs = model(img)
            loss_test = criterion(outputs, img).to(device)
            
            
    print(f'Epoch [{epoch+1}/{num_epochs}], Train Loss: {loss.item():.4f}')
    print(f'Epoch [{epoch+1}/{num_epochs}], Test Loss: {loss_test.item():.4f}')
    train_losses.append(loss.item())
    test_losses.append(loss_test.item())
    # Save the trained model
    path = 'CAE_Model(64)/epoch_{i}.pth'.format(i=epoch+1)
    torch.save(model.state_dict(), path)
print("Training completed.")


Epoch [1/30], Train Loss: 0.0043
Epoch [1/30], Test Loss: 0.0023
Epoch [2/30], Train Loss: 0.0033
Epoch [2/30], Test Loss: 0.0017
Epoch [3/30], Train Loss: 0.0028
Epoch [3/30], Test Loss: 0.0014
Epoch [4/30], Train Loss: 0.0026
Epoch [4/30], Test Loss: 0.0012
Epoch [5/30], Train Loss: 0.0024
Epoch [5/30], Test Loss: 0.0011
Epoch [6/30], Train Loss: 0.0023
Epoch [6/30], Test Loss: 0.0010
Epoch [7/30], Train Loss: 0.0022
Epoch [7/30], Test Loss: 0.0010
Epoch [8/30], Train Loss: 0.0022
Epoch [8/30], Test Loss: 0.0009
Epoch [9/30], Train Loss: 0.0021
Epoch [9/30], Test Loss: 0.0009
Epoch [10/30], Train Loss: 0.0020
Epoch [10/30], Test Loss: 0.0008
Epoch [11/30], Train Loss: 0.0020
Epoch [11/30], Test Loss: 0.0008
Epoch [12/30], Train Loss: 0.0019
Epoch [12/30], Test Loss: 0.0008
Epoch [13/30], Train Loss: 0.0019
Epoch [13/30], Test Loss: 0.0008
Epoch [14/30], Train Loss: 0.0019
Epoch [14/30], Test Loss: 0.0007
Epoch [15/30], Train Loss: 0.0018
Epoch [15/30], Test Loss: 0.0007
Epoch [16/30]

In [9]:
state = torch.load("E:\PG_HP_FINAL\CAE Batho Map Reconstruction\CAE_Model(64)\epoch_30.pth")
model = CAE()
model.load_state_dict(state)
model.eval()

CAE(
  (encoder): Encoder(
    (encoder_): Sequential(
      (0): Conv2d(3, 32, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
      (1): BatchNorm2d(32, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
      (2): ReLU()
      (3): MaxPool2d(kernel_size=2, stride=2, padding=0, dilation=1, ceil_mode=False)
      (4): Conv2d(32, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
      (5): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
      (6): ReLU()
      (7): MaxPool2d(kernel_size=2, stride=2, padding=0, dilation=1, ceil_mode=False)
      (8): Conv2d(64, 128, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
      (9): BatchNorm2d(128, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
      (10): ReLU()
      (11): MaxPool2d(kernel_size=2, stride=2, padding=0, dilation=1, ceil_mode=False)
    )
  )
  (decoder): Decoder(
    (decoder_): Sequential(
      (0): ConvTranspose2d(128, 64, kernel_size=(2, 2), stride=(2,

In [10]:
with torch.no_grad():
    for idx, x in enumerate(train_loader):
        encon = model.encoder(x)
        recon = model.decoder(encon)
        
        for index,xxx in enumerate(recon):
            original_x = x[index]
            reconstr_x = xxx
            image1 = tensor_to_PIL(original_x)
            image2 = tensor_to_PIL(reconstr_x)

            path1 =  'img(ori)_64/patch_{num}_original(train).png'.format(num=idx*256+index)
            path2 =  'img(recon)_64/patch_{num}_reconstruct(train).png'.format(num=idx*256+index)
            image1.save(path1)
            image2.save(path2)  
        break