<a href="https://colab.research.google.com/github/MindFigment/information-theory-project/blob/main/information_theory.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
import torch
import torch.nn as nn
import numpy as np
import torch.nn.functional as F
import copy

# Colab-only setup: mount Google Drive at /content/drive. Later cells read the
# dataset from, and write checkpoints to, paths under drive/MyDrive.
from google.colab import drive
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [None]:
class MLPBlock(nn.Module):
    """
    Feed-forward block: Linear -> GELU -> (optional Dropout) -> Linear -> GELU.

    GELU is applied after the second linear layer as well, so the values
    returned by this block are GELU activations, not raw linear outputs.
    """

    def __init__(self, d_model, d_hidden=192, dropout=None, last_bias=False):
        """
        Args:
            d_model: latent vector size (input and output width of the block)
            d_hidden: width of the hidden layer between the two linear layers
            dropout: dropout rate applied after the first GELU; None disables dropout
            last_bias: whether the second linear layer (fc2) has a bias term.
                       The encoder stack passes True only for its last block.
        """
        super().__init__()
        dropout_enabled = dropout is not None

        self.fc1 = nn.Linear(d_model, d_hidden, bias=True)
        self.fc1_dropout = nn.Dropout(dropout) if dropout_enabled else None
        self.fc2 = nn.Linear(d_hidden, d_model, bias=last_bias)

    def forward(self, x):
        """Map x of shape (..., d_model) to a tensor of the same shape."""
        hidden = F.gelu(self.fc1(x))
        if self.fc1_dropout is not None:
            hidden = self.fc1_dropout(hidden)
        return F.gelu(self.fc2(hidden))


class EncoderBlock(nn.Module):
    """
    One Transformer encoder block: layer-normalised multi-headed attention
    followed by a layer-normalised MLP, each wrapped in a residual connection.
    """

    def __init__(self, n_heads, d_model, d_k, d_v, dropout=None, last_bias=False):
        """
        Args:
            n_heads: number of heads for the multi-headed attention block
            d_model: latent vector size (also used as the MLP hidden size)
            d_k: dimensionality of query and key vectors
            d_v: dimensionality of value vectors
            dropout: dropout rate, if set to None no dropout is applied
            last_bias: forwarded to MLPBlock; whether its output layer has a bias
        """
        super().__init__()

        # Attention sub-layer (normalisation is applied to its input)
        self.attn_layer_norm = nn.LayerNorm(d_model)
        self.multi_headed_attn = MultiHeadedAttention(
            n_heads=n_heads, d_model=d_model, d_k=d_k, d_v=d_v, dropout=dropout
        )

        # MLP sub-layer; its hidden width is set equal to d_model
        self.mlp_layer_norm = nn.LayerNorm(d_model)
        self.mlp = MLPBlock(d_model, d_hidden=d_model, dropout=dropout, last_bias=last_bias)

    def forward(self, x):
        """Apply attention and MLP sub-layers, adding the residual after each."""
        attended = self.multi_headed_attn(self.attn_layer_norm(x))
        x = attended + x
        transformed = self.mlp(self.mlp_layer_norm(x))
        return transformed + x


class TransformerEncoder(nn.Module):
    """A stack of n_layers EncoderBlock modules applied one after another."""

    def __init__(self, n_heads, d_model, d_k, d_v, n_layers, dropout=None):
        """
        Args:
            n_heads: number of heads for multi-headed attention block
            d_model: latent vector size
            d_k: dimensionality of query and key vectors
            d_v: dimensionality of value vectors
            n_layers: how many encoder blocks to stack on top of each other
            dropout: dropout rate, if set to None no dropout is applied
        """
        super().__init__()

        blocks = []
        for index in range(n_layers):
            # Only the final block gets a bias on its MLP output layer
            is_last = index == (n_layers - 1)
            blocks.append(EncoderBlock(n_heads, d_model, d_k, d_v, dropout, is_last))
        self.layers = nn.ModuleList(blocks)

    def forward(self, x):
        """Feed x through every encoder block in order and return the result."""
        out = x
        for block in self.layers:
            out = block(out)
        return out


class ScaledDotProductAttention(nn.Module):
    """
    Scaled dot-product attention over batched matrices Q, K and V.

    The dot products of every query with every key are divided by the square
    root of the query/key dimension and passed through a softmax to obtain
    the weights on the values.
    """

    def __init__(self, dropout=None):
        """
        Args:
            dropout: dropout rate, if set to None no dropout is applied.
                     Dropout is applied to the attention output.
        """
        super().__init__()
        if dropout is None:
            self.attn_dropout = None
        else:
            self.attn_dropout = nn.Dropout(dropout)

    def forward(self, Q, K, V):
        """
        Args:
            Q: queries, (batch, n_queries, d_k)
            K: keys, (batch, n_keys, d_k)
            V: values, (batch, n_keys, d_v)

        Returns:
            attention output of shape (batch, n_queries, d_v)
        """
        scale = np.sqrt(Q.size(-1))
        weights = F.softmax(torch.bmm(Q, K.transpose(-2, -1)) / scale, dim=-1)
        context = torch.bmm(weights, V)
        if self.attn_dropout is None:
            return context
        return self.attn_dropout(context)


class MultiHeadedAttention(nn.Module):
    def __init__(self, n_heads, d_model, d_k, d_v, dropout=None):
        """
        Instead of performing a single attention function with d_model
        dimensional keys, values and queries, they are linearly projected
        n_heads times with different, learned linear projections to d_k, d_k
        and d_v dimensions, respectively. On each of those projected versions
        of queries, keys and values the attention function is performed,
        yielding d_v dimensional output values. These are then concatenated
        and once again projected, resulting in the final values.

        Multi-head attention allows the model to jointly attend to information
        from different representation subspaces at different positions.

        This variant runs the heads one after another in a Python loop.

        Args:
            n_heads: number of attention heads used
            d_model: constant latent vector size used throughout the Transformer
            d_k: dimensionality of query and key vectors
            d_v: dimensionality of value vectors
            dropout: dropout rate, if set to None no dropout is applied
        """
        super(MultiHeadedAttention, self).__init__()
        self.n_heads = n_heads
        self.d_k = d_k
        self.d_v = d_v
        self.d_model = d_model

        self.project_query = nn.Linear(d_model, n_heads * d_k, bias=True)
        self.project_key = nn.Linear(d_model, n_heads * d_k, bias=True)
        self.project_value = nn.Linear(d_model, n_heads * d_v, bias=True)

        self.project_output = nn.Linear(n_heads * d_v, d_model, bias=True)
        self.attention = ScaledDotProductAttention(dropout)

    def _split_heads(self, projected, d_head):
        """(batch_size, n_patches, n_heads * d_head) -> (n_heads, batch_size, n_patches, d_head)."""
        batch_size, n_patches = projected.size(0), projected.size(1)
        return projected.view(batch_size, n_patches, self.n_heads, d_head).permute(2, 0, 1, 3)

    def forward(self, x):
        """
        Args:
            x: input of shape (batch_size, n_patches, d_model)

        Returns:
            tensor of shape (batch_size, n_patches, d_model)
        """
        batch_size = x.size(0)
        n_patches = x.size(1)
        n_heads = self.n_heads

        # Queries and keys have d_k features per head; values have d_v.
        # (The values were previously reshaped with d_k, which only worked when d_k == d_v.)
        queries = self._split_heads(self.project_query(x), self.d_k)
        keys = self._split_heads(self.project_key(x), self.d_k)
        values = self._split_heads(self.project_value(x), self.d_v)

        # heads_outputs: (n_heads, batch_size, n_patches, d_v)
        heads_outputs = torch.stack(
            [self.attention(q, k, v) for q, k, v in zip(queries, keys, values)],
            dim=0,
        )

        # heads_concatenated: (batch_size, n_patches, n_heads * d_v)
        heads_concatenated = heads_outputs.permute(1, 2, 0, 3).contiguous().view(batch_size, n_patches, n_heads * self.d_v)

        # x: (batch_size, n_patches, d_model)
        x = self.project_output(heads_concatenated)
        assert x.shape == (batch_size, n_patches, self.d_model), f"Ups, wrong output shape! Should be ({batch_size}, {n_patches}, {self.d_model}) not {tuple(x.shape)}!"

        return x


