In [1]:
import datasets
import transformers
import torch
import torch.nn.functional as F
from datasets import load_dataset
from sklearn.model_selection import train_test_split
import torch.nn as nn

from collections import defaultdict
from collections.abc import (
    Callable,
    Iterable
)
import numpy as np

from util_110724 import (
    to_ensembled
)

In [2]:
# Central configuration shared by every model component in this notebook.
config = {
    "seed": 0,
    # Prefer Apple's Metal backend when it is present; otherwise fall back to
    # the CPU so the notebook still runs on machines without MPS.
    "device": "mps" if torch.backends.mps.is_available() else "cpu",
    "features_dtype": torch.float32,
    # One ensemble axis per hyperparameter: (len(n_patches), len(hidden_layer_dim)).
    "ensemble_shape": (3, 5),
    "n_patches": [2, 4, 8],  # 3 values for n_patches
    # 5 values for the hidden layer dimension; each is divisible by n_heads = 3.
    # (A scalar "hidden_layer_dim": 18 entry used to precede this list; a dict
    # literal keeps only the last duplicate key, so it was dead and is removed.)
    "hidden_layer_dim": [12, 15, 18, 21, 24],
    "n_heads": 3,
}

In [3]:
# Seed PyTorch's global random number generator with the configured seed.
torch.manual_seed(config["seed"])

<torch._C.Generator at 0x169b221d0>

In [4]:
# Load the (features, labels) pair stored in data/sample_dataset.pt.
# NOTE(review): torch.load unpickles the file, so only load datasets from a
# trusted source; if the file holds plain tensors, passing weights_only=True
# would avoid arbitrary unpickling — TODO confirm the file contents.
chess_features, chess_labels = torch.load('data/sample_dataset.pt')

  chess_features, chess_labels = torch.load('data/sample_dataset.pt')


In [5]:
# Hold out 20% of the samples for validation.
# NOTE(review): the split uses its own random_state (42) instead of
# config["seed"] (0) — confirm that the two seeds are meant to differ.
features_train, features_valid, labels_train, labels_valid = train_test_split(
    chess_features, chess_labels, test_size=0.2, random_state=42
)

In [6]:
# Unpack the four dimensions of the training features and print them
# (named here as sample count, height, width, channels).
n, h, w, c = features_train.shape 
print(n,h,w,c)

5984 8 8 9


In [7]:
# Patching the images
def patchify(images, n_patches):
    '''
    Split a batch of square images into a grid of flattened patches.

    Args:
        images: tensor (or array-like convertible with ``torch.as_tensor``) of
            shape (n, h, w, c): n images, height h, width w, c channels last.
            In this notebook h = w = 8 and c = 9.
        n_patches: number of patches along each side; the image is cut into
            ``n_patches ** 2`` patches in row-major order.

    Returns:
        Tensor of shape (n, n_patches ** 2, patch_size * patch_size * c) in
        the default floating dtype, on the same device as ``images``. Patch
        ``i * n_patches + j`` is the flattened block at grid row i, column j,
        flattened in (row, column, channel) order.

    Raises:
        AssertionError: if the images are not square or their side is not
            divisible by ``n_patches``.
    '''
    images = torch.as_tensor(images)
    n, h, w, c = images.shape

    assert h == w, "Patchify method is implemented for square images only"
    assert h % n_patches == 0, (
        f"Image side {h} is not divisible by n_patches={n_patches}"
    )

    patch_size = h // n_patches

    # View the image as a (grid_row, in_row, grid_col, in_col) lattice, then
    # bring the two grid axes together so each patch becomes contiguous.
    grid = images.reshape(
        n, n_patches, patch_size, n_patches, patch_size, c
    ).permute(0, 1, 3, 2, 4, 5)

    # Always write into a freshly allocated tensor so the result never aliases
    # the input; copy_ also performs the cast to the default floating dtype.
    patches = torch.empty(
        n,
        n_patches ** 2,
        patch_size * patch_size * c,
        dtype=torch.get_default_dtype(),
        device=images.device,
    )
    patches.view(grid.shape).copy_(grid)
    return patches

In [8]:
# Positional embeddings for each token, using the sine/cosine scheme of Vaswani et al.
def get_positional_embeddings(sequence_length, d):
    """
    Build a table of sinusoidal positional embeddings.

    Args:
        sequence_length: number of positions (rows of the table).
        d: embedding width (columns of the table).

    Returns:
        Tensor of shape (sequence_length, d). Even columns j hold
        sin(pos / 10000 ** (j / d)); odd columns j hold
        cos(pos / 10000 ** ((j - 1) / d)).
    """
    table = torch.ones(sequence_length, d)
    for position in range(sequence_length):
        for column in range(d):
            is_even = column % 2 == 0
            # An odd column shares the frequency of the even column before it.
            frequency_index = column if is_even else column - 1
            angle = position / (10000 ** (frequency_index / d))
            table[position][column] = np.sin(angle) if is_even else np.cos(angle)
    return table

