In [1]:
import os
import cv2
import tqdm
import itertools
import numpy as np
from tqdm.contrib import tzip
import matplotlib.pyplot as plt
from sklearn.model_selection import train_test_split

In [2]:
import torch
import torch.nn as nn
import torchvision.transforms as T
from torchvision.utils import make_grid
from torchvision.utils import save_image
from torch.utils.data import DataLoader, Dataset

In [None]:
class CCNN(nn.Module):
    """Six-layer fully convolutional network.

    Three spatial convolutions (11x11, 7x7, 5x5) are followed by three 1x1
    convolutions. A 2x2 max-pool follows the second and the third
    convolution, and every convolution output goes through a ReLU, so the
    returned map is non-negative.

    Args:
        in_channels: number of channels of the input image batch.
        out_channels: number of channels of the returned map.
        debug: when true, ``forward`` prints the tensor size after each stage.
    """

    def __init__(self, in_channels = 1, out_channels = 1, debug = False):
        super().__init__()
        self.debug = debug
        self.in_channels = in_channels
        self.out_channels = out_channels

        # Each convolution stays wrapped in an nn.Sequential so the
        # state-dict keys ("c_0.0.weight", ...) remain compatible with
        # checkpoints saved from this class.
        self.c_0 = nn.Sequential(nn.Conv2d(in_channels = self.in_channels, out_channels = 32,
                                           kernel_size = 11, padding = 5, stride = 1))

        self.c_1 = nn.Sequential(nn.Conv2d(in_channels = 32, out_channels = 32,
                                           kernel_size = 7, padding = 3, stride = 1))

        # NOTE(review): padding=3 with a 5x5 kernel grows each spatial dim by 2
        # ("same" padding would be 2). Left unchanged because it determines the
        # output size the targets must match - confirm it is intentional.
        self.c_2 = nn.Sequential(nn.Conv2d(in_channels = 32, out_channels = 64,
                                           kernel_size = 5, padding = 3, stride = 1))

        self.c_3 = nn.Sequential(nn.Conv2d(in_channels = 64, out_channels = 1000,
                                           kernel_size = 1, padding = 0, stride = 1))

        self.c_4 = nn.Sequential(nn.Conv2d(in_channels = 1000, out_channels = 400,
                                           kernel_size = 1, padding = 0, stride = 1))

        self.c_5 = nn.Sequential(nn.Conv2d(in_channels = 400, out_channels = self.out_channels,
                                           kernel_size = 1, padding = 0, stride = 1))

    def forward(self, x):
        """Apply the six stages to ``x`` and return the final ReLU-activated map."""
        # The notebook never imports torch.nn.functional as F; reach it through
        # the already-imported `nn` so this method does not raise NameError.
        F = nn.functional

        stage_sizes = []

        x = F.relu(self.c_0(x))
        stage_sizes.append(x.size())

        x = self.c_1(x)
        x = F.max_pool2d(x, kernel_size = 2, stride = 2)
        x = F.relu(x)
        stage_sizes.append(x.size())

        x = self.c_2(x)
        x = F.max_pool2d(x, kernel_size = 2, stride = 2)
        x = F.relu(x)
        stage_sizes.append(x.size())

        # The three 1x1 convolutions only change the channel count.
        for stage in (self.c_3, self.c_4, self.c_5):
            x = F.relu(stage(x))
            stage_sizes.append(x.size())

        if self.debug:
            for size in stage_sizes:
                print("size: {}".format(size))

        return x

