# Transformer on spectrograms


In [1]:
import numpy as np
import pandas as pd
from sklearn.metrics import accuracy_score
import torch
import torch.nn as nn
import torch.optim as optim
from torch.autograd import Variable
import torch.nn.functional as F
import torch.optim as optim
from sklearn.preprocessing import StandardScaler

In [2]:
# Load the combined STFT features from the pickle file.
# NOTE(review): pickle.load can execute arbitrary code - only load trusted files.
import pickle
# NOTE(review): `dir` shadows the built-in dir(); the name is left unchanged
# here because other cells may reference it.
dir = "C:/Users/gusta/OneDrive/Skrivebord/KI & Data/Bachelor/LegeData"
with open(f"{dir}/combined_stft_features_19.pkl", "rb") as f:
    combined_stft_features = pickle.load(f)
    

# Build the label vector: 10 blocks, each made of 149 ones followed by 149 zeros.
labels = []
for _ in range(10):
    labels.append(np.repeat([1,0],149))

    
y = np.array([labels]).flatten()
print(y.shape, y[147:152])  # Shape is (2980,); the slice straddles the 1 -> 0 switch at index 149



# Sanity check: flatten the first 325 columns of every row of the feature matrix.
# presumably combined_stft_features is (19, >= 2980 * 325) - TODO confirm
X = combined_stft_features
train_sample = X[:,:325].flatten()
print(train_sample.shape)



# Prepare to create the DataFrame
# NOTE(review): `rows` is not used anywhere in the visible part of this cell.
rows = []

# Number of segments to cut out of X and the flattened size of each segment
num_segments = 2980
num_features = 6175  # 19 channels * 325 features per channel
# Step 1: Pre-allocate the array for the final data
# Each row will have num_features features
flattened_data = np.zeros((num_segments, num_features))

# Step 2: Extract and flatten segments
for i in range(num_segments):
    if i % 100 == 0: 
        print(f"progress: {i}/{num_segments}")
    # Take the i-th window of 325 consecutive columns across all rows
    segment_features = combined_stft_features[:, i * 325:(i * 325 + 325)]
    
    # Flatten the window to 1D and store it as row i
    flattened_data[i, :] = segment_features.flatten()

# Step 3: Create the DataFrame directly from the pre-allocated NumPy array,
# naming the columns feature_1 ... feature_<num_features>
data = pd.DataFrame(flattened_data, columns=[f'feature_{j+1}' for j in range(num_features)])

# Step 4: Add labels to the DataFrame
data['label'] = y

# Check the shape of the final DataFrame
print(data.shape)  # Should be (2980, 6175 + 1)

# NOTE(review): not the last expression of the cell, so nothing is displayed.
data.head()

# 10 patient ids, 298 consecutive rows each (matches the 149 + 149 label blocks above)
patient_ids = np.repeat([1,2,3,4,5,6,7,8,9,10],298)  # Make sure to have this aligned with your epochs/labels

# Standardize every patient's rows independently, one StandardScaler per patient.
# No train/test split happens in this cell.
data = data.iloc[:, :-1].values  # Drop the label column; `data` is a NumPy array from here on
data_norm = []
for patient_id in np.unique(patient_ids):
    patient_data = data[patient_ids == patient_id]
    scaler = StandardScaler()
    patient_data_scaled = scaler.fit_transform(patient_data)
    data_norm.append(patient_data_scaled)

# Blocks are concatenated in np.unique order, which equals the original row
# order only because patient_ids is sorted and contiguous.
data_norm = np.concatenate(data_norm, axis=0)
# Append the labels again as the last column
data_norm = np.concatenate([data_norm, y.reshape(-1, 1)], axis=1)
data = data_norm
data.shape  # Should be (2980, 6175 + 1); not displayed, it is not the last expression

# Save the normalized features (with the label column) as a pickle
with open(f"{dir}/data_norm_19_spectrograms.pkl", "wb") as f:
    pickle.dump(data, f)

# Split the label column off again; `labels` now replaces the list built at the top
labels = data[:, -1]
data = data[:, :-1]

    

