In [97]:
import numpy as np
from sklearn.cluster import KMeans
from sklearn.decomposition import PCA
from sklearn.metrics import jaccard_score
from PIL import Image, ImageDraw, ImageFont
import os
import cv2
from collections import Counter
import matplotlib.pyplot as plt
import torch
from torchsummary import summary
import copy

In [85]:
# reading the dataset
dir_names = ['./dataset/A/', './dataset/B/', './dataset/label/']

A_images = []
B_images = []
true_delta = []

# reading A images
for fileName in os.listdir(dir_names[0]):
    # read the image, convert it into grey scale and then store it into a numpy array
    image =  np.array(Image.open(os.path.join(dir_names[0], fileName)))    
    # add the image of the list
    A_images.append(image)

A_images = np.array(A_images)

# reading B images
for fileName in os.listdir(dir_names[1]):
    # read the image, convert it into grey scale and then store it into a numpy array
    image =  np.array(Image.open(os.path.join(dir_names[1], fileName)))
    # add the image of the list
    B_images.append(image)

B_images = np.array(B_images)

# reading truth images
for fileName in os.listdir(dir_names[2]):
    # read the image, convert it into grey scale and then store it into a numpy array
    image =  np.array(Image.open(os.path.join(dir_names[2], fileName)))
    # add the image of the list
    true_delta.append(image)

true_delta = np.array(true_delta)

In [86]:
# this function calculates jaccard score using every predicted image and its corresponding ground truth then average all the scores
def score(truth_ground, predicted, start_idx, end_idx):

    # convert images into binary
    truth_ground[truth_ground != 0] = 1
    predicted[predicted != 0] = 1
    scores = 0
    
    for i in range(start_idx, end_idx):
        scores += jaccard_score(truth_ground[i], predicted[i-start_idx], average='micro', zero_division=1.0)

    return scores / len(predicted)

In [89]:
# a class that will always predict black images to show how bad is the dataset
class ZeroPrediction:

    def __init__(self):
        pass

    def predict(self, A_test, B_test, start_idx, end_idx):

        # the array in which we will store the result difference images
        self.result_images = []

        # loop over all the images
        for i in range(start_idx, end_idx):
            self.result_images.append(np.zeros((A_test[i].shape[0], A_test[i].shape[1])))


        self.result_images = np.array(self.result_images)
        return self.result_images

zero_predictions = ZeroPrediction()
zero_predictions.predict(A_images, B_images, 0, len(A_images))
print(f"Jaccard index on all the dataset = {score(true_delta, zero_predictions.result_images, 0, len(A_images))}")

Jaccard index on all the dataset = 0.6639276910435498