In [3]:
def train(model, train_loader, test_loader, criterion, optimizer, NUM_EPOCHS = 30, log = True, load = False):
    """Train ``model`` and track the training/test loss of every epoch.

    After each epoch the model is evaluated on the first batch of
    ``test_loader`` only. Whenever the mean training loss of an epoch improves
    on the best value so far, the weights are written to ``./model_best.pth``.

    Args:
        model: network to optimise; ``optimizer`` must hold its parameters.
        train_loader: iterable of ``(input, target)`` batches. A channel axis
            is inserted into the target at dim 1 before the loss is computed.
        test_loader: iterable of ``(input, target)`` batches used for the
            per-epoch evaluation.
        criterion: loss that is back-propagated.
        optimizer: optimizer stepping the parameters of ``model``.
        NUM_EPOCHS: number of passes over ``train_loader``.
        log: print the loss of every training batch.
        load: restore ``./model_best.pth`` into ``model`` before training.

    Returns:
        ``(losses, t_losses, mseLosses, mse_t_losses)``: per-epoch mean
        training loss, test loss, mean training MSE and test MSE.
    """
    # Function-scope import: the notebook's import cells do not import `time`.
    import time

    mse_criterion = torch.nn.MSELoss()
    print('Epochs:\t', NUM_EPOCHS)
    if load:
        # Load in place. Rebinding `model` to a fresh network would leave
        # `optimizer` stepping the caller's old parameters, so the restored
        # weights would never be trained.
        model.load_state_dict(torch.load("./model_best.pth", map_location = "cpu"))

    # Feed batches on whatever device the model lives on.
    first_param = next(model.parameters(), None)
    model_device = first_param.device if first_param is not None else None

    def on_model_device(tensor):
        return tensor if model_device is None else tensor.to(model_device)

    losses = []
    t_losses = []
    mseLosses = []
    mse_t_losses = []
    prev_loss = float('inf')

    model.train()

    for epoch in range(NUM_EPOCHS):
        loss_f = []
        mseLoss_f = []
        t_start = time.time()

        for input_tensor, target_tensor in train_loader:
            input_tensor = on_model_device(input_tensor)
            target_tensor = on_model_device(torch.unsqueeze(target_tensor, 1))

            predicted_tensor = model(input_tensor)

            loss = criterion(predicted_tensor, target_tensor)
            # The MSE is only monitored, never back-propagated.
            mseLoss = mse_criterion(predicted_tensor.detach(), target_tensor)
            if log: print('batch loss:', float(loss))
            if log: print('batch MSE loss:', float(mseLoss))

            optimizer.zero_grad()
            loss.backward()
            optimizer.step()

            loss_f.append(float(loss))
            mseLoss_f.append(float(mseLoss))

        delta = time.time() - t_start
        epoch_loss = np.array(loss_f).mean()

        # Evaluate in eval mode and without autograd so layers such as
        # dropout/batch-norm behave deterministically and no graph is built.
        model.eval()
        with torch.no_grad():
            images, label = next(iter(test_loader))
            input_tensor = on_model_device(images)
            target_tensor = on_model_device(torch.unsqueeze(label, 1))
            predicted_tensor = model(input_tensor)
            test_loss = criterion(predicted_tensor, target_tensor)
            mse_test_loss = mse_criterion(predicted_tensor, target_tensor)
        model.train()

        # The checkpoint is selected on the training loss, not the test loss.
        if epoch_loss < prev_loss:
            prev_loss = epoch_loss
            torch.save(model.state_dict(), './model_best.pth')

        losses.append(epoch_loss)
        t_losses.append(float(test_loss))
        mseLosses.append(np.array(mseLoss_f).mean())
        mse_t_losses.append(float(mse_test_loss))
        print('test MSE Loss:', float(mse_test_loss))
        print("Epoch #{}\ttrain loss: {:.8f}\ttest loss: {:.8f}\t Time: {:.2f}s".format(epoch+1, epoch_loss, t_losses[-1], delta))

    return losses, t_losses, mseLosses, mse_t_losses

In [None]:
def val(test_loader, model = None, model_path = '', max_batches = 11):
    """Run a model on the first sample of the leading batches of ``test_loader``.

    Args:
        test_loader: iterable of ``(image_batch, label_batch)`` pairs; only
            element 0 of every batch is used.
        model: network to evaluate. Ignored when ``model_path`` is non-empty.
        model_path: path of a ``CCNN`` state dict; when given, a fresh ``CCNN``
            is created and loaded from it.
        max_batches: maximum number of batches to process (default 11, the
            limit that used to be hard-coded).

    Returns:
        ``(images, labels, results)``: lists holding, per processed batch, the
        input sample (tensor), its squeezed label (numpy array) and the model
        output for that sample (tensor).

    Raises:
        ValueError: if neither ``model`` nor ``model_path`` is provided.
    """
    if model_path:
        model = CCNN()
        model.load_state_dict(torch.load(model_path, map_location = "cpu"))
    if model is None:
        raise ValueError("val() needs either `model` or a non-empty `model_path`")

    model.eval()

    # Use the model's own device instead of a notebook-global `device`, so the
    # input always lives where the weights are.
    first_param = next(model.parameters(), None)
    model_device = first_param.device if first_param is not None else None

    images = []
    labels = []
    results = []

    # Inference only: no autograd graph should be kept alive by the results.
    with torch.no_grad():
        for batch_idx, (img, label) in enumerate(test_loader):
            if batch_idx >= max_batches:
                break

            img = img[0]
            if model_device is not None:
                img = img.to(model_device)
            label = label[0].cpu().numpy().squeeze()

            # Run the single sample through a temporary batch axis so this also
            # works where unbatched input is rejected; the axis is removed
            # again so the result keeps the sample's dimensionality.
            model_output = model(img.unsqueeze(0)).squeeze(0)

            images.append(img)
            labels.append(label)
            results.append(model_output)

    return images, labels, results