In [None]:
pip install h5py
import numpy as np # linear algebra
import h5py
import scipy.io as io
import sklearn
from torch.utils.data import Dataset, DataLoader
import dask.array as da
import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
from time import time
from sklearn.metrics import confusion_matrix, accuracy_score, precision_score, recall_score, classification_report, f1_score
from tqdm.notebook import tqdm
import matplotlib.pyplot as plt
import random
import pandas as pd
import seaborn as sns
import pickle
import datetime
import os

### Signal description
The following modulations have been considered as classes:

- 1- Linear Frequency Modulation (LFM)
- 2- 2FSK Binary Frequency Keying
- 3- 4FSK frequency keying
- 4- 8FSK frequency keying
- 5- Frequency modulation with Costas code
- 6- 2PSK Binary Phase Shift Keying
- 7- 4PSK Phase Keying
- 8- 8PSK Phase Keying
- 9- Phase modulation with Barker code
- 10- Huffman Code Phase Modulation
- 11- Phase modulation with Frank code
- 12- Phase Modulation with P1 Code
- 13- Phase Modulation with P2 Code
- 14- Phase Modulation with P3 Code
- 15- Phase Modulation with P4 Code
- 16- Phase Modulation with Px Code
- 17- Phase modulation with Zadoff-Chu code
- 18- Phase Modulation with T1 Code
- 19- Phase modulation with T2 code
- 20- Phase Modulation with T3 Code
- 21- Phase Modulation with T4 Code
- 22- Non-Modulation (NM)
- 23- Complex White Gaussian Noise (Noise)

In [None]:
your_path = 'input/'
classes = ['LFM','2FSK','4FSK','8FSK', 'Costas','2PSK','4PSK',\
           '8PSK','Barker','Huffman','Frank','P1','P2','P3','P4','Px',\
           'Zadoff-Chu','T1','T2','T3','T4','NM','Noise']
with h5py.File(your_path +'X_train.mat', 'r') as f:
    X_train = np.array(f['X_train'], dtype='float32').T
with h5py.File(your_path +'X_val.mat', 'r') as f:
    X_val = np.array(f['X_val'], dtype='float32').T
with h5py.File(your_path +'X_test.mat', 'r') as f:
    X_test = np.array(f['X_test'], dtype='float32').T
    Y_train = io.loadmat(your_path + 'Y_train.mat')['Y_train']
Y_val = io.loadmat(your_path + 'Y_val.mat')['Y_val']
Y_test = io.loadmat(your_path + 'Y_test.mat')['Y_test']
lbl_train = io.loadmat(your_path + 'lbl_train.mat')['lbl_train']
lbl_test = io.loadmat(your_path + 'lbl_test.mat')['lbl_test']
lbl_val = io.loadmat(your_path + 'lbl_val.mat')['lbl_val']
print("Sample:")
print(X_train.shape, X_val.shape, X_test.shape)
print(np.isnan(X_train).sum(), np.isnan(X_val).sum(), np.isnan(X_test).sum())
def shw_smpl(smpl, start=None, end=None):
    label = classes[Y_train[smpl].argmax()]
    plt.figure(figsize=(10,3))
    plt.title(label)
    plt.plot(X_train[smpl, start:end,0], label="i")
    plt.plot(X_train[smpl, start:end,1], label="q")
    plt.legend()
    plt.show()

for smpl in random.sample(range(X_train.shape[0]), 10):
    shw_smpl(smpl, end=50)
    
pd.DataFrame(
    data=lbl_train,
    columns=[
        "Signal Class",
        "Signal-to-noise ratio",
        "Signal Length",
        "Carrier Frequency",
        "LFM Bandwidth OR Symbolic Rate",
        "Frequency Jump"
    ]
)

def barplot(dataframe, title=None):
    plt.figure(figsize=(10,4))
    plt.title(title)
    unique, counts = np.unique(dataframe.argmax(1), return_counts=True)
    ax = sns.barplot(
        x=np.array(classes)[unique],
        y=counts,
        palette="pastel"
    )
    ax.set_xticklabels(ax.get_xticklabels(), rotation=45, horizontalalignment='right')
    plt.show()
    
barplot(Y_train, "train")
barplot(Y_val, "validation")
barplot(Y_test, "test")

In [None]:
class MyDataset(Dataset):
    def __init__(self, filenameX, filenameY):
        with h5py.File(your_path +filenameX + '.mat', 'r') as f:
            x = np.array(f[filenameX], dtype='float32').T
        y = io.loadmat(your_path + filenameY+'.mat')[filenameY]
        self.n_samples = x.shape[0]
        self.X = torch.from_numpy(x).view(-1, 1, 1024, 2)
        self.y = torch.from_numpy(y).argmax(1)

    def __len__(self):
        return self.n_samples
    
    def __getitem__(self, idx):
        return self.X[idx], self.y[idx]
    
