In [4]:
### Neural Network Implementation ###

import torch
from torch.utils.data import Dataset
from torch.utils.data import DataLoader
import copy


class elasticityDataset(Dataset):
    def __init__(self, device, numberOfSamples):
        self.E = torch.load("datasetStrain/E.pt", map_location=device, weights_only=False)[:numberOfSamples]
        self.eps = torch.load("datasetStrain/eps.pt", map_location=device, weights_only=False)[:numberOfSamples]
        self.numberOfSamples = len(self.E)

    def __len__(self):
        return self.numberOfSamples

    def __getitem__(self, idx):
        return (self.E[idx], self.eps[idx])


class noActivation(torch.nn.Module):
    def __init__(self):
        super().__init__()

    def forward(self, x):
        return x


def makeCNNBlocks(channels, numberOfConvolutionsPerBlock, kernelSize, activation, normalization, skipChannels=None,
                  lastActivation=True):
    padding = (kernelSize - 1) // 2
    if skipChannels == None:
        skipChannels = [0 for i in channels]

    convolutions = torch.nn.ModuleList()
    normalizations = torch.nn.ModuleList()
    activations = torch.nn.ModuleList()
    for i in range(len(channels) - 1):
        for j in range(numberOfConvolutionsPerBlock):
            if j == 0:
                inChannels = channels[i] + skipChannels[i]
            else:
                inChannels = channels[i + 1]

            convolutions.append(torch.nn.Conv2d(inChannels,
                                                channels[i + 1],
                                                kernelSize,
                                                stride=1,
                                                padding=padding))
            normalizations.append(normalization(inChannels))

            if i == len(channels) - 2 and j == numberOfConvolutionsPerBlock - 1 and lastActivation == False:
                activations.append(noActivation())
            else:
                activations.append(activation())

    return convolutions, normalizations, activations


class UNet(torch.nn.Module):
    def __init__(self, channels, channelsOut, numberOfConvolutionsPerBlock, kernelSize):
        super().__init__()
        self.kernelSize = kernelSize
        self.channels = channels  # channelsIn is defined implicitly
        self.channelsOut = channelsOut
        self.numberOfConvolutionsPerBlock = numberOfConvolutionsPerBlock

        self.numberOfBottleNeckLayers = self.numberOfConvolutionsPerBlock
        self.channelsDown = copy.deepcopy(self.channels)
        self.channelsUp = copy.deepcopy(self.channels)[::-1]
        self.channelsUp[-1] = self.channelsOut

        self.activation = lambda: torch.nn.LeakyReLU(inplace=True)
        self.normalization = lambda s: torch.nn.BatchNorm2d(s)

        # downsampling
        self.convolutionsDown, self.normalizationsDown, self.activationsDown = makeCNNBlocks(self.channelsDown,
                                                                                             self.numberOfConvolutionsPerBlock,
                                                                                             self.kernelSize,
                                                                                             self.activation,
                                                                                             self.normalization)
        self.downsample = torch.nn.MaxPool2d(kernel_size=2, stride=2)

        # bottleneck
        self.convolutionsBottleneck, self.normalizationsBottleneck, self.activationsBottleneck = makeCNNBlocks(
            [self.channelsDown[-1], self.channelsUp[0]],
            self.numberOfBottleNeckLayers,
            self.kernelSize,
            self.activation,
            self.normalization)

        # upsampling
        skipChannels = self.channelsDown[1:][::-1]
        skipChannels[-1] += 1
        self.convolutionsUp, self.normalizationsUp, self.activationsUp = makeCNNBlocks(self.channelsUp,
                                                                                       self.numberOfConvolutionsPerBlock,
                                                                                       self.kernelSize,
                                                                                       self.activation,
                                                                                       self.normalization,
                                                                                       skipChannels=skipChannels,
                                                                                       lastActivation=False)
        self.upsample = torch.nn.Upsample(scale_factor=2, mode='bilinear')  # could be changed to 'nearest'

    def forward(self, x):
        x0 = x
        x_ = []  # skip connections

        for i in range(len(self.channelsDown) - 1):
            for j in range(self.numberOfConvolutionsPerBlock):
                index = i * self.numberOfConvolutionsPerBlock + j
                x = self.activationsDown[index](self.convolutionsDown[index](self.normalizationsDown[index](x)))
            x_.append(x)
            x = self.downsample(x)

        for j in range(self.numberOfBottleNeckLayers):
            index = j
            x = self.activationsBottleneck[index](
                self.convolutionsBottleneck[index](self.normalizationsBottleneck[index](x)))

        for i in range(len(self.channelsUp) - 1):
            x = torch.cat((self.upsample(x), x_[-(i + 1)]), 1)  # concatenate for skip connections
            if i == len(self.channelsUp) - 2:
                x = torch.cat((x, x0), 1)
            for j in range(self.numberOfConvolutionsPerBlock):
                index = i * self.numberOfConvolutionsPerBlock + j
                x = self.activationsUp[index](self.convolutionsUp[index](self.normalizationsUp[index](x)))

        return x


