In [None]:
"""
CLDNN-like LSTM model for RadioML dataset.

Reference:
- Convolutional + LSTM + Fully Connected Deep Neural Networks
- Adapted from code contributed by Mika
"""

import os
from keras.models import Model
from keras.layers import Input, Dense, CuDNNLSTM
from keras.utils.vis_utils import plot_model

def LSTMModel(weights=None, input_shape=[1024,2], classes=26, **kwargs):
    """
    Builds an LSTM-based classification model.

    Arguments:
    weights -- path to pre-trained weights file (optional)
    input_shape -- shape of input data (samples, timesteps, features)
    classes -- number of output classes for classification

    Returns:
    model -- compiled Keras model
    """
    if weights is not None and not os.path.exists(weights):
        raise ValueError('The `weights` argument should be either None (random initialization) or a valid weights file path.')

    # Input layer
    input = Input(input_shape, name='input')
    x = input

    # Two stacked LSTM layers using GPU-optimized CuDNNLSTM
    x = CuDNNLSTM(units=128, return_sequences=True)(x)  # first LSTM, returns sequences
    x = CuDNNLSTM(units=128)(x)                         # second LSTM, outputs last timestep only

    # Fully connected output layer with softmax activation for multi-class classification
    x = Dense(classes, activation='softmax', name='softmax')(x)

    # Define the model
    model = Model(inputs=input, outputs=x)

    # Load pre-trained weights if provided
    if weights is not None:
        model.load_weights(weights)

    return model

# Example usage
import keras
if __name__ == '__main__':
    # Instantiate the model
    model = LSTMModel(None, input_shape=(1024,2), classes=26)

    # Compile the model with Adam optimizer and categorical crossentropy loss
    adam = keras.optimizers.Adam(learning_rate=0.001, beta_1=0.9, beta_2=0.999)
    model.compile(loss='categorical_crossentropy', metrics=['accuracy'], optimizer=adam)

    # Save model architecture plot
    plot_model(model, to_file='model.png', show_shapes=True)

    # Print model details
    print('Model layers:', model.layers)
    print('Model config:', model.get_config())
    print('Model summary:', model.summary())

In [None]:
import matplotlib
matplotlib.use('TkAgg')  # Use TkAgg backend for interactive plotting
import matplotlib.pyplot as plt
import numpy as np
import pickle
import csv
import itertools
from sklearn.metrics import precision_recall_fscore_support, mean_squared_error, mean_absolute_error, r2_score

# ===========================
# Function: Plot confusion matrix
# ===========================
def plot_confusion_matrix(cm, title='', cmap=plt.get_cmap("Blues"), labels=[], save_filename=None):
    """
    Visualizes a confusion matrix with class labels and optional saving as PDF.
    """
    plt.figure(figsize=(10, 7))
    plt.imshow(cm*100, interpolation='nearest', cmap=cmap)  # Multiply by 100 to show in %
    plt.title(title, fontsize=10)
    plt.colorbar()
    tick_marks = np.arange(len(labels))
    plt.xticks(tick_marks, labels, rotation=90, size=12)
    plt.yticks(tick_marks, labels, size=12)

    # Annotate each cell with the value
    for i in range(len(tick_marks)):
        for j in range(len(tick_marks)):
            text_color = 'darkorange' if i == j else 'black'
            text = plt.text(j, i, int(np.around(cm[i,j]*100)), ha="center", va="center", fontsize=10, color=text_color)

    plt.tight_layout()
    plt.ylabel('True label', fontdict={'size':16})
    plt.xlabel('Predicted label', fontdict={'size':16})

    # Save figure if filename provided
    if save_filename is not None:
        plt.savefig(save_filename, format='pdf', dpi=1200, bbox_inches='tight')
    plt.close()

# ===========================
# Function: Calculate confusion matrix
# ===========================
def calculate_confusion_matrix(Y, Y_hat, classes):
    """
    Computes the normalized confusion matrix and counts of correct and incorrect predictions.
    """
    n_classes = len(classes)
    conf = np.zeros([n_classes, n_classes])
    confnorm = np.zeros([n_classes, n_classes])

    for k in range(Y.shape[0]):
        i = list(Y[k,:]).index(1)           # True class index
        j = int(np.argmax(Y_hat[k,:]))      # Predicted class index
        conf[i,j] += 1

    # Normalize each row to sum to 1
    for i in range(n_classes):
        confnorm[i,:] = conf[i,:] / np.sum(conf[i,:])

    right = np.sum(np.diag(conf))           # Correct predictions
    wrong = np.sum(conf) - right            # Incorrect predictions
    return confnorm, right, wrong

