In [1]:
from sklearn.preprocessing import LabelBinarizer as LB
from sklearn.preprocessing import normalize
from sklearn.metrics import confusion_matrix
from sklearn.metrics import accuracy_score
# --------------------
from keras.layers import Conv2D, ZeroPadding2D
from keras.layers import Reshape, Flatten, Dropout
from keras.callbacks import EarlyStopping
from keras.models import Sequential
from keras.models import load_model
from keras.layers import Dense
from keras import metrics
# --------------------
from pandas import DataFrame as df
# --------------------
import matplotlib.pyplot as plt
import tensorflow as tf
import numpy as np
# --------------------
import tarfile
import pickle
import random
import keras
import sys
import gc


In [5]:
import tensorflow as tf

# Export flow: restore the trained Keras network from its .h5 checkpoint,
# convert it to TensorFlow Lite, then persist the converted model to disk.
model = tf.keras.models.load_model('ann.h5')

converter = tf.lite.TFLiteConverter.from_keras_model(model)
# Optional: default optimizations can reduce the model size and improve
# performance on mobile devices.
converter.optimizations = [tf.lite.Optimize.DEFAULT]
tflite_model = converter.convert()

# The converted model is written in binary mode to model.tflite.
with open('model.tflite', 'wb') as tflite_file:
    tflite_file.write(tflite_model)



Saved artifact at '/tmp/tmphif5m5wn'. The following endpoints are available:

* Endpoint 'serve'
  args_0 (POSITIONAL_ONLY): TensorSpec(shape=(None, 2, 128), dtype=tf.float32, name='input_layer')
Output Type:
  TensorSpec(shape=(None, 10), dtype=tf.float32, name=None)
Captures:
  134351692701488: TensorSpec(shape=(), dtype=tf.resource, name=None)
  134351682966400: TensorSpec(shape=(), dtype=tf.resource, name=None)
  134351682973616: TensorSpec(shape=(), dtype=tf.resource, name=None)
  134351683142576: TensorSpec(shape=(), dtype=tf.resource, name=None)
  134351683146624: TensorSpec(shape=(), dtype=tf.resource, name=None)
  134351683147504: TensorSpec(shape=(), dtype=tf.resource, name=None)
  134351683156656: TensorSpec(shape=(), dtype=tf.resource, name=None)
  134351683157360: TensorSpec(shape=(), dtype=tf.resource, name=None)


In [6]:
import numpy as np
import tensorflow as tf

# Smoke test of the exported model: load model.tflite, feed it one random
# input of the shape the interpreter reports, and print the resulting output.
interpreter = tf.lite.Interpreter(model_path='model.tflite')
interpreter.allocate_tensors()

input_details = interpreter.get_input_details()
output_details = interpreter.get_output_details()
first_input, first_output = input_details[0], output_details[0]

# Random float32 sample shaped like the model's first input tensor.
input_data = np.random.random(first_input['shape']).astype(np.float32)
interpreter.set_tensor(first_input['index'], input_data)

interpreter.invoke()

output_data = interpreter.get_tensor(first_output['index'])
print(output_data)

[[0. 0. 0. 0. 0. 1. 0. 0. 0. 0.]]


In [None]:
!wget -O RML2016.10b.tar.bz2 https://f002.backblazeb2.com/file/deepsig-datasets/2016.10/RML2016.10b.tar.bz2?Authorization=3_20241206084830_dddbbd738b38ebb6b1f15094_a918d6f63b20317c8be078d142c50eef58692dc4_002_20241206094830_0027_dnld
!tar jxf RML2016.10b.tar.bz2

In [36]:
# NOTE(security): pickle.load can execute arbitrary code while unpickling; only
# load this archive when it comes from a trusted source.
# The `with` block guarantees the handle is closed even if unpickling fails.
# encoding='bytes' makes 8-bit string instances in the pickle load as bytes objects.
with open("RML2016.10b.dat", 'rb') as file:
    Xd = pickle.load(file, encoding = 'bytes')

# Dictionary keys are (modulation, snr) tuples: collect the sorted unique values
# of key position 1 (snrs) and key position 0 (mods).
snrs, mods = (sorted({key[j] for key in Xd}) for j in (1, 0))

# Stack every (mod, snr) group into one array and keep one (mod, snr) label per row.
X = []
lbl = []
for mod in mods:
    for snr in snrs:
        samples = Xd[(mod, snr)]
        X.append(samples)
        lbl.extend([(mod, snr)] * samples.shape[0])
X = np.vstack(X)

## Create Features Space

In [None]:
# Registry mapping a feature name to a tuple of per-channel arrays.
features = dict()

**Raw Time Feature**

