In [None]:
# ==============================
# Mount Drive and Imports
# ==============================
from google.colab import drive
drive.mount('/content/drive')

import os, glob
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
import tensorflow as tf
from tensorflow.keras import Model, Input
from tensorflow.keras.layers import *
from tensorflow.keras.callbacks import EarlyStopping, ReduceLROnPlateau, ModelCheckpoint
from tensorflow.keras.applications import MobileNetV2
from sklearn.utils import shuffle, compute_class_weight
from sklearn.metrics import accuracy_score, f1_score, confusion_matrix
!pip install tabulate  # For nice table printing
from tabulate import tabulate

# ==============================
# Dataset Loader (from your code)
# ==============================
class DataSet2(object):
    """Read-only holder pairing an array of samples with its array of labels."""

    def __init__(self, images, labels):
        self._images = images
        self._labels = labels
        # The example count is the size of the leading axis of ``images``.
        self._num_examples = images.shape[0]

    @property
    def images(self):
        """The sample array passed to the constructor."""
        return self._images

    @property
    def labels(self):
        """The label array passed to the constructor."""
        return self._labels

    @property
    def num_examples(self):
        """Length of the first axis of ``images``, captured at construction."""
        return self._num_examples

def load_train2(train_path, classes, sample_shape=(28, 28, 2)):
    """Load every ``.npy`` IQ recording found under ``train_path/<class>/``.

    Each recording is split into its real and imaginary parts, which are
    interleaved (I0, Q0, I1, Q1, ...), truncated to the number of values
    needed by ``sample_shape`` and reshaped to that shape.

    Args:
        train_path: Directory containing one sub-directory per class.
        classes: Sequence of class (sub-directory) names. The position of a
            name in this sequence is its one-hot index.
        sample_shape: Shape of a single output sample. The default
            ``(28, 28, 2)`` uses the first 1568 interleaved values.

    Returns:
        Tuple ``(samples, labels)`` of NumPy arrays: the samples stacked along
        a new leading axis and the matching one-hot label rows.

    Raises:
        ValueError: If a recording holds fewer values than ``sample_shape``
            requires.
    """
    n_values = int(np.prod(sample_shape))
    samples, labels = [], []
    for idx, cls in enumerate(classes):
        pattern = os.path.join(train_path, cls, '*.npy')
        # glob returns matches in arbitrary order; sort so the loaded order is
        # the same on every run and every filesystem.
        for fl in sorted(glob.glob(pattern)):
            iq_samples = np.load(fl)
            real, imag = np.real(iq_samples), np.imag(iq_samples)
            interleaved = np.ravel(np.column_stack((real, imag)))
            if interleaved.size < n_values:
                raise ValueError(
                    f"{fl}: recording has {interleaved.size} I/Q values, "
                    f"need at least {n_values} for shape {tuple(sample_shape)}"
                )
            samples.append(interleaved[:n_values].reshape(sample_shape))
            label = np.zeros(len(classes))
            label[idx] = 1.0
            labels.append(label)
    return np.array(samples), np.array(labels)

def read_train_sets2(train_path, classes, validation_size, random_state=None):
    """Load the dataset, shuffle it and split it into train/validation parts.

    Args:
        train_path: Directory containing one sub-directory per class.
        classes: Sequence of class names, forwarded to ``load_train2``.
        validation_size: Size of the validation part. A ``float`` is treated
            as a fraction of the total number of samples; any other value is
            used directly as a sample count.
        random_state: Optional seed forwarded to ``sklearn.utils.shuffle`` so
            the split can be reproduced. ``None`` (default) keeps the previous
            unseeded behaviour.

    Returns:
        An object with ``train`` and ``valid`` attributes, each a ``DataSet2``.

    Raises:
        ValueError: If no samples were found under ``train_path``.
    """
    class DataSets: pass
    data_sets = DataSets()
    images, labels = load_train2(train_path, classes)
    if images.shape[0] == 0:
        # Fail here with a clear message instead of much later inside fit().
        raise ValueError(f"No .npy samples found under {train_path!r} for classes {list(classes)!r}")
    images, labels = shuffle(images, labels, random_state=random_state)
    if isinstance(validation_size, float):
        validation_size = int(validation_size * images.shape[0])
    # The first ``validation_size`` shuffled samples form the validation part.
    val_images, val_labels = images[:validation_size], labels[:validation_size]
    train_images, train_labels = images[validation_size:], labels[validation_size:]
    data_sets.train = DataSet2(train_images, train_labels)
    data_sets.valid = DataSet2(val_images, val_labels)
    return data_sets

