In [11]:
import torch.nn as nn
import pickle
import numpy as np
import torch
import torchvision
import torch.nn.functional as F
import time
import matplotlib.pyplot as plt
import numpy as np
import cv2

# Reading in Data (Unet Array, Labels/Masks)

In [12]:
# Mask array, which is the labels
file_nameMasks = "Masks.pkl"

open_file = open(file_nameMasks, "rb")
labels = pickle.load(open_file)
open_file.close()

In [13]:
a = np.moveaxis(labels[0], 0, 2)

In [14]:
print(np.shape(a))

(256, 256, 3)


In [15]:
cv2.imwrite('test.png', a)

True

In [16]:
print(np.shape(labels))

(197, 3, 256, 256)


In [79]:
# Unet array, which is the training data
file_nameUnet = "UnetArray.pkl"

open_file = open(file_nameUnet, "rb")
unetArray = pickle.load(open_file)
open_file.close()

In [80]:
print(np.shape(unetArray))

(197, 5, 256, 256)


# UNET MODEL

In [81]:
class Block(nn.Module):
    def __init__(self, in_ch, out_ch):
        super().__init__()
        self.conv1 = nn.Conv2d(in_ch, out_ch, 3)
        self.relu  = nn.ReLU()
        self.conv2 = nn.Conv2d(out_ch, out_ch, 3)
        self.batchnorm = nn.BatchNorm2d(out_ch)
    
    def forward(self, x):
        return self.relu(self.batchnorm(self.conv2(self.relu(self.batchnorm(self.conv1(x))))))

In [82]:
class Encoder(nn.Module):
    def __init__(self, chs=(5,64,128,256,512,1024)):
        super().__init__()
        self.enc_blocks = nn.ModuleList([Block(chs[i], chs[i+1]) for i in range(len(chs)-1)])
        self.pool = nn.MaxPool2d(2)
    
    def forward(self, x):
        ftrs = []
        for block in self.enc_blocks:
            x = block(x)
            ftrs.append(x)
            x = self.pool(x)
        return ftrs

In [83]:
class Decoder(nn.Module):
    def __init__(self, chs=(1024, 512, 256, 128, 64)):
        super().__init__()
        self.chs         = chs
        self.upconvs    = nn.ModuleList([nn.ConvTranspose2d(chs[i], chs[i+1], 2, 2) for i in range(len(chs)-1)])
        self.dec_blocks = nn.ModuleList([Block(chs[i], chs[i+1]) for i in range(len(chs)-1)]) 
        
    def forward(self, x, encoder_features):
        for i in range(len(self.chs)-1):
            x        = self.upconvs[i](x)
            enc_ftrs = self.crop(encoder_features[i], x)
            x        = torch.cat([x, enc_ftrs], dim=1)
            x        = self.dec_blocks[i](x)
        return x
    
    def crop(self, enc_ftrs, x):
        _, _, H, W = x.shape
        enc_ftrs   = torchvision.transforms.CenterCrop([H, W])(enc_ftrs)
        return enc_ftrs

In [84]:
class UNet_Train(nn.Module):
    def __init__(self, enc_chs=(5,64,128,256,512,1024), dec_chs=(1024, 512, 256, 128, 64), num_class=1, retain_dim=True, out_sz=(256,256)):
        super().__init__()
        self.encoder     = Encoder(enc_chs)
        self.decoder     = Decoder(dec_chs)
        self.head        = nn.Conv2d(dec_chs[-1], num_class, 1)
        #print(dec_chs[-1])
        self.retain_dim  = retain_dim
        self.out_sz=out_sz

    def forward(self, x):
        enc_ftrs = self.encoder(x)
        out      = self.decoder(enc_ftrs[::-1][0], enc_ftrs[::-1][1:])
        out      = self.head(out)
        if self.retain_dim:
            out = F.interpolate(out, (256, 256))
        return out

In [85]:
model = UNet_Train()
optimizer = torch.optim.Adam(model.parameters(), lr=1e-3)
loss_func = nn.BCEWithLogitsLoss()#.to(device)

In [86]:
a = torch.FloatTensor(unetArray[0])
a = a.unsqueeze(0)
print(np.shape(a))
print(model(a))
print(np.shape(model(a)))
print(np.shape(model(a).squeeze(0).squeeze(0)))

torch.Size([1, 5, 256, 256])
tensor([[[[ 0.2864,  0.2864,  0.2864,  ...,  0.5565,  0.5565,  0.5565],
          [ 0.2864,  0.2864,  0.2864,  ...,  0.5565,  0.5565,  0.5565],
          [ 0.2864,  0.2864,  0.2864,  ...,  0.5565,  0.5565,  0.5565],
          ...,
          [-0.5162, -0.5162, -0.5162,  ...,  0.3229,  0.3229,  0.3229],
          [-0.5162, -0.5162, -0.5162,  ...,  0.3229,  0.3229,  0.3229],
          [-0.5162, -0.5162, -0.5162,  ...,  0.3229,  0.3229,  0.3229]]]],
       grad_fn=<UpsampleNearest2DBackward1>)
torch.Size([1, 1, 256, 256])
torch.Size([256, 256])


In [87]:
print(np.shape(unetArray))
print(np.shape(labels))
dataLab = torch.FloatTensor(labels[0][0])

print(np.shape(dataLab.unsqueeze(0)))

(197, 5, 256, 256)
(197, 3, 256, 256)
torch.Size([1, 256, 256])


In [88]:
for epoch in range(10):
    loss = 0
    t0 = 0
    t1 = 0
    for iteration in range(30):
        
        optimizer.zero_grad()
        data = torch.FloatTensor(unetArray[iteration]).unsqueeze(0)
        t0 = time.time()
        pred_mask = model(data)
        dataLabel = torch.FloatTensor(labels[iteration][0])
        loss = loss_func(pred_mask, dataLabel.unsqueeze(0).unsqueeze(0))
        loss.backward()
        optimizer.step()
        t1 = time.time()
    print('[Epoch:{}], cost = {}, time = {}'.format(epoch+1, loss, t1-t0))
print('training Finished!')

[Epoch:1], cost = -3.7944793701171875, time = 1.1439402103424072
[Epoch:2], cost = -3.363457679748535, time = 1.1229968070983887
[Epoch:3], cost = -2.3319833278656006, time = 2.8661680221557617
[Epoch:4], cost = -2.6849348545074463, time = 1.5470249652862549
[Epoch:5], cost = -12.395492553710938, time = 1.2108588218688965
[Epoch:6], cost = -6.081043243408203, time = 1.3194713592529297
[Epoch:7], cost = -6.0744948387146, time = 1.1359615325927734
[Epoch:8], cost = -13.09618091583252, time = 1.0681428909301758
[Epoch:9], cost = -18.882362365722656, time = 1.876979112625122
[Epoch:10], cost = -11.894464492797852, time = 1.257636547088623
training Finished!
