# Deep Learning Based Velocity Estimation From Doppler-Radar

This notebook contains all trained models discussed in the submitted report. Since we were not allowed to use the actual data from Trackman, synthetic dummy data (randomly generated in the required size) are used for testing. That also explains the extremely high test RMSE in each case. (The use of dummy data has been agreed with our supervisor, Mark Henney.)

In [8]:
import torch
import torch.nn as nn
from torch.nn.functional import mse_loss as torch_mse_loss

from typing import Iterable


from pathlib import Path

import cv2

import numpy as np

# import time


# Shared notebook configuration
# Use the GPU when PyTorch reports one as available, otherwise the CPU.
DEVICE = "cuda" if torch.cuda.is_available() else "cpu"

# All paths are anchored at the resolved current working directory.
ROOT = Path(".").resolve()
DATA_ROOT = ROOT.joinpath("synthetic_data")
MODEL_DIR = ROOT.joinpath("models")

SEQ_LEN = 1600  # fixed number of time steps per sequence
INPUT_SIZE = 8  # number of features per time step

### Define model architectures

#### Baseline Model Architecture

In [9]:
class SpectrVelCNNRegr(nn.Module):
    """Baseline CNN that regresses one value per sample (the velocity).

    Intended as the reference point against which other models are
    benchmarked.

    Four conv -> ReLU -> 2x2 max-pool stages are followed by three linear
    layers (no activation in between). The first linear layer is hard-wired
    to 37120 input features, so the spatial input size is fixed; an input of
    shape ``(batch, 6, 74, 918)`` produces 128 * 5 * 58 = 37120 features.

    Attribute names and the layout of each ``nn.Sequential`` determine the
    ``state_dict`` keys and must stay as they are for saved weights to load.
    """

    loss_fn = nn.MSELoss()

    def __init__(self):
        super().__init__()

        # Stages are created in forward order: conv1 .. conv4, then the head.
        self.conv1 = self._conv_block(6, 16, kernel_size=5)
        self.conv2 = self._conv_block(16, 32, kernel_size=5)
        self.conv3 = self._conv_block(32, 64, kernel_size=5)
        # The last stage uses a 3x3 kernel but keeps padding=2.
        self.conv4 = self._conv_block(64, 128, kernel_size=3)

        self.flatten = nn.Flatten()
        self.linear1 = nn.Linear(37120, 1024)
        self.linear2 = nn.Linear(1024, 256)
        self.linear3 = nn.Linear(256, 1)

    @staticmethod
    def _conv_block(in_channels, out_channels, kernel_size):
        """Return one conv -> ReLU -> max-pool stage (stride 1, padding 2)."""
        return nn.Sequential(
            nn.Conv2d(in_channels, out_channels, kernel_size, stride=1, padding=2),
            nn.ReLU(),
            nn.MaxPool2d(2),
        )

    def _input_layer(self, input_data):
        """Apply the first convolutional stage."""
        return self.conv1(input_data)

    def _hidden_layer(self, x):
        """Apply conv stages 2-4, flatten, and the first two linear layers."""
        stages = (
            self.conv2,
            self.conv3,
            self.conv4,
            self.flatten,
            self.linear1,
            self.linear2,
        )
        for stage in stages:
            x = stage(x)
        return x

    def _output_layer(self, x):
        """Project the 256 hidden features to the single output value."""
        return self.linear3(x)

    def forward(self, input_data):
        """Run the full network; returns a tensor of shape ``(batch, 1)``."""
        features = self._hidden_layer(self._input_layer(input_data))
        return self._output_layer(features)

#### GRU Architecture

In [10]:
def mse_loss(output: torch.Tensor,
             target: torch.Tensor) -> torch.Tensor:
    """Return the mean squared error between ``output`` and ``target``.

    Thin wrapper around ``torch.nn.functional.mse_loss`` called with its
    default arguments.
    """
    loss = torch_mse_loss(input=output, target=target)
    return loss

