In [None]:
import numpy as np
import pandas as pd
import tensorflow as tf
from tensorflow.keras.utils import to_categorical
from tensorflow.keras import models
from tensorflow.keras.layers import Reshape, Dense, Dropout, Activation, Flatten, ZeroPadding1D
from tensorflow.keras.layers import Conv1D
from tensorflow.keras.optimizers import Adam
from sklearn.model_selection import train_test_split
from scipy.signal import welch


# Load the dataset (I cleaned the data and saved in modulation_data.pkl)
modulation_data = pd.read_pickle('modulation_data.pkl')
modulation_data = modulation_data[modulation_data['SNR'] >= -18]
#Features and Labels
X = modulation_data.iloc[:, 2:].values  # All IQ features
labels = modulation_data['Modulation'].values  # Modulation types
snr_values_full = modulation_data['SNR'].values  # SNR values

# One-Hot Encoding
unique_mods = np.unique(labels)
label_indices = np.array([np.where(unique_mods == label)[0][0] for label in labels])
Y = to_categorical(label_indices, num_classes=len(unique_mods))

# Train_test
X_train, X_test, Y_train, Y_test, snr_train, snr_test = train_test_split(X, Y, snr_values_full, test_size=0.3, random_state=2016)


In [None]:

# Function for FFT Features (Spectral Power Density)
def compute_fft_features(iq_samples):
    fft_vals = np.fft.fft(iq_samples, axis=1)  # Performing FFT along the time axis for each sample
    fft_magnitude = np.abs(fft_vals)  # magnitude of FFT
    return fft_magnitude

# Computing FFT features for both training and test sets
fft_features_train = compute_fft_features(X_train)
fft_features_test = compute_fft_features(X_test)

# Derivative Features (First Order Only)
def compute_derivative_features(iq_samples):
    first_order_derivative_0 = np.gradient(iq_samples[:, ::2], axis=1)
    first_order_derivative_1 = np.gradient(iq_samples[:, 1::2], axis=1)
    
    # Normalize
    first_order_derivative_0 = (first_order_derivative_0 - np.mean(first_order_derivative_0, axis=1, keepdims=True)) / np.std(first_order_derivative_0, axis=1, keepdims=True)
    first_order_derivative_1 = (first_order_derivative_1 - np.mean(first_order_derivative_1, axis=1, keepdims=True)) / np.std(first_order_derivative_1, axis=1, keepdims=True)
    
    return np.concatenate((first_order_derivative_0, first_order_derivative_1), axis=1)
derivative_features_train = compute_derivative_features(X_train)
derivative_features_test = compute_derivative_features(X_test)

# Integral Features
def compute_integral_features(iq_samples):
    integral_0 = np.cumsum(iq_samples[:, ::2], axis=1)
    integral_1 = np.cumsum(iq_samples[:, 1::2], axis=1)
    
    # Normalize
    integral_0 = (integral_0 - np.mean(integral_0, axis=1, keepdims=True)) / np.std(integral_0, axis=1, keepdims=True)
    integral_1 = (integral_1 - np.mean(integral_1, axis=1, keepdims=True)) / np.std(integral_1, axis=1, keepdims=True)
    
    return np.concatenate((integral_0, integral_1), axis=1)
integral_features_train = compute_integral_features(X_train)
integral_features_test = compute_integral_features(X_test)


# Adding Instantaneous Frequency Deviation for WBFM
def compute_instantaneous_frequency(iq_samples):
    phase = np.unwrap(np.angle(iq_samples[:, ::2] + 1j * iq_samples[:, 1::2]))  # Calculate phase from I and Q
    instantaneous_frequency = np.gradient(phase, axis=1)  # Take the derivative to get frequency deviation
    return instantaneous_frequency

# Compute instantaneous frequency features
instantaneous_frequency_train = compute_instantaneous_frequency(X_train)
instantaneous_frequency_test = compute_instantaneous_frequency(X_test)

