# Radar Emitter Recognition Based on the Short Time Fourier Transform and Convolutional Neural Networks


### 1 Setup

#### 1.1 Imports

In [None]:
import torch
import torch.nn as nn
from torchinfo import summary
from torch.utils.data import DataLoader, TensorDataset
from sklearn.model_selection import train_test_split
import h5py
import numpy as np
from sklearn.preprocessing import LabelEncoder
import torch.optim as optim
import matplotlib.pyplot as plt
from sklearn.metrics import confusion_matrix, ConfusionMatrixDisplay
from tqdm import tqdm
from sklearn.metrics import (
    confusion_matrix,
    ConfusionMatrixDisplay,
    classification_report,
)
import cv2
import seaborn as sns
import os

#### 1.2 Device Selection

In [None]:
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print("Using device:", device)

#### 1.3 Data Input

In [None]:
def load_spectrogram_h5s(root_folder, mod_types):
    """
    Loads all .h5 spectrogram files from the specified modulation types
    and returns a dict structured as:
      {
          "lfm_up": {
              0: <np.ndarray>,
              1: <np.ndarray>,
              ...
          },
          ...
      }

    Parameters:
    - root_folder (str): Path to the top-level "spectrograms" directory.
    - mod_types (list): List of modulation families to include, e.g. ['FM', 'PM']

    Returns:
    - Dictionary containing all spectrograms, indexed by modulation name and integer index.
    """
    spectrogram_dict = {}

    for mod_type in mod_types:
        mod_path = os.path.join(root_folder, mod_type)
        if not os.path.exists(mod_path):
            print(f"⚠️ Warning: {mod_path} does not exist. Skipping.")
            continue

        print(f"📂 Loading from {mod_type}...")
        files = [f for f in os.listdir(mod_path) if f.endswith(".h5")]

        for file in tqdm(files, desc=f"   {mod_type}", unit="file"):
            mod_name = file[:-3]  # Strip '.h5'
            file_path = os.path.join(mod_path, file)
            spectrogram_dict[mod_name] = {}

            try:
                with h5py.File(file_path, "r") as h5f:
                    for key in h5f.keys():
                        idx = int(key)  # Convert string index to int
                        spectrogram_dict[mod_name][idx] = np.array(h5f[key])
            except Exception as e:
                print(f"❌ Failed to load {file_path}: {e}")

    return spectrogram_dict


In [None]:
# Ensure the main directory exists
data_path = "C:/Users/scocks/Documents/hehehehehehhe/images/"
os.makedirs(data_path, exist_ok=True)

img_res = 224
img_count = 1000
snr = 0


folder_name = f"General_Images_res_{img_res}_sz_{img_count}_SNR_{snr}"
folder_path = data_path+folder_name

modulation_types = [
    "FM",
    "PM",
    "HYBRID",
]

data = load_spectrogram_h5s(folder_path, modulation_types)

##### 1.3.6 Conversion

In [None]:
def convert_spectrogram_dict_to_xy(data_dict):
    """
    Converts a dictionary of spectrograms into (X, y) format for ML.

    Parameters:
    - data_dict: Output from load_spectrogram_h5s(), e.g.
        {
            "lfm_up": {0: np.array, 1: np.array, ...},
            "bpsk":   {0: np.array, ...},
            ...
        }

    Returns:
    - X: np.ndarray of shape (N, H, W, C)
    - y: np.ndarray of shape (N,) with string labels like 'lfm_up'
    """
    X_list = []
    y_list = []

    for label, spectros in data_dict.items():
        for idx in sorted(spectros.keys()):
            X_list.append(spectros[idx])
            y_list.append(label)

    X = np.array(X_list)
    y = np.array(y_list)

    return X, y


In [None]:
X, y = convert_spectrogram_dict_to_xy(data)

In [None]:
print(X.shape)
print(y.shape)

#### 1.4 Label Encoder

In [None]:
label_encoder = LabelEncoder()

#### 1.5 Data Loader

