In [1]:
import torch
import torch.nn as nn
from torch.nn import functional as F
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader
import math
import time

In [2]:
class Pattention(nn.Module):
    """Pattention Layer.
    d1 = inputs dimension
    d2 = outpuuts dimension
    n = the number of parameters tokens representing the learnable keys and values
    """

    def __init__(
        self,
        d1,
        d2,
        n,
        param_key_init_method,
        param_value_init_method,
        norm_activation_type,
    ):
        super().__init__()

        self.param_token_num = n
        self.param_key_dim = d1
        self.param_value_dim = d2
        self.norm_activation_type = norm_activation_type
        
        self.key_param_tokens = nn.parameter.Parameter(data=torch.rand((n, d1))) # Kp shape of (n, d1) 
        self.value_param_tokens = nn.parameter.Parameter(data=torch.rand((n, d2))) # Vp shape of (n, d2)
        
        param_key_init_method(self.key_param_tokens)
        param_value_init_method(self.value_param_tokens)
    
    def nonlinear_norm_func(self, inputs, normalize_type, dim=-1):
        if normalize_type == 'softmax': 
            # NOTE: softmax = exp_l1_norm
            # outputs = F.softmax(inputs, dim=dim) * inputs.shape[dim]
            nonlinear_outputs = torch.exp(inputs)
            norm_outputs = nonlinear_outputs / torch.norm(nonlinear_outputs, p=1, dim=dim, keepdim=True) * inputs.shape[dim]
            outputs = norm_outputs
        elif normalize_type == 'gelu_l2_norm':
            nonlinear_outputs = F.gelu(inputs)
            norm_outputs = nonlinear_outputs / torch.norm(nonlinear_outputs, p=2, dim=dim, keepdim=True) * math.sqrt(nonlinear_outputs.shape[dim])
            outputs = norm_outputs
        elif normalize_type == 'l2_norm_gelu':
            norm_outputs = inputs / torch.norm(inputs, p=2, dim=dim, keepdim=True) * math.sqrt(inputs.shape[dim])
            nonlinear_outputs = F.gelu(norm_outputs)
            outputs = nonlinear_outputs
        return outputs

    def forward(self, inputs, dropout_p=0.0, attn_mask=None, scale=None):

        query = inputs
        key, value = self.key_param_tokens, self.value_param_tokens        
        L, S = query.size(-2), key.size(-2)
        scale_factor = 1 if scale is None else scale 
        # just for gelu nonlinear, set torch.zeros for softmax
        attn_bias = torch.ones(L, S, dtype=query.dtype, device=query.device)

        if attn_mask is not None:
            if attn_mask.dtype == torch.bool:
                # just for gelu nonlinear, set -inf for softmax
                attn_bias.masked_fill_(attn_mask.logical_not(), 0)
            else:
                raise NotImplementedError

        attn_weight = query @ key.transpose(-2, -1) * scale_factor
        # just for gelu nonlinear, set attn_weight += attn_bias for softmax
        attn_weight *= attn_bias
        # modified softmax
        attn_weight = self.nonlinear_norm_func(attn_weight, self.norm_activation_type, dim=-1)
        attn_weight = torch.dropout(attn_weight, dropout_p, train=True)
        output = attn_weight @ value

        return output

