In [None]:
import numpy as np
import scipy.io as sio
import matplotlib.pyplot as plt
import torch
import torch.nn as nn
import torch.optim as optim
from torchvision import transforms
from torch.utils.data import Dataset
from torch.utils.data import DataLoader


### Loading Training and Test Data from Local Directory

Training and test data is loaded from two local .mat files and are formatted into numpy ndarrays.

In [None]:
images = sio.loadmat('DS.mat')
images = images['DS']
tr_images = images[:800, :, :, :]
test_images = images[800:, :, :, :]
labels = sio.loadmat('labels.mat')
labels = labels['par']
tr_labels = labels[:800, :]
test_labels = labels[800:, :]
# Change the format of tensor since in torch channels are in the second axis.
tr_images = np.moveaxis(tr_images, -1, 1)
test_images = np.moveaxis(test_images, -1, 1)
print(np.shape(tr_images), np.shape(test_images))
print(np.shape(tr_labels), np.shape(test_labels))


### Defining the pyTorch Image Dataset Class

In this cell we define an "image dataset" type to be used as the input to our pyTorch CNN model.

Class methods like get, constructor... that are required for the dataset to function are defined for use.

In [None]:
class TF_image_dataset(Dataset):
    def __init__(self, in_dataset, out_dataset, transform=None):
        self.in_dataset = torch.Tensor(in_dataset)
        self.out_dataset = torch.Tensor(out_dataset)
        self.transform = transform

    def __len__(self):
        return self.in_dataset.size(0)

    def __getitem__(self, index):
        x = self.in_dataset[index]
        x = self.transform(x)
        y = self.out_dataset[index]
        return x, y


def get_loader(in_dataset, out_dataset, batch_size, transform, num_workers, pin_memory=True):
    tr_ds = TF_image_dataset(
        in_dataset=in_dataset,
        out_dataset=out_dataset,
        transform=transform,
    )
    train_loader = DataLoader(tr_ds,
                              batch_size=batch_size,
                              num_workers=num_workers,
                              pin_memory=pin_memory,
                              shuffle=True,
                              )

    return train_loader


### Declare and Construct Loader for the Encoder Decoder Model

We declare and build the final input data to our CNN model.

In [None]:
BATCH_SIZE = 15
NUM_WORKERS = 0
PIN_MEMORY = True
TRANSFORM = transforms.Compose(
    [
        transforms.Normalize(mean=[0, 0], std=[50, 3.14])]
)

train_loader = get_loader(tr_images, tr_labels,
                          BATCH_SIZE, TRANSFORM, NUM_WORKERS, PIN_MEMORY)
test_loader = get_loader(test_images, test_labels, 200,
                         TRANSFORM, NUM_WORKERS, PIN_MEMORY)
imag, l = next(iter(train_loader))
print(imag.shape)


### A Visualization

This cell prints a batch of samples from the training dataset.

In [None]:
plt.figure(figsize=(30, 30))
for i in range(BATCH_SIZE):
    ax = plt.subplot(5, 5, i+1)
    plt.imshow(imag[i, 1, :, :], cmap='gray')
    plt.axis('off')
    # print(torch.max(imag[i,0,:,:]),torch.min(imag[i,0,:,:])) # Printing the max and min value in the plotted samples
    # print(torch.mean(imag[i,0,:,:]),torch.mean(imag[i,0,:,:])) # Printing the mean value in the plotted samples


### Defining the Encoder Decoder CNN

A pyTorch NN class and it's forward method is defined that will be used as the Encoder Decoder model.

In [None]:
class Autoencoder(nn.Module):
    def __init__(self):
        super().__init__()
        # inputsize--> N, 2, 200, 200
        self.encoder = nn.Sequential(
            nn.Conv2d(2, 8, 10, stride=5),  # --> N, 8, 39, 39
            nn.ReLU(),
            nn.Conv2d(8, 16, 5, stride=2),  # --> N, 16, 18, 18
            nn.ReLU(),
            nn.Conv2d(16, 32, 5),  # --> N, 32, 14, 14
            nn.ReLU(),
            nn.Conv2d(32, 64, 14)  # --> N, 64, 1, 1
        )
        self.decoder = nn.Sequential(
            nn.ConvTranspose2d(64, 32, 14),  # --> N, 32, 14, 14
            nn.ReLU(),
            nn.ConvTranspose2d(32, 16, 5),  # --> N, 16, 18, 18
            nn.ReLU(),
            nn.ConvTranspose2d(16, 8, 10, stride=3),  # --> N, 8, 61, 61
            nn.ReLU(),
            nn.ConvTranspose2d(8, 2, 20, stride=3),  # --> N, 8, 101, 101
            nn.Tanh()
        )

    def forward(self, x):
        encoded = self.encoder(x)
        decoded = self.decoder(encoded)
        return decoded