# ==============================
# Models (from your code, with variants for ablations)
# ==============================
class BaselineModel:
    """Plain CNN baseline: four conv/batch-norm/pool stages and a dense head."""

    def __init__(self, input_shape, num_classes):
        self.input_shape = input_shape
        self.num_classes = num_classes

    def create_model(self):
        """Assemble and return the (uncompiled) Keras model named "Baseline_Model"."""

        def conv_stage(tensor, filters):
            # 3x3 ReLU convolution -> batch norm -> 2x2 max pooling.
            tensor = Conv2D(filters, (3, 3), activation='relu', padding='same')(tensor)
            tensor = BatchNormalization()(tensor)
            return MaxPooling2D((2, 2), padding='same')(tensor)

        inputs = Input(shape=self.input_shape)

        # Only the first (128-filter) stage is followed by dropout.
        features = conv_stage(inputs, 128)
        features = Dropout(0.1)(features)
        for _ in range(3):
            features = conv_stage(features, 64)

        # Classification head.
        features = Flatten()(features)
        features = Dense(128, activation='relu')(features)
        features = BatchNormalization()(features)
        outputs = Dense(self.num_classes, activation='softmax')(features)
        return Model(inputs, outputs, name="Baseline_Model")

class EnhancedOptimizedCNN:
    """Compact CNN built from inverted-residual blocks with a width multiplier.

    ``alpha`` scales the channel counts; ``use_dropout`` switches every
    Dropout layer in the network on or off.
    """

    def __init__(self, input_shape, num_classes, alpha=1.0, use_dropout=True):
        self.input_shape = input_shape
        self.num_classes = num_classes
        self.alpha = alpha
        self.use_dropout = use_dropout

    def _inverted_residual_block_v2(self, x, filters, strides, expand_ratio, alpha, dropout_rate):
        """Apply expand (1x1) -> depthwise (3x3) -> project (1x1) to ``x``.

        The 1x1 expansion is skipped when ``expand_ratio`` is 1. The input is
        added back to the projection only when ``strides`` is 1 and the input
        and output channel counts are equal.
        """
        in_ch = int(x.shape[-1])
        mid_ch = int(in_ch * expand_ratio * alpha)
        out_ch = int(filters * alpha)

        # Expansion stage.
        expanded = x
        if expand_ratio != 1:
            expanded = Conv2D(mid_ch, (1, 1), padding='same', use_bias=False)(expanded)
            expanded = BatchNormalization()(expanded)
            expanded = ReLU(max_value=6.0)(expanded)
            if self.use_dropout:
                expanded = Dropout(dropout_rate)(expanded)

        # Depthwise stage.
        depthwise = DepthwiseConv2D((3, 3), strides=strides, padding='same', use_bias=False)(expanded)
        depthwise = BatchNormalization()(depthwise)
        depthwise = ReLU(max_value=6.0)(depthwise)

        # Linear projection stage (no activation after the batch norm).
        projected = Conv2D(out_ch, (1, 1), padding='same', use_bias=False)(depthwise)
        projected = BatchNormalization()(projected)

        # Without matching shapes there is nothing to add the input to.
        if strides != 1 or in_ch != out_ch:
            return projected
        if self.use_dropout:
            projected = Dropout(dropout_rate / 2)(projected)
        return Add()([x, projected])

    def create_model(self, dropout_rates):
        """Build the network.

        ``dropout_rates`` is indexed 0..5: stem, the four blocks, then the head.
        Returns the (uncompiled) Keras model named "EnhancedOptimizedCNN".
        """
        inputs = Input(shape=self.input_shape)

        # Stem convolution.
        net = Conv2D(int(12 * self.alpha), (3, 3), strides=1, padding='same', use_bias=False)(inputs)
        net = BatchNormalization()(net)
        net = ReLU(max_value=6.0)(net)
        if self.use_dropout:
            net = Dropout(dropout_rates[0])(net)

        # (filters, strides, expand_ratio) per block; block k uses dropout_rates[k].
        block_specs = ((20, 2, 1), (24, 1, 2), (48, 2, 2), (64, 1, 2))
        for position, (filters, strides, expand_ratio) in enumerate(block_specs, start=1):
            net = self._inverted_residual_block_v2(
                net, filters, strides, expand_ratio, self.alpha, dropout_rates[position]
            )

        # Classification head.
        net = GlobalAveragePooling2D()(net)
        net = Dense(32, activation='relu')(net)
        if self.use_dropout:
            net = Dropout(dropout_rates[5])(net)
        outputs = Dense(self.num_classes, activation='softmax')(net)
        return Model(inputs, outputs, name="EnhancedOptimizedCNN")