Idea for the two-move case: set the `can_move` element of the d-dimensional vector to 0, restricting movement to the selected piece only.

class PreViT(nn.Module):
    """
    Single-configuration tokenizer: cuts images into patches, maps each patch
    linearly to ``hidden_layer_dim`` features and prepends a learnable
    classification token.

    Note: a later cell in this notebook defines another class with the same
    name; whichever definition is executed last is the one bound to ``PreViT``.

    Args:
        chw: (channels, height, width) of the input images.
        n_patches: number of patches along each image side.
        hidden_layer_dim: width of every output token.
    """

    def __init__(self, chw=(9, 8, 8), n_patches=4, hidden_layer_dim=18):
        super().__init__()

        # Attributes
        self.chw = chw  # (C, H, W)
        self.n_patches = n_patches
        self.hidden_layer_dim = hidden_layer_dim

        assert chw[1] % n_patches == 0, "Input shape not entirely divisible by number of patches"
        assert chw[2] % n_patches == 0, "Input shape not entirely divisible by number of patches"

        # Integer division is exact here thanks to the assertions above and
        # keeps the patch size (and everything derived from it) an int.
        self.patch_size = (chw[1] // n_patches, chw[2] // n_patches)

        # Linear map from a flattened patch to a token.
        self.input_vector_dim = chw[0] * self.patch_size[0] * self.patch_size[1]
        self.linear_mapper = nn.Linear(self.input_vector_dim, self.hidden_layer_dim)

        # Learnable classification token, shared by every image.
        self.class_token = nn.Parameter(torch.rand(1, self.hidden_layer_dim))

    def forward(self, images):
        """
        Tokenize a batch of images.

        Args:
            images: batch in the layout ``patchify`` unpacks, i.e. (N, H, W, C).

        Returns:
            Tensor of shape (N, n_patches ** 2 + 1, hidden_layer_dim); index 0
            along the sequence axis is the classification token.
        """
        patches = patchify(images, self.n_patches)
        tokens = self.linear_mapper(patches)

        # Broadcast the single class token across the batch (no copy) and
        # prepend it to every sequence in one concatenation.
        class_tokens = self.class_token.expand(tokens.shape[0], -1, -1)
        return torch.cat((class_tokens, tokens), dim=1)

We have two hyperparameters for processing the data, `n_patches` and `hidden_layer_dim`. We try different combinations of them using an ensemble.

In [10]:
class PreViT(nn.Module):
    """
    Ensemble tokenizer: holds one linear patch embedding, one classification
    token and one fixed positional-embedding table for every
    (n_patches, hidden_layer_dim) combination listed in ``config``.

    Args:
        chw: (channels, height, width) of the input images.
        config: dict providing ``"ensemble_shape"``, ``"n_patches"`` and
            ``"hidden_layer_dim"``. ``ensemble_shape`` must equal
            (len(n_patches), len(hidden_layer_dim)).

    Raises:
        AssertionError: if ``config`` is missing, the list lengths disagree
            with ``ensemble_shape``, or an image side is not divisible by one
            of the ``n_patches`` values.
    """

    def __init__(self, chw=(9, 8, 8), config=None):
        super().__init__()

        assert config is not None, "Config must provide ensemble shape, n_patches, and hidden_layer_dim"

        # Attributes
        self.chw = chw  # (C, H, W)
        self.ensemble_shape = config["ensemble_shape"]
        self.n_patches_values = config["n_patches"]
        self.hidden_layer_dims = config["hidden_layer_dim"]

        assert len(self.n_patches_values) == self.ensemble_shape[0], (
            f"n_patches must have {self.ensemble_shape[0]} values"
        )
        assert len(self.hidden_layer_dims) == self.ensemble_shape[1], (
            f"hidden_layer_dim must have {self.ensemble_shape[1]} values"
        )

        for n_patches in self.n_patches_values:
            assert chw[1] % n_patches == 0, f"Height {chw[1]} is not divisible by n_patches={n_patches}"
            assert chw[2] % n_patches == 0, f"Width {chw[2]} is not divisible by n_patches={n_patches}"

        # (patch_height, patch_width) for every n_patches value, in config order.
        self.patch_sizes = [
            (chw[1] // n_patches, chw[2] // n_patches)
            for n_patches in self.n_patches_values
        ]
        # Kept for backward compatibility: this attribute has always held the
        # patch size belonging to the LAST n_patches value only.
        self.patch_size = self.patch_sizes[-1] if self.patch_sizes else None

        # Patching and linear mapping ("tokenize"): one nn.Linear per
        # (n_patches, hidden_dim) pair, indexed [n_patches_idx][hidden_dim_idx].
        self.linear_mappers = nn.ModuleList([
            nn.ModuleList([
                nn.Linear(chw[0] * patch_h * patch_w, hidden_dim)
                for hidden_dim in self.hidden_layer_dims
            ])
            for patch_h, patch_w in self.patch_sizes
        ])

        # Learnable classification token prepended to every sequence.
        self.class_tokens = nn.ParameterList([
            nn.ParameterList([
                nn.Parameter(torch.rand(1, hidden_dim))
                for hidden_dim in self.hidden_layer_dims
            ])
            for _ in self.n_patches_values
        ])

        # Fixed positional embeddings (one row per patch plus one for the class
        # token). They stay Parameters, so state_dict keys are unchanged, but
        # are created with requires_grad=False so they are never trained.
        # get_positional_embeddings already returns a fresh tensor, so it is
        # wrapped directly rather than copied through torch.tensor().
        self.pos_embeddings = nn.ParameterList([
            nn.ParameterList([
                nn.Parameter(
                    get_positional_embeddings(n_patches ** 2 + 1, hidden_dim),
                    requires_grad=False,
                )
                for hidden_dim in self.hidden_layer_dims
            ])
            for n_patches in self.n_patches_values
        ])

    def forward(self, images, ensemble_idx):
        """
        Tokenize a batch of images with one ensemble configuration.

        Args:
            images: batch in the layout ``patchify`` unpacks, i.e.
                (number of games, H, W, C).
            ensemble_idx: tuple (n_patches_idx, hidden_layer_dim_idx) selecting
                which configuration to use.

        Returns:
            Tensor of shape (batch, n_patches ** 2 + 1, hidden_dim): class
            token followed by patch tokens, with positional embeddings added.
        """
        n_patches_idx, hidden_layer_dim_idx = ensemble_idx

        # Select the configuration
        n_patches = self.n_patches_values[n_patches_idx]
        linear_mapper = self.linear_mappers[n_patches_idx][hidden_layer_dim_idx]
        class_token = self.class_tokens[n_patches_idx][hidden_layer_dim_idx]
        pos_embedding = self.pos_embeddings[n_patches_idx][hidden_layer_dim_idx]

        # Patch the inputs and embed each patch.
        patches = patchify(images, n_patches)
        tokens = linear_mapper(patches)

        # Prepend the class token; expand() broadcasts it over the batch.
        class_tokens = class_token.expand(tokens.shape[0], -1, -1)
        tokens = torch.cat((class_tokens, tokens), dim=1)

        # The (seq, hidden) table broadcasts over the batch axis, so the result
        # depends only on this batch's size and not on any notebook-level
        # variable.
        return tokens + pos_embedding

In [11]:
# Smoke test: build the ensemble tokenizer and run one configuration on the
# training features, then print the shape of the resulting token tensor.
model = PreViT(chw=(9, 8, 8), config=config)
ensemble_idx = (1, 2)  # Second n_patches (4), third hidden_layer_dim (18) (indexes hyperparameters)
tokens = model(features_train, ensemble_idx)
print(tokens.shape)

  torch.tensor(get_positional_embeddings(n_patches ** 2 + 1, hidden_dim))


torch.Size([5984, 17, 18])


In [12]:
# Number of ensemble axes: one for n_patches and one for hidden_layer_dim.
len(config["ensemble_shape"])

2

In [14]:
#Layer Normalziation which allows different hyperparameters to be applied per ensemble dimension or configuration. 
class LayerNorm(nn.Module):
    def __init__(
        self,
        config: dict,
        normalized_shape: int | tuple[int],
        bias=True,
        elementwise_affine=True,
        epsilon=1e-5,
        normalized_offset=0
    ):
        super().__init__()

        if hasattr(normalized_shape, "__int__"):
            self.normalized_shape = (normalized_shape,)
        else:
            self.normalized_shape = normalized_shape

        self.ensemble_shape = config["ensemble_shape"]
        self.epsilon = epsilon
        self.normalized_offset = normalized_offset

        if elementwise_affine:
            self.scale = torch.nn.Parameter(torch.ones(
                self.ensemble_shape + self.normalized_shape + (1,) * normalized_offset,
                device=config["device"],
                dtype=config["features_dtype"]
            ))
            if bias:
                self.bias = torch.nn.Parameter(torch.zeros_like(self.scale))
            else:
                self.bias = None

        else:
            self.bias, self.scale = None, None


    def forward(self, features: dict) -> dict:

        ensemble_dim = len(self.ensemble_shape)
        features = to_ensembled(self.ensemble_shape, features)

        normalized_dim = len(self.normalized_shape)

        batch_dim = len(features.shape) - ensemble_dim - normalized_dim - self.normalized_offset
        normalized_range = tuple(range(
            ensemble_dim,
            ensemble_dim + batch_dim
        )) + tuple(range(
            -normalized_dim - self.normalized_offset,
            -self.normalized_offset
        ))

        features = features - features.mean(dim=normalized_range, keepdim=True)
        features = features / features.std(dim=normalized_range, keepdim=True)

        if self.scale is not None:
            scale = self.scale.unflatten(
                ensemble_dim,
                (1,) * batch_dim + self.normalized_shape[:1]
            )

            features = features * scale

            if self.bias is not None:
                bias = self.bias.unflatten(
                    ensemble_dim,
                    (1,) * batch_dim + self.normalized_shape[:1]
                )
                features = features + bias

        return features

In [15]:
# Multi-head self-attention applied to the tokens (e.g. input of shape (N, 17, 18 = d)).
class MyMSA(nn.Module):
    """
    Multi-head self-attention with a separate set of heads for every hidden
    layer dimension in the ensemble.

    Each token is split into ``n_heads`` equal chunks along the feature axis;
    every head owns its own query/key/value linear maps acting on one chunk.

    Args:
        config: dict providing ``"ensemble_shape"``, ``"hidden_layer_dim"``
            (list of token widths) and ``"n_heads"``.

    Raises:
        AssertionError: if ``config`` is missing or a hidden layer dimension
            is not divisible by ``n_heads``.
    """

    def __init__(self, config = None):
        super().__init__()

        assert config is not None, "Config dictionary must be provided"

        self.ensemble_shape = config["ensemble_shape"]
        self.hidden_layer_dims = config["hidden_layer_dim"]
        self.n_heads = config["n_heads"]

        # hidden_layer_dims is a list, so divisibility is checked per entry.
        for hidden_dim in self.hidden_layer_dims:
            assert hidden_dim % self.n_heads == 0, (
                f"Can't divide dimension {hidden_dim} into {self.n_heads} heads"
            )

        # Query, key and value mappings, indexed [hidden_layer_dim_idx][head].
        # NOTE(review): the mappings are indexed by hidden-layer dimension
        # only, so configurations that differ just in n_patches share them —
        # confirm this is intended.
        self.q_mappings = self._build_head_mappings()
        self.k_mappings = self._build_head_mappings()
        self.v_mappings = self._build_head_mappings()

        self.softmax = nn.Softmax(dim=-1)

    def _build_head_mappings(self):
        """Create one list of per-head square linear maps per hidden layer dimension."""
        mappings = []
        for dim_idx in range(self.ensemble_shape[1]):
            d_head = self.hidden_layer_dims[dim_idx] // self.n_heads
            mappings.append(nn.ModuleList([
                nn.Linear(d_head, d_head) for _ in range(self.n_heads)
            ]))
        return nn.ModuleList(mappings)

    def forward(self, sequences, ensemble_idx):
        """
        Apply self-attention to a batch of token sequences.

        Args:
            sequences: tensor of shape (N, seq_length, token_dim), where
                token_dim equals the selected hidden layer dimension.
            ensemble_idx: tuple (n_patches_idx, hidden_layer_dim_idx); only
                the second entry selects parameters here.

        Returns:
            Tensor of shape (N, seq_length, token_dim): the per-head outputs
            concatenated along the feature axis.
        """
        _, hidden_layer_dim_idx = ensemble_idx

        d_head = self.hidden_layer_dims[hidden_layer_dim_idx] // self.n_heads
        # Kept for backward compatibility: the head width of the most recent
        # call has always been exposed as an attribute.
        self.d_head = d_head

        q_mappings = self.q_mappings[hidden_layer_dim_idx]
        k_mappings = self.k_mappings[hidden_layer_dim_idx]
        v_mappings = self.v_mappings[hidden_layer_dim_idx]

        head_outputs = []
        for head in range(self.n_heads):
            # Feature slice owned by this head, for the whole batch at once.
            chunk = sequences[..., head * d_head: (head + 1) * d_head]
            q = q_mappings[head](chunk)
            k = k_mappings[head](chunk)
            v = v_mappings[head](chunk)

            # Scaled dot-product attention, batched over N.
            attention = self.softmax(q @ k.transpose(-2, -1) / (d_head ** 0.5))
            head_outputs.append(attention @ v)

        return torch.cat(head_outputs, dim=-1)


Having several heads per layer is similar to having several kernels in convolution.

Having several heads per layer allows one model to try out several pathways at once.

In [None]:
# Persist the train/validation split as a single dictionary file.
# NOTE(review): the target path has no file extension — confirm that whatever
# loads this file expects exactly this name.
torch.save(
    {
        "train_features": features_train,
        "train_labels": labels_train,
        "valid_features": features_valid,
        "valid_labels": labels_valid
    },
    "preprocessed_train_valid_data"
)