In [1]:
!nvidia-smi

Sun Jun  5 09:27:14 2022       
+-----------------------------------------------------------------------------+
| NVIDIA-SMI 460.32.03    Driver Version: 460.32.03    CUDA Version: 11.2     |
|-------------------------------+----------------------+----------------------+
| GPU  Name        Persistence-M| Bus-Id        Disp.A | Volatile Uncorr. ECC |
| Fan  Temp  Perf  Pwr:Usage/Cap|         Memory-Usage | GPU-Util  Compute M. |
|                               |                      |               MIG M. |
|   0  Tesla P100-PCIE...  Off  | 00000000:00:04.0 Off |                    0 |
| N/A   41C    P0    29W / 250W |      0MiB / 16280MiB |      0%      Default |
|                               |                      |                  N/A |
+-------------------------------+----------------------+----------------------+
                                                                               
+-----------------------------------------------------------------------------+
| Proces

In [2]:
from google.colab import drive 
drive.mount('/content/gdrive/')

Mounted at /content/gdrive/


In [3]:
from glob import glob
val_file = glob(f'/content/gdrive/MyDrive/Colab Notebooks/maps/val/*')
len(val_file)

1098

In [4]:
from PIL import Image
import os
import torch
import torchvision
import torch.nn.functional as F
import torch.nn as nn
import torch.optim as optim
from torchvision import transforms, datasets
from torchvision.utils import save_image
import matplotlib.pyplot as plt
import numpy as np
from torch.utils.data import Dataset, DataLoader

In [5]:
def tensor_to_image(data):
  return  unnormalize(np.transpose(data.detach().cpu().numpy()[0] , (1, 2, 0)))

In [6]:
def file_to_PIL(sample):
  
  result = Image.open(sample)

  return result

In [7]:
def splitTensor(sample):
        C, H, W = sample.shape
        
        input_image = sample[:,:,:int(W/2)]
        target_image = sample[:,:,int(W/2):]
        
        output = torch.cat([input_image,target_image])
        return output

In [8]:
def make_two_tensor(sample):
  C, H, W = sample.shape   
  input_image = sample[:int(C/2),:,:]
  target_image = sample[int(C/2):,:,:] 

  return input_image, target_image

In [9]:
def split_input_tensor(sample):
  C, H, W = sample.shape
        
  input_image = sample[:,:,:int(W/2)]
  target_image = sample[:,:,int(W/2):]
  
  output = torch.cat([input_image,target_image])

  return input_image, target_image

In [10]:
def unnormalize(sample):
  sample = ((sample * 0.5) + 0.5)
  return sample

In [11]:
def shuffle_batch(data):
  if len(data) == 1:
    return data
  first_data = data[0].clone()
  data[0:-1] = data[1:].clone()
  data[-1] = first_data
  return data

In [12]:
class CustomDatasest(Dataset):
  def __init__(self, np_data, transform = None):
      self.data = np_data
      self.transform = transform
      self.len = len(np_data)

  def __len__(self):
    return self.len

  def __getitem__(self, idx):
    sample = self.data[idx]
    if self.transform:
      sample = self.transform(sample)
    return sample

In [13]:
image_height = 256
image_width = 256

In [14]:
trans = transforms.Compose([
                            file_to_PIL,
                            transforms.ToTensor(),
                            transforms.Normalize((0.5,0.5,0.5),(0.5,0.5,0.5)),
                            transforms.Resize((image_height, image_width*2)),
                            splitTensor,
                            make_two_tensor,
                            ])

#TMP_loader

In [15]:
val_dataset = CustomDatasest(val_file, transform = trans)
val_loader = DataLoader(
    dataset = val_dataset, 
    batch_size = 1
)

#Pix2Pix

