# Complex-Valued Convolutions for Modulation Recognition using Deep Learning
- Author: Jakob Krzyston
- Date: 1/27/2020

This code based on: https://github.com/radioML/examples/blob/master/modulation_recognition/RML2016.10a_VTCNN2_example.ipynb

## Import Packages

In [None]:
import os,random
import numpy as np
os.environ["KERAS_BACKEND"] = "tensorflow"
from keras.utils import np_utils
import keras.models as models
from keras.layers.core import Reshape,Dense,Dropout,Activation,Flatten,Lambda,Permute
from keras.layers.noise import GaussianNoise
from keras.layers.convolutional import Convolution2D, MaxPooling2D, ZeroPadding2D
from keras.regularizers import *
from keras.optimizers import adam
import matplotlib.pyplot as plt
%matplotlib inline
import keras
import pickle, random, time

## Load the dataset
- data was downloaded from https://www.deepsig.io/datasets

In [None]:
Xd = pickle.load(open("RML2016.10a_dict.pkl", 'rb'), encoding = 'latin1')
test_snrs,mods = map(lambda j: sorted( list( set( map( lambda x: x[j], Xd.keys() ) ) ) ), [1,0])
X = []
lbl = []

for mod in mods:
    for snr in test_snrs:
        X.append(Xd[(mod,snr)])
        for i in range(Xd[(mod,snr)].shape[0]):  lbl.append((mod,snr))
X = np.vstack(X)
print(X.shape)

### Partition Data

In [None]:
np.random.seed(2019)
n_examples = X.shape[0]
n_train    = int(round(n_examples * 0.5))
train_idx  = np.random.choice(range(0,n_examples), size=n_train, replace=False)
test_idx   = list(set(range(0,n_examples))-set(train_idx))
X_train    = X[train_idx]
X_test     = X[test_idx]

def to_onehot(yy):
    yy1 = np.zeros([len(yy) ,max(yy)+1])
    yy1[  np.arange(len(yy)),yy] = 1 # ?
    return yy1
Y_train = to_onehot(list(map(lambda x: mods.index(lbl[x][0]), train_idx)))
Y_test = to_onehot(list(map(lambda x: mods.index(lbl[x][0]), test_idx)))

in_shp = list(X_train.shape[1:])
print(X_train.shape, in_shp)
classes = mods

## Build the nets

### CNN2

In [None]:
dr = 0.5 # dropout rate (%)
cnn2 = models.Sequential()
cnn2.add(Reshape([1]+in_shp, input_shape=in_shp))
cnn2.add(ZeroPadding2D((0, 2),data_format='channels_first'))
cnn2.add(Convolution2D(256, (1, 3), padding='valid', activation="relu", name="conv1", kernel_initializer='glorot_uniform', data_format='channels_first'))#ch from 3->4
cnn2.add(Dropout(dr))
cnn2.add(Convolution2D(80, (2, 1), padding='valid', activation="relu", name="conv2", kernel_initializer='glorot_uniform', data_format='channels_first'))
cnn2.add(Dropout(dr))
cnn2.add(Flatten())
cnn2.add(Dense(256, activation='relu', kernel_initializer='he_normal', name="dense1"))
cnn2.add(Dropout(dr))
cnn2.add(Dense(len(classes), kernel_initializer='he_normal', name="dense2"))
cnn2.add(Activation('softmax'))
cnn2.add(Reshape([len(classes)]))
cnn2.compile(loss='categorical_crossentropy', optimizer='adam')
cnn2.summary()

### CNN2-260

In [None]:
cnn2_260 = models.Sequential()
cnn2_260.add(Reshape([1]+in_shp, input_shape=in_shp))
cnn2_260.add(ZeroPadding2D((0, 2),data_format='channels_first'))
cnn2_260.add(Convolution2D(256, (1, 3), padding='valid', activation="relu", kernel_initializer='glorot_uniform', data_format='channels_first'))#ch from 3->4
cnn2_260.add(Dropout(dr))
cnn2_260.add(Convolution2D(80, (2, 1), padding='valid', activation="relu", kernel_initializer='glorot_uniform', data_format='channels_first'))
cnn2_260.add(Dropout(dr))
cnn2_260.add(Flatten())
cnn2_260.add(Dense(260, activation='relu', kernel_initializer='he_normal'))
cnn2_260.add(Dropout(dr))
cnn2_260.add(Dense(len(classes), kernel_initializer='he_normal'))
cnn2_260.add(Activation('softmax'))
cnn2_260.add(Reshape([len(classes)]))
cnn2_260.compile(loss='categorical_crossentropy', optimizer='adam')
cnn2_260.summary()