(2980,) [1 1 0 0 0]
(6175,)
progress: 0/2980
progress: 100/2980
progress: 200/2980
progress: 300/2980
progress: 400/2980
progress: 500/2980
progress: 600/2980
progress: 700/2980
progress: 800/2980
progress: 900/2980
progress: 1000/2980
progress: 1100/2980
progress: 1200/2980
progress: 1300/2980
progress: 1400/2980
progress: 1500/2980
progress: 1600/2980
progress: 1700/2980
progress: 1800/2980
progress: 1900/2980
progress: 2000/2980
progress: 2100/2980
progress: 2200/2980
progress: 2300/2980
progress: 2400/2980
progress: 2500/2980
progress: 2600/2980
progress: 2700/2980
progress: 2800/2980
progress: 2900/2980
(2980, 6176)


In [3]:
class EpochTransformer(nn.Module):
    """Transformer encoder over batch-first sequences with sinusoidal positions.

    The input is expected as ``(batch_size, seq_len, n_embedding)``. A
    positional encoding of matching size is added and the result is passed
    through a stack of ``nn.TransformerEncoderLayer`` blocks. The output has
    the same shape as the input.

    Args:
        n_positions: Stored on the instance only; the encoding length is taken
            from the input at call time.
        n_embedding: Feature size of each position (``d_model``). Must be
            divisible by ``num_heads``.
        n_datapoints: Stored on the instance only.
        num_heads: Number of attention heads per encoder layer.
        num_layers: Number of stacked encoder layers.
    """

    def __init__(self, n_positions, n_embedding, n_datapoints, num_heads, num_layers):
        super().__init__()
        self.L = 10000
        self.n_positions = n_positions
        self.n_embedding = n_embedding
        self.n_datapoints = n_datapoints
        # batch_first=True is required: inputs are (batch, seq, feature). With
        # the PyTorch default (batch_first=False) dim 0 is treated as the
        # sequence axis, so attention would mix different samples of a batch.
        encoder_layers = nn.TransformerEncoderLayer(
            d_model=self.n_embedding, nhead=num_heads, batch_first=True
        )
        self.transformer_encoder = nn.TransformerEncoder(encoder_layers, num_layers=num_layers)

    def positional_encoding(self, n_positions, n_datapoints, L=10000):
        """
        Generates positional encodings directly to match the size of the data matrix.

        Args:
            n_positions (int): Number of positions (sequence length).
            n_datapoints (int): Number of datapoints (embedding size per epoch).
            L (int): Base of the frequency scaling (default 10000).

        Returns:
            torch.Tensor: Positional encoding matrix of shape (n_positions, n_datapoints).
        """
        n = torch.arange(n_positions)[:, None]  # (n_positions, 1)
        i = torch.arange(n_datapoints)[None, :]  # (1, n_datapoints)

        # NOTE(review): the exponent is i / d for every column, not the
        # 2 * floor(i / 2) / d of the original Transformer paper, so a sin/cos
        # pair does not share one frequency. Kept as in the original code.
        angle_rates = 1 / (L ** (i / n_datapoints))
        angle_rads = n * angle_rates

        # Sine on even feature indices, cosine on odd ones
        encoding = torch.zeros_like(angle_rads)
        encoding[:, 0::2] = torch.sin(angle_rads[:, 0::2])
        encoding[:, 1::2] = torch.cos(angle_rads[:, 1::2])

        return encoding

    def add_positional_encoding(self, data_matrix, L=10000):
        """
        Adds positional encoding to the data matrix directly, matching the size of the matrix.

        Args:
            data_matrix (torch.Tensor): The data matrix of shape (batch_size, seq_len, n_datapoints).
            L (int): Base of the frequency scaling (default 10000).

        Returns:
            torch.Tensor: Data matrix with added positional encoding.

        Raises:
            ValueError: If ``data_matrix`` is not 3-dimensional.
        """
        # Unpacking also rejects inputs that are not 3-D
        _, n_positions, n_datapoints = data_matrix.shape

        # Build the encoding on the CPU, then move it next to the data
        pos_enc = self.positional_encoding(n_positions, n_datapoints, L).to(data_matrix.device)

        # (seq_len, n_datapoints) broadcasts over the leading batch dimension
        return data_matrix + pos_enc

    def forward(self, x):
        """Encode ``x`` of shape (batch_size, seq_len, n_embedding); the shape is preserved."""
        x = self.add_positional_encoding(x, self.L).float()  # encoder weights are float32
        return self.transformer_encoder(x)