In [16]:
class Pix2pixGenerator(nn.Module):
    def __init__(self, output_nc):
        super(Pix2pixGenerator, self).__init__()

        self.downsample_layers = []
        # Downsampling
        in_features = 3
        out_features = 64
        
        input = nn.Sequential(*self.downsample(in_features,out_features,False))
        self.downsample_layers.append(input) #(128, 128, 64)

        in_features = out_features
        out_features = out_features * 2
        
        input = nn.Sequential(*self.downsample(in_features,out_features))
        self.downsample_layers.append(input) #(64, 64, 128)

        in_features = out_features
        out_features = out_features * 2
        
        input = nn.Sequential(*self.downsample(in_features,out_features))
        self.downsample_layers.append(input) # (32, 32, 256)

        in_features = out_features
        out_features = out_features * 2

        input = nn.Sequential(*self.downsample(in_features,out_features))
        self.downsample_layers.append(input) # (16, 16, 512)

        in_features = out_features

        self.downsample_layers.append(nn.Sequential(*self.downsample(in_features,out_features))) # (8, 8, 512)
        self.downsample_layers.append(nn.Sequential(*self.downsample(in_features,out_features))) # (4, 4, 512)
        self.downsample_layers.append(nn.Sequential(*self.downsample(in_features,out_features))) # (2, 2, 512)
        self.downsample_layers.append(nn.Sequential(*self.downsample(in_features,out_features))) # (1, 1, 512)

        self.downsample_layer = nn.Sequential(*self.downsample_layers)

        self.updsample_layers = []

        self.updsample_layers.append(nn.Sequential(*self.upsample(in_features,out_features, apply_dropout=True))) # (2, 2, 512)

        in_features = out_features * 2 # concat 1024

        self.updsample_layers.append(nn.Sequential(*self.upsample(in_features,out_features, apply_dropout=True))) # (4, 4, 512)
        self.updsample_layers.append(nn.Sequential(*self.upsample(in_features,out_features, apply_dropout=True))) # (8, 8, 512)
        self.updsample_layers.append(nn.Sequential(*self.upsample(in_features,out_features))) # (16, 16, 512)

        out_features = out_features // 2 # 256

        self.updsample_layers.append(nn.Sequential(*self.upsample(in_features,out_features))) # (32, 32, 256)

        in_features = out_features * 2 # 512
        out_features = out_features // 2 # 128

        self.updsample_layers.append(nn.Sequential(*self.upsample(in_features,out_features))) # (64, 64, 128)

        in_features = out_features * 2 # 256
        out_features = out_features // 2 #64

        self.updsample_layers.append(nn.Sequential(*self.upsample(in_features,out_features))) # (128, 128, 64)
        
        in_features = out_features * 2
        out_features = output_nc

        self.updsample_layer = nn.Sequential(*self.updsample_layers)

        self.output_layer = nn.Sequential(
                                           nn.ConvTranspose2d(in_features, out_features, 4, stride=2, padding=1),
                                           nn.Tanh(),
        )

    def downsample(self, in_features,out_features, apply_batchnorm=True):
      layerList = []
      layerList.append(nn.Conv2d(in_features, out_features, 4, stride=2, padding=1, bias=False))
      if apply_batchnorm:
        layerList.append(nn.BatchNorm2d(out_features))
      layerList.append(nn.LeakyReLU(0.3, inplace=True))

      return layerList

    def upsample(self, in_features,out_features, apply_dropout=True):
      layerList = []
      layerList.append(nn.ConvTranspose2d(in_features, out_features, 4, stride=2, padding=1, bias=False))

      
      layerList.append(nn.BatchNorm2d(out_features))
      if apply_dropout:
        layerList.append(nn.Dropout(0.2))
      layerList.append(nn.LeakyReLU(0.3, inplace=True))

      return layerList

    def forward(self, x):
      
      down_result = []
      for i, down in enumerate(self.downsample_layers):
        x = down(x)
        down_result.append(x)
      down_result = reversed(down_result[:-1])
      for up, down in zip(self.updsample_layers, down_result):
        x = up(x)
        x = torch.cat([x, down], dim=1)

      x = self.output_layer(x)
      return x