In [11]:
class RNN(nn.Module):
    """Recurrent regressor: RNN/GRU/LSTM encoder followed by an MLP head.

    Args:
        mode: Which recurrent cell to use: ``"RNN"`` (ReLU non-linearity),
            ``"GRU"`` or ``"LSTM"``.
        input_size: Number of features per time step.
        hidden_size: Width of the recurrent hidden state.
        num_layers: Number of stacked recurrent layers.
        dropout: Dropout probability used between stacked recurrent layers
            and inside the fully connected head.
        output_size: Number of values produced per sample.

    Raises:
        ValueError: If ``mode`` is not one of the three supported names.

    Attribute names (``rnn``, ``bn``, ``fc``) define the ``state_dict`` keys
    and are kept unchanged so existing checkpoints continue to load.
    """

    def __init__(self, mode, input_size, hidden_size, num_layers, dropout, output_size):
        super().__init__()

        # Kept as a public attribute for callers that inspect the model type.
        self.mode = mode

        # PyTorch applies recurrent dropout only *between* stacked layers, so
        # with a single layer a non-zero value has no effect and merely
        # triggers a UserWarning. Pass 0 in that case; the head below still
        # uses the requested dropout.
        rnn_dropout = dropout if num_layers > 1 else 0.0
        shared = dict(
            input_size=input_size,
            hidden_size=hidden_size,
            num_layers=num_layers,
            dropout=rnn_dropout,
            batch_first=False,
        )

        if mode == "RNN":
            self.rnn = nn.RNN(nonlinearity='relu', bias=True, **shared)
        elif mode == "GRU":
            self.rnn = nn.GRU(bias=True, bidirectional=False, **shared)
        elif mode == "LSTM":
            self.rnn = nn.LSTM(**shared)
        else:
            raise ValueError("Undefined mode, enter 'RNN', 'GRU' or 'LSTM'")

        # Layer normalisation over the hidden features (the attribute is
        # called ``bn`` for checkpoint compatibility, but it is a LayerNorm).
        self.bn = nn.LayerNorm(hidden_size)

        # Fully connected regression head
        self.fc = nn.Sequential(
            nn.Linear(hidden_size, 128),
            nn.ReLU(),
            nn.Dropout(dropout),
            nn.Linear(128, 64),
            nn.ReLU(),
            nn.Dropout(dropout),
            nn.Linear(64, output_size)
        )

    def forward(self, x):
        """Map ``x`` of shape ``[batch_size, seq_len, input_size]`` to
        ``[batch_size, output_size]``.

        Raises:
            ValueError: If ``x`` is not a 3-D tensor.
        """
        if x.dim() != 3:
            raise ValueError(
                "Expected input of shape [batch_size, seq_len, input_size], "
                f"got {tuple(x.shape)}"
            )

        # The recurrent layer was built with batch_first=False.
        x = x.permute(1, 0, 2)  # [seq_len, batch_size, input_size]

        # ``mode`` was validated in __init__, so self.rnn always exists here.
        # All three cell types return (output, state); only output is used.
        out, _ = self.rnn(x)

        # Normalise the output of the last time step
        out = self.bn(out[-1])  # [batch_size, hidden_size]

        return self.fc(out)

#### CNN Architecture for Preprocessed Data

