<a href="https://colab.research.google.com/github/flosch9/deep_learning_home_exam/blob/main/Task_02.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Mount drive for loading data

In [1]:
from google.colab import drive

drive.mount("/content/drive")

Mounted at /content/drive


# Define and load model (model.py)

```
# Als Code formatiert
```



In [4]:
import torch
import torch.nn as nn
import torch.optim as optim
import numpy as np
import matplotlib.pyplot as plt
from torchvision.models import vgg16, VGG16_Weights


class FCN32(nn.Module):
    def __init__(self, output_size = (128,128), num_classes = 2, small_vgg = False):
        super(FCN32, self).__init__()

        # first part is regualr vgg16 (without batch normalization)?
        if small_vgg:
          self.features = vgg16(weights = VGG16_Weights.IMAGENET1K_V1).features[0:28]
        else:
          self.features = vgg16(weights = VGG16_Weights.IMAGENET1K_V1).features


        # only choose some parts of vgg16? [0:28]
        # set ceil mode to true
        #self.features[6].ceil_mode = True
        #self.features[13].ceil_mode = True
        #self.features[23].ceil_mode = True
        #self.features[33].ceil_mode = True
        #self.features[43].ceil_mode = True

        # classifier is now replaced with another cnn (instead of a fc)
        self.classifier = nn.Sequential(
            nn.Conv2d(512, 4024, kernel_size=(3,3), stride=(1,1), padding=(1,1)), # 512 output from last cnn/maxpool layer #maybe only 1000 channels
            # use filter size 1024 or 4096
            nn.ReLU(True),
            #with Relu? with Batchnorm? with maxpool?
            # 7x7 filter
            nn.Conv2d(4024, num_classes, kernel_size=1, stride=(1,1), padding=(1,1))
            #nn.ReLU(True)

            #nn.Softmax()
            #softmax produces niceer output in the end

            # makes difference in the output, and in the loss which (none) activation is used
        )
        self.upsample = nn.Sequential(
            # what is with upsampling meant? this (just resizing) or the deconvolution before upsample and transposeconv2d the same???
            # this one is not trainable but easier
            nn.UpsamplingBilinear2d(size=(output_size)),
            nn.Softmax()

            #nn.ConvTranspose2d(num_classes, num_classes, kernel_size=4, stride=2, padding=0)#, output_padding=0, groups=1, bias=True, dilation=1, padding_mode='zeros', device=None, dtype=None)
            # then output needs to be adjusted and classes in one channel
            # try to set ceiling of maxpool to true
        )

    def forward(self, x):
        x = self.features(x)
        x = self.classifier(x)
        #print("Output after classifier")
        #print(x.shape)
        x = self.upsample(x)
        #print("Output after upsampling")
        #print(x.shape)
        return x

#model = FCN32(output_size=(128,128))

# print outputs of a model
#print(vgg16_bn().features)
#print(vgg16_bn().classifier)

#print(vgg16_bn())
#model = FCN32()

# print(FCN32().features)
# set ceil_modes to true
#print(model.features[6].ceil_mode)



# Mount drive to load data from drive

In [28]:
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [29]:
PATH = "/content/drive/MyDrive/Deep_learning_home_exam"

# Define train and test functions (traintestfuncs.py)

In [14]:
import csv
import torch
import torch.nn as nn
import torch.optim as optim
import numpy as np
import matplotlib.pyplot as plt
from torchvision.datasets import OxfordIIITPet
from torch.utils.data import DataLoader
from torchvision.transforms import PILToTensor, ToTensor, ToPILImage, Resize

# not needed in notebook
#from model import FCN32


def smallerDataset(dataset, size):

    smaller_dataset = []

    for i in range(size):
        smaller_dataset.append(dataset[i])

    return(smaller_dataset)


# since the dataloader can t handle the PIL Image class of the original dataset
# for each batch the lables and images have to be stacked to a tensor manually
# in this step also the PIL Images are transformed to a tensor
# do not use ToTensor for target image since it destroys the class
def customCollate(batch, resize):

    images = []
    labels_orig = []
    labels_merched = []

    for dataset in batch:
        image = ToTensor()(dataset[0])
        label = PILToTensor()(dataset[1]) # important else target fckd up with classes
        label = label.view(resize) #to get rid of the (implizit) given channel

        label = label.long() # also importantz for CE-Loss, excpects long
        #print(torch.unique(label))
        # merching classes background and border
        label_merched = torch.where(label == 3, 1, label)
        #print(torch.unique(label))
        label_merched = torch.sub(label_merched, 1) # also important since 3 classes -> [0,3), but original it was [1,3]
        #print(torch.unique(label_merched))
        label_orig = torch.sub(label, 1)
        #print(torch.unique(label_orig))

        images.append(image)
        labels_merched.append(label_merched)
        labels_orig.append(label_orig)

    return(torch.stack(images), torch.stack(labels_merched), torch.stack(labels_orig))


