In [7]:
import tensorflow as tf
from tensorflow.keras.utils import to_categorical
from sklearn.model_selection import train_test_split
import pandas as pd
import numpy as np
import pickle as pkl
from sklearn.preprocessing import StandardScaler

# Load the RML2016.10a pickle and pick the two entries used in this experiment.
# The dictionary is indexed by (modulation name, integer) tuples; the integer
# is presumably the SNR in dB -- TODO confirm against the dataset description.
# NOTE(security): unpickling can execute arbitrary code, so only load this
# file when it comes from a trusted source.
data = pd.read_pickle("RML2016.10a_dict.pkl", compression='infer')
qpsk_2_data_all = data[('QPSK', 2)]
bpsk_2_data_all = data[('BPSK', 2)]

# Class labels: QPSK = 0, BPSK = 1.
# The label count is taken from the arrays themselves (instead of a fixed
# 1000) so that labels and samples can never get out of step.
qpsk_labels = [0] * len(qpsk_2_data_all)  # QPSK = 0
bpsk_labels = [1] * len(bpsk_2_data_all)  # BPSK = 1

# Stack the samples of both classes along the sample axis, QPSK first.
data_combined = np.concatenate((qpsk_2_data_all, bpsk_2_data_all), axis=0)

print(data_combined.shape)

# Build the label vector in the same order as the stacked samples.
labels_combined = np.array(qpsk_labels + bpsk_labels, dtype=np.int64)

# float32 arrays consumed by the rest of the pipeline.
# From here on `data` refers to the stacked sample array, no longer to the
# dictionary loaded from the pickle.
data = np.array(data_combined, dtype=np.float32)
labels = np.array(labels_combined, dtype=np.float32)

# Min-max scale every sample independently into the range [-1, 1].
# Reducing over axes 1 and 2 yields one minimum and one maximum per sample;
# keepdims=True keeps those axes so the results broadcast against `data`.
epsilon = 1e-10  # keeps the denominator non-zero for a constant sample
sample_axes = (1, 2)
min_vals = data.min(axis=sample_axes, keepdims=True)
max_vals = data.max(axis=sample_axes, keepdims=True)

# Map [min, max] -> [0, 2] first, then shift down to [-1, 1].
value_span = max_vals - min_vals + epsilon
normalized_data = 2 * (data - min_vals) / value_span - 1

# Hold out 20% of the samples for testing. The fixed random_state makes the
# shuffle, and therefore the split, reproducible from run to run.
split_options = dict(test_size=.2, random_state=42)
X_train, X_test, y_train, y_test = train_test_split(
    normalized_data,
    labels,
    **split_options,
)

# Small 1-D CNN: three identical convolution + max-pooling stages followed by
# a two-way softmax classifier.
keras_layers = tf.keras.layers

# Settings shared by every convolution stage and every pooling stage.
conv_options = dict(filters=64, kernel_size=3, strides=1, padding='same', activation='relu')
pool_options = dict(pool_size=2, strides=2, padding='same')

model = tf.keras.Sequential([
    # Each sample arrives as a (2, 128) array.
    keras_layers.Input(shape=(2, 128)),
    # Swap the two axes so the batch shape becomes (None, 128, 2): the
    # convolutions then slide along the 128-long axis with 2 channels.
    keras_layers.Permute((2, 1)),
    keras_layers.Conv1D(**conv_options),
    keras_layers.MaxPool1D(**pool_options),
    keras_layers.Conv1D(**conv_options),
    keras_layers.MaxPool1D(**pool_options),
    keras_layers.Conv1D(**conv_options),
    keras_layers.MaxPool1D(**pool_options),
    keras_layers.Flatten(),
    # One output probability per class.
    keras_layers.Dense(2, activation='softmax'),
])

# compile the model
# categorical_crossentropy expects one-hot targets, which is why the label
# vectors are passed through to_categorical below.
model.compile(optimizer='adam', loss='categorical_crossentropy', metrics=['accuracy'])

# Width of the one-hot targets, read from the model's output layer.
# Without an explicit num_classes, to_categorical infers the width from the
# largest label present, so a split that happens to miss the highest class
# would produce targets narrower than the model output.
num_classes = model.output_shape[-1]

# train the model
epochs = 10
model.fit(X_train, to_categorical(y_train, num_classes=num_classes), epochs=epochs)

# evaluate the model
# With metrics=['accuracy'], evaluate() returns [loss, accuracy].
results = model.evaluate(X_test, to_categorical(y_test, num_classes=num_classes))

# print results
print(f'Test Loss: {results[0]} \nTest Accuracy: {results[1]}')


(2000, 2, 128)
Epoch 1/10
Epoch 2/10
Epoch 3/10
Epoch 4/10
Epoch 5/10
Epoch 6/10
Epoch 7/10
Epoch 8/10
Epoch 9/10
Epoch 10/10
Test Loss: 0.06348280608654022 
Test Accuracy: 0.9750000238418579
