# Modulation Classification Training (Floating-point)

Welcome to the training notebook for the modulation classification model.

The goal of this notebook is to train a Convolutional Neutral Network (CNN) model to learn how to classify modulation schemes using the DeepRFSoC dataset recorded with an AMD RFSoC device.

## Model Dimensions
The CNN model is a small, 4-layer network with two convolutional layers and two fully-connected layers.

<img src="./assets/networktopology.png" width="800" alt="Model Topology">

---

## Import Packages and Confirm GPU are available
Whether you are using CUDA or ROCm, confirm you can accelerate the training loops with a GPU. Otherwise, this can run on the CPU (slow).

In [None]:
# Import modules
import torch
import numpy as np
import plotly.graph_objects as go
from tqdm.notebook import tqdm

# Select which GPU to use (if available)
gpu = 0
if torch.cuda.is_available():
    torch.cuda.set_device(gpu)
    device = torch.device('cuda')
else:
    device = torch.device('cpu')
print(f'Using device: {device}')

## Load the DeepRFSoC dataset
Ensure the DeepRFSoC dataset has been downloaded to your local drive and point to it with the variable `dataset_path` below. Consult the `README.md` in this folder on instructions on how to download the DeepRFSoC dataset.

In [None]:
import os.path

dataset_path = './DeepRFSoC.pkl'
os.path.isfile(dataset_path) # Confirm the dataset path is correct

## Preprocess dataset
Loop over classes and noise levels (SNRs) and collect them.

In [None]:
import pickle
classes = ['QPSK','BPSK','QAM16','QAM64','PSK8','PAM4','GFSK','CPFSK']
snrs = ['-20','-16','-12','-8','-4','0','4','8','12','16','20','24', '28', '30']

import torch
from torch.utils.data import Dataset, DataLoader
import numpy as np

with open(dataset_path,'rb') as f:
    Xd = pickle.load(f)
mods = classes
snrs = snrs
X = []
lbl = []
snr_lbl = []
for mod in mods:
    for snr in snrs:
        tmp = Xd[mod,snr]
        X.append(tmp)
        for i in range(Xd[mod,snr].shape[2]):
            lbl.append(mods.index(mod))
            snr_lbl.append(snr)
X = np.dstack(X)
X = np.moveaxis(X,2,0)
labels = lbl
dataset_values = X

## Create PyTorch Dataset Class
Create `Dataset` class for easy access to frames during our training loop.

In [None]:

validation_split = 0.2
test_split = 0.1
shuffle_dataset = True
# Creating data indices for training, validation, and test splits:
dataset_size = len(labels)
indices = list(range(dataset_size))
split = int(np.floor((validation_split+test_split) * dataset_size))
if shuffle_dataset:
    np.random.seed(1234)
    np.random.shuffle(indices)
train_indices, val_test_indices = indices[split:], indices[:split]
valid_split = int(np.floor(validation_split * len(val_test_indices)))
val_indices, test_indices = val_test_indices[valid_split:], val_test_indices[:valid_split]
# Creating PT data samplers and loaders:
train_sampler = torch.utils.data.SubsetRandomSampler(train_indices)
valid_sampler = torch.utils.data.SubsetRandomSampler(val_indices)
test_sampler = torch.utils.data.SubsetRandomSampler(test_indices)

class AMCDataset(Dataset):
    def __init__(self, dataset, labels):
        super(AMCDataset,self).__init__()
        self.classes = ['QPSK','BPSK','QAM16','QAM64','PSK8','PAM4','GFSK','CPFSK']
        self.snrs = ['-20','-16','-12','-8','-4','0','4','8','12','16','20','24','28','30']
        self.dataset, self.labels = dataset, labels

    def __len__(self):
        return len(self.labels)
    
    def __getitem__(self, idx):
        data = self.dataset[idx,:,:]
        label = self.labels[idx]
        # return data, label
        return torch.from_numpy(data.astype(np.float32).reshape(1,2,-1)), torch.tensor(label)
    
dataset = AMCDataset(dataset_values, labels)

## Define AMC Model

Four layer CNN model with floating-point weights and activations.

In [None]:
from torch import nn

class AMC(nn.Module):
    def __init__(self):
        super(AMC, self).__init__()
        self.conv_layers = nn.ModuleList()
        self.linear_layers = nn.ModuleList()
        self.conv_layers.append(nn.Identity())
        self.conv_layers.append(nn.Conv2d(
            kernel_size=(1,3),
            in_channels=1,
            out_channels=64,
            bias=False,
        ))
        self.conv_layers.append(nn.ReLU())
        self.conv_layers.append(nn.Conv2d(
            kernel_size=(2,3),
            in_channels=64,
            out_channels=16,
            bias=False,
        ))
        self.conv_layers.append(nn.ReLU())
        self.linear_layers.append(nn.Linear(
            in_features=1984,
            out_features=128,
            bias=False,
        ))
        self.linear_layers.append(nn.ReLU())
        self.linear_layers.append(nn.Linear(
            in_features=128,
            out_features=8,
            bias=False,
        ))

    def forward(self, x):
        for mod in self.conv_layers:
            x = mod(x)
        if len(x.shape) > 3:
            x = x.transpose(1,3).flatten(start_dim=1)
        else:
            x = x.transpose(0,2).flatten(start_dim=0)
        for mod in self.linear_layers:
            x = mod(x)
        return x
    