# ===========================
# Function: Calculate per-class accuracy from confusion matrix
# ===========================
def calculate_acc_at1snr_from_cm(cm):
    """
    Returns the recognition accuracy per class from the confusion matrix.
    """
    return np.round(np.diag(cm) / np.sum(cm, axis=1), 3)

# ===========================
# Function: Compute standard metrics
# ===========================
def calculate_metrics(Y, Y_hat):
    """
    Computes accuracy, precision, recall, F1-score, MSE, MAE, and R2 score.
    """
    Y_true = np.argmax(Y, axis=1)
    Y_pred = np.argmax(Y_hat, axis=1)

    accuracy = accuracy_score(Y_true, Y_pred)
    precision, recall, f1, _ = precision_recall_fscore_support(Y_true, Y_pred, average='weighted')

    mse = mean_squared_error(Y_true, Y_pred)
    mae = mean_absolute_error(Y_true, Y_pred)
    r2 = r2_score(Y_true, Y_pred)

    return accuracy, precision, recall, f1

# ===========================
# Function: Calculate metrics per SNR and optionally plot
# ===========================
def calculate_acc_cm_each_snr(Y, Y_hat, Z, classes=None, save_figure=True, min_snr=0):
    """
    Computes accuracy, precision, recall, and F1-score for each SNR level.
    Optionally generates plots of metrics vs SNR.
    """
    Z_array = Z[:, 0]  # Extract SNR values
    snrs = sorted(list(set(Z_array)))  # Unique SNR levels
    acc_mod_snr = np.zeros((len(classes), len(snrs)))

    metrics = {
        'accuracy': [],
        'precision': [],
        'recall': [],
        'f1': []
    }

    # Loop over each SNR and compute metrics
    for snr in snrs:
        Y_snr = Y[np.where(Z_array == snr)]
        Y_hat_snr = Y_hat[np.where(Z_array == snr)]

        accuracy, precision, recall, f1 = calculate_metrics(Y_snr, Y_hat_snr)
        metrics['accuracy'].append(accuracy)
        metrics['precision'].append(precision)
        metrics['recall'].append(recall)
        metrics['f1'].append(f1)

    # Plot metrics vs SNR
    for metric, values in metrics.items():
        plt.figure(figsize=(8, 6))
        plt.plot(snrs, values, label=metric)
        for x, y in zip(snrs, values):
            plt.text(x, y, round(y, 3), ha='center', va='bottom', fontsize=8)
        plt.xlabel("Signal to Noise Ratio (dB)")
        plt.ylabel(metric.capitalize())
        plt.title(f"{metric.capitalize()} vs SNR")
        plt.legend()
        plt.grid()
        plt.savefig(f'figure/{metric}_vs_snr.pdf', format='pdf', dpi=1200, bbox_inches='tight')
        plt.show()

In [None]:
# -----------------------------
# Setup environment and imports
# -----------------------------
# Set Keras backend to TensorFlow and choose GPU device 3
os.environ["KERAS_BACKEND"] = "tensorflow"
os.environ["CUDA_VISIBLE_DEVICES"] = "3"

# Import standard libraries for computation, plotting, and data handling
import numpy as np
import matplotlib
import matplotlib.pyplot as plt
from matplotlib.collections import LineCollection
from matplotlib.colors import ListedColormap, BoundaryNorm
import pickle, random, sys, h5py
import keras
import keras.backend as K
from keras.callbacks import LearningRateScheduler, TensorBoard
from keras.regularizers import *
from keras.optimizers import Adam
from keras.models import model_from_json
import tensorflow as tf
from keras.utils.np_utils import to_categorical
import pandas as pd

