In [1]:
import torch
import torch.nn as nn
import torch.optim as optim
import os
import numpy as np
import h5py
from torch.utils.data import Dataset, DataLoader
import matplotlib.pyplot as plt
from sklearn.metrics import confusion_matrix
import seaborn as sns

In [2]:
# Dataset Preparation
class H5Dataset(Dataset):
    """In-memory dataset assembled from the 'X' and 'Y' arrays of HDF5 files.

    For every path in ``file_list`` the complete 'X' and 'Y' datasets are read,
    the rows selected by ``idx`` are kept, and the per-file selections are
    stacked row-wise into ``self.data`` and ``self.labels``.

    Args:
        file_list: Iterable of HDF5 file paths, each holding 'X' and 'Y'.
        idx: Index (or index array) applied to every file after it is read.
        transform: Optional callable applied to a sample in ``__getitem__``.
    """

    def __init__(self, file_list, idx, transform=None):
        samples, targets = [], []
        for path in file_list:
            with h5py.File(path, 'r') as handle:
                # Materialise the full array first, then index with NumPy, so
                # unsorted or repeated entries in `idx` are accepted.
                samples.append(handle['X'][:][idx])
                targets.append(handle['Y'][:][idx])
        self.data = np.vstack(samples)
        self.labels = np.vstack(targets)
        self.transform = transform

    def __len__(self):
        """Return the number of stacked samples."""
        return len(self.data)

    def __getitem__(self, idx):
        """Return the ``(sample, label)`` pair at ``idx`` as float32 tensors."""
        sample, target = self.data[idx], self.labels[idx]
        if self.transform:
            sample = self.transform(sample)
        return (
            torch.tensor(sample, dtype=torch.float32),
            torch.tensor(target, dtype=torch.float32),
        )

In [3]:
# Transformer Components
class MultiHeadSelfAttention(nn.Module):
    """Multi-head scaled dot-product self-attention.

    The input is projected to queries, keys and values, split into
    ``num_heads`` heads of width ``embed_dim // num_heads``, attended per head,
    then merged back and passed through a final linear layer.

    Args:
        embed_dim: Size of the last input dimension.
        num_heads: Number of attention heads; must divide ``embed_dim``.

    Raises:
        ValueError: If ``embed_dim`` is not a multiple of ``num_heads``.
    """

    def __init__(self, embed_dim, num_heads):
        super().__init__()
        if embed_dim % num_heads:
            raise ValueError("Embedding dimension must be divisible by number of heads")

        self.num_heads = num_heads
        self.projection_dim = embed_dim // num_heads

        # Layers are created in a fixed order so seeded initialisation is stable.
        self.query = nn.Linear(embed_dim, embed_dim)
        self.key = nn.Linear(embed_dim, embed_dim)
        self.value = nn.Linear(embed_dim, embed_dim)
        self.combine_heads = nn.Linear(embed_dim, embed_dim)

    def _split_heads(self, projected, batch_size, seq_len):
        """Reshape (batch, seq, embed) into (batch, heads, seq, head_dim)."""
        per_head = projected.view(batch_size, seq_len, self.num_heads, self.projection_dim)
        return per_head.transpose(1, 2)

    def forward(self, x):
        """Apply self-attention to ``x`` of shape (batch, seq, embed)."""
        batch_size, seq_len, embed_dim = x.size()

        q = self._split_heads(self.query(x), batch_size, seq_len)
        k = self._split_heads(self.key(x), batch_size, seq_len)
        v = self._split_heads(self.value(x), batch_size, seq_len)

        # Scale the dot products by sqrt(head_dim) before normalising.
        scale = np.sqrt(self.projection_dim)
        logits = torch.matmul(q, k.transpose(-2, -1)) / scale
        attn_weights = torch.nn.functional.softmax(logits, dim=-1)
        context = torch.matmul(attn_weights, v)

        # (batch, heads, seq, head_dim) -> (batch, seq, embed)
        merged = context.transpose(1, 2).contiguous().view(batch_size, seq_len, embed_dim)
        return self.combine_heads(merged)

