<a href="https://colab.research.google.com/github/MohsenJadidi/Automatic-Modulation-Classification-AMC/blob/master/CNN.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [0]:
from google.colab import drive
drive.mount('/content/drive')

# Importing the dataset

In [0]:
import pickle
fileName = 'RML2016.10a_dict.pkl'
with open("/content/drive/My Drive/Colab Notebooks/"+fileName,'rb') as f:
  data = pickle.load(f,encoding='bytes')
 

In [3]:
import numpy as np
X = []
labels = [] # label each example by a pair (modulation type, snr)
total_examples = 0
analog = [b'AM-DSB', b'AM-SSB', b'WBFM']

for mod_type, snr in data.keys():
    if (mod_type not in analog):      
        current_matrix = data[(mod_type, snr)]        
        total_examples += current_matrix.shape[0]
        for i in range(current_matrix.shape[0]):
            X.append(current_matrix[i])
            labels.append((str(mod_type, 'ascii'), snr)) # mod_type is of type bytes
    
X = np.array(X)         # First row is QPSK snr=2, seconde is PAM4 snr=8 , ...
labels = np.array(labels)

y = labels[:,0]

print(f'loaded {total_examples} signal vectors into X{X.shape} and their corresponding'
      f' labels into labels{labels.shape}')  
# print(np.unique(labels[:,0]))


loaded 160000 signal vectors into X(160000, 2, 128) and their corresponding labels into labels(160000, 2)


In [0]:
# Encoding categorical data
from sklearn.preprocessing import LabelEncoder, OneHotEncoder
labelencoder_y = LabelEncoder()
y = labelencoder_y.fit_transform(y)
onehotencoder = OneHotEncoder()
y = onehotencoder.fit_transform(y.reshape(-1,1)).toarray()

In [5]:
snrList = [str(2*i-20) for i in range(20)]  # snrList = -20, -18, -16 , ... ,0, ... ,18
snr = snrList[19]
numberOfEachExamples = 1000
print("SNR :", snr)

SNR : 18


In [0]:
output = [[labels[i*numberOfEachExamples, 0],y[i*numberOfEachExamples]] for i in range(int(X.shape[0]/numberOfEachExamples))]
output = dict(output)

In [7]:
Xsnr = np.zeros(shape=(X.shape[0],X.shape[1],X.shape[2]+1))
for i in range(X.shape[0]):
    snr = int(labels[i,1])
    Xsnr[i,0,:] = np.insert(X[i,0,:],0,snr)
    Xsnr[i,1,:] = np.insert(X[i,1,:],0,snr)
    
print(Xsnr.shape)

(160000, 2, 129)


In [8]:
###### Splitting the dataset into the Training set and Test set ######
from sklearn.model_selection import train_test_split
X_train, X_test, y_train, y_test = train_test_split(Xsnr, y, test_size = 0.2, random_state = 0)
# The below line better for Cross_val part
#X_train, X_test, y_train, y_test = train_test_split(Xsnr, y, test_size = 1, random_state = 0)

print(X_train.shape)
print(X_test.shape)

(128000, 2, 129)
(32000, 2, 129)


In [9]:
X_train = X_train[:,:,1:] # snr important for train

y_test18 = []
X_test18 = []

for i in range(X_test.shape[0]):
    if X_test[i,0,0] == 18:
        X_test18.append(X_test[i])
        y_test18.append(y_test[i])
        
X_test18 = np.array(X_test18)
y_test18 = np.array(y_test18)        
X_test18 = X_test18[:,:,1:]


print(X_train.shape)
print(X_test18.shape)

(128000, 2, 128)
(1653, 2, 128)


In [10]:
# Change IQ to amplitude and phase
X_cmplx = X_train[:,0,:] + 1j* X_train[:,1,:]    
X_amp = np.abs(X_cmplx)
X_ang = np.arctan2(X_train[:,1,:],X_train[:,0,:]) / np.pi
    
X_amp = np.reshape(X_amp,(-1,1,128))
X_ang = np.reshape(X_ang,(-1,1,128))
    
X_train_AmpPhs = np.concatenate((X_amp,X_ang), axis=1) 
##
X_cmplx = X_test18[:,0,:] + 1j* X_test18[:,1,:]    
X_amp = np.abs(X_cmplx)
X_ang = np.arctan2(X_test18[:,1,:],X_test18[:,0,:]) / np.pi
    
X_amp = np.reshape(X_amp,(-1,1,128))
X_ang = np.reshape(X_ang,(-1,1,128))
    