trainset = MyDataset("X_train", "Y_train")
valset = MyDataset("X_val", "Y_val")
testset = MyDataset("X_test", "Y_test")

# Consider increasing if high number of features.
# Find best batch size for efficiency
batch_size = 4096

trainloader = DataLoader(trainset, shuffle=True, batch_size = batch_size)
valloader = DataLoader(valset, shuffle=False, batch_size = batch_size)
testloader = DataLoader(testset, shuffle=False, batch_size = batch_size)

device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
device

def test_epoch(dataloader, model):
    model.eval()
    true, pred = torch.empty(0).to(torch.long), torch.empty(0)
    for batchx, batchy in tqdm(dataloader, leave=False):
        batchx = batchx.to(device)
        with torch.no_grad():
            output = model(batchx)
            true = torch.cat((true, batchy))
            pred = torch.cat((pred, output.cpu()))
    return true, pred

def train_epoch(dataloader, model, criterion, optimizer):
    model.train()
    all_, right, total_loss, count = 0, 0, 0, 0
    for batchx, batchy in tqdm(dataloader, leave=False):
        batchx, batchy = batchx.to(device), batchy.to(device)
        optimizer.zero_grad()
        pred = model(batchx)
        loss = criterion(pred, batchy)
        loss.backward()
        optimizer.step()
        all_ += batchy.shape[0]
        right += (pred.argmax(1) == batchy).sum().cpu()
        count += 1
        total_loss += loss.cpu().item()
    return (right/all_).item(), total_loss/count

def get_metrics(pred, true):
    accuracy = accuracy_score(pred, true)
    precision = precision_score(pred, true, average="weighted", zero_division=0)
    recall = recall_score(pred, true, average="weighted", zero_division=0)
    f1 = f1_score(pred, true, average="weighted", zero_division=0)
    conf_matrix = confusion_matrix(pred, true)
    return accuracy, precision, recall, f1, conf_matrix

def init_history(*data):
    if data:
        data = [data]
    return pd.DataFrame(
        data=data,
        columns=["accuracy train","accuracy val", "train loss",
                 "val loss","precision", "recall", "f1"]
    )

def model_training(model, trainloader, valloader, criterion, optimizer, epochs=20):
    start_time = time()
    history = init_history()
    for epoch in range(epochs):
        train_acc, train_loss = train_epoch(trainloader, model, criterion, optimizer)
        true, pred = test_epoch(valloader, model)
        val_loss = criterion(pred, true).item()
        val_acc, val_prec, val_rec, val_f1, _ = get_metrics(pred.argmax(dim=1), true)
        history = pd.concat((history, init_history(
            train_acc, val_acc, train_loss,val_loss,val_prec, val_rec, val_f1
        )),axis=0, ignore_index=True)
        print(f"epoch [{epoch:2d}] train loss [{train_loss:5.4f}] train acc \
[{train_acc:5.4f}] val loss [{val_loss:5.4f}] val acc [{val_acc:5.4f}]")
    total_time = (time() - start_time)/60
    print(f"total_time: {total_time}")
    return history, total_time

def show_learning_curve(history, train="accuracy train",val="accuracy val", title="accuracy"):
    plt.figure(figsize=(8,3))
    plt.plot(history[train], label="train")
    plt.plot(history[val], label="validation")
    plt.title(title)
    plt.xticks(range(history.shape[0]))
    plt.grid()
    plt.legend()
    plt.show()
    
def heatmap(congf_m):
    plt.figure(figsize=(16,13))
    sns.heatmap(conf_m, xticklabels=classes, yticklabels=classes, annot=True,
                fmt="4d", cmap="Blues");
    plt.show()

def save_results(data=None):
    file_exists = os.path.exists('results.csv')
    
    if not file_exists:
        new_res = pd.DataFrame(
        columns=["model", "accuracy", "precision", "recall", "f1", "time(m)", "params", "timestamp"]
        )
        new_res.to_csv('results.csv')
    
    new_res = pd.DataFrame(data)
    new_res.to_csv('results.csv', mode='a', header=False, index=False)
    
    return new_res

# data=[["model 1", acc, prec, rec, f1, model1_scores[1], model1_scores[0], datetime.datetime.now()]]

# Model 1 - Single Conv layer

In [None]:
model_name = "single conv layer"

class MyModel(nn.Module):
    def __init__(self):
        super(MyModel, self).__init__()
        self.l1 = nn.Sequential(
            nn.BatchNorm2d(1),
            nn.Conv2d(in_channels=1, out_channels=4, kernel_size=(3,2), padding=(1,0)),
            nn.ReLU(),
            nn.MaxPool2d((2,1)),
            nn.Dropout2d(0.2),
        )
        self.fc = nn.Sequential(
            nn.Flatten(),
            nn.Linear(1024*2, 23),
            nn.Softmax(dim=1),
        )
    def forward(self, x):
        x = self.l1(x)
        x = self.fc(x)
        return x
    
