In [None]:
import time
import _pickle as cPickle

import numpy as np
from numpy.random import seed

import pandas as pd

import matplotlib.pyplot as plt
%matplotlib widget

from scipy.fft import fft, fftfreq, fftshift
from scipy.signal import find_peaks, savgol_filter

from sklearn.metrics import accuracy_score, f1_score
from sklearn.model_selection import train_test_split

from tensorflow.random import set_seed
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Dense, BatchNormalization
from tensorflow.keras.optimizers import Adam

from keras import callbacks 


**test**: This variable corresponds to the name to be used to save the results of the test.

In [None]:
test = 'test1'

We import the dataset used for this test: **RadioML 2016.10A**.
<br>
This dataset can be found in https://www.deepsig.ai/datasets/

In [None]:
Xd = pd.read_pickle("/home/ymondino/Documents/repos/machinelearning/RML2016/RML2016.10a_dict.pkl")

The variables **snrs** and **mods** contain all the SNR values and modulation types that exist in the dataset.

In [None]:
snrs, mods = map(lambda j: sorted(set(map(lambda x: x[j], Xd.keys()))), [1,0])

We do not make use of all SNR values, so **usedSnr** contains the used SNR values for training, testing and validating the model.

In [None]:
usedSnr = [ -10, -8, -6, -4, -2, 0, 2, 4, 6, 8, 10, 12, 14, 16, 18] 

The variable **X** contains the signals that will be used for training, testing and validating the model. While **lbl** contains the information about the modulation type and SNR value of each of these signals.

In [None]:
X = []  
lbl = []

for mod in mods:
    for snr in usedSnr:
        X.append(Xd[(mod,snr)]) 
        for i in range(Xd[(mod,snr)].shape[0]):  lbl.append((mod,snr))
X = np.vstack(X)
X.shape

**X_mods** contains only the information about modulation type of each signal example.

In [None]:
X_mods = np.array(list(map(lambda x: lbl[x][0],range(0,len(lbl)))))
len(X_mods)

**mods** contains the list of modulation types that we will use in this classifier.

In [None]:
mods = ['BPSK','QAM16','QPSK','8PSK','GFSK','CPFSK','PAM4']

Each combination of SNR value and modulation type has 1000 signal examples, we choose 600 of these 1000 signals to be used for testing the model.

In [None]:
n_examples = 1000

n_test = 600       

test_idx = np.random.choice(list(set(range(0, n_examples))), size = n_test, replace = False)

test_idx[0:5]

**dataset** contains the identification of the 400 signal examples, of each combination of SNR value and modulation type, used for training and validating the model.

In [None]:
dataset = np.random.choice(list(set(range(0, n_examples)) - set(test_idx)), size = 400, replace = False)
len(dataset)

**calcFeatures** calculates all the input features of the neural network.
<br><br>
PARAMETERS:
    <br>- **X**: Signal examples.
    <br>- **idx**: Indexes of the signal examples to consider from all the ones contained in X.
    <br>- **X_mods**: Modulation types of the signal examples in X.
    <br>- **lbl**: Information of modulation type and SNR value of the signal examples in X.
    <br>- **usedSnr**: List of SNR values present in X.
<br><br>
RETURNS:
    <br>- Nine arrays (one for each feature) with the calculated feature values for each of the signal examples.