# Constellation Density for QAM-64
def compute_constellation_density(iq_samples):
    magnitude = np.sqrt(iq_samples[:, ::2]**2 + iq_samples[:, 1::2]**2)  # Compute magnitude of constellation points
    stddev_magnitude = np.std(magnitude, axis=1)  # Calculate standard deviation to measure density
    return stddev_magnitude.reshape(-1, 1)  # Reshape to be consistent with other features

# Compute constellation density features for both training and test sets
constellation_density_train = compute_constellation_density(X_train)
constellation_density_test = compute_constellation_density(X_test)

# Combining Features
X_train_combined = np.concatenate((X_train, fft_features_train, derivative_features_train, integral_features_train, instantaneous_frequency_train, constellation_density_train), axis=1)
X_test_combined = np.concatenate((X_test, fft_features_test, derivative_features_test, integral_features_test, instantaneous_frequency_test, constellation_density_test), axis=1)


# new shapes
print("New X_train shape:", X_train_combined.shape)
print("New X_test shape:", X_test_combined.shape)

dr = 0.6  # Dropout rate
in_shp = X_train_combined.shape[1] 


# Model
in_shp = X_train_combined.shape[1]
from tensorflow.keras.layers import BatchNormalization
model = models.Sequential()
model.add(Reshape((in_shp, 1), input_shape=(in_shp,)))  # Optional, depending on the input shape compatibility with Conv1D
model.add(Conv1D(256, 3, padding='same', activation="relu", name="conv1", kernel_initializer='glorot_uniform'))
model.add(BatchNormalization())
model.add(Dropout(dr))
model.add(Conv1D(128, 3, padding="same", activation="relu", name="conv2", kernel_initializer='glorot_uniform'))
model.add(BatchNormalization())
model.add(Dropout(dr))
model.add(Conv1D(64, 3, padding="same", activation="relu", name="conv3", kernel_initializer='glorot_uniform'))
model.add(BatchNormalization())
model.add(Dropout(0.4))
model.add(Flatten())
model.add(Dense(128, activation='relu'))
model.add(BatchNormalization())
model.add(Dense(128, activation='relu'))
model.add(BatchNormalization())
model.add(Dropout(0.6))
model.add(Dense(len(unique_mods)))
model.add(Activation('softmax'))

model.compile(loss='categorical_crossentropy', optimizer='adam', metrics=['accuracy'])
model.summary()


#training
callbacks = [
    tf.keras.callbacks.ModelCheckpoint('best_model.h5', monitor='val_accuracy', verbose=1, save_best_only=True, mode='auto'),
    tf.keras.callbacks.EarlyStopping(monitor='val_accuracy', patience=20, verbose=1, mode='auto'),
    tf.keras.callbacks.ReduceLROnPlateau(monitor='val_loss', factor=0.5, patience=5, min_lr=1e-6, verbose=1)
]

In [None]:
with tf.device('/GPU:1'):
    history = model.fit(X_train_combined,
                        Y_train,
                        batch_size=256,
                        epochs=100,
                        verbose=1,
                        validation_data=(X_test_combined, Y_test),
                        callbacks=callbacks)


In [None]:
#Plots
import matplotlib.pyplot as plt
from sklearn.metrics import confusion_matrix, accuracy_score
plt.figure()
plt.title('Training Performance')
plt.plot(history.epoch, history.history['loss'], label='Train Loss')
plt.plot(history.epoch, history.history['val_loss'], label='Validation Loss')
plt.xlabel('Epochs')
plt.ylabel('Loss')
plt.legend()
plt.show()