In [4]:
class TransformerBlock(nn.Module):
    """Encoder block: self-attention and a feed-forward net, each followed by
    dropout, a residual add and layer normalisation.

    Args:
        embed_dim: Width of the token embeddings entering and leaving the block.
        num_heads: Number of heads handed to ``MultiHeadSelfAttention``.
        ff_dim: Hidden width of the two-layer feed-forward network.
        dropout_rate: Dropout probability used on both sub-layer outputs.
    """

    def __init__(self, embed_dim, num_heads, ff_dim, dropout_rate=0.2):
        super().__init__()
        self.attention = MultiHeadSelfAttention(embed_dim, num_heads)
        feed_forward_layers = [
            nn.Linear(embed_dim, ff_dim),
            nn.ReLU(),
            nn.Linear(ff_dim, embed_dim),
        ]
        self.ffn = nn.Sequential(*feed_forward_layers)
        self.layernorm1 = nn.LayerNorm(embed_dim)
        self.layernorm2 = nn.LayerNorm(embed_dim)
        self.dropout1 = nn.Dropout(dropout_rate)
        self.dropout2 = nn.Dropout(dropout_rate)

    def forward(self, x):
        """Run the block on ``x``; the output has the same shape as the input."""
        # Sub-layer 1: attention -> dropout -> residual -> norm.
        attended = self.dropout1(self.attention(x))
        normed = self.layernorm1(x + attended)
        # Sub-layer 2: feed-forward -> dropout -> residual -> norm.
        transformed = self.dropout2(self.ffn(normed))
        return self.layernorm2(normed + transformed)

In [5]:
# Model Definition
class ProposedModel(nn.Module):
    """Single-block Transformer classifier that returns raw class logits.

    Pipeline: linear embedding of the last input dimension -> TransformerBlock
    -> average over the sequence axis -> BatchNorm + SELU MLP head.

    The head deliberately ends in a plain ``nn.Linear``. ``nn.CrossEntropyLoss``
    (the criterion this notebook trains with) applies log-softmax itself, so a
    trailing ``nn.Softmax`` would normalise twice, compressing the loss range
    and flattening gradients. Arg-max predictions are unaffected; call
    :meth:`predict_proba` when class probabilities are needed.

    Args:
        input_shape: Shape of one sample; only ``input_shape[-1]`` (the feature
            count per sequence step) is used.
        num_classes: Number of output classes.
        embed_dim: Embedding width fed to the Transformer block.
        num_heads: Number of attention heads (must divide ``embed_dim``).
        ff_dim: Hidden width of the Transformer feed-forward network.
    """

    def __init__(self, input_shape, num_classes, embed_dim=1024, num_heads=128, ff_dim=256):
        super().__init__()
        self.reshape = nn.Linear(input_shape[-1], embed_dim)
        self.transformer = TransformerBlock(embed_dim, num_heads, ff_dim)
        self.pooling = nn.AdaptiveAvgPool1d(1)
        self.classifier = nn.Sequential(
            nn.BatchNorm1d(embed_dim),
            nn.AlphaDropout(0.3),
            nn.Linear(embed_dim, 128),
            nn.SELU(),
            nn.AlphaDropout(0.2),
            nn.Linear(128, 128),
            nn.SELU(),
            nn.AlphaDropout(0.2),
            # Final layer emits logits; no Softmax here (see class docstring).
            nn.Linear(128, num_classes),
        )

    def forward(self, x):
        """Return logits of shape (batch, num_classes).

        ``x`` is expected as (batch, seq_len, features), where ``features``
        equals ``input_shape[-1]`` given at construction.
        """
        x = self.reshape(x)
        x = self.transformer(x)
        # AdaptiveAvgPool1d pools over the last axis, so move seq_len there.
        x = x.transpose(1, 2)
        x = self.pooling(x).squeeze(-1)
        return self.classifier(x)

    def predict_proba(self, x):
        """Return softmax class probabilities of shape (batch, num_classes)."""
        return torch.softmax(self(x), dim=-1)

In [6]:
from sklearn.metrics import ConfusionMatrixDisplay