X_test18_AmpPhs = np.concatenate((X_amp,X_ang), axis=1) 
##

print(X_train_AmpPhs.shape)
print(X_test18_AmpPhs.shape)

(128000, 2, 128)
(1653, 2, 128)


In [17]:
from sklearn.preprocessing import StandardScaler
sc = StandardScaler()
X_train = X_train.reshape([X_train.shape[0],256])
#X_train = X_train_AmpPhs.reshape([X_train.shape[0],256])
X_train = sc.fit_transform(X_train)
X_train = X_train.reshape([X_train.shape[0],2,128])
#X_test = X_test.reshape([1600,256])
X_test18 = X_test18.reshape([X_test18.shape[0],256])
#X_test18 = X_test18_AmpPhs.reshape([X_test18.shape[0],256])
X_test18 = sc.transform(X_test18)
X_test18 = X_test18.reshape([X_test18.shape[0],2,128])

# Reshape
X_train = X_train.reshape(-1,2, 128, 1)   #Reshape for CNN -  (6400,2,128)->(6400,2,128,1)!!
X_test = X_test.reshape(-1,2, 128, 1)

print(X_train.shape)
print(X_test18.shape)

(128000, 2, 128, 1)
(1653, 2, 128)


# Making CNN0 model 
(Article : Convolutional Radio Modulation Recognition Networks - Timothy J. O’Shea)


In [18]:
import keras
from keras.models import Sequential
from keras.layers import Conv2D
from keras.layers import MaxPooling2D
from keras.layers import Flatten
from keras.layers import Dense
from keras.layers import Dropout

classifier = Sequential()

dout = 0.4
classifier.add(Conv2D(filters=64, kernel_size=(1,3), input_shape =  (2,128,1), padding='same', activation = 'relu')) # 256=number of filter
classifier.add(Dropout(rate = dout))
classifier.add(Conv2D(filters=16,kernel_size=(2,3),  padding='same', activation = 'relu'))
classifier.add(Dropout(rate = dout))

classifier.add(Flatten())

classifier.add(Dense(output_dim = 128 , activation = 'relu')) # hidden layer
classifier.add(Dropout(rate = dout))
classifier.add(Dense(output_dim = 8 , activation = 'softmax'))

#from keras import optimizers
#adamOpt = optimizers.adam(lr = 0.01)

classifier.compile(optimizer = 'adam', loss = 'categorical_crossentropy', metrics = ['accuracy'])

#logger = keras.callbacks.TensorBoard(log_dir="./logs", write_graph=True, histogram_freq=0)

classifier.summary()
print("Model Created!")

Using TensorFlow backend.
W0827 14:16:12.312204 140658646271872 deprecation_wrapper.py:119] From /usr/local/lib/python3.6/dist-packages/keras/backend/tensorflow_backend.py:74: The name tf.get_default_graph is deprecated. Please use tf.compat.v1.get_default_graph instead.

W0827 14:16:12.348738 140658646271872 deprecation_wrapper.py:119] From /usr/local/lib/python3.6/dist-packages/keras/backend/tensorflow_backend.py:517: The name tf.placeholder is deprecated. Please use tf.compat.v1.placeholder instead.

W0827 14:16:12.357233 140658646271872 deprecation_wrapper.py:119] From /usr/local/lib/python3.6/dist-packages/keras/backend/tensorflow_backend.py:4138: The name tf.random_uniform is deprecated. Please use tf.random.uniform instead.

W0827 14:16:12.390862 140658646271872 deprecation_wrapper.py:119] From /usr/local/lib/python3.6/dist-packages/keras/backend/tensorflow_backend.py:133: The name tf.placeholder_with_default is deprecated. Please use tf.compat.v1.placeholder_with_default instea

_________________________________________________________________
Layer (type)                 Output Shape              Param #   
conv2d_1 (Conv2D)            (None, 2, 128, 64)        256       
_________________________________________________________________
dropout_1 (Dropout)          (None, 2, 128, 64)        0         
_________________________________________________________________
conv2d_2 (Conv2D)            (None, 2, 128, 16)        6160      
_________________________________________________________________
dropout_2 (Dropout)          (None, 2, 128, 16)        0         
_________________________________________________________________
flatten_1 (Flatten)          (None, 4096)              0         
_________________________________________________________________
dense_1 (Dense)              (None, 128)               524416    
_________________________________________________________________
dropout_3 (Dropout)          (None, 128)               0         
__________