In [17]:
class Pix2pixDiscriminator(nn.Module):
    def __init__(self, input_nc):
        super(Pix2pixDiscriminator, self).__init__()

        # A bunch of convolutions one after another
        self.downsample_layers = []
        # Downsampling
        in_features = input_nc * 2
        out_features = 64
        
        input = nn.Sequential(*self.downsample(in_features,out_features,False))
        self.downsample_layers.append(input) #(128, 128, 64)

        in_features = out_features
        out_features = out_features * 2
        
        input = nn.Sequential(*self.downsample(in_features,out_features))
        self.downsample_layers.append(input) #(64, 64, 128)

        in_features = out_features
        out_features = out_features * 2
        
        input = nn.Sequential(*self.downsample(in_features,out_features))
        self.downsample_layers.append(input) # (32, 32, 256)

        self.downsample_layer = nn.Sequential(*self.downsample_layers)

        in_features = out_features
        out_features = out_features * 2

        self.zero_pad = nn.ZeroPad2d(1)

        self.conv1 = nn.Conv2d(in_features, out_features, 4, stride=1, bias=False)

        in_features = out_features

        self.batchnorm = nn.BatchNorm2d(out_features)
        self.leakyRelu = nn.LeakyReLU(0.3, inplace=True)
        self.conv2 = nn.Conv2d(in_features, 1, 4, stride=1)

    def downsample(self, in_features,out_features, apply_batchnorm=True):
      layerList = []
      layerList.append(nn.Conv2d(in_features, out_features, 4, stride=2, padding=1, bias=False))
      if apply_batchnorm:
        layerList.append(nn.BatchNorm2d(out_features))
      layerList.append(nn.LeakyReLU(0.3, inplace=True))

      return layerList

    def forward(self, inputImg, targetImg):
        x = torch.cat((inputImg,targetImg), dim=1)
        x = self.downsample_layer(x)
        x = self.zero_pad(x)
        x = self.conv1(x)
        x = self.batchnorm(x)
        x = self.leakyRelu(x)
        x = self.zero_pad(x)
        x = self.conv2(x)
        return x

#CycleGan

In [18]:
class ResidualBlock(nn.Module):
    def __init__(self, in_features):
        super(ResidualBlock, self).__init__()

        conv_block = [  nn.ReflectionPad2d(1),
                        nn.Conv2d(in_features, in_features, 3),
                        nn.InstanceNorm2d(in_features),
                        nn.ReLU(inplace=True),
                        nn.Dropout(0.5),
                        nn.ReflectionPad2d(1),
                        nn.Conv2d(in_features, in_features, 3),
                        nn.InstanceNorm2d(in_features)]

        self.conv_block = nn.Sequential(*conv_block)

    def forward(self, x):
        return x + self.conv_block(x)

## Generator

In [19]:
class Generator(nn.Module):
    def __init__(self, input_nc, output_nc, n_residual_blocks=9):
        super(Generator, self).__init__()

        # Initial convolution block       
        model = [   nn.ReflectionPad2d(3),
                    nn.Conv2d(input_nc, 64, 7),
                    nn.InstanceNorm2d(64),
                    nn.ReLU(inplace=True) ]

        # Downsampling
        in_features = 64
        out_features = in_features*2
        for _ in range(2):
            model += [  nn.Conv2d(in_features, out_features, 3, stride=2, padding=1),
                        nn.InstanceNorm2d(out_features),
                        nn.ReLU(inplace=True) ]
            in_features = out_features
            out_features = in_features*2

        # Residual blocks
        for _ in range(n_residual_blocks):
            model += [ResidualBlock(in_features)]

        # Upsampling
        out_features = in_features//2
        for _ in range(2):
            model += [  nn.ConvTranspose2d(in_features, out_features, 3, stride=2, padding=1, output_padding=1),
                        nn.InstanceNorm2d(out_features),
                        nn.ReLU(inplace=True) ]
            in_features = out_features
            out_features = in_features//2

        # Output layer
        model += [  nn.ReflectionPad2d(3),
                    nn.Conv2d(64, output_nc, 7),
                    nn.Tanh() ]

        self.model = nn.Sequential(*model)

    def forward(self, x):
        return self.model(x)

##Discriminator