### Complex

In [None]:
# Define the linear combination
def LC(x):
	y = K.constant([0, 1, 0, -1, 0, 1],shape=[2,3])
	return K.dot(x,K.transpose(y))

In [None]:
complex_CNN = models.Sequential()
complex_CNN.add(Reshape([1]+in_shp, input_shape=in_shp))
complex_CNN.add(ZeroPadding2D((1, 2),data_format='channels_first'))
complex_CNN.add(Convolution2D(256, (2, 3), padding='valid', activation='linear', name="conv1", kernel_initializer='glorot_uniform', data_format='channels_first'))#ch from 3->4
complex_CNN.add(Permute((1,3,2)))
complex_CNN.add(Lambda(LC))
complex_CNN.add(Permute((1,3,2)))
complex_CNN.add(Activation('relu'))
complex_CNN.add(Dropout(dr))
complex_CNN.add(Convolution2D(80, (2, 3), padding='valid', activation="relu", name="conv2", kernel_initializer='glorot_uniform', data_format='channels_first'))
complex_CNN.add(Dropout(dr))
complex_CNN.add(Flatten())
complex_CNN.add(Dense(256, activation='relu', kernel_initializer='he_normal', name="dense1"))
complex_CNN.add(Dropout(dr))
complex_CNN.add(Dense( len(classes), kernel_initializer='he_normal', name="dense2" ))
complex_CNN.add(Activation('softmax'))
complex_CNN.add(Reshape([len(classes)]))
complex_CNN.compile(loss='categorical_crossentropy', optimizer='adam')
complex_CNN.summary()

### Parameterize the Training Process

In [None]:
# Number of epochs
epochs = 100
# Training batch size
batch_size = 1024  

## Train the networks

In [None]:
#train CNN2
start = time.time()
filepath = 'cnn2.wts.h5'
history_cnn2 = cnn2.fit(X_train,
    Y_train,
    batch_size=batch_size,
    epochs=epochs,
    # show_accuracy=False,
    verbose=2,
    validation_data=(X_test, Y_test),
    class_weight='auto',
    callbacks = [
        keras.callbacks.ModelCheckpoint(filepath, monitor='val_loss', verbose=0, save_best_only=True, mode='auto'),
        keras.callbacks.EarlyStopping(monitor='val_loss', patience=5, verbose=0, mode='auto')
    ])
cnn2.load_weights(filepath)
end = time.time()
duration = end - start
print('CNN2 Training time = ' + str(round(duration/60,5)) + 'minutes')

#train CNN2-260
start = time.time()
filepath = 'cnn2_260.wts.h5'
history_cnn2_260 = cnn2_260.fit(X_train,
    Y_train,
    batch_size=batch_size,
    epochs=epochs,
    # show_accuracy=False,
    verbose=2,
    validation_data=(X_test, Y_test),
    class_weight='auto',
    callbacks = [
        keras.callbacks.ModelCheckpoint(filepath, monitor='val_loss', verbose=0, save_best_only=True, mode='auto'),
        keras.callbacks.EarlyStopping(monitor='val_loss', patience=5, verbose=0, mode='auto')
    ])
cnn2_260.load_weights(filepath)
end = time.time()
duration = end - start
print('CNN2-260 Training time = ' + str(round(duration/60,5)) + 'minutes')

#train Complex
start = time.time()
filepath = 'complex.wts.h5'
history_complex = complex_CNN.fit(X_train,
    Y_train,
    batch_size=batch_size,
    epochs=epochs,
    # show_accuracy=False,
    verbose=2,
    validation_data=(X_test, Y_test),
    class_weight='auto',
    callbacks = [
        keras.callbacks.ModelCheckpoint(filepath, monitor='val_loss', verbose=0, save_best_only=True, mode='auto'),
        keras.callbacks.EarlyStopping(monitor='val_loss', patience=5, verbose=0, mode='auto')
    ])
complex_CNN.load_weights(filepath)
end = time.time()
duration = end - start
print('Complex Training time = ' + str(round(duration/60,5)) + 'minutes')

## Explore Results

### Loss Curve Plots