# Confusion Matrix
test_Y_hat = model.predict(X_test_combined, batch_size=256)
conf_matrix = confusion_matrix(np.argmax(Y_test, axis=1), np.argmax(test_Y_hat, axis=1))
classes = unique_mods
def plot_confusion_matrix(cm, title='Confusion Matrix', cmap=plt.cm.Blues, labels=[]):
    plt.figure(figsize=(10, 4))
    plt.imshow(cm, interpolation='nearest', cmap=cmap)
    plt.title(title)
    plt.colorbar()
    tick_marks = np.arange(len(labels))
    plt.xticks(tick_marks, labels, rotation=45)
    plt.yticks(tick_marks, labels)
    plt.tight_layout()
    plt.ylabel('True Label')
    plt.xlabel('Predicted Label')

plot_confusion_matrix(conf_matrix, labels=classes)
plt.show()

#SNR vs Accuracy
snr_test = snr_test.astype(float)  # SNR values - float
sorted_snrs = np.sort(np.unique(snr_test))
x_axis = []
y_axis = []

for snr in sorted_snrs:
    idx = np.where(snr_test == snr)
    accuracy = accuracy_score(np.argmax(Y_test[idx], axis=1), np.argmax(test_Y_hat[idx], axis=1))
    x_axis.append(snr)
    y_axis.append(accuracy * 100)

plt.figure()
plt.xlabel('SNR (dB)')
plt.ylabel('Accuracy (%)')
plt.title('Classification Accuracy vs. SNR')
plt.plot(x_axis, y_axis, 'ro--')
plt.grid(True)
plt.show()


In [None]:
# Visualizing Constellation for QPSK, QAM-16, QAM-64, and PAM-4 Signals at SNR = 16 dB
# Assuming we have filtered IQ samples for each modulation type at SNR = 16 dB
def plot_constellations_subplot(iq_samples_list, titles):
    plt.figure(figsize=(10, 8))
    for i, (iq_samples, title) in enumerate(zip(iq_samples_list, titles)):
        plt.subplot(2, 2, i + 1)
        plt.scatter(iq_samples[:, ::2], iq_samples[:, 1::2], s=10, alpha=0.5)
        plt.title(title)
        plt.xlabel('In-phase Component')
        plt.ylabel('Quadrature Component')
        plt.grid(True)
        plt.axis('equal')
    plt.tight_layout()
    plt.show()

# Filter the samples based on SNR = 16 and modulation type
snr_16_idx = np.where(snr_train == 16)
qpsk_samples = X_train[snr_16_idx][np.argmax(Y_train[snr_16_idx], axis=1) == list(classes).index('QPSK')]
qam16_samples = X_train[snr_16_idx][np.argmax(Y_train[snr_16_idx], axis=1) == list(classes).index('QAM16')]
qam64_samples = X_train[snr_16_idx][np.argmax(Y_train[snr_16_idx], axis=1) == list(classes).index('QAM64')]
pam4_samples = X_train[snr_16_idx][np.argmax(Y_train[snr_16_idx], axis=1) == list(classes).index('PAM4')]

# Plotting the constellations for the four modulation types
plot_constellations_subplot(
    [qpsk_samples, qam16_samples, qam64_samples, pam4_samples],
    ['QPSK Constellation (SNR=16 dB)', 'QAM-16 Constellation (SNR=16 dB)', 'QAM-64 Constellation (SNR=16 dB)', 'PAM-4 Constellation (SNR=16 dB)']
)

In [None]:
# Visualizing Plot Power Spectral Density (PSD) for WBFM, AM-DSB, GFSK, 8PSK Signals at SNR = 16 dB
def plot_psd_subplot(iq_samples_list, titles, fs=1.0):
    plt.figure(figsize=(12, 12))
    for i, (iq_samples, title) in enumerate(zip(iq_samples_list, titles)):
        plt.subplot(2, 2, i + 1)
        f, Pxx = welch(iq_samples, fs=fs, nperseg=256)
        plt.plot(f, 10 * np.log10(Pxx))
        plt.title(title)
        plt.xlabel('Frequency (Hz)')
        plt.ylabel('PSD (dB/Hz)')
        plt.grid(True)
    plt.tight_layout()
    plt.show()

