In [1]:
import pygad
import pygad.cnn
import pygad.gacnn
import pygad.torchga

import torch
from torch.utils.data import random_split
from torch.utils.data import DataLoader
from torch.utils.data import Dataset
from tqdm import tqdm

from sklearn.metrics import balanced_accuracy_score
from torchmetrics.classification import Accuracy
import matplotlib.pyplot  as plt 

import numpy as np
import pipe  as pp
import time
import cv2 as cv
import glob
import os

import auto_labeling.guided as alg
# Shape of one input sample: the two spatial sizes followed by the channel count.
# Entries [0] and [1] select the dataset files by name; entry [2] is used as the
# leading (channel) dimension of the dummy tensors fed to the network.
sample_shape = (512, 512, 3)

# Defining model
----------------

In [2]:
# Build the PyTorch model: five convolution stages (with one max-pool after the
# first) followed by a three-layer fully connected head with 5 sigmoid outputs.
# Every layer is bound to a module-level name; `model_test` in the next cell is
# assembled from these same layer instances.
input_layer = torch.nn.Conv2d(in_channels=3, out_channels=30, kernel_size=15)
relu_layer1 = torch.nn.ReLU()
max_pool1   = torch.nn.MaxPool2d(kernel_size=7, stride=5)

conv_layer2 = torch.nn.Conv2d(in_channels=30, out_channels=15, kernel_size=11)
relu_layer2 = torch.nn.ReLU()

conv_layer3 = torch.nn.Conv2d(in_channels=15, out_channels=7, kernel_size=21)
relu_layer3 = torch.nn.ReLU()

conv_layer4 = torch.nn.Conv2d(in_channels=7, out_channels=5, kernel_size=15)
relu_layer4 = torch.nn.ReLU()

# last convolution collapses the feature maps to a single channel
conv_layer5 = torch.nn.Conv2d(in_channels=5, out_channels=1, kernel_size=3)
relu_layer5 = torch.nn.ReLU()

flatten_layer1 = torch.nn.Flatten()
# in_features=2809 (1 channel x 53 x 53 for a 512x512 input) is pre-computed by
# tracing the sizes of the layers' outputs; the check cell below prints it.
dense_layer1 = torch.nn.Linear(in_features=2809, out_features=100)
relu_layer6 = torch.nn.ReLU()

dense_layer2 = torch.nn.Linear(in_features=100, out_features=50)
relu_layer7 = torch.nn.ReLU()

# 5 outputs; the Sigmoid keeps each of them inside (0, 1)
dense_layer3 = torch.nn.Linear(in_features=50, out_features=5)
output_layer = torch.nn.Sigmoid()

model = torch.nn.Sequential(input_layer,
                            relu_layer1,
                            max_pool1,
                            conv_layer2,
                            relu_layer2,
                            conv_layer3,
                            relu_layer3,
                            conv_layer4,
                            relu_layer4,
                            conv_layer5,
                            relu_layer5,
                            flatten_layer1,
                            dense_layer1,
                            relu_layer6,
                            dense_layer2,
                            relu_layer7,
                            dense_layer3,
                            output_layer)

In [3]:
# Feature-extractor-only network: every layer of `model` up to and including
# the Flatten layer. The modules are the very same objects used by `model`,
# so both networks share their weights.
_feature_layers = list(model.children())
_flatten_pos = _feature_layers.index(flatten_layer1)
model_test = torch.nn.Sequential(*_feature_layers[:_flatten_pos + 1])

In [4]:
# Trace the convolutional part once to learn how many features the Flatten
# layer emits, i.e. the `in_features` needed by the first Linear layer.
# TorchGA is used here only to get a weight vector of the right length.
torch_ga = pygad.torchga.TorchGA(model=model, num_solutions=1)
shp = np.array(torch_ga.population_weights).shape
initial_population = list(np.random.rand(*shp))