def showLosses(training_loss, test_loss, path_to_save):
  return()


def testModel(model, lossFunction, test_dataload):

  test_loss = 0

  for i, dataset in enumerate(test_dataload):

        # unpack images and labels of batch
        images , labels, labels_orig = dataset

        model.eval()

        output = model(images)

        # compute loss
        loss = lossFunction(output, labels)

        test_loss += loss.item()

  return(test_loss)


def trainBatch(model, lossFunction, Optimizer,
              train_dataload, test_dataload,
              NUMBEREPOCHS, FOLDERPATH):

  running_loss = 0
  batch_number = 0

  for i, dataset in enumerate(train_dataload):

      # unpack images and labels of batch
      images , labels, _ = dataset

      # reset optimizer gradients
      Optimizer.zero_grad()

      # forward pass
      outputs = model(images)

      # compute loss
      loss = lossFunction(outputs, labels)

      loss.backward()
      Optimizer.step()

      running_loss += loss.item()
      batch_number += 1



  return(running_loss, batch_number)


def trainModel(model, lossFunction, Optimizer,
              train_dataload, test_dataload,
              NUMBEREPOCHS, FOLDERPATH):

  training_loss = np.zeros(NUMBEREPOCHS)
  test_loss = np.zeros(NUMBEREPOCHS)

  for epoch in range(NUMBEREPOCHS):

    # train every batch
    running_loss, batch_number = trainBatch(model, lossFunction,
                                           Optimizer, train_dataload,
                                           test_dataload, NUMBEREPOCHS,
                                           FOLDERPATH)

    # test in each epoch
    test_loss[epoch] = testModel(model, lossFunction, test_dataload)

    # print epoch statistics
    training_loss[epoch] = running_loss / batch_number
    print("\nEpoch [{}/{}]".format(epoch+1,NUMBEREPOCHS))
    print("{} updates in this epoch.".format(batch_number))
    print("Trainingloss: {}".format(training_loss[epoch]))
    print("Testingloss: {}".format(test_loss[epoch]))



  return(training_loss, test_loss)


def evaluateFCN():
  return()


def test_model(model, test_images, test_labels_merched, original_labels):

    model.eval()

    output = model(test_images)

    show_output(test_images[0], output[0])

    return()




# short function for displaying initial image next to segmentation
def display_data(data_point):

    fig, ax = plt.subplots(nrows=1, ncols=2, figsize =(10,6))
    ax[0].imshow(ToPILImage()(data_point[0][0]))
    ax[1].imshow(ToPILImage()(data_point[1][0]))
    fig.tight_layout()
    plt.show()

    return()

###############################################################################
# testing model and displaying outputs
###############################################################################

def show_output(initial_image, model_output):

    #print("Shape initial: {}".format(initial_image.shape)) #(3, H,W)
    #print("Shape segmentation: {}".format(model_output.shape)) #(2,H,W)
    #print("Unique values segmentation: {}".format(torch.unique(model_output)))
    #print("Output segmentation:")
    #print(model_output)

    model_output = model_output[1,:,:]

    fig, ax = plt.subplots(nrows=1, ncols=2, figsize =(10,6))
    ax[0].imshow(ToPILImage()(initial_image))
    ax[1].imshow(ToPILImage()(model_output), cmap = "gray")
    fig.tight_layout()
    plt.show()



    return()


def show_uncertainity_map():
    return()




# Train and evaluate model (main.py)

In [15]:
#import utils
import os
import sys
import json

import torch
import torch.nn as nn
import torch.optim as optim
import numpy as np
import matplotlib.pyplot as plt
from torchvision.models import vgg16, VGG16_Weights


import torch
import torch.nn as nn
import torch.optim as optim
import numpy as np
import matplotlib.pyplot as plt
from torchvision.datasets import OxfordIIITPet
from torch.utils.data import DataLoader
from torchvision.transforms import PILToTensor, ToTensor, ToPILImage, Resize

#from model import FCN32
#import traintestfuncs

#from test import show_output, test_model, show_uncertainity_map