In [88]:
# classical method: PCA with K-means (unsupervised method that has no training)
class PCA_Kmean:

    # initialization function, h is a parameter that will determine the size of blocks of image
    def __init__(self, h=7):
        self.h = h


    # the training function has no meaning as this method is unsupervised method so there is no point of having true labels and train the model using these labels

    # this function takes both A and B images and apply the algorithm on both of them and output the predicted ground truth
    def predict(self, A_test, B_test, start_idx, end_idx):

        # the array in which we will store the result difference images
        self.result_images = []

        # loop over all the images
        for i in range(start_idx, end_idx):

            print(f"processing image number {i}", end="\r")

            # apply gaussian blur to the image
            blk = int(self.h // 2 + 1)
            a_test_blur = cv2.GaussianBlur(A_test[i], (blk, blk), 0)
            b_test_blur = cv2.GaussianBlur(B_test[i], (blk, blk), 0)

            # first calculate the absolute difference between A and B images
            difference_image = np.abs(a_test_blur - b_test_blur)

            # resize the image so that it can be divided into integer number of 'self.h' non-overlapping blocks
            resized_difference = cv2.resize(difference_image, dsize=(
                np.array((A_test[i].shape[0], A_test[i].shape[1])) // self.h) * self.h)

            # loop over the channels of the image
            images = []
            for channel in range(0, 3):

                # find the vector set, we divide the difference image into non-overlapping blocks and then each block is flattened to represent a vector then all of the vectors are subtracted from their mean vector
                vectors = []
                for j in range(0, resized_difference.shape[0] // self.h):
                    for k in range(0, resized_difference.shape[1] // self.h):
                        vectors.append(
                            resized_difference[j:j+self.h, k:k+self.h, channel].ravel())
                vectors = np.array(vectors)
                mean_vector = np.mean(vectors, axis=0)
                delta_vectors = vectors - mean_vector

                # apply PCA on the delta vectors to get the PCA components
                pca = PCA(random_state=42)
                pca.fit(delta_vectors)

                # project each point into the new vector space where each point will be represented by using an overlapping blocks around each point then flatten them
                point_vector = []
                pad = self.h // 2
                for j in range(pad, resized_difference.shape[0] - pad):
                    for k in range(pad, resized_difference.shape[1] - pad):
                        point_vector.append(
                            resized_difference[j-pad:j+pad+1, k-pad:k+pad+1, channel].ravel())
                point_vector = np.array(point_vector) - mean_vector
                projected_points = np.dot(point_vector, pca.components_)

                # apply Kmeans clustering on the projected points where the cluster that has low number of points and average high means is considered the cluster that changed values
                kmeans = KMeans(n_clusters=2, random_state=42)
                kmeans.fit(projected_points)
                predicted_labels = kmeans.predict(
                    projected_points).astype(np.uint8)
                least_common_idx = Counter(
                    predicted_labels).most_common()[-1][0]
                predicted_labels_map = np.reshape(predicted_labels, newshape=(
                    np.array(A_test[i].shape) // self.h) * self.h - 2 * pad)
                predicted_labels_map = cv2.resize(predicted_labels_map, dsize=(
                    np.array((A_test[i].shape[0], A_test[i].shape[1]))))

                predicted_labels_map[predicted_labels_map ==
                                     least_common_idx] = 255
                predicted_labels_map[predicted_labels_map != 255] = 0

                # perform erosion and  to clean the image
                kernel = np.ones((self.h, self.h), np.uint8)
                opened_image = cv2.morphologyEx(
                    predicted_labels_map, cv2.MORPH_OPEN, kernel, iterations=1)
                closed_image = cv2.morphologyEx(
                    opened_image, cv2.MORPH_CLOSE, kernel, iterations=1)

                images.append(closed_image)

            # break

            # apply intersection between the 3 bands to get the pixels that has changed all of its R, G, B values
            intersection = cv2.bitwise_and(
                cv2.bitwise_and(images[0], images[1]), images[2])

            # plt.imshow(intersection, cmap='grey', vmin=0, vmax=255)
            # plt.show()
            # plt.imshow(images[0], cmap='grey', vmin=0, vmax=255)
            # plt.show()
            # plt.imshow(images[1], cmap='grey', vmin=0, vmax=255)
            # plt.show()
            # plt.imshow(images[2], cmap='grey', vmin=0, vmax=255)
            # plt.show()

            # find the contours in the intersection image
            contours, hierarchy = cv2.findContours(
                intersection, cv2.RETR_LIST, cv2.CHAIN_APPROX_SIMPLE)

            # loop over all the contours
            rectangles = []
            for cnt in contours:

                # approximate every contour to be a polygon
                poly = cv2.approxPolyDP(
                    cnt, 0.1*cv2.arcLength(cnt, True), True)
            
                # find the minimum area of bounding rotated rectangle
                rotated_rect = cv2.minAreaRect(poly)
                area = rotated_rect[1][0] * rotated_rect[1][1]

                # check if the minimum area of rotated rectangle is nearly equal to the area of that polygon
                if (area != 0) and abs((cv2.contourArea(poly) / area) - 1) < 1:
                    rectangles.append(poly)

            # draw the contours that looks like a rectangle
            contour_img = np.zeros(intersection.shape)
            for rect in rectangles:
                cv2.drawContours(
                    contour_img, [rect], -1, 255, thickness=cv2.FILLED)

            # plt.imshow(contour_img, cmap='gray', vmin=0, vmax=255)
            # plt.show()

            # print(np.where(contour_img == 255))
            self.result_images.append(contour_img)
            # return np.array(self.result_images)

        return np.array(self.result_images)


pca_kmeans = PCA_Kmean(h=5)
predicted_difference = pca_kmeans.predict(A_images, B_images, 0, len(A_images))

print()
print(score(true_delta, predicted_difference, 0, len(A_images)))


processing image number 4867
0.40968759404252886


In [57]:
# # Set random seed for reproducibility
# np.random.seed(27)  

# # select test indices where the test data size will be 20% of the whole data
# test_indices = np.random.choice(np.arange(len(true_delta)), size=int(0.2*(len(true_delta))), replace=False)

# # split data into training and testing data
# true_delta_test = true_delta[test_indices]
# A_images_test = A_images[test_indices]
# B_images_test = B_images[test_indices]

# true_delta_train = np.delete(true_delta, test_indices, 0)
# A_images_train = np.delete(A_images, test_indices, 0)
# B_images_train = np.delete(B_images, test_indices, 0)


In [None]:
# some comments:

# torch.nn.sequential() -> https://pytorch.org/docs/stable/generated/torch.nn.Sequential.html 
# -------------------
# A sequential container.
# Modules will be added to it in the order they are passed in the constructor. Alternatively, an OrderedDict of modules can be passed in. The forward() method of Sequential accepts any input and forwards it to the first module it contains. It then “chains” outputs to inputs sequentially for each subsequent module, finally returning the output of the last module.


# torch.nn.DistributedDataParallel() -> https://pytorch.org/docs/stable/generated/torch.nn.parallel.DistributedDataParallel.html#torch.nn.parallel.DistributedDataParallel 
# ----------------------------------
# Implement distributed data parallelism based on torch.distributed at module level.
# This container provides data parallelism by synchronizing gradients across each model replica. The devices to synchronize across are specified by the input process_group, which is the entire world by default. Note that DistributedDataParallel does not chunk or otherwise shard the input across participating GPUs; the user is responsible for defining how to do so, for example through the use of a DistributedSampler.

# torch.nn.MaxPool2d() -> https://pytorch.org/docs/stable/generated/torch.nn.MaxPool2d.html 
# --------------------
# Applies a 2D max pooling over an input signal composed of several input planes.





In [21]:
# we made the class inherit from 'nn.Module' so that we can wrap it up to be trained on Parallel Processes on GPU
class UNET(torch.nn.Module):

    def convolve(self, in_dimension, out_dimension):
        return torch.nn.Sequential(
            torch.nn.Conv2d(in_channels=in_dimension, out_channels=out_dimension, kernel_size=3, stride=1, padding=1),
            torch.nn.BatchNorm2d(num_features=out_dimension),
            torch.nn.LeakyReLU(negative_slope=0.2, inplace=True),
            torch.nn.Conv2d(in_channels=out_dimension, out_channels=out_dimension, kernel_size=3, stride=1, padding=1),
            torch.nn.BatchNorm2d(num_features=out_dimension),
            torch.nn.LeakyReLU(negative_slope=0.2, inplace=True),
        )
    
    def difference(self, image_old_feature, image_new_feature, threshold):

        # compute difference
        difference_feature = torch.abs(torch.sub(image_new_feature, image_old_feature))

        # assign the pixels that are lower then or equal to threshold to be zero while others to be be features in the new image
        DI = torch.where(difference_feature <= threshold, 0.0, image_new_feature)

        return DI

    def __init__(self) -> None:

        # call the constructor of the parent Module
        super(UNET, self).__init__()

        # BUILD THE MODEL ARCHITECTURE 

        # resize image to 256 x 256
        # self.resize = transforms.Resize((256, 256)),


        # LEVEL 1 - encoder
        self.lvl1_encoder = self.convolve(3, 64)

        # LEVEL 1 TO LEVEL 2 - encoder max pooling
        self.lvl1_pool = torch.nn.MaxPool2d(kernel_size=2, stride=2, padding=0)

        # LEVEL 2 - encoder
        self.lvl2_encoder = self.convolve(64, 128)

        # LEVEL 2 TO LEVEL 3 - encoder max pooling
        self.lvl2_pool = torch.nn.MaxPool2d(kernel_size=2, stride=2, padding=0)    

        # LEVEL 3 - encoder
        self.lvl3_encoder = self.convolve(128, 256)

        # LEVEL 3 TO LEVEL 4 - encoder max pooling
        self.lvl3_pool = torch.nn.MaxPool2d(kernel_size=2, stride=2, padding=0)    

        # LEVEL 4 - encoder
        self.lvl4_encoder = self.convolve(256, 512)

        # LEVEL 4 TO LEVEL 5 - encoder max pooling
        self.lvl4_pool = torch.nn.MaxPool2d(kernel_size=2, stride=2, padding=0)    

        # LEVEL 5 - encoder
        self.lvl5_encoder = self.convolve(512, 1024)

        # LEVEL 5 TO LEVEL 4 - decoder up conv
        self.lvl5_upConv = torch.nn.ConvTranspose2d(1024, 512, kernel_size=3, stride=2, padding=1, output_padding=1)

        # apply batch normalization
        self.lvl5_upConvNorm = torch.nn.BatchNorm2d(num_features=512)

        # LEVEL 4 - decoder
        self.lvl4_decoder = self.convolve(1024, 512)

        # LEVEL 4 TO LEVEL 3 - decoder up conv
        self.lvl4_upConv = torch.nn.ConvTranspose2d(512, 256, kernel_size=3, stride=2, padding=1, output_padding=1)

        # apply batch normalization
        self.lvl4_upConvNorm = torch.nn.BatchNorm2d(num_features=256)

        # LEVEL 3 - decoder
        self.lvl3_decoder = self.convolve(512, 256)

        # LEVEL 3 TO LEVEL 2 - decoder up conv
        self.lvl3_upConv = torch.nn.ConvTranspose2d(256, 128, kernel_size=3, stride=2, padding=1, output_padding=1)

        # apply batch normalization
        self.lvl3_upConvNorm = torch.nn.BatchNorm2d(num_features=128)

        # LEVEL 3 - decoder
        self.lvl2_decoder = self.convolve(256, 128)

        # LEVEL 2 TO LEVEL 1 - decoder up conv
        self.lvl2_upConv = torch.nn.ConvTranspose2d(128, 64, kernel_size=3, stride=2, padding=1, output_padding=1)

        # apply batch normalization
        self.lvl2_upConvNorm = torch.nn.BatchNorm2d(num_features=64)

        # LEVEL 1 - decoder
        self.lvl1_decoder = self.convolve(128, 64)
        
        # output layer (we have only 1 output mask)
        self.output_conv = torch.nn.Conv2d(64, 1, kernel_size=3, stride=1, padding=1)

        # apply softmax
        # self.output = torch.nn.Softmax()
   
    def forward(self, img_old, img_new):

        # contracting path (old image)
        imgOld_lvl1_encoder = self.lvl1_encoder(img_old)
        imgOld_lvl1_pool = self.lvl1_pool(imgOld_lvl1_encoder)
        imgOld_lvl2_encoder = self.lvl2_encoder(imgOld_lvl1_pool)
        imgOld_lvl2_pool = self.lvl2_pool(imgOld_lvl2_encoder)    
        imgOld_lvl3_encoder = self.lvl3_encoder(imgOld_lvl2_pool)
        imgOld_lvl3_pool = self.lvl3_pool(imgOld_lvl3_encoder)    
        imgOld_lvl4_encoder = self.lvl4_encoder(imgOld_lvl3_pool)
        imgOld_lvl4_pool = self.lvl4_pool(imgOld_lvl4_encoder)    
        imgOld_lvl5_encoder = self.lvl5_encoder(imgOld_lvl4_pool)

        # contracting path (new image)
        imgNew_lvl1_encoder = self.lvl1_encoder(img_new)
        imgNew_lvl1_pool = self.lvl1_pool(imgNew_lvl1_encoder)
        imgNew_lvl2_encoder = self.lvl2_encoder(imgNew_lvl1_pool)
        imgNew_lvl2_pool = self.lvl2_pool(imgNew_lvl2_encoder)    
        imgNew_lvl3_encoder = self.lvl3_encoder(imgNew_lvl2_pool)
        imgNew_lvl3_pool = self.lvl3_pool(imgNew_lvl3_encoder)    
        imgNew_lvl4_encoder = self.lvl4_encoder(imgNew_lvl3_pool)
        imgNew_lvl4_pool = self.lvl4_pool(imgNew_lvl4_encoder)    
        imgNew_lvl5_encoder = self.lvl5_encoder(imgNew_lvl4_pool)       
        

        # expanding path
        lvl5_upConv = self.lvl5_upConv(self.difference(imgOld_lvl5_encoder, imgNew_lvl5_encoder, 1.2))
        lvl5_upConvNorm = self.lvl5_upConvNorm(lvl5_upConv)
        lvl4_cat = torch.cat((self.difference(imgOld_lvl4_encoder, imgNew_lvl4_encoder, 1.0), lvl5_upConvNorm), dim=1)
        lvl4_decoder = self.lvl4_decoder(lvl4_cat)
        lvl4_upConv = self.lvl4_upConv(lvl4_decoder)
        lvl4_upConvNorm = self.lvl4_upConvNorm(lvl4_upConv)
        lvl3_cat = torch.cat((self.difference(imgOld_lvl3_encoder, imgNew_lvl3_encoder, 0.8), lvl4_upConvNorm), dim=1)
        lvl3_decoder = self.lvl3_decoder(lvl3_cat)
        lvl3_upConv = self.lvl3_upConv(lvl3_decoder)
        lvl3_upConvNorm = self.lvl3_upConvNorm(lvl3_upConv)
        lvl2_cat = torch.cat((self.difference(imgOld_lvl2_encoder, imgNew_lvl2_encoder, 0.6), lvl3_upConvNorm), dim=1)
        lvl2_decoder = self.lvl2_decoder(lvl2_cat)
        lvl2_upConv = self.lvl2_upConv(lvl2_decoder)
        lvl2_upConvNorm = self.lvl2_upConvNorm(lvl2_upConv)
        lvl1_cat = torch.cat((self.difference(imgOld_lvl1_encoder, imgNew_lvl1_encoder, 0.4), lvl2_upConvNorm), dim=1)
        lvl1_decoder = self.lvl1_decoder(lvl1_cat)
        output_conv = self.output_conv(lvl1_decoder)

        # return self.output(output_conv)
        return output_conv
    


In [22]:
# print pipeline of the model
unet = UNET().cuda() 
# unet = UNET().cuda() 
summary(unet, [(3, 256, 256), (3, 256, 256)])

----------------------------------------------------------------
        Layer (type)               Output Shape         Param #
            Conv2d-1         [-1, 64, 256, 256]           1,792
       BatchNorm2d-2         [-1, 64, 256, 256]             128
         LeakyReLU-3         [-1, 64, 256, 256]               0
            Conv2d-4         [-1, 64, 256, 256]          36,928
       BatchNorm2d-5         [-1, 64, 256, 256]             128
         LeakyReLU-6         [-1, 64, 256, 256]               0
         MaxPool2d-7         [-1, 64, 128, 128]               0
            Conv2d-8        [-1, 128, 128, 128]          73,856
       BatchNorm2d-9        [-1, 128, 128, 128]             256
        LeakyReLU-10        [-1, 128, 128, 128]               0
           Conv2d-11        [-1, 128, 128, 128]         147,584
      BatchNorm2d-12        [-1, 128, 128, 128]             256
        LeakyReLU-13        [-1, 128, 128, 128]               0
        MaxPool2d-14          [-1, 128,

In [27]:
def get_data_loaders(batch_size):

    # append images A and B
    data = []
    for i in range(0, len(A_images)):
        data.append([np.moveaxis(A_images[i], -1, 0), np.moveaxis(B_images[i], -1, 0)])

    # normalize
    data = np.array(data) / 255.0

    # convert the data to tensor
    data_tensor = torch.Tensor(data)
    target_tensor = torch.Tensor(true_delta/255) #.type(torch.LongTensor)

    # create the dataset
    dataset = torch.utils.data.TensorDataset(data_tensor, target_tensor)

    # split the dataset into test and train (80% training and 20% test)
    train_data_size = (int) (0.8 * len(dataset))
    train_dataset, test_dataset = torch.utils.data.random_split(dataset, [train_data_size, len(dataset) - train_data_size])

    # create the dataloaders
    train_dataloaders = torch.utils.data.DataLoader(train_dataset, batch_size=batch_size, shuffle=True, num_workers=0),
    test_dataloaders = torch.utils.data.DataLoader(test_dataset, batch_size=batch_size, shuffle=False, num_workers=0)
    

    return train_dataloaders, test_dataloaders

train_dataloaders, test_dataloaders = get_data_loaders(4)

In [None]:
for images, target in test_dataloaders:
    print(images[:, 0].shape)
    print(images[:, 1].shape)
    print(images.dtype)
    print(target.shape)
    print(target.dtype)
    break

torch.Size([5, 3, 256, 256])
torch.Size([5, 3, 256, 256])
torch.float32
torch.Size([5, 256, 256])
torch.float32


In [19]:
torch.cuda.is_available()

True

In [34]:
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
# unet = UNET()
unet = UNET().to(device)
binary_cross_entroy_loss = torch.nn.BCEWithLogitsLoss()
# adam_optimizer = torch.optim.Adam(filter(lambda p: p.requires_grad, unet.parameters()), lr=0.0002)
adam_optimizer = torch.optim.Adam(filter(lambda p: p.requires_grad, unet.parameters()), lr=0.0002)


In [76]:
def get_accuracy(unet):

    # loop over the test data
    unet.eval()

    jaccard_index = 0
    i = 0
    sigmoid = torch.nn.Sigmoid()
    for images, targets in test_dataloaders:
        
        i+=1

        old_image = images[:, 0].to(device)
        new_image = images[:, 1].to(device)

        # get the model output
        output = unet(old_image,new_image)
        output = torch.where(sigmoid(output) >= 0.5, 1., 0.)
        output_cpu = output.squeeze(1).data.cpu().numpy()
        targets_cpu = targets.data.cpu().numpy()

        # compute score
        jaccard_index += score(output_cpu, targets_cpu, 0, len(output))
        print(f"current Batch :{i} out of 244", end="\r")

    average_score = jaccard_index / i
    return average_score
    # print(f"Average Jaccard Score = {average_score}")



In [83]:
best_epoch = 109
best_accuracy = 0.7983807751551625
best_loss = 0.01446279214243567

for epochs in range(0, 130):
    i = 0
    average_loss = 0

    # begin training 
    unet.train()

    for images, targets in train_dataloaders[0]:
        i +=1 

        # move to the GPU
        old_image = images[:, 0].to(device)
        new_image = images[:, 1].to(device)
        targets = targets.unsqueeze(1).to(device)        
        
        # zero the parameter gradients
        adam_optimizer.zero_grad()

        # get the model output
        output = unet(old_image,new_image)

        # compute loss
        loss = binary_cross_entroy_loss(output, targets)
        
        # print loss
        average_loss += loss.data.cpu().numpy()
        print(f"Epoch Number: {epochs}, Batch Number: {i} out of 974")

        # back propagate
        loss.backward()
        adam_optimizer.step()

    # get the average accuracy
    accuracy = get_accuracy(unet)

    average_loss = average_loss / (i+1)
    print(f"Epoch Number: {epochs}, loss: {average_loss}, accuracy: {accuracy}")

    # saving best model
    if accuracy > best_accuracy:
        print("saving better Model")
        best_accuracy = accuracy
        best_epoch = epochs
        best_loss = average_loss
        best_model_wts = copy.deepcopy(unet.state_dict())
    


Epoch Number: 110, Batch Number: 1 out of 974
Epoch Number: 110, Batch Number: 2 out of 974
Epoch Number: 110, Batch Number: 3 out of 974
Epoch Number: 110, Batch Number: 4 out of 974
Epoch Number: 110, Batch Number: 5 out of 974
Epoch Number: 110, Batch Number: 6 out of 974
Epoch Number: 110, Batch Number: 7 out of 974
Epoch Number: 110, Batch Number: 8 out of 974
Epoch Number: 110, Batch Number: 9 out of 974
Epoch Number: 110, Batch Number: 10 out of 974
Epoch Number: 110, Batch Number: 11 out of 974
Epoch Number: 110, Batch Number: 12 out of 974
Epoch Number: 110, Batch Number: 13 out of 974
Epoch Number: 110, Batch Number: 14 out of 974
Epoch Number: 110, Batch Number: 15 out of 974
Epoch Number: 110, Batch Number: 16 out of 974
Epoch Number: 110, Batch Number: 17 out of 974
Epoch Number: 110, Batch Number: 18 out of 974
Epoch Number: 110, Batch Number: 19 out of 974
Epoch Number: 110, Batch Number: 20 out of 974
Epoch Number: 110, Batch Number: 21 out of 974
Epoch Number: 110, Bat

In [80]:
best_model = UNET().to(device)
best_model.load_state_dict(best_model_wts)
acc = get_accuracy(best_model)
print()
print(acc)
# save the model
torch.save(best_model, f'./UNET-{acc}.dlpk')

current Batch :244 out of 244
0.7983807011401249


In [105]:
acc = get_accuracy(unet)
print()
print(acc)

current Batch :244 out of 244
0.7983807011401249


In [84]:
# save the model
torch.save(unet, './UNET-global_minimum.dlpk')

In [91]:
# load the model
unet = torch.load('./UNET-0.7983807011401249.dlpk')

In [92]:
# draw text onto image
def drawText(array, text, TOP_PAD):
    
    # add white padding to the top of the image to write text on
    bordered_image = cv2.copyMakeBorder(array, TOP_PAD, 0, 0, 0, cv2.BORDER_CONSTANT, value=[255,255,255])

    # convert array to RGB image
    RGB_image = Image.fromarray(np.uint8(bordered_image)).convert('RGB')
    draw = ImageDraw.Draw(RGB_image)

    # write the text on the image
    font = ImageFont.truetype("arial.ttf", 28, encoding="unic")

    _, _, w, h = draw.textbbox((0, 0), text, font=font)
    draw.text(((256-w)/2, (40-h)/2), text, font=font, fill='black')
    draw.line([(0, 0), (0, TOP_PAD)], fill ="black", width = 5)
    draw.line([(256, 0), (256, TOP_PAD)], fill ="black", width = 5) 
    return np.array(RGB_image)

In [106]:
# loop over the Trainig data to calculate the training accuracy
unet.eval()

jaccard_index = 0
i = 0
sigmoid = torch.nn.Sigmoid()
for images, targets in train_dataloaders[0]:
    
    i+=1

    old_image = images[:, 0].to(device)
    new_image = images[:, 1].to(device)

    # get the model output
    output = unet(old_image,new_image)
    output = torch.where(sigmoid(output) >= 0.5, 1., 0.)
    output_cpu = output.squeeze(1).data.cpu().numpy()
    targets_cpu = targets.data.cpu().numpy()

    # compute score
    jaccard_index += score(output_cpu, targets_cpu, 0, len(output))
    print(f"current Batch :{i} out of ", end="\r")

average_score = jaccard_index / i
print(f"Average Jaccard Score = {average_score}")



Average Jaccard Score = 0.9290661427421627


In [116]:
# make shape of the images to be (3, 256, 256) instead of (256, 256, 3)
A_images_ch = []
B_images_ch = []
for i in range(len(A_images)):
    A_images_ch.append(np.moveaxis(A_images[i], -1, 0))
    B_images_ch.append(np.moveaxis(B_images[i], -1, 0))
A_images_ch = np.array(A_images_ch) / 255.0
B_images_ch = np.array(B_images_ch) / 255.0

# convert dataset into tenor objects
A_images_tensor = torch.Tensor(A_images_ch)
B_images_tensor = torch.Tensor(B_images_ch)

unet_predictions = []

# predict all the images
unet.eval()

sigmoid = torch.nn.Sigmoid()
for i in range(len(A_images)):
    
    old_image = A_images_tensor[i].unsqueeze(0).to(device)
    new_image = B_images_tensor[i].unsqueeze(0).to(device)

    # get the model output
    output = unet(old_image,new_image)
    output = torch.where(sigmoid(output) >= 0.5, 1., 0.)
    output_cpu = output.squeeze(0).squeeze(0).data.cpu().numpy()
    unet_predictions.append(output_cpu)
    print(f"current Batch: {i} out of {len(A_images)}", end="\r")

unet_predictions = np.array(unet_predictions).astype(np.uint8)

current Batch: 4867 out of 4868

In [117]:
# compute average jaccard index on thw whole dataset for unet
print(f"Jaccard index on all the dataset = {score(true_delta, unet_predictions, 0, len(A_images))}")

Jaccard index on all the dataset = 0.902992918631896


In [118]:
# convert masks into RGB images
true_delta_rgb = []
unet_predictions_rgb = []
pca_kmeans_rgb = []
zero_predictions_rgb = []
for i in range(len(A_images)):
    true_delta_rgb.append(cv2.cvtColor(true_delta[i]*255,cv2.COLOR_GRAY2RGB))
    unet_predictions_rgb.append(cv2.cvtColor(unet_predictions[i].astype(np.uint8)*255,cv2.COLOR_GRAY2RGB))
    pca_kmeans_rgb.append(cv2.cvtColor(pca_kmeans.result_images[i].astype(np.uint8),cv2.COLOR_GRAY2RGB))
    zero_predictions_rgb.append(cv2.cvtColor(zero_predictions.result_images[i].astype(np.uint8)*255,cv2.COLOR_GRAY2RGB))

true_delta_rgb = np.array(true_delta_rgb)
unet_predictions_rgb = np.array(unet_predictions_rgb)
pca_kmeans_rgb = np.array(pca_kmeans_rgb)

In [119]:
TOP_PAD = 40
for i in range(len(A_images)):
    
    # concatenate images and store them to the output
    output_image = np.zeros((A_images[i].shape[0] + TOP_PAD, 6*A_images[i].shape[1], A_images[i].shape[2]), dtype=np.uint8) 
    output_image[:, A_images[i].shape[0] * 0 : A_images[i].shape[0] * 1, :] = cv2.cvtColor(drawText(A_images[i], "A (before)", TOP_PAD), cv2.COLOR_BGR2RGB)
    output_image[:, A_images[i].shape[0] * 1 : A_images[i].shape[0] * 2, :] = cv2.cvtColor(drawText(B_images[i], "B (After)", TOP_PAD), cv2.COLOR_BGR2RGB)
    output_image[:, A_images[i].shape[0] * 2 : A_images[i].shape[0] * 3, :] = cv2.cvtColor(drawText(true_delta_rgb[i], "∆ (truth ground)", TOP_PAD), cv2.COLOR_BGR2RGB)
    output_image[:, A_images[i].shape[0] * 3 : A_images[i].shape[0] * 4, :] = cv2.cvtColor(drawText(unet_predictions_rgb[i], "unet", TOP_PAD), cv2.COLOR_BGR2RGB)
    output_image[:, A_images[i].shape[0] * 4 : A_images[i].shape[0] * 5, :] = cv2.cvtColor(drawText(pca_kmeans_rgb[i], "PCA_Kmeans", TOP_PAD), cv2.COLOR_BGR2RGB)
    output_image[:, A_images[i].shape[0] * 5 : A_images[i].shape[0] * 6, :] = cv2.cvtColor(drawText(zero_predictions_rgb[i], "Zero Prediction", TOP_PAD), cv2.COLOR_BGR2RGB)
    cv2.imwrite(f'./output/{i}.PNG', output_image)

In [120]:
# FOR TESTING data
# reading the test set
dir_names = ['./test/A/', './test/B/']

A_images_test = []
B_images_test = []

# reading A images
for fileName in os.listdir(dir_names[0]):
    # read the image, convert it into grey scale and then store it into a numpy array
    image =  np.array(Image.open(os.path.join(dir_names[0], fileName)))    
    # add the image of the list
    A_images_test.append(image)

A_images_test = np.array(A_images_test)

# reading B images
for fileName in os.listdir(dir_names[1]):
    # read the image, convert it into grey scale and then store it into a numpy array
    image =  np.array(Image.open(os.path.join(dir_names[1], fileName)))
    # add the image of the list
    B_images_test.append(image)

B_images_test = np.array(B_images_test)

In [134]:
def save_images(images, url): 
    for i in range(len(images)):
        image_name = '{0:04}.png'.format(i)
        cv2.imwrite(f'./test_output/{url}/{image_name}', images[i])

In [136]:
# predict images for Zero Predication model
zero_predictions_test =  ZeroPrediction()
zero_predictions_test.predict(A_images_test, B_images_test, 0, len(A_images_test))
save_images(zero_predictions_test.result_images.astype(np.uint8), 'zero_prediction')

In [139]:
# predict images for PCA_Kmeans
pca_kmeans_test = PCA_Kmean(h=5)
pca_kmeans_test.predict(A_images_test, B_images_test, 0, len(A_images_test))
save_images(np.array(pca_kmeans_test.result_images).astype(np.uint8), 'pca_kmeans')

processing image number 607

In [141]:
# predict images for Unet

# make shape of the images to be (3, 256, 256) instead of (256, 256, 3)
A_images_ch_test = []
B_images_ch_test = []
for i in range(len(A_images_test)):
    A_images_ch_test.append(np.moveaxis(A_images_test[i], -1, 0))
    B_images_ch_test.append(np.moveaxis(B_images_test[i], -1, 0))
A_images_ch_test = np.array(A_images_ch_test) / 255.0
B_images_ch_test = np.array(B_images_ch_test) / 255.0

# convert dataset into tenor objects
A_images_tensor_test = torch.Tensor(A_images_ch_test)
B_images_tensor_test = torch.Tensor(B_images_ch_test)

unet.eval()

unet_predictions_test = []

sigmoid = torch.nn.Sigmoid()
for i in range(len(A_images_test)):
    
    old_image = A_images_tensor_test[i].unsqueeze(0).to(device)
    new_image = B_images_tensor_test[i].unsqueeze(0).to(device)

    # get the model output
    output = unet(old_image,new_image)
    output = torch.where(sigmoid(output) >= 0.5, 1., 0.)
    output_cpu = output.squeeze(0).squeeze(0).data.cpu().numpy()
    unet_predictions_test.append(output_cpu)
    print(f"current Batch: {i} out of {len(A_images_test)}", end="\r")

unet_predictions_test = np.array(unet_predictions_test).astype(np.uint8) * 255
save_images(unet_predictions_test, 'unet')


current Batch: 607 out of 608