In [None]:
def calcFeatures (X, idx, mods, X_mods, lbl, usedSnr):
      
    n_train = len(idx)
    n_snr = len(usedSnr)
    
    C20_values = np.zeros((len(mods) * n_snr, n_train))
    C60_values = np.zeros((len(mods) * n_snr, n_train))
    
    skewness_values = np.zeros((len(mods) * n_snr, n_train))
    skewness_f_values = np.zeros((len(mods) * n_snr, n_train))
    
    sigma_v_values = np.zeros((len(mods) * n_snr, n_train))
    sigma_deltap_values = np.zeros((len(mods) * n_snr, n_train)) 
    
    std_amp_values = np.zeros((len(mods) * n_snr, n_train))
    mean_amp_values = np.zeros((len(mods) * n_snr, n_train))
    mean_temp_values = np.zeros((len(mods) * n_snr, n_train))
        
    for i,mod in enumerate (mods): 
        idx_sameMod = np.where(np.array(X_mods) == mod)[0]
        X_sameMod = X[idx_sameMod]  
        
        for j,snr in enumerate (usedSnr):  
            snr_sameMod = np.array(list(map(lambda x: lbl[x][1], idx_sameMod)))
            idx_sameMod_snr = np.where(np.array(snr_sameMod) == snr)[0]
            X_sameMod_snr = X_sameMod[idx_sameMod_snr]   
            
            X_idx = X_sameMod_snr[idx]

            for ex in range(0, X_idx.shape[0]):
                
                X_0 = X_idx[ex][0]
                X_1 = X_idx[ex][1]
                

                signal = X_0 + X_1 * 1j
                
                # The next normalization is the one implemented on the public dataset.
                # We implement it in the same way to normalize the cases when the signals are interfered
                ener = sum(abs(signal))
                signal = signal / np.sqrt(ener)
                
                ampli = abs(signal) 
                
                X_0 = np.real(signal)
                X_1 = np.imag(signal)
                
                ma = ampli.mean() 
                
                acn = ampli / ma - 1
                
                #------------------------
                
                C20_values[j + i*n_snr][ex] = abs((signal**2).mean())
                
                #------------------------
                
                M20 = (signal**2).mean()
                
                M40 = (signal**4).mean()
                
                M60 = (signal**6).mean()
                
                C60_value = M60 - 15 * M20 * M40 + 3 * M20**3
                
                C60_values[j + i*n_snr][ex] = abs(C60_value)**(2/6)
                
                #-----------------------
                
                num = ((ampli - ma)**3).mean()
                
                den = ((ampli - ma)**2).mean()
                den = den**(3/2)
                
                skewness_values[j + i*n_snr][ex] = abs(num / den)
                
                #-----------------------
                
                R = fft(signal)
                R = fftshift(R)
                
                mu = R.mean()
                
                sigma = ((R - mu) * np.conj(R - mu)).mean()
                
                num = ((R - mu)**3).mean()
                
                den = sigma**(3/2)
                
                skewness_f_values[j + i*n_snr][ex] = abs(num / den)
                
                #-------------------------
                
                a = np.sqrt(ampli / ampli.var()) - 1
                
                
                sigma_v_values[j + i*n_snr][ex] = a.std()
                
                #-------------------------
                
                frec = 1/(2 * np.pi) * 1 / (1 + (X_1 / X_0)**2)
                
                sigma_deltap_values[j + i*n_snr][ex] = frec.std()
                
                #-------------------------
                
                X0_f = savgol_filter(X_0, 7, 2)
                X1_f = savgol_filter(X_1, 7, 2)

                ampli_f = np.sqrt(np.square(X0_f) + np.square(X1_f)) 

                peaks, _ = find_peaks(ampli_f, distance = 2) 
                negPeaks,_ = find_peaks(-ampli_f, distance = 2)
    
                allPeaks = np.concatenate((peaks, negPeaks, [0]))
                allPeaks = np.sort(allPeaks)

                ampSeg = []
                tempSeg = []

                for indx in range(0, len(allPeaks) - 1):
                    ampSeg.append(abs(ampli_f[allPeaks[indx + 1]] - ampli_f[allPeaks[indx]]))
                    tempSeg.append(allPeaks[indx + 1] - allPeaks[indx])
                    
                ampSeg = np.array(ampSeg)
     
                mean_amp_values[j + i*n_snr][ex] = ampSeg.mean()
        
                tempSeg = np.array(tempSeg)
     
                mean_temp_values[j + i*n_snr][ex] = tempSeg.mean()
        
                #--------------------------
            
                ampSeg = ampSeg - ampSeg.mean()
     
                std_amp_values[j + i*n_snr][ex] = sum(abs(ampSeg)) / len(ampSeg)
        
                
    return  C20_values, skewness_f_values, sigma_v_values, \
    std_amp_values, mean_amp_values, skewness_values, \
    sigma_deltap_values, C60_values, mean_temp_values

**generate_data_array** generates the input data array of the neural network.
<br><br>
PARAMETERS:
    <br>- **X**: Signal examples.
    <br>- **idx**: Indexes of the signal examples to consider from all the ones contained in X.
    <br>- **X_mods**: Modulation types of the signal examples in X.
    <br>- **lbl**: Information of modulation type and SNR value of the signal examples in X.
    <br>- **usedSnr**: List of SNR values present in X.