# -----------------------------
# Function to convert I/Q data to amplitude and phase
# -----------------------------
def to_amp_phase(X_train, X_val, X_test, nsamples):
    """
    Convert complex-valued I/Q signals to amplitude and normalized phase representation.

    Arguments:
    X_train, X_val, X_test -- input datasets with shape (samples, nsamples, 2) representing I/Q components
    nsamples -- number of samples per signal

    Returns:
    X_train, X_val, X_test -- transformed datasets in shape (samples, nsamples, 2) with amplitude and phase
    """

    # Convert I/Q to complex numbers
    X_train_cmplx = X_train[:, :,0] + 1j * X_train[:, :,1]
    X_val_cmplx   = X_val[:, :,0] + 1j * X_val[:, :,1]
    X_test_cmplx  = X_test[:, :,0] + 1j * X_test[:, :,1]

    # Compute amplitude and normalized phase for training set
    X_train_amp = np.abs(X_train_cmplx)
    X_train_ang = np.arctan2(X_train[:, :,1], X_train[:, :,0]) / np.pi

    # Reshape and concatenate amplitude & phase
    X_train_amp = np.reshape(X_train_amp, (-1, 1, nsamples))
    X_train_ang = np.reshape(X_train_ang, (-1, 1, nsamples))
    X_train = np.concatenate((X_train_amp, X_train_ang), axis=1)
    X_train = np.transpose(X_train, (0, 2, 1))  # shape -> (samples, nsamples, 2)

    # Repeat the same transformation for validation set
    X_val_amp = np.abs(X_val_cmplx)
    X_val_ang = np.arctan2(X_val[:, :,1], X_val[:, :,0]) / np.pi
    X_val_amp = np.reshape(X_val_amp, (-1, 1, nsamples))
    X_val_ang = np.reshape(X_val_ang, (-1, 1, nsamples))
    X_val = np.concatenate((X_val_amp, X_val_ang), axis=1)
    X_val = np.transpose(X_val, (0, 2, 1))

    # Repeat the same transformation for test set
    X_test_amp = np.abs(X_test_cmplx)
    X_test_ang = np.arctan2(X_test[:, :,1], X_test[:, :,0]) / np.pi
    X_test_amp = np.reshape(X_test_amp, (-1, 1, nsamples))
    X_test_ang = np.reshape(X_test_ang, (-1, 1, nsamples))
    X_test = np.concatenate((X_test_amp, X_test_ang), axis=1)
    X_test = np.transpose(X_test, (0, 2, 1))

    return (X_train, X_val, X_test)

# -----------------------------
# Define modulation classes for classification
# -----------------------------
classes = ['BPSK','QPSK','8PSK','16PSK','32PSK','64PSK',
           '4QAM','8QAM','16QAM','32QAM','64QAM','128QAM','256QAM',
           '2FSK','4FSK','8FSK','16FSK','4PAM','8PAM','16PAM',
           'AM-DSB','AM-DSB-SC','AM-USB','AM-LSB','FM','PM']

In [None]:
# ===========================
# Load training data
# ===========================
data1 = h5py.File('Dataset/HisarMod2019.1/Train/train.mat','r')  # Open .mat file in read mode
train = data1['data_save'][:]                                   # Extract dataset from HDF5 group 'data_save'

# Swap axes to match model input shape
# Original shape might be (num_samples, 2, 1024) or (1024, 2, num_samples)
# After swapaxes(0,2), shape becomes (num_samples, 2, 1024)
train = train.swapaxes(0,2)

# ===========================
# Load test data
# ===========================
data2 = h5py.File('Dataset/HisarMod2019.1/Test/test.mat','r')
test = data2['data_save'][:]
test = test.swapaxes(0,2)

# ===========================
# Add channel dimension
# ===========================
# Conv2D layers expect input of shape (samples, height, width, channels)
train = np.expand_dims(train, axis=3)  # Add a channel dimension: shape -> (num_samples, 2, 1024, 1)
test = np.expand_dims(test, axis=3)

In [None]:
# ===========================
# Load and preprocess labels
# ===========================

# Load training labels from CSV file
train_labels = pd.read_csv('Dataset/HisarMod2019.1/Train/train_labels1.csv', header=None)
train_labels = np.array(train_labels)  # Convert DataFrame to NumPy array

# Convert integer labels to one-hot encoding
# This is required for categorical cross-entropy loss in Keras
train_labels = to_categorical(train_labels, num_classes=None)


# Load test labels from CSV file
test_labels = pd.read_csv('Dataset/HisarMod2019.1/Test/test_labels1.csv', header=None)
test_labels = np.array(test_labels)    # Convert DataFrame to NumPy array
test_labels = to_categorical(test_labels, num_classes=None)  # One-hot encoding


# ===========================
# Load and preprocess SNR values
# ===========================

# Load training SNR values from CSV
# Each row corresponds to the SNR of a training sample
train_snr = pd.read_csv('Dataset/HisarMod2019.1/Train/train_snr.csv', header=None)
train_snr = np.array(train_snr)  # Convert to NumPy array for easier indexing