#Making CNN1 model (one-convolutional-layer)
(Thesis :DEEP NEURAL NETWORK ARCHITECTURES FOR MODULATION CLASSIFICATION)

In [0]:
import keras
from keras.models import Sequential
from keras.layers import Conv2D
from keras.layers import MaxPooling2D
from keras.layers import Flatten
from keras.layers import Dense
from keras.layers import Dropout

classifier = Sequential()

dout = 0.6

classifier.add(Conv2D(256,1,3, input_shape =  (2,128,1), activation = 'relu')) # 256=number of filter
classifier.add(Dropout(rate = dout))

classifier.add(Flatten())

classifier.add(Dense(output_dim = 128 , activation = 'relu')) # hidden layer
classifier.add(Dropout(rate = dout))
classifier.add(Dense(output_dim = 8 , activation = 'softmax'))

#from keras import optimizers
#adamOpt = optimizers.adam(lr = 0.01)

classifier.compile(optimizer = 'adam', loss = 'categorical_crossentropy', metrics = ['accuracy'])

#logger = keras.callbacks.TensorBoard(log_dir="./logs", write_graph=True, histogram_freq=0)

classifier.summary()
print("Model Created!")

# Making CNN2 model  (two-convolutional-layer)

In [0]:
import keras
from keras.models import Sequential
from keras.layers import Conv2D
from keras.layers import MaxPooling2D
from keras.layers import Flatten
from keras.layers import Dense
from keras.layers import Dropout

classifier = Sequential()

dout = 0.4
classifier.add(Conv2D(256,(1,3), input_shape =  (2,128,1), activation = 'relu', padding='same')) # 256=number of filter
classifier.add(Dropout(rate = dout))
classifier.add(Conv2D(80,(2,3), activation = 'relu'))
classifier.add(Dropout(rate = dout))

classifier.add(Flatten())

classifier.add(Dense(output_dim = 128 , activation = 'relu')) # hidden layer
classifier.add(Dropout(rate = dout))
classifier.add(Dense(output_dim = 8 , activation = 'softmax'))

#from keras import optimizers
#adamOpt = optimizers.adam(lr = 0.01)

classifier.compile(optimizer = 'adam', loss = 'categorical_crossentropy', metrics = ['accuracy'])

#logger = keras.callbacks.TensorBoard(log_dir="./logs", write_graph=True, histogram_freq=0)

classifier.summary()
print("Model Created!")


#Making CNN3 model (two-convolutional-layer)

In [0]:
import keras
from keras.models import Sequential
from keras.layers import Conv2D
from keras.layers import MaxPooling2D
from keras.layers import Flatten
from keras.layers import Dense
from keras.layers import Dropout

classifier = Sequential()

dout = 0.4

classifier.add(Conv2D(80,2,3, activation = 'relu'))
classifier.add(Dropout(rate = dout))

classifier.add(Conv2D(256,1,3, input_shape =  (2,128,1), activation = 'relu')) # 256=number of filter
classifier.add(Dropout(rate = dout))

classifier.add(Flatten())

classifier.add(Dense(output_dim = 128 , activation = 'relu')) # hidden layer
classifier.add(Dropout(rate = dout))
classifier.add(Dense(output_dim = 8 , activation = 'softmax'))

#from keras import optimizers
#adamOpt = optimizers.adam(lr = 0.01)

classifier.compile(optimizer = 'adam', loss = 'categorical_crossentropy', metrics = ['accuracy'])

#logger = keras.callbacks.TensorBoard(log_dir="./logs", write_graph=True, histogram_freq=0)

classifier.summary()
print("Model Created!")

#Making CNN4 model (four-convolutional-layer)

In [0]:
import keras
from keras.models import Sequential
from keras.layers import Conv2D
from keras.layers import MaxPooling2D
from keras.layers import Flatten
from keras.layers import Dense
from keras.layers import Dropout

classifier = Sequential()

dout = 0.4
classifier.add(Conv2D(256,1,3, input_shape =  (2,128,1), activation = 'relu')) # 256=number of filter
classifier.add(Dropout(rate = dout))
classifier.add(Conv2D(256,2,3, activation = 'relu')) # 256=number of filter
classifier.add(Dropout(rate = dout))

classifier.add(Conv2D(80,1,3, activation = 'relu'))
classifier.add(Dropout(rate = dout))
classifier.add(Conv2D(80,1,3, activation = 'relu'))
classifier.add(Dropout(rate = dout))

classifier.add(Flatten())