In [20]:
class Discriminator(nn.Module):
  def __init__(self, input_nc):
      super(Discriminator, self).__init__()

      # A bunch of convolutions one after another
      model = [   nn.Conv2d(input_nc, 64, 4, stride=2, padding=1),
                  nn.LeakyReLU(0.2, inplace=True),
                  nn.Dropout(0.2)]

      model += [  nn.Conv2d(64, 128, 4, stride=2, padding=1),
                  nn.InstanceNorm2d(128),
                  nn.LeakyReLU(0.2, inplace=True),
                  nn.Dropout(0.2) ]

      model += [  nn.Conv2d(128, 256, 4, stride=2, padding=1),
                  nn.InstanceNorm2d(256),
                  nn.LeakyReLU(0.2, inplace=True),
                  nn.Dropout(0.2) ]

      model += [  nn.Conv2d(256, 512, 4, 1, 1),
                  nn.InstanceNorm2d(512),
                  nn.LeakyReLU(0.2, inplace=True) ]

      # FCN classification layer
      model += [nn.Conv2d(512, 1, 4, 1, 1)]

      self.model = nn.Sequential(*model)

  def forward(self, x):
    return self.model(x)

In [21]:
pix2pix_netG = Pix2pixGenerator(3)

default_netG_A2B = Generator(3, 3)
default_netG_B2A = Generator(3, 3)

semi_netG_A2B = Generator(3, 3)
semi_netG_B2A = Generator(3, 3)

In [22]:
pix2pix_save_folder = f'/content/gdrive/MyDrive/Colab Notebooks/maps/pix2pix/' 
pix2pix_file_name = f'pix2pix_model_2022-06-04 01:41:47.675579+09:00.pth_200.pth'
pix2pix_checkpoint = torch.load(f'{pix2pix_save_folder}{pix2pix_file_name}', map_location=torch.device('cpu'))
pix2pix_netG.load_state_dict(pix2pix_checkpoint['g_model_state_dict'])
print(pix2pix_checkpoint['epoch'])

199


In [23]:
cyclegan_save_folder = f'/content/gdrive/MyDrive/Colab Notebooks/maps/cyclegan/'
cyclegan_file_name = f'cyclegan_model_2022-05-12 02:46:50.888392+09:00.pth'
cycle_gan_checkpoint = torch.load(f'{cyclegan_save_folder}{cyclegan_file_name}', map_location=torch.device('cpu'))
start_epoch = cycle_gan_checkpoint['epoch']
default_netG_A2B.load_state_dict(cycle_gan_checkpoint['netG_A2B_state_dict'])
default_netG_B2A.load_state_dict(cycle_gan_checkpoint['netG_B2A_state_dict'])
print(start_epoch)

150


In [24]:
save_model_folder = f'/content/gdrive/MyDrive/Colab Notebooks/maps/cyclegan_semi/'
file_name = f'150_semi_cyclegan_model_2022-05-15 13:50:11.965796+09:00.pth'
checkpoint = torch.load(f'{save_model_folder}{file_name}', map_location=torch.device('cpu'))
start_epoch = checkpoint['epoch']
semi_netG_A2B.load_state_dict(checkpoint['netG_A2B_state_dict'])
semi_netG_B2A.load_state_dict(checkpoint['netG_B2A_state_dict'])
print(start_epoch)

150


In [25]:
device = 'cuda' if torch.cuda.is_available() else  'cpu'
device

'cuda'

In [26]:
val_iter = iter(val_loader)

pix2pix_netG.to(device)
default_netG_A2B.to(device)
default_netG_B2A.to(device)
semi_netG_A2B.to(device)
semi_netG_B2A.to(device)

pix2pix_netG.eval()
default_netG_A2B.eval()
default_netG_B2A.eval()
semi_netG_A2B.eval()
semi_netG_B2A.eval()
print("done")

done