In [None]:
def prepare_dataloader(X, y, batch_size=32, shuffle=False, num_workers=2, device="cpu"):
    # Convert NumPy arrays to PyTorch tensors
    if isinstance(X, np.ndarray):
        X = torch.tensor(X, dtype=torch.float32)
    elif not isinstance(X, torch.Tensor):
        raise TypeError("Input X must be a NumPy array or PyTorch tensor")

    if isinstance(y, np.ndarray):
        y = torch.tensor(y, dtype=torch.long)
    elif not isinstance(y, torch.Tensor):
        raise TypeError("Labels y must be a NumPy array or PyTorch tensor")

    # Ensure X has four dimensions (N, C, H, W)
    if X.ndim == 3:  # If (N, H, W), add a channel dimension
        X = X.unsqueeze(1)  # (N, 1, H, W)
    elif X.ndim == 4 and X.shape[-1] in [1, 3]:  # (N, H, W, C) case
        X = X.permute(0, 3, 1, 2)  # Convert to (N, C, H, W)

    # Move data to the correct device
    X, y = X.to(device), y.to(device)

    # Create dataset and dataloader
    dataset = TensorDataset(X, y)
    loader = DataLoader(
        dataset,
        batch_size=batch_size,
        shuffle=shuffle,
        num_workers=num_workers,
        pin_memory=(device == "cuda"),
    )

    return loader

### 2 Pre-processing

In [None]:
def pre_processing(images):
    iterations = 6
    processed_images = []
    target_size = (40, 40)  # Define the target size
    
    for image in images:
        # Resize the image to 40x40
        resized_image = cv2.resize(image, target_size, interpolation=cv2.INTER_AREA)
        
        # Convert to float32 for proper numerical stability
        processed_image = resized_image.astype(np.float32)
    
        for _ in range(iterations):
            mean = np.mean(processed_image)
            std = np.var(processed_image)
            
            processed_image = (processed_image - mean) / std
        
        processed_image[processed_image < 0] = 0
        
        processed_images.append(processed_image)

    # Return as a numpy array
    return np.array(processed_images, dtype=np.float32)


In [None]:
y_encoded = label_encoder.fit_transform(y)

In [None]:
X_pre_processed = pre_processing(X)

In [None]:
X_train, X_test, y_train, y_test = train_test_split(
    X_pre_processed, y_encoded, test_size=0.2, stratify=y_encoded, random_state=42
)

In [None]:
unique_train, counts_train = np.unique(y_train, return_counts=True)
unique_test, counts_test = np.unique(y_test, return_counts=True)

print("Class Distribution in Training Set:")
for label, count in zip(unique_train, counts_train):
    print(f"Class {label}: {count} samples")

print("\nClass Distribution in Test Set:")
for label, count in zip(unique_test, counts_test):
    print(f"Class {label}: {count} samples")

#### 2.1 Pre-processing Display

In [None]:
# selected_display = np.random.randint(len(X_train))
selected_display = 0

In [None]:
print("Train Images shape:", X_pre_processed.shape)
print("Train Metadata shape:", y_encoded.shape)
print(f"{y[selected_display]} = {y_encoded[selected_display]}")

In [None]:
plt.imshow(X[selected_display])
plt.show()

In [None]:
plt.imshow(X_pre_processed[selected_display], cmap="grey")
plt.show()
print(X_pre_processed[selected_display].shape)


### 3 Training

#### 3.1 Training Setup

In [None]:
def train_model(
    model,
    train_loader,
    device,
    criterion,
    optimizer,
    scheduler=None,  # 🔧 Optional scheduler added
    epochs=10,
    patience=3,
    min_delta=0.0,
):

    model.to(device)
    model.train()

    loss_history = []
    best_loss = float("inf")
    patience_counter = 0

    for epoch in range(epochs):
        total_loss = 0.0

        # Progress bar for each epoch
        progress_bar = tqdm(
            train_loader,
            desc=f"Epoch {epoch+1}/{epochs}",
            leave=True,
            dynamic_ncols=True,
        )

        for inputs, labels in progress_bar:
            inputs = inputs.to(device)
            labels = labels.to(device)
            optimizer.zero_grad()

            # Forward pass: Ignore output_image, focus only on output_class
            output_class = model(inputs)

            # Classification loss
            loss = criterion(output_class, labels)

            # Backpropagation
            loss.backward()
            optimizer.step()

            total_loss += loss.item()

            # Live loss display
            progress_bar.set_postfix({"Loss": f"{loss.item():.4f}"})

        # Average loss for the epoch
        avg_loss = total_loss / len(train_loader)
        loss_history.append(avg_loss)

        print(f"Epoch {epoch+1} average loss: {avg_loss:.4f}")

        # 🔄 Scheduler step
        if scheduler:
            scheduler.step()

        # Early stopping
        if avg_loss < best_loss - min_delta:
            best_loss = avg_loss
            patience_counter = 0
        else:
            patience_counter += 1
            if patience_counter >= patience:
                print(f"Early stopping triggered at epoch {epoch+1}")
                break  # Stop training early

    return loss_history