classifier.add(Dense(output_dim = 128 , activation = 'relu')) # hidden layer
classifier.add(Dropout(rate = dout))
classifier.add(Dense(output_dim = 8 , activation = 'softmax'))

#from keras import optimizers
#adamOpt = optimizers.adam(lr = 0.01)

classifier.compile(optimizer = 'adam', loss = 'categorical_crossentropy', metrics = ['accuracy'])

#logger = keras.callbacks.TensorBoard(log_dir="./logs", write_graph=True, histogram_freq=0)

classifier.summary()
print("Model Created!")


#Making CNN5 model (five-convolutional-layer)

In [0]:
import keras
from keras.models import Sequential
from keras.layers import Conv2D
from keras.layers import MaxPooling2D
from keras.layers import Flatten
from keras.layers import Dense
from keras.layers import Dropout

classifier = Sequential()

dout = 0.4
classifier.add(Conv2D(384,1,3, input_shape =  (2,128,1), activation = 'relu')) 
classifier.add(Dropout(rate = dout))
classifier.add(Conv2D(256,2,3, activation = 'relu'))
classifier.add(Dropout(rate = dout))
classifier.add(Conv2D(80,1,3, activation = 'relu'))
classifier.add(Dropout(rate = dout))
classifier.add(Conv2D(256,1,3, activation = 'relu'))
classifier.add(Dropout(rate = dout))
classifier.add(Conv2D(80,1,3, activation = 'relu'))
classifier.add(Dropout(rate = dout))

classifier.add(Flatten())

classifier.add(Dense(output_dim = 128 , activation = 'relu')) # hidden layer
classifier.add(Dropout(rate = dout))
classifier.add(Dense(output_dim = 8 , activation = 'softmax'))

#from keras import optimizers
#adamOpt = optimizers.adam(lr = 0.01)

classifier.compile(optimizer = 'adam', loss = 'categorical_crossentropy', metrics = ['accuracy'])

#logger = keras.callbacks.TensorBoard(log_dir="./logs", write_graph=True, histogram_freq=0)

classifier.summary()
print("Model Created!")

# Fitting model

In [0]:
batch = 1024
epoch = 100

history = classifier.fit(X_train, y_train, batch_size = batch, epochs = epoch, validation_data=(X_test18, y_test18))
#history = classifier.fit(X_train, y_train, batch_size = batch, epochs = epoch)

# Prediction 

In [0]:
y_pred = classifier.predict(X_test)
y_pred = np.argmax(y_pred, axis=1)

y_real = np.argmax(y_test, axis=1)
# Making the Confusion Matrix
from sklearn.metrics import confusion_matrix
cm = confusion_matrix(y_real, y_pred)
cm_norm = cm.astype('float') / cm.sum(axis=1)[:, np.newaxis]

acc_test = classifier.evaluate(X_test, y_test)[1]
acc_train = classifier.evaluate(X_train, y_train)[1]

print("Acc Test : ", acc_test)
print("Acc Train : ", acc_train)


Acc Test :  0.8125
Acc Train :  0.98953125


# Plot Confusion Matrix

In [0]:
import matplotlib.pyplot as plt
import seaborn as sns
import pandas as pd

modulation_order = []
modulation_order_dict = dict()

for key,value in output.items():
    modulation_order_dict[np.argmax(value)] = str(key)
    
for i in range(8):
    modulation_order.append(modulation_order_dict[i])
    
    
    
cmDataFrame = pd.DataFrame(cm_norm, index=modulation_order, columns = modulation_order)
plt.figure(figsize=(6, 5))
ax = sns.heatmap(cmDataFrame, annot=True, annot_kws={"size": 8}, fmt='.2f', linewidths=.5, cmap="Greens")

plt.title(f"CNN Confusion Matrix (SNR={snr})")
plt.xlabel("Predicted label  \n\n TrainAcc={:.2}, TestAcc={:.2}".format(acc_train,acc_test), fontsize=8)
plt.ylabel("True lable", fontsize=8)
plt.setp(ax.get_xticklabels(), rotation=45, ha="right",rotation_mode="anchor", fontsize=8)
plt.setp(ax.get_yticklabels(), fontsize=8)
fig = ax.get_figure()

In [0]:
dic = "/content/drive/My Drive/Colab Notebooks/"
fig.savefig(dic+f"CNN2-CrossVal-132-132-ba{batch}-ep{epoch}(SNR={snr}).png", dpi=175, bbox_inches='tight')
print("Plot Saved!")

Plot Saved!