In [3]:
class SelfAttention(nn.Module):
    def __init__(self, hidden_size, num_attention_heads, attention_dropout=0.1):
        super().__init__()
        self.hidden_size = hidden_size
        self.num_attention_heads = num_attention_heads
        self.head_dim = hidden_size // num_attention_heads
        #print(hidden_size)
        #print(num_attention_heads)

        assert hidden_size % num_attention_heads == 0, "hidden_size must be divisible by num_attention_heads"

        

        # Query, Key, and Value projections
        self.query = Pattention(
            d1=hidden_size,
            d2=hidden_size,
            #n=hidden_size // num_attention_heads,
            n=10,
            param_key_init_method=torch.nn.init.xavier_uniform_,
            param_value_init_method=torch.nn.init.xavier_uniform_,
            norm_activation_type="l2_norm_gelu"
        )
        self.key = Pattention(
            d1=hidden_size,
            d2=hidden_size,
            #n=hidden_size // num_attention_heads,
            n=10,
            param_key_init_method=torch.nn.init.xavier_uniform_,
            param_value_init_method=torch.nn.init.xavier_uniform_,
            norm_activation_type="l2_norm_gelu"
        )
        self.value = Pattention(
            d1=hidden_size,
            d2=hidden_size,
            #n=hidden_size // num_attention_heads,
            n=10,
            param_key_init_method=torch.nn.init.xavier_uniform_,
            param_value_init_method=torch.nn.init.xavier_uniform_,
            norm_activation_type="l2_norm_gelu"
        )
        self.out_proj = Pattention(
            d1=hidden_size,
            d2=hidden_size,
            #n=hidden_size,
            n=10,
            param_key_init_method=torch.nn.init.xavier_uniform_,
            param_value_init_method=torch.nn.init.xavier_uniform_,
            norm_activation_type="l2_norm_gelu"
        )

        self.attention_dropout = nn.Dropout(attention_dropout)
        self.norm_factor = math.sqrt(self.head_dim)

    def forward(self, hidden_states, attention_mask=None):

        # Proceed with attention mechanism
        batch_size, seq_len, _ = hidden_states.size()

        query_layer = self.query(hidden_states).view(
            batch_size, seq_len, self.num_attention_heads, self.head_dim
        )
        #print("qshape: ", query_layer.shape)
        key_layer = self.key(hidden_states).view(
            batch_size, seq_len, self.num_attention_heads, self.head_dim
        )
        #print("kshape: ", key_layer.shape)
        value_layer = self.value(hidden_states).view(
            batch_size, seq_len, self.num_attention_heads, self.head_dim
        )
        #print("vshape: ", value_layer.shape)
        query_layer = query_layer.transpose(1, 2)
        key_layer = key_layer.transpose(1, 2)
        value_layer = value_layer.transpose(1, 2)

        attention_scores = torch.matmul(query_layer, key_layer.transpose(-1, -2))
        attention_scores /= self.norm_factor

        if attention_mask is not None:
            attention_scores += attention_mask

        attention_probs = torch.softmax(attention_scores, dim=-1)
        attention_probs = self.attention_dropout(attention_probs)

        context_layer = torch.matmul(attention_probs, value_layer)
        context_layer = context_layer.transpose(1, 2).contiguous().view(
            batch_size, seq_len, self.hidden_size
        )

        output = self.out_proj(context_layer)
        return output