###############################################################################
# general Parameters for training (make adjustments here)
###############################################################################
# Path for notebook
ROOTPATH = "/content/drive/MyDrive/Deep_learning_home_exam/data/problem2/"

# Path for local
#ROOTPATH = "home_exam\data\problem2\\"

# for saving model with trained parameters
VERSION = "02"

MODE = "train" # "train" or "evaluate"

# parameters for training
NUMBEREPOCHS = 10
BATCHSIZE = 2
LEARNINGRATE = 0.001

# resize dataset to see faster runtrough
SMALLDATASET = True
smaller_train_dataset_size = 10
smaller_test_dataset_size = 10


# resizing images of dataset so that they all have the same size
# (which they originaly dont have)
RESIZE =(512,512)
# 56 too small
#  128 #TODO
# 256 ok -> version 01
# resize to smaller images (not really good)

# bigger image better, together with drop of last two maxpool layers

# resize to smaller image maybe
# use kernelsize 7x7 in last step of convolution output
# google collab train on

PRETRAINED = True
SMALLVGG = False

model = FCN32(output_size=RESIZE, num_classes=2, small_vgg = SMALLVGG)

lossFunction = nn.CrossEntropyLoss() #nn.MSELoss()

Optimizer = optim.Adam(model.parameters(),lr = LEARNINGRATE) #optim.SGD()

# maybe only train last part
# turn grad of pre layers off

#for name, param in model.named_parameters():
#    print(name, param.requires_grad)

print("--------------------------------------------------------------------\n")

###############################################################################
# directory for output files for model version / loading model
###############################################################################

foldername = "FCN_v" + VERSION + "/"
FOLDERPATH = os.path.join(ROOTPATH, foldername)

###############################################################################
# save parameters of model
###############################################################################

if MODE == "train":
  if not os.path.exists(FOLDERPATH):
    os.mkdir(FOLDERPATH)

    parameters = {"Modelversion" : VERSION,
                  "Batchsize" : BATCHSIZE,
                  "Learningrate" : LEARNINGRATE,
                  "Number Epochs" : NUMBEREPOCHS,
                  "Image resize" : RESIZE,
                  "untouched pretrained weights" : PRETRAINED,
                  "Smaller VGG" : SMALLVGG,
                  "Optimizer" : str(Optimizer),
                  "Model architectur" : str(model)}

    with open(FOLDERPATH + "training_parameters.json", 'w') as file:
      json.dump(parameters, file)

    print("File with trainingparameters of model saved in {}.".format(FOLDERPATH))
    print("\n------------------------------------------------------------------\n")

  else:
    test = 1
    #sys.exit("Folder for model allready exists.")

###############################################################################
# reload model for evaluation
###############################################################################

if MODE == "evaluate":

  if not os.path.exists(os.path.join(FOLDERPATH + "training_parameters.json")):
    sys.exit("File with parameters doesnt exists")

  with open(FOLDERPATH + "training_parameters.json", newline='') as file:
    parameters = json.load(file)

  VERSION = parameters["Modelversion"]
  BATCHSIZE = parameters["Batchsize"]
  LEARNINGRATE = parameters["Learningrate"]
  NUMBEREPOCHS = parameters["Number Epochs"]
  RESIZE = parameters["Image resize"]
  PRETRAINED = parameters["untouched pretrained weights"]
  SMALLVGG = parameters["Smaller VGG"]

  print("Loaded model version {} for evaluation".format(VERSION))
  print("\n------------------------------------------------------------------\n")

###############################################################################
# load dataset for training
###############################################################################

# open training data, has 3680 samples
# transform images (input images and lapels / mregion maps)to same sizes
pets_train = OxfordIIITPet(root=ROOTPATH, split="trainval",
                           transform =Resize(RESIZE),
                           target_transform = Resize(RESIZE),
                           target_types="segmentation", download=False)

# open test data, has 3669 samples
# transform images (input images and lapels / mregion maps)to same sizes
pets_test = OxfordIIITPet(root=ROOTPATH, split="test",
                          transform = Resize(RESIZE),
                          target_transform = Resize(RESIZE),
                          target_types="segmentation", download=False)

# use smaller dataset for faster runntime during debugging
if SMALLDATASET:
    print("Use reduced size of dataset for faster runtime. ONLY FOR DEBUGGING!\n")
    pets_train = smallerDataset(pets_train, smaller_train_dataset_size)
    pets_test = smallerDataset(pets_test, smaller_test_dataset_size)

