In [None]:
#Multi-task Convolutional Neural Network for Radio Frequency Signal Classification
#Author: Chelsea Cook
#Date: 12 December 2021

#Data source: ANDRO Computational Solutions, LLC (2020). RadarCommDataset [Data set]. 
#GitHub. https://github.com/ANDROComputationalSolutions/RadarCommDataset


In [None]:
#Import packages

import pandas as pd
import numpy as np
import ast
import sys
import argparse
import matplotlib.pyplot as plt
import h5py
import decimal
import keras
import plotly
from sklearn import preprocessing
import time

In [None]:
#Access dataset from file

dkeys = []  # labels of each entry in the h5
W = []      # dataset container
Wq = []
Wi = []

with h5py.File("C:\\Users\\lason\\OneDrive\\Documents\\DSC CAPSTONE\\RadComDynamic\\RadComDynamic.hdf5", 'r') as f:
    for key in f.keys():
        dkeys.append(key)
        W.append(f[key][:])
        #print(key)
        #print('real', f[key][:128])
        #print('imag', f[key][128:])
f.close()

In [None]:
#Break out values in dkeys to see each line of keys

dkeys_2 = [ast.literal_eval(i) for i in dkeys]

In [None]:
dkeys_2

In [None]:
#Put keys into dataframe with column headers

dkeys_df = pd.DataFrame(dkeys_2, columns = ['Modulation', 'Signal_Type', 'SNR', 'Sample_Value'])

In [None]:
for col in dkeys_df.columns:
    print(col)

In [None]:
#View keys
dkeys_df


In [None]:
#Look at mean SNR values across modulation types to see how they are distributed

dkeys_df.groupby(['Modulation'])['SNR'].mean()

In [None]:
#Look at mean SNR values across signal types to see how they are distributed

dkeys_df.groupby(['Signal_Type'])['SNR'].mean()

In [None]:
#Print variable types for keys

dkeys_df.dtypes

In [None]:
#Explore distribution of modulation categories

dkeys_df["Modulation"].value_counts()

In [None]:
#Explore distribution of signal type categories

dkeys_df["Signal_Type"].value_counts()

In [None]:
#Print number of samples with each SNR value

dkeys_df["SNR"].value_counts().sort_index()

In [None]:
#Print number of times each sample value appears (total number of waveforms with 700 snapshots)

dkeys_df["Sample_Value"].value_counts().sort_index()

In [None]:
#Understanding type and shape of waveform data object (W)

print(W[0])
print(type(W))

print(np.shape(W))

In [None]:
#Put waveform data (I/Q) into dataframe

w_df = pd.DataFrame(W)

In [None]:
#View the dataframe

w_df.head()

In [None]:
#Join keys to waveform (I/Q) data

df_combined = pd.merge(dkeys_df, w_df, left_index=True, right_index=True)

In [None]:
print(df_combined[:5])

In [None]:
#Create Train/Validation/Test sets by shuffling data first for random distribution of samples
#Set seed to prevent variation in sets on subsequent compilation

np.random.seed(74)
df_shuffled = np.array(df_combined)
np.random.shuffle(df_shuffled)
# df_shuffled = pd.DataFrame(df_shuffled, columns = df_combined.columns)
df_shuffled[0]


In [None]:
#Put this array back into a dataframe for easier separation of dependent and independent variables

full_shuffled_df = pd.DataFrame(df_shuffled, columns = df_combined.columns)
full_shuffled_df.head()

In [None]:
#Recreate array of I/Q values to reshape for modeling

IQ_separate = full_shuffled_df.iloc[: ,4:]
IQ_separate.head()

In [None]:
IQ_array = np.asarray(IQ_separate).astype(np.float32)
IQ_array.ndim
np.shape(IQ_array)

In [None]:
#Reshape and check dimensions of I/Q array for use in CNN modeling

CNN_Tensors = IQ_array.reshape((125361,16,16,1))
np.shape(CNN_Tensors)
CNN_Tensors.ndim
CNN_Tensors[0].ndim
CNN_Tensors.astype(np.float32)

