In [None]:
import torch
import glob
from PIL import Image
import torch.optim as optim
from torch.utils.data import DataLoader,  TensorDataset, Dataset
import torch.nn as nn

import numpy as np
import matplotlib.pyplot as plt
import math

import shutil
import time
from tqdm import tqdm

import imageio

!pip install git+https://github.com/qubvel/segmentation_models.pytorch
import segmentation_models_pytorch as smp

import pickle

import random

def set_random_seed(seed):
    torch.backends.cudnn.deterministic = True
    torch.manual_seed(seed)
    torch.cuda.manual_seed(seed)
    np.random.seed(seed)
    random.seed(seed)
    os.environ["PYTHONHASHSEED"] = str(seed)
set_random_seed(42)

In [None]:
from google.colab import drive
drive.mount('/gdrive')

Drive already mounted at /gdrive; to attempt to forcibly remount, call drive.mount("/gdrive", force_remount=True).


In [None]:
class MyDataset(Dataset):
    
    def __init__(self, data, targets, transform=None):
        self.data = data
        self.targets = targets
        self.data.sort()
        self.targets.sort()
        self.transform = transform
        
    def __len__(self):
        return len(self.targets)

    def __getitem__(self, idx):
        image = self.data[idx]
        label = self.targets[idx]
        if self.transform:
            image = self.transform(image)
        

        return image, label

In [None]:
with open('/gdrive/My Drive/Segmentation_project/test_datasets/test_images_B3', 'rb') as f:
    test_imag = pickle.load(f)
with open('/gdrive/My Drive/Segmentation_project/test_datasets/test_masks_B3', 'rb') as f:
    test_masks = pickle.load(f)

x_images = torch.from_numpy(test_imag/255)
print(x_images.shape)
x_images = torch.transpose(x_images, 1, 3)
x_images = torch.transpose(x_images, 2, 3)
print(x_images.shape)
y_labels = torch.from_numpy(test_masks/255)
print(y_labels.shape)
y_labels = y_labels[:,:,:,2].unsqueeze(1)
print(y_labels.shape)

len_dataset = len(y_labels) 
dataset = MyDataset(data=x_images.float(), targets=y_labels)
test_loader = DataLoader(dataset, batch_size = 34, shuffle = False)
del dataset


In [None]:
model = smp.Unet(encoder_name='efficientnet-b0', 
                 encoder_depth=5, 
                 encoder_weights='imagenet', 
                 decoder_use_batchnorm=True, 
                 decoder_channels=(256, 128, 64, 32, 16), 
                 decoder_attention_type=None, 
                 in_channels=3, 
                 classes=1, 
                 #activation='sigmoid', 
                 aux_params=None)

In [None]:
#model = smp.DeepLabV3Plus(encoder_name='efficientnet-b4', 
#                          encoder_depth=5,
#                          encoder_weights='imagenet',
#                          encoder_output_stride=16,
#                          decoder_channels=256,
#                          decoder_atrous_rates=(12, 24, 36),
#                          in_channels=3,
#                          classes=1,
#                          activation=None,
#                          upsampling=4,
#                          aux_params=None)

In [None]:
model.load_state_dict(torch.load('/gdrive/My Drive/TyurinaAV_Segmentation/Test/modelUNet_weights_efficientnet-b0_batch=26_100ep_NEW.pth', map_location=torch.device('cuda')))
device = torch.device("cuda")
model.to(device)
model.eval()
#device = torch.device("cpu")

images = []
labels_true = []
labels_predicted = []
with torch.no_grad():
    for imag, labels in test_loader:
        imag = imag.to(device)
        labels = labels.to(device)
        # print(images.shape)
        # print(labels.shape)
        outputs = model(imag)
        images.append(imag)
        labels_true.append(labels)
        labels_predicted.append(outputs)

In [None]:
# saving predicted mask
labels_predicted_full_mask = []
for i in range(len(labels_predicted)):
    for j in range(len(labels_predicted[0])):
        labels_predicted_full_mask.append((labels_predicted[i][j]>=0.5).detach().cpu().numpy().astype(int).astype(float))

labels_predicted_full_mask = np.array(labels_predicted_full_mask)
labels_predicted_full_mask = labels_predicted_full_mask*255
labels_predicted_full_mask = labels_predicted_full_mask.reshape(1020, 256, 256) # original shape
print(labels_predicted_full_mask.shape)

In [None]:
def reconstruct_from_patches_custom(img_arr, org_img_size, stride=None, size=None): 
    # for keras but changed

    # check parameters
    if type(org_img_size) is not tuple:
        raise ValueError("org_image_size must be a tuple")

    #if img_arr.ndim == 3:
        #img_arr = np.expand_dims(img_arr, axis=0)

    if size is None:
        size = img_arr.shape[1]

    if stride is None:
        stride = size

    #nm_layers = img_arr.shape[3]

    i_max = (org_img_size[0] // stride) + 1 - (size // stride)
    j_max = (org_img_size[1] // stride) + 1 - (size // stride)

    total_nm_images = img_arr.shape[0] // (i_max ** 2)
    nm_images = img_arr.shape[0]

    averaging_value = size // stride
    images_list = []
    kk = 0
    print(img_arr.shape)
    for img_count in range(total_nm_images):
        #print(img_count)
        img_bg = np.zeros(
            (org_img_size[0], org_img_size[1]), dtype=img_arr[0].dtype
        )

        for i in range(i_max):
            for j in range(j_max):
                img_bg[
                        i * stride : i * stride + size,
                        j * stride : j * stride + size,
                    ] = img_arr[kk, :, :]

                kk += 1

        images_list.append(img_bg)

    return np.stack(images_list)

In [None]:
y_labels_reconstructed = reconstruct_from_patches_custom(img_arr=labels_predicted_full_mask, org_img_size=(7758, 8764), stride = 256, size=256)
y_labels_reconstructed = np.array(y_labels_reconstructed)
print(y_labels_reconstructed.shape)

In [None]:
img = Image.fromarray(y_labels_reconstructed[0])
img = img.convert("L")
img.save('/gdrive/My Drive/TyurinaAV_segmentation/Test/test_mask_predicted_UNet_efficientnet-b0_batch=26_B3', "PNG")

In [None]:
img