In [27]:
for _ in range(10):
  inputA, inputB = next(val_iter)


  with torch.no_grad():
    inputA = inputA.to(device)
    inputB = inputB.to(device)

    pix2pix_fake_B = pix2pix_netG(inputA)
    default_fake_B = default_netG_A2B(inputA)
    semi_fake_B = semi_netG_A2B(inputA)

    #default_fake_A = tensor_to_image(default_netG_B2A(inputB))
    #semi_fake_A = tensor_to_image(semi_netG_B2A(inputB))

    fig = plt.figure(figsize=(30,30))
    fig.add_subplot(151)
    plt.axis('off')
    plt.imshow(tensor_to_image(inputA))
    fig.add_subplot(152)
    plt.axis('off')
    plt.imshow(tensor_to_image(inputB))
    fig.add_subplot(153)
    plt.imshow(tensor_to_image(pix2pix_fake_B))
    fig.add_subplot(154)
    plt.imshow(tensor_to_image(default_fake_B))
    fig.add_subplot(155)
    plt.imshow(tensor_to_image(semi_fake_B))
    
    plt.show()

Output hidden; open in https://colab.research.google.com to view.

#Segmentation Labels

In [28]:
label_colors = {}
label_colors['grass'] = [200,225,170]
label_colors['load'] = [255, 255, 255]
label_colors['arrow'] = [200,200,200]
label_colors['building'] = [243,240,233]
label_colors['building_floor'] = [233,228,222]
label_colors['playground'] = [232,221, 190]
label_colors['high_load'] = [250,160,40]
label_colors['water'] = [177,208, 255]
len(label_colors)

8

In [29]:
def make_label_map(image):
  H, W, C = image.shape
  feature_map = np.zeros((H,W,1)).astype(np.uint)
  for h in range(H):
    for w in range(W):
      label_index = 0
      min_distance = float("inf")
      img_color = image[h, w, :]
      for i, label in enumerate(label_colors.values()):
        distance = ((img_color - label) ** 2).sum()
        if min_distance > distance:
          label_index = i
          min_distance = distance
      feature_map[h, w, 0] = label_index
  return feature_map

In [30]:
def label_map_to_image(label_map):
  H, W, C = label_map.shape
  label_values = list(label_colors.values())
  result_image = np.zeros((H, W, 3)).astype(np.uint)
  for h in range(H):
    for w in range(W):
      result_image[h,w,:] = label_values[label_map[h,w,0]]
  return result_image

In [31]:
def get_pixel_accuracy(target_label, input_label):
  if target_label.shape != input_label.shape:
    print("Input Shape Error!")
    return
  H, W, C = target_label.shape
  t_i = H * W
  correct_num = np.count_nonzero(target_label == input_label)
  return correct_num / t_i

In [32]:
def make_label_map_by_torch(image_tensor):
  H, W, C = image_tensor.shape
  device = image_tensor.get_device()
  feature_num = len(label_colors.values())

  
  label_tensor = torch.Tensor(list(label_colors.values())).view(feature_num,1,1,3).repeat(1,H,W,1).to(device)
  image_tensor = torch.unsqueeze(image_tensor, 0).repeat(feature_num,1,1,1).to(device)

  cal_result = (image_tensor - label_tensor).pow(2).sum(-1)
  _, ii = torch.min(cal_result, 0)

  return ii

In [33]:
def batch_tensor_to_image_tensor(input):
  result = input[0]
  result = (result * 0.5) + 0.5
  result = torch.permute(result, (1,2,0))
  return result * 255.

In [34]:
def label_tensor_map_to_image(label_map):
  label_map = np.expand_dims(label_map.cpu().numpy(), -1)

  H, W,C = label_map.shape
  label_values = list(label_colors.values())
  result_image = np.zeros((H, W, 3)).astype(np.uint)
  for h in range(H):
    for w in range(W):
      result_image[h,w,:] = label_values[label_map[h,w]]
  return result_image

In [35]:
def get_pixel_accuracy_by_tensor(target_label, input_label):
  if target_label.shape != input_label.shape:
    print("Input Shape Error!")
    return
  H, W = target_label.shape
  t_i = H * W
  correct_num = torch.count_nonzero(target_label == input_label)
  return correct_num / t_i