# all-zero dummy sample in channels-first order
dummy_sample = torch.zeros(sample_shape[2], sample_shape[0], sample_shape[1])
pred = pygad.torchga.predict(model=model_test,
                             solution=initial_population[0],
                             data=dummy_sample)
print(f"Flatten layer output dim: {pred.shape[1]}")

Flatten layer output dim: 2809


In [6]:
# Smoke test: push one all-zero sample through the complete network to confirm
# the layer sizes fit together, then report the length of the weight vector.
zero_sample = torch.zeros(sample_shape[2], sample_shape[0], sample_shape[1])
pred = pygad.torchga.predict(model=model,
                             solution=initial_population[0],
                             data=zero_sample)
print(f"Amount of free parameters: {shp[1]}")

Amount of free parameters: 415288


In [7]:
# Show the layer-by-layer structure of the full network.
print(model)

Sequential(
  (0): Conv2d(3, 30, kernel_size=(15, 15), stride=(1, 1))
  (1): ReLU()
  (2): MaxPool2d(kernel_size=7, stride=5, padding=0, dilation=1, ceil_mode=False)
  (3): Conv2d(30, 15, kernel_size=(11, 11), stride=(1, 1))
  (4): ReLU()
  (5): Conv2d(15, 7, kernel_size=(21, 21), stride=(1, 1))
  (6): ReLU()
  (7): Conv2d(7, 5, kernel_size=(15, 15), stride=(1, 1))
  (8): ReLU()
  (9): Conv2d(5, 1, kernel_size=(3, 3), stride=(1, 1))
  (10): ReLU()
  (11): Flatten(start_dim=1, end_dim=-1)
  (12): Linear(in_features=2809, out_features=100, bias=True)
  (13): ReLU()
  (14): Linear(in_features=100, out_features=50, bias=True)
  (15): ReLU()
  (16): Linear(in_features=50, out_features=5, bias=True)
  (17): Sigmoid()
)


# Loading and preparing data
-------------

In [8]:
# function to load data automatically basing on shape
def load_data(path, shape):
    """Load the dataset arrays whose file names match the given sample shape.

    The directory ``path`` is scanned for ``*.npy`` files. For each of the
    kinds ``images``, ``ground_truth``, ``metadata``, ``parameters`` and
    ``fitness`` the first file (in ``glob`` order) whose base name contains
    both the kind and the tag ``"{shape[0]}_{shape[1]}"`` is selected.

    Args:
        path: directory that holds the ``.npy`` files.
        shape: sample shape; only ``shape[0]`` and ``shape[1]`` are used, to
            build the file-name tag.

    Returns:
        Tuple ``(imgs, imgs_torch, target_torch, gtruth, metadata)``:
        ``imgs`` is the image array as stored on disk, ``imgs_torch`` the same
        data as a tensor with axes permuted ``(0, 3, 1, 2)`` and divided by
        255, ``target_torch`` the ``parameters`` array as a tensor, ``gtruth``
        the ``ground_truth`` array as a tensor divided by 255, and
        ``metadata`` the ``metadata`` array as loaded by NumPy.

    Raises:
        FileNotFoundError: if no file matches one of the five kinds.
    """
    shape_tag = f"{shape[0]}_{shape[1]}"
    basenames = [os.path.basename(p) for p in glob.glob(os.path.join(path, "*.npy"))]

    def _find(kind):
        # First matching file wins; fail with a message that names what is missing.
        for name in basenames:
            if kind in name and shape_tag in name:
                return os.path.join(path, name)
        raise FileNotFoundError(
            f"no .npy file containing {kind!r} and {shape_tag!r} found in {path!r}")

    # Resolve every file before loading anything so an incomplete dataset is
    # reported immediately.
    fl_images   = _find("images")
    fl_gtruth   = _find("ground_truth")
    fl_metadata = _find("metadata")
    fl_params   = _find("parameters")
    # The fitness file must exist but is not loaded: the targets consist of
    # the parameters only.
    _find("fitness")

    imgs = np.load(fl_images)
    imgs_torch   = torch.from_numpy(imgs).permute(0, 3, 1, 2) / 255
    target_torch = torch.from_numpy(np.load(fl_params))
    gtruth   = torch.from_numpy(np.load(fl_gtruth)) / 255
    metadata = np.load(fl_metadata)

    return imgs, imgs_torch, target_torch, gtruth, metadata