In [None]:
# Raw samples: channel 0 and channel 1 of X, stored as a 2-tuple.
features['raw'] = tuple(X[:, channel] for channel in (0, 1))

**First derivative in time**

In [34]:
# Normalized gradient along axis 1 of each of the two channels of X.
features['derivative'] = tuple(
    normalize(np.gradient(X[:, channel], axis = 1)) for channel in (0, 1)
)

**Integral in time**

In [35]:
# Normalized cumulative sum along axis 1 of each of the two channels of X.
features['integral'] = tuple(
    normalize(np.cumsum(X[:, channel], axis = 1)) for channel in (0, 1)
)

**All Together: Feature Space**

In [None]:
def extract_features(*arguments):
    """Stack the arrays of the requested features along a new axis 1.

    Each name in `arguments` is looked up in the module-level `features`
    dict; the tuples found there are concatenated in call order and the
    resulting arrays are combined with `np.stack(..., axis=1)`.

    Raises:
        KeyError: if a requested name is not present in `features`.
    """
    selected = tuple(
        channel
        for name in arguments
        for channel in features[name]
    )
    return np.stack(selected, axis = 1)

## Train and Test Data Split

**Feature choice**

In [None]:
# One (mod, snr) label row per example, plus the model input built from the raw feature only.
labels = np.asarray(lbl)
data = extract_features('raw')

In [None]:
# Shape of a single example, and the same shape with a leading axis of size 1
# (used by the Reshape layer of the convolutional models).
in_shape = data[0].shape
out_shape = (1, *in_shape)

**Data shuffling & splitting**

In [None]:
# Fixed seed so the shuffle (and therefore the split) is reproducible.
np.random.seed(10)

n_examples = len(labels)

# Random permutation of all example indices (sampling without replacement).
r = np.random.choice(range(n_examples), size = n_examples, replace = False)

# First half of the permutation trains, the remainder tests.
half = n_examples // 2
train_examples, test_examples = r[:half], r[half:]

In [None]:
X_train = data[train_examples]
X_test = data[test_examples]

# Index the (modulation, snr) label rows once per split instead of once per use.
train_labels = labels[train_examples]
test_labels = labels[test_examples]

# Fit the one-hot encoder on the training modulations only and reuse it for the
# test set, so both splits are guaranteed to share one class-to-column mapping
# (two independently fitted binarizers only agree when both see every class).
label_binarizer = LB().fit(train_labels[:, 0])
y_train = label_binarizer.transform(train_labels[:, 0])
y_test = label_binarizer.transform(test_labels[:, 0])

# Column 1 of each label row holds the SNR; convert it to integers.
snr_train = train_labels[:, 1].astype(int)
snr_test = test_labels[:, 1].astype(int)

## Model for Fully Connected Neural Networks

In [None]:
# Fully connected classifier: three ReLU Dense layers, a Flatten, and a
# 10-way softmax output.
model = Sequential([
    Dense(128, activation ='relu', input_shape = in_shape),
    Dense(256, activation ='relu'),
    Dense(128, activation ='relu'),
    Flatten(),
    Dense(10, activation ='softmax'),
])

In [None]:
# Categorical cross-entropy matches the one-hot targets; accuracy is tracked as a metric.
model.compile(
    loss = 'categorical_crossentropy',
    optimizer = 'adam',
    metrics = ['accuracy'],
)
model.summary()

In [None]:
# Stop after 15 epochs without improvement and roll back to the best weights seen.
early_stopping = EarlyStopping(patience = 15, restore_best_weights = True)
model.fit(
    X_train,
    y_train,
    batch_size = 2048,
    epochs = 100,
    validation_split = 0.05,
    callbacks = [early_stopping],
)

In [None]:
# Persist the trained fully connected network; later cells reload it from this file.
model.save(filepath = 'ann.h5')

## "AbRo" Model for Convolutional Neural Networks

In [None]:
# Dropout rate shared by every Dropout layer below.
dr = 0.5

# Convolutional classifier: reshape each example to `out_shape`, apply two
# zero-padded channels-first convolutions, then a dense head ending in softmax.
model = Sequential([
    Reshape(out_shape, input_shape = in_shape),
    ZeroPadding2D((0, 2), data_format = 'channels_first'),
    Conv2D(256, (1, 3), padding = 'valid', activation = "relu", name="conv1",
           kernel_initializer='glorot_uniform', data_format="channels_first"),
    Dropout(dr),
    ZeroPadding2D((0,2), data_format = 'channels_first'),
    Conv2D(80, (2, 3), activation="relu", name="conv3", padding="valid",
           kernel_initializer="glorot_uniform", data_format="channels_first"),
    Dropout(dr),
    Flatten(),
    Dense(256, activation="relu", name="dense1", kernel_initializer="he_normal"),
    Dropout(dr),
    Dense(10, name="dense3", kernel_initializer="he_normal", activation = 'softmax'),
    # Final reshape to one score per entry of `mods`.
    Reshape([len(mods)]),
])

