In [None]:
import os
import tensorflow as tf
from tensorflow.keras.preprocessing.image import ImageDataGenerator
from tensorflow.keras import layers, Model, models
from tensorflow.keras.layers import Input, Conv2D, ReLU, Flatten, Dense, Reshape, Conv2DTranspose, Activation
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Conv2D, MaxPooling2D, UpSampling2D, Flatten, Dense, Reshape
import matplotlib.pyplot as plt
from tensorflow.keras.utils import to_categorical
from tensorflow import keras
from tensorflow.keras import regularizers
from tensorflow.keras.optimizers import Adam
import numpy as np
import seaborn as sns
from tensorflow.keras.models import load_model
import matplotlib.pyplot as plt
from sklearn.metrics import confusion_matrix
import pdb
from sklearn.preprocessing import StandardScaler
from sklearn.metrics import accuracy_score
from tensorflow.keras.optimizers import SGD,RMSprop
from sklearn.model_selection import train_test_split
from sklearn.metrics import confusion_matrix, ConfusionMatrixDisplay
from sklearn.ensemble import RandomForestClassifier

In [None]:
# Load pretrained models
signal_model = load_model(r'D:\SDRChallenge-main\SDR_data\ANN_tuned.h5')
scalogram_model = load_model(r'D:\SDRChallenge-main\SDR_data\Model_spectograms.h5')


In [None]:
# Freeze pretrained models
signal_model.trainable = False
scalogram_model.trainable = False

In [None]:
# Remove top layers of both models
signal_feature_extractor = Model(inputs=signal_model.input, outputs=signal_model.layers[-5].output)
scalogram_feature_extractor = Model(inputs=scalogram_model.input, outputs=scalogram_model.layers[-2].output)


In [None]:
signal_feature_extractor.summary()

In [None]:
scalogram_feature_extractor.summary()

In [None]:
# Define inputs for the unified model
signal_input = Input(shape=(614400,), name='signal_input')  # Adjust shape if needed
scalogram_input = Input(shape=(256, 256, 3), name='scalogram_input')  # Shape matches scalogram images


In [None]:
# Extract features
signal_features = signal_feature_extractor(signal_input)
scalogram_features = scalogram_feature_extractor(scalogram_input)

In [None]:
from tensorflow.keras.layers import Input, Concatenate, Dense, Dropout

In [None]:
combined_features = Concatenate()([signal_features, scalogram_features])


In [None]:
# Add ANN layers for classification
x = Dense(128, activation='relu')(combined_features)
x = Dropout(0.5)(x)
x = Dense(64, activation='relu')(x)
x = Dropout(0.5)(x)
output = Dense(4, activation='softmax')(x)

In [None]:
# Build the unified model
unified_model = Model(inputs=[signal_input, scalogram_input], outputs=output)

# Compile the model
# Compile the model with a low learning rate
unified_model.compile(
    optimizer=Adam(learning_rate=1e-5),  # Start with a small learning rate
    loss='categorical_crossentropy',
    metrics=['accuracy']
)

In [None]:
folder_path= r"D:\SDRChallenge-main\SDR_data\I_Q_data.Numpy\SNR0"

def load_npy_data(folder_path, label):
    data_list = []
    #c = 0  
    for subdir, _, files in os.walk(folder_path):
        for file in files:
            if file.endswith(".npy"):
                file_path = os.path.join(subdir, file)
                data = np.load(file_path)  # Load the .npy file
                # print(data.shape)
                data = np.nan_to_num(data, nan=0.0) # Keep rows with no NaN value
                # print(data.shape)

                # Assuming data has I values in the first column and Q values in the second column
                I_values = data[:, 0]
                Q_values = data[:, 1]
                stacked_IQ_values = np.concatenate((I_values, Q_values), axis=0).reshape(-1, 1)
                # print(stacked_IQ_values.shape)
                transposed_IQ_values = stacked_IQ_values.T
                # print(transposed_IQ_values.shape)
                labeled_data = np.append(transposed_IQ_values, label).reshape(1, -1)
                              
                # Check for NaN or Inf values and handle them
                # if np.isnan(data).any() or np.isinf(data).any():
                #     data = np.nan_to_num(data)  # Replace NaN and Inf with 0
                # # Add labeled row to the data list
                data_list.append(labeled_data)
                # print(labeled_data.shape)
                # print(len(data_list))
    return np.vstack(data_list) if data_list else None