### Defining CNN Cost Function and Optimizer

In [None]:
model = Autoencoder()
criterion = nn.MSELoss()
optimizer = optim.Adam(model.parameters(), lr=1e-3)


### Training the Encoder Decoder Model

We now use the training dataset that contains our input images and labels to train the encoder decoder model to reconstruct the images in the input to the output.

In [None]:
num_epochs = 15
for epoch in range(num_epochs):
    for (img, _) in train_loader:
        recon = model(img)
        loss = criterion(recon, img)
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()
    print(f'Epoch:{epoch+1}, Loss:{loss.item():.4f}')


### Fetch and Plot 5 Input Samples and Corresponding Reconstructed Outputs 

Sanity check of the validity and quality of the encoder decoder model.

In [None]:
# Fetch 5 input and output samples
out = []
for (img, _) in train_loader:
    recon = model(img)
    out.append((img, recon))
print(type(img), type(recon))
print(img.size(), recon.size())

# Plot the 5 input samples
plt.figure(figsize=(30, 30))
for i in range(5):
    ax = plt.subplot(5, 5, i+1)
    plt.imshow(img.numpy()[i, 1, :, :], cmap='gray')
    plt.axis('off')

# Plot the 5 corresponding output samples
plt.figure(figsize=(30, 30))
for i in range(5):
    ax = plt.subplot(5, 5, i+1)
    plt.imshow(recon.detach().numpy()[i, 1, :, :], cmap='gray')
    plt.axis('off')


### Extracting Latent Variables

We extract the latent variables from the output of the encoder in our model. Two lists are created which house the variables and their labels (System parameters) for each of the training and test datasets.

In [None]:
# Getting latent variables for the training set
x_FC, y_FC = [], []

for (img, L) in train_loader:
    latent = model.encoder(img)
    latent = latent.squeeze()
    x_FC.append(latent)
    y_FC.append(L)

x_FC = torch.cat(x_FC, axis=0)
x_FC = x_FC.detach()
y_FC = torch.cat(y_FC, axis=0)

# Getting latent variables for the training set
test_x_FC, test_y_FC = [], []

print(x_FC.size())
for (img, L) in test_loader:
    latent = model.encoder(img)
    latent = latent.squeeze()
    test_x_FC.append(latent)
    test_y_FC.append(L)

test_x_FC = torch.cat(test_x_FC, axis=0)
test_x_FC = test_x_FC.detach()
test_y_FC = torch.cat(test_y_FC, axis=0)


### Defining the Fully Connected NN at the Output

A pyTorch NN class and it's forward method is defined that will be used as the Fully Connected NN model.

In [None]:
class FC (nn.Module):
    def __init__(self):
        super().__init__()
        self.NN = nn.Sequential(
            nn.Linear(64, 32),
            nn.Sigmoid(),
            nn.Linear(32, 16),
            nn.Sigmoid(),
            nn.Linear(16, 6)
        )

    def forward(self, x):

        return self.NN(x)


### Defining FCC Cost Function and Optimizer

In [None]:
model2 = FC()
criterion2 = nn.MSELoss()
optimizer2 = optim.Adam(model2.parameters(), lr=1e-3)

### Training the Fully Connected Model 

We now use the training dataset that contains our latent variables and their labels to train the FCC to get the system parameters from information that was extracted from input images through the latent variables. 

In [None]:
print(x_FC.size())

num_epochs = 10000
for epoch in range(num_epochs):
    pred = model2(x_FC)
    loss = criterion2(pred, y_FC)
    optimizer2.zero_grad()
    loss.backward()
    optimizer2.step()
    print(f'Epoch:{epoch+1}, Loss:{loss.item():.4f}')


### Result Visualizations

In [None]:
pred = model2(test_x_FC)

# Scatter plot
plt.figure(figsize=(12, 10), dpi=80)
for i in range(6):
    plt.subplot(2, 3, i+1)
    plt.scatter(test_y_FC.numpy()[:, i], pred.detach().numpy()[:, i])

# Histogram
plt.figure(figsize=(12, 10), dpi=80)
for i in range(6):
    plt.subplot(2, 3, i+1)
    plt.hist((test_y_FC.numpy()[:, i]-pred.detach().numpy()
             [:, i])/test_y_FC.numpy()[:, i]*100)