class MobileNetV2Adapter:
    """Wrap a MobileNetV2 backbone (built with ``weights=None``) for RF inputs."""

    def __init__(self, input_shape, num_classes):
        self.input_shape = input_shape
        self.num_classes = num_classes

    def create_model(self):
        """Build and return the (uncompiled) Keras model named "MobileNetV2_RF".

        The backbone is created for 32x32x3 inputs, so the input is first
        projected to three channels (when it does not already have three) and
        then resized to 32x32.
        """
        inp = Input(shape=self.input_shape)
        # Project ANY non-3-channel input with a 1x1 convolution. The earlier
        # check (`== 2`) let e.g. 1- or 4-channel inputs through unchanged,
        # which then mismatched the backbone's (32, 32, 3) input shape.
        if self.input_shape[-1] != 3:
            x = Conv2D(3, (1, 1), padding='same')(inp)
        else:
            x = inp
        x = Resizing(32, 32)(x)
        base = MobileNetV2(input_shape=(32, 32, 3), include_top=False, weights=None)
        x = base(x)
        x = GlobalAveragePooling2D()(x)
        x = Dense(128, activation='relu')(x)
        x = Dropout(0.2)(x)
        out = Dense(self.num_classes, activation='softmax')(x)
        return Model(inp, out, name="MobileNetV2_RF")