In [None]:
# CNN2 loss curves
plt.figure(figsize = (7,7))
plt.plot(history_cnn2.epoch, history_cnn2.history['loss'], label="CNN2: Training Error + Loss")
plt.plot(history_cnn2.epoch, history_cnn2.history['val_loss'], label="CNN2: Validation Error")
plt.xlabel('Epoch')
plt.ylabel('% Error')
plt.legend()
plt.savefig('train_cnn2.pdf',transparent = True, bbox_inches = 'tight', pad_inches = 0.01)

In [None]:
# CNN2-260 loss curves
plt.figure(figsize = (7,7))
plt.plot(history_cnn2_260.epoch, history_cnn2_260.history['loss'], label="CNN2-260: Training Error + Loss")
plt.plot(history_cnn2_260.epoch, history_cnn2_260.history['val_loss'], label="CNN2-260: Validation Error")
plt.xlabel('Epoch')
plt.ylabel('% Error')
plt.legend()
plt.savefig('train_cnn2_260.pdf',transparent = True, bbox_inches = 'tight', pad_inches = 0.01)

In [None]:
# Complex loss curves
plt.figure(figsize = (7,7))
plt.plot(history_complex.epoch, history_complex.history['loss'], label='Complex: Training Error + Loss')
plt.plot(history_complex.epoch, history_complex.history['val_loss'], label='Complex: Validation Error')
plt.xlabel('Epoch')
plt.ylabel('% Error')
plt.legend()
plt.savefig('train_complex.pdf',transparent = True, bbox_inches = 'tight', pad_inches = 0.01)

### Confusion Matrices

In [None]:
# Create a function to plot the confusion matrices
def plot_confusion_matrix(cm, title='', cmap=plt.cm.Blues, labels=[]):
    plt.imshow(cm, interpolation='nearest', cmap=cmap)
    plt.title(title)
    tick_marks = np.arange(len(labels))
    plt.xticks(tick_marks, labels, rotation=45)
    plt.yticks(tick_marks, labels)
    plt.tight_layout()
    plt.ylabel('True label')
    plt.xlabel('Predicted label')

In [None]:
# Plot confusion matrix
test_Y_hat = cnn2.predict(X_test, batch_size=batch_size)
conf = np.zeros([len(classes),len(classes)])
confnorm = np.zeros([len(classes),len(classes)])
for i in range(0,X_test.shape[0]):
    j = list(Y_test[i,:]).index(1)
    k = int(np.argmax(test_Y_hat[i,:]))
    conf[j,k] = conf[j,k] + 1
for i in range(0,len(classes)):
    confnorm[i,:] = conf[i,:] / np.sum(conf[i,:])
cor = np.sum(np.diag(conf))
ncor = np.sum(conf) - cor
print("Overall Accuracy - CNN2: ", cor / (cor+ncor))
acc = 1.0*cor/(cor+ncor)
plt.figure()
plot_confusion_matrix(confnorm, labels=classes)
plt.savefig('Confusion_CNN2.jpg',transparent = True, bbox_inches = 'tight', pad_inches = 0.01)

In [None]:
# Plot confusion matrix
test_Y_hat = cnn2_260.predict(X_test, batch_size=batch_size)
conf = np.zeros([len(classes),len(classes)])
confnorm = np.zeros([len(classes),len(classes)])
for i in range(0,X_test.shape[0]):
    j = list(Y_test[i,:]).index(1)
    k = int(np.argmax(test_Y_hat[i,:]))
    conf[j,k] = conf[j,k] + 1
for i in range(0,len(classes)):
    confnorm[i,:] = conf[i,:] / np.sum(conf[i,:])
cor = np.sum(np.diag(conf))
ncor = np.sum(conf) - cor
print("Overall Accuracy - CNN2-260: ", cor / (cor+ncor))
acc = 1.0*cor/(cor+ncor)
plt.figure()
plot_confusion_matrix(confnorm, labels=classes)
plt.savefig('Confusion_CNN2_260.jpg',transparent = True, bbox_inches = 'tight', pad_inches = 0.01)

In [None]:
# Plot confusion matrix
test_Y_hat = complex_CNN.predict(X_test, batch_size=batch_size)
conf = np.zeros([len(classes),len(classes)])
confnorm = np.zeros([len(classes),len(classes)])
for i in range(0,X_test.shape[0]):
    j = list(Y_test[i,:]).index(1)
    k = int(np.argmax(test_Y_hat[i,:]))
    conf[j,k] = conf[j,k] + 1