In [3]:
class SelfAttention(nn.Module):
    def __init__(self, hidden_size, num_attention_heads, attention_dropout=0.1):
        super().__init__()
        self.hidden_size = hidden_size
        self.num_attention_heads = num_attention_heads
        self.head_dim = hidden_size // num_attention_heads

        assert hidden_size % num_attention_heads == 0, "hidden_size must be divisible by num_attention_heads"

        # Query, Key, and Value projections
        self.query = Pattention(
            d1=hidden_size,
            d2=hidden_size,
            n=10,
            param_key_init_method=torch.nn.init.xavier_uniform_,
            param_value_init_method=torch.nn.init.xavier_uniform_,
            norm_activation_type="l2_norm_gelu"
        )
        self.key = Pattention(
            d1=hidden_size,
            d2=hidden_size,
            n=10,
            param_key_init_method=torch.nn.init.xavier_uniform_,
            param_value_init_method=torch.nn.init.xavier_uniform_,
            norm_activation_type="l2_norm_gelu"
        )
        self.value = Pattention(
            d1=hidden_size,
            d2=hidden_size,
            n=10,
            param_key_init_method=torch.nn.init.xavier_uniform_,
            param_value_init_method=torch.nn.init.xavier_uniform_,
            norm_activation_type="l2_norm_gelu"
        )
        self.out_proj = Pattention(
            d1=hidden_size,
            d2=hidden_size,
            n=10,
            param_key_init_method=torch.nn.init.xavier_uniform_,
            param_value_init_method=torch.nn.init.xavier_uniform_,
            norm_activation_type="l2_norm_gelu"
        )

        self.attention_dropout = nn.Dropout(attention_dropout)
        self.norm_factor = math.sqrt(self.head_dim)

    def forward(self, hidden_states, attention_mask=None):
        batch_size, seq_len, _ = hidden_states.size()

        # Compute query, key, and value
        query_layer = self.query(hidden_states).view(
            batch_size, seq_len, self.num_attention_heads, self.head_dim
        ).transpose(1, 2)  # Shape: (batch_size, num_heads, seq_len, head_dim)
        
        key_layer = self.key(hidden_states).view(
            batch_size, seq_len, self.num_attention_heads, self.head_dim
        ).transpose(1, 2)  # Shape: (batch_size, num_heads, seq_len, head_dim)
        
        value_layer = self.value(hidden_states).view(
            batch_size, seq_len, self.num_attention_heads, self.head_dim
        ).transpose(1, 2)  # Shape: (batch_size, num_heads, seq_len, head_dim)

        # Compute attention scores
        attention_scores = torch.matmul(query_layer, key_layer.transpose(-1, -2))  # Shape: (batch_size, num_heads, seq_len, seq_len)
        attention_scores /= self.norm_factor

        if attention_mask is not None:
            attention_scores += attention_mask

        # Compute attention probabilities
        attention_probs = torch.softmax(attention_scores, dim=-1)
        attention_probs = self.attention_dropout(attention_probs)

        # Compute context layer
        context_layer = torch.matmul(attention_probs, value_layer)  # Shape: (batch_size, num_heads, seq_len, head_dim)
        context_layer = context_layer.transpose(1, 2).contiguous().view(
            batch_size, seq_len, self.hidden_size
        )  # Shape: (batch_size, seq_len, hidden_size)

        # Final projection
        output = self.out_proj(context_layer)
        return output