In [None]:
# Same training configuration as the fully connected model.
model.compile(
    optimizer = 'adam',
    loss = 'categorical_crossentropy',
    metrics = ['accuracy'],
)
model.summary()

In [None]:
# Stop after 15 epochs without improvement and roll back to the best weights seen.
early_stopping = EarlyStopping(patience = 15, restore_best_weights = True)
model.fit(
    X_train,
    y_train,
    batch_size = 1024,
    epochs = 80,
    validation_split = 0.05,
    callbacks = [early_stopping],
)

In [None]:
# Persist the trained convolutional network; the evaluation section reloads it from this file.
model.save(filepath = "cnn.h5")

## Evaluation

In [None]:
def plot_results(predicted_labels, true_labels, snrs):
  """Plot classification accuracy (in percent) against SNR.

  Args:
    predicted_labels: 2-D array of per-example class scores; the argmax over
      axis 1 is taken as the predicted class.
    true_labels: 2-D array of per-example targets; the argmax over axis 1 is
      taken as the true class.
    snrs: array with the SNR of each example, aligned with the label rows.
  """
  snr_levels = np.sort(np.unique(snrs))

  # Class indices are computed once and then sliced per SNR level.
  true_classes = np.argmax(true_labels, axis = 1)
  predicted_classes = np.argmax(predicted_labels, axis = 1)

  accuracies = []
  for level in snr_levels:
    rows = np.where(snrs == level)
    accuracies.append(accuracy_score(true_classes[rows], predicted_classes[rows]))

  plt.plot(snr_levels, np.array(accuracies) * 100, 'ro--')
  plt.title('Classification Accuracy over different SNRs')
  plt.xlabel('SNR')
  plt.ylabel('Accuracy')
  plt.grid(True)

In [None]:
def print_results(predicted_labels, true_labels, snrs):
  """Return a one-row table of classification accuracy (in percent) per SNR.

  Args:
    predicted_labels: 2-D array of per-example class scores; the argmax over
      axis 1 is taken as the predicted class.
    true_labels: 2-D array of per-example targets; the argmax over axis 1 is
      taken as the true class.
    snrs: array with the SNR of each example, aligned with the label rows.

  Returns:
    A DataFrame with a single row named 'accuracy', one column per distinct
    SNR (ascending), values rounded to two decimals.
  """
  snr_levels = np.sort(np.unique(snrs))

  # Class indices are computed once and then sliced per SNR level.
  true_classes = np.argmax(true_labels, axis = 1)
  predicted_classes = np.argmax(predicted_labels, axis = 1)

  accuracies = np.array([
    accuracy_score(true_classes[rows], predicted_classes[rows])
    for rows in (np.where(snrs == level) for level in snr_levels)
  ])

  table = df(data = accuracies.reshape(1, -1) * 100, columns = snr_levels, index = ['accuracy'])
  return table.round(2)

In [None]:
def plot_confusion_matrix(y_true, y_pred, classes, cmap=plt.cm.Blues):
  """Draw a row-normalized confusion matrix as a heat map.

  Args:
    y_true: 2-D array of targets; the argmax over axis 1 is the true class.
    y_pred: 2-D array of class scores; the argmax over axis 1 is the
      predicted class.
    classes: sequence of class names used as tick labels; class index i is
      labelled with classes[i].
    cmap: matplotlib colormap for the heat map.
  """
  y_true, y_pred = np.argmax(y_true, axis = 1), np.argmax(y_pred, axis = 1)

  # Pin the label set so the matrix always has one row/column per entry of
  # `classes`, even when some class is absent from both vectors.
  cm = confusion_matrix(y_true, y_pred, labels = np.arange(len(classes)))

  # Normalize each row by its number of true samples; rows with no samples
  # stay at zero instead of producing NaN through a division by zero.
  row_totals = cm.sum(axis = 1, keepdims = True)
  cm = np.divide(cm, row_totals, out = np.zeros(cm.shape, dtype = float), where = row_totals != 0)

  fig, ax = plt.subplots()
  im = ax.imshow(cm, interpolation='nearest', cmap=cmap)
  ax.figure.colorbar(im, ax=ax)
  # We want to show all ticks...
  ax.set(xticks=np.arange(cm.shape[1]),
         yticks=np.arange(cm.shape[0]),
         # ... and label them with the respective list entries
         xticklabels=classes, yticklabels=classes,
         title = 'Confusion Matrix',
         ylabel = 'True label',
         xlabel = 'Predicted label')

  # Rotate the tick labels and set their alignment.
  plt.setp(ax.get_xticklabels(), rotation=45, ha="right",
           rotation_mode="anchor")

