In [118]:
import numpy as np
import pandas as pd
import torch
import torch.nn as nn
from torch.utils.data import Dataset
from torchvision.io import read_image
from torchvision.transforms import ToTensor, Resize
from torchvision import transforms
import os
import glob
import cv2
from PIL import Image
from sklearn.model_selection import train_test_split

In [119]:
device = "cuda" if torch.cuda.is_available() else "cpu"
print(f"Using {device} device")

Using cpu device


In [120]:
NUM_CHANNELS = 3
LEARNING_RATE = 1e-3
EPOCHS = 1
BATCH_SIZE = 64
LOSS_FN = nn.BCELoss(reduce=None)
IMAGE_HEIGHT = 1080
IMAGE_WIDTH = 1920
data = []
RESIZE = transforms.Resize((2048, 1024))
NORMALIZE = transforms.Normalize(mean=0, std=1)#TODO:find real mean and std values
TOTENSOR = transforms.ToTensor()

In [121]:
# filepaths
tray1_rgb = "image_data/masks/T01_rgb"
tray1_mask = "image_data/masks/T01_mask"
tray2_rgb = "image_data/masks/T02_rgb"
tray2_mask = "image_data/masks/T02_mask"
tray3_rgb = "image_data/masks/T03_rgb"
tray3_mask = "image_data/masks/T03_mask"

In [122]:
class conv_block(nn.Module):
    def __init__(self, input_channels, output_channels):
        super().__init__()
        self.conv1 = nn.Conv2d(input_channels, output_channels, kernel_size = 3, padding=1)
        self.bn1 = nn.BatchNorm2d(output_channels)
        self.conv2 = nn.Conv2d(output_channels, output_channels, kernel_size = 3, padding=1)
        self.bn2 = nn.BatchNorm2d(output_channels)
        self.relu = nn.ReLU()

    def forward(self, input):
        x = self.conv1(input)
        x = self.bn1(x)
        x = self.conv2(x)
        x = self.bn2(x)
        x = self.relu(x)
        return x

In [123]:
class encoder_block(nn.Module):
    def __init__(self, input_channels, output_channels):
        super().__init__()
        self.conv = conv_block(input_channels, output_channels)
        self.pool = nn.MaxPool2d((2,2))

    def forward(self, inputs):
        x = self.conv(inputs)
        p = self.pool(x)
        return x, p

In [124]:
class decoder_block(nn.Module):
    def __init__(self, input_channels, output_channels):
        super().__init__()
        self.up = nn.ConvTranspose2d(input_channels, output_channels, kernel_size=2, stride=2, padding=0)
        self.conv = conv_block(output_channels + output_channels, output_channels)
        
    def forward(self, inputs, skip):
        x = self.up(inputs)
        x = torch.cat([x, skip], axis=1)
        x = self.conv(x)
        return x

In [125]:
class UNET(nn.Module):
    def __init__(self):
        super().__init__()

        self.e1 = encoder_block(3, 64)
        self.e2 = encoder_block(64, 128)
        self.e3 = encoder_block(128, 256)
        self.e4 = encoder_block(256, 512)

        self.b = conv_block(512,1024)

        self.d1 = decoder_block(1024, 512)
        self.d2 = decoder_block(512, 256)
        self.d3 = decoder_block(256, 128)
        self.d4 = decoder_block(128, 64)

        self.outputs = nn.Conv2d(64, 1, kernel_size=1, padding=0)

    def forward(self, inputs):

        s1, p1 = self.e1(inputs)
        s2, p2 = self.e2(p1)
        s3, p3 = self.e3(p2)
        s4, p4 = self.e4(p3)

        b = self.b(p4)

        d1 = self.d1(b, s4)
        d2 = self.d2(d1, s3)
        d3 = self.d3(d2, s2)
        d4 = self.d4(d3, s1)

        outputs = self.outputs(d4)

        return outputs

In [126]:
def dataloader(image_dir, mask_dir):
    ds = []
    for X in os.listdir(image_dir):
        for y in os.listdir(mask_dir):
            if (X == y):
                ds.append((X, y))
    return ds

In [127]:
def open_image(X,y):
    X = RESIZE(read_image(f"image_data/masks/T03_rgb/{X}"))
    y = RESIZE(read_image(f"image_data/masks/T03_mask/{y}"))
    X = X.float()
    y = y.float()
    X = NORMALIZE(X)
    y = NORMALIZE(y)
    X = X.float()
    y = y.float()
    
    
    

    return X, y

In [128]:
def train_loop(model, dataset, optimizer, loss_fn):
    loss_function = loss_fn
    for (X,y) in dataset:
        X_var = X
        y_var = y
        X_image, y_image = open_image(X_var, y_var)

        model.train()

        model.zero_grad()
        optimizer.zero_grad()

        prediction = model(X_image.unsqueeze(0)/255)
        loss = loss_function(prediction, y_image.unsqueeze(0)/255)
        loss.backward()
        optimizer.step()

        return(f"Loss: {loss}")



    

In [129]:
data = dataloader(tray3_rgb, tray3_mask)
train_ds, test_ds = train_test_split(data, test_size=0.1, random_state=25)
for (X, y) in train_ds:

    X, y = open_image(X,y)
    
   
    #print(X_img)
    #print(X_img.size())
    #print(y_img.size())





In [130]:
UNET = UNET()
print(UNET)

UNET(
  (e1): encoder_block(
    (conv): conv_block(
      (conv1): Conv2d(3, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
      (bn1): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
      (conv2): Conv2d(64, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
      (bn2): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
      (relu): ReLU()
    )
    (pool): MaxPool2d(kernel_size=(2, 2), stride=(2, 2), padding=0, dilation=1, ceil_mode=False)
  )
  (e2): encoder_block(
    (conv): conv_block(
      (conv1): Conv2d(64, 128, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
      (bn1): BatchNorm2d(128, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
      (conv2): Conv2d(128, 128, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
      (bn2): BatchNorm2d(128, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
      (relu): ReLU()
    )
    (pool): MaxPool2d(kernel_size=(2, 2), str

In [131]:
adam_optimizer = torch.optim.Adam(UNET.parameters(), lr=LEARNING_RATE)
for iteration in range(EPOCHS):
    print(f"Epoch {iteration}")
    print("---------------------------------------")
    train_loop(UNET, train_ds, adam_optimizer, LOSS_FN)


Epoch 0
---------------------------------------


RuntimeError: all elements of input should be between 0 and 1