In [None]:
#Create training, validation and test sets for each DV/IV by separating the variable, ensuring it is an array (for acceptance 
#in the models), and then splitting its contents 70/20/10

#I/Q data (waveforms)

IQData = CNN_Tensors

IQ_train = IQData[:87753]
IQ_val = IQData[87753:112825]
IQ_test = IQData[112825:]

#Modulation

ModData = full_shuffled_df["Modulation"]
ModData_array = ModData.to_numpy()

Mod_train = ModData_array[:87753]
Mod_val = ModData_array[87753:112825]
Mod_test = ModData_array[112825:]

#Signal Type

SigData = full_shuffled_df["Signal_Type"]
SigData_array = SigData.to_numpy()

Sig_train = SigData_array[:87753]
Sig_val = SigData_array[87753:112825]
Sig_test = SigData_array[112825:]



In [None]:
#First single-task CNN

from keras import layers
from keras import models

model = models.Sequential()
model.add(layers.Conv2D(32, (3,3), activation = 'relu', input_shape=(16,16,1)))
model.add(layers.MaxPooling2D((2,2)))
model.add(layers.Conv2D(64, (3,3), activation = 'relu'))

model.summary()

In [None]:
#Flatten output from convolutional layers to add dense classifier layers

model.add(layers.Flatten())
model.add(layers.Dense(64, activation = 'relu'))
model.add(layers.Dense(6, activation = 'softmax'))

model.summary()

In [None]:
#Classify modulation type using CNN; first, categorical data (modulation types) must be encoded

Mod_train_dummies = pd.get_dummies(Mod_train).astype(np.float32)
Mod_val_dummies = pd.get_dummies(Mod_val).astype(np.float32)
print(Mod_train_dummies[:5])
np.shape(Mod_train_dummies)

In [None]:
#Compile model

model.compile(optimizer='rmsprop',
             loss='categorical_crossentropy',
             metrics = ['accuracy'])

#Train model over 15 epochs

history = model.fit(IQ_train, Mod_train_dummies, epochs=15, batch_size=64, validation_data=(IQ_val, Mod_val_dummies))

In [None]:
#Visualize accuracy of modulation classification over epochs

import matplotlib.pyplot as plt

acc = history.history['accuracy']
val_acc = history.history['val_accuracy']
loss = history.history['loss']

epochs = range(1, len(loss) + 1)

plt.plot(epochs, acc, 'bo', label = 'Training acc')
plt.plot(epochs, val_acc, 'b', label = 'Validation acc')
plt.ylim((.45,.8))
plt.title('Training and validation accuracy - Modulation')
plt.xlabel('Epochs')
plt.ylabel('Accuracy')
plt.legend()

plt.show()


#Chart demonstrates that accuracy on the validation data flattens out after 10 epochs and model begins to overfit


In [None]:
#Encode signal type data
Sig_train_dummies = pd.get_dummies(Sig_train).astype(np.float32)
Sig_val_dummies = pd.get_dummies(Sig_val).astype(np.float32)

print(Sig_train_dummies[:5])
type(Sig_train_dummies)

In [None]:
#Model signal type in single-task CNN

sigmodel = models.Sequential()
sigmodel.add(layers.Conv2D(32, (3,3), activation = 'relu', input_shape=(16,16,1)))
sigmodel.add(layers.MaxPooling2D((2,2)))
sigmodel.add(layers.Conv2D(64, (3,3), activation = 'relu'))

sigmodel.add(layers.Flatten())
sigmodel.add(layers.Dense(64, activation = 'relu'))
sigmodel.add(layers.Dense(8, activation = 'softmax'))

sigmodel.compile(optimizer='rmsprop',
             loss='categorical_crossentropy',
             metrics = ['accuracy'])

history2 = sigmodel.fit(IQ_train, Sig_train_dummies, epochs=15, batch_size=64, validation_data=(IQ_val, Sig_val_dummies))