# ==============================
# Augmentation (from your code)
# ==============================
class AdvancedRFAugmentation:
    """Random channel impairments for IQ samples whose last axis is (I, Q).

    Attributes are stored as given:
        snr_range: ``(low, high)`` bounds in dB from which a noise level is
            drawn uniformly for every augmented copy.
        augmentation_factor: number of augmented copies produced per input
            sample (the original sample is kept as well).

    All randomness comes from the global ``np.random`` state.
    """

    def __init__(self, snr_range, augmentation_factor):
        self.snr_range = snr_range
        self.augmentation_factor = augmentation_factor

    def add_awgn(self, signal, snr_db):
        """Return ``signal`` plus noise at the requested SNR, as float32.

        Signal power is the mean of I^2 + Q^2; the noise power is split
        equally between the two components. With probability 0.2 the noise is
        low-pass filtered along axis 0 (colored noise); it is then rescaled so
        that its power still matches ``snr_db``.
        """
        snr_linear = 10.0 ** (snr_db / 10.0)
        signal_power = np.mean(signal[:, :, 0] ** 2 + signal[:, :, 1] ** 2)
        noise_power = signal_power / snr_linear
        noise_std = np.sqrt(noise_power / 2)

        use_colored = np.random.random() > 0.8
        noise = np.random.normal(0, noise_std, signal.shape)
        if use_colored:
            # First-order recursive filter along axis 0, starting from a zero
            # state. Row 0 is filtered too; it previously received no noise.
            colored_noise = np.zeros_like(noise)
            colored_noise[0] = 0.3 * noise[0]
            for i in range(1, signal.shape[0]):
                colored_noise[i] = 0.7 * colored_noise[i - 1] + 0.3 * noise[i]
            # The filter attenuates the noise, so without rescaling the
            # delivered SNR would be higher than the one requested.
            colored_rms = np.sqrt(np.mean(colored_noise ** 2))
            if colored_rms > 0:
                colored_noise = colored_noise * (noise_std / colored_rms)
            noise = colored_noise
        return (signal + noise).astype(np.float32)

    def add_frequency_selective_fading(self, signal):
        """With probability 0.3, add one delayed and attenuated copy of ``signal``.

        The copy is rolled 1-3 positions along axis 0 and scaled by a factor
        drawn uniformly from [0.3, 0.7). Inputs whose last axis is not of
        size 2 are returned unchanged.
        """
        if signal.shape[-1] != 2:
            return signal
        if np.random.random() > 0.7:
            delay = np.random.randint(1, 4)
            amplitude = np.random.uniform(0.3, 0.7)
            delayed_signal = np.roll(signal, delay, axis=0) * amplitude
            return signal + delayed_signal
        return signal

    def add_phase_noise(self, signal, phase_std=0.15):
        """Rotate every (I, Q) pair by a smoothed random phase.

        Phases are drawn from N(0, ``phase_std``) and smoothed along axis 0
        with a first-order recursive filter. A rotation leaves I^2 + Q^2 of
        each pair unchanged. Inputs whose last axis is not of size 2 are
        returned unchanged; otherwise a modified copy is returned.
        """
        if signal.shape[-1] != 2:
            return signal
        phase_noise = np.random.normal(0, phase_std, signal.shape[:2])
        # In-place recursion: row i depends on the already-smoothed row i - 1.
        for i in range(1, signal.shape[0]):
            phase_noise[i] = 0.9 * phase_noise[i - 1] + 0.1 * phase_noise[i]
        cos_noise = np.cos(phase_noise)
        sin_noise = np.sin(phase_noise)
        i_noisy = signal[:, :, 0] * cos_noise - signal[:, :, 1] * sin_noise
        q_noisy = signal[:, :, 0] * sin_noise + signal[:, :, 1] * cos_noise
        result = signal.copy()
        result[:, :, 0] = i_noisy
        result[:, :, 1] = q_noisy
        return result

    def augment_batch_enhanced(self, signals, labels):
        """Return the originals plus ``augmentation_factor`` noisy copies each.

        Every copy gets noise at an SNR drawn uniformly from ``snr_range``;
        fading is then attempted with probability 0.4 and phase noise applied
        with probability 0.3. Output order is: sample, its copies, next
        sample, and so on. Each copy carries the label of its source sample.

        Returns:
            Tuple ``(augmented_signals, augmented_labels)`` of NumPy arrays.
        """
        augmented_signals, augmented_labels = [], []
        for signal, label in zip(signals, labels):
            augmented_signals.append(signal)
            augmented_labels.append(label)
            for _ in range(self.augmentation_factor):
                snr_db = np.random.uniform(self.snr_range[0], self.snr_range[1])
                augmented_signal = self.add_awgn(signal, snr_db)
                if np.random.random() > 0.6:
                    augmented_signal = self.add_frequency_selective_fading(augmented_signal)
                if np.random.random() > 0.7:
                    augmented_signal = self.add_phase_noise(augmented_signal)
                augmented_signals.append(augmented_signal)
                augmented_labels.append(label)
        return np.array(augmented_signals), np.array(augmented_labels)