In [4]:
class TokenformerLayer(nn.Module):
    """A single Tokenformer layer implementing token-token and token-parameter interactions."""

    def __init__(
        self,
        hidden_size,
        vocab_size,
        num_attention_heads,
        max_seq_len,
        attention_dropout=0.1,
        hidden_dropout=0.1,
    ):
        """
        Args:
            hidden_size (int): The size of the hidden dimension.
            num_attention_heads (int): Number of attention heads for multi-head attention.
            num_param_tokens (int): Number of parameter tokens for the feed-forward Pattention layer.
            attention_dropout (float): Dropout probability for attention weights.
            hidden_dropout (float): Dropout probability for residual connections.
        """
        super().__init__()
        self.hidden_size = hidden_size
        self.num_attention_heads = num_attention_heads
        self.head_dim = hidden_size // num_attention_heads

        assert (
            hidden_size % num_attention_heads == 0
        ), "hidden_size must be divisible by num_attention_heads"

        # Layer normalizations
        self.input_layernorm = nn.LayerNorm(hidden_size)
        self.post_attention_layernorm = nn.LayerNorm(hidden_size)

        # Token and positional embeddings
        self.token_embedding = nn.Embedding(vocab_size, hidden_size)
        self.position_embedding = nn.Embedding(max_seq_len, hidden_size)

        # Self-attention using Pattention
        self.attention = self.attention = SelfAttention(
            #vocab_size=30522,  # Provide a valid vocab_size if needed
            hidden_size=hidden_size,
            num_attention_heads=num_attention_heads,
            #max_seq_len=max_seq_len,
            attention_dropout=attention_dropout,
        )

        # Feed-forward network using Pattention
        self.mlp = Pattention(
            d1=hidden_size,
            d2=hidden_size,
            n=10,
            param_key_init_method=torch.nn.init.xavier_uniform_,
            param_value_init_method=torch.nn.init.xavier_uniform_,
            norm_activation_type="l2_norm_gelu"
        )

        self.hidden_dropout = hidden_dropout
        self.dropout = nn.Dropout(hidden_dropout)

    def forward(self, x, attention_mask=None):
        """
        Forward pass for the Tokenformer layer.
        
        Args:
            x (torch.Tensor): Input tensor of shape [batch_size, seq_len, hidden_size].
            attention_mask (torch.Tensor, optional): Attention mask for self-attention.

        Returns:
            torch.Tensor: Output tensor of the same shape as the input.
        """
        #print(f"Input shape before LayerNorm: {x.shape}")
        # Residual connection and pre-normalization for attention
        # Word embedding
        x = self.token_embedding(x)  # [batch_size, seq_len, hidden_size]
        #print("token emb: ", x.shape)
        # Positional embedding
        seq_len = x.size(1)
        position_ids = torch.arange(seq_len, device=x.device).unsqueeze(0)  # [1, seq_len]
        #print("Position: ", position_ids.shape)
        x += self.position_embedding(position_ids)  # Add positional embeddings
        #print( "emb + pos", x.shape)

        residual = x
        normed_input = self.input_layernorm(x)
        #print("norme: ", normed_input.shape)

        # Self-attention
        attention_output = self.attention(normed_input, attention_mask)
        #print("attention: ", attention_output.shape)
        attention_output = self.dropout(attention_output) + residual

        # Residual connection and pre-normalization for feed-forward
        residual = attention_output
        normed_attention_output = self.post_attention_layernorm(attention_output)

        # Feed-forward network (Pattention)
        mlp_output = self.mlp(normed_attention_output)
        #print("feed: ",  mlp_output.shape)
        output = self.dropout(mlp_output) + residual

        return output

In [5]:
# Dataset class remains unchanged
class PokemonDataset(Dataset):
    def __init__(self, file_path, vocab_size, max_seq_len):
        with open(file_path, 'r') as f:
            self.text = f.read().lower().split()
        
        # Create vocabulary
        self.vocab = {word: idx for idx, word in enumerate(set(self.text))}
        self.vocab_size = len(self.vocab)
        self.max_seq_len = max_seq_len

        # Convert text to token indices
        self.tokens = [self.vocab[word] for word in self.text]
        self.data = [
            self.tokens[i : i + max_seq_len] for i in range(len(self.tokens) - max_seq_len)
        ]
    
    def __len__(self):
        return len(self.data)
    
    def __getitem__(self, idx):
        x = self.data[idx][:-1]
        y = self.data[idx][1:]
        return torch.tensor(x), torch.tensor(y)