In [None]:
#Visualize accuracy of signal type classification over epochs

import matplotlib.pyplot as plt

accy = history2.history['accuracy']
val_accy = history2.history['val_accuracy']
loss = history2.history['loss']

epochs = range(1, len(loss) + 1)

plt.plot(epochs, accy, 'bo', label = 'Training acc')
plt.plot(epochs, val_accy, 'b', label = 'Validation acc')
plt.ylim((.45,.8))
plt.title('Training and validation accuracy - Signal')
plt.xlabel('Epochs')
plt.ylabel('Accuracy')
plt.legend()

plt.show()

#Chart demonstrates that accuracy on the validation data improves through ten epochs, then begins to decline and the model overfits

In [None]:
#Multi-task learning model (CNN) for both Modulation and Signal Classification

from keras import Input
from keras.models import Model

IQ_input = Input(shape=(16,16,1), dtype='float32')

x = layers.Conv2D(64, (3,3), activation = 'relu')(IQ_input)
x = layers.MaxPooling2D(2,2)(x)
#x = layers.Conv2D(64, (3,3), activation = 'relu')(x)
#x = layers.MaxPooling2D(2,2)(x)
#x = layers.Flatten()(x)
#x = layers.Dense(64, activation = 'relu')(x)

mod_predict = layers.Conv2D(32, (3,3), activation = 'relu')(x)
mod_predict = layers.Dropout(0.5)(mod_predict)
mod_predict = layers.Flatten()(mod_predict)
mod_predict = layers.Dense(256, activation = 'relu')(mod_predict)
mod_predict = layers.Dense(6, activation = 'softmax', name = 'modtype')(mod_predict)

sig_predict = layers.Conv2D(32, (3,3), activation = 'relu')(x)
sig_predict = layers.Dropout(0.5)(sig_predict)
sig_predict = layers.Flatten()(sig_predict)
sig_predict = layers.Dense(256, activation = 'relu')(sig_predict)
sig_predict = layers.Dense(8, activation = 'softmax', name = 'sigtype')(sig_predict)

MTL = Model(IQ_input,
               [mod_predict, sig_predict])


In [None]:
MTL.compile(optimizer = 'adam', loss=['categorical_crossentropy', 'categorical_crossentropy'], loss_weights = [.2, .8], metrics = ['accuracy'])

In [None]:
Predictions = MTL.fit(IQ_train, {'modtype': Mod_train_dummies, 'sigtype': Sig_train_dummies}, validation_data=(IQ_val, {'modtype': Mod_val_dummies, 'sigtype': Sig_val_dummies}), epochs = 30, batch_size = 64)

In [None]:
#Chart training set accuracy against validation set accuracy (modulation first, then signal type)
#Observe where validation accuracy peaks and overfitting become evident (train/val accuracy diverge)

import plotly.graph_objects as go

plt.clf()
modchart = go.Figure()
modchart.add_trace(go.Scatter(
                        y=Predictions.history['modtype_accuracy'],
                        name='Training'))
modchart.add_trace(go.Scatter(
                        y=Predictions.history['val_modtype_accuracy'],
                        name='Validation'))

modchart.update_layout(height=500,
                      width=700,
                      yaxis_range=[.5,.8],
                      title='MTL Modulation Accuracy',
                      xaxis_title='Epoch',
                      yaxis_title='Accuracy')

modchart.show()

In [None]:
plt.clf()
sigchart = go.Figure()
sigchart.add_trace(go.Scatter(
                        y=Predictions.history['sigtype_accuracy'],
                        name='Training'))
sigchart.add_trace(go.Scatter(
                        y=Predictions.history['val_sigtype_accuracy'],
                        name='Validation'))

sigchart.update_layout(height=500,
                      width=700,
                      yaxis_range=[.5,.8],
                      title='MTL Signal Type Accuracy',
                      xaxis_title='Epoch',
                      yaxis_title='Accuracy')