In [4]:
class SequenceTransformer(nn.Module):
    """Transformer encoder over a batch-first sequence of epoch feature vectors.

    The input is expected as ``(batch_size, seq_len, embedding_dim)``. A
    sinusoidal positional encoding is added and the result is passed through a
    stack of ``nn.TransformerEncoderLayer`` blocks. The output has the same
    shape as the input.

    Args:
        seq_len: Number of epochs per sequence; used to size the positional encoding.
        embedding_dim: Feature size of each epoch vector (``d_model``). Must be
            divisible by ``num_heads``.
        num_heads: Number of attention heads per encoder layer.
        num_layers: Number of stacked encoder layers.
    """

    def __init__(self, seq_len, embedding_dim, num_heads, num_layers):
        super().__init__()
        self.seq_len = seq_len
        self.embedding_dim = embedding_dim
        # batch_first=True is required: inputs are (batch, seq, feature). With
        # the PyTorch default (batch_first=False) dim 0 is treated as the
        # sequence axis, so attention would mix different samples of a batch.
        encoder_layers = nn.TransformerEncoderLayer(
            d_model=embedding_dim, nhead=num_heads, batch_first=True
        )
        self.transformer_encoder = nn.TransformerEncoder(encoder_layers, num_layers=num_layers)

    def add_sequence_positional_encoding(self, epoch_matrix, seq_len, embedding_dim):
        """
        Adds positional encoding to the sequence of epochs.
        Args:
            epoch_matrix (torch.Tensor): The sequence of epoch feature vectors (batch_size, seq_len, embedding_dim)
            seq_len (int): Length of the sequence (number of epochs).
            embedding_dim (int): Embedding dimension per epoch.
        Returns:
            torch.Tensor: Sequence with added positional encoding.
        """
        n = torch.arange(seq_len)[:, None]  # (seq_len, 1)
        i = torch.arange(embedding_dim)[None, :]  # (1, embedding_dim)
        angle_rates = 1 / (10000 ** (i / embedding_dim))
        angle_rads = n * angle_rates

        # Sine on even feature indices, cosine on odd ones
        pos_enc = torch.zeros((seq_len, embedding_dim))
        pos_enc[:, 0::2] = torch.sin(angle_rads[:, 0::2])
        pos_enc[:, 1::2] = torch.cos(angle_rads[:, 1::2])

        # Move the encoding next to the data; adding a CPU tensor to a CUDA
        # input would otherwise raise a device-mismatch error.
        pos_enc = pos_enc.to(epoch_matrix.device)

        # (seq_len, embedding_dim) broadcasts over the leading batch dimension
        return epoch_matrix + pos_enc

    def forward(self, epoch_output_seq):
        """Encode ``epoch_output_seq`` of shape (batch_size, seq_len, embedding_dim)."""
        # Add positional encoding to the sequence of epoch outputs
        encoded_seq = self.add_sequence_positional_encoding(
            epoch_output_seq, self.seq_len, self.embedding_dim
        ).float()  # encoder weights are float32
        # Pass through the transformer encoder
        return self.transformer_encoder(encoded_seq)


