In [None]:
import os
from PIL import Image 
from torch.utils.data import Dataset , DataLoader
from torchvision import transforms, datasets, utils
import numpy as np
from tqdm import tqdm
import numpy as np
import torch.nn as nn
import torchvision.transforms.functional as TF 
import torch.optim as optim
import torch.nn.functional as F
import pickle
import itertools
import torch
def save_variable(a,name):
    with open(name, 'wb') as f:
        pickle.dump(a, f)

def load_variable(name):
    with open(name, 'rb') as f:
        return pickle.load(f)

In [None]:
#dataset loading class -- specifically for cityscape
class CityscapesDataset(Dataset):
    def __init__(self, split, root_dir, mode='fine', transform=None, eval=False):
        self.transform = transform
        if mode == 'fine':
            self.mode = 'gtFine'
        self.split = split
        self.yLabel_list = []
        self.XImg_list = []
        self.eval = eval

        self.label_path = os.path.join(os.getcwd(), root_dir+'\\'+self.mode+'\\'+self.split)
        self.rgb_path = os.path.join(os.getcwd(), root_dir+'\\leftImg8bit\\'+self.split)
        city_list = os.listdir(self.label_path)
        for city in city_list:
            temp = os.listdir(self.label_path+'/'+city)
            list_items = temp.copy()
            for item in temp:
                if not item.endswith('labelIds.png', 0, len(item)):
                    list_items.remove(item)

            # defining paths
            list_items = ['/'+city+'/'+path for path in list_items]

            self.yLabel_list.extend(list_items)
            self.XImg_list.extend(
                ['/'+city+'/'+path for path in os.listdir(self.rgb_path+'/'+city)]
            )
                
    def __len__(self):
        length = len(self.XImg_list)
        return length      

    def __getitem__(self, index):
        image = Image.open(self.rgb_path+self.XImg_list[index])
        y = Image.open(self.label_path+self.yLabel_list[index])

        if self.transform is not None:
            image = self.transform(image)
            y = self.transform(y)

        image = transforms.ToTensor()(image)
        y = np.array(y)
        y = torch.from_numpy(y)
        y = y.type(torch.LongTensor)
        if self.eval:
            return image, y, self.XImg_list[index]
        else:
            return image, y

def get_cityscapes_data(mode,split,root_dir='datasets\cityscapes',transforms=None,batch_size=1,eval=False,shuffle=True,pin_memory=True):
    data = CityscapesDataset(
        mode=mode, split=split, transform=transforms, root_dir=root_dir, eval=eval)

    data_loaded = torch.utils.data.DataLoader(
        data, batch_size=batch_size, shuffle=shuffle, pin_memory=pin_memory)

    return data_loaded

In [None]:
class FuzzyLayer(nn.Module):
	def __init__(self, fuzzynum,channel, mu, sigma, static = False):
		super(FuzzyLayer,self).__init__()
		self.n = fuzzynum
		self.channel = channel
		self.conv1 = nn.Conv2d(self.channel,self.channel,3,padding=1)
		self.conv2 = nn.Conv2d(self.channel,self.channel,3,padding=1)
		# You can change initial mu and sigma here.

		if not static:
			self.sigma = nn.Parameter(torch.full((self.channel, self.n), 0.01).cuda())
			self.mu = nn.Parameter(torch.rand(self.channel,self.n).cuda())
		else:	
			self.mu = mu
			self.sigma = sigma
		self.weight = torch.tensor(list(map(float,(range(10,10*(self.n + 1),10))))).cuda()
		self.bn1 = nn.BatchNorm2d(256, affine=True)	
		self.bn2 = nn.BatchNorm2d(self.channel,affine=True)

	def forward(self, x):
		x = self.conv1(x)
		y = x.unsqueeze(2).repeat_interleave(self.n, dim=2)
		tmp = (self.weight[:,np.newaxis,np.newaxis] * torch.exp(-((y - self.mu[np.newaxis,:, :, np.newaxis, np.newaxis]) / (np.sqrt(2) * self.sigma[np.newaxis,:, : , np.newaxis, np.newaxis]))**2)).sum(dim=2)
		sum = (torch.exp(-((y - self.mu[np.newaxis,:, : , np.newaxis, np.newaxis]) / (np.sqrt(2) * self.sigma[np.newaxis,:, : , np.newaxis, np.newaxis]))**2)).sum(dim=2)

		fuzzy = torch.div(tmp,sum)
				
		fuzzy = self.bn2(self.conv2(self.bn1(tmp)))
		return fuzzy	