##### Loss Curve

In [None]:
def plot_loss_curve(loss_history, title="Training Loss Over Epochs"):
    epochs = len(loss_history)

    plt.figure(figsize=(8, 5))
    plt.plot(range(1, epochs + 1), loss_history, marker="o", label="Training Loss")
    plt.xlabel("Epochs")
    plt.ylabel("Loss")
    plt.title(title)
    plt.legend()
    plt.grid()
    plt.show()

##### Conf Matirx

In [None]:
def display_confusion_matrix(
    model, data_loader, device, class_names=None, title="Confusion Matrix"
):
    """
    Generate and display a normalized confusion matrix for a trained model.
    
    Parameters:
        model (torch.nn.Module): Trained PyTorch model.
        data_loader (torch.utils.data.DataLoader): DataLoader for evaluation dataset.
        device (torch.device): Device to run evaluation on (CPU/GPU).
        class_names (list, optional): List of class names. If None, uses numeric indices.
        title (str): Title of the confusion matrix plot.
    """
    # Switch model to evaluation mode
    model.to(device)
    model.eval()

    all_preds = []
    all_labels = []

    # Disable gradient calculations for inference
    with torch.no_grad():
        for inputs, labels in data_loader:
            inputs, labels = inputs.to(device), labels.to(device)

            # Forward pass: Focus only on output_class
            output_class = model(inputs)

            # Get predicted class labels
            preds = torch.argmax(output_class, dim=1)

            all_preds.extend(preds.cpu().numpy())
            all_labels.extend(labels.cpu().numpy())

    # Compute confusion matrix
    cm = confusion_matrix(all_labels, all_preds)
    num_classes = cm.shape[0]
    
    # Normalize confusion matrix to percentages
    cm_normalized = cm.astype(np.float32) / cm.sum(axis=1, keepdims=True) * 100

    # If class_names isn't provided, use numeric class indices
    if class_names is None:
        class_names = [str(i) for i in range(num_classes)]

    # Plotting the confusion matrix
    plt.figure(figsize=(max(10, num_classes * 0.8), max(8, num_classes * 0.6)))  # Dynamic size
    im = plt.imshow(cm_normalized, interpolation="nearest", cmap="Blues")
    plt.title(title, fontsize=14)
    plt.colorbar(im, label="Percentage")  # Add colorbar with label

    # Create tick marks for class labels
    tick_marks = np.arange(num_classes)
    plt.xticks(tick_marks, class_names, rotation=45, ha="right", va="top", fontsize=max(8, 12 - num_classes // 5))
    plt.yticks(tick_marks, class_names, fontsize=max(8, 12 - num_classes // 5))

    # Annotate the matrix cells with percentage values
    thresh = cm_normalized.max() / 2.0
    for i in range(num_classes):
        for j in range(num_classes):
            plt.text(
                j,
                i,
                f"{cm_normalized[i, j]:.1f}",
                ha="center",
                va="center",
                color="white" if cm_normalized[i, j] > thresh else "black",
                fontsize=max(8, 12 - num_classes // 5),
            )

    plt.ylabel("True Label", fontsize=12, labelpad=10)
    plt.xlabel("Predicted Label", fontsize=12, labelpad=10)
    
    # Adjust layout with extra bottom margin for rotated labels
    plt.tight_layout()
    plt.subplots_adjust(bottom=0.2 + num_classes * 0.005)  # Dynamic bottom margin
    
    plt.show()

#### 3.2 Model

##### Main Model

In [None]:
class MainModel(nn.Module):
    def __init__(self, num_classes):
        super(MainModel, self).__init__()

        # C1
        self.conv1 = nn.Conv2d(in_channels=3, out_channels=6, kernel_size=(5, 5), stride=1)

        self.relu = nn.ReLU()
        
        # S1
        self.pool1 = nn.AvgPool2d(kernel_size=(2, 2), stride=2)

        # C2
        self.conv2 = nn.Conv2d(in_channels=6, out_channels=12, kernel_size=(5, 5), stride=1)
        
        # S2
        self.pool2 = nn.AvgPool2d(kernel_size=(2, 2), stride=2)

        # Flatten
        self.flat = nn.Flatten()

        # FC
        self.fc = nn.Linear(in_features=588, out_features=num_classes)

    def forward(self, x):

        x = self.conv1(x)
        x = self.relu(x)
        x = self.pool1(x)
        x = self.conv2(x)
        x = self.relu(x)
        x = self.pool2(x)
        x = self.flat(x)
        x = self.fc(x)

        return x

##### Model Summary

In [None]:
model = MainModel(6)
summary(model , input_size=(1, 3, 40, 40))

#### 3.3 Actual Training

In [None]:
# Prepare DataLoaders
train_loader = prepare_dataloader(
    X_train,
    y_train,
    batch_size=32,
    shuffle=True,
)

In [None]:
num_classes = len(np.unique(y_encoded))

model = MainModel(num_classes=num_classes).to(device)


# Define loss function and optimizer
criterion = nn.CrossEntropyLoss().to(device)

In [None]:
learning_rate = 1e-3
optimizer = optim.Adam(model.parameters(), lr=learning_rate)
scheduler = optim.lr_scheduler.StepLR(optimizer, step_size=45, gamma=0.1)

In [None]:
# Train the model
epoch_count = 1
loss_history = train_model(
    model=model,
    train_loader=train_loader,
    criterion=criterion,
    optimizer=optimizer,
    scheduler=scheduler,
    device=device,
    epochs=epoch_count,
    patience=50,
)

In [None]:
plot_loss_curve(loss_history)

In [None]:
display_confusion_matrix(model, train_loader, device)

#### 3.4 Save Model to File

In [None]:
mds = modulation_types[0]
if len(modulation_types) == 3:
    mds = "ALL"

model_file_name = f"STFT_CNN_model_e{epoch_count}_lr{learning_rate}_snr_{snr}_mds_{mds}"

In [None]:
torch.save(model, model_file_name + ".pth")

### 4 Testing

In [None]:
def evaluate_model(model, test_loader, label_encoder, device):
    """
    Evaluates the trained model and displays accuracy, confusion matrix, and F1-score.

    Args:
        model: Trained PyTorch model.
        test_loader: DataLoader for test set.
        label_encoder: Label encoder to decode class names.
        device: 'cuda' or 'cpu' where evaluation happens.
    """
    model.to(device)  # Ensure model is on correct device
    model.eval()  # Set to evaluation mode

    y_true = []
    y_pred = []
    correct = 0
    total = 0

    with torch.no_grad():
        for inputs, labels in test_loader:
            inputs = inputs.to(device)
            labels = labels.to(device)

            # Model returns logits
            output_class = model(inputs)

            # Get predicted class (argmax over logits)
            preds = torch.argmax(output_class, dim=1)

            correct += (preds == labels).sum().item()
            total += labels.size(0)

            y_true.extend(labels.cpu().tolist())  # Move to CPU for metrics
            y_pred.extend(preds.cpu().tolist())

    # Compute Accuracy
    accuracy = 100 * correct / total
    print(f"Accuracy: {accuracy:.2f}%")

    # Compute & Display Confusion Matrix
    class_names = label_encoder.classes_  # Decode label names
    cm = confusion_matrix(y_true, y_pred)
    # Normalize confusion matrix to percentages
    cm_normalized = cm.astype(np.float32) / cm.sum(axis=1, keepdims=True) * 100
    num_classes = len(class_names)

    # Plot confusion matrix
    fig, ax = plt.subplots(figsize=(max(10, num_classes * 0.8), max(8, num_classes * 0.6)))  # Dynamic size
    disp = ConfusionMatrixDisplay(confusion_matrix=cm_normalized, display_labels=class_names)
    disp.plot(cmap="Blues", values_format=".1f", ax=ax)  # Use 1 decimal place for percentages

    # Adjust x-axis label alignment and font sizes
    ax.set_xticklabels(class_names, rotation=45, ha="right", va="top", fontsize=max(8, 12 - num_classes // 5))
    ax.set_yticklabels(class_names, rotation=0, fontsize=max(8, 12 - num_classes // 5))
    ax.set_xlabel("Predicted Label", fontsize=12, labelpad=10)
    ax.set_ylabel("True Label", fontsize=12, labelpad=10)
    ax.set_title("Confusion Matrix (Percentage)", fontsize=14)

    # Adjust layout with extra bottom margin for rotated labels
    plt.tight_layout()
    plt.subplots_adjust(bottom=0.2 + num_classes * 0.005)  # Dynamic bottom margin

    plt.show()

    # Print Classification Report (Precision, Recall, F1-score)
    print("\nClassification Report:")
    print(classification_report(y_true, y_pred, target_names=class_names))

In [None]:
# Prepare DataLoaders
test_loader = prepare_dataloader(
    X_test,
    y_test,
    batch_size=32,
)

In [None]:
# Evaluate the model
evaluate_model(model, test_loader, label_encoder, device)