# Filter the samples based on SNR = 16 and modulation type
snr_16_idx = np.where(snr_train == 16)
wbfm_sample = X_train[snr_16_idx][np.argmax(Y_train[snr_16_idx], axis=1) == list(classes).index('WBFM')][0]  # Select one sample
amdsb_sample = X_train[snr_16_idx][np.argmax(Y_train[snr_16_idx], axis=1) == list(classes).index('AM-DSB')][0]  # Select one sample
gfsk_sample = X_train[snr_16_idx][np.argmax(Y_train[snr_16_idx], axis=1) == list(classes).index('GFSK')][0]  # Select one sample
eightpsk_sample = X_train[snr_16_idx][np.argmax(Y_train[snr_16_idx], axis=1) == list(classes).index('8PSK')][0]  # Select one sample

# Plotting the PSD for the four modulation types
plot_psd_subplot(
    [wbfm_sample, amdsb_sample, gfsk_sample, eightpsk_sample],
    ['WBFM PSD (SNR=16 dB)', 'AM-DSB PSD (SNR=16 dB)', 'GFSK PSD (SNR=16 dB)', '8PSK PSD (SNR=16 dB)']
)


In [None]:

_snr = 16  # Desired SNR value
offset = 0  # Offset to choose a specific sample
modtypes = ['WBFM', 'AM-DSB', 'GFSK', '8PSK']  # List of modulation types

fig, axs = plt.subplots(2, 2, figsize=(12, 8)) 
axs = axs.flatten()  

# Loop through each modulation type and plot the PSD
for n, mod in enumerate(modtypes):
    # Filter a sample based on modulation type and SNR value
    snr_16_idx = np.where(snr_train == _snr)
    mod_idx = np.argmax(Y_train[snr_16_idx], axis=1) == list(classes).index(mod)
    I = X_train[snr_16_idx][mod_idx][offset, ::2]  # In-phase 
    Q = X_train[snr_16_idx][mod_idx][offset, 1::2]  # Quadrature 
    
    #complex signal from I and Q
    cmplx = [complex(I[k], Q[k]) for k in range(len(I))]

    #Power Spectral Density (PSD)
    axs[n].psd(cmplx, NFFT=128, Fs=1.0)  # Fs can be adjusted as needed
    axs[n].set_title(f'{mod} PSD (SNR={_snr} dB)')  

plt.tight_layout()  
plt.show() 


In [None]:
import numpy as np
import matplotlib.pyplot as plt

_snr = 16
offset = 0  # Select a specific sample
modtypes = ['WBFM', 'AM-DSB']

snr_16_idx = np.where(snr_train == _snr)
wbfm_idx = np.argmax(Y_train[snr_16_idx], axis=1) == list(classes).index('WBFM')
wbfm_sample = X_train[snr_16_idx][wbfm_idx][offset]  # one WBFM sample

amdsb_idx = np.argmax(Y_train[snr_16_idx], axis=1) == list(classes).index('AM-DSB')
amdsb_sample = X_train[snr_16_idx][amdsb_idx][offset]  # one AM-DSB sample

# In-phase (I) and Quadrature (Q) components
I_wbfm = wbfm_sample[::2]
Q_wbfm = wbfm_sample[1::2]

I_amdsb = amdsb_sample[::2]
Q_amdsb = amdsb_sample[1::2]

# complex signals from I and Q
wbfm_complex = I_wbfm + 1j * Q_wbfm
amdsb_complex = I_amdsb + 1j * Q_amdsb

# instantaneous amplitude and phase
amplitude_wbfm = np.abs(wbfm_complex)
phase_wbfm = np.angle(wbfm_complex)

amplitude_amdsb = np.abs(amdsb_complex)
phase_amdsb = np.angle(amdsb_complex)