In [None]:
class Fuzzy_UNET(nn.Module):
    
    def __init__(self, mu, sigma, in_channels=3, classes=1, static = False, fuzzy = False):
        super(Fuzzy_UNET, self).__init__()
        self.layers = [in_channels, 64, 128, 256]
        self.double_conv_downs = nn.ModuleList(
            [self.__double_conv(layer, layer_n) for layer, layer_n in zip(self.layers[:-1], self.layers[1:])]) 
        self.up_trans = nn.ModuleList(
            [nn.ConvTranspose2d(layer, layer_n, kernel_size=2, stride=2)
             for layer, layer_n in zip(self.layers[::-1][:-2], self.layers[::-1][1:-1])])
        self.double_conv_ups = nn.ModuleList(
        [self.__double_conv(layer, layer//2) for layer in self.layers[::-1][:-2]])
        self.max_pool_2x2 = nn.MaxPool2d(kernel_size=2, stride=2)
        self.final_conv = nn.Conv2d(64, classes, kernel_size=1)
        # You can change number of fuzzy membership function here
        self.fuzzy = FuzzyLayer(fuzzynum=35,channel=self.layers[-1], mu = mu, sigma = sigma, static = static)
        self.f = fuzzy

        self.conv1 = nn.Conv2d(self.layers[-1],self.layers[-1],3,padding=1)
        self.conv2 = nn.Conv2d(self.layers[-1],self.layers[-1],3,padding=1)

    
    def __double_conv(self, in_channels, out_channels):
        conv = nn.Sequential(
            nn.Conv2d(in_channels, out_channels, kernel_size=3, padding=1, bias=False),
            nn.BatchNorm2d(out_channels),
            nn.ReLU(inplace=True),
            nn.Conv2d(out_channels, out_channels, kernel_size=3, padding=1),
            nn.ReLU(inplace=True)
        )
        return conv
    
    def forward(self, x):
        # down layers
        concat_layers = []

        for down in self.double_conv_downs:
            x = down(x)
            if down != self.double_conv_downs[-1]:
                concat_layers.append(x)
                x = self.max_pool_2x2(x)
                
        x = self.max_pool_2x2(x)
        if self.f:    
            x = self.fuzzy(x)
            
        concat_layers = concat_layers[::-1]
        
        # up layers
        for up_trans, double_conv_up, concat_layer  in zip(self.up_trans, self.double_conv_ups, concat_layers):
            x = up_trans(x)
            if x.shape != concat_layer.shape:
                x = TF.resize(x, concat_layer.shape[2:])
            
            concatenated = torch.cat((concat_layer, x), dim=1)
            x = double_conv_up(concatenated)
            
        x = self.final_conv(x)
        
        return x 


In [None]:
if torch.cuda.is_available():
    DEVICE = 'cuda:0'
    print('Running on the GPU')
else:
    DEVICE = "cpu"
    print('Running on the CPU')

In [None]:
def accuracy_calculation(pred, label):
    torch.cuda.empty_cache()
    pred_class = torch.argmax(pred, dim=1) 
    pred_class = pred_class.float()
    acc_sum = (pred_class == label).sum()
    acc = float(acc_sum) / torch.numel(label)
    return acc

def acc_epoch(data, model,device):
    acc = []
    for index, batch in enumerate(data): 
        X, y = batch
        X, y = X.to(device), y.to(device)
        preds = model(X)
        acc_ = accuracy_calculation(preds,y)
        acc.append(acc_)
    return np.mean(acc)


In [None]:
ROOT_DIR = "datasets\\cityscapes"
IMG_HEIGHT = 110  
IMG_WIDTH = 220  
BATCH_SIZE = 32 
LEARNING_RATE = 0.0004
EPOCHS = 50

transform = transforms.Compose([
    transforms.Resize((IMG_HEIGHT, IMG_WIDTH), interpolation=Image.NEAREST),
]) 

train_set = get_cityscapes_data(
    split='train',
    mode='fine',
    root_dir=ROOT_DIR,
    transforms=transform,
    batch_size=BATCH_SIZE,
)

test_set = get_cityscapes_data(
    split='val',
    mode='fine',
    root_dir=ROOT_DIR,
    transforms=transform,
    batch_size=16,
)

def train_function(data, model, optimizer, loss_fn, device):
    print('Entering into train function')
    loss_values = []
    data = tqdm(data)
    for index, batch in enumerate(data): 
        X, y = batch
        X, y = X.to(device), y.to(device)
        preds = model(X)
        loss = loss_fn(preds, y)
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()
    return loss.item()

In [None]:
def Unet_Fuzzy_Test(folder_name, name, DEVICE, static = False, fuzzy = False, mu = 0, sigma = 0):
    
    LOSS_VALS_Fuzzy = []
    ACC_VALS_Fuzzy = []

    unet_fuzzy = Fuzzy_UNET(mu, sigma,in_channels=3, classes=35, fuzzy = fuzzy, static = static).to(DEVICE).train()
    optimizer = optim.Adam(unet_fuzzy.parameters(), lr=LEARNING_RATE)
    loss_function = nn.CrossEntropyLoss(ignore_index=255).to(DEVICE)

    for e in range(0, EPOCHS):
        torch.cuda.empty_cache()
        print(f'Epoch: {e}')
        loss_val = train_function(train_set, unet_fuzzy, optimizer, loss_function, DEVICE)
        LOSS_VALS_Fuzzy.append(loss_val) 
        print(f'Loss = {loss_val}')
        torch.cuda.empty_cache()
        acc = acc_epoch(test_set, unet_fuzzy, DEVICE)
        ACC_VALS_Fuzzy.append(acc)
        print(f'Acc = {acc}')
        if fuzzy and not static:   
            save_variable(unet_fuzzy.fuzzy.mu,folder_name + '/unet_fuzzy_mu_' + str(e) + '.txt')
            save_variable(unet_fuzzy.fuzzy.sigma,folder_name + '/unet_fuzzy_sigma_' + str(e) + '.txt')

    save_variable(unet_fuzzy,folder_name + '/' + name + '.txt')
    save_variable(LOSS_VALS_Fuzzy,folder_name + '/loss_' + name + '.txt')
    save_variable(ACC_VALS_Fuzzy,folder_name + '/acc_' + name + '.txt')

In [None]:
Unet_Fuzzy_Test('Unet_Origin','unet_origin',DEVICE)

In [None]:
Unet_Fuzzy_Test('Unet_Fuzzy','unet_fuzzy',DEVICE,fuzzy=True)