model = MyModel().to(device)
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=0.005)


# Model 2 - Increase kernel size

In [None]:
model_name = "single conv layer, increase kernel size"

class MyModel(nn.Module):
    def __init__(self):
        super(MyModel, self).__init__()
        self.l1 = nn.Sequential(
            nn.BatchNorm2d(1),
            nn.Conv2d(in_channels=1, out_channels=4, kernel_size=(11,1), padding='same'),
            nn.ReLU(),
            nn.MaxPool2d((2,1)),
            nn.Dropout2d(0.2),
        )
        
        self.fc = nn.Sequential(
            nn.Flatten(),
            nn.Linear(1024*4, 23),
            nn.Softmax(dim=1),
        )
    def forward(self, x):
        x = self.l1(x)
        x = self.fc(x)
        return x
    
model = MyModel().to(device)
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=0.0008)

# Model 3 - 4 conv layers

In [None]:
model_name = "8 conv small to large kernel, 4096 batch size"

class MyModel(nn.Module):
    def __init__(self):
        super(MyModel, self).__init__()
        self.l1 = nn.Sequential(
            nn.BatchNorm2d(1),
            nn.Conv2d(in_channels=1, out_channels=16, kernel_size=(3,1), padding="same"),
            nn.ReLU(),
            nn.BatchNorm2d(16),
            
            nn.Conv2d(in_channels=16, out_channels=16, kernel_size=(3,1), padding="same"),
            nn.ReLU(),
            nn.MaxPool2d((2,1)),
            nn.BatchNorm2d(16),
            nn.Dropout2d(0.2),
        )
        self.l2 = nn.Sequential(
            nn.Conv2d(in_channels=16, out_channels=32, kernel_size=(7,1), padding="same"),
            nn.ReLU(),
            nn.BatchNorm2d(32),
            
            nn.Conv2d(in_channels=32, out_channels=32, kernel_size=(7,1), padding="same"),
            nn.ReLU(),
            nn.MaxPool2d((2,1)),
            nn.BatchNorm2d(32),
            nn.Dropout2d(0.2),
        )
        self.l3 = nn.Sequential(
            nn.Conv2d(in_channels=32, out_channels=64, kernel_size=(11,1), padding="same"),
            nn.ReLU(),
            nn.BatchNorm2d(64),
            
            nn.Conv2d(in_channels=64, out_channels=64, kernel_size=(11,1), padding="same"),
            nn.ReLU(),
            nn.MaxPool2d((2,1)),
            nn.BatchNorm2d(64),
            nn.Dropout2d(0.3),
        )
        self.l4 = nn.Sequential(
            nn.Conv2d(in_channels=64, out_channels=128, kernel_size=(15,1), padding="same"),
            nn.ReLU(),
            nn.BatchNorm2d(128),
            
            nn.Conv2d(in_channels=128, out_channels=128, kernel_size=(15,1), padding="same"),
            nn.ReLU(),
            nn.MaxPool2d((2,1)),
            nn.BatchNorm2d(128),
            nn.Dropout2d(0.3),
        )
        self.fc = nn.Sequential(
            nn.Flatten(),
            nn.Linear(128*128, 23),
            nn.Softmax(dim=1),
        )
            
            
    def forward(self, x):
        x = self.l1(x)
        x = self.l2(x)
        x = self.l3(x)
        x = self.l4(x)
        x = self.fc(x)
        return x
    
model = MyModel().to(device)
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=0.0005)
history, total_t = model_training(model, trainloader, valloader, criterion, optimizer, 20)

In [None]:
history, total_t = model_training(model, trainloader, valloader, criterion, optimizer, 20)
model_scores = [sum([p.numel() for p in model.parameters()]), total_t, history]
with open(model_name+".pickle", "wb") as f:
    pickle.dump(model_scores, f)
    
torch.save(model.state_dict(), model_name+".pth")
with open(model_name+".pickle", "rb") as f:
    model_scores = pickle.load(f)
    
model.load_state_dict(torch.load(model_name+".pth"))
model.eval();

show_learning_curve(model_scores[2], "train loss", "val loss", "cross entropy loss")
show_learning_curve(model_scores[2])
true, pred = test_epoch(testloader, model)
acc, prec, rec, f1, conf_m = get_metrics(true, pred.argmax(dim=1))
print(f"Accuracy: {acc:.4f}")
print(f"Precision: {prec:.4f}")
print(f"Recall: {rec:.4f}")
print(f"F1 score: {f1:.4f}")
heatmap(conf_m)
print(classification_report(true, pred.argmax(1), zero_division=0))
total_res = save_results(
    data=[[model_name, acc, prec, rec, f1, model_scores[1], model_scores[0], datetime.datetime.now()]]
)
total_res