# Load test SNR values from CSV
# Each row corresponds to the SNR of a test sample
test_snr = pd.read_csv('Dataset/HisarMod2019.1/Test/test_snr.csv', header=None)
test_snr = np.array(test_snr)    # Convert to NumPy array

In [None]:
# -----------------------------
# Split dataset into training, validation, and test sets
# -----------------------------

n_examples = train.shape[0]                # Total number of examples in the training dataset
n_train = int(n_examples * 0.8)            # 80% for training
n_val = int(n_examples * 0.2)              # 20% for validation

# Randomly select indices for training samples
train_idx = list(np.random.choice(range(0, n_examples), size=n_train, replace=False))

# Validation indices are the remaining indices not in training set
val_idx = list(set(range(0, n_examples)) - set(train_idx))

# Shuffle the indices to avoid any ordering bias
np.random.shuffle(train_idx)
np.random.shuffle(val_idx)

# Use the selected indices to extract the actual data and labels
X_train = train[train_idx]                 # Training input features
Y_train = train_labels[train_idx]          # Training labels
X_val = train[val_idx]                     # Validation input features
Y_val = train_labels[val_idx]              # Validation labels

# Test set (already predefined)
X_test = test
Y_test = test_labels
Z_test = test_snr                           # SNR values corresponding to test samples

# Convert I/Q data to amplitude and phase representation for all sets
# After this, each sample will have shape (nsamples, 2) where the two channels are amplitude and normalized phase
X_train, X_val, X_test = to_amp_phase(X_train, X_val, X_test, 1024)

In [None]:
# Set up some params
nb_epoch =  200  # number of epochs to train on
batch_size = 300  # training batch size

In [None]:
# -----------------------------
# Create and compile the LSTM model
# -----------------------------

model = LSTMModel()  # Instantiate the LSTM-based model with default input shape (1024,2) and number of classes (26)

# Compile the model specifying:
# - Loss function: categorical crossentropy (used for multi-class classification)
# - Metrics: accuracy (to monitor training and validation performance)
# - Optimizer: Adam (adaptive learning rate optimizer)
model.compile(loss='categorical_crossentropy', metrics=['accuracy'], optimizer='Adam')

# Print a summary of the model architecture
# This includes each layer's type, output shape, and number of parameters
model.summary()

In [None]:
plot_model(model, to_file='model_LSTM.png',show_shapes=True) # print model

In [None]:
# -----------------------------
# Train the LSTM model
# -----------------------------

filepath = 'weights/weights.h5'  # Path to save the best model weights during training

import time
TRS_LSTM = time.time()  # Record start time for training duration measurement

# Train the model using model.fit
history = model.fit(
    X_train,                # Training features
    Y_train,                # Training labels (one-hot encoded)
    batch_size=batch_size,  # Number of samples per gradient update
    epochs=nb_epoch,        # Number of training epochs
    verbose=2,              # Verbosity mode (2 = one line per epoch)
    validation_data=(X_val,Y_val),  # Validation dataset for monitoring overfitting
    callbacks=[             # List of callbacks during training
        keras.callbacks.ModelCheckpoint(
            filepath, monitor='val_loss', verbose=1, save_best_only=True, mode='auto'
        ),  # Save the model weights only when validation loss improves
        keras.callbacks.ReduceLROnPlateau(
            monitor='val_loss', factor=0.5, verbose=1, patince=5, min_lr=1e-6
        ),  # Reduce learning rate by 0.5 if validation loss plateaus for 5 epochs
        keras.callbacks.EarlyStopping(
            monitor='val_loss', patience=5, verbose=1, mode='auto'
        ),  # Stop training early if validation loss does not improve for 5 epochs
        # keras.callbacks.TensorBoard(...)  # Optional: visualize training in TensorBoard
    ]
)

In [None]:
TRE_LSTM = time.time()       # Record the end time after training completes
T_LSTM = TRE_LSTM - TRS_LSTM # Calculate total training duration in seconds

In [None]:
TES_LSTM = time.time()  # Record the start time before evaluating the LSTM model on the test set

In [None]:
#Show simple version of performance
score = model.evaluate(X_test, Y_test, verbose=1, batch_size=batch_size)
print(score)

In [None]:
 calculate_acc_cm_each_snr(Y_test, test_Y_hat, Z_test, classes, min_snr=-18) #Accuracy

In [None]:
TEE_LSTM = time.time()  # Record the end time after evaluating the LSTM model on the test set
T_LSTM_E = TEE_LSTM - TES_LSTM  # Calculate the time taken for LSTM model evaluation on the test set