# ==============================
# Training Pipeline (from your code, with use_augmentation flag)
# ==============================
class ClassWeightedTrainingPipeline:
    """Compile and fit a Keras model with optional augmentation and class weights.

    ``history`` is ``None`` until ``train_model`` has run, after which it holds
    the object returned by ``model.fit``.
    """

    def __init__(self, model, augmentor, classes, use_augmentation=True):
        self.model = model
        self.augmentor = augmentor
        self.classes = classes
        self.use_augmentation = use_augmentation
        self.history = None

    def compute_class_weights(self, y_train):
        """Return ``{class_index: weight}`` using scikit-learn's 'balanced' rule.

        ``y_train`` is one-hot encoded; class indices are recovered with
        ``argmax`` along axis 1.
        """
        y_train_labels = np.argmax(y_train, axis=1)
        present_labels = np.unique(y_train_labels)
        class_weights = compute_class_weight('balanced', classes=present_labels, y=y_train_labels)
        # Key each weight by its real class index. ``dict(enumerate(...))``
        # shifts the keys whenever a class index has no samples in ``y_train``.
        return {int(label): float(weight) for label, weight in zip(present_labels, class_weights)}

    def create_enhanced_callbacks(self, patience_es, patience_lr, min_delta):
        """Return the early-stopping and LR-reduction callbacks, both on ``val_loss``."""
        return [
            EarlyStopping(monitor='val_loss', patience=patience_es, restore_best_weights=True, min_delta=min_delta),
            ReduceLROnPlateau(monitor='val_loss', factor=0.3, patience=patience_lr, min_lr=1e-8)
        ]

    def train_model(self, X_train, y_train, Xtest, Ytest,
                    epochs, batch_size, lr, weight_decay,
                    beta_1, beta_2, patience_es, patience_lr, min_delta):
        """Compile ``self.model`` with AdamW and fit it.

        When ``use_augmentation`` is set, the training data is first expanded
        by ``augmentor.augment_batch_enhanced``. Class weights are computed
        from the (possibly augmented) labels. ``(Xtest, Ytest)`` is passed to
        ``fit`` as validation data and is never augmented.

        Returns:
            The object returned by ``model.fit`` (also stored in ``self.history``).
        """
        if self.use_augmentation:
            X_train_aug, y_train_aug = self.augmentor.augment_batch_enhanced(X_train, y_train)
        else:
            X_train_aug, y_train_aug = X_train, y_train
        class_weights = self.compute_class_weights(y_train_aug)
        callbacks = self.create_enhanced_callbacks(patience_es, patience_lr, min_delta)
        optimizer = tf.keras.optimizers.AdamW(learning_rate=lr, weight_decay=weight_decay, beta_1=beta_1, beta_2=beta_2)
        self.model.compile(optimizer=optimizer, loss='categorical_crossentropy', metrics=['accuracy'])
        self.history = self.model.fit(X_train_aug, y_train_aug, validation_data=(Xtest, Ytest),
                                      epochs=epochs, batch_size=batch_size, class_weight=class_weights,
                                      callbacks=callbacks, verbose=1, shuffle=True)
        return self.history

# ==============================
# Evaluation Functions
# ==============================
def evaluate_model(model, Xtest, Ytest):
    """Score ``model`` on one-hot labelled data.

    Predicted and true class indices are taken with ``argmax`` along axis 1.

    Returns:
        Tuple ``(accuracy, weighted_f1)``.
    """
    predicted_classes = np.argmax(model.predict(Xtest), axis=1)
    true_classes = np.argmax(Ytest, axis=1)
    return (
        accuracy_score(true_classes, predicted_classes),
        f1_score(true_classes, predicted_classes, average="weighted"),
    )

def create_noisy_test_set(Xtest, snr_db, augmentor):
    """Return a copy of ``Xtest`` with ``augmentor.add_awgn`` applied per sample.

    Only ``add_awgn`` is called (no fading or phase noise), with the same
    ``snr_db`` for every sample. The results are stacked into one NumPy array.
    """
    return np.array([augmentor.add_awgn(sample, snr_db) for sample in Xtest])

# ==============================
# Main Execution
# ==============================
train_path ='/content/drive/MyDrive/rfsc-dataset/Dataset_Deep_Radio/Dataset_Deep_Radio/training_data'
# os.listdir returns entries in arbitrary order; sort so that the mapping from
# class name to one-hot index is the same on every run.
classes = sorted(d for d in os.listdir(train_path) if os.path.isdir(os.path.join(train_path, d)))
num_classes = len(classes)

# Load data (80/20 split for all except baseline)
data = read_train_sets2(train_path, classes, validation_size=0.2)
Xtrain, Ytrain, Xtest, Ytest = data.train.images, data.train.labels, data.valid.images, data.valid.labels

