<a href="https://colab.research.google.com/github/gremlin97/EVA-8/blob/main/S10/Implementations/MHA.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [2]:
# Importing required libraries
from torch.utils.data import Dataset
import torch.nn.functional as F
from collections import Counter
from os.path import exists
import torch.optim as optim
import torch.nn as nn
import numpy as np
import random
import torch
import math
import re

In [3]:
def attention(q,k,v, dropout=None):
  """Scaled dot-product attention.

  Args:
    q: query tensor; its last two axes are (query positions, feature dim).
    k: key tensor; its last two axes are (key positions, feature dim).
    v: value tensor; its second-to-last axis must match k's key positions.
    dropout: optional callable applied to the attention weights after the
      softmax; skipped when None.

  Returns:
    The weighted sum of ``v`` for every query position.
  """
  # Similarity of every query with every key, scaled by sqrt(feature dim).
  logits = q.matmul(k.transpose(-2,-1)) / math.sqrt(k.shape[-1])

  # Normalise over the key axis: each query row becomes a distribution over keys.
  weights = logits.softmax(dim=-1)

  if dropout is not None:
    weights = dropout(weights)

  return weights.matmul(v)

In [4]:
class MHAttention(nn.Module):
  """Multi-head self-attention over a (batch, seq_len, out_dim) tensor.

  The input is projected to keys, queries and values, each projection is cut
  into ``heads`` slices of width ``out_dim // heads`` along the feature axis,
  attention runs independently per head, and the heads are concatenated and
  passed through a final linear layer.

  Args:
    heads: number of attention heads; must divide ``out_dim``.
    out_dim: feature size of both the input and the output.
    dropout: dropout probability applied to the attention weights.

  Raises:
    ValueError: if ``heads`` is not positive or does not divide ``out_dim``.
  """
  def __init__(self, heads, out_dim, dropout=0.1):
    super().__init__()

    if heads <= 0 or out_dim % heads != 0:
      raise ValueError(
          f"out_dim ({out_dim}) must be divisible by a positive number of heads ({heads})")

    self.Wk =  nn.Linear(out_dim, out_dim)
    self.Wq =  nn.Linear(out_dim, out_dim)
    self.Wv =  nn.Linear(out_dim, out_dim)

    self.heads = heads
    self.out_dim = out_dim
    self.out_head_dim = out_dim//heads
    self.unify_heads = nn.Linear(out_dim,out_dim)
    self.dropout = nn.Dropout(dropout)

  def split_heads(self, x):
    """Turn (batch, seq_len, out_dim) into (batch, heads, seq_len, out_head_dim).

    The feature axis is split first and the head axis is then moved in front
    of the sequence axis. Reshaping straight to (batch, heads, seq_len, ...)
    would mix different sequence positions into one head, and would not be
    undone by the transpose + view in ``forward``.
    """
    per_head = x.reshape(x.shape[0], -1, self.heads, self.out_head_dim)
    return per_head.transpose(1, 2)

  def forward(self, x):
    """Apply self-attention to ``x`` and return a tensor of the same shape."""
    k = self.split_heads(self.Wk(x))
    q = self.split_heads(self.Wq(x))
    v = self.split_heads(self.Wv(x))

    # Argument order follows attention(q, k, v, dropout).
    context = attention(q, k, v, self.dropout)

    # (batch, heads, seq_len, head_dim) -> (batch, seq_len, out_dim): inverse of split_heads.
    context = context.transpose(1,2).contiguous().view(context.shape[0],-1,self.out_dim)
    return self.unify_heads(context)

In [5]:
class FeedForward(nn.Module):
  """Two-layer MLP applied along the last axis: inp_dim -> inner_dim -> inp_dim.

  Args:
    inp_dim: feature size of the input and of the output.
    inner_dim: width of the hidden layer.
    dropout: dropout probability applied to the hidden activations.
  """
  def __init__(self, inp_dim, inner_dim, dropout=0.1):
    super().__init__()
    self.linear1 = nn.Linear(inp_dim, inner_dim)   # expand to the hidden width
    self.linear2 = nn.Linear(inner_dim, inp_dim)   # project back to inp_dim
    self.dropout = nn.Dropout(dropout)

  def forward(self,x):
    """Return linear2(dropout(relu(linear1(x))))."""
    hidden = self.linear1(x).relu()
    hidden = self.dropout(hidden)
    return self.linear2(hidden)