# Plotting the instantaneous amplitude and phase for WBFM and AM-DSB
fig, axs = plt.subplots(2, 2, figsize=(14, 10))

# for WBFM
axs[0, 0].plot(amplitude_wbfm, color='b')
axs[0, 0].set_title('WBFM Instantaneous Amplitude')
axs[0, 0].set_xlabel('Sample Index')
axs[0, 0].set_ylabel('Amplitude')

axs[1, 0].plot(phase_wbfm, color='r')
axs[1, 0].set_title('WBFM Instantaneous Phase')
axs[1, 0].set_xlabel('Sample Index')
axs[1, 0].set_ylabel('Phase (radians)')

# for AM-DSB
axs[0, 1].plot(amplitude_amdsb, color='b')
axs[0, 1].set_title('AM-DSB Instantaneous Amplitude')
axs[0, 1].set_xlabel('Sample Index')
axs[0, 1].set_ylabel('Amplitude')

axs[1, 1].plot(phase_amdsb, color='r')
axs[1, 1].set_title('AM-DSB Instantaneous Phase')
axs[1, 1].set_xlabel('Sample Index')
axs[1, 1].set_ylabel('Phase (radians)')

plt.tight_layout()
plt.show()


In [None]:
_snr = 8  
offset = 2  
modtypes = ['WBFM', 'AM-DSB']
snr_16_idx = np.where(snr_train == _snr)
wbfm_idx = np.argmax(Y_train[snr_16_idx], axis=1) == list(classes).index('WBFM')
wbfm_sample = X_train[snr_16_idx][wbfm_idx][offset]  
amdsb_idx = np.argmax(Y_train[snr_16_idx], axis=1) == list(classes).index('AM-DSB')
amdsb_sample = X_train[snr_16_idx][amdsb_idx][offset]  

I_wbfm = wbfm_sample[::2]
Q_wbfm = wbfm_sample[1::2]
I_amdsb = amdsb_sample[::2]
Q_amdsb = amdsb_sample[1::2]

wbfm_complex = I_wbfm + 1j * Q_wbfm
amdsb_complex = I_amdsb + 1j * Q_amdsb

# instantaneous amplitude
amplitude_wbfm = np.abs(wbfm_complex)
amplitude_amdsb = np.abs(amdsb_complex)

# instantaneous frequency
phase_wbfm = np.unwrap(np.angle(wbfm_complex)) 
inst_freq_wbfm = np.gradient(phase_wbfm)  # derivative of the unwrapped phase

phase_amdsb = np.unwrap(np.angle(amdsb_complex))  
inst_freq_amdsb = np.gradient(phase_amdsb)  # derivative of the unwrapped phase

# Plotting the instantaneous amplitude and frequency for WBFM and AM-DSB
fig, axs = plt.subplots(2, 2, figsize=(14, 10))

# for WBFM
axs[0, 0].plot(amplitude_wbfm, color='b')
axs[0, 0].set_title('WBFM Instantaneous Amplitude')
axs[0, 0].set_xlabel('Sample Index')
axs[0, 0].set_ylabel('Amplitude')

axs[1, 0].plot(inst_freq_wbfm, color='r')
axs[1, 0].set_title('WBFM Instantaneous Frequency')
axs[1, 0].set_xlabel('Sample Index')
axs[1, 0].set_ylabel('Frequency (radians/sample)')

# for AM-DSB
axs[0, 1].plot(amplitude_amdsb, color='b')
axs[0, 1].set_title('AM-DSB Instantaneous Amplitude')
axs[0, 1].set_xlabel('Sample Index')
axs[0, 1].set_ylabel('Amplitude')

axs[1, 1].plot(inst_freq_amdsb, color='r')
axs[1, 1].set_title('AM-DSB Instantaneous Frequency')
axs[1, 1].set_xlabel('Sample Index')
axs[1, 1].set_ylabel('Frequency (radians/sample)')

plt.tight_layout()
plt.show()
