# **Loading Libraries**

In [None]:
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt

from sklearn.model_selection import train_test_split
from keras.models import Sequential
from keras.layers import Dense, Dropout, Flatten, Conv1D, MaxPool1D, BatchNormalization
from keras.optimizers import Adam,RMSprop
from keras.callbacks import ModelCheckpoint, History
from keras.utils.np_utils import to_categorical
from sklearn.metrics import confusion_matrix
from keras import regularizers

from scipy import signal
from scipy.signal import chirp, spectrogram
import math
import random as rd
import tensorflow.compat.v1 as tf
tf.disable_v2_behavior()

# **Dataset Generation** 

Signal                      | Label         
:---------------------------|:-------------:
Continuous Wave             | 0             
Bi-phase Shift Keying       | 1             
Linear Frequency Modulation | 2             



In [None]:
def LFM(t,f0,t1,f1,SNRdb):
  noise = np.random.uniform(low=-1, high=1, size=(len(t),))
  SNR = 10**(SNRdb/10);
  A = math.sqrt(2*SNR);
  signal = A*chirp(t, f0, t1, f1, method='linear', phi = rd.random()*math.pi) + noise
  return signal

def BPSK(t,position,freq,SNRdb):
  phase = rd.random()*math.pi
  phi = math.pi + phase
  w = 2*math.pi*freq
  SNR = 10**(SNRdb/10)
  A = math.sqrt(2*SNR)
  noise = np.random.uniform(low=-1, high=1, size=(len(t),))
  signal = A*np.cos(w*t + phase) + noise
  isegment= 256;    
  jstart=1+isegment*3 + position;
  jend=isegment*4 + position
  shifted = A*np.cos(w*t+phi)
  signal[jstart:jend]= shifted[jstart:jend] + noise[jstart:jend]
  return signal

def CW(t,freq,SNRdb):
  phase = rd.random()*math.pi
  w = 2*math.pi*freq
  SNR = 10**(SNRdb/10)
  A = math.sqrt(2*SNR)
  noise = np.random.uniform(low=-1, high=1, size=(len(t),))
  signal = A*np.cos(w*t+phase) + noise
  return signal

Ysig = []
fs = 1280*1e6
t = np.linspace(0, 1*1e-6, 1280)
NFFT = 256
start = 160*1e6
finish = 185*1e6
freq = 160*1e6
freq_bin = np.arange(0, 1280, 5)



# **Signal Plots**

In [None]:
plt.subplot(2,1,1)
x = LFM(t,10e6,1e-6,100e6,100)/100000
plt.plot(t*1e6,x)
plt.title("Linear Frequency Modulation")
plt.ylabel("Amplitude")
plt.xlabel("Time (μs)")
plt.tight_layout()

plt.subplot(2,1,2)
y = BPSK(t,128,50e6,100)/100000
plt.plot(t*1e6,y)
plt.title("Bi-phase Shift Keying")
plt.ylabel("Amplitude")
plt.xlabel("Time (μs)");
plt.tight_layout()

In [None]:
#train set generation
Ysig = []
window = signal.blackman(1280)
for run in range(30000):
  SNR = rd.randint(-10,10)
  freq = rd.randint(100,600)*1e6
  start = 160*1e6
  finish = rd.randint(161,560)*1e6

  signal1 = np.append(CW(t,freq,SNR),0)
  Ysig.append(signal1)

  signal1 = np.append(BPSK(t,0,freq,SNR),1)
  Ysig.append(signal1)

  signal1 = np.append(LFM(t,start,1e-6,finish,SNR),2)
  Ysig.append(signal1)
    
    

In [None]:
#test set generation
test_set = []
test_labels = []

for SNR in range(-10,11):
  for run in range (100):
    freq = rd.randint(100,600)*1e6
    start = 160*1e6
    finish = rd.randint(161,560)*1e6

    signal1 = CW(t,freq,SNR)
    test_set.append(signal1)
    test_labels.append(0)

    signal1 = BPSK(t,0,freq,SNR)
    test_set.append(signal1)
    test_labels.append(1)

    signal1 = LFM(t,start,1e-6,finish,SNR)
    test_set.append(signal1)
    test_labels.append(2)


In [None]:
#Converted to excel file
signals = pd.DataFrame(Ysig)
signals.to_csv("train.csv",index = False)

signals = pd.DataFrame(test_set)
signals.to_csv("test.csv",index = False)

signals = pd.DataFrame(test_labels)
signals.to_csv("test_labels.csv",index = False)

# Training

In [None]:
train = pd.read_csv("train.csv")
X_test = pd.read_csv("test.csv")
Y_test = pd.read_csv("test_labels.csv")