class Encoder(nn.Module):
  """Pre-norm encoder that runs its attention + feed-forward pair twice.

  There is a single ``mha``, a single ``ff`` and a single ``norm`` module, so
  both passes (and both sub-layers of each pass) share the same parameters.

  Args:
    heads: number of attention heads.
    transformer_dim: feature size of the input and of the output.
    ff_inner_dim: hidden width of the feed-forward sub-layer.
    dropout: dropout probability used by every dropout in the block.
  """
  def __init__(self, heads, transformer_dim, ff_inner_dim,dropout=0.1):
    super().__init__()
    self.mha = MHAttention(heads, transformer_dim, dropout)
    self.ff = FeedForward(transformer_dim, ff_inner_dim, dropout)
    self.norm = nn.LayerNorm(transformer_dim)
    self.dropout = nn.Dropout(dropout)

  def forward(self, x):
    """Apply (attention, feed-forward) twice, each wrapped as x + dropout(f(norm(x)))."""
    for _ in range(2):
      for sublayer in (self.mha, self.ff):
        normed = self.norm(x)
        x = x + self.dropout(sublayer(normed))

    return x

In [6]:
# Hyper-parameters of the demo encoder.
heads = 4
embed_size = 64
ff_dim = 2*embed_size
seq_len = 10

# Seed the RNG so weight initialisation, the random input and the dropout masks
# are the same on every Restart & Run All.
torch.manual_seed(0)

enc = Encoder(heads,embed_size,ff_dim,0.1)
# Smoke test on a random batch of 2 sequences.
out = enc(torch.randn(2,seq_len,embed_size))

In [7]:
# Sanity check: the encoder should return the shape it was given, (2, seq_len, embed_size).
out.shape

torch.Size([2, 10, 64])

In [8]:
# Persist only the parameters (the state_dict) of `enc`, written to the current working directory.
torch.save(enc.state_dict(), 'encoder_weights.pth')

In [9]:
# Rebuild the same architecture, then restore the saved parameters into it.
encoder2 = Encoder(heads, embed_size, ff_dim, dropout=0.1)
# weights_only=True limits unpickling to tensors and plain containers, which is
# all a state_dict holds, so the checkpoint file cannot execute arbitrary code.
encoder2.load_state_dict(torch.load('encoder_weights.pth', weights_only=True))

<All keys matched successfully>

In [10]:
# Inference check on the reloaded encoder: switch dropout off and skip autograd
# bookkeeping so the forward pass is deterministic for a given input.
encoder2.eval()
with torch.no_grad():
  out = encoder2(torch.randn(2,seq_len,embed_size))
out.shape

torch.Size([2, 10, 64])

In [36]:
# import tensorflow as tf
# import tensorflow.keras.layers as layers
# import math

# def attention(q, k, v, dropout=None):
#   scores = tf.matmul(q, tf.transpose(k, perm=[0, 1, 3, 2])) / math.sqrt(k.shape[-1])

#   scores = tf.nn.softmax(scores, axis=-1)

#   scores = dropout(scores) if dropout is not None else scores

#   out = tf.matmul(scores, v)

#   return out

# class MHAttention(tf.keras.layers.Layer):
#   def __init__(self, heads, out_dim, dropout=0.1):
#     super(MHAttention, self).__init__()

#     self.Wk = layers.Dense(out_dim)
#     self.Wq = layers.Dense(out_dim)
#     self.Wv = layers.Dense(out_dim)

#     self.heads = heads
#     self.out_dim = out_dim
#     self.out_head_dim = out_dim // heads
#     self.unify_heads = layers.Dense(out_dim)
#     self.dropout = layers.Dropout(dropout)

#   def split_heads(self, x):
#     return tf.reshape(x, [x.shape[0], self.heads, -1, self.out_head_dim])

#   def call(self, x):
#     k = self.Wk(x)
#     q = self.Wq(x)
#     v = self.Wv(x)

#     k = self.split_heads(k)
#     q = self.split_heads(q)
#     v = self.split_heads(v)

#     scores = attention(q, k, v, self.dropout)
#     scores = tf.reshape(tf.transpose(scores, perm=[0, 2, 1, 3]), [scores.shape[0], -1, self.out_dim])
#     out = self.unify_heads(scores)
#     return out

# class FeedForward(tf.keras.layers.Layer):
#   def __init__(self, inp_dim, inner_dim, dropout=0.1):
#     super(FeedForward, self).__init__()
#     self.linear1 = layers.Dense(inner_dim)
#     self.linear2 = layers.Dense(inp_dim)
#     self.dropout = layers.Dropout(dropout)

