In [None]:
!pip install numpy
!pip install torch
!pip install sklearn
!pip install scipy
!pip install tensorflow
!pip install matplotlib

Imports:

In [1]:
import numpy as np
import tqdm
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.utils.data import Dataset, DataLoader
from sklearn.model_selection import train_test_split
import matplotlib.pyplot as plt
import scipy.io
import tensorflow as tf
from tensorflow.keras import layers
import matplotlib.image as mpimg
import time


CNN network:

In [2]:
import torch
import torch.nn as nn

class SimpleCNN(nn.Module):
    """1-D convolutional network that regresses two values per input sequence.

    The network is four convolutional stages followed by a three-layer dense
    head. Each stage is ``Conv1d -> BatchNorm1d -> ReLU -> Conv1d -> ReLU ->
    MaxPool1d(2)``; the convolutions use ``kernel_size=3, padding=1`` and
    therefore keep the sequence length, while each pooling step halves it
    (rounding down).

    Every batch-norm layer declared in ``__init__`` (``bn1``, ``bn3``, ``bn5``,
    ``bn7``, ``bn9``, ``bn10``) is applied in ``forward``. Attribute names are
    kept stable so the keys of ``state_dict()`` do not change.

    Args:
        input_length: Length of the input sequences. It only determines the
            number of input features of ``fc1`` (``512 * (input_length // 16)``).
            The default of 96 yields the ``512 * 6`` features used so far.

    Raises:
        ValueError: If ``input_length`` is smaller than 16, because four
            halvings would leave nothing to feed into the dense head.

    Shape:
        - Input: ``(batch, 1, L)`` with ``L // 16 == input_length // 16``.
        - Output: ``(batch, 2)``.

    Note:
        In training mode each batch needs at least two samples: ``BatchNorm1d``
        on the dense layers cannot compute batch statistics from a single row.
    """

    def __init__(self, input_length=96):
        super(SimpleCNN, self).__init__()

        if input_length < 16:
            raise ValueError(
                f"input_length must be at least 16 (four 2x poolings), got {input_length}"
            )

        # Stage 1: 1 -> 32 -> 64 channels
        self.conv1 = nn.Conv1d(1, 32, kernel_size=3, stride=1, padding=1)
        self.bn1 = nn.BatchNorm1d(32)
        self.relu1 = nn.ReLU()
        self.conv2 = nn.Conv1d(32, 64, kernel_size=3, stride=1, padding=1)
        self.relu2 = nn.ReLU()
        self.maxpool1 = nn.MaxPool1d(kernel_size=2, stride=2)

        # Stage 2: 64 -> 128 -> 128 channels
        self.conv3 = nn.Conv1d(64, 128, kernel_size=3, stride=1, padding=1)
        self.bn3 = nn.BatchNorm1d(128)
        self.relu3 = nn.ReLU()
        self.conv4 = nn.Conv1d(128, 128, kernel_size=3, stride=1, padding=1)
        self.relu4 = nn.ReLU()
        self.maxpool2 = nn.MaxPool1d(kernel_size=2, stride=2)

        # Stage 3: 128 -> 256 -> 256 channels
        self.conv5 = nn.Conv1d(128, 256, kernel_size=3, stride=1, padding=1)
        self.bn5 = nn.BatchNorm1d(256)
        self.relu5 = nn.ReLU()
        self.conv6 = nn.Conv1d(256, 256, kernel_size=3, stride=1, padding=1)
        self.relu6 = nn.ReLU()
        self.maxpool3 = nn.MaxPool1d(kernel_size=2, stride=2)

        # Stage 4: 256 -> 512 -> 512 channels
        self.conv7 = nn.Conv1d(256, 512, kernel_size=3, stride=1, padding=1)
        self.bn7 = nn.BatchNorm1d(512)
        self.relu7 = nn.ReLU()
        self.conv8 = nn.Conv1d(512, 512, kernel_size=3, stride=1, padding=1)
        self.relu8 = nn.ReLU()
        self.maxpool4 = nn.MaxPool1d(kernel_size=2, stride=2)

        # Dense head. Four poolings shrink the length to input_length // 16.
        self.flatten = nn.Flatten()
        self.fc1 = nn.Linear(512 * (input_length // 16), 512)
        self.do1 = nn.Dropout(p=0.05)
        self.bn9 = nn.BatchNorm1d(512)
        self.relu9 = nn.ReLU()
        self.fc2 = nn.Linear(512, 256)
        self.bn10 = nn.BatchNorm1d(256)
        self.relu10 = nn.ReLU()
        self.fc3 = nn.Linear(256, 2)

    def forward(self, x):
        """Run the network on ``x`` of shape ``(batch, 1, L)``; returns ``(batch, 2)``."""
        # Layers are applied in the order in which __init__ declares them.

        # Stage 1
        x = self.conv1(x)
        x = self.bn1(x)
        x = self.relu1(x)
        x = self.conv2(x)
        x = self.relu2(x)
        x = self.maxpool1(x)

        # Stage 2
        x = self.conv3(x)
        x = self.bn3(x)
        x = self.relu3(x)
        x = self.conv4(x)
        x = self.relu4(x)
        x = self.maxpool2(x)

        # Stage 3
        x = self.conv5(x)
        x = self.bn5(x)
        x = self.relu5(x)
        x = self.conv6(x)
        x = self.relu6(x)
        x = self.maxpool3(x)

        # Stage 4
        x = self.conv7(x)
        x = self.bn7(x)
        x = self.relu7(x)
        x = self.conv8(x)
        x = self.relu8(x)
        x = self.maxpool4(x)

        # Dense head
        x = self.flatten(x)
        x = self.fc1(x)
        x = self.do1(x)
        x = self.bn9(x)
        x = self.relu9(x)
        x = self.fc2(x)
        x = self.bn10(x)
        x = self.relu10(x)
        x = self.fc3(x)

        return x

Dataset Class:

In [3]:
class CustomDataset(Dataset):
    """Map-style dataset that pairs each feature entry with its target entry.

    Args:
        x_train: Indexable container of features.
        y_train: Indexable container of targets, indexed in lock-step with
            ``x_train``.
    """

    def __init__(self, x_train, y_train):
        # Both containers are stored as given; nothing is copied or converted.
        self.x_train, self.y_train = x_train, y_train

    def __len__(self):
        # The sample count is taken from the feature container only.
        n_samples = len(self.x_train)
        return n_samples

    def __getitem__(self, idx):
        """Return the ``(features, target)`` tuple stored at position ``idx``."""
        return self.x_train[idx], self.y_train[idx]

Model hyperparameters and train/test split:

In [28]:
# Reproducibility: seed every random number generator this cell relies on, so
# that the train/test split, the weight initialisation and the DataLoader
# shuffling are the same after "Restart Kernel -> Run All".
# (GPU kernels may still introduce small non-deterministic differences.)
SEED = 42
np.random.seed(SEED)
torch.manual_seed(SEED)

# Step 0: read data from the MAT file
file_path = r"Dataset.mat"
mat = scipy.io.loadmat(file_path)
x_values = mat['featuresMat']          # model inputs
y_values = mat['positionMat']          # regression targets
# Transpose so that the first axis is the one train_test_split divides
# (presumably the files store one sample per column - TODO confirm).
x_values = x_values.transpose()
# Targets are divided by 1000 here; the plotting code multiplies by 1000 again.
y_values = y_values.transpose()/1000

# hyper-parameters
batch_size = 1024
learning_rate = 5e-5
epochs = 500

# Hold out 15% of the samples; random_state makes the split repeatable.
x_train, x_test, y_train, y_test = train_test_split(
    x_values, y_values, test_size=0.15, shuffle=True, random_state=SEED)
x_train = torch.tensor(x_train)
y_train = torch.tensor(y_train)

x_test = torch.tensor(x_test)
y_test = torch.tensor(y_test)

# Insert the single-channel axis expected by Conv1d: (N, L) -> (N, 1, L)
x_train = x_train.unsqueeze(1)
x_test = x_test.unsqueeze(1)

trainset = CustomDataset(x_train, y_train)
testset = CustomDataset(x_test, y_test)

# dataloaders - creating batches; only the training data is shuffled
trainloader = torch.utils.data.DataLoader(
    trainset, batch_size=batch_size, shuffle=True, num_workers=2)
testloader = torch.utils.data.DataLoader(
    testset, batch_size=batch_size, shuffle=False, num_workers=2)

# device - first CUDA GPU when available, otherwise the CPU
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")

# loss criterion
mse_criterion = nn.MSELoss()

# build our model and send it to the device
model = SimpleCNN().to(device)  # constructed with the defaults defined in the class

# optimizer
optimizer = torch.optim.Adam(model.parameters(), lr=learning_rate)

Model Training:

In [None]:
# Training loop: one optimisation pass over `trainloader` per epoch, followed by
# a gradient-free evaluation pass over `testloader`.
# The only loss is the MSE; this is a regression task, so no class predictions
# or accuracy are computed.
loss_history = []        # per-epoch mean of the training batch losses
test_loss_history = []   # per-epoch mean of the test batch losses
for epoch in range(1, epochs + 1):
    model.train()  # Put the model in training mode

    running_loss = 0.0

    for inputs, labels in trainloader:
        # Send inputs and labels to the device (cast to float32 for the model)
        inputs = inputs.to(device, dtype=torch.float32)
        labels = labels.to(device, dtype=torch.float32)

        # Zero the gradients left over from the previous step
        optimizer.zero_grad()

        # Forward pass and MSE loss
        outputs = model(inputs)
        loss = mse_criterion(outputs, labels)

        # Backward pass and optimization
        loss.backward()
        optimizer.step()

        running_loss += loss.item()

    # Average of the per-batch losses (each batch counts equally)
    avg_loss = running_loss / len(trainloader)
    avg_mse_loss = avg_loss  # the total loss *is* the MSE loss
    loss_history.append(avg_mse_loss)

    # Evaluate the model on the test set
    model.eval()  # Put the model in evaluation mode
    # test_loss / test_mse_loss / total stay defined after the loop so that
    # later cells which read them keep working.
    test_loss = 0.0
    test_mse_loss = 0.0
    total = 0

    with torch.no_grad():
        for inputs, labels in testloader:
            inputs = inputs.to(device, dtype=torch.float32)
            labels = labels.to(device, dtype=torch.float32)

            outputs = model(inputs)
            loss = mse_criterion(outputs, labels)

            test_loss += loss.item()
            total += labels.size(0)  # number of evaluated samples

    test_mse_loss = test_loss  # identical: MSE is the only loss term

    # Average of the per-batch test losses
    avg_test_loss = test_loss / len(testloader)
    test_loss_history.append(avg_test_loss)
    avg_test_mse_loss = avg_test_loss

    # Print the epoch number, training losses and test loss
    print(f"Epoch: {epoch} | Training Loss: {avg_loss:.4f} (MSE: {avg_mse_loss:.4f}) | Test Loss: {avg_test_loss:.4f}")


Save Model:

In [None]:
# Persist the trained network to disk. The whole module object is pickled (not
# only its state_dict), so loading the file later needs the SimpleCNN class to
# be defined/importable in the loading session.
torch.save(obj=model, f='model.pth')

Plotters:

In [None]:
def _to_map_pixels(x, y, map_shape):
    """Convert position coordinates to pixel coordinates on the map image.

    The offsets and scales (750/1000 for x, 1000/1700 for y) are the constants
    this notebook has always used; presumably they describe the area covered by
    the map image - TODO confirm against the map's georeference.

    Args:
        x, y: Coordinates (scalars or arrays) in the un-scaled position units.
        map_shape: ``shape`` of the image array; index 0 is the height and
            index 1 the width in pixels.

    Returns:
        Tuple ``(x_pixels, y_pixels)``; the y axis is flipped so that larger
        ``y`` values map to smaller row indices.
    """
    x_pixels = map_shape[1]*(x+750)/1000
    y_pixels = map_shape[0]*(1-(y+1000)/1700)
    return x_pixels, y_pixels


# Cell sites; columns 1 and 2 are plotted as x / y, column 0 is not used here.
cells = np.array([[381, 504, 65],
         [428, -17, 514],
         [244, -44, -597],
         [401, -408, -447],
         [229, -716, 233],
         [485, -254, 907],
         [318, 944, -257],
         [324, -1070, -497],
         [272, -357, -1145]])

# Figure 1: learning curves
# -------------------------
fig = plt.figure(1, figsize=(10, 10))
fig.suptitle('CNN Analysis')
ax1 = plt.subplot2grid((1, 1), (0, 0), colspan=1, rowspan=1)

# Exclude the first few epochs so the graph is easier to read
SKIP = 0
# The x axis follows the number of recorded epochs, so the plot still works
# when training was stopped before `epochs` iterations were completed.
train_curve = np.asarray(loss_history)
test_curve = np.asarray(test_loss_history)
ax1.plot(np.arange(len(train_curve))[SKIP:], train_curve[SKIP:], 'r-', label='Training loss')
ax1.plot(np.arange(len(test_curve))[SKIP:], test_curve[SKIP:], 'g-', label='Test loss')
ax1.set_title('Training loss')
ax1.set_xlabel('Epochs')
ax1.set_ylabel('Loss')
ax1.legend()
ax1.grid(True, which='both', axis='both')

# Predictions and loss on the test set
# ------------------------------------
# Evaluation mode + no_grad: no dropout, running batch-norm statistics, and no
# autograd graph. The test set is processed in chunks of `batch_size` rather
# than as one huge batch.
model.eval()
x_test = x_test.to(device, dtype=torch.float32)
with torch.no_grad():
    predictions = torch.cat([model(chunk) for chunk in torch.split(x_test, batch_size)])
predictions = predictions.cpu().numpy().astype(np.float64)

# y_test is a tensor on the first run and already a NumPy array on a re-run of
# this cell; handle both so the cell can be executed repeatedly.
y_true = y_test.detach().cpu().numpy() if torch.is_tensor(y_test) else np.asarray(y_test)

# Mean squared error over every test sample, computed from scratch so it does
# not depend on accumulators left behind by the training cell.
avg_test_mse_loss = float(np.mean((predictions - y_true) ** 2))
avg_test_loss = avg_test_mse_loss
print(f"Test MSE: {avg_test_mse_loss:.4f}")

# Background map; only the first 460 pixel columns are shown.
map_image = mpimg.imread(r'map.bmp')
map_view = map_image[:, np.arange(460), :]

# Figure 2: predicted vs. actual positions on the map
# ---------------------------------------------------
fig = plt.figure(2, figsize=(10, 10))
fig.suptitle('CNN Predictions Analysis')
ax1 = plt.subplot2grid((1, 1), (0, 0), colspan=1, rowspan=1)
ax1.set_title('Comparison of predictions and actual values')
ax1.imshow(map_view)
# Multiplying by 1000 undoes the target scaling applied when the data was loaded.
x_pix, y_pix = _to_map_pixels(1000*y_true[:, 0], 1000*y_true[:, 1], map_view.shape)
ax1.plot(x_pix, y_pix, 'b.', label='Actual')
x_pix, y_pix = _to_map_pixels(1000*predictions[:, 0], 1000*predictions[:, 1], map_view.shape)
ax1.plot(x_pix, y_pix, 'r.', label='Predicted')
x_pix, y_pix = _to_map_pixels(cells[:, 1], cells[:, 2], map_view.shape)
ax1.plot(x_pix, y_pix, 'k*', label='Cells', markersize=10)
ax1.grid(True, which='both', axis='both')
ax1.legend()
ax1.axis([-200, 800, -100, 900])
ax1.invert_yaxis()

# Figure 3: scatter of the per-sample error vector
# ------------------------------------------------
fig = plt.figure(3, figsize=(10, 10))
fig.suptitle('NN Predictions Analysis')
ax1 = plt.subplot2grid((1, 1), (0, 0), colspan=1, rowspan=1)
y_test = y_true  # keep the notebook-level name bound to the NumPy targets
err = 1000*y_test-1000*predictions
ax1.set_title('Error between real position to predictions and actual values')
ax1.plot(err[:, 0], err[:, 1], 'b.', label='Error')
ax1.legend()
ax1.grid(True, which='both', axis='both')
ax1.axis('equal')
plt.show()

# Figure 4: true positions coloured by the size of the prediction error
# ---------------------------------------------------------------------
fig = plt.figure(4, figsize=(10, 10))
err_total = (err[:,0]**2+err[:,1]**2)**0.5        # Euclidean error per sample
err_std_x = np.sqrt(np.mean(err[:,0]**2))         # root-mean-square error along x
err_std_y = np.sqrt(np.mean(err[:,1]**2))         # root-mean-square error along y
err_mean = np.mean(err_total)                     # mean Euclidean error
print(f"Mean position error: {err_mean:.1f}")
fig.suptitle(f'Prediction errors in true locations, STD_x = {np.round(err_std_x)} , STD_y = {np.round(err_std_y)}')
ax1 = plt.subplot2grid((1, 1), (0, 0), colspan=1, rowspan=1)
error_bins = [
    (err_total<100, 'b.', '0-100 meters'),
    ((err_total>=100)&(err_total<200), 'g.', '100-200meters'),
    ((err_total>=200)&(err_total<300), 'c.', '200-300 meters'),
    ((err_total>=300), 'r.', '300+ meters'),
]
for in_bin, marker, bin_label in error_bins:
    ax1.plot(1000*y_test[in_bin, 0], 1000*y_test[in_bin, 1], marker, label=bin_label)
ax1.plot(cells[:,1], cells[:,2], 'k*', label='Cells', markersize=10)
ax1.legend()
ax1.grid(True, which='both', axis='both')
ax1.axis('equal')
plt.show()