radar_data = load_npy_data(r'D:\SDRChallenge-main\SDR_data\I_Q_data.Numpy\SNR0\Radar', label=0)
g5_data = load_npy_data(r'D:\SDRChallenge-main\SDR_data\I_Q_data.Numpy\SNR0\5G_Only', label=1)
radar_5g_data = load_npy_data(r'D:\SDRChallenge-main\SDR_data\I_Q_data.Numpy\SNR0\5G+Radar', label=2)
Noise = load_npy_data(r'D:\SDRChallenge-main\SDR_data\I_Q_data.Numpy\SNR0\Noise', label=3)

In [None]:
# Combine data if available
data_to_combine = [d for d in [radar_data, g5_data, radar_5g_data, Noise] if d is not None]
# data_to_combine = [d for d in [radar_data, g5_data] if d is not None]
print(len(data_to_combine))
if data_to_combine:
    combined_data = np.vstack(data_to_combine)
    # Print shape of the combined data
    print(f"Combined data shape: {combined_data.shape}")
    # Display first few rows to verify the format
    print("First few rows of combined data:")
    print(combined_data[:5])
else:
    print("No data loaded from the specified folders.")

In [None]:
X = combined_data[:, :-1]  # Features (all columns except the last)
y = combined_data[:, -1]   # Labels (last column)

In [None]:
scaler = StandardScaler()
X = scaler.fit_transform(X)

In [None]:
y = to_categorical(y, num_classes=4)

In [None]:
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)

In [None]:
# # Step 2: Split the training data into training and validation sets (e.g., 70% train, 10% validation)
# X_train, X_val, y_train, y_val = train_test_split(X_train, y_train, test_size=0.125, random_state=42)

# # Print the sizes of each set
# print(f"Training set: {X_train.shape[0]} samples")
# print(f"Validation set: {X_val.shape[0]} samples")
# print(f"Test set: {X_test.shape[0]} samples")


In [None]:
path = r'D:\SDRChallenge-main\SDR_data\SNR0'

In [None]:
batch_size = 32
img_height = 256
img_width = 256

In [None]:
train_ds = tf.keras.utils.image_dataset_from_directory(
    path,
    validation_split=0.2, # 20% of the data for validation
    subset="training",    # 80% of the data for training
    seed=123,
    image_size=(img_height, img_width),
    batch_size=batch_size,
    label_mode='categorical',    
)
# Load validation/testing dataset (remaining 30% of the data)
val_ds = tf.keras.utils.image_dataset_from_directory(
    path,
    validation_split=0.2, # 20% for validation/testing
    subset="validation",
    seed=123,
    image_size=(img_height, img_width),
    batch_size=batch_size,
    label_mode='categorical',
)

In [None]:
class_names = train_ds.class_names
print(class_names)

In [None]:
AUTOTUNE = tf.data.AUTOTUNE

# train_ds = augmented_train_ds.cache().shuffle(1000).prefetch(buffer_size=AUTOTUNE)
# val_ds = augmented_val_ds.cache().prefetch(buffer_size=AUTOTUNE)
# test_ds = augmented_test_ds.cache().prefetch(buffer_size=AUTOTUNE)

train_ds = train_ds.cache().shuffle(1000).prefetch(buffer_size=AUTOTUNE)
val_ds = val_ds.cache().prefetch(buffer_size=AUTOTUNE)
# test_ds= test_ds.cache().prefetch(buffer_size=AUTOTUNE)
# test_ds = test_ds.cache().prefetch(buffer_size=AUTOTUNE)

In [None]:
def load_signal_and_scalogram(signal_path, scalogram_path, label):
    signal_files = {os.path.splitext(f)[0]: f for f in os.listdir(signal_path) if f.endswith('.npy')}
    scalogram_files = {os.path.splitext(f)[0]: f for f in os.listdir(scalogram_path) if f.endswith('.png')}
    
    matched_data = []
    
    for name in signal_files.keys():
        if name in scalogram_files:
            # Load signal
            signal = np.load(os.path.join(signal_path, signal_files[name]))
            signal = np.nan_to_num(signal, nan=0.0)

            # Load scalogram
            scalogram = tf.keras.utils.load_img(
                os.path.join(scalogram_path, scalogram_files[name]),
                target_size=(256, 256)  # Resize to desired dimensions
            )
            scalogram = tf.keras.utils.img_to_array(scalogram) / 255.0  # Normalize to [0, 1]
            
            # Append (signal, scalogram, label)
            matched_data.append((signal, scalogram, label))
        else:
            print(f"Scalogram not found for signal: {name}.npy")
    
    return matched_data