class AddPositionEmbeddings(nn.Module):
    """
    Adds learnable position embeddings to a sequence of patch embeddings so
    that positional information is retained. The resulting sequence serves as
    input to the encoder.
    """

    def __init__(self, n_patches, d_model):
        """
        Args:
            n_patches: number of patches (one extra position is allocated for
                       the learnable [class] token prepended to the sequence)
            d_model: constant latent vector size used throughout the Transformer
        """
        super().__init__()
        sequence_length = n_patches + 1
        initial_values = torch.randn(sequence_length, d_model)
        self.position_embeddings = nn.Parameter(initial_values)

    def forward(self, x):
        """Return x with the position embeddings added (broadcast over leading dims)."""
        return torch.add(x, self.position_embeddings)


class PreparePatches(object):
    """
    Turns a batch of images into a sequence of flattened square patches.

    Vision Transformer receives as input a sequence of n 1D patches. (h, w)
    is the resolution of the original image and c the number of channels.
    (p, p) is the resolution of each image patch and n = h*w/p^2 is the
    resulting number of patches, which also serves as the effective input
    sequence length for the Transformer.

    Patches are real p x p spatial tiles, enumerated row by row over the
    image; the values of one patch are laid out as (row, column, channel).
    """

    def __init__(self, n, h, w, c):
        """
        Args:
            n: number of patches into which we segment one image
            h: height of an image
            w: width of an image
            c: number of channels of an image

        Raises:
            ValueError: if the image cannot be tiled by n equally sized
                        square patches
        """
        if n <= 0:
            raise ValueError(f"Number of patches must be positive, got {n}")
        if (h * w) % n != 0:
            raise ValueError(f"Can't divide into equally sized patches {(h * w) / n} != {(h * w) // n}")

        p_squared = (h * w) // n
        p = round(p_squared ** 0.5)
        if p * p != p_squared or h % p != 0 or w % p != 0:
            raise ValueError(
                f"Can't tile a {h}x{w} image with {n} square patches "
                f"(patch area would be {p_squared})"
            )

        self.n = n
        self.channels = c
        self.patch_size = p
        self.grid_h = h // p
        self.grid_w = w // p
        self.patch_dim = p_squared * c

    def __call__(self, imgs):
        """
        Args:
            imgs: batch of images laid out as (batch_size, c, h, w)

        Returns:
            tensor of shape (batch_size, n, p^2 * c)
        """
        batch_size = imgs.size(0)
        p = self.patch_size

        # Split both spatial axes into (grid index, offset inside the patch)
        tiles = imgs.reshape(batch_size, self.channels, self.grid_h, p, self.grid_w, p)
        # (b, c, gh, p, gw, p) -> (b, gh, gw, p, p, c): one contiguous run per patch
        tiles = tiles.permute(0, 2, 4, 3, 5, 1)
        return tiles.reshape(batch_size, self.n, self.patch_dim)


class VisionTransformer(nn.Module):
    """
    Vision Transformer classifier: patch extraction, linear patch embedding,
    a prepended learnable [class] token, learnable position embeddings, a
    Transformer encoder stack and a linear classification head.
    """

    def __init__(self, d_model, d_k, d_v, n_patches, h, w, c, n_heads, n_layers, n_classes, dropout):
        """
        Args:
            d_model: latent vector size
            d_k: dimensionality of query and key vectors
            d_v: dimensionality of value vectors
            n_patches: number of patches each image is split into
            h: height of original input image
            w: width of original input image
            c: number of channels of original input image
            n_heads: number of heads for multi-headed attention block
            n_layers: how many encoders to stack on top of each other
            n_classes: how many classes are there in classification task
            dropout: dropout rate, if set to None no dropout is applied
        """
        super().__init__()

        # Identifier built from the main hyper-parameters; used as checkpoint file name
        self.name = f'vit-p{n_patches}-h{n_heads}-l{n_layers}-d{d_model}'

        # Input pipeline: images -> flattened patches -> d_model embeddings (+ positions)
        self.prepare_patches = PreparePatches(n_patches, h, w, c)
        self.linear_projection = nn.Linear(self.prepare_patches.patch_dim, d_model, bias=False)
        self.add_pos_emb = AddPositionEmbeddings(n_patches, d_model)

        # Encoder stack and classification head
        self.transformer_encoder = TransformerEncoder(n_heads, d_model, d_k, d_v, n_layers, dropout)
        self.mlp_head = nn.Linear(d_model, n_classes, bias=True)

        # Learnable [class] token that is prepended to every patch sequence
        self.cls = nn.Parameter(torch.randn(d_model))

    def forward(self, imgs):
        """Return class logits of shape (batch_size, n_classes) for a batch of images."""
        patches = self.prepare_patches(imgs)
        tokens = self.linear_projection(patches)

        # One copy of the [class] token per image: (batch_size, 1, d_model)
        batch_size = tokens.size(0)
        class_tokens = self.cls.repeat(batch_size, 1, 1)
        tokens = torch.cat((class_tokens, tokens), dim=1)
        tokens = self.add_pos_emb(tokens)

        encoded = self.transformer_encoder(tokens)

        # Only the [class] token (position 0) is used for classification
        class_embedding = encoded[:, 0, :]
        return self.mlp_head(class_embedding)

In [None]:
!pip install einops



In [None]:
from einops import rearrange, reduce