In [9]:
# Read the arrays that belong to `sample_shape` from the dataset directory.
(images,
 images_torch,
 target_torch,
 gtruth,
 metadata) = load_data(path="../dataset/", shape=sample_shape)

In [10]:
# Min-max scale every target column into [0, 1] so the targets live in the
# same range as the outputs of the network's final Sigmoid layer.
target_min = torch.min(target_torch, dim=0).values
target_max = torch.max(target_torch, dim=0).values
target_sigmoid = (target_torch - target_min) / (target_max - target_min)

# sanity check: per-column maximum, then per-column minimum, of the scaled targets
for column_extreme in (target_sigmoid.max(axis=0), target_sigmoid.min(axis=0)):
    print(column_extreme.values)

tensor([1., 1., 1., 1., 1.], dtype=torch.float64)
tensor([0., 0., 0., 0., 0.], dtype=torch.float64)


In [11]:
# Standardise the images per channel. The statistics are reduced over axes
# (0, 2, 3) -- everything except the channel axis -- and given two trailing
# singleton axes so they broadcast against the 4-D image tensor.
_reduce_axes = (0, 2, 3)
mean = images_torch.mean(axis=_reduce_axes)[:, None, None]
std  = images_torch.std(axis=_reduce_axes)[:, None, None]
images_torch_norm = (images_torch - mean) / std
# (to train on the unnormalised images instead, use: images_torch_norm = images_torch)

In [12]:
# custom dataset pairing every image with its target vector
class ImageDataset(Dataset):
    """Map-style dataset over two parallel, indexable collections.

    ``images[i]`` and ``targets[i]`` together form sample ``i``. Both items
    are converted with ``.float()`` at the moment the sample is fetched.
    """

    def __init__(self, images, targets):
        """Keep references to the image and target collections (no copy)."""
        self.images, self.targets = images, targets

    def __len__(self):
        """Number of samples, taken from the image collection."""
        return len(self.images)

    def __getitem__(self, idx):
        """Return the ``(image, target)`` pair stored at position ``idx``."""
        image = self.images[idx].float()
        target = self.targets[idx].float()
        return image, target

In [13]:
# Pair the standardised images with the min-max scaled targets.
dataset = ImageDataset(images=images_torch_norm, targets=target_sigmoid)

# Training network
-----------

In [14]:
# training hyperparameters
INIT_LR = 1e-2      # learning rate handed to the optimizer
BATCH_SIZE = 15     # samples per mini-batch

# dataset fractions: 70 % for training, the remainder shared equally between
# validation and test
TRAIN_SPLIT = 0.70
VAL_SPLIT = TEST_SPLIT = (1 - TRAIN_SPLIT)/2

# train on the GPU when CUDA is available, otherwise on the CPU
device = torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu")

In [15]:
# Sizes of the three subsets: train and validation sizes are rounded down and
# the test subset takes whatever is left, so the sizes always add up to the
# full dataset.
_num_samples = len(dataset)
numTrainSamples = int(_num_samples * TRAIN_SPLIT)
numValSamples   = int(_num_samples * VAL_SPLIT)
numTestSamples  = _num_samples - numTrainSamples - numValSamples

# A generator with a fixed seed makes the split identical on every run.
_split_generator = torch.Generator().manual_seed(0)
trainData, valData, testData = random_split(
    dataset,
    [numTrainSamples, numValSamples, numTestSamples],
    generator=_split_generator,
)