for i in range(0,len(classes)):
    confnorm[i,:] = conf[i,:] / np.sum(conf[i,:])
cor = np.sum(np.diag(conf))
ncor = np.sum(conf) - cor
print("Overall Accuracy - Complex: ", cor / (cor+ncor))
acc = 1.0*cor/(cor+ncor)
plt.figure()
plot_confusion_matrix(confnorm, labels=classes)
plt.savefig('Confusion_Complex.jpg',transparent = True, bbox_inches = 'tight', pad_inches = 0.01)

### Accuracy by SNR (Confusion Matrices @ -20 dB and 20 dB)

In [None]:
# create one hot labels
labels_oh       = np.eye(11)
samples_db      = np.zeros((20, 11000, 2, 128))
truth_labels_db = np.zeros((20, 11000, 11))

# Pull out the data by SNR
for i in range(len(test_snrs)):
    for j in range(len(mods)):
        samples_db[i, j*1000:(j+1)*1000,:,:]    = Xd[(mods[j],test_snrs[i])]
        truth_labels_db[i, j*1000:(j+1)*1000,:] = labels_oh[j]

In [None]:
# Plot confusion matrix
acc_cnn2 = np.zeros(len(test_snrs))
for s in range(20):

    # extract classes @ SNR
#     test_SNRs = map(lambda x: lbl[x][1], test_idx)
    test_X_i = samples_db[s]
    test_Y_i = truth_labels_db[s]
    
    # estimate classes
    test_Y_i_hat = cnn2.predict(test_X_i)
    conf = np.zeros([len(mods),len(mods)])
    confnorm = np.zeros([len(mods),len(mods)])
    for i in range(0,test_X_i.shape[0]):
        j = list(test_Y_i[i,:]).index(1)
        k = int(np.argmax(test_Y_i_hat[i,:]))
        conf[j,k] = conf[j,k] + 1
    for i in range(0,len(mods)):
        confnorm[i,:] = conf[i,:] / np.sum(conf[i,:])
    #print the confusion matrix @ -20dB and 20dB
    if s == 0 or s == 19:
        plt.figure()
        plot_confusion_matrix(confnorm, labels=classes)
        plt.savefig('Confusion_CNN2_'+str(s)+'.jpg',transparent = True, bbox_inches = 'tight', pad_inches = 0.01)
    cor = np.sum(np.diag(conf))
    ncor = np.sum(conf) - cor
#     print("Overall Accuracy: ", cor / (cor+ncor))
    acc_cnn2[s] = 1.0*cor/(cor+ncor)
# Save results to a pickle file for plotting later
print(acc_cnn2)

In [None]:
# Plot confusion matrix
acc_cnn2_260 = np.zeros(len(test_snrs))
for s in range(20):

    # extract classes @ SNR
#     test_SNRs = map(lambda x: lbl[x][1], test_idx)
    test_X_i = samples_db[s]
    test_Y_i = truth_labels_db[s]
    
    # estimate classes
    test_Y_i_hat = cnn2_260.predict(test_X_i)
    conf = np.zeros([len(mods),len(mods)])
    confnorm = np.zeros([len(mods),len(mods)])
    for i in range(0,test_X_i.shape[0]):
        j = list(test_Y_i[i,:]).index(1)
        k = int(np.argmax(test_Y_i_hat[i,:]))
        conf[j,k] = conf[j,k] + 1
    for i in range(0,len(mods)):
        confnorm[i,:] = conf[i,:] / np.sum(conf[i,:])
    #print the confusion matrix @ -20dB and 20dB
    if s == 0 or s == 19:
        plt.figure()
        plot_confusion_matrix(confnorm, labels=classes)
        plt.savefig('Confusion_CNN2_260_'+str(s)+'.jpg',transparent = True, bbox_inches = 'tight', pad_inches = 0.01)    
    cor = np.sum(np.diag(conf))
    ncor = np.sum(conf) - cor
#     print("Overall Accuracy: ", cor / (cor+ncor))
    acc_cnn2_260[s] = 1.0*cor/(cor+ncor)
# Save results to a pickle file for plotting later
print(acc_cnn2_260)

In [None]:
# Plot confusion matrix
acc_complex = np.zeros(len(test_snrs))
for s in range(20):

    # extract classes @ SNR