#   def call(self, x):
#     return self.linear2(self.dropout(tf.nn.relu(self.linear1(x))))

# class Encoder(tf.keras.layers.Layer):
#   def __init__(self, heads, transformer_dim, ff_inner_dim, dropout=0.1):
#     super(Encoder, self).__init__()
#     self.mha = MHAttention(heads, transformer_dim, dropout)
#     self.ff = FeedForward(transformer_dim, ff_inner_dim, dropout)
#     self.norm = layers.LayerNormalization()
#     self.dropout = layers.Dropout(dropout)
  
#   def call(self, x):
#     x_res = self.norm(x)
#     x = x + self.dropout(self.mha(x_res))
#     x_res = self.norm(x)
#     x = x + self.dropout(self.ff(x_res))

#     x_res = self.norm(x)
#     x = x + self.dropout(self.mha(x_res))
#     x_res = self.norm(x)
#     x = x + self.dropout(self.ff(x_res))

#     return x

# heads = 4
# embed_size = 64
# ff_dim = 2*embed_size
# seq_len = 10
# enc = Encoder(heads, embed_size, ff_dim, 0.1)
# out = enc(tf.random.normal([2, seq_len, embed_size]))
# print(out.shape)

(2, 10, 64)


In [67]:
import tensorflow as tf
from tensorflow.keras import layers


def attention(q, k, v, dropout=None):
    """Scaled dot-product attention.

    Args:
        q: query tensor; its last two axes are (query positions, feature dim).
        k: key tensor; its last two axes are (key positions, feature dim).
        v: value tensor; its second-to-last axis must match k's key positions.
        dropout: optional callable applied to the attention weights after the
            softmax; skipped when None.

    Returns:
        The weighted sum of ``v`` for every query position.
    """
    # transpose_b swaps only the last two axes of k, so any number of leading
    # (batch / head) axes is supported.
    scores = tf.matmul(q, k, transpose_b=True)
    # Cast the scale to the scores' dtype so non-float32 inputs do not fail on
    # a dtype mismatch.
    scores = scores / tf.math.sqrt(tf.cast(tf.shape(k)[-1], scores.dtype))
    weights = tf.nn.softmax(scores, axis=-1)
    weights = dropout(weights) if dropout is not None else weights
    return tf.matmul(weights, v)


class MHAttention(layers.Layer):
    """Multi-head self-attention over a (batch, seq_len, out_dim) tensor.

    The input is projected to keys, queries and values, each projection is cut
    into ``heads`` slices of width ``out_dim // heads`` along the feature axis,
    attention runs independently per batch element and head, and the heads are
    concatenated and passed through a final dense layer.

    Args:
        heads: number of attention heads; must divide ``out_dim``.
        out_dim: feature size of the output (and of each projection).
        dropout: dropout rate applied to the attention weights.

    Raises:
        ValueError: if ``heads`` is not positive or does not divide ``out_dim``.
    """

    def __init__(self, heads, out_dim, dropout=0.1):
        super().__init__()

        if heads <= 0 or out_dim % heads != 0:
            raise ValueError(
                f"out_dim ({out_dim}) must be divisible by a positive number of heads ({heads})")

        self.Wk = layers.Dense(out_dim)
        self.Wq = layers.Dense(out_dim)
        self.Wv = layers.Dense(out_dim)

        self.heads = heads
        self.out_dim = out_dim
        self.out_head_dim = out_dim // heads
        self.unify_heads = layers.Dense(out_dim)
        self.dropout = layers.Dropout(dropout)

    def split_heads(self, x):
        """Turn (batch, seq_len, out_dim) into (batch, heads, seq_len, out_head_dim).

        The batch size is read at run time, so the batch axis is kept rather
        than folded into the sequence axis and no sequence length is hard-coded.
        """
        batch = tf.shape(x)[0]
        per_head = tf.reshape(x, [batch, -1, self.heads, self.out_head_dim])
        return tf.transpose(per_head, perm=[0, 2, 1, 3])

    def call(self, x):
        """Apply self-attention to ``x`` and return (batch, seq_len, out_dim)."""
        batch = tf.shape(x)[0]

        k = self.Wk(x)
        q = self.Wq(x)
        v = self.Wv(x)

        k = self.split_heads(k)
        q = self.split_heads(q)
        v = self.split_heads(v)

        context = attention(q, k, v, self.dropout)
        # (batch, heads, seq_len, head_dim) -> (batch, seq_len, out_dim): inverse of split_heads.
        context = tf.transpose(context, perm=[0, 2, 1, 3])
        context = tf.reshape(context, [batch, -1, self.out_dim])
        return self.unify_heads(context)