# load data for training, use custom collate function
# to handle non-tensor format of original dataset
train_dataload = DataLoader(pets_train, batch_size=BATCHSIZE,
                            shuffle=True,
                            collate_fn= lambda b, params=RESIZE: customCollate(b, params))

# load data for testing, use custom collate function
# to handle non-tensor format of original dataset
test_dataload = DataLoader(pets_test, batch_size=BATCHSIZE,
                           shuffle=True,
                           collate_fn= lambda b, params=RESIZE: customCollate(b, params))

print("Dataset loaded.")
print("\n------------------------------------------------------------------\n")

###############################################################################
# training and testing
###############################################################################

if MODE == "train":
  print("Starting training.")

  if PRETRAINED:
    print("Dont train pretrained weights of the VGG16 features.")
    for param in model.features.parameters():
      param.requires_grad = False

  training_loss, test_loss = trainModel(model, lossFunction, Optimizer,
                                        train_dataload, test_dataload,
                                        NUMBEREPOCHS, FOLDERPATH)

  #plotLosses(training_loss, test_loss, FOLDERPATH)


  print("Finished training.")
  print("\n------------------------------------------------------------------\n")

###############################################################################
# only evaluation
###############################################################################

if MODE == "evaluate":
  print("Starting evaluation.")


  print("Finished evaluation.")
  print("\n------------------------------------------------------------------\n")




with open(name_model + "loss_batches.csv", 'w') as file:
  file.write("batch nr.; loss \n")

with open(name_model + "loss_epochs.csv", 'w') as file:
  file.write("epoch nt.; loss \n")

for epoch in range(number_epochs):

  model.train()

  data_size = len(pets_train)

  running_loss = 0
  batch_number = 0 #is equal to batch_size (just for debugging here)

  for i, dataset in enumerate(train_dataload):

    # unpack images and labels of batch
    images , labels, _ = dataset

    #print("Size of labels (targets):")
    #print(labels.shape) #(2, imagesize) (N,H,W)

    # reset optimizer gradients
    optimizer.zero_grad()

    # forward pass
    outputs = model(images)

    #print("Size of outputs (after model):")
    #print(outputs.shape) #(2, 2, imagesize) (N,C,H,W)

    # compute loss
    loss = loss_function(outputs, labels)

    # backward pass and optimization
    loss.backward()
    optimizer.step()

    running_loss += loss.item()
    batch_number += 1

    # report of batch at the moment
    #print("Batch number {} / {} in Epoch {}".format(i+1,int(data_size/batch_size), epoch+1))
    #print("Loss for this batch: {}".format(loss))

    # write loss for batch in file
    with open(name_model + "loss_batches.csv", 'a') as file:
      file.write("{};\t{}\n".format(i+1, loss))

    # Print epoch statistics
    epoch_loss = running_loss / batch_number
    print("Epoch [{}/{}], Loss: {}".format(epoch+1,number_epochs,epoch_loss))
    print("{} updates in this epoch.".format(batch_number))



    # write loss for epoch in file
    with open(name_model + "loss_epochs.csv", 'a') as file:
      file.write("{};\t{}\n".format(epoch, epoch_loss))

torch.save(model.state_dict(), name_model + "trained" )
print("Training finished, model saved.")

# show output after training
show_output(images[0], outputs[0])


print("\n------------------------------------------------------------------\n")

# test model

batch_number = 0
running_loss = 0
for i, dataset in enumerate(test_dataload):

        # unpack images and labels of batch
        images , labels, labels_orig = dataset


        model.eval()

        output = model(images)



        # compute loss
        loss = loss_function(output, labels)

        running_loss += loss.item()
        batch_number += 1



        # report of batch at the moment
        #print("Batch number {} / {} in Epoch {}".format(i+1,int(data_size/batch_size), epoch+1))
        #print("Loss for this batch: {}".format(loss))

        # write loss for batch in file
        with open(name_model + "_loss_test.csv", 'a') as file:
            file.write("{};\t{}\n".format(i+1, loss))

test_loss = running_loss / batch_number
print("Testing, Loss: {}".format(test_loss))
print("Testing with {} batches".format(batch_number))

# show output after testing
show_output(images[0], output[0])





--------------------------------------------------------------------

Use reduced size of dataset for faster runtime. ONLY FOR DEBUGGING!

Dataset loaded.

------------------------------------------------------------------

Starting training.
Dont train pretrained weights of the VGG16 features.


  return self._call_impl(*args, **kwargs)



Epoch [1/10]
5 updates in this epoch.
Trainingloss: 0.5211837470531464
Testingloss: 2.6446775794029236


KeyboardInterrupt: 