In [16]:
# initialize the train, validation, and test data loaders
# (only the training loader reshuffles its samples every epoch)
trainDataLoader = DataLoader(trainData, shuffle=True, batch_size=BATCH_SIZE)
valDataLoader   = DataLoader(valData,   batch_size=BATCH_SIZE)
testDataLoader  = DataLoader(testData,  batch_size=BATCH_SIZE)

# Number of batches per epoch for the training and validation sets.
# len(loader) is used instead of len(dataset) // BATCH_SIZE because the loaders
# keep the final, smaller batch (drop_last is left at its default): floor
# division would under-count the batches, inflate the averaged losses and
# become 0 for a subset smaller than one batch.
trainSteps = len(trainDataLoader)
valSteps   = len(valDataLoader)

In [17]:
# Loss: mean absolute error between predictions and targets.
lossFn = torch.nn.L1Loss(reduction='mean')
# Optimizer: plain SGD over all model parameters with the configured learning rate.
opt = torch.optim.SGD(params=model.parameters(), lr=INIT_LR)

In [18]:
# Training history: one list per tracked metric, extended by one entry per epoch.
H = {metric: [] for metric in ("train_loss", "val_loss")}

In [None]:
# Train the network for EPOCHS epochs. Each epoch runs one optimisation pass
# over the training loader and one gradient-free pass over the validation
# loader, then appends both average losses to the history dictionary H.
print("[INFO] training the network...")
startTime = time.time()

EPOCHS = 1000

model.to(device)
progress_bar = tqdm(range(0, EPOCHS))
# loop over our epochs
for e in progress_bar:
	# set the model in training mode
	model.train()
	# initialize the total training and validation loss
	totalTrainLoss = 0
	totalValLoss = 0
	# loop over the training set
	for (x, y) in trainDataLoader:
		# send the input to the device
		(x, y) = (x.to(device), y.to(device))
		# perform a forward pass and calculate the training loss
		pred = model(x)
		loss = lossFn(pred, y)
		# zero out the gradients, perform the backpropagation step,
		# and update the weights
		opt.zero_grad()
		loss.backward()
		opt.step()
		# add the loss to the total training loss so far; detach it first so
		# the running sum does not keep every batch's autograd graph alive
		# until the end of the epoch
		totalTrainLoss += loss.detach()

	# switch off autograd for evaluation
	with torch.no_grad():
		# set the model in evaluation mode
		model.eval()
		# loop over the validation set
		for (x, y) in valDataLoader:
			# send the input to the device
			(x, y) = (x.to(device), y.to(device))
			# make the predictions and calculate the validation loss
			pred = model(x)
			totalValLoss += lossFn(pred, y)

	# calculate the average training and validation loss
	# (trainSteps / valSteps are the per-epoch batch counts from the loader cell)
	avgTrainLoss = totalTrainLoss / trainSteps
	avgValLoss   = totalValLoss / valSteps
	# update training history (stored as 0-d NumPy arrays on the CPU)
	H["train_loss"].append(avgTrainLoss.cpu().detach().numpy())
	H["val_loss"].append(avgValLoss.cpu().detach().numpy())
	# show the current training and validation loss on the progress bar
	progress_bar.set_description(f"Train loss: {avgTrainLoss:.6f} | Val loss: {avgValLoss:.6f}")

endTime = time.time()

In [18]:
# Compile the model to TorchScript and write the scripted module to disk.
model_scripted = torch.jit.script(model)
torch.jit.save(model_scripted, 'PST_EST_model_scripted_v1.pt')

In [19]:
# Save everything needed to reproduce the pre-/post-processing next to the
# model: the min-max parameters of the targets ('shift' = minimum,
# 'gain' = maximum - minimum), the sample shape, and the per-channel image
# statistics used for standardisation.
model_metadata = {
    'shift': target_min,
    'gain': target_max-target_min,
    'shape': sample_shape,
    'image_mean': mean,
    'image_std' : std,
}
torch.save(model_metadata, "PST_EST_model_metadata_v1.pkl")