class FeedForward(layers.Layer):
    """Two-layer MLP applied along the last axis, with dropout in between.

    Args:
        inp_dim: output feature size (units of the second dense layer).
        inner_dim: hidden width (units of the first, ReLU-activated dense layer).
        dropout: dropout rate applied to the hidden activations.
    """

    def __init__(self, inp_dim, inner_dim, dropout=0.1):
        super().__init__()
        # The ReLU is fused into the first dense layer via `activation`.
        self.linear1 = layers.Dense(inner_dim, activation='relu')
        self.linear2 = layers.Dense(inp_dim)
        self.dropout = layers.Dropout(dropout)

    def call(self, x):
        """Return linear2(dropout(linear1(x)))."""
        hidden = self.linear1(x)
        hidden = self.dropout(hidden)
        return self.linear2(hidden)

class Encoder(layers.Layer):
    """Pre-norm encoder that runs its attention + feed-forward pair twice.

    There is a single ``mha``, a single ``ff`` and a single ``norm`` layer, so
    both passes (and both sub-layers of each pass) share the same weights.

    Args:
        heads: number of attention heads.
        transformer_dim: feature size passed to the attention and feed-forward layers.
        ff_inner_dim: hidden width of the feed-forward sub-layer.
        dropout: dropout rate used by every dropout in the block.
    """

    def __init__(self, heads, transformer_dim, ff_inner_dim, dropout=0.1):
        super().__init__()
        self.mha = MHAttention(heads, transformer_dim, dropout)
        self.ff = FeedForward(transformer_dim, ff_inner_dim, dropout)
        self.norm = layers.LayerNormalization()
        self.dropout = layers.Dropout(dropout)

    def call(self, x):
        """Apply (attention, feed-forward) twice, each wrapped as x + dropout(f(norm(x)))."""
        for _ in range(2):
            for sublayer in (self.mha, self.ff):
                normed = self.norm(x)
                x = x + self.dropout(sublayer(normed))

        return x

In [68]:
# Define the Keras encoder model.
# NOTE: heads / embed_size / ff_dim / seq_len / enc rebind the notebook globals
# that the PyTorch demo cell above also assigns (seq_len changes from 10 to 8).
heads = 4
embed_size = 64
ff_dim = 2*embed_size
seq_len = 8
enc = Encoder(heads,embed_size,ff_dim,0.1)

# Wrap the layer in a functional Model; the Input shape omits the batch axis.
inputs = tf.keras.layers.Input(shape=(seq_len, embed_size))
outputs = enc(inputs)
model = tf.keras.Model(inputs=inputs, outputs=outputs)

# # save the model
# tf.keras.models.save_model(model, 'encoder_weights.h5')

In [69]:
# Print the layer table and parameter counts of the Keras model.
model.summary()

Model: "model_2"
_________________________________________________________________
 Layer (type)                Output Shape              Param #   
 input_33 (InputLayer)       [(None, 8, 64)]           0         
                                                                 
 encoder_34 (Encoder)        (None, 8, 64)             33344     
                                                                 
Total params: 33,344
Trainable params: 33,344
Non-trainable params: 0
_________________________________________________________________


In [70]:
# One random sample shaped like the model's Input layer: (batch, seq_len, embed_size).
# The sizes come from the configuration cell instead of repeating the literals.
inp = tf.random.normal(shape=(1, seq_len, embed_size))

# get the output
output = model.predict(inp)

# print the output shape
print(output.shape)

(1, 8, 64)


In [73]:
!pip install mxnet

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting mxnet
  Downloading mxnet-1.9.1-py3-none-manylinux2014_x86_64.whl (49.1 MB)
     ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 49.1/49.1 MB 13.0 MB/s eta 0:00:00
Collecting graphviz<0.9.0,>=0.8.1
  Downloading graphviz-0.8.4-py2.py3-none-any.whl (16 kB)
Installing collected packages: graphviz, mxnet
  Attempting uninstall: graphviz
    Found existing installation: graphviz 0.20.1
    Uninstalling graphviz-0.20.1:
      Successfully uninstalled graphviz-0.20.1
Successfully installed graphviz-0.8.4 mxnet-1.9.1
