In [None]:
import numpy as np
import matplotlib.pyplot as plt
from sklearn.model_selection import train_test_split
import tensorflow as tf
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Conv1D, MaxPooling1D, Flatten, Dense, Dropout, BatchNormalization

def apply_channel(symbols, SNR_dB):
    """Pass *symbols* through a random 6-tap fading channel and add noise.

    A fresh set of complex tap gains is drawn on every call from a fixed
    power-delay profile (given in dB and normalised to unit total power).
    Each tap is applied to a circularly shifted copy of the input
    (``np.roll``), so samples shifted off the end wrap around to the start.
    Complex Gaussian noise is then added, scaled so that the ratio of the
    measured variance of the faded signal to the noise power equals
    ``SNR_dB``.

    Args:
        symbols: 1-D array of complex baseband symbols.
        SNR_dB: target signal-to-noise ratio in decibels.

    Returns:
        Complex array of the same length as ``symbols``.
    """
    # Fixed channel parameters (Kr is presumably the Rician K-factor --
    # TODO confirm against the intended channel model).
    Kr = 10
    power = 1
    path_gains_db = np.array([0, -0.9, -4.9, -8, -7.8, -23.9])
    n_tap = len(path_gains_db)
    n_sym = len(symbols)

    # Constant offset added to both quadrature components, and their spread.
    s = np.sqrt((Kr / 2) * (Kr + 1) * power)
    sigma = power / np.sqrt(2 * (Kr + 1))

    # dB profile -> linear power, normalised so the taps sum to one.
    tap_power = 10 ** (path_gains_db / 10)
    tap_power = tap_power / np.sum(tap_power)

    # Draw the in-phase part first, then the quadrature part.
    tap_scale = (1 / np.sqrt(2)) * (1 / np.sqrt(n_tap)) * sigma * np.sqrt(tap_power)
    in_phase = np.random.randn(1, n_tap) + s
    quadrature = np.random.randn(1, n_tap) + s
    taps = (tap_scale * (in_phase + 1j * quadrature))[0]

    # Sum of circularly delayed, tap-weighted copies of the input.
    faded = np.zeros(n_sym, dtype=complex)
    for delay, gain in enumerate(taps):
        faded += gain * np.roll(symbols, delay)

    # Unit-variance complex noise, rescaled to hit the requested SNR.
    noise_re = np.random.randn(n_sym)
    noise_im = np.random.randn(n_sym)
    unit_noise = (1 / np.sqrt(2)) * (noise_re + 1j * noise_im)
    snr_linear = 10 ** (SNR_dB / 10.0)
    noise_power = np.var(faded) / snr_linear
    return faded + np.sqrt(noise_power) * unit_noise

def generate_psk(num_samples, SNR_dB, M):
    """Generate a random M-PSK symbol stream and pass it through the channel.

    Args:
        num_samples: number of symbols to draw.
        SNR_dB: signal-to-noise ratio forwarded to ``apply_channel``.
        M: PSK order; symbol indices are drawn uniformly from ``0..M-1``.

    Returns:
        Whatever ``apply_channel`` returns for the unit-modulus symbols.
    """
    # One integer symbol index per sample, mapped to a point on the unit circle.
    symbol_idx = np.random.randint(0, M, num_samples)
    phase = 2 * np.pi * symbol_idx / M
    return apply_channel(np.exp(1j * phase), SNR_dB)