<br><br>
RETURNS:
    <br>- One matrix that contains all the input features of the neural network.

In [None]:
def generate_data_array(X, idx, mods, X_mods, lbl, usedSnr):
 

    C20_values, skewness_f_values, sigma_v_values, \
    std_amp_values, mean_amp_values, skewness_values, \
    sigma_deltap_values, C60_values, mean_temp_values = calcFeatures (X, idx, mods, X_mods, lbl, usedSnr)  
    
    X_all = np.hstack(( \
                        C20_values[0].reshape(-1, 1), \
                        C60_values[0].reshape(-1, 1), \
                        skewness_values[0].reshape(-1, 1), \
                        skewness_f_values[0].reshape(-1, 1), \
                        sigma_v_values[0].reshape(-1, 1), \
                        sigma_deltap_values[0].reshape(-1, 1), \
                        std_amp_values[0].reshape(-1, 1), \
                        mean_amp_values[0].reshape(-1, 1), \
                        mean_temp_values[0].reshape(-1, 1) \
                     ))


    for p in range(1, len(usedSnr) * len(mods)):
        X_data = np.hstack((\
                        C20_values[p].reshape(-1, 1), \
                        C60_values[p].reshape(-1, 1), \
                        skewness_values[p].reshape(-1, 1), \
                        skewness_f_values[p].reshape(-1, 1), \
                        sigma_v_values[p].reshape(-1, 1), \
                        sigma_deltap_values[p].reshape(-1, 1), \
                        std_amp_values[p].reshape(-1, 1), \
                        mean_amp_values[p].reshape(-1, 1), \
                        mean_temp_values[p].reshape(-1, 1) \
                           ))

        X_all = np.vstack((X_all, X_data))

    return X_all

**createY** expresses the modulation type of each signal example in one-hot encoding format and with an int value between 0 and 6. 
<br><br>
PARAMETERS:
    <br>- **idx**: Indexes of the signal examples that will be used. This parameter is just used to know the amount of signal examples of each combination of moulation type and SNR value are being used.
    <br>- **X_mods**: List of modulation types that are being used.
    <br>- **usedSnr**: List of SNR values that are being used.
<br><br>
RETURNS:
    <br>- **Y**: Modulation types expressed in one-hot encoding format.
    <br>- **Y_Classes**: Modulation types expressed with an int number.

In [None]:
def createY(idx, mods, usedSnr):
    Y_Classes = []
    num_examples = len(idx)
    num_snr = len(usedSnr)

    for mod_indx in range(0, len(mods)):
        Y_Classes = Y_Classes + [mod_indx for i in range (num_examples * num_snr)]

    def to_onehot(yy):
        yy1 = np.zeros([len(yy), max(yy) + 1])
        yy1[np.arange(len(yy)),yy] = 1
        return yy1

    Y = to_onehot(Y_Classes)
    
    return Y, Y_Classes


We define 2 general training parameters: **number of training epochs** and **batch size**. We also specify the path where to save the model weights.

In [None]:
nb_epoch = 1000     
batch_size = 50

filepath = 'revision/probando/train_'+ test + '.h5'

**generateNeuralNetwork** generates the Neural Network Model. 
<br><br>
PARAMETERS:
    <br>- **X_shape**: Number of input features.
    <br>- **Y_shape**: Number of output nodes.
    <br>- **n_neurons**: Number of neurons in the hidden layer.
    <br>- **lr**: Learning rate.
<br><br>
RETURNS:
    <br>- **model**: Proposed Neural Network Model ready to be trained.

In [None]:
def generateNeuralNetwork(X_shape, Y_shape, n_neurons, lr):
    
    model = Sequential()
    
    model.add(BatchNormalization(input_shape = (X_shape,)))
    
    model.add(Dense(n_neurons, activation = 'relu',name = "dense1"))
    
    model.add(Dense(Y_shape, activation = 'softmax', name = "dense2"))
    
    optA = Adam(learning_rate = lr)
    
    model.compile(loss = 'categorical_crossentropy', optimizer = optA, metrics = ['accuracy'])
    
    return model

The training and testing of the model is repeated **iterations** times. 
<br>
We generate a dictionary with the testing results of each iteration. This dictionary contains the accuracy and f1 scores for the signal examples of each combination of SNR value and modulation type.

