In [None]:
import numpy as np
import cv2
import glob
from tqdm import tqdm
import matplotlib.pyplot as plt

import torch 
from torch import nn, optim, tensor
from torch.utils.data import Dataset, DataLoader
from torchvision import datasets, models, transforms

import fastai
from fastai.vision import *
from fastai.vision import learner
from fastai.vision.all import *

In [None]:
#Use gpu if available
use_cuda = torch.cuda.is_available()
device = torch.device("cuda" if use_cuda else "cpu")

In [None]:
#Import Data

#Images
imageNames = glob.glob('/home/lilcompaadres/Documents/Computer_Vision/bbc_train/pytorch_test/coco_dataset/imgs/*.jpg')
imageNames.sort()
imageNames.reverse()

images = []
for i in tqdm(imageNames[0:5000]):
  img = cv2.imread(i)
  img = cv2.resize(img, (438, 584))
  images.append(img)

#Masks and class label
maskNames= glob.glob("/home/lilcompaadres/Documents/Computer_Vision/bbc_train/pytorch_test/coco_dataset/masks/*.png")
maskNames.sort()
maskNames.reverse()
masks = []
class_labels = []

for img in tqdm(maskNames[0:5000]):
  mask = cv2.imread(img) 
  #mask = np.expand_dims(mask, axis=2)
  mask = np.expand_dims(cv2.resize(mask, (438, 584))[:, :, 0], axis=2)
  masks.append(mask)

In [None]:
#Split Data into Train / Test / Validation Datasets

train_images = np.asarray(images[:4550], dtype=np.uint8)
train_masks = np.asarray(masks[:4550], dtype=np.uint8)

test_images = np.asarray(images[4651:4900], dtype=np.uint8)
test_masks = np.asarray(masks[4651:4900], dtype=np.uint8)
valid_images = np.asarray(images[4901:], dtype=np.uint8)
valid_masks = np.asarray(masks[4901:], dtype=np.uint8)


print(train_images.shape, train_masks.shape)

In [None]:
#Create Dataset
class MergeData(Dataset):
  def __init__(self, images, masks, c=1):
    self.images = images
    self.masks = masks
    self.c = c
    self.length = self.images.shape[0]
    self.bbox = 1
  
  def __len__(self): 
    return self.length

  def __getitem__(self, idx):
    img = self.images[idx, :, :, :]
    mask = self.masks[idx, :, :, :]
    
    img = cv2.resize(img, (256, 256)) / 255.0
    mask = cv2.resize(mask, (256, 256))
    
    mask = np.expand_dims(mask, 2)
    

    return tensor(img.transpose((2, 0, 1))).float(), tensor(mask.transpose((2, 0, 1)))

  def show(self, idx):
    #Bounding box rectangle
    x, y = self.__getitem__(idx)
    print(x.shape, y[0].shape)
    print(x.dtype, y[0].dtype)

    fig, ax = plt.subplots(1)
    print(x.shape)
    ax.imshow(x.numpy().transpose((1,2,0)))
    print(y.numpy().transpose((1,2,0)).shape)
    plt.imshow(y.numpy().transpose((1,2,0)), alpha=0.15)
    plt.title("{}".format(y[0]))

In [None]:
#Create Dataloader
train = MergeData(train_images, train_masks)
test = MergeData(test_images, test_masks)
valid = MergeData(valid_images, valid_masks)

train_loader = DataLoader(train, batch_size=128, shuffle=True)
test_loader = DataLoader(test, batch_size=16, shuffle=True) 
valid_loader = DataLoader(valid, batch_size=16, shuffle=True)

In [None]:
#Free up memory for unused variables
%xdel train_images 
%xdel train_masks 
%xdel test_images
%xdel test_masks
%xdel valid_images
%xdel valid_masks

In [None]:
train.show(7)

In [None]:
def conv_trans(ni, nf, ks = 4, stride = 2, padding = 1, ps=0.50):
    return nn.Sequential(
        nn.ConvTranspose2d(ni, nf, kernel_size=ks, bias=False, stride=stride, padding = padding), 
        nn.ReLU(inplace = True), 
        nn.BatchNorm2d(nf),
        nn.Dropout(p=ps))

In [None]:
class MyModel(nn.Module):
  def __init__(self, base_model, ps=0.35):
    super(MyModel, self).__init__()
    self.base_model = learner.create_body(base_model, pretrained=True)
    self.seg_head = nn.Sequential(
                                conv_trans(ni=512, nf=256, ks=4, stride=2, padding=1, ps=0.8), 
                                conv_trans(256, 128),
                                conv_trans(128, 64),
                                conv_trans(64, 32), 
                                nn.ConvTranspose2d(32, 1, kernel_size=4, bias=False, stride=2, padding = 1)                                  
                                 ) 
    
  def forward(self, x):
    #Attach head to base model
    x = self.base_model(x)
    
    #m3 = torch.nn.Softmax() 
    m3 = nn.Sigmoid()
    seg_model = self.seg_head(m3(x))

    return [seg_model]