class ScaledDotProductAttentionEINOPS(nn.Module):
    def __init__(self, dropout=None):
        """
        Scaled dot-product attention written with einsum notation.

        Args:
            dropout: dropout rate, if set to None no dropout is applied
        """
        super(ScaledDotProductAttentionEINOPS, self).__init__()
        self.attn_dropout = nn.Dropout(dropout) if dropout is not None else None

    def forward(self, Q, K, V):
        """
        Args:
            Q: queries, (batch, n_queries, d_k)
            K: keys, (batch, n_keys, d_k)
            V: values, (batch, n_keys, d_v)

        Returns:
            attention output of shape (batch, n_queries, d_v)
        """
        scores = torch.einsum('bqd,bkd->bqk', Q, K) / np.sqrt(Q.size(-1))
        # Softmax over the key axis turns the scores into weights that sum to one
        # (F.sigmoid does not accept a `dim` argument and would raise here).
        attention = F.softmax(scores, dim=-1)
        output = torch.einsum('bqk,bkd->bqd', attention, V)
        if self.attn_dropout is not None:
            output = self.attn_dropout(output)
        return output


class MultiHeadedAttention2(nn.Module):
    def __init__(self, n_heads, d_model, d_k, d_v, dropout=None):
        """
        Instead of performing a single attention function with d_model
        dimensional keys, values and queries, they are linearly projected
        n_heads times with different, learned linear projections to d_k, d_k
        and d_v dimensions, respectively. On each of those projected versions
        of queries, keys and values the attention function is performed in
        parallel, yielding d_v dimensional output values. These are then
        concatenated and once again projected, resulting in the final values.

        Multi-head attention allows the model to jointly attend to information
        from different representation subspaces at different positions.

        This variant folds the head axis into the batch axis so that all heads
        go through a single attention call.

        Args:
            n_heads: number of attention heads used
            d_model: constant latent vector size used throughout the Transformer
            d_k: dimensionality of query and key vectors
            d_v: dimensionality of value vectors
            dropout: dropout rate, if set to None no dropout is applied
        """
        super(MultiHeadedAttention2, self).__init__()
        self.n_heads = n_heads
        self.d_k = d_k
        self.d_v = d_v
        self.d_model = d_model

        self.project_query = nn.Linear(d_model, n_heads * d_k, bias=True)
        self.project_key = nn.Linear(d_model, n_heads * d_k, bias=True)
        self.project_value = nn.Linear(d_model, n_heads * d_v, bias=True)

        self.project_output = nn.Linear(n_heads * d_v, d_model, bias=True)
        self.attention = ScaledDotProductAttention(dropout)

    def _fold_heads(self, projected, d_head):
        """(batch_size, n_patches, n_heads * d_head) -> (n_heads * batch_size, n_patches, d_head)."""
        batch_size, n_patches = projected.size(0), projected.size(1)
        per_head = projected.view(batch_size, n_patches, self.n_heads, d_head).permute(2, 0, 1, 3)
        # permute returns a non-contiguous view; view() needs contiguous memory
        return per_head.contiguous().view(self.n_heads * batch_size, n_patches, d_head)

    def forward(self, x):
        """
        Args:
            x: input of shape (batch_size, n_patches, d_model)

        Returns:
            tensor of shape (batch_size, n_patches, d_model)
        """
        batch_size = x.size(0)
        n_patches = x.size(1)
        n_heads = self.n_heads
        d_v = self.d_v

        # Queries and keys have d_k features per head; values have d_v.
        # (The values were previously reshaped with d_k, which only worked when d_k == d_v.)
        queries = self._fold_heads(self.project_query(x), self.d_k)
        keys = self._fold_heads(self.project_key(x), self.d_k)
        values = self._fold_heads(self.project_value(x), d_v)

        # heads_outputs: (n_heads * batch_size, n_patches, d_v)
        heads_outputs = self.attention(queries, keys, values)

        # heads_outputs: (n_heads * batch_size, n_patches, d_v)
        # ---(view)--> (n_heads, batch_size, n_patches, d_v)
        # ---(permute)--> (batch_size, n_patches, n_heads, d_v).contiguous()
        # ---(view)--> (batch_size, n_patches, n_heads * d_v)
        heads_concatenated = heads_outputs.view(n_heads, batch_size, n_patches, d_v).permute(1, 2, 0, 3).contiguous().view(batch_size, n_patches, n_heads * d_v)

        # x: (batch_size, n_patches, d_model)
        x = self.project_output(heads_concatenated)
        assert x.shape == (batch_size, n_patches, self.d_model), f"Ups, wrong output shape! Should be ({batch_size}, {n_patches}, {self.d_model}) not {tuple(x.shape)}!"

        return x


class MultiHeadedAttentionEINOPS(nn.Module):
    def __init__(self, n_heads, d_model, d_k, d_v, dropout=None):
        """
        Multi-headed attention with the head splitting and merging expressed
        through einops `rearrange` patterns.

        Args:
            n_heads: number of attention heads used
            d_model: constant latent vector size used throughout the Transformer
            d_k: dimensionality of query and key vectors
            d_v: dimensionality of value vectors
            dropout: dropout rate, if set to None no dropout is applied
        """
        super(MultiHeadedAttentionEINOPS, self).__init__()
        self.n_heads = n_heads
        self.d_k = d_k
        self.d_v = d_v
        self.d_model = d_model

        self.project_query = nn.Linear(d_model, n_heads * d_k, bias=True)
        self.project_key = nn.Linear(d_model, n_heads * d_k, bias=True)
        self.project_value = nn.Linear(d_model, n_heads * d_v, bias=True)

        self.project_output = nn.Linear(n_heads * d_v, d_model, bias=True)
        self.attention = ScaledDotProductAttention(dropout)

    def forward(self, x):
        """
        Args:
            x: input of shape (batch_size, n_patches, d_model)

        Returns:
            tensor of shape (batch_size, n_patches, d_model)
        """
        batch_size = x.size(0)
        n_patches = x.size(1)

        # projected x: (batch_size, n_patches, n_heads * d_head)
        # ---(rearrange)--> (n_heads * batch_size, n_patches, d_head)
        # The per-head size is inferred by einops from h, so d_k and d_v may differ.
        split_heads = 'b p (h k) -> (h b) p k'
        queries = rearrange(self.project_query(x), split_heads, h=self.n_heads)
        keys = rearrange(self.project_key(x), split_heads, h=self.n_heads)
        values = rearrange(self.project_value(x), split_heads, h=self.n_heads)

        # heads_outputs: (n_heads * batch_size, n_patches, d_v)
        heads_outputs = self.attention(queries, keys, values)

        # heads_outputs: (n_heads * batch_size, n_patches, d_v)
        # ---(rearrange)--> (batch_size, n_patches, n_heads * d_v)
        heads_concatenated = rearrange(heads_outputs, '(h b) p v -> b p (h v)', h=self.n_heads)

        # x: (batch_size, n_patches, d_model)
        x = self.project_output(heads_concatenated)
        assert x.shape == (batch_size, n_patches, self.d_model), f"Ups, wrong output shape! Should be ({batch_size}, {n_patches}, {self.d_model}) not {tuple(x.shape)}!"

        return x