X_test = X_test.values.reshape((len(X_test),1280,1))
Y_test = Y_test.values.reshape((len(Y_test),1))

In [None]:
Y_train = train["1280"].astype(int)
Y_train = to_categorical(Y_train, num_classes = 3)

X_train = train.drop(labels = ["1280"],axis = 1)

In [None]:
random_seed = rd.randint(1,10)
X_train, X_val, Y_train, Y_val = train_test_split(X_train, Y_train, test_size = 0.2, random_state=random_seed)
X_train = X_train.values.reshape((len(X_train),1280,1))
X_val = X_val.values.reshape((len(X_val),1280,1))

In [None]:
model = Sequential()

model.add(Conv1D(filters = 32, kernel_size = 3,activation ='relu', input_shape = (1280,1)))
model.add(Conv1D(filters = 32, kernel_size = 3, activation ='relu'))
model.add(Dropout(0.4))

model.add(Conv1D(filters = 32, kernel_size = 3,activation ='relu',kernel_regularizer=regularizers.l1(0.001)))
model.add(Conv1D(filters = 32, kernel_size = 3, activation ='relu'))
model.add(Dropout(0.4))

model.add(Conv1D(filters = 64, kernel_size = 3, activation ='relu',kernel_regularizer=regularizers.l1(0.001)))
model.add(Conv1D(filters = 64, kernel_size = 3, activation ='relu'))
model.add(Dropout(0.4))


model.add(Flatten())
model.add(Dense(256, activation = "relu"))
model.add(Dropout(0.5))
model.add(Dense(3, activation = "softmax"))

In [None]:
# Define the optimizer
optimizer = RMSprop(lr=0.001, rho=0.9, epsilon=1e-08, decay=0.0)
# Compile the model
model.compile(optimizer = optimizer , loss = "categorical_crossentropy", metrics=["accuracy"])


In [None]:
epoch = 200
# Save model with best Val
filepath="Best Weights.hdf5"
checkpoint = ModelCheckpoint(filepath, monitor='val_acc', verbose=1, save_best_only=True, mode='max')
callbacks_list = [checkpoint, History()]
# Fit the model
history = model.fit(X_train,Y_train, batch_size= 1024,
                    epochs = epoch, validation_data = (X_val,Y_val),
                    callbacks=callbacks_list, verbose = 1)

In [None]:
# Accuracy and Loss Plots
plt.plot(history.history["acc"])
plt.plot(history.history["val_acc"])
plt.legend(["train_acc","val_acc"])
plt.title("Accuracy")
plt.xlabel("Epoch")
plt.ylabel("Accuracy")
plt.savefig("Accuracy")
plt.clf()

plt.plot(history.history["loss"])
plt.plot(history.history["val_loss"])
plt.legend(["train_loss","val_loss"])
plt.title("Loss")
plt.xlabel("Epoch")
plt.ylabel("Loss")
plt.savefig("Loss")


In [None]:
# Evaluate against a test set
signal = []
y_pred = []
y_actual = []
correct = 0

for index in range(0,len(X_test)):
  signal =  np.expand_dims(X_test[index], axis = 0)
  prediction = model.predict(signal)
  y_pred.append(int(np.argmax(prediction)))
  if np.argmax(prediction) == Y_test[index][0]:
    correct += 1
  

accuracy = correct/len(X_test)

print("accuracy:", accuracy)

In [None]:
#Confusion matrix
confusion_matrix = [[0,0,0],[0,0,0],[0,0,0]]
y_pred = []
y_actual = []
signals = []

for index in range(0,len(X_train)):
  signals =  np.expand_dims(X_train[index], axis = 0)
  prediction = model.predict(signals)
  y_pred.append(int(np.argmax(prediction)))
  y_actual.append(np.argmax(Y_train[index]))



In [None]:
y_pred = np.asarray(y_pred)
y_actual = np.asarray(y_actual)
matrix = confusion_matrix(y_actual, y_pred, labels=[0,1,2])

#ax.set_xticklabels(["CW","BPSK","LFM"])
fig, ax = plt.subplots(figsize=(6,6))
for x in range(3):
  for y in range(3):
    ax.text(x,y,matrix[y][x], horizontalalignment='center')

ax.set_xticklabels(["","CW","","BPSK","","LFM"])
ax.set_yticklabels(["","CW","","BPSK","","LFM"])
plt.title("Confusion Matrix")
plt.xlabel("Predicted")
plt.ylabel("True")
ax.imshow(matrix, cmap = plt.cm.Blues)
plt.savefig("Confusion Matrix")