def _cross_qam_points(k):
    """Return the 2**k points of a centred cross-shaped QAM grid (odd k >= 5)."""
    corner = 2 ** ((k - 5) // 2)
    side = 6 * corner
    idx = np.arange(side)
    levels = 2 * idx - (side - 1)
    # Drop the four corner squares of a side x side grid: 36c^2 - 4c^2 = 2**k.
    on_edge = (idx < corner) | (idx >= side - corner)
    keep = ~(on_edge[:, None] & on_edge[None, :])
    rows, cols = np.nonzero(keep)
    return levels[cols] + 1j * levels[rows]


def generate_qam(num_samples, SNR_dB, M):
    """Generate a random M-QAM symbol stream and pass it through the channel.

    Symbol indices are built from ``log2(M)`` random bits per sample (MSB
    first). For even ``log2(M)`` the index is mapped onto a square grid with
    odd-integer levels, exactly as before. For odd ``log2(M)`` the previous
    ``int(sqrt(M))`` grid width was truncated, which produced an asymmetric,
    off-centre constellation (e.g. 32-QAM spanned y in {-4..8}); these orders
    now use a centred cross constellation (``log2(M) >= 5``) or a centred
    rectangular one (``M`` in {2, 8}).

    Args:
        num_samples: number of symbols to draw.
        SNR_dB: signal-to-noise ratio forwarded to ``apply_channel``.
        M: constellation size; must be a power of two.

    Returns:
        Whatever ``apply_channel`` returns for the generated symbols.

    Raises:
        ValueError: if ``M`` is not a positive power of two.
    """
    if M < 1:
        raise ValueError(f"M must be a positive power of two, got {M}")
    k = int(round(np.log2(M)))
    if 2 ** k != M:
        raise ValueError(f"M must be a positive power of two, got {M}")

    # k random bits per symbol, combined MSB-first into an index in 0..M-1.
    data = np.random.randint(0, 2, size=(num_samples, k))
    symbols = np.dot(data, 2 ** np.arange(k)[::-1])

    if k % 2 == 1 and k >= 5:
        # Non-square order: look the index up in a cross constellation.
        qam_symbols = _cross_qam_points(k)[symbols]
    else:
        # Square (even k) or small rectangular (k in {1, 3}) grid, centred on 0.
        n_cols = 2 ** ((k + 1) // 2)
        n_rows = 2 ** (k // 2)
        x_vals = 2 * (symbols % n_cols) - (n_cols - 1)
        y_vals = 2 * (symbols // n_cols) - (n_rows - 1)
        qam_symbols = x_vals + 1j * y_vals
    return apply_channel(qam_symbols, SNR_dB)

def extract_cnn_features(signal, block_size=16):
    """Split a complex signal into peak-normalised I/Q blocks.

    The signal is divided by its largest magnitude, then cut into
    consecutive non-overlapping blocks of ``block_size`` samples; trailing
    samples that do not fill a block are dropped. Each block stacks the real
    part in column 0 and the imaginary part in column 1.

    The input array is never modified. An all-zero signal is returned as
    zeros rather than NaN, and a signal shorter than one block yields an
    empty ``(0, block_size, 2)`` array so results can still be stacked.

    Args:
        signal: 1-D sequence of (complex) samples.
        block_size: number of samples per block.

    Returns:
        Array of shape ``(len(signal) // block_size, block_size, 2)``.
    """
    signal = np.asarray(signal)
    num_blocks = len(signal) // block_size
    if num_blocks == 0:
        # Keep the trailing dimensions so np.vstack with other classes works.
        return np.empty((0, block_size, 2))

    norm_factor = np.max(np.abs(signal))
    if norm_factor == 0:
        # All-zero input: nothing to scale, and dividing would produce NaN.
        norm_factor = 1.0

    # Out-of-place division: .real/.imag of a complex array are views, so an
    # in-place "/=" would rescale the caller's data (and fails outright on
    # the read-only .imag of a real-valued array).
    real = signal.real / norm_factor
    imag = signal.imag / norm_factor

    features = [
        np.stack([real[start:start + block_size],
                  imag[start:start + block_size]], axis=1)
        for start in range(0, num_blocks * block_size, block_size)
    ]
    return np.array(features)

def build_psk_dataset(num_samples, SNR_dB, block_size=16):
    """Build a labelled feature set covering eight PSK orders (2 to 256).

    For each order, in ascending sequence, one signal of ``num_samples``
    symbols is generated with ``generate_psk`` and cut into blocks with
    ``extract_cnn_features``. Every block is labelled with the position of
    its order in the list (0 for BPSK up to 7 for 256PSK).

    Returns:
        Tuple ``(features, labels)``: the per-order blocks stacked along the
        first axis, and the matching integer class index for each block.
    """
    psk_orders = {"BPSK": 2, "QPSK": 4, "8PSK": 8,
                  "16PSK": 16, "32PSK": 32,
                  "64PSK": 64, "128PSK": 128, "256PSK": 256}

    # Generate classes strictly in order so the label equals the list index.
    per_class = [
        extract_cnn_features(generate_psk(num_samples, SNR_dB, order), block_size)
        for order in psk_orders.values()
    ]
    class_ids = [
        np.full(len(blocks), class_id)
        for class_id, blocks in enumerate(per_class)
    ]
    return np.vstack(per_class), np.hstack(class_ids)


def build_qam_dataset(num_samples, SNR_dB, block_size=16):
    """Build a labelled feature set covering four QAM orders (32 to 256).

    For each order, in ascending sequence, one signal of ``num_samples``
    symbols is generated with ``generate_qam`` and cut into blocks with
    ``extract_cnn_features``. Every block is labelled with the position of
    its order in the list (0 for 32QAM up to 3 for 256QAM).

    Returns:
        Tuple ``(features, labels)``: the per-order blocks stacked along the
        first axis, and the matching integer class index for each block.
    """
    qam_orders = {"32QAM": 32, "64QAM": 64,
                  "128QAM": 128, "256QAM": 256}

    # Generate classes strictly in order so the label equals the list index.
    per_class = [
        extract_cnn_features(generate_qam(num_samples, SNR_dB, order), block_size)
        for order in qam_orders.values()
    ]
    class_ids = [
        np.full(len(blocks), class_id)
        for class_id, blocks in enumerate(per_class)
    ]
    return np.vstack(per_class), np.hstack(class_ids)


def build_meta_dataset(num_samples, SNR_dB, block_size=16):
    """Build the family-level (PSK vs QAM) dataset.

    The PSK and QAM datasets are generated (PSK first) and concatenated.
    Their per-order labels are discarded and replaced by a family label:
    0.0 for every PSK block, 1.0 for every QAM block.

    Returns:
        Tuple ``(X, y)`` with the stacked feature blocks and a float array
        of family labels of the same length.
    """
    psk_blocks, _ = build_psk_dataset(num_samples, SNR_dB, block_size)
    qam_blocks, _ = build_qam_dataset(num_samples, SNR_dB, block_size)

    features = np.concatenate((psk_blocks, qam_blocks), axis=0)
    family = np.concatenate((np.zeros(len(psk_blocks)), np.ones(len(qam_blocks))))
    return features, family


def create_cnn(block_size, num_classes):
    """Build and compile a small 1-D CNN classifier for I/Q blocks.

    The network takes inputs of shape ``(block_size, 2)`` and applies two
    Conv1D -> BatchNormalization -> MaxPooling1D stages (64 then 128
    filters, kernel size 3), followed by a 256-unit dense layer with
    dropout 0.4 and a softmax output over ``num_classes``.

    Args:
        block_size: number of samples per input block.
        num_classes: number of output classes.

    Returns:
        A compiled Keras ``Sequential`` model (Adam optimiser, sparse
        categorical cross-entropy loss, accuracy metric).
    """
    model = Sequential([
        # Declare the input explicitly instead of passing input_shape= to the
        # first layer, which recent Keras versions warn about.
        tf.keras.Input(shape=(block_size, 2)),
        Conv1D(64, 3, activation='relu'),
        BatchNormalization(),
        MaxPooling1D(2),
        Conv1D(128, 3, activation='relu'),
        BatchNormalization(),
        MaxPooling1D(2),
        Flatten(),
        Dense(256, activation='relu'),
        Dropout(0.4),
        Dense(num_classes, activation='softmax')
    ])
    # Sparse loss: labels are integer class indices, not one-hot vectors.
    model.compile(optimizer='adam', loss='sparse_categorical_crossentropy', metrics=['accuracy'])
    return model

if __name__ == "__main__":
    # Experiment settings shared by all three classifiers.
    num_samples = 4000
    block_size = 16
    SNR_dB = 15

    # Stage 1: family classifier (0 = PSK, 1 = QAM).
    print("Building Meta (PSK vs QAM) dataset...")
    X_meta, y_meta = build_meta_dataset(num_samples, SNR_dB, block_size)
    X_train, X_test, y_train, y_test = train_test_split(X_meta, y_meta, test_size=0.2, random_state=42)

    meta_model = create_cnn(block_size, num_classes=2)
    print("Training Meta-classifier (PSK vs QAM)...")
    meta_model.fit(X_train, y_train, validation_data=(X_test, y_test), epochs=20, batch_size=64, verbose=1)

    # Stage 2a: PSK-order classifier (8 classes).
    print("\nBuilding PSK dataset...")
    X_psk, y_psk = build_psk_dataset(num_samples, SNR_dB, block_size)
    X_train_p, X_test_p, y_train_p, y_test_p = train_test_split(X_psk, y_psk, test_size=0.2, random_state=42)

    psk_model = create_cnn(block_size, num_classes=8)
    print("Training PSK classifier...")
    psk_model.fit(X_train_p, y_train_p, validation_data=(X_test_p, y_test_p), epochs=25, batch_size=64, verbose=1)

    # Stage 2b: QAM-order classifier (4 classes).
    print("\nBuilding QAM dataset...")
    X_qam, y_qam = build_qam_dataset(num_samples, SNR_dB, block_size)
    X_train_q, X_test_q, y_train_q, y_test_q = train_test_split(X_qam, y_qam, test_size=0.2, random_state=42)

    qam_model = create_cnn(block_size, num_classes=4)
    print("Training QAM classifier...")
    qam_model.fit(X_train_q, y_train_q, validation_data=(X_test_q, y_test_q), epochs=25, batch_size=64, verbose=1)

    # Evaluation on freshly generated data.
    print("\nEvaluating Hierarchical System...")
    X_eval, y_eval = build_meta_dataset(1000, SNR_dB, block_size)
    meta_preds = np.argmax(meta_model.predict(X_eval), axis=1)

    # Route each block to the sub-classifier chosen by the meta model, using
    # one batched predict() per family instead of one call per sample.
    sub_preds = np.zeros(len(meta_preds), dtype=int)
    for family, sub_model in enumerate((psk_model, qam_model)):
        routed = meta_preds == family
        if routed.any():
            sub_preds[routed] = np.argmax(sub_model.predict(X_eval[routed]), axis=1)

    # y_eval only carries family labels (0 = PSK, 1 = QAM), so only the
    # family-detection accuracy can be scored here; sub_preds is kept for
    # inspection.
    correct = int(np.count_nonzero(meta_preds == y_eval))

    print(f"Overall Meta Accuracy (Family detection): {correct / len(y_eval):.4f}")


Building Meta (PSK vs QAM) dataset...


  super().__init__(activity_regularizer=activity_regularizer, **kwargs)


Training Meta-classifier (PSK vs QAM)...
Epoch 1/20
[1m38/38[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m10s[0m 116ms/step - accuracy: 0.6692 - loss: 0.7797 - val_accuracy: 0.3117 - val_loss: 0.7436
Epoch 2/20
[1m38/38[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 4ms/step - accuracy: 0.7825 - loss: 0.4937 - val_accuracy: 0.3100 - val_loss: 0.9513
Epoch 3/20
[1m38/38[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 4ms/step - accuracy: 0.8171 - loss: 0.4089 - val_accuracy: 0.3100 - val_loss: 1.3290
Epoch 4/20
[1m38/38[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 5ms/step - accuracy: 0.8387 - loss: 0.3614 - val_accuracy: 0.3100 - val_loss: 1.9990
Epoch 5/20
[1m38/38[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 4ms/step - accuracy: 0.8568 - loss: 0.3344 - val_accuracy: 0.3100 - val_loss: 2.6249
Epoch 6/20
[1m38/38[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 5ms/step - accuracy: 0.8949 - loss: 0.2746 - val_accuracy: 0.3100 - val_loss: 2.7