**Fully Connected Neural Network Evaluation**

In [None]:
# Reload the fully connected network from disk so evaluation uses the saved weights.
ann_model = load_model('ann.h5')

In [None]:
# Class scores of the reloaded fully connected network on the held-out half.
y_pred_ann = ann_model.predict(x = X_test)

In [None]:
plot_results(predicted_labels = y_pred_ann, true_labels = y_test, snrs = snr_test)

In [None]:
print_results(predicted_labels = y_pred_ann, true_labels = y_test, snrs = snr_test)

In [None]:
plot_confusion_matrix(y_true = y_test, y_pred = y_pred_ann, classes = mods)

**Convolutional Neural Network Evaluation**

In [None]:
# Reload the convolutional network from disk so evaluation uses the saved weights.
cnn_model = load_model('cnn.h5')

In [None]:
# Predict with the model reloaded from cnn.h5, not with whatever `model`
# happens to be bound to in the kernel at this point.
y_pred_cnn = cnn_model.predict(X_test)

In [None]:
plot_results(predicted_labels = y_pred_cnn, true_labels = y_test, snrs = snr_test)

In [None]:
print_results(predicted_labels = y_pred_cnn, true_labels = y_test, snrs = snr_test)

In [None]:
plot_confusion_matrix(y_true = y_test, y_pred = y_pred_cnn, classes = mods)

In [None]:
def train_once(size = 5000):
  """Train a fresh CNN on the first `size` training examples and score it.

  The network has the same architecture as the convolutional model defined
  earlier in the notebook. The last 5% of the selected slice is held out as
  validation data so that early stopping has a `val_loss` to monitor.

  Args:
    size: number of leading rows of X_train / y_train to use.

  Returns:
    (train_pred, test_pred): the outputs of `model.evaluate` on the training
    slice and on the full test set, respectively.
  """
  dr = 0.5
  model = Sequential()
  model.add(Reshape(out_shape, input_shape = in_shape))
  model.add(ZeroPadding2D((0, 2), data_format = 'channels_first'))
  model.add(Conv2D(256, (1, 3), padding = 'valid', activation = "relu", name="conv1", kernel_initializer='glorot_uniform', data_format="channels_first"))
  model.add(Dropout(dr))
  model.add(ZeroPadding2D((0,2), data_format = 'channels_first'))
  model.add(Conv2D(80, (2, 3), activation="relu", name="conv3", padding="valid", kernel_initializer="glorot_uniform", data_format="channels_first"))
  model.add(Dropout(dr))
  model.add(Flatten())
  model.add(Dense(256, activation="relu", name="dense1", kernel_initializer="he_normal"))
  model.add(Dropout(dr))
  model.add(Dense(10, name="dense3", kernel_initializer="he_normal", activation = 'softmax'))
  model.add(Reshape([len(mods)]))
  model.compile(loss='categorical_crossentropy', optimizer='adam', metrics = ['accuracy'])

  # EarlyStopping monitors `val_loss` by default; without validation data the
  # callback can neither stop training nor restore the best weights, so a
  # validation split (same ratio as the other fits in this notebook) is supplied.
  model.fit(X_train[:size], y_train[:size], epochs = 100, batch_size = 1024, validation_split = 0.05, callbacks=[EarlyStopping(patience = 15, restore_best_weights = True)])
  train_pred = model.evaluate(X_train[:size], y_train[:size])
  test_pred = model.evaluate(X_test, y_test)
  return train_pred, test_pred


In [None]:
# Preview of the training-set sizes swept by the learning-curve experiment.
[*range(5000, 100000, 5000)]

In [None]:
# Learning curve: retrain the CNN on growing training subsets and record the scores.
train_scores, test_scores = [], []
rang = range(5000, 100000, 5000)
for i in rang:
  trn, tst = train_once(i)
  train_scores.append(trn)
  test_scores.append(tst)

# Each score is the [loss, accuracy] list returned by model.evaluate, so
# accuracy is column 1. Scale to percent, as the other accuracy plots do.
train_acc = np.array(train_scores)[:, 1] * 100
test_acc = np.array(test_scores)[:, 1] * 100

# Line plot (not imshow, which renders images and rejects these arguments).
sizes = list(rang)
plt.plot(sizes, train_acc, 'ro--', label = 'train')
plt.plot(sizes, test_acc, 'bo--', label = 'test')
plt.xlabel('Training set size')
plt.ylabel('Accuracy')
plt.title('Accuracy over different training set sizes')
plt.legend()
plt.grid(True)