# Sequential Convolutional Recurrent Neural Networks for Fast Automatic Modulation Classification

![models](models.png)

## Preparation

In [None]:
import pickle
import os

import numpy as np
import pandas as pd
from sklearn.preprocessing import LabelBinarizer, StandardScaler
from sklearn.metrics import accuracy_score, classification_report, confusion_matrix
import tensorflow as tf
from tensorflow import keras

In [None]:
def load_data(filepath="RML2016.10a_dict.dat", test_size=0.1):
    """
    load benchmark dataset.
    """
    radio_data = pickle.load(open(filepath, "rb"),
                             encoding='latin1')

    # ['8PSK', 'AM-DSB', 'AM-SSB', 'BPSK', 'CPFSK', 'GFSK', 'PAM4', 'QAM16', 'QAM64', 'QPSK', 'WBFM']
    # [-20, -18, -16, -14, -12, -10, -8, -6, -4, -2, 0, 2, 4, 6, 8, 10, 12, 14, 16, 18]
    mods, snrs = map(lambda idx: sorted(
        list(set(map(lambda key: key[idx], radio_data.keys())))), [0, 1])

    features = []
    labels = []

    for mod in mods:
        for snr in snrs:
            iqs = radio_data[(mod, snr)]
            features.append(iqs)
            for i in range(len(iqs)):
                labels.append((mod, snr))

    X = np.vstack(features)
    # Convert X to（n_samples, timesteps, n_features）
    X = np.transpose(X, (0, 2, 1))
    
    labels = np.array(labels)
    lb = LabelBinarizer()
    y = lb.fit_transform(labels[:, 0])
    print("Labels: {}".format(lb.classes_))

    X_train, X_test, y_train, y_test, labels_train, labels_test = train_test_split(
        X, y, labels, test_size=test_size, random_state=random_state, stratify=labels)

    return X_train, X_test, y_train, y_test, labels_train, labels_test

def L2_norm(X):
    for i in range(X.shape[0]):
        X[i,:,0] = X[i,:,0] / np.linalg.norm(X[i,:,0])
        
    return X

def to_amp_phase(X):
    X_complex = X[:, :, 0] + 1j * X[:, :, 1]
    
    X_amp = np.abs(X_complex)
    X_ang = np.arctan2(X[:, :, 1], X[:, :, 0]) / np.pi
    
    
    X_amp = X_amp.reshape(X_amp.shape[0], X_amp.shape[1], 1)
    X_ang = X_ang.reshape(X_ang.shape[0], X_ang.shape[1], 1)
    
    X = np.concatenate((X_amp, X_ang), axis=2)
    
    return X

def feature_scaling(X_train, X_test):
    scalor = StandardScaler()

    X_train_2d = X_train.reshape(
        X_train.shape[0]*X_train.shape[1], X_train.shape[2])
    X_test_2d = X_test.reshape(
        X_test.shape[0]*X_test.shape[1], X_test.shape[2])

    scalor.fit(X_train_2d)
    X_train_2d_norm = scalor.transform(X_train_2d)
    X_test_2d_norm = scalor.transform(X_test_2d)

    X_train_norm = X_train_2d_norm.reshape(
        X_train.shape[0], X_train.shape[1], X_train.shape[2])
    X_test_norm = X_test_2d_norm.reshape(
        X_test.shape[0], X_test.shape[1], X_test.shape[2])

    return X_train_norm, X_test_norm

In [None]:
X_train, X_test, y_train, y_test, labels_train, labels_test = load_data("RML2016.10a_dict.dat")

In [None]:
X_train, X_test = feature_scaling(X_train, X_test)

## Baselines

> The two models are chosen as the baselines for further com- parisons due to their results showing the significant improve- ments upon expert feature-based approaches. Any further improvements should be considered state-of-the-art.

### CNN Baseline

> One is the CNN architecture proposed by O’shea et al. [19]. As shown in Fig. 1(a), the baseline model is a 4-layer network made up of two convolutional layers and two dense layers. Each hidden layer utilizes rectified linear unit (ReLU) activation functions and dropout of 50% except for a softmax activation function on the one-hot output layer. Adam optimizer and categorical cross entropy loss function are applied to the base model.

In [None]:
X_train_cnn = X_train.reshape(X_train.shape[0], 1, X_train.shape[2], X_train.shape[1])
X_test_cnn = X_test.reshape(X_test.shape[0], 1, X_test.shape[2], X_test.shape[1])

model = keras.models.Sequential()
model.add(keras.layers.Conv2D(256, (1, 3), activation='relu', padding='same', data_format='channels_first', input_shape=[1, 2, 128]))
model.add(keras.layers.Dropout(0.5))
model.add(keras.layers.Conv2D(80, (2, 3), activation='relu', padding='same', data_format='channels_first', dilation_rate=1))
model.add(keras.layers.Dropout(0.5))
model.add(keras.layers.Flatten())

model.add(keras.layers.Dense(256, activation='relu', kernel_initializer='he_normal'))
model.add(keras.layers.Dropout(0.5))
model.add(keras.layers.Dense(11, activation='softmax', kernel_initializer='he_normal'))

print(model.summary())

model.compile(loss='categorical_crossentropy',
                  optimizer='adam',
                  metrics=['accuracy'])

callbacks_list = [
    keras.callbacks.EarlyStopping(
        monitor='val_loss',
        patience=3,
        verbose=1
    )
]

history = model.fit(X_train_cnn, y_train,
                        batch_size=1024,
                        epochs=200,
                        callbacks=callbacks_list, 
                        validation_split=0.2)