# Hyperparams (from your example)
input_shape=(28,28,2); dropout_rates=[0.05,0.09,0.14,0.19,0.20,0.37]
snr_range=(5,25); augmentation_factor=6
epochs=50; batch_size=32; lr=1e-4; weight_decay=2e-4; beta_1=0.86; beta_2=0.99  # Reduced epochs for speed
patience_es=6; patience_lr=3; min_delta=5e-4

# Augmentor
augmentor = AdvancedRFAugmentation(snr_range, augmentation_factor)

# Train Baseline (70/30 split, as in your code)
# NOTE: this is an independent shuffle/split, so Xtest_base/Ytest_base is the
# only data the baseline has not been trained on.
data_base = read_train_sets2(train_path, classes, validation_size=0.3)
Xtrain_base, Ytrain_base, Xtest_base, Ytest_base = data_base.train.images, data_base.train.labels, data_base.valid.images, data_base.valid.labels
baseline = BaselineModel(input_shape, num_classes).create_model()
baseline.compile(optimizer="adam", loss="categorical_crossentropy", metrics=["accuracy"])
history_base = baseline.fit(Xtrain_base, Ytrain_base, validation_data=(Xtest_base,Ytest_base),
                            epochs=epochs, batch_size=batch_size, verbose=1)

# Train MobileNetV2 (80/20)
mobilenet = MobileNetV2Adapter(input_shape, num_classes).create_model()
mobilenet.compile(optimizer="adam", loss="categorical_crossentropy", metrics=["accuracy"])
history_mobilenet = mobilenet.fit(Xtrain, Ytrain, validation_data=(Xtest,Ytest),
                                  epochs=epochs, batch_size=batch_size, verbose=1)


def _train_enhanced_variant(message, alpha, use_dropout, use_augmentation):
    """Print ``message``, then build and fit one EnhancedOptimizedCNN variant.

    Uses the shared 80/20 split and the hyperparameters defined above.
    Returns ``(model, pipeline, history)``.
    """
    print(message)
    model = EnhancedOptimizedCNN(input_shape, num_classes, alpha=alpha,
                                 use_dropout=use_dropout).create_model(dropout_rates)
    pipeline = ClassWeightedTrainingPipeline(model, augmentor, classes, use_augmentation=use_augmentation)
    history = pipeline.train_model(Xtrain, Ytrain, Xtest, Ytest, epochs, batch_size, lr, weight_decay,
                                   beta_1, beta_2, patience_es, patience_lr, min_delta)
    return model, pipeline, history


# Train Proposed Full (with aug, α=1.0, dropout=True)
enhanced_full, pipeline_full, history_full = _train_enhanced_variant(
    "Training Proposed Full (with Aug, α=1.0, Dropout=True)...",
    alpha=1.0, use_dropout=True, use_augmentation=True)

# Ablation: No Augmentation (use_augmentation=False)
enhanced_noaug, pipeline_noaug, history_noaug = _train_enhanced_variant(
    "Training Ablation: No Augmentation...",
    alpha=1.0, use_dropout=True, use_augmentation=False)

# Ablation: α=0.85 (with aug, dropout=True)
enhanced_alpha085, pipeline_alpha085, history_alpha085 = _train_enhanced_variant(
    "Training Ablation: α=0.85...",
    alpha=0.85, use_dropout=True, use_augmentation=True)

# Ablation: α=1.20 (with aug, dropout=True)
enhanced_alpha120, pipeline_alpha120, history_alpha120 = _train_enhanced_variant(
    "Training Ablation: α=1.20...",
    alpha=1.20, use_dropout=True, use_augmentation=True)

# Ablation: No Dropout (with aug, use_dropout=False)
enhanced_nodrop, pipeline_nodrop, history_nodrop = _train_enhanced_variant(
    "Training Ablation: No Dropout...",
    alpha=1.0, use_dropout=False, use_augmentation=True)