In [12]:
class CNN_97(nn.Module):
    """Configurable CNN regressor producing one value per sample.

    The network is a stack of ``num_conv_layers`` blocks
    (Conv2d -> optional BatchNorm2d -> activation -> MaxPool2d -> Dropout)
    whose channel count doubles per block, followed by ``num_fc_layers - 1``
    hidden linear blocks (Linear -> optional BatchNorm1d -> activation ->
    Dropout) whose width halves per block, and a final ``Linear(..., 1)``.

    Args:
        num_conv_layers: Number of convolutional blocks.
        conv_dropout: Dropout probability after every conv block.
        num_fc_layers: Number of linear layers including the output layer.
        kernel_size: ``(height, width)`` of every convolution kernel.
        stride: Convolution stride.
        padding: Convolution padding.
        pooling_size: Kernel size of every max-pool layer.
        linear_dropout: Dropout probability after every hidden linear block.
        activation: Activation class, instantiated without arguments.
        hidden_units: Width of the first hidden linear layer.
        out_channels: Output channels of the first conv block.
        use_fc_batchnorm: Insert BatchNorm1d after hidden linear layers.
        use_cnn_batchnorm: Insert BatchNorm2d after convolutions.
        input_shape: ``(channels, height, width)`` of one sample; used to
            size the first convolution and the first linear layer.
    """

    loss_fn = nn.MSELoss()

    def __init__(self,
                    num_conv_layers=4,
                    conv_dropout=0.2,
                    num_fc_layers=3,
                    kernel_size=(5, 5),
                    stride = 1,
                    padding = 2,
                    pooling_size = 2,
                    linear_dropout=0.1,
                    activation=nn.ReLU,
                    hidden_units=1024,
                    out_channels = 16, # Starting output channels for the first conv layer
                    use_fc_batchnorm=True,
                    use_cnn_batchnorm=True,
                    input_shape=(6, 74, 918)):
        super().__init__()

        kernel_h, kernel_w = kernel_size[0], kernel_size[1]

        # Running bookkeeping of the feature-map geometry.
        channels = input_shape[0]
        rows, cols = input_shape[1], input_shape[2]
        next_channels = out_channels

        # ---- convolutional feature extractor ----
        extractor = []
        for _ in range(num_conv_layers):
            block = [nn.Conv2d(channels, next_channels, (kernel_h, kernel_w), stride, padding)]
            if use_cnn_batchnorm:
                block.append(nn.BatchNorm2d(next_channels))
            block += [activation(), nn.MaxPool2d(pooling_size), nn.Dropout(conv_dropout)]
            extractor.extend(block)

            # The next block consumes what this one produced and doubles it.
            channels, next_channels = next_channels, next_channels * 2

            # Spatial size after the convolution, then after the pooling.
            rows = ((rows + 2 * padding - kernel_h) // stride + 1) // pooling_size
            cols = ((cols + 2 * padding - kernel_w) // stride + 1) // pooling_size

        self.conv_layers = nn.Sequential(*extractor)
        self.flatten = nn.Flatten()

        # ---- fully connected head ----
        features = rows * cols * channels  # size of the flattened feature map
        units = hidden_units
        head = []
        for _ in range(num_fc_layers - 1):
            head.append(nn.Linear(features, units))
            if use_fc_batchnorm:
                head.append(nn.BatchNorm1d(units))
            head.extend((activation(), nn.Dropout(linear_dropout)))
            # Each hidden layer is half as wide as the previous one.
            features, units = units, units // 2
        head.append(nn.Linear(features, 1))  # single regression output
        self.fc_layers = nn.Sequential(*head)

    def forward(self, input_data):
        """Return predictions of shape ``(batch, 1)`` for ``input_data``."""
        feature_maps = self.conv_layers(input_data)
        return self.fc_layers(self.flatten(feature_maps))

#### CNN Architecture of optimized CNN

In [13]:
class SpatialAttention(nn.Module):
    """Re-weight every spatial position of a feature map by a learned gate.

    A 1x1 convolution collapses the ``in_channels`` channels to a single
    map, a sigmoid squashes it to (0, 1), and the result is multiplied onto
    the input (broadcast over the channel dimension).
    """

    def __init__(self, in_channels):
        super().__init__()
        self.attention_conv = nn.Conv2d(in_channels, 1, kernel_size=1)
        self.sigmoid = nn.Sigmoid()

    def forward(self, x):
        """Return ``x`` scaled by its per-position attention weights."""
        weights = self.sigmoid(self.attention_conv(x))
        return x * weights
    
class CNN_97_withAtt(nn.Module):
    """Configurable CNN regressor with spatial attention after each conv block.

    Every convolutional block is
    Conv2d -> optional BatchNorm2d -> activation -> MaxPool2d -> Dropout ->
    SpatialAttention, with the channel count doubling per block. The head
    consists of ``num_fc_layers - 1`` hidden linear blocks (Linear ->
    optional BatchNorm1d -> activation -> Dropout) whose width halves per
    block, followed by a final ``Linear(..., 1)``.

    Args:
        num_conv_layers: Number of convolutional blocks.
        conv_dropout: Dropout probability inside every conv block.
        num_fc_layers: Number of linear layers including the output layer.
        kernel_size: ``(height, width)`` of every convolution kernel.
        stride: Convolution stride.
        padding: Convolution padding.
        pooling_size: Kernel size of every max-pool layer.
        linear_dropout: Dropout probability after every hidden linear block.
        activation: Activation class, instantiated without arguments.
        hidden_units: Width of the first hidden linear layer.
        out_channels: Output channels of the first conv block.
        use_fc_batchnorm: Insert BatchNorm1d after hidden linear layers.
        use_cnn_batchnorm: Insert BatchNorm2d after convolutions.
        input_shape: ``(channels, height, width)`` of one sample; used to
            size the first convolution and the first linear layer.
    """

    loss_fn = nn.MSELoss()

    def __init__(self,
                 num_conv_layers=4,
                 conv_dropout=0.2,
                 num_fc_layers=3,
                 kernel_size=(5, 5),
                 stride = 1,
                 padding = 2,
                 pooling_size = 2,
                 linear_dropout=0.1,
                 activation=nn.ReLU,
                 hidden_units=1024,
                 out_channels = 16, # Starting output channels for the first conv layer
                 use_fc_batchnorm=True,
                 use_cnn_batchnorm=True,
                 input_shape=(6, 74, 918)):
        super().__init__()

        kernel_h, kernel_w = kernel_size[0], kernel_size[1]

        # Running bookkeeping of the feature-map geometry.
        channels = input_shape[0]
        rows, cols = input_shape[1], input_shape[2]
        next_channels = out_channels

        # ---- convolutional feature extractor with attention ----
        extractor = []
        for _ in range(num_conv_layers):
            block = [nn.Conv2d(channels, next_channels, (kernel_h, kernel_w), stride, padding)]
            if use_cnn_batchnorm:
                block.append(nn.BatchNorm2d(next_channels))
            block += [activation(), nn.MaxPool2d(pooling_size), nn.Dropout(conv_dropout)]
            # The attention gate closes every block.
            block.append(SpatialAttention(next_channels))
            extractor.extend(block)

            # The next block consumes what this one produced and doubles it.
            channels, next_channels = next_channels, next_channels * 2

            # Spatial size after the convolution, then after the pooling
            # (the attention gate does not change the spatial size).
            rows = ((rows + 2 * padding - kernel_h) // stride + 1) // pooling_size
            cols = ((cols + 2 * padding - kernel_w) // stride + 1) // pooling_size

        self.conv_layers = nn.Sequential(*extractor)
        self.flatten = nn.Flatten()

        # ---- fully connected head ----
        features = rows * cols * channels  # size of the flattened feature map
        units = hidden_units
        head = []
        for _ in range(num_fc_layers - 1):
            head.append(nn.Linear(features, units))
            if use_fc_batchnorm:
                head.append(nn.BatchNorm1d(units))
            head.extend((activation(), nn.Dropout(linear_dropout)))
            # Each hidden layer is half as wide as the previous one.
            features, units = units, units // 2
        head.append(nn.Linear(features, 1))  # single regression output
        self.fc_layers = nn.Sequential(*head)

    def forward(self, input_data):
        """Return predictions of shape ``(batch, 1)`` for ``input_data``."""
        feature_maps = self.conv_layers(input_data)
        return self.fc_layers(self.flatten(feature_maps))

### Test all trained models with synthetic data

*IMPORTANT NOTE: The results below do not match the results in the submitted report, because synthetic dummy data had to be used instead of the real data.*

#### Baseline Model

In [None]:
# Build the baseline model on DEVICE. The input tensor below is moved to
# DEVICE, so the model weights must live there too; otherwise the forward
# pass fails with a device mismatch when DEVICE is "cuda".
model_baseline = SpectrVelCNNRegr().to(DEVICE)

# Load pretrained weights
# NOTE(review): torch.load relies on pickle; only load checkpoints that come
# from a trusted source.
model_path = MODEL_DIR / "baseline"
model_baseline.load_state_dict(torch.load(model_path, map_location=DEVICE))

# Evaluation mode (no gradient-related training behaviour is wanted here)
model_baseline.eval()

# The model class carries its own loss (MSE)
loss_fn = model_baseline.loss_fn

# Test data: the last axis of the stored array is moved to the front
# (presumably height x width x channels -> channels x height x width).
test_data_path = DATA_ROOT / "dummy_stacked_spectrograms.npy"
test_data = np.transpose(np.load(test_data_path), (2,0,1))

# Add the batch dimension and build a scalar dummy target
spectrogram = torch.tensor(test_data).unsqueeze(0).to(DEVICE).float()
target = torch.tensor([10.0]).to(DEVICE).squeeze()

# Evaluate
with torch.no_grad():
    # (1, 1) -> scalar, so that it matches the scalar target
    test_output = model_baseline(spectrogram).squeeze()
    test_loss = loss_fn(test_output, target)
    test_rmse = test_loss.sqrt()


print(f"Test RMSE: {test_rmse.item()}")

#### Best GRU

In [None]:
# Best hyperparameters found during training.
# The input size is not stored here; load_model() takes it from INPUT_SIZE.
OPTIMAL_CONFIG = {
    # recurrent cell type
    "mode": "GRU",
    # width of the hidden state
    "hidden_size": 256,
    # number of stacked recurrent layers
    "num_layers": 2,
    # dropout probability
    "dropout": 0.3,
    # one output value: scalar regression
    "output_size": 1,
}

def load_model(config, model_path):
    """Build an ``RNN`` from ``config`` and restore its weights.

    ``config`` must provide the keys ``mode``, ``hidden_size``,
    ``num_layers``, ``dropout`` and ``output_size``; the input size comes
    from the module-level ``INPUT_SIZE``. The weights stored at
    ``model_path`` are mapped to ``DEVICE`` and the model is returned on
    ``DEVICE`` in evaluation mode.
    """
    hyperparameters = {
        key: config[key]
        for key in ("mode", "hidden_size", "num_layers", "dropout", "output_size")
    }
    model = RNN(input_size=INPUT_SIZE, **hyperparameters)
    model = model.to(DEVICE)

    weights = torch.load(model_path, map_location=DEVICE)
    model.load_state_dict(weights)

    model.eval()
    return model

# Locate the GRU checkpoint and stop early with a clear error if it is missing
model_path = MODEL_DIR.joinpath("GRU_best.pth")
if not model_path.exists():
    raise FileNotFoundError(f"Model file not found: {model_path}")

# Restore the trained GRU with the hyperparameters found during training
model = load_model(config=OPTIMAL_CONFIG, model_path=model_path)
print(f"Model loaded successfully on {DEVICE}.")

def test_dummy_sample_with_rmse(model):
    """Run ``model`` on one random sample and print the resulting RMSE.

    The sample has shape ``(1, SEQ_LEN, INPUT_SIZE)`` and is compared with a
    fixed dummy ground truth of 1.0. Nothing is returned; the input shape,
    the prediction, the ground truth and the RMSE are printed.
    """
    # Random stand-in for a real sequence, and its dummy label
    sample = torch.randn(1, SEQ_LEN, INPUT_SIZE).to(DEVICE)
    expected = torch.tensor([1.0]).to(DEVICE).squeeze()

    # Inference only: no gradients needed
    with torch.no_grad():
        prediction = model(sample)

    # RMSE is the square root of the mean squared error
    squared_error = nn.functional.mse_loss(prediction.squeeze(), expected)
    rmse = squared_error.sqrt().item()

    print(f"Dummy input shape: {sample.shape}")
    print(f"Model output: {prediction.item():.4f}")
    print(f"Ground truth: {expected.item():.4f}")
    print(f"Test RMSE: {rmse:.4f}")


# Evaluate the loaded GRU on the dummy sample
test_dummy_sample_with_rmse(model)

#### Best CNN with preprocessed data

In [None]:
# Synthetic test data
test_data_path = DATA_ROOT / "dummy_stacked_spectrograms.npy"
# Move the last axis of the stored array to the front. From here on the
# array is channel-first: axis 0 indexes the channels.
spectrogram = np.transpose(np.load(test_data_path), (2,0,1))


# Data Preprocessing
# Define power and phase channels
NUM_POWER_CHANNELS = 4
PHASE_SPECTROGRAM_LIMITS = (-np.pi, np.pi)

spectrogram_processed = spectrogram.copy()
# Apply Sobel filter with ksize=31 on x-axis only to power channels.
# The array is channel-first at this point, so a channel is selected with
# spectrogram_processed[ch]; indexing the last axis would pick single
# columns along the width instead of whole channels.
for ch in range(NUM_POWER_CHANNELS):
    # Get the 2-D image of this channel
    channel_data = spectrogram_processed[ch].astype(np.float64)

    # Apply Sobel filter in the x-direction
    sobelx = cv2.Sobel(channel_data, cv2.CV_64F, dx=1, dy=0, ksize=31)

    # Magnitude of the x-gradient (absolute value)
    sobelx = np.sqrt(sobelx**2)

    # Replace the channel data with the min-max normalised filter response
    spectrogram_processed[ch] = cv2.normalize(sobelx, None, 0, 1, cv2.NORM_MINMAX)

# Normalize the phase channels (channels 4 and 5) from [-pi, pi] to [0, 1]
spectrogram_processed[NUM_POWER_CHANNELS:] -= PHASE_SPECTROGRAM_LIMITS[0]
spectrogram_processed[NUM_POWER_CHANNELS:] /= PHASE_SPECTROGRAM_LIMITS[1] - PHASE_SPECTROGRAM_LIMITS[0]

# Add the batch dimension and build a scalar dummy target
spectrogram_processed = torch.tensor(spectrogram_processed).unsqueeze(0).to(DEVICE).float()
target = torch.tensor([10.0]).to(DEVICE).squeeze()


# Define model parameter
model_prepro = CNN_97(
    num_conv_layers=2,  # Example value, use the same as the trained model
    num_fc_layers=3,
    conv_dropout=0,
    linear_dropout=0,
    kernel_size=(5, 5),
    activation=nn.ReLU,  # Example activation function, change as needed
    hidden_units=64,
    padding=1,
    stride=1,
    pooling_size=1,
    out_channels=8,
    use_fc_batchnorm=True,
    use_cnn_batchnorm=True
).to(DEVICE)

# Load pretrained model
# NOTE(review): torch.load relies on pickle; only load checkpoints that come
# from a trusted source.
model_path = MODEL_DIR / "CNNprepro_best"
model_prepro.load_state_dict(torch.load(model_path, map_location=DEVICE))

# Set the model to evaluation mode
model_prepro.eval()

# Define loss function
loss_fn = model_prepro.loss_fn

# Evaluate
with torch.no_grad():
    # (1, 1) -> scalar, so that it matches the scalar target
    test_output = model_prepro(spectrogram_processed).squeeze()
    test_loss = loss_fn(test_output, target)
    test_rmse = test_loss.sqrt()


print(f"Test RMSE: {test_rmse.item()}")

#### Best Optimized CNN

In [None]:
# Rebuild the optimized attention CNN with the hyperparameters of the
# trained checkpoint.
model_optimCNN = CNN_97_withAtt(
    # convolutional part
    num_conv_layers=3,
    kernel_size=(5, 7),
    stride=2,
    padding=0,
    pooling_size=1,
    out_channels=8,
    conv_dropout=0,
    use_cnn_batchnorm=True,
    # fully connected part
    num_fc_layers=3,
    hidden_units=64,
    linear_dropout=0,
    use_fc_batchnorm=True,
    activation=nn.ReLU,
).to(DEVICE)

# Restore the pretrained weights
model_path = MODEL_DIR / "CNNoptimized_best"
model_optimCNN.load_state_dict(torch.load(model_path, map_location=DEVICE))

# Inference mode
model_optimCNN.eval()

# The model class carries its own loss (MSE)
loss_fn = model_optimCNN.loss_fn

# Test data: move the last axis of the stored array to the front
test_data_path = DATA_ROOT / "dummy_stacked_spectrograms.npy"
test_data = np.load(test_data_path).transpose(2, 0, 1)

# Batch of one sample, and a scalar dummy target
spectrogram = torch.tensor(test_data)[None].to(DEVICE).float()
target = torch.tensor(10.0).to(DEVICE)

# Evaluate without tracking gradients
with torch.no_grad():
    test_output = model_optimCNN(spectrogram).squeeze()
    test_loss = loss_fn(test_output, target)
    test_rmse = torch.sqrt(test_loss)


print(f"Test RMSE: {test_rmse.item()}")