In [None]:
def check_new_implementation(old_module_class, new_module_class, params_dict, sample):
    """
    Compare two implementations of the same module on one input.

    Both classes are instantiated with the same parameters, the weights of
    the old instance are copied into the new one, and both are evaluated in
    eval mode without gradient tracking. The query projection weights and
    both module structures are printed for visual inspection.

    Args:
        old_module_class: reference module class
        new_module_class: module class under test; must accept the same
                          state dict as the reference
        params_dict: keyword arguments used to construct both modules
        sample: input tensor passed to both modules

    Returns:
        0-dim float tensor: fraction of output elements that are exactly
        equal between the two implementations
    """
    reference = nn.Sequential(old_module_class(**params_dict))
    candidate = nn.Sequential(new_module_class(**params_dict))
    candidate.load_state_dict(reference.state_dict())

    for wrapped in (reference, candidate):
        wrapped.eval()

    with torch.no_grad():
        reference_out = reference(sample)
        candidate_out = candidate(sample)

    for wrapped in (reference, candidate):
        print(wrapped[0].project_query.weight)

    for wrapped in (reference, candidate):
        print(wrapped)

    exact_matches = torch.eq(reference_out, candidate_out)
    return exact_matches.to(dtype=torch.float32).mean()

# Small configuration for comparing two attention implementations:
# 3 heads that split a 12-dimensional model evenly, dropout disabled.
params_dict = {
    'n_heads': 3,
    'd_model': 12,
    'd_k': 12 // 3,
    'd_v': 12 // 3,
    'dropout': None
}
# Random input whose last dimension equals d_model
sample = torch.randn(2, 5, 12)
# Fraction of output elements that are exactly equal (1.0 means identical outputs)
result = check_new_implementation(MultiHeadedAttention, MultiHeadedAttentionEINOPS, params_dict, sample)
print(result)