class FeedforwardCNN(torch.nn.Module):
    def __init__(self, channels, channelsOut, kernelSize):
        super().__init__()
        self.kernelSize = kernelSize
        self.channels = channels + [channelsOut]  # channelsIn is defined implicitly
        #        self.channelsOut = channelsOut

        self.activation = lambda: torch.nn.LeakyReLU(inplace=True)
        self.normalization = lambda s: torch.nn.BatchNorm2d(s)

        self.convolutions, self.normalizations, self.activations = makeCNNBlocks(self.channels, 1, kernelSize,
                                                                                 self.activation, self.normalization,
                                                                                 lastActivation=False)

    def forward(self, x):
        for i in range(len(self.channels) - 1):
            x = self.activations[i](self.convolutions[i](self.normalizations[i](x)))
        return x


class UNetWithSubsequentFeedforwardCNN(torch.nn.Module):
    def __init__(self, channelsUNet, numberOfConvolutionsPerBlockUNet, channelsFeedforwardCNN, channelsOut, kernelSize):
        super().__init__()
        self.uNet = UNet(channelsUNet, channelsFeedforwardCNN[0], numberOfConvolutionsPerBlockUNet, kernelSize)
        self.feedforwardCNN = FeedforwardCNN(channelsFeedforwardCNN, channelsOut, kernelSize)

        self.activation = torch.nn.LeakyReLU(inplace=True)

    def forward(self, x):
        x = self.uNet(x)
        x = self.activation(x)
        x = self.feedforwardCNN(x)
        return x


def initWeights(m):
    """Initialize weights of neural network with xavier initialization."""
    if type(m) == torch.nn.Linear or type(m) == torch.nn.Conv2d or type(m) == torch.nn.Conv3d:
        torch.nn.init.kaiming_normal_(m.weight, nonlinearity='leaky_relu')
        m.bias.data.fill_(0.0)


def costFunction(prediction, label):
    return torch.mean((prediction - label) ** 2)

### Fim ###


import torch
from torch.utils.data import DataLoader
from torchsummary import summary           # torchinfo replaces torchsummary, uses same syntax
import numpy as np
import time
import matplotlib.pyplot as plt
from torch.utils.tensorboard import SummaryWriter
import NeuralNetwork

#CNN for strain distribution defined in NeuralNetwork.py
import datetime
import copy
import torchvision

#User settings
#tensorBoard options


writeGraph = False
writeHistogram = False
writeLearningHistory = False
writePredictions = False

#driver setup


seed = 2
torch.manual_seed(seed)
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
writer = SummaryWriter(log_dir="./logs/fit" + datetime.datetime.now().strftime("%Y%m%d-%H%M%S"))

#neural network architecture selection


selectNN = 2  # 0 for UNet, 1 for sequential CNN, 2 for UNet with subsequent feedforward CNN

#model parameters


kernelSize = 3

if selectNN == 0:
    channels = [1, 32, 64]
    channelsOut = 3
    numberOfConvolutionsPerBlock = 1
    model = NeuralNetwork.UNet(channels, channelsOut, numberOfConvolutionsPerBlock, kernelSize)
elif selectNN == 1:
    channels = [1, 32, 64, 32]
    channelsOut = 3
    model = NeuralNetwork.FeedforwardCNN(channels, channelsOut, kernelSize)
elif selectNN == 2:
    channelsUNet = [1, 32, 64]
    numberOfConvolutionsPerBlockUNet = 1
    channelsFeedforwardCNN = [64, 32, 16]
    channelsOut = 3
    model = NeuralNetwork.UNetWithSubsequentFeedforwardCNN(channelsUNet, numberOfConvolutionsPerBlockUNet,
                                                           channelsFeedforwardCNN, channelsOut, kernelSize)