model = AMC()

## Training, Validation, and Test functions

In [None]:
from sklearn.metrics import accuracy_score

def train(model, train_loader, optimizer, criterion):
    losses = []
    # ensure model is in training mode
    model.train()    

    for (inputs, label) in tqdm(train_loader, desc="Batches", leave=False):   
        if gpu is not None:
            inputs = inputs.cuda()
            label = label.cuda()
        model.train()
        criterion.train()
        # print(f"Inputs:{inputs.shape}. label:{label}")
        # forward pass
        output = model(inputs)
        loss = criterion(output, label)
        
        # backward pass + run optimizer to update weights
        optimizer.zero_grad() 
        loss.backward()
        optimizer.step()
        # model.clip_weights(-1,1)
        # keep track of loss value
        losses.append(loss.cpu().detach().numpy())
           
    return losses

def valid(model, val_loader, criterion):    
    # ensure model is in eval mode
    losses = []
    model.eval() 
    y_true = []
    y_pred = []
    with torch.no_grad():
        for (inputs, label) in val_loader:
            if gpu is not None:
                inputs = inputs.cuda()
                label = label.cuda()
            output = model(inputs)
            loss = criterion(output, label)
            losses.append(loss.cpu().detach().numpy())
            pred = output.argmax(dim=1, keepdim=True)
            y_true.extend(label.tolist()) 
            y_pred.extend(pred.reshape(-1).tolist())
        
    return accuracy_score(y_true, y_pred), losses

def test(model, test_loader):    
    # ensure model is in eval mode
    model.eval() 
    y_true = []
    y_pred = []
    with torch.no_grad():
        for (inputs, label) in test_loader:
            if gpu is not None:
                inputs = inputs.cuda()
                label = label.cuda()
            output = model(inputs)
            pred = output.argmax(dim=1, keepdim=True)
            y_true.extend(label.tolist()) 
            y_pred.extend(pred.reshape(-1).tolist())
        
    return accuracy_score(y_true, y_pred)

def display_loss_plot(losses, title="Training loss", xlabel="Iterations", ylabel="Loss"):
    go.Figure([go.Scatter(y=losses)])

## Train the Model

With a cross entropy loss function and an Adam optimiser, the network is trained across 100 epochs.

In [None]:
import json
name = 'float_deeprfsoc'
batch_size = 128
num_epochs = 100
patience = 8

data_loader_train = DataLoader(dataset, batch_size=batch_size, sampler=train_sampler)
data_loader_valid = DataLoader(dataset, batch_size=batch_size, sampler=valid_sampler)
data_loader_test = DataLoader(dataset, batch_size=batch_size, sampler=test_sampler)

# save the losses for each q
training_loss = {}
validation_loss = {}

model = AMC()
if gpu is not None:
    model = model.cuda()

# loss criterion and optimiser
criterion = nn.CrossEntropyLoss()
if gpu is not None:
    criterion = criterion.cuda()
# Backup optimiser:
optimiser = torch.optim.Adam(model.parameters(), lr=1e-4)

running_loss = []
running_val_loss = []
running_test_acc = []
min_val_loss = np.inf

for epoch in tqdm(range(num_epochs), desc="Epochs"):
    loss_epoch = train(model, data_loader_train, optimiser, criterion)
    val_acc, val_loss = valid(model, data_loader_test, criterion)
    print("Epoch %d: Training loss = %f, Validation loss = %f,val accuracy = %f" % (epoch, np.mean(loss_epoch), np.mean(val_loss), val_acc))
    mean_val_loss = np.mean(val_loss)
    if min_val_loss > mean_val_loss:
        print(f'Val loss decreased({min_val_loss:.6f} -> {mean_val_loss:.6f})\t Saving Model...')
        min_val_loss = mean_val_loss
        # Saving State Dict
        save_path = f'saved_models_fixed/saved_model_{name}.path'
        torch.save(model.state_dict(), save_path)
        count = 0
    else:
        if count < patience:
            count += 1
        else:
            print('Early Stopping Triggered!')
            break
    running_loss.append(np.mean(loss_epoch))
    running_val_loss.append(np.mean(val_loss))
    running_test_acc.append(val_acc)
training_loss = running_loss
validation_loss = running_val_loss
with open(f'loss_metrics/training_loss_{name}.pkl','wb') as f:
    pickle.dump(training_loss,f,protocol=pickle.HIGHEST_PROTOCOL)