sigchart.show()

In [None]:
#Get predicted values for Modulation and Signal Type to compare to actual values and plot accuracy by SNR
#Predictions are returned as probabilities for each class, must encode so that the positive class is 1 and remainder are 0
#Convert encoded prediction values to class names and match against actuals, calculate prediction accuracy by SNR

MTL_predict= MTL.predict(IQ_train)
lb_sig = preprocessing.LabelBinarizer()
lb_mod = preprocessing.LabelBinarizer()

modlabels = np.argmax(MTL_predict[0], axis=-1)
bin_modlabels = lb_mod.fit_transform(modlabels)

siglabels = np.argmax(MTL_predict[1], axis=-1)
bin_siglabels = lb_sig.fit_transform(siglabels)


In [None]:
#Reformat modulation predictions into dummies dataframe

mod_predictions = pd.DataFrame(bin_modlabels, columns=Mod_train_dummies.columns)
print(mod_predictions[:5])


In [None]:
#Reformat signal predictions into dummies dataframe

sig_predictions = pd.DataFrame(bin_siglabels, columns=Sig_train_dummies.columns)
print(sig_predictions[:5])

In [None]:
#Collapse binary labels into class names

collapsed_mod_predictions = mod_predictions.idxmax(axis=1)
print(collapsed_mod_predictions[:5])

In [None]:
collapsed_sig_predictions = sig_predictions.idxmax(axis=1)
print(collapsed_sig_predictions[:5])

In [None]:
#Get the actual modulation and signal classes, as well as SNR, from the training dataset

Subset_df = full_shuffled_df[["Modulation","Signal_Type","SNR","Sample_Value"]]
print(Subset_df.head())

In [None]:
#Create combined dataframe with the predicted classes

Add_mod = pd.merge(Subset_df, collapsed_mod_predictions.rename('Mod_Predictions'), left_index=True, right_index=True)

Compare_df = pd.merge(Add_mod, collapsed_sig_predictions.rename('Sig_Predictions'), left_index=True, right_index=True)
print(Compare_df.head())

print(len(Compare_df))

In [None]:
#Create tables of prediction accuracy at each SNR, including SNR, number of correct predictions, total observations, and % accuracy

SNR_values = Compare_df.SNR.value_counts()
SNR_values.index

mod_acc_list = []
sig_acc_list = []

for snr in SNR_values.index:
    counter_mod = 0
    counter_sig = 0
    temp_df = Compare_df[Compare_df['SNR']==snr].reset_index(drop=True)
    for x in range(temp_df.shape[0]):
        if temp_df['Modulation'][x] == temp_df['Mod_Predictions'][x]:
            counter_mod+=1
        if temp_df['Signal_Type'][x] == temp_df['Sig_Predictions'][x]:
            counter_sig+=1
    mod_snr_result = counter_mod/Compare_df[Compare_df['SNR']==snr].shape[0]
    sig_snr_result = counter_sig/Compare_df[Compare_df['SNR']==snr].shape[0]
    mod_acc_list.append([snr, mod_snr_result, counter_mod, Compare_df[Compare_df['SNR']==snr].shape[0]])
    sig_acc_list.append([snr, sig_snr_result, counter_sig, Compare_df[Compare_df['SNR']==snr].shape[0]])

In [None]:
#Create chart of modulation accuracy by SNR as dataframe and print in order from low to high SNR
mod_acc_list

mod_acc_df = pd.DataFrame(mod_acc_list, columns = ['SNR', 'Accuracy', 'Match', 'Total_Rows'])
sig_acc_df = pd.DataFrame(sig_acc_list, columns = ['SNR', 'Accuracy', 'Match', 'Total_Rows'])

In [None]:
mod_acc_by_snr = mod_acc_df.sort_values("SNR", ascending=True)
mod_acc_by_snr

In [None]:
#Create same chart for signal type

sig_acc_by_snr = sig_acc_df.sort_values("SNR", ascending=True)
sig_acc_by_snr