model.to(device)
summary(model, (1, 1, 32, 32))
if writeGraph == True:
    writer.add_graph(model, torch.randn((1, 1, 32, 32), device=device))

#hyperparameters


batchSize = 128
alpha = -0.2
beta = 0.2
weightDecay = 0
lr = 2e-3
epochs = 1000
earlyStopping = True

#Pre-processing
#prepare dataset


numberOfTrainingSamples = 1
numberOfSamples = numberOfTrainingSamples + 32
dataset = NeuralNetwork.elasticityDataset(device, numberOfSamples)
# normalization
dataset.E = (dataset.E - np.mean([3000, 85000])) / np.std([3000, 85000])
datasetTraining, datasetValidation = torch.utils.data.random_split(dataset, [numberOfTrainingSamples,
                                                                             len(dataset) - numberOfTrainingSamples],
                                                                   generator=torch.Generator().manual_seed(2))

dataloaderTraining = DataLoader(datasetTraining, batch_size=batchSize)
dataloaderValidation = DataLoader(datasetValidation, batch_size=len(dataset))

if writePredictions == True:
    sample = next(iter(dataloaderValidation))
    validationLabelImages = torchvision.utils.make_grid(sample[1][:16], normalize=True, value_range=(-1, 1))
    for i in range(3):
        writer.add_image(f'validation label {i + 1}', validationLabelImages[i], dataformats='HW')


#Training
#optimizer and scheduler instantiation


optimizer = torch.optim.Adam(model.parameters(), lr=lr, weight_decay=weightDecay)
lr_lambda = lambda epoch: (beta * epoch + 1) ** alpha
scheduler = torch.optim.lr_scheduler.LambdaLR(optimizer, lr_lambda)

#history variables


trainingCostHistory = np.zeros(epochs)
validationCostHistory = np.zeros(epochs)

#training loop


start = time.perf_counter()
start0 = start
bestCost = 1e10

for epoch in range(epochs):
    if writeHistogram == True:
        for name, param in model.named_parameters():
            writer.add_histogram(name, param, epoch)

    model.train()
    for batch, sample in enumerate(dataloaderTraining):
        optimizer.zero_grad()

        prediction = model(sample[0])
        cost = NeuralNetwork.costFunction(prediction, sample[1])

        trainingCostHistory[epoch] += cost.detach() * len(sample[1])

        cost.backward()
        optimizer.step()

    trainingCostHistory[epoch] /= numberOfTrainingSamples
    scheduler.step()
    if writeHistogram == True:
        for name, param in model.named_parameters():
            writer.add_histogram(f'{name}.grad', param.grad, epoch)

    model.eval()
    sample = next(iter(dataloaderValidation))
    with torch.no_grad():
        prediction = model(sample[0])
        cost = NeuralNetwork.costFunction(prediction, sample[1])

        validationCostHistory[epoch] = cost
        if validationCostHistory[epoch] < bestCost:
            modelParametersBest = copy.deepcopy(model.state_dict())
            bestCost = validationCostHistory[epoch]

        if writePredictions == True:
            validationPredictionImages = torchvision.utils.make_grid(prediction[:16], normalize=True,
                                                                     value_range=(-1, 1))
            for i in range(3):
                writer.add_image(f'validation prediction {i + 1}', validationPredictionImages[i], epoch,
                                 dataformats='HW')

    elapsedTime = time.perf_counter() - start
    if epoch % 10 == 0:
        string = "Epoch: {}/{}\t\tTraining cost: {:.2e}\t\tValidation cost: {:.2e}\nElapsed time for last epoch: {:.2f} s"
        print(string.format(epoch + 1, epochs, trainingCostHistory[epoch], validationCostHistory[epoch], elapsedTime))
    start = time.perf_counter()

    if writeLearningHistory == True:
        writer.add_scalar('training_loss', trainingCostHistory[epoch], epoch)
        writer.add_scalar('validation_loss', validationCostHistory[epoch], epoch)

if earlyStopping == True:
    model.load_state_dict(modelParametersBest)
print("Total elapsed time during training: {:.2f} s".format(time.perf_counter() - start0))
writer.close()

SyntaxError: leading zeros in decimal integer literals are not permitted; use an 0o prefix for octal integers (NeuralNetwork.py, line 2)