In [7]:
def train():
    # Configurations
    device = 'mps' if torch.backends.mps.is_available() else 'cpu'
    vocab_size=10000    # Valeur temporaire, sera mise à jour avec vocab_size réel
    hidden_dim=32
    num_heads=4
    #num_layers=2,
    #num_tokens=10,
    max_seq_len=50

    # Préparer les données
    dataset = PokemonDataset(file_path='training/pokemon.txt', vocab_size=vocab_size, max_seq_len=max_seq_len)
    vocab_size = dataset.vocab_size  # Mettre à jour vocab_size après avoir créé le vocabulaire
    dataloader = DataLoader(dataset, batch_size=32, shuffle=True)

    # Instancier le modèle
    model = TokenformerLayer(hidden_dim, vocab_size, num_heads, max_seq_len)
    model = model.to(device)


    # Définir la fonction de perte et l'optimiseur
    criterion = nn.CrossEntropyLoss()
    optimizer = optim.Adam(model.parameters(), lr=0.001)

    print("debut de l'entrainement")
    num_epochs=10
    for epoch in range(num_epochs):
        model.train()
        total_loss = 0
        for x_batch, y_batch in dataloader:
            device = next(model.parameters()).device

            # Déplacer les données sur le même appareil que le modèle (GPU ou CPU)
            x_batch, y_batch = x_batch.to(device), y_batch.to(device)

            # Forward pass
            outputs = model(x_batch)  # Shape: (batch_size, seq_len, vocab_size)
            #print("outputs:", outputs.shape)

            # Aplatir les logits pour la CrossEntropyLoss
            outputs = outputs.view(-1, hidden_dim)  # Shape: (batch_size * seq_len, vocab_size)
            #print("outputs aplati:", outputs.shape)

            # Aplatir les labels
            y_batch = y_batch.view(-1)  # Shape: (batch_size * seq_len)

            # Calculer la perte
            loss = criterion(outputs, y_batch)

            # Backpropagation et optimisation
            optimizer.zero_grad()
            loss.backward()
            optimizer.step()

            total_loss += loss.item()

        # Afficher la perte moyenne par epoch
        print(f"Epoch {epoch + 1}/{num_epochs}, Loss: {total_loss / len(dataloader)}")

In [8]:
train()

debut de l'entrainement
Epoch 1/10, Loss: 0.0038630802127174193
Epoch 2/10, Loss: 0.0010241141640873182
Epoch 3/10, Loss: 0.0003865549773005633
Epoch 4/10, Loss: 0.00018665068661852992
Epoch 5/10, Loss: 0.00010370207787913885
Epoch 6/10, Loss: 7.745456663046468e-05
Epoch 7/10, Loss: 5.489678321330518e-05
Epoch 8/10, Loss: 4.0524181558033846e-05
Epoch 9/10, Loss: 3.2102053304061427e-05
Epoch 10/10, Loss: 2.751963793182544e-05


In [2]:
class Pattention(nn.Module):
    """Pattention Layer.
    d1 = inputs dimension
    d2 = outpuuts dimension
    n = the number of parameters tokens representing the learnable keys and values
    """

    def __init__(
        self,
        d1,
        d2,
        n,
        param_key_init_method,
        param_value_init_method,
        norm_activation_type,
    ):
        super().__init__()

        self.param_token_num = n
        self.param_key_dim = d1
        self.param_value_dim = d2
        self.norm_activation_type = norm_activation_type
        
        self.key_param_tokens = nn.parameter.Parameter(data=torch.rand((n, d1))) # Kp shape of (n, d1) 
        self.value_param_tokens = nn.parameter.Parameter(data=torch.rand((n, d2))) # Vp shape of (n, d2)
        
        param_key_init_method(self.key_param_tokens)
        param_value_init_method(self.value_param_tokens)

    def add_new_params(self, num_new_tokens, key_init=torch.nn.init.zeros_, value_init=torch.nn.init.zeros_):
        # Create new key and value tokens
        new_key_tokens = nn.Parameter(torch.zeros((num_new_tokens, self.param_key_dim)))
        new_value_tokens = nn.Parameter(torch.zeros((num_new_tokens, self.param_value_dim)))
        
        key_init(new_key_tokens)
        value_init(new_value_tokens)
        
        # Concatenate with existing tokens
        self.key_param_tokens = nn.Parameter(
            torch.cat([self.key_param_tokens, new_key_tokens], dim=0)
        )
        self.value_param_tokens = nn.Parameter(
            torch.cat([self.value_param_tokens, new_value_tokens], dim=0)
        )
    
    def nonlinear_norm_func(self, inputs, normalize_type, dim=-1):
        if normalize_type == 'softmax': 
            # NOTE: softmax = exp_l1_norm
            # outputs = F.softmax(inputs, dim=dim) * inputs.shape[dim]
            nonlinear_outputs = torch.exp(inputs)
            norm_outputs = nonlinear_outputs / torch.norm(nonlinear_outputs, p=1, dim=dim, keepdim=True) * inputs.shape[dim]
            outputs = norm_outputs
        elif normalize_type == 'gelu_l2_norm':
            nonlinear_outputs = F.gelu(inputs)
            norm_outputs = nonlinear_outputs / torch.norm(nonlinear_outputs, p=2, dim=dim, keepdim=True) * math.sqrt(nonlinear_outputs.shape[dim])
            outputs = norm_outputs
        elif normalize_type == 'l2_norm_gelu':
            norm_outputs = inputs / torch.norm(inputs, p=2, dim=dim, keepdim=True) * math.sqrt(inputs.shape[dim])
            nonlinear_outputs = F.gelu(norm_outputs)
            outputs = nonlinear_outputs
        return outputs

    def forward(self, inputs, dropout_p=0.0, attn_mask=None, scale=None):

        query = inputs
        key, value = self.key_param_tokens, self.value_param_tokens        
        L, S = query.size(-2), key.size(-2)
        scale_factor = 1 if scale is None else scale 
        # just for gelu nonlinear, set torch.zeros for softmax
        attn_bias = torch.ones(L, S, dtype=query.dtype, device=query.device)

        if attn_mask is not None:
            if attn_mask.dtype == torch.bool:
                # just for gelu nonlinear, set -inf for softmax
                attn_bias.masked_fill_(attn_mask.logical_not(), 0)
            else:
                raise NotImplementedError

        attn_weight = query @ key.transpose(-2, -1) * scale_factor
        # just for gelu nonlinear, set attn_weight += attn_bias for softmax
        attn_weight *= attn_bias
        # modified softmax
        attn_weight = self.nonlinear_norm_func(attn_weight, self.norm_activation_type, dim=-1)
        attn_weight = torch.dropout(attn_weight, dropout_p, train=True)
        output = attn_weight @ value

        return output