torch.Size([6, 5, 4]) aaaaaaaaa
Parameter containing:
tensor([[-0.2796, -0.2156,  0.1463,  0.0160, -0.2588,  0.2331,  0.0547, -0.2382,
         -0.1987,  0.2463,  0.2800, -0.0996],
        [ 0.2558,  0.0043, -0.2612,  0.0423, -0.0645, -0.2765, -0.1909, -0.2776,
          0.1005,  0.1680,  0.1862,  0.0153],
        [ 0.2115,  0.0877, -0.2420,  0.0349, -0.2743, -0.1967, -0.1431, -0.2024,
          0.2511,  0.2809, -0.0984,  0.1540],
        [ 0.2006,  0.2094, -0.0953, -0.0296, -0.1103,  0.2791,  0.0975,  0.2856,
          0.2387, -0.0825, -0.1601,  0.0969],
        [ 0.0832,  0.2233,  0.1305, -0.2109,  0.2101,  0.0972, -0.1802, -0.0716,
         -0.0103, -0.0791, -0.1291, -0.1474],
        [-0.0659, -0.1646,  0.0794, -0.2437,  0.1015,  0.0448, -0.2178, -0.2415,
          0.2269,  0.2790, -0.0552,  0.1904],
        [ 0.0685,  0.0406,  0.0651,  0.2556, -0.1349,  0.1567,  0.1195, -0.1533,
          0.1392, -0.0089,  0.1588, -0.1638],
        [-0.1028,  0.0628, -0.0570,  0.0087, -0.0279, -0.

In [None]:
import torch.optim as optim
from tqdm.auto import tqdm
import os

def train(model, optimizer, criterion, train_generator, test_generator, epochs, test_every=1, save_path=None):
    """
    Train a classifier and periodically evaluate it, checkpointing the best model.

    Batches are moved to the device the model's parameters live on.

    Args:
        model: module to train; must expose a `name` attribute when save_path
               is given (it is used as the checkpoint file name)
        optimizer: optimizer updating the model's parameters
        criterion: loss taking (outputs, targets). The reported loss is the
                   accumulated criterion value divided by the number of
                   samples, so it is a per-sample average only when the
                   criterion sums over the batch (e.g. reduction='sum').
        train_generator: iterable yielding (imgs, targets) training batches
        test_generator: iterable yielding (imgs, targets) evaluation batches
        epochs: number of passes over train_generator
        test_every: evaluate on test_generator every this many epochs
        save_path: directory for the best checkpoint (created if missing);
                   None disables checkpointing

    Returns:
        dict with keys 'model', 'optimizer', 'train_losses', 'train_accs',
        'test_losses' and 'test_accs'
    """
    # Follow the model's own device instead of relying on a notebook-level global
    device = next(model.parameters()).device

    if save_path is not None:
        # Fail before training starts, not at the first checkpoint, if the directory is unusable
        os.makedirs(save_path, exist_ok=True)

    train_loss_record = []
    train_acc_record = []
    test_loss_record = []
    test_acc_record = []
    best_test_acc = 0

    for epoch in range(epochs):

        model.train()

        train_loss = 0
        correct = 0
        train_size = 0

        for (imgs, targets) in tqdm(train_generator):
            imgs = imgs.to(device)
            targets = targets.to(device)

            optimizer.zero_grad()
            outputs = model(imgs)

            loss = criterion(outputs, targets)
            loss.backward()
            optimizer.step()

            predicted = outputs.argmax(1)

            train_loss += loss.item()
            correct += (predicted == targets).sum().item()
            train_size += imgs.size(0)

        train_loss /= train_size
        train_acc = correct / train_size
        train_loss_record.append(train_loss)
        train_acc_record.append(train_acc)

        print('Epoch {} train => loss {:.3f},  acc: {:.3f}'.format(epoch + 1, train_loss, train_acc))

        if (epoch + 1) % test_every == 0:
            test_loss, test_acc = test(model, criterion, test_generator)
            test_loss_record.append(test_loss)
            test_acc_record.append(test_acc)

            # Keep only the checkpoint with the best test accuracy seen so far
            if test_acc > best_test_acc:
                best_test_acc = test_acc

                if save_path is not None:
                    path = os.path.join(save_path, model.name + '.pt')
                    torch.save({
                        'epoch': epoch,
                        'model_state_dict': model.state_dict(),
                        'optimizer_state_dict': optimizer.state_dict(),
                        'loss': test_loss,
                        'acc': test_acc
                    }, path)

                    print('Saving best model to {} with loss: {:.4f}, acc: {:.4f}!'.format(path, test_loss, test_acc))

            print('\t test => loss: {:.5f}, acc: {:.5f}'.format(test_loss, test_acc))

    out_dict = {
        'model': model,
        'optimizer': optimizer,
        'train_losses': train_loss_record,
        'train_accs': train_acc_record,
        'test_losses': test_loss_record,
        'test_accs': test_acc_record
    }

    return out_dict


def test(model, criterion, test_generator):
    """
    Evaluate a classifier on a data loader.

    Batches are moved to the device the model's parameters live on; a model
    without parameters receives the batches unchanged.

    Args:
        model: module to evaluate (switched to eval mode)
        criterion: loss taking (outputs, targets). The returned loss is the
                   accumulated criterion value divided by the number of
                   samples, so it is a per-sample average only when the
                   criterion sums over the batch (e.g. reduction='sum').
        test_generator: iterable yielding (imgs, targets) batches

    Returns:
        (loss, accuracy) tuple, both divided by the number of samples seen
    """
    model.eval()

    # Follow the model's own device instead of relying on a notebook-level global
    first_param = next(model.parameters(), None)
    device = first_param.device if first_param is not None else None

    correct = 0
    test_size = 0
    test_loss = 0

    with torch.no_grad():
        for (imgs, targets) in tqdm(test_generator):
            if device is not None:
                imgs = imgs.to(device)
                targets = targets.to(device)

            outputs = model(imgs)
            test_loss += criterion(outputs, targets).item()
            predicted = outputs.argmax(1)

            correct += (predicted == targets).sum().item()
            test_size += imgs.size(0)

    return test_loss / test_size, correct / test_size

In [None]:
import torchvision
import torchvision.transforms as transforms

# Training-time augmentation (random crop with padding, random horizontal flip)
# followed by tensor conversion and per-channel normalisation.
train_transforms = transforms.Compose([
    transforms.RandomCrop(32, padding=4),
    transforms.RandomHorizontalFlip(),
    transforms.ToTensor(),
    transforms.Normalize((0.4914, 0.4822, 0.4465), (0.247, 0.243, 0.261))
])

# Evaluation uses the same normalisation but no augmentation.
test_transforms = transforms.Compose([
    transforms.ToTensor(),
    transforms.Normalize((0.4914, 0.4822, 0.4465), (0.247, 0.243, 0.261))
])

# CIFAR-10 is stored under the mounted Drive folder and downloaded there if missing.
train_dataset = torchvision.datasets.CIFAR10(root="./drive/MyDrive/datasets/",
                                             train=True,
                                             download=True,
                                             transform=train_transforms)

test_dataset = torchvision.datasets.CIFAR10(root="./drive/MyDrive/datasets/",
                                            train=False,
                                            download=True,
                                            transform=test_transforms)

# Training batches are reshuffled every epoch.
train_generator = torch.utils.data.DataLoader(train_dataset,
                                              batch_size=512,
                                              shuffle=True,
                                              num_workers=4)

# NOTE(review): batch_size=521 looks like a transposition of 512 -- confirm; it only
# changes how the test set is split into batches.
test_generator = torch.utils.data.DataLoader(test_dataset,
                                             batch_size=521,
                                             shuffle=False,
                                             num_workers=4)

Files already downloaded and verified
Files already downloaded and verified


In [None]:
# Number of attention heads; d_k and d_v below split d_model evenly across the heads.
n_heads = 2
# Hyper-parameters for the Vision Transformer: 32x32x3 images split into 16 patches.
vit_params = {
    'd_model': 192,
    'd_k': 192 // n_heads,
    'd_v': 192 // n_heads,
    'n_patches': 16,
    'h': 32,
    'w': 32,
    'c': 3,
    'n_heads': n_heads,
    'n_layers': 5,
    'n_classes': 10,
    'dropout': None
}

# Use the GPU when one is available, otherwise fall back to the CPU.
device = 'cuda' if torch.cuda.is_available() else 'cpu'

print(device)

vit = VisionTransformer(**vit_params)

# reduction='sum': train() and test() accumulate the batch losses and divide by
# the number of samples themselves.
criterion = nn.CrossEntropyLoss(reduction='sum')
optimizer = optim.Adam(vit.parameters(), lr=1e-3, betas=(0.9, 0.999))

# for name, param in vit.named_parameters():
#     print(name, param.shape, param.requires_grad)

# classes = ('plane', 'car', 'bird', 'cat', 'deer',
#            'dog', 'frog', 'horse', 'ship', 'truck')

# Move the model to the selected device (as the last expression of the cell,
# its repr is also displayed).
vit.to(device)

cuda


VisionTransformer(
  (linear_projection): Linear(in_features=192, out_features=192, bias=False)
  (add_pos_emb): AddPositionEmbeddings()
  (transformer_encoder): TransformerEncoder(
    (layers): ModuleList(
      (0): EncoderBlock(
        (attn_layer_norm): LayerNorm((192,), eps=1e-05, elementwise_affine=True)
        (multi_headed_attn): MultiHeadedAttention(
          (project_query): Linear(in_features=192, out_features=192, bias=True)
          (project_key): Linear(in_features=192, out_features=192, bias=True)
          (project_value): Linear(in_features=192, out_features=192, bias=True)
          (project_output): Linear(in_features=192, out_features=192, bias=True)
          (attention): ScaledDotProductAttention(
            (attn_dropout): Dropout(p=0.2, inplace=False)
          )
        )
        (mlp_layer_norm): LayerNorm((192,), eps=1e-05, elementwise_affine=True)
        (mlp): MLPBlock(
          (fc1): Linear(in_features=192, out_features=192, bias=True)
          (

In [None]:
# Arguments for train(): evaluate every 3rd epoch and keep the best checkpoint
# under the mounted Drive folder.
train_params = {
    'model': vit,
    'optimizer': optimizer,
    'criterion': criterion,
    'train_generator': train_generator,
    'test_generator': test_generator,
    'epochs': 100,
    'test_every': 3,
    'save_path': './drive/MyDrive/information-theory/'
}

# out_dict holds the model, the optimizer and the recorded loss/accuracy curves.
out_dict = train(**train_params)

HBox(children=(FloatProgress(value=0.0, max=98.0), HTML(value='')))


Epoch 1 train => loss 1.905,  acc: 0.305


HBox(children=(FloatProgress(value=0.0, max=98.0), HTML(value='')))


Epoch 2 train => loss 1.647,  acc: 0.399


HBox(children=(FloatProgress(value=0.0, max=98.0), HTML(value='')))


Epoch 3 train => loss 1.552,  acc: 0.438


HBox(children=(FloatProgress(value=0.0, max=20.0), HTML(value='')))


Saving best model to ./drive/MyDrive/information-theory/vit-p16-h2-l5-d192.pt with loss: 1.4614, acc: 0.4767!
	 test => loss: 1.46139, acc: 0.47670


HBox(children=(FloatProgress(value=0.0, max=98.0), HTML(value='')))


Epoch 4 train => loss 1.488,  acc: 0.458


HBox(children=(FloatProgress(value=0.0, max=98.0), HTML(value='')))


Epoch 5 train => loss 1.440,  acc: 0.476


HBox(children=(FloatProgress(value=0.0, max=98.0), HTML(value='')))


Epoch 6 train => loss 1.405,  acc: 0.489


HBox(children=(FloatProgress(value=0.0, max=20.0), HTML(value='')))


Saving best model to ./drive/MyDrive/information-theory/vit-p16-h2-l5-d192.pt with loss: 1.3116, acc: 0.5274!
	 test => loss: 1.31158, acc: 0.52740


HBox(children=(FloatProgress(value=0.0, max=98.0), HTML(value='')))


Epoch 7 train => loss 1.364,  acc: 0.505


HBox(children=(FloatProgress(value=0.0, max=98.0), HTML(value='')))

KeyboardInterrupt: ignored

In [None]:
def prepare_for_display(img_tensor):
    """
    Rescale an image tensor to the range [0, 1] and convert it for plotting.

    Args:
        img_tensor: image tensor laid out as (channels, height, width); may
                    live on any device and may track gradients

    Returns:
        numpy array of shape (height, width, channels) with values in [0, 1].
        An image whose pixels are all equal maps to all zeros.
    """
    # .numpy() needs a CPU tensor that is detached from the autograd graph
    img = img_tensor.detach().cpu()

    lowest = img.min()
    shifted = img - lowest
    value_range = img.max() - lowest
    # Avoid 0/0 = NaN for a constant image: divide by one instead
    divisor = value_range if value_range > 0 else torch.ones_like(value_range)

    scaled = shifted / divisor
    return scaled.permute(1, 2, 0).numpy()

# Take one batch from the training loader for a visual sanity check.
imgs, labels = next(iter(train_generator))

import matplotlib.pyplot as plt

# Show the first image of the batch after rescaling it for display.
plt.imshow(prepare_for_display(imgs[0]))

In [None]:
ls

[0m[01;34mdrive[0m/  [01;34msample_data[0m/


In [None]:
# Rebuild the 6-head model configuration so that its checkpoint can be loaded.
n_heads = 6
vit_params = {
    'd_model': 192,
    'd_k': 192 // n_heads,
    'd_v': 192 // n_heads,
    'n_patches': 16,
    'h': 32,
    'w': 32,
    'c': 3,
    'n_heads': n_heads,
    'n_layers': 5,
    'n_classes': 10,
    'dropout': 0.2
}

device = 'cuda' if torch.cuda.is_available() else 'cpu'

print(device)

vit = VisionTransformer(**vit_params)

vit.to(device)

path = './drive/MyDrive/information-theory/vit-p16-h6-l5-d192.pt'
# map_location lets a checkpoint saved on a GPU be restored on a CPU-only runtime
checkpoint = torch.load(path, map_location=device)
vit.load_state_dict(checkpoint['model_state_dict'])
# eval mode switches the dropout layers off for the probing cells below
vit.eval()
print('good')

cuda


RuntimeError: ignored

In [None]:
# Random stand-in for one embedded sequence: 17 tokens (16 patches plus the
# class token) with 192 features each.
# NOTE(review): `input` shadows the Python builtin; a later cell reads this
# name, so it is left unchanged here.
input = torch.randn(1, 17, 192, device=device)
input

tensor([[[ 0.5851, -0.6341, -0.3194,  ..., -1.3412, -1.2480, -0.2019],
         [-0.2087,  1.1588, -2.0356,  ...,  0.9790, -1.5975, -0.1953],
         [-0.7126,  0.5080,  0.4224,  ...,  1.7622, -2.1229, -0.5051],
         ...,
         [ 0.6126, -0.3365, -2.0510,  ...,  1.7777,  1.1230,  0.1882],
         [ 1.8349, -0.6682,  1.5031,  ...,  0.2551, -0.5874,  0.8940],
         [-0.6105,  0.2057,  0.0888,  ...,  0.0930,  0.6441, -0.5364]]],
       device='cuda:0')

In [None]:
# Wrap the first encoder block of the ViT so it can be probed on its own.
encoder0 = nn.Sequential(vit.transformer_encoder.layers[0])
encoder0.to(device)
# eval mode switches dropout off
encoder0.eval()

Sequential(
  (0): EncoderBlock(
    (attn_layer_norm): LayerNorm((192,), eps=1e-05, elementwise_affine=True)
    (multi_headed_attn): MultiHeadedAttention(
      (project_query): Linear(in_features=192, out_features=192, bias=True)
      (project_key): Linear(in_features=192, out_features=192, bias=True)
      (project_value): Linear(in_features=192, out_features=192, bias=True)
      (project_output): Linear(in_features=192, out_features=192, bias=True)
      (attention): ScaledDotProductAttention(
        (attn_dropout): Dropout(p=0.2, inplace=False)
      )
    )
    (mlp_layer_norm): LayerNorm((192,), eps=1e-05, elementwise_affine=True)
    (mlp): MLPBlock(
      (fc1): Linear(in_features=192, out_features=192, bias=True)
      (fc1_dropout): Dropout(p=0.2, inplace=False)
      (fc2): Linear(in_features=192, out_features=192, bias=True)
      (fc2_dropout): Dropout(p=0.2, inplace=False)
    )
  )
)

In [None]:
# Run the isolated encoder block on the random input and report the shape and
# value range of its output (no gradients needed).
with torch.no_grad():
    output = encoder0(input)
    print(output.shape)
    print(output.min(), output.max())

torch.Size([1, 17, 192])
tensor(-3.7110, device='cuda:0') tensor(4.4641, device='cuda:0')


In [None]:
# Apply GELU to the encoder output and inspect the resulting shape and value range.
o = F.gelu(output)
o.shape, o.min(), o.max()

(torch.Size([1, 17, 192]),
 tensor(-0.1700, device='cuda:0'),
 tensor(4.4641, device='cuda:0'))

In [None]:
def hook(module, input, output):
    """
    Forward hook that prints the value range entering and leaving ``module``.

    Args:
        module: the module the hook is attached to (printed via its repr)
        input: sequence of inputs given to the module; only ``input[0]`` is inspected
        output: the module's output tensor
    """
    first_input = input[0]
    in_min, in_max = first_input.min(), first_input.max()
    print('{} input => min: {}, max: {}'.format(module, in_min, in_max))

    out_min, out_max = output.min(), output.max()
    print('{} output => min: {}, max: {}'.format(module, out_min, out_max))

# Detach hooks left over from a previous execution of this cell so that
# repeated runs do not stack duplicate hooks on the same layers.
try:
    handle1.remove()
    handle2.remove()
except NameError:
    # First execution in a fresh kernel: no handles have been created yet.
    pass

# Attach the printing hook to both linear layers of the encoder's MLP block.
mlp = list(encoder0.children())[0].mlp
handle1 = mlp.fc1.register_forward_hook(hook)
handle2 = mlp.fc2.register_forward_hook(hook)

In [None]:
# Fresh standard-normal probe; the registered forward hooks print the
# per-layer ranges while the encoder runs.
inputs = torch.randn(1, 17, 192, device=device)

with torch.no_grad():
    outputs = F.gelu(encoder0(inputs))
    # Report the tensor computed just above (previously printed the stale
    # `output` variable left over from an earlier cell).
    print('gelu output => min: {}, max: {}'.format(outputs.min(), outputs.max()))

Linear(in_features=192, out_features=192, bias=True) input => min: -3.240483045578003, max: 2.969756841659546
Linear(in_features=192, out_features=192, bias=True) output => min: -6.405068874359131, max: 3.2571728229522705
Linear(in_features=192, out_features=192, bias=True) input => min: -0.16997013986110687, max: 3.255340099334717
Linear(in_features=192, out_features=192, bias=True) output => min: -5.949283123016357, max: 2.2241086959838867
gelu output => min: -0.16997118294239044, max: 4.572508811950684


In [None]:
# Chain the first two child modules of the ViT into a standalone model,
# move it to the working device and switch to eval mode.
pos_emb = nn.Sequential(*tuple(vit.children())[:2]).to(device)
pos_emb.eval()

Sequential(
  (0): Linear(in_features=192, out_features=192, bias=False)
  (1): AddPositionEmbeddings()
)

In [None]:
# Larger standard-normal batch to estimate the value range after the
# first two ViT modules.
inputs = torch.randn(1000, 17, 192, device=device)

with torch.no_grad():
    outputs = pos_emb(inputs)
    # Report the tensor computed just above (previously printed the stale
    # `output` variable left over from an earlier cell).
    print('output => min: {}, max: {}'.format(outputs.min(), outputs.max()))

output => min: -8.480195045471191, max: 8.930232048034668


In [None]:
def tuples1d(sample, r=(0, 1), bins=16):
    """
    Lazily yield every row of ``sample`` as a tuple of integer bin indices.

    Values are mapped linearly from the interval ``r`` onto ``bins`` bins and
    floored. The range and bin count are printed when iteration starts.

    Args:
        sample: 2-D tensor; one tuple is produced per row
        r: (low, high) interval used for the binning
        bins: number of bins the interval is split into
    """
    low, high = r
    print(low, high, bins)
    # The small constant widens the interval so a value equal to `high`
    # still lands in the last bin rather than in bin number `bins`.
    span = high - low + 0.0001
    binned = torch.floor((sample - low) / span * bins).to(torch.int32).cpu().numpy()
    for row in binned:
        yield tuple(row)

In [None]:
# Binning configuration for the input and output histograms.
in_bins = out_bins = 64
in_range = (-10.0, 10.0)  # earlier trial: (-1.7, 9.0)
out_range = (-0.17, 15.0)

in_l, in_u = in_range

# Uniform noise spanning the whole input range.
inputs = (in_u - in_l) * torch.rand((1000, 17, 192), device=device) + in_l

with torch.no_grad():
    outputs = F.gelu(encoder0(inputs))
    print('gelu output => min: {}, max: {}'.format(outputs.min(), outputs.max()))

# Collapse everything but the leading sample dimension: one row per sample.
inputs = inputs.flatten(start_dim=1)
outputs = outputs.flatten(start_dim=1)

Linear(in_features=192, out_features=192, bias=True) input => min: -2.5806469917297363, max: 2.540040969848633
Linear(in_features=192, out_features=192, bias=True) output => min: -7.042560577392578, max: 6.321585178375244
Linear(in_features=192, out_features=192, bias=True) input => min: -0.16997124254703522, max: 6.321585178375244
Linear(in_features=192, out_features=192, bias=True) output => min: -20.98538589477539, max: 9.445093154907227
gelu output => min: -0.16997122764587402, max: 17.27882194519043


In [None]:
from collections import defaultdict

# Occurrence counts of the binned input rows, the binned output rows and
# their joint (input, output) pairs.
histx = defaultdict(int)
histy = defaultdict(int)
histxy = defaultdict(int)

for x, y in zip(tuples1d(inputs, r=in_range, bins=in_bins), tuples1d(outputs, r=out_range, bins=out_bins)):
    # Count a sample only if every coordinate falls in a valid bin. The lower
    # bound matters too: values below the range floor to negative indices.
    if all(0 <= b < in_bins for b in x) and all(0 <= b < out_bins for b in y):
        histx[x] += 1
        histy[y] += 1
        histxy[(x, y)] += 1

-10.0 10.0 64
-0.17 15.0 64


In [None]:
# NOTE(review): unfinished scratch expression (the trailing `*` has no right
# operand), so this cell does not parse as written.
192 * 192 *

In [None]:
import torch
import torch.nn as nn
from collections import defaultdict

########### Set Device ############
# CUDA auto-detection is switched off below; everything runs on the CPU.
# device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
dtype = torch.float32
device = 'cpu'

# Make newly created floating-point tensors use `dtype` by default.
torch.set_default_dtype(dtype)
print("Using device: {}".format(device))

Using device: cpu


In [None]:
# Supported weight-initialisation schemes, keyed by name.
initializers = {
    'kaiming': None,  # (default) keep whatever nn.Linear was constructed with
    'xavier_uniform': nn.init.xavier_uniform_,
    'xavier_normal': nn.init.xavier_normal_,
    'paper': nn.init.uniform_,  # applied with bounds +-1/sqrt(in_features)
}


def weight_initializer(name):
    """
    Build a per-module initialiser suitable for ``model.apply(...)``.

    Args:
        name: key into ``initializers`` selecting the scheme

    Returns:
        A function ``init_weights(m)`` that re-initialises ``m.weight`` in place
        when ``m`` is an ``nn.Linear`` and leaves every other module untouched.
        For ``'kaiming'`` the returned function is a no-op.

    Raises:
        KeyError: if ``name`` is not a known scheme. This is raised immediately
            rather than when the first linear layer is visited.
    """
    if name not in initializers:
        raise KeyError(
            'unknown initializer {!r}; expected one of {}'.format(name, sorted(initializers))
        )
    init_fn = initializers[name]

    def init_weights(m):
        # Only linear layers are touched; a `None` scheme means "leave as is".
        if init_fn is None or not isinstance(m, nn.Linear):
            return
        if name == 'paper':
            boundary = 1 / np.sqrt(m.in_features)
            init_fn(m.weight, a=-boundary, b=boundary)
        else:
            init_fn(m.weight)
    return init_weights


def tuples1d(sample, r=(0, 1), bins=16):
    """
    Generate one tuple of integer bin indices per row of ``sample``.

    Each value is shifted and scaled from the interval ``r`` onto ``bins``
    bins and floored. The range and bin count are printed when iteration
    starts.

    Args:
        sample: 2-D tensor; one tuple is produced per row
        r: (low, high) interval used for the binning
        bins: number of bins the interval is split into
    """
    lower, upper = r
    print(lower, upper, bins)
    # Slightly widened interval: a value equal to `upper` stays in the last bin.
    width = upper - lower + 0.0001
    scaled = (sample - lower) / width * bins
    indices = torch.floor(scaled).to(torch.int32).cpu().numpy()
    for row in indices:
        yield tuple(row)

In [None]:
# Values iterated over as `widths`.
widths = list(range(2, 7))

# Sample counts on a 1-2-5 ladder from 10 up to 10,000,000.
samples = [mult * 10 ** exp for exp in range(1, 7) for mult in (1, 2, 5)] + [10 ** 7]

# One independent, initially empty results dict per width.
database = dict((w, dict()) for w in widths)

In [None]:
from functools import reduce

def tuple_id(t, num_bins):
    """
    Encode a tuple of bin indices as a single integer.

    The tuple is read as a base-``num_bins`` number with ``t[0]`` as the least
    significant digit, so repeatedly taking ``id % num_bins`` and
    ``id // num_bins`` recovers the bins in their original order. Distinct
    tuples of equal length (with every bin in ``range(num_bins)``) get
    distinct ids. The previous ``sum(num_bins * i + bin)`` formula did not
    guarantee this.

    Args:
        t: iterable of integer bin indices, each in ``range(num_bins)``
        num_bins: number of bins per position (the radix)

    Returns:
        The integer id; 0 for an empty tuple.
    """
    code = 0
    # Horner's scheme from the most significant digit (last element) down.
    # int() keeps the arithmetic in unbounded Python ints even when the bins
    # are fixed-width NumPy integers.
    for digit in reversed(tuple(t)):
        code = code * num_bins + int(digit)
    return code

def id_to_tuple(id, num_inputs, num_bins):
    """
    Decode an integer id into a tuple of ``num_inputs`` bin indices.

    The id is read as a base-``num_bins`` number; the least significant digit
    becomes the first element of the result. The digits are returned instead
    of only being printed, so callers can actually use the decoded tuple.

    Args:
        id: non-negative integer to decode
        num_inputs: number of digits (tuple length) to extract
        num_bins: number of bins per position (the radix)

    Returns:
        Tuple of ``num_inputs`` integers, each in ``range(num_bins)``.
    """
    remaining = id
    digits = []
    for _ in range(num_inputs):
        remaining, digit = divmod(remaining, num_bins)
        digits.append(digit)
    return tuple(digits)

# Quick manual check of the encode/decode helpers on a 4-element tuple with 5 bins.
t = (4,) * 4

t_id = tuple_id(t, num_bins=5)
t_back = id_to_tuple(t_id, num_inputs=len(t), num_bins=5)
print(t_id)

1
4
1
0
46


In [None]:
def vector_ei(layer_weights , samples=None, batch_size=20,
              in_range=None, in_bins=64,
              out_range=None, out_bins=64, device='cpu'):
    """
    Compute ``_entropy(histx) + _entropy(histy) - _entropy(histxy)`` from
    histograms of binned uniform-noise inputs and the corresponding outputs.

    Uniform random inputs drawn from ``in_range`` are evaluated with
    ``_eval_model``, inputs and outputs are binned row by row, and occurrence
    counts of input rows, output rows and (input, output) pairs are collected.

    NOTE(review): this function looks unfinished. It relies on names that are
    not defined in this cell (``_chunk_sizes``, ``MEMORY_LIMIT``,
    ``_indices_and_batch_sizes``, ``_eval_model``, ``in_layer``, ``layer``,
    ``topology``, ``activation``, ``_tuples1d``, ``_entropy``); presumably they
    come from elsewhere -- verify before use.

    Args:
        layer_weights: object whose ``.shape`` unpacks into exactly two values
        samples: forwarded to ``_chunk_sizes`` (presumably the total number of samples)
        batch_size: forwarded to ``_indices_and_batch_sizes``
        in_range: (low, high) interval inputs are drawn from and binned over;
            must be given, since the default ``None`` cannot be unpacked
        in_bins: number of bins for inputs
        out_range: interval outputs are binned over
        out_bins: number of bins for outputs
        device: device on which the buffers and samples are created

    Returns:
        The value ``_entropy(histx) + _entropy(histy) - _entropy(histxy)``.
    """
    
    #################################################
    #   Determine shapes, ranges, and activations   #
    #################################################

    # NOTE(review): if layer_weights is a 2-D tensor, both names are plain ints,
    # while reduce(...) and `*in_shape` below expect iterables -- confirm intent.
    in_shape, out_shape = layer_weights.shape

    in_l, in_u = in_range
    num_inputs = reduce(lambda x, y: x * y, in_shape)
    num_outputs = reduce(lambda x, y: x * y, out_shape)
    # Not referenced again within this function.
    in_bin_width = (in_u - in_l) / in_bins

    # Occurrence counts of binned input rows, output rows and joint pairs.
    histx = defaultdict(int)
    histy = defaultdict(int)
    histxy = defaultdict(int)

    for chunk_size in _chunk_sizes(samples, num_inputs, num_outputs, MEMORY_LIMIT):
        
        #################################################
        #   Create buffers for layer input and output   #
        #################################################
        inputs = torch.zeros((chunk_size, *in_shape), device=device)
        outputs = torch.zeros((chunk_size, *out_shape), device=device)
        
        #################################################
        #           Evaluate module on noise            #
        #################################################
        for (i0, i1), bsize in _indices_and_batch_sizes(chunk_size, batch_size):
            # Uniform noise in [in_l, in_u).
            sample = (in_u - in_l) * torch.rand((bsize, *in_shape), device=device) + in_l
            try:
                result = _eval_model(sample, in_layer, layer, topology, activation)
            except:
                # Print the failing batch context, then re-raise unchanged.
                print(i0, i1, bsize, in_layer, layer, in_shape, out_shape)
                raise
            inputs[i0:i1] = sample
            outputs[i0:i1] = result
        # One flat row per sample.
        inputs = torch.flatten(inputs, start_dim=1)
        outputs = torch.flatten(outputs, start_dim=1)
        
        #################################################
        #               Update Histogram                #
        #################################################
        for x, y in zip(_tuples1d(inputs, r=in_range, bins=in_bins), _tuples1d(outputs, r=out_range, bins=out_bins)):
            # Only an upper bound is checked here.
            # NOTE(review): negative bin indices would pass this filter -- confirm.
            if all(b < in_bins for b in x) and all(b < out_bins for b in y):
                histx[x] += 1
                histy[y] += 1
                histxy[(x, y)] += 1

    vector_ei = _entropy(histx) + _entropy(histy) - _entropy(histxy)

    return vector_ei

In [None]:
# Binning configuration for the input and output histograms.
in_bins = out_bins = 64
in_range = (-10.0, 10.0)  # earlier trial: (-1.7, 9.0)
out_range = (-0.17, 15.0)

in_l, in_u = in_range

# Uniform noise spanning the whole input range.
inputs = (in_u - in_l) * torch.rand((1000, 17, 192), device=device) + in_l

with torch.no_grad():
    outputs = F.gelu(encoder0(inputs))
    print('gelu output => min: {}, max: {}'.format(outputs.min(), outputs.max()))

# Collapse everything but the leading sample dimension: one row per sample.
inputs = inputs.flatten(start_dim=1)
outputs = outputs.flatten(start_dim=1)