In [None]:
data_dict = pd.DataFrame()

iterations = 10

for it in range(iterations): 
    dataset = np.random.choice(list(set(range(0, 1000)) - set(test_idx)), size = 200, replace = False)
    
    inicio = time.time()
    
    X_train = generate_data_array(X, dataset, mods, X_mods, lbl, usedSnr)
    Y_train, Y_trainClasses = createY(dataset, mods, usedSnr)
    
    model = generateNeuralNetwork(X_train.shape[1], Y_train.shape[1], 15, 0.005)
    
    X_tr, X_val, Y_tr, Y_val = train_test_split(X_train, Y_train, test_size = 0.2)
    
    
    history = model.fit(
                    X_tr,
                    Y_tr,
                    batch_size = batch_size,
                    epochs = nb_epoch,
                    verbose = 0,
                    validation_data = (X_val, Y_val),
                    callbacks = [
                        callbacks.ModelCheckpoint(filepath, monitor = 'val_loss', verbose = 0, save_best_only = True, mode = 'auto'),
                        callbacks.EarlyStopping(monitor = 'val_loss', patience = 25, verbose = 0, mode = 'auto')
                        ]
                    )
    
    model.load_weights(filepath)
    
    fin = time.time()
    
    print(f"Iteration {it}: {(fin - inicio) / 60} minutes")
    
    acc = np.zeros(len(range(-10, 20, 2)))
    f1 = np.zeros(len(range(-10, 20, 2)))
    
    
    for j,snr in enumerate(range(-10, 20, 2)):    
        X_same_snr = []  
        lbl_same_snr = []
        
        for mod in mods:
                X_same_snr.append(Xd[(mod, snr)])
                for i in range(Xd[(mod,snr)].shape[0]):  lbl_same_snr.append((mod,snr))


        X_same_snr = np.vstack(X_same_snr)

        X_mods_same_snr = np.array(list(map(lambda x: lbl_same_snr[x][0], range(0, len(lbl_same_snr)))))

        X_test = generate_data_array(X_same_snr, test_idx, mods, X_mods_same_snr, lbl_same_snr, [snr])

        Y_test, Y_testClasses = createY(test_idx, mods, [snr])

        predictedClasses =  np.argmax(model.predict(X_test), axis = -1)

        acc = accuracy_score(Y_testClasses, predictedClasses)

        f1 = f1_score(Y_testClasses, predictedClasses, average = None)
        
        data_dict_i = pd.DataFrame({'it' : it, 'SNR' : snr, 'mod' : mods, 'acc' : acc, 'f1' : f1})
        data_dict = pd.concat([data_dict, data_dict_i])
    
data_dict.to_csv('revision/probando/data_' + test + '.csv',index = False)

We calculate the average accuracy and f1 score.

In [None]:
p = pd.read_csv('revision/probando/data_' + test + '.csv')  

acc_mean = np.zeros((15, 1))
f1_mean = np.zeros((15, 1))

for i, snr in enumerate(range(-10, 20, 2)):
    acc_mean[i] = p.loc[p['SNR'] == snr]['acc'].mean()*100
    f1_mean[i] = p.loc[p['SNR'] == snr]['f1'].mean()

print(f"acc: {acc_mean.mean()}")
print(f"f1: {f1_mean.mean()}")

We plot the mean accuracy for each SNR value.

In [None]:
p = pd.read_csv('revision/probando/data_' + test + '.csv')  

acc_mean = np.zeros((15, 1))
acc_std = np.zeros((15, 1))

for i,snr in enumerate(range(-10, 20, 2)):
    acc_mean[i] = p.loc[p['SNR'] == snr]['acc'].mean() * 100
    acc_std[i] = p.loc[p['SNR'] == snr]['acc'].std() * 100
    
    
fig, ax = plt.subplots()
plt.plot(range(-10, 20, 2), acc_mean, 'C0-', label = 'Proposed model')

plt.grid(True)

ax.set_xticks(range(-10, 20, 5))
ax.set_yticks(range(0, 100, 10))
plt.xlabel('SNR [dB]', fontsize = 13)
plt.ylabel('Acc [%]', fontsize = 13)

plt.savefig('revision/probando/dataTest_' + test + 'result.png',dpi = 300) 