In [None]:
#Create custom loss function

def MyLoss(yhat, seg_tgts):
    #seg_loss = CrossEntropyLossFlat(axis=1)(yhat[0], seg_tgts.long()) #For multi-class problems
    
    seg_loss = torch.nn.BCELoss()(yhat, seg_tgts.float()) #BCEWithLogitLoss
    return 1.0*seg_loss

In [None]:
def pixel_accuracy(yhat, seg_tgts): #segmentation accuracy bbox_tgts, 
    #pred_mask[pred_mask>0.5] = 1
    #pred_mask[pred_mask<0.5] = 0
    #print(y.shape, seg_tgts.shape)
    y_=seg_tgts.squeeze(dim=1)
    yhat_=yhat.squeeze(dim=1)
    
    yhat_[yhat_>0.5] = 1
    yhat_[yhat_<0.5] = 0
    
    return (y_==yhat_).sum().float()/seg_tgts.numel()

In [None]:
#Create model
resnet_model = models.resnet18(pretrained=True)
for param in resnet_model.parameters():
    param.requires_grad = False #Freeze parameters so backwards doesn't affect weights

model = MyModel(models.resnet18)
model.to('cuda')
print("Model Loaded")

In [None]:
#model = torch.load("model.pth") #Load model
NUM_EPOCHS = 80

#Create optimizer
loss = 0.0

optimizer = optim.Adam(model.parameters(), lr=0.01)
#torch.optim.lr_scheduler.MultiStepLR(optimizer, milestones=[8, 15, 20], gamma=0.1)
best = 1000
total = 0.0

for epoch in range(0, NUM_EPOCHS):
    print(f"Epoch: {epoch} || Loss: {loss}")
    #model = torch.load("model.pth")
    
    model.train()
    
    training_accs = []
    testing_accs = []
    
    for x, y in tqdm(train_loader):
        x = x.to('cuda')
        y = y.to('cuda')
        
        pred = model(x)
        loss = MyLoss(torch.sigmoid(pred[0]), y) 
        training_accs.append(pixel_accuracy(pred[0], y))
        
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()
        
        
    #Compute model accuracy on training and testing data
    
    model.eval()
    with torch.no_grad():
        for x, y in tqdm(test_loader):
            x = x.to('cuda')
            y = y.to('cuda')
            pred = model(x)
            testing_accs.append(pixel_accuracy(pred[0], y))
    
    
    print(f"Training Accuracy: {sum(training_accs)/len(training_accs)}")
    print(f"Testing Accuracy: {sum(testing_accs)/len(testing_accs)}")
      
    
    if loss < best:
            best = loss
            torch.save(model, "model.pth")
            print("Saving model.")

In [None]:
valid_accs = []
with torch.no_grad():
        for x, y in tqdm(valid_loader):
            x = x.to('cuda')
            y = y.to('cuda')
            pred = model(x)
            valid_accs.append(pixel_accuracy(pred[0], y))
print(f"Testing Accuracy: {sum(valid_accs)/len(valid_accs)}")

In [None]:
def predict_image(model, image, gt_mask=None):
    model.eval()
    if gt_mask is None:
        gt_mask = np.ones((1, 1))
    with torch.no_grad():
        print(gt_mask.sum())
        image = image.astype("float32") / 255.0 if gt_mask.sum() == 1 else image.astype("float32")
        image = cv2.resize(image, (256, 256))

        orig_image = image.copy()

        #gt_mask = cv2.resize(mask, (256, 256)) if gt_mask.sum() > 0 else None

        image = np.transpose(image, (2, 0, 1)) #Move channel axis to front
        image = np.expand_dims(image, 0)

        #Make prediction from image input
        image = torch.from_numpy(image).to("cuda")
        pred_mask = model(image)[0].squeeze()
        
        pred_mask = torch.sigmoid(pred_mask)
        pred_mask = pred_mask.cpu().numpy()
        """arr = np.ones((256, 256), np.uint8)
        arr[pred_mask>0.15] = 0
        arr[pred_mask<0.15] = 1
        print(np.unique(arr))
        orig_image[:,:,0] *= arr
        orig_image[:,:,1] *= arr
        orig_image[:,:,2] *= arr"""
        f, axarr = plt.subplots(1,3, figsize=(15,15))
        
        axarr[0].title.set_text('Original Image')
        axarr[1].title.set_text('GT Segmentation Mask')
        axarr[2].title.set_text('Predicted Segmentation Mask')
        axarr[0].imshow(cv2.cvtColor(orig_image, cv2.COLOR_BGR2RGB))
        axarr[1].imshow(gt_mask) #if gt_mask is None else None
        axarr[2].imshow(pred_mask)
    

In [None]:
img = cv2.imread("person.png")

predict_image(model, img)

In [None]:
num = 10

batch = test.__getitem__(num) 

img = np.transpose(batch[0], (1, 2, 0)).cpu().numpy()

gt_mask = np.moveaxis(batch[1].numpy(), 0, -1)

predict_image(model, img, gt_mask)