def train_model(model, train_loader, val_loader, criterion, optimizer, num_epochs):
    """Train ``model`` for ``num_epochs`` epochs and plot loss/accuracy curves.

    After every epoch the model is evaluated on ``val_loader`` with
    ``evaluate_model`` and a one-line summary is printed.

    Args:
        model: Network to optimise; batches are moved to the device of its
            parameters.
        train_loader: Iterable of ``(inputs, labels)`` training batches. Labels
            are expected as per-class score rows (e.g. one-hot); the target
            class is taken as their arg-max along dim 1.
        val_loader: Iterable of validation batches in the same format.
        criterion: Loss callable ``criterion(outputs, labels)`` returning a
            batch-mean scalar tensor.
        optimizer: Optimiser bound to ``model``'s parameters.
        num_epochs: Number of passes over ``train_loader``.
    """
    train_losses, val_losses = [], []
    train_accuracies, val_accuracies = [], []

    # Follow the model's real device instead of guessing from CUDA availability,
    # so a CPU model on a CUDA-capable host does not hit a device mismatch.
    first_param = next(model.parameters(), None)
    if first_param is not None:
        device = first_param.device
    else:
        device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

    for epoch in range(num_epochs):
        model.train()
        train_loss = 0.0
        correct = 0
        total = 0

        for inputs, labels in train_loader:
            inputs, labels = inputs.to(device), labels.to(device)
            optimizer.zero_grad()
            outputs = model(inputs)
            loss = criterion(outputs, labels)
            loss.backward()
            optimizer.step()

            # The criterion returns a batch mean; re-weight by batch size so the
            # epoch figure is a true per-sample average.
            train_loss += loss.item() * inputs.size(0)
            _, predicted = outputs.max(1)
            total += labels.size(0)
            correct += (predicted == labels.max(1)[1]).sum().item()

        # An empty loader yields NaN metrics rather than a ZeroDivisionError.
        epoch_loss = train_loss / total if total else float("nan")
        train_acc = 100. * correct / total if total else float("nan")
        train_losses.append(epoch_loss)
        train_accuracies.append(train_acc)

        val_loss, val_acc = evaluate_model(model, val_loader, criterion)
        val_losses.append(val_loss)
        val_accuracies.append(val_acc)

        print(f"Epoch {epoch + 1}/{num_epochs}, Train Loss: {epoch_loss:.4f}, Train Acc: {train_acc:.2f}%, Val Loss: {val_loss:.4f}, Val Acc: {val_acc:.2f}%")

    # Plotting Loss and Accuracy
    epochs = range(1, num_epochs + 1)
    fig, (loss_ax, acc_ax) = plt.subplots(1, 2, figsize=(12, 5))

    loss_ax.plot(epochs, train_losses, label='Train Loss')
    loss_ax.plot(epochs, val_losses, label='Val Loss')
    loss_ax.set_xlabel('Epochs')
    loss_ax.set_ylabel('Loss')
    loss_ax.set_title('Loss Curve')
    loss_ax.legend()

    acc_ax.plot(epochs, train_accuracies, label='Train Accuracy')
    acc_ax.plot(epochs, val_accuracies, label='Val Accuracy')
    acc_ax.set_xlabel('Epochs')
    acc_ax.set_ylabel('Accuracy')
    acc_ax.set_title('Accuracy Curve')
    acc_ax.legend()

    fig.tight_layout()
    plt.show()

def evaluate_model(model, val_loader, criterion):
    """Compute the per-sample mean loss and accuracy of ``model`` on ``val_loader``.

    The model is switched to eval mode and gradients are disabled.

    Args:
        model: Network to evaluate; batches are moved to the device of its
            parameters.
        val_loader: Iterable of ``(inputs, labels)`` batches. Labels are
            expected as per-class score rows (e.g. one-hot); the target class
            is taken as their arg-max along dim 1.
        criterion: Loss callable ``criterion(outputs, labels)`` returning a
            batch-mean scalar tensor.

    Returns:
        ``(val_loss, val_acc)`` where ``val_acc`` is a percentage. Both are NaN
        when ``val_loader`` yields no samples.
    """
    # Follow the model's real device instead of guessing from CUDA availability,
    # so a CPU model on a CUDA-capable host does not hit a device mismatch.
    first_param = next(model.parameters(), None)
    if first_param is not None:
        device = first_param.device
    else:
        device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

    model.eval()
    val_loss = 0.0
    correct = 0
    total = 0
    with torch.no_grad():
        for inputs, labels in val_loader:
            inputs, labels = inputs.to(device), labels.to(device)
            outputs = model(inputs)
            loss = criterion(outputs, labels)
            # Re-weight the batch mean by batch size for a per-sample average.
            val_loss += loss.item() * inputs.size(0)
            _, predicted = outputs.max(1)
            total += labels.size(0)
            correct += (predicted == labels.max(1)[1]).sum().item()

    if total == 0:
        # No samples: metrics are undefined; avoid a ZeroDivisionError.
        return float("nan"), float("nan")

    val_loss /= total
    val_acc = 100. * correct / total
    return val_loss, val_acc