In [36]:
from numpy.lib.arraysetops import union1d
def get_iou_by_tensor(target_label, input_label):
  if target_label.shape != input_label.shape:
    print("Input Shape Error!")
    return
  class_list= torch.unique(target_label)
  class_num = len(class_list)
  H, W = target_label.shape
  class_iou = 0
  for i in class_list:
    target_i = target_label == i
    label_i = input_label == i
    num_target_i = torch.count_nonzero(target_i)
    num_label_i = torch.count_nonzero(label_i)
    intersection = torch.count_nonzero((target_i == True) & (label_i == True))
    union = num_target_i +  num_label_i - intersection
    if union != 0:
      class_iou += intersection / union
  result = class_iou / class_num
  return result

In [37]:
arr = torch.Tensor([[1,2,3,4,5],[7,8,9,9,0]])
arr2 = torch.Tensor([[1,2,3,9,5],[7,8,9,0,0]])
get_iou_by_tensor(arr, arr2)

tensor(0.7593)

In [38]:
from tqdm import tqdm
tqdm_var = tqdm(val_loader)
acc_pix2pix_list = []
iou_pix2pix_list = []
acc_cyclegan_list = []
iou_cyclegan_list = []
acc_semi_cyclegan_list = []
iou_semi_cyclegan_list = []
with torch.no_grad():
  for inputA, inputB in tqdm_var:
    inputA = inputA.to(device)
    inputB = inputB.to(device)

    pix2pix_fake_B = pix2pix_netG(inputA)
    cyclegan_fake_B = default_netG_A2B(inputA)
    semi_cyclegan_fake_B = semi_netG_A2B(inputA)

    input_label_map = make_label_map_by_torch(batch_tensor_to_image_tensor(inputB))
    pix2pix_label_map = make_label_map_by_torch(batch_tensor_to_image_tensor(pix2pix_fake_B))
    cyclegan_label_map = make_label_map_by_torch(batch_tensor_to_image_tensor(cyclegan_fake_B))
    semi_cyclegan_label_map = make_label_map_by_torch(batch_tensor_to_image_tensor(semi_cyclegan_fake_B))
  
    acc_pix2pix = get_pixel_accuracy_by_tensor(input_label_map, pix2pix_label_map)
    iou_pix2pix = get_iou_by_tensor(input_label_map, pix2pix_label_map)
    acc_cyclegan = get_pixel_accuracy_by_tensor(input_label_map, cyclegan_label_map)
    iou_cyclegan = get_iou_by_tensor(input_label_map, cyclegan_label_map)
    acc_semi_cyclegan = get_pixel_accuracy_by_tensor(input_label_map, semi_cyclegan_label_map)
    iou_semi_cyclegan = get_iou_by_tensor(input_label_map, semi_cyclegan_label_map)
    
    acc_pix2pix_list.append(acc_pix2pix)
    iou_pix2pix_list.append(iou_pix2pix)
    acc_cyclegan_list.append(acc_cyclegan)
    iou_cyclegan_list.append(iou_cyclegan)
    acc_semi_cyclegan_list.append(acc_semi_cyclegan)
    iou_semi_cyclegan_list.append(iou_semi_cyclegan)

100%|██████████| 1098/1098 [01:34<00:00, 11.66it/s]


In [39]:
pix2pix_pixel_acc = sum(acc_pix2pix_list) / len(acc_pix2pix_list) 
cyclegan_pixel_acc = sum(acc_cyclegan_list) / len(acc_cyclegan_list) 
semi_cyclegan_pixel_acc = sum(acc_semi_cyclegan_list) / len(acc_semi_cyclegan_list)
pix2pix_pixel_acc, cyclegan_pixel_acc, semi_cyclegan_pixel_acc

(tensor(0.6243, device='cuda:0'),
 tensor(0.5818, device='cuda:0'),
 tensor(0.6657, device='cuda:0'))

In [40]:
pix2pix_iou = sum(iou_pix2pix_list) / len(iou_pix2pix_list) 
cyclegan_iou = sum(iou_cyclegan_list) / len(iou_cyclegan_list) 
semi_cyclegan_iou = sum(iou_semi_cyclegan_list) / len(iou_semi_cyclegan_list) 
pix2pix_iou,cyclegan_iou ,semi_cyclegan_iou

(tensor(0.2430, device='cuda:0'),
 tensor(0.2344, device='cuda:0'),
 tensor(0.2963, device='cuda:0'))