#     test_SNRs = map(lambda x: lbl[x][1], test_idx)
    test_X_i = samples_db[s]
    test_Y_i = truth_labels_db[s]
    
    # estimate classes
    test_Y_i_hat = complex_CNN.predict(test_X_i)
    conf = np.zeros([len(mods),len(mods)])
    confnorm = np.zeros([len(mods),len(mods)])
    for i in range(0,test_X_i.shape[0]):
        j = list(test_Y_i[i,:]).index(1)
        k = int(np.argmax(test_Y_i_hat[i,:]))
        conf[j,k] = conf[j,k] + 1
    for i in range(0,len(mods)):
        confnorm[i,:] = conf[i,:] / np.sum(conf[i,:])
    #print the confusion matrix @ -20dB and 20dB
    if s == 0 or s == 19:
        plt.figure()
        plot_confusion_matrix(confnorm, labels=classes)
        plt.savefig('Confusion_Complex_'+str(s)+'.jpg',transparent = True, bbox_inches = 'tight', pad_inches = 0.01)
    cor = np.sum(np.diag(conf))
    ncor = np.sum(conf) - cor
#     print("Overall Accuracy: ", cor / (cor+ncor))
    acc_complex[s] = 1.0*cor/(cor+ncor)
# Save results to a pickle file for plotting later
print(acc_complex)

### Accuracy Comparison of the Architectures

In [None]:
plt.scatter(range(-20,20,2),acc_cnn2*100, label = "CNN2", color = 'coral')
plt.scatter(range(-20,20,2),acc_cnn2_260*100, label = "CNN2-260", color = 'lightblue')
plt.scatter(range(-20,20,2),acc_complex*100, label = 'Complex', color = 'green')
plt.grid()
plt.xlabel('SNR (dB)')
plt.ylabel('Accuracy (%)')
plt.legend()
plt.savefig('Classification_Accuracy_All.jpg',transparent = True, bbox_inches = 'tight', pad_inches = 0.01)

In [None]:
plt.scatter(range(-20,20,2),acc_cnn2*100, label = "CNN2", color = 'coral')
plt.scatter(range(-20,20,2),acc_complex*100, label = 'Complex', color = 'green')
plt.grid()
plt.xlabel('SNR (dB)')
plt.ylabel('Accuracy (%)')
plt.legend()
plt.savefig('Classification_Accuracy_CNN2.jpg',transparent = True, bbox_inches = 'tight', pad_inches = 0.01)

In [None]:
plt.scatter(range(-20,20,2),acc_cnn2_260*100, label = "CNN2-260", color = 'lightblue')
plt.scatter(range(-20,20,2),acc_complex*100, label = 'Complex', color = 'green')
plt.grid()
plt.xlabel('SNR (dB)')
plt.ylabel('Accuracy (%)')
plt.legend()
plt.savefig('Classification_Accuracy_CNN2_260.jpg',transparent = True, bbox_inches = 'tight', pad_inches = 0.01)

In [None]:
plt.scatter(range(-20,20,2), (acc_complex-acc_cnn2)/acc_cnn2*100, label = "CNN2", color = 'coral')
plt.scatter(range(-20,20,2), (acc_complex-acc_cnn2_260)/acc_cnn2_260*100, label = "CNN2-260", color = 'lightblue')
plt.xlabel('SNR (dB)')
plt.ylabel('Classification Improvement (%)')
plt.grid()
plt.legend()
plt.savefig('Classification_Improvement_All.jpg',transparent = True, bbox_inches = 'tight', pad_inches = 0.01)

In [None]:
plt.scatter(range(-20,20,2), (acc_complex-acc_cnn2)/acc_cnn2*100, label = "CNN2", color = 'coral')
plt.xlabel('SNR (dB)')
plt.ylabel('Classification Improvement (%)')
plt.grid()
plt.legend()
plt.savefig('Classification_Improvement_CNN2.jpg',transparent = True, bbox_inches = 'tight', pad_inches = 0.01)

In [None]:
plt.scatter(range(-20,20,2), (acc_complex-acc_cnn2_260)/acc_cnn2_260*100, label = "CNN2-260", color = 'lightblue')
plt.xlabel('SNR (dB)')
plt.ylabel('Classification Improvement (%)')
plt.grid()
plt.legend()
plt.savefig('Classification_Improvement_CNN2_260.jpg',transparent = True, bbox_inches = 'tight', pad_inches = 0.01)