In [5]:
class FinalModel(nn.Module):
    """Two-stage transformer classifier for one target epoch in a sequence.

    The input runs through an ``EpochTransformer`` and then a
    ``SequenceTransformer``. The vector at ``target_idx`` is fed to a
    two-layer fully connected head that returns softmax scores over 2 classes.

    Args:
        n_positions: Forwarded to ``EpochTransformer``.
        n_embedding: Feature size per epoch; used as the width of both
            transformers and as the input size of the classifier head.
        seq_len: Number of epochs per sequence, forwarded to ``SequenceTransformer``.
        num_heads: Attention heads, shared by both transformers.
        num_layers: Encoder layers, shared by both transformers.
    """

    def __init__(self, n_positions, n_embedding, seq_len, num_heads, num_layers):
        super().__init__()
        # Two encoder stages with identical width, head count and depth
        self.epoch_transformer = EpochTransformer(
            n_positions, n_embedding, n_embedding, num_heads, num_layers
        )
        self.sequence_transformer = SequenceTransformer(
            seq_len, n_embedding, num_heads, num_layers
        )
        # Classification head: n_embedding -> 128 -> 2
        self.fc1 = nn.Linear(n_embedding, 128)
        self.fc2 = nn.Linear(128, 2)

    def forward(self, x, target_idx):
        """
        Classify the epoch at ``target_idx`` of every sequence in the batch.

        Args:
            x: Input of shape (batch_size, seq_len, n_datapoints).
            target_idx: Index along the sequence axis of the epoch to classify.

        Returns:
            Tensor of shape (batch_size, 2) holding softmax scores.
        """
        # Unpacking doubles as a rank check: anything but a 3-D input fails here
        _n_batch, _n_epochs, _n_features = x.shape

        # Stage 1 then stage 2; both keep the (batch, seq, feature) layout
        per_epoch = self.epoch_transformer(x)
        contextual = self.sequence_transformer(per_epoch)

        # Keep only the representation of the epoch being classified
        target_repr = contextual[:, target_idx]

        hidden = F.relu(self.fc1(target_repr))
        logits = self.fc2(hidden)
        return F.softmax(logits, dim=-1)



In [6]:
# Full-size model: 6175 features per epoch, windows of 11 epochs.
model = FinalModel(
    n_positions=2980,
    n_embedding=6175,
    seq_len=11,
    num_heads=5,
    num_layers=3,
)
optimizer = optim.Adam(model.parameters(), lr=1e-3)
# NOTE(review): BCELoss needs targets with the same shape as the model output
# (batch, 2) - presumably one-hot labels; confirm against the training loop.
loss_fn = nn.BCELoss()

print(model)



FinalModel(
  (epoch_transformer): EpochTransformer(
    (transformer_encoder): TransformerEncoder(
      (layers): ModuleList(
        (0-2): 3 x TransformerEncoderLayer(
          (self_attn): MultiheadAttention(
            (out_proj): NonDynamicallyQuantizableLinear(in_features=6175, out_features=6175, bias=True)
          )
          (linear1): Linear(in_features=6175, out_features=2048, bias=True)
          (dropout): Dropout(p=0.1, inplace=False)
          (linear2): Linear(in_features=2048, out_features=6175, bias=True)
          (norm1): LayerNorm((6175,), eps=1e-05, elementwise_affine=True)
          (norm2): LayerNorm((6175,), eps=1e-05, elementwise_affine=True)
          (dropout1): Dropout(p=0.1, inplace=False)
          (dropout2): Dropout(p=0.1, inplace=False)
        )
      )
    )
  )
  (sequence_transformer): SequenceTransformer(
    (transformer_encoder): TransformerEncoder(
      (layers): ModuleList(
        (0-2): 3 x TransformerEncoderLayer(
          (self_attn

In [7]:
# Smoke test: push one random batch through a freshly initialised model and
# check the output shape.
# 5 sequences, each with 11 epochs (5 before, 1 target, 5 after) of 6175 features
batch_size, seq_len, n_datapoints = 5, 11, 6175

# Build the model first, then draw the input, so the RNG is consumed in the same order
model = FinalModel(
    n_positions=2980,
    n_embedding=6175,
    seq_len=seq_len,
    num_heads=5,
    num_layers=3,
)

# Random input tensor of shape [batch_size, seq_len, n_datapoints]
x = torch.randn((batch_size, seq_len, n_datapoints))

# Zero-based index 5 is the 6th epoch, the centre of the 11-epoch window
target_idx = 5

output = model(x, target_idx)
print(output.shape)  # Should be [batch_size, 2] for binary classification







torch.Size([5, 2])