In [6]:
def train_with_scaling():
    # Initial Configurations
    device = 'cuda' if torch.cuda.is_available() else 'cpu'
    vocab_size = 1000  # Smaller vocab for testing
    hidden_dim = 32
    num_heads = 4
    max_seq_len = 50

    # Create the dataset and dataloader
    dataset = PokemonDataset(file_path='training/pokemon.txt', vocab_size=vocab_size, max_seq_len=max_seq_len)
    vocab_size = dataset.vocab_size
    dataloader = DataLoader(dataset, batch_size=16, shuffle=True)

    # Initialize the model
    model = TokenformerLayer(hidden_dim, vocab_size, num_heads, max_seq_len).to(device)

    # Define loss and optimizer
    criterion = nn.CrossEntropyLoss()
    optimizer = optim.Adam(model.parameters(), lr=0.001)

    num_epochs = 10
    scale_steps = [3, 6]  # Epochs to scale the model
    num_new_tokens = 5  # Add 5 new tokens each time

    print("Starting training...")
    for epoch in range(num_epochs):
        model.train()
        total_loss = 0

        # Scale model at specified epochs
        if epoch in scale_steps:
            scale_model(model, num_new_tokens)

        for x_batch, y_batch in dataloader:
            x_batch, y_batch = x_batch.to(device), y_batch.to(device)

            # Forward pass
            outputs = model(x_batch)

            # Reshape for loss computation
            outputs = outputs.view(-1, hidden_dim)
            y_batch = y_batch.view(-1)

            # Compute loss
            loss = criterion(outputs, y_batch)

            # Backpropagation
            optimizer.zero_grad()
            loss.backward()
            optimizer.step()

            total_loss += loss.item()

        print(f"Epoch {epoch + 1}/{num_epochs}, Loss: {total_loss / len(dataloader):.4f}")

In [7]:
train_with_scaling()

Starting training...


IndexError: Target 1811 is out of bounds.