In [None]:
#Plot modulation and signal accuracy on training data by SNR

ax = plt.gca()

mod_acc_by_snr.plot(kind='line',x='SNR',y='Accuracy', ax=ax)
plt.title('Modulation Accuracy by SNR')
plt.xlabel('SNR')
plt.ylabel('Modulation Accuracy')
plt.show()

In [None]:
#Chart for signal type

ax = plt.gca()

sig_acc_by_snr.plot(kind='line',x='SNR',y='Accuracy', ax=ax)
plt.title('Signal Accuracy by SNR')
plt.xlabel('SNR')
plt.ylabel('Signal Accuracy')
plt.show()

In [None]:
#Test final MTL model on Test dataset
#Use time to print how long it takes to predict

#First, encode testdata
Mod_test_dummies = pd.get_dummies(Mod_test).astype(np.float32)
Sig_test_dummies = pd.get_dummies(Sig_test).astype(np.float32)

tic=time.perf_counter()
TestSetPredictions = MTL.evaluate(IQ_test, {'modtype': Mod_test_dummies, 'sigtype': Sig_test_dummies})
toc=time.perf_counter()
print(f"Predicted 25,072 values in {toc - tic:0.4f} seconds")

In [None]:
#Get predicted values for Modulation and Signal Type to compare to actual values and plot accuracy by SNR
#Predictions are returned as probabilities for each class, must encode so that the positive class is 1 and remainder are 0
#Convert encoded prediction values to class names and match against actuals, calculate prediction accuracy by SNR

MTL_predict_test= MTL.predict(IQ_test)
lb_sig1 = preprocessing.LabelBinarizer()
lb_mod1 = preprocessing.LabelBinarizer()

modtestlabels = np.argmax(MTL_predict_test[0], axis=-1)
bin_modtestlabels = lb_mod1.fit_transform(modtestlabels)

sigtestlabels = np.argmax(MTL_predict_test[1], axis=-1)
bin_sigtestlabels = lb_sig1.fit_transform(sigtestlabels)

In [None]:
#Reformat modulation predictions into dummies dataframe

mod_test_predictions = pd.DataFrame(bin_modtestlabels, columns=Mod_test_dummies.columns)
print(mod_test_predictions[:5])


In [None]:
#Reformat signal predictions into dummies dataframe

sig_test_predictions = pd.DataFrame(bin_sigtestlabels, columns=Sig_test_dummies.columns)
print(sig_test_predictions[:5])

In [None]:
#Collapse binary labels into class names

collapsed_mod_test_predictions = mod_test_predictions.idxmax(axis=1)
print(collapsed_mod_test_predictions[:5])

In [None]:
collapsed_sig_test_predictions = sig_test_predictions.idxmax(axis=1)
print(collapsed_sig_test_predictions[:5])

In [None]:
print(len(Subset_df))

In [None]:
print(len(collapsed_mod_test_predictions))
print(collapsed_mod_test_predictions.head)

In [None]:
#Join test predictions with test portion of dataset to compare for accuracy 

Test_Subset_df = Subset_df[112825:]
Reset=Test_Subset_df.reset_index(drop=True)

Add_test_mod = pd.merge(Reset, collapsed_mod_test_predictions.rename('Mod_Test_Predictions'), left_index=True, right_index=True)

Final_Predictions_df = pd.merge(Add_test_mod, collapsed_sig_test_predictions.rename('Sig_Test_Predictions'), left_index=True, right_index=True)
print(Final_Predictions_df.head())

In [None]:
SNR_final_values = Final_Predictions_df.SNR.value_counts()
SNR_final_values.index

mod_accy_list = []
sig_accy_list = []

