The workflow is adapted from the PyTorch [tutorial](https://pytorch.org/tutorials/beginner/transformer_tutorial.html), which is downloaded as `transformer_tutorial.ipynb`. The code is modified to fit the task of this project.

## Define the model




Let's train a ``nn.TransformerEncoder`` model following the PyTorch tutorial, but with the following modifications:
1.  No embedding layer for transforming the input tokens to vectors in vocab space. 
    Our sequence is a spectrum, not text tokens.
1.  A positional encoding layer to account for the order of the channels remains. 
    (see the next paragraph for more details). 
1.  No attention masking and decoder layer because we are not dealing with causal language modeling. 
1.  Instead of using ``CrossEntropyLoss`` as the loss function, we use ``MSELoss`` because we are dealing with regression, not classification.
    
The ``nn.TransformerEncoder`` consists of multiple layers of
[nn.TransformerEncoderLayer](https://pytorch.org/docs/stable/generated/torch.nn.TransformerEncoderLayer.html).




In [36]:
import math
import os
from tempfile import TemporaryDirectory
from typing import Tuple

import torch
from torch import nn, Tensor
from torch.nn import TransformerEncoder, TransformerEncoderLayer
from torch.utils.data import dataset

class PositionalEncoding(nn.Module):
    """Add fixed sinusoidal positional encodings to a sequence, then apply dropout.

    The encoding table is computed once for ``max_len`` positions and stored as
    the non-trainable buffer ``pe`` of shape ``[max_len, 1, d_model]``.

    Arguments:
        d_model: size of the last (feature) dimension of the input; may be odd
        dropout: dropout probability applied after adding the encoding
        max_len: number of positions for which encodings are precomputed
    """

    def __init__(self, d_model: int, dropout: float = 0.1, max_len: int = 5000):
        super().__init__()
        self.dropout = nn.Dropout(p=dropout)

        position = torch.arange(max_len).unsqueeze(1)
        div_term = torch.exp(torch.arange(0, d_model, 2) * (-math.log(10000.0) / d_model))
        pe = torch.zeros(max_len, 1, d_model)
        # Even feature indices hold the sine terms.
        pe[:, 0, 0::2] = torch.sin(position * div_term)
        # Odd feature indices hold the cosine terms. When d_model is odd there is
        # one fewer odd index than even index, so trim div_term accordingly
        # (the slice is a no-op for even d_model).
        pe[:, 0, 1::2] = torch.cos(position * div_term[:d_model // 2])
        self.register_buffer('pe', pe)

    def forward(self, x: Tensor) -> Tensor:
        """
        Arguments:
            x: Tensor, shape ``[seq_len, batch_size, embedding_dim]``
            embedding_dim is 1 in our case

        Returns:
            ``x`` with the first ``seq_len`` positional encodings added
            (broadcast over the batch dimension), passed through dropout
        """
        x = x + self.pe[:x.size(0)]
        return self.dropout(x)

class TransformerModel(nn.Module):
    """Transformer encoder model for spectra (work in progress).

    Compared with the tutorial model there is no token embedding, because the
    input is a spectrum rather than words, and no output linear layer, because
    the aim is to reconstruct the masked spectrum.

    Arguments:
        d_model: number of expected features in the input
        nhead: number of heads in ``nn.MultiheadAttention``
        d_hid: dimension of the feedforward network in each encoder layer
        nlayers: number of ``nn.TransformerEncoderLayer`` in the encoder
        dropout: dropout probability
    """

    def __init__(self, d_model: int, nhead: int, d_hid: int,
                 nlayers: int, dropout: float = 0.5):
        super().__init__()
        self.model_type = 'Transformer'
        # forward() scales the input by sqrt(d_model), so it must be stored here;
        # without this attribute every forward call raises AttributeError.
        self.d_model = d_model
        self.pos_encoder = PositionalEncoding(d_model, dropout)
        encoder_layers = TransformerEncoderLayer(d_model, nhead, d_hid, dropout)
        self.transformer_encoder = TransformerEncoder(encoder_layers, nlayers)

    def forward(self, src: Tensor) -> Tensor:
        """
        Arguments:
            src: Tensor; originally documented as shape ``[seq_len, batch_size]``,
            while ``PositionalEncoding`` documents
            ``[seq_len, batch_size, embedding_dim]`` -- TODO confirm the intended layout

        Returns:
            ``src`` scaled by ``sqrt(d_model)`` with the positional encoding
            added and dropout applied
        """
        src = src * math.sqrt(self.d_model)
        src = self.pos_encoder(src)
        # NOTE: no attention mask is used (this is not causal language modeling),
        # and ``self.transformer_encoder`` is built in __init__ but intentionally
        # not applied yet, so the positionally-encoded input is returned as is.
        return src

# Read data

In [2]:
import os
import numpy as np
import pandas as pd
from torch.utils.data import Dataset

class CustomImageDataset(Dataset):
    """Dataset that yields a masked spectrum, its original spectrum and the mask.

    Labels and transforms are not needed for now.
    """

    def __init__(self, annotations_file, input_dir, target_dir, mask_dir):
        """
        annotations_file: CSV file whose first column holds the spe file names
        input_dir: directory with masked spe files
        target_dir: directory with original spe files
        mask_dir: directory with boolean mask files
        """
        self.spe_info = pd.read_csv(annotations_file)
        self.input_dir, self.target_dir, self.mask_dir = input_dir, target_dir, mask_dir

    def __len__(self):
        # One sample per row of the annotations table.
        return self.spe_info.shape[0]

    def __getitem__(self, idx):
        # The same file name is used in all three directories.
        file_name = self.spe_info.iloc[idx, 0]
        sources = (
            ('input_spe', self.input_dir),
            ('target_spe', self.target_dir),
            ('mask', self.mask_dir),
        )
        # Each file is comma-separated integers.
        return {
            key: np.loadtxt(os.path.join(folder, file_name), delimiter=',', dtype=int)
            for key, folder in sources
        }

In [27]:
from torch import Generator
from torch.utils.data import random_split
from torch.utils.data import DataLoader

# Build the full dataset from the annotations file and the three data folders.
dataset = CustomImageDataset(
    annotations_file='data/info_20231121.csv',
    input_dir='data/masked',
    target_dir='data/spe',
    mask_dir='data/mask',
)
# 80/20 split; the fixed seed makes it reproducible.
data_train, data_test = random_split(
    dataset, lengths=[0.8, 0.2], generator=Generator().manual_seed(24)
)

# Draw one shuffled batch of four samples from the training split and inspect it.
train_dataloader = DataLoader(dataset=data_train, batch_size=4, shuffle=True)
batch = next(iter(train_dataloader))
print(batch['input_spe'].shape)
print(batch)


torch.Size([4, 2048])
{'input_spe': tensor([[99999999,        0, 99999999,  ...,        0,        0,        0],
        [       0,        0, 99999999,  ...,        0,        0,        0],
        [       0,        0,        0,  ...,        0, 99999999, 99999999],
        [       0, 99999999,        0,  ...,        0, 99999999,        0]]), 'target_spe': tensor([[0, 0, 0,  ..., 0, 0, 0],
        [0, 0, 0,  ..., 0, 0, 0],
        [0, 0, 0,  ..., 0, 0, 0],
        [0, 0, 0,  ..., 0, 0, 0]]), 'mask': tensor([[1, 0, 1,  ..., 0, 0, 0],
        [0, 0, 1,  ..., 0, 0, 0],
        [0, 0, 0,  ..., 0, 1, 1],
        [0, 1, 0,  ..., 0, 1, 0]])}


In `mask`, 0 marks a channel that is not masked and 1 marks a channel that is masked.

# Test run

## Initiate an instance




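The model hyperparameters are defined below. They are based on BERT-base, except that `nhead` is 8 instead of 12, because `d_model` (2048 here) must be divisible by the number of heads.
There is no vocabulary in this project; the spectrum length `spe_len` is passed to the model as `d_model` instead.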




In [38]:
# Hyperparameters for the test run. There is no vocabulary size because the
# input is a spectrum, not tokens.
spe_len = 2048  # spectrum length; passed to the model as ``d_model``
d_hid = 768  # feedforward dimension inside each ``nn.TransformerEncoderLayer``
nlayers = 12  # how many ``nn.TransformerEncoderLayer`` are stacked in ``nn.TransformerEncoder``
nhead = 8  # attention heads in ``nn.MultiheadAttention``
dropout = 0.1  # dropout probability
# Use the GPU when one is available, otherwise fall back to the CPU.
device = torch.device('cuda') if torch.cuda.is_available() else torch.device('cpu')
model = TransformerModel(
    d_model=spe_len, nhead=nhead, d_hid=d_hid, nlayers=nlayers, dropout=dropout
).to(device)

AssertionError: embed_dim must be divisible by num_heads

In [29]:
# Print the module hierarchy of the instantiated model.
print(model)

TransformerModel(
  (pos_encoder): PositionalEncoding(
    (dropout): Dropout(p=0.1, inplace=False)
  )
  (transformer_encoder): TransformerEncoder(
    (layers): ModuleList(
      (0-11): 12 x TransformerEncoderLayer(
        (self_attn): MultiheadAttention(
          (out_proj): NonDynamicallyQuantizableLinear(in_features=2048, out_features=2048, bias=True)
        )
        (linear1): Linear(in_features=2048, out_features=768, bias=True)
        (dropout): Dropout(p=0.1, inplace=False)
        (linear2): Linear(in_features=768, out_features=2048, bias=True)
        (norm1): LayerNorm((2048,), eps=1e-05, elementwise_affine=True)
        (norm2): LayerNorm((2048,), eps=1e-05, elementwise_affine=True)
        (dropout1): Dropout(p=0.1, inplace=False)
        (dropout2): Dropout(p=0.1, inplace=False)
      )
    )
  )
)


In [30]:
# Run the sample batch through the model and print the shape of the result.
# NOTE(review): the batch printed earlier has size [4, 2048], i.e. the batch
# dimension comes first, while the model docstrings describe seq_len first --
# confirm the intended layout.
output = model(batch['input_spe'].to(device))
print(output.size())

torch.Size([4, 4, 2048])


In [37]:
# Standalone check of PositionalEncoding with a single feature per channel.
d_model = 1
dropout = 0.1
pos_encoder = PositionalEncoding(d_model, dropout)
# Rearrange src from [batch_size, seq_len] to [seq_len, batch_size, embedding_dim].
# ``view(2048, -1, 1)`` would only reinterpret the memory layout and interleave
# values from different spectra, so swap the two axes explicitly and then add
# the trailing feature dimension.
src = batch['input_spe'].transpose(0, 1).unsqueeze(-1)


src = src * math.sqrt(d_model)
print(src.size())
src = pos_encoder(src)
print(src.size())

torch.Size([2048, 4, 1])
torch.Size([2048, 4, 1])


# Short note

I have encountered two errors. The first one concerns `embed_dim` (the sequence length), which needs to be divisible by the number of heads. The planned length is 2048, which cannot be divided by the BERT-base default number of heads, 12. It's not as critical as the second one...

The second one concerns `d_model`, the number of expected features in the input. It is relevant to the `vocab_size` in BERT. Every token in BERT is represented by a vector of `d_model` features, so the expected input shape is [sequence length, vocab size] (regardless of the batch size). The masking is applied sequence-wise, i.e. by masking tokens. In my case, the input data is a 1D spectrum, which has the shape [spectrum length] only. The spectrum consists of 2048 integers recording the counts across the wavelength range. Since my goal is to train a model that is able to reconstruct the masked parts of a spectrum, I should treat the spectrum as a sequence from the BERT perspective. However, this makes the number of input features equal to 1, which is rejected by the transformer architecture: the attention mechanism can’t work with a [1,1] vector.

Hsuan-Tien recommended that I use the ViT architecture and treat my spectrum as a 1D image. The pixel values correspond to the counts of the spectrum. ViT faces the same problem as I do, but solves it by cutting the image into patches. I think I can try this approach.