In [None]:
snrs = [-20, -18, -16, -14, -12, -10, -8, -6, -4, -2, 0, 2, 4, 6, 8, 10, 12, 14, 16, 18]

metrics_test = []

for snr in snrs:
    idx = np.where(labels_test[:, 1] == str(snr))
    mods = ['AM-SSB', 'PAM4', 'QPSK', '8PSK', 'BPSK', 'QAM16', 'QAM64', 'WBFM', 'CPFSK', 'AM-DSB', 'GFSK']
    
    loss_and_metrics = model.evaluate(X_test_cnn[idx], y_test[idx], batch_size=128);
    metrics_test.append((snr, loss_and_metrics[0], loss_and_metrics[1]))

df_metrics_test = pd.DataFrame(metrics_test, columns=['snr', 'loss', 'acc'])
df_metrics_test.plot(kind='scatter', x='snr', y='acc')

### LSTM Baseline

> The other baseline model is proposed by Rajendran et al. [23], shown in Fig. 1(b). The model is comprised of two 128-unit long short-term memory (LSTM) layers and an 11-unit dense layer with a softmax activation. The first LSTM layer returns the full sequences while the sec- ond one just returns the last state. The dropout is also adopted to reduce overfitting. Adam optimizer and categorical cross entropy loss function are applied to the model. Note that this model learns from the time domain information of the modulation schemes using amplitude-phase format, instead of IQ format.

In [None]:
X_train_lstm = to_amp_phase(X_train)
X_test_lstm = to_amp_phase(X_test)

In [None]:
model = keras.models.Sequential()
model.add(keras.layers.LSTM(128, dropout=0, recurrent_dropout=0.2, return_sequences=True, input_shape=(128, 2)))
model.add(keras.layers.LSTM(128, dropout=0, recurrent_dropout=0.2, return_sequences=False))
model.add(keras.layers.Dense(11, activation='softmax'))
print(model.summary())

model.compile(loss='categorical_crossentropy',
                  optimizer='adam',
                  metrics=['accuracy'])

callbacks_list = [
    keras.callbacks.EarlyStopping(
        monitor='val_loss',
        patience=3,
        verbose=1
    )
]


history = model.fit(X_train_lstm, y_train,
                    batch_size=128,
                    epochs=80,
                    callbacks=callbacks_list, 
                    validation_split=0.2)

In [None]:
snrs = [-20, -18, -16, -14, -12, -10, -8, -6, -4, -2, 0, 2, 4, 6, 8, 10, 12, 14, 16, 18]

metrics_test = []

for snr in snrs:
    idx = np.where(labels_test[:, 1] == str(snr))
    mods = ['AM-SSB', 'PAM4', 'QPSK', '8PSK', 'BPSK', 'QAM16', 'QAM64', 'WBFM', 'CPFSK', 'AM-DSB', 'GFSK']
    
    loss_and_metrics = model.evaluate(X_test_lstm[idx], y_test[idx], batch_size=128);
    metrics_test.append((snr, loss_and_metrics[0], loss_and_metrics[1]))

df_metrics_test = pd.DataFrame(metrics_test, columns=['snr', 'loss', 'acc'])
df_metrics_test.plot(kind='scatter', x='snr', y='acc')

## SCRNN

> Fig. 1(c) provides the illustration of the proposed SCRNN architecture. As schematically shown in Fig. 1(c), the first and second convolutional layers each contain 128 5-tap filters except for the first one followed by a max-pooling layer with a pooling size of 3. The layer 3 and layer 4 are LSTM layers composed of 128 units each, and both return the full sequences. The last dense layer contains 11-class neurons representing the modulation schemes.

In [None]:
model = keras.models.Sequential()
model.add(keras.layers.Conv1D(128, 5, activation='relu', kernel_constraint=keras.constraints.max_norm(3), input_shape=(128, 2)))  
model.add(keras.layers.MaxPooling1D(3))
model.add(keras.layers.Conv1D(128, 5, activation='relu', kernel_constraint=keras.constraints.max_norm(3)))  
model.add(keras.layers.LSTM(128, dropout=0.5, recurrent_dropout=0.5, return_sequences=True))
model.add(keras.layers.LSTM(128, dropout=0.5, recurrent_dropout=0.5, return_sequences=True))
model.add(keras.layers.Flatten())
model.add(keras.layers.Dropout(0.5))
model.add(keras.layers.Dense(11, activation='softmax'))

print(model.summary())

model.compile(loss='categorical_crossentropy',
                  optimizer='adam',
                  metrics=['accuracy'])

callbacks_list = [
    keras.callbacks.EarlyStopping(
        monitor='val_loss',
        patience=3,
        verbose=1
    )
]


history = model.fit(X_train, y_train,
                    batch_size=128,
                    epochs=80,
                    callbacks=callbacks_list, 
                    validation_split=0.2)

In [None]:
snrs = [-20, -18, -16, -14, -12, -10, -8, -6, -4, -2, 0, 2, 4, 6, 8, 10, 12, 14, 16, 18]

metrics_test = []

for snr in snrs:
    idx = np.where(labels_test[:, 1] == str(snr))
    mods = ['AM-SSB', 'PAM4', 'QPSK', '8PSK', 'BPSK', 'QAM16', 'QAM64', 'WBFM', 'CPFSK', 'AM-DSB', 'GFSK']
    
    loss_and_metrics = model.evaluate(X_test[idx], y_test[idx], batch_size=128);
    metrics_test.append((snr, loss_and_metrics[0], loss_and_metrics[1]))

df_metrics_test = pd.DataFrame(metrics_test, columns=['snr', 'loss', 'acc'])
df_metrics_test.plot(kind='scatter', x='snr', y='acc')