for snr1 in SNR_final_values.index:
    counter_mod1 = 0
    counter_sig1 = 0
    temp_df1 = Final_Predictions_df[Final_Predictions_df['SNR']==snr1].reset_index(drop=True)
    for x in range(temp_df1.shape[0]):
        if temp_df1['Modulation'][x] == temp_df1['Mod_Test_Predictions'][x]:
            counter_mod1+=1
        if temp_df1['Signal_Type'][x] == temp_df1['Sig_Test_Predictions'][x]:
            counter_sig1+=1
    mod_snr_results = counter_mod1/Final_Predictions_df[Final_Predictions_df['SNR']==snr1].shape[0]
    sig_snr_results = counter_sig1/Final_Predictions_df[Final_Predictions_df['SNR']==snr1].shape[0]
    mod_accy_list.append([snr1, mod_snr_results, counter_mod1, Final_Predictions_df[Final_Predictions_df['SNR']==snr1].shape[0]])
    sig_accy_list.append([snr1, sig_snr_results, counter_sig1, Final_Predictions_df[Final_Predictions_df['SNR']==snr1].shape[0]])

In [None]:
mod_accy_df = pd.DataFrame(mod_accy_list, columns = ['SNR', 'Accuracy', 'Match', 'Total_Rows'])
sig_accy_df = pd.DataFrame(sig_accy_list, columns = ['SNR', 'Accuracy', 'Match', 'Total_Rows'])

In [None]:
mod_accy_by_snr = mod_accy_df.sort_values("SNR", ascending=True)
mod_accy_by_snr

In [None]:
sig_accy_by_snr = sig_accy_df.sort_values("SNR", ascending=True)
sig_accy_by_snr

In [None]:
#Chart accuracy by SNR

xm=mod_accy_by_snr['SNR']
ym=mod_accy_by_snr['Accuracy']
xs=sig_accy_by_snr['SNR']
ys=sig_accy_by_snr['Accuracy']

fig, ax = plt.subplots()

ax.plot(xm,ym, label='Modulation')
ax.plot(xs,ys, label='Signal Type')
ax.legend(loc = 'lower right')
plt.title('Modulation & Signal Prediction Accuracy by SNR')
plt.xlabel('SNR')
plt.ylabel('Accuracy')
plt.show()

In [None]:
#Code provided by ANDRO in GitHub to view/print the waveform images

def parse_args():
    "Parse the command line arguments"
    parser = argparse.ArgumentParser()
    parser.add_argument("-num", type=int,default="0",
                        help="Which sample to pick. 0 to 699")
    parser.add_argument("-snr", type=int,default="-10",
                        help="SNR: -20 to 18 in 2 step increments")
    parser.add_argument("-mod",default="amdsb",
                        help="Modulation options: pulsed,fmcw,bpsk,amdsb,amssb,ask")
    parser.add_argument("-sig",default="AM radio",
                        help="Signal type options: Airborne-detection,Airborne-range,Air-Ground-MTI,Ground mapping,Radar-Altimeter,Satcom,AM radio,short-range")
    return parser.parse_known_args()
#     return parser.parse_args()

In [None]:
def main():
    args, unknown = parse_args()
#     args = parse_args()
    with h5py.File('C:\\Users\\lason\\OneDrive\\Documents\\DSC CAPSTONE\\RadComDynamic\\RadComDynamic.hdf5', 'r') as f:
        key = args.mod,args.sig,args.snr,args.num 
        waveform = f[str(key)][:]
        real = waveform[0:128]
        imag = waveform[128:]
    f.close()
   
    # Plot and visualize the selected sample
    plt.figure(figsize=[8, 6])
    plt.plot(real,'-go')
    plt.plot(imag,'-bo')
    plt.title(str(key), fontsize=16)
    plt.show()
if __name__ == "__main__":
    print('Stop')
    sys.exit(not main())

In [None]:
#Another way to print the waveform image

for idx, val in enumerate(dkeys):
    real = W[idx][0:128]
    imag = W[idx][128:]
    
    plt.figure(figsize=[8, 6])
    plt.plot(real,'-go')
    plt.plot(imag,'-bo')
    plt.title(val, fontsize=16)
    plt.show()