with open(f'loss_metrics/validation_loss_{name}.pkl','wb') as f:
    pickle.dump(validation_loss,f,protocol=pickle.HIGHEST_PROTOCOL)

## Testing the Resulting Trainged Model

Testing the performance of the model in classifying modulation schemes across varying Signal-to-Noise Ratios (SNRs)

In [None]:
name = 'float_deeprfsoc'
classes = mods
device = torch.device('cpu')
X_test = dataset_values[test_indices,:,:]
Y_test = np.array(labels)[test_indices]
accs = {}
test_model = AMC()
test_model.load_state_dict(torch.load(f'saved_models_fixed/saved_model_{name}.path', map_location=device))
test_model.eval()
acc = {}
for snr in snrs:
    # extract classes @ SNR
    test_SNRs = list(map(lambda x: snr_lbl[x], test_indices))
    test_X_i = X_test[np.where(np.array(test_SNRs)==snr)]
    test_Y_i = Y_test[np.where(np.array(test_SNRs)==snr)]
    # conf matrix
    conf = np.zeros([len(classes),len(classes)])
    confnorm = np.zeros([len(classes),len(classes)])
    # estimate classes
    in_model = np.reshape(test_X_i, (-1,1,2,128))
    for i in range(0,in_model.shape[0]):
        in_reshape = np.reshape(in_model[i,:,:,:],(-1,1,2,128))
        test_Y_i_hat = test_model(torch.from_numpy(in_model[i,:,:,:]).to(torch.float32)).cpu().detach().numpy()
        j = test_Y_i[i]
        k = int(np.argmax(test_Y_i_hat))
        conf[j,k] = conf[j,k] + 1
    for i in range(0,len(classes)):
        confnorm[i,:] = conf[i,:] / np.sum(conf[i,:])
    cor = np.sum(np.diag(conf))
    ncor = np.sum(conf) - cor
    print(f'{snr}dB SNR. Overall Accuracy: {cor / (cor+ncor)}')
    acc[snr] = 1.0*cor/(cor+ncor)
    for i, mod in enumerate(classes):
        acc[(mod, snr)] = conf[i,i] / np.sum(conf[i,:])

In [None]:
# save test accuracy
with open(f'test_results/test_accuracy_{name}.pkl','wb') as f:
    pickle.dump(acc,f,protocol=pickle.HIGHEST_PROTOCOL)

## Save a Test set for MATLAB
Saving a test set for the highest SNR to build hardware model in MATLAB/Simulink.

In [None]:
# make test set of only 30dB SNR
X_test = dataset_values[test_indices,:,:]
Y_test = np.array(labels)[test_indices]
test_SNRs = list(map(lambda x: snr_lbl[x], test_indices))
test_X_i = X_test[np.where(np.array(test_SNRs)=='30')]
test_Y_i = Y_test[np.where(np.array(test_SNRs)=='30')]

from scipy.io import savemat
name = 'float_deeprfsoc'
inputs = np.reshape(test_X_i, (-1,1,2,128))
labels = test_Y_i
datadict = {'inputs': inputs, 'labels': labels}
savemat(f'matlab_saved_models/inputs_amc_{name}_30dB.mat', datadict)


## Plot Accuracy vs SNR
SNR vs Accuracy for the trained model. We can confirm the model has learned how to classify models if the classification performance increases as the noise levels decrease.

In [None]:
fig = go.Figure([go.Scatter(x=list(map(int,snrs)), y=[acc[snr]*100 for snr in snrs], mode='lines+markers', marker=dict(symbol='circle-open', size=8, color='#8ecddd'))])

fig.update_layout(
    title='Accuracy vs SNR for float model',
    xaxis=dict(title='SNR (dB)'),
    yaxis=dict(title='Accuracy (%)',range=[0, 100]),
    legend_title='Quantised model',
    font=dict(family='Arial', size=16, color='black'),
    margin=dict(l=50, r=50, t=50, b=50),
    # paper_bgcolor='rgba(0,0,0,0)',
    template='plotly_white',
    height=600,
    width=800,
)

## Export weights to MATLAB
Working with the base `torch` library.

In [None]:
from scipy.io import savemat
model = model.cpu()

Wconv1 = model.conv_layers[1].weight.detach().numpy()
Wconv2 = model.conv_layers[3].weight.detach().numpy()
Wdense1 = model.linear_layers[0].weight.detach().numpy()
Wdense2 = model.linear_layers[2].weight.detach().numpy()
mdict = {'Wconv1': Wconv1, 'Wconv2': Wconv2, 'Wdense1': Wdense1, 'Wdense2': Wdense2}
savemat(f'matlab_saved_models/model_{name}.mat', mdict)
# save input for testing
inputs = np.reshape(X_test, (-1,1,2,128))
labels = Y_test
datadict = {'inputs': inputs, 'labels': labels}
savemat(f'matlab_saved_models/inputs_amc_{name}.mat', datadict)

---