# Confusion Matrix Plot
def plot_confusion_matrix(y_true, y_pred, classes):
    """Plot a confusion matrix with one row/column per entry of ``classes``.

    Args:
        y_true: Ground-truth labels.
        y_pred: Predicted labels, same length as ``y_true``.
        classes: Display names for the matrix axes. When the labels are integer
            indices in ``range(len(classes))``, ``classes[i]`` names index ``i``.
    """
    # Without an explicit `labels`, confusion_matrix only covers the classes it
    # actually observes, so the matrix can be smaller than `classes` and the
    # display labels no longer line up. Pin the full index range whenever the
    # inputs are integer indices into `classes`; otherwise keep the default.
    label_ids = None
    observed = np.union1d(np.asarray(y_true).ravel(), np.asarray(y_pred).ravel())
    if (
        observed.size
        and np.issubdtype(observed.dtype, np.integer)
        and observed.min() >= 0
        and observed.max() < len(classes)
    ):
        label_ids = np.arange(len(classes))

    cm = confusion_matrix(y_true, y_pred, labels=label_ids)
    disp = ConfusionMatrixDisplay(confusion_matrix=cm, display_labels=classes)
    disp.plot(cmap=plt.cm.Blues)
    plt.show()

In [None]:
import shutil

############################## Global parameters ##############################
# Built with os.path.join so the path is not tied to the Windows separator.
SOURCE_H5 = os.path.join('dataset', 'GOLD_XYZ_OSC.0001_1024.hdf5')
dir_path = r'ExtractDataset'
modu_snr_size = 1200       # rows sampled from every (modu, snr) slab
NUM_MODULATIONS = 24       # outer loop count; one output file per value
NUM_SNR_LEVELS = 26        # inner loop count per modulation
FRAMES_PER_SNR = 4096      # contiguous rows in one (modu, snr) slab
###############################################################################

# Open the source first so a missing input does not wipe an earlier extraction.
with h5py.File(SOURCE_H5, 'r') as f:
    if os.path.exists(dir_path):
        # Remove the directory
        shutil.rmtree(dir_path)
        print(f"Deleted: {dir_path}")
    # (Re)create the directory even on a first run, when nothing was deleted.
    os.makedirs(dir_path, exist_ok=True)

    for modu in range(NUM_MODULATIONS):
        X_list = []
        Y_list = []
        Z_list = []
        print('part ', modu)
        # Each modulation spans NUM_SNR_LEVELS * FRAMES_PER_SNR (= 106496) rows.
        start_modu = modu * NUM_SNR_LEVELS * FRAMES_PER_SNR
        for snr in range(NUM_SNR_LEVELS):
            start_snr = start_modu + snr * FRAMES_PER_SNR
            stop_snr = start_snr + FRAMES_PER_SNR
            # Same random rows are taken from X, Y and Z so they stay aligned.
            idx_list = np.random.choice(range(0, FRAMES_PER_SNR), size=modu_snr_size, replace=False)
            X_list.append(f['X'][start_snr:stop_snr][idx_list])
            Y_list.append(f['Y'][start_snr:stop_snr][idx_list])
            Z_list.append(f['Z'][start_snr:stop_snr][idx_list])

        filename = dir_path + '/part' + str(modu) + '.h5'
        # Context manager closes the output file even if a write fails.
        with h5py.File(filename, 'w') as fw:
            fw['X'] = np.vstack(X_list)
            fw['Y'] = np.vstack(Y_list)
            fw['Z'] = np.vstack(Z_list)
            print('X shape:', fw['X'].shape)
            print('Y shape:', fw['Y'].shape)
            print('Z shape:', fw['Z'].shape)