In [None]:
signal_path = r'D:\SDRChallenge-main\SDR_data\I_Q_data.Numpy\SNR_10'
scalogram_path = r'D:\SDRChallenge-main\SDR_data\SNR-10'

radar_data = load_signal_and_scalogram(signal_path, scalogram_path, label=0)

In [None]:
class_labels = {
    'Radar': 0,
    '5G_Only': 1,
    '5G+Radar': 2,
    'Noise': 3
}

In [None]:
def create_paired_dataset_for_class(signal_base_path, scalogram_base_path, class_name, label):
    signal_path = os.path.join(signal_base_path, class_name)
    scalogram_path = os.path.join(scalogram_base_path, class_name)
    
    # Get filenames without extensions
    signal_files = {os.path.splitext(f)[0]: f for f in os.listdir(signal_path) if f.endswith('.npy')}
    scalogram_files = {os.path.splitext(f)[0]: f for f in os.listdir(scalogram_path) if f.endswith('.png')}
    
    paired_data = []
    
    for name in signal_files.keys():
        if name in scalogram_files:
            # Load signal
            signal = np.load(os.path.join(signal_path, signal_files[name]))
            signal = np.nan_to_num(signal, nan=0.0)

            # Load scalogram
            scalogram = tf.keras.utils.load_img(
                os.path.join(scalogram_path, scalogram_files[name]),
                target_size=(256, 256)
            )
            scalogram = tf.keras.utils.img_to_array(scalogram) / 255.0  # Normalize
            
            # Append (signal, scalogram, label)
            paired_data.append((signal, scalogram, label))
        else:
            print(f"Unmatched file in class '{class_name}': {name}")
    
    return paired_data

In [None]:
def create_full_dataset(signal_base_path, scalogram_base_path, class_labels):
    full_data = []
    for class_name, label in class_labels.items():
        class_data = create_paired_dataset_for_class(signal_base_path, scalogram_base_path, class_name, label)
        full_data.extend(class_data)
        print(f"Class '{class_name}' has {len(class_data)} paired samples.")
    return full_data

# Paths to the signal and scalogram directories
signal_base_path = r'C:\Users\KhanShafiUllah\OneDrive - UT Arlington\SNR_10'
scalogram_base_path = r'C:\Users\KhanShafiUllah\OneDrive - UT Arlington\SNR-10'

# Create dataset
all_data = create_full_dataset(signal_base_path, scalogram_base_path, class_labels)

print(f"Total paired samples: {len(all_data)}")

In [None]:
# Separate signals, scalograms, and labels
signals = np.array([item[0] for item in all_data])
scalograms = np.array([item[1] for item in all_data])
labels = np.array([item[2] for item in all_data])

In [None]:
# One-hot encode labels
labels = tf.keras.utils.to_categorical(labels, num_classes=len(class_labels))

In [None]:
# Split into training and validation sets
X_train_signal, X_val_signal, X_train_scalogram, X_val_scalogram, y_train, y_val = train_test_split(
    signals, scalograms, labels, test_size=0.2, random_state=42
)

print(f"Training samples: {X_train_signal.shape[0]}")
print(f"Validation samples: {X_val_signal.shape[0]}")

In [None]:
scaler = StandardScaler()
X_train_signal = scaler.fit_transform(X_train_signal)
X_train_signal = scaler.transform(X_val_signal)

# Normalize scalograms (if not already normalized)
X_train_scalogram = X_train_scalogram / 255.0
X_val_scalogram = X_val_scalogram / 255.0

In [None]:
history = unified_model.fit(
    {'signal_input': X_train_signal, 'scalogram_input': X_train_scalogram},
    y_train,
    validation_data=(
        {'signal_input': X_train_signal, 'scalogram_input': X_val_scalogram},
        y_val
    ),
    epochs=10,
    batch_size=16
)