# ==============================
# Generate Ablation Table
# ==============================
# Score every trained variant on the shared hold-out split, in training order.
(
    (acc_full, f1_full),
    (acc_noaug, f1_noaug),
    (acc_alpha085, f1_alpha085),
    (acc_alpha120, f1_alpha120),
    (acc_nodrop, f1_nodrop),
) = [
    evaluate_model(trained.model, Xtest, Ytest)
    for trained in (pipeline_full, pipeline_noaug, pipeline_alpha085, pipeline_alpha120, pipeline_nodrop)
]

# One row per configuration: label, accuracy, weighted F1, fixed note text.
ablation_rows = (
    ("Full Model", acc_full, f1_full, "Baseline for ablations"),
    ("No Augmentation", acc_noaug, f1_noaug, "Significant drop at low SNR"),
    ("α = 0.85", acc_alpha085, f1_alpha085, "Lower FLOPs, slight accuracy trade-off"),
    ("α = 1.20", acc_alpha120, f1_alpha120, "Higher accuracy but increased FLOPs"),
    ("No Dropout", acc_nodrop, f1_nodrop, "Reduced robustness to fading"),
)
# Metrics are rendered with three decimals; labels and notes pass through as-is.
ablation_data = [
    [label, f"{acc:.3f}", f"{f1:.3f}", note]
    for label, acc, f1, note in ablation_rows
]

print("Ablation Table:")
print(tabulate(ablation_data, headers=["Configuration", "Accuracy", "Weighted F1", "Notes"], tablefmt="grid"))

# ==============================
# Generate SNR-Wise Table
# ==============================
# SNR levels (dB) at which accuracy is reported; the first row of snr_data is the header.
snr_levels = [5, 10, 15, 20, 25]
snr_data = [["SNR (dB)", "Proposed Accuracy", "Baseline [13] Accuracy", "MobileNetV2 Accuracy"]]

for snr in snr_levels:
    # Proposed model and MobileNetV2 were trained on the 80/20 split, so they
    # are scored on the same noisy copy of that split's hold-out data.
    noisy_Xtest = create_noisy_test_set(Xtest, snr, augmentor)
    acc_proposed, _ = evaluate_model(pipeline_full.model, noisy_Xtest, Ytest)
    acc_mobilenet, _ = evaluate_model(mobilenet, noisy_Xtest, Ytest)
    # The baseline was trained on a separately shuffled 70/30 split, so Xtest
    # overlaps its training data. Score it on its own hold-out data instead to
    # avoid leaking training samples into the reported accuracy.
    noisy_Xtest_base = create_noisy_test_set(Xtest_base, snr, augmentor)
    acc_baseline, _ = evaluate_model(baseline, noisy_Xtest_base, Ytest_base)
    snr_data.append([snr, f"{acc_proposed:.3f}", f"{acc_baseline:.3f}", f"{acc_mobilenet:.3f}"])

print("\nSNR-Wise Table:")
print(tabulate(snr_data, headers="firstrow", tablefmt="grid"))

# ==============================
# Additional Outputs (Curves and Confusion Matrices, as in your pasted code)
# ==============================
# (Code for plots here – same as your pasted example, omitted for brevity but can be added if needed)

Mounted at /content/drive
Epoch 1/50
[1m66/66[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m19s[0m 124ms/step - accuracy: 0.4831 - loss: 1.4641 - val_accuracy: 0.2678 - val_loss: 3.3866
Epoch 2/50
[1m66/66[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 7ms/step - accuracy: 0.8565 - loss: 0.3938 - val_accuracy: 0.1289 - val_loss: 6.1865
Epoch 3/50
[1m66/66[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m1s[0m 7ms/step - accuracy: 0.8841 - loss: 0.3085 - val_accuracy: 0.1289 - val_loss: 8.4931
Epoch 4/50
[1m66/66[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 7ms/step - accuracy: 0.9231 - loss: 0.2207 - val_accuracy: 0.1289 - val_loss: 9.0100
Epoch 5/50
[1m66/66[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 7ms/step - accuracy: 0.9077 - loss: 0.2358 - val_accuracy: 0.1289 - val_loss: 9.5266
Epoch 6/50
[1m66/66[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m1s[0m 7ms/step - accuracy: 0.9380 - loss: 0.1893 - val_accuracy: 0.2678 - val_loss: 5.0994
Epoch 7/50