Deleted: ExtractDataset
part  0
X shape: (31200, 1024, 2)
Y shape: (31200, 24)
Z shape: (31200, 1)
part  1
X shape: (31200, 1024, 2)
Y shape: (31200, 24)
Z shape: (31200, 1)
part  2
X shape: (31200, 1024, 2)
Y shape: (31200, 24)
Z shape: (31200, 1)
part  3
X shape: (31200, 1024, 2)
Y shape: (31200, 24)
Z shape: (31200, 1)
part  4
X shape: (31200, 1024, 2)
Y shape: (31200, 24)
Z shape: (31200, 1)
part  5
X shape: (31200, 1024, 2)
Y shape: (31200, 24)
Z shape: (31200, 1)
part  6
X shape: (31200, 1024, 2)
Y shape: (31200, 24)
Z shape: (31200, 1)
part  7
X shape: (31200, 1024, 2)
Y shape: (31200, 24)
Z shape: (31200, 1)
part  8
X shape: (31200, 1024, 2)
Y shape: (31200, 24)
Z shape: (31200, 1)
part  9
X shape: (31200, 1024, 2)
Y shape: (31200, 24)
Z shape: (31200, 1)
part  10
X shape: (31200, 1024, 2)
Y shape: (31200, 24)
Z shape: (31200, 1)
part  11
X shape: (31200, 1024, 2)
Y shape: (31200, 24)
Z shape: (31200, 1)
part  12
X shape: (31200, 1024, 2)
Y shape: (31200, 24)
Z shape: (31200, 1

In [7]:
# Mount Google Drive at /content/drive; requires the google.colab package.
# NOTE(review): later cells read 'drive/MyDrive/...' as a relative path, which
# presumably relies on the working directory being /content — confirm.
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [8]:
# Main
if __name__ == "__main__":
    # File paths and dataset split
    drivePath = 'drive/MyDrive/Colab Notebooks/dataset/'
    # NOTE(review): only part0.h5 is loaded here; confirm that a single part
    # file is intended for training rather than a quick smoke test.
    file_list = [drivePath + f"part{i}.h5" for i in range(1)]

    # Only the row count is needed, so close the handle immediately.
    with h5py.File(drivePath + r'part0.h5', 'r') as f:
        sample_num = f['X'].shape[0]

    # Sample WITHOUT replacement. The default (replace=True) repeats indices,
    # so the same rows would land in both the train and test splits.
    idx = np.random.choice(sample_num, size=min(60000, sample_num), replace=False)

    train_size = int(0.8 * len(idx))
    train_idx, test_idx = idx[:train_size], idx[train_size:]

    train_dataset = H5Dataset(file_list, train_idx)
    test_dataset = H5Dataset(file_list, test_idx)

    train_loader = DataLoader(train_dataset, batch_size=128, shuffle=True)
    test_loader = DataLoader(test_dataset, batch_size=128, shuffle=False)


In [None]:
# Model, Loss, Optimizer
input_shape = train_dataset[0][0].shape
num_classes = 24
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model = ProposedModel(input_shape, num_classes).to(device)
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=0.001)

# Train and Evaluate
train_model(model, train_loader, test_loader, criterion, optimizer, num_epochs=1)

# Confusion Matrix
y_true = []
y_pred = []
model.eval()
with torch.no_grad():
    for inputs, labels in test_loader:
        # The model lives on `device`; feeding CPU batches to a CUDA model
        # raises a device-mismatch error, so move the inputs first.
        inputs = inputs.to(device)
        outputs = model(inputs)
        _, predicted = outputs.max(1)
        # Labels are only arg-maxed and converted to NumPy, so they stay on CPU.
        y_true.extend(labels.max(1)[1].cpu().numpy())
        y_pred.extend(predicted.cpu().numpy())

# NOTE(review): confirm this name order matches the column order of the
# one-hot 'Y' labels; a mismatch would mislabel the matrix axes.
class_names = ["32PSK", "16APSK", "32QAM", "FM", "GMSK", "32APSK", "OQPSK", "8ASK", "BPSK", "8PSK", "AM-SSB-SC", "4ASK", "16PSK", "64APSK", "128QAM", "128APSK", "AM-DSB-SC", "AM-SSB-WC", "64QAM", "QPSK", "256QAM", "AM-DSB-WC", "OOK", "16QAM"]
plot_confusion_matrix(y_true, y_pred, classes=class_names)