Student Answer -> Bert encoding\
Question -> BERT encoding ------.->cross attention

The model can be created using the makeModel function.

In [1]:
from bert_embedding import BertEmbedding
from torch import nn
import torch
import torch.nn.functional as F
from torch.autograd import Variable
import numpy as np
import copy
import math
import time


In [None]:
!pip install transformers

In [2]:
import torch
from transformers import BertTokenizer, BertModel

# OPTIONAL: if you want to have more information on what's happening, activate the logger as follows
import logging
#logging.basicConfig(level=logging.INFO)

import matplotlib.pyplot as plt
# % matplotlib inline


In [19]:
class BertEmbedding(nn.Module):
    def __init__(self):
        super(BertEmbedding, self).__init__()
        # Load pre-trained model tokenizer (vocabulary)
        self.tokenizer = BertTokenizer.from_pretrained('bert-base-uncased')
        self.model = BertModel.from_pretrained('bert-base-uncased',
                                  output_hidden_states = True, # Whether the model returns all hidden-states.
                                  )
        # Put the model in "evaluation" mode, meaning feed-forward operation.
        self.model.eval()

    def getEmbeddings(self,text):
      tokens_tensor,segments_tensor = self.TextPreprocessing(text)
      embeddings = self.generateEmbedding(tokens_tensor,segments_tensor)
      return embeddings

    def TextPreprocessing(self,text):
        marked_text = "[CLS] " + text + " [SEP]"
        # Tokenize our sentence with the BERT tokenizer.
        tokenized_text = self.tokenizer.tokenize(marked_text)
        indexed_tokens = self.tokenizer.convert_tokens_to_ids(tokenized_text)
        segments_ids = [1] * len(tokenized_text)
        tokens_tensor = torch.tensor([indexed_tokens])
        segments_tensors = torch.tensor([segments_ids])
        return tokens_tensor,segments_tensors
    
    def generateEmbedding(self,tokens_tensor,segments_tensor):
        with torch.no_grad():
            outputs = self.model(tokens_tensor, segments_tensor)
            hidden_states = outputs[2]
        tokens = torch.stack(hidden_states,dim=0)
        tokens = tokens.permute(1,2,0,3)
        final_val = tokens[:,1:-1,-2,:]
        return final_val

In [2]:
bert_abstract = """We introduce a new language representation model called BERT, which stands for Bidirectional Encoder Representations from Transformers.
 Unlike recent language representation models, BERT is designed to pre-train deep bidirectional representations by jointly conditioning on both left and right context in all layers.
 As a result, the pre-trained BERT representations can be fine-tuned with just one additional output layer to create state-of-the-art models for a wide range of tasks, such as question answering and language inference, without substantial task-specific architecture modifications. 
BERT is conceptually simple and empirically powerful. 
It obtains new state-of-the-art results on eleven natural language processing tasks, including pushing the GLUE benchmark to 80.4% (7.6% absolute improvement), MultiNLI accuracy to 86.7 (5.6% absolute improvement) and the SQuAD v1.1 question answering Test F1 to 93.2 (1.5% absolute improvement), outperforming human performance by 2.0%."""

In [3]:
student_ans = "Sky is red"
question = "What is the colour of the sky?"
reference_ans ="Sky appears blue"
full_marks = 3

In [4]:
def getBertEncoding(paragraph):
        sentences = paragraph.split('.')
        bert_embedding = BertEmbedding()
        result = bert_embedding(sentences)
        emb = torch.Tensor(result[0][1])
        # emb = emb.reshape(emb.size()[0],1,emb.size()[-1])
        emb = emb.unsqueeze(0)
        return emb

In [5]:
res = getBertEncoding(student_ans)

In [6]:
res.size()

torch.Size([1, 3, 768])

In [7]:
K= getBertEncoding(student_ans)
Q = getBertEncoding(question)

In [8]:
multihead_attn = nn.MultiheadAttention(embed_dim = 768, num_heads=3)

In [4]:
def clones(module, N):
    "Produce N identical layers."
    return nn.ModuleList([copy.deepcopy(module) for _ in range(N)])

In [5]:
def attention(query, key, value, mask=None, dropout=None):
    "Compute 'Scaled Dot Product Attention'"
    d_k = query.size(-1)
    scores = torch.matmul(query, key.transpose(-2, -1)) \
             / math.sqrt(d_k)
    if mask is not None:
        scores = scores.masked_fill(mask == 0, -1e9)
    p_attn = F.softmax(scores, dim = -1)
    if dropout is not None:
        p_attn = dropout(p_attn)
    return torch.matmul(p_attn, value), p_attn

In [6]:
class MultiHeadedAttention(nn.Module):
    "Multi-headed Attention module"
    def __init__(self, h, d_model, dropout=0.1):
        "Take in model size and number of heads."
        super(MultiHeadedAttention, self).__init__()
        assert d_model % h == 0
        # We assume d_v always equals d_k
        self.d_k = d_model // h
        self.h = h
        self.linears = clones(nn.Linear(d_model, d_model), 4)
        self.attn = None
        self.dropout = nn.Dropout(p=dropout)
        
    def forward(self, query, key, value, mask=None):
        if mask is not None:
            # Same mask applied to all h heads.
            mask = mask.unsqueeze(1)
        nbatches = query.size(0)
        
        # 1) Do all the linear projections in batch from d_model => h x d_k 
        query, key, value = \
            [l(x).view(nbatches, -1, self.h, self.d_k).transpose(1, 2)
             for l, x in zip(self.linears, (query, key, value))]
        
        # 2) Apply attention on all the projected vectors in batch. 
        x, self.attn = attention(query, key, value, mask=mask, 
                                 dropout=self.dropout)
        
        # 3) "Concat" using a view and apply a final linear. 
        x = x.transpose(1, 2).contiguous() \
             .view(nbatches, -1, self.h * self.d_k)
        return self.linears[-1](x)

In [12]:
m = MultiHeadedAttention(h = 6,d_model = 768)

In [7]:
class FeedForwardLayer(nn.Module):
    "Feedforward with 1 hidden layer"
    def __init__(self,inp_dim,hid_dim,dropout = 0.1):
        super(FeedForwardLayer, self).__init__()
        self.inp_dim = inp_dim
        self.hid_dim = hid_dim
        self.hidden = nn.Linear(inp_dim,hid_dim)
        self.output = nn.Linear(hid_dim,inp_dim)
        self.relu = F.relu
        self.dropout = nn.Dropout(dropout)
        
    def forward(self,x):
        x = self.hidden(x)
        x = self.relu(x)
        x = self.dropout(x)
        x = self.output(x)
        return x

In [8]:
class LayerNorm(nn.Module):
    "Construct a layernorm module."
    def __init__(self, features, eps=1e-6):
        super(LayerNorm, self).__init__()
        self.a_2 = nn.Parameter(torch.ones(features))
        self.b_2 = nn.Parameter(torch.zeros(features))
        self.eps = eps

    def forward(self, x):
        mean = x.mean(-1, keepdim=True)
        std = x.std(-1, keepdim=True)
        return self.a_2 * (x - mean) / (std + self.eps) + self.b_2

In [9]:
class SublayerConnection(nn.Module):
    "Apply residual connection to any sublayer with the same size."
    def __init__(self, size, dropout=0.1):
        super(SublayerConnection, self).__init__()
        self.norm = LayerNorm(size)
        self.dropout = nn.Dropout(dropout)

    def forward(self, x, sublayer):
        
        return x + self.dropout(sublayer(self.norm(x)))
        # return x + sublayer(self.norm(x))



In [10]:
class EncoderBlock(nn.Module):
    "An Encoder block that find the connection between Answer and Question"
    def __init__(self,attentionBlock,feedForwardBlock,size,dropout = 0.1):
        super(EncoderBlock,self).__init__()
        self.attentionBlock = attentionBlock
        self.feedForwardBlock = feedForwardBlock
        self.sublayer = clones(SublayerConnection(size,dropout),2)
        self.size = size

    def forward(self,Query,Value):
        x = self.sublayer[0](Query, lambda x: self.attentionBlock(query=x, value=Value, key=Value))
        print(x.size())
        return self.sublayer[1](x, self.feedForwardBlock)

    

In [11]:
class EncoderModule(nn.Module):
    "Stacks of Encoder blocks"
    def __init__(self, EncoderLayer,N):
        super(EncoderModule,self).__init__()
        self.layers = clones(EncoderLayer,N)
        self.norm = LayerNorm(EncoderLayer.size)
    
    def forward(self,Query,Value):
        for layers in self.layers:
            Query = layers(Query = Query,Value = Value)
        return Query


In [42]:
class RepresentationModule(nn.Module):
    "Bottom layer that gives 2 representations: Reference answer Rep and Student answer Rep"
    def __init__(self,EncoderModule, embeddingLayer):
        super(RepresentationModule,self).__init__()
        self.EncoderModules = clones(EncoderModule,2)
        self.embeddingLayer = embeddingLayer
        #Get 2 clones of EncoderModule
    def forward(self, Question, ReferenceAnswer, StudentAnswer):
        Q = self.embeddingLayer.getEmbeddings(Question)
        seconds = time.time()
        StuAns = self.embeddingLayer.getEmbeddings(StudentAnswer)
        RefAns = self.embeddingLayer.getEmbeddings(ReferenceAnswer)
        print(time.time()-seconds)
        studentAnsRep = self.EncoderModules[0](Q,StuAns)
        RefAnsRep = self.EncoderModules[1](Q,RefAns)
        print(time.time()-seconds)
        return (studentAnsRep,RefAnsRep)

In [13]:
class PositionalEncoding(nn.Module):
    "Implement the PE function."
    def __init__(self, d_model, dropout=0.1, max_len=5000):
        super(PositionalEncoding, self).__init__()
        self.dropout = nn.Dropout(p=dropout)
        
        # Compute the positional encodings once in log space.
        pe = torch.zeros(max_len, d_model)
        position = torch.arange(0, max_len).unsqueeze(1)
        div_term = torch.exp(torch.arange(0, d_model, 2) *
                             -(math.log(10000.0) / d_model))
        pe[:, 0::2] = torch.sin(position * div_term)
        pe[:, 1::2] = torch.cos(position * div_term)
        pe = pe.unsqueeze(0)
        self.register_buffer('pe', pe)
        
    def forward(self, x):
        print(self.pe[:,:x.size(1)].size())
        x = x + Variable(self.pe[:, :x.size(1)], 
                         requires_grad=False)
        print(x.size())
        return self.dropout(x)

In [34]:
class FinalFeedForwardLayer(nn.Module):
    def __init__(self,input_size):
        super().__init__()
        self.fc1 = nn.Linear(input_size,400)
        self.fc2 = nn.Linear(400,20)
        self.fc3 = nn.Linear(20,10)
        self.fc4 = nn.Linear(10,1)
    def forward(self, xb):
        xb = F.relu(self.fc1(xb))
        xb = F.relu(self.fc2(xb))
        xb = F.relu(self.fc3(xb))
        xb = F.sigmoid(self.fc4(xb))      # batch wise forwarding
        return xb
    
    def training_step(self, batch):
        inputs, targets = batch 
        # Generate predictions
        out = self(inputs)         
        # Calcuate loss
        loss = F.l1_loss(out, targets)  # batch wise training step and loss
        return loss
    
    def validation_step(self, batch):
        inputs, targets = batch
        # Generate predictions
        out = self(inputs)
        # Calculate loss
        loss =F.l1_loss(out, targets)       # batch wise validation and loss    
        return {'val_loss': loss.detach()}
        
    def validation_epoch_end(self, outputs):
        batch_losses = [x['val_loss'] for x in outputs]
        epoch_loss = torch.stack(batch_losses).mean()   # Combine val losses of all batches as average
        return {'val_loss': epoch_loss.item()}
    
    def epoch_end(self, epoch, result, num_epochs):
        # Print result every 20th epoch
        if (epoch+1) % 100 == 0 or epoch == num_epochs-1:
            print("Epoch [{}], val_loss: {:.4f}".format(epoch+1, result['val_loss']))

In [46]:
class PrepLayer(nn.Module):
        def __init__(self,positionalLayer,d_model,max_size):
            super(PrepLayer,self).__init__()
            self.borderLayer = torch.zeros(1,1,768)
            self.d_model = d_model
            self.max_size = max_size
            self.positionalEncoding = positionalLayer
        def forward(self,StudAns,RefAns):
            RefAns = torch.add(RefAns,1)
            FinalRep = torch.cat((StudAns,self.borderLayer),dim=1)
            FinalRep = torch.cat((FinalRep,RefAns),dim = 1)
            FinalRep = self.positionalEncoding(FinalRep)
            FinalRep = FinalRep.flatten(start_dim=1, end_dim=2)
            self.padValue = torch.zeros(1,self.d_model*self.max_size*2+1 - FinalRep.shape[1])
            FinalRep = torch.cat((FinalRep,self.padValue),dim = 1)
            return FinalRep

In [50]:
class GetMarks(nn.Module):
    def __init__(self):
        super(GetMarks,self).__init__()
    def forward(self,grade,full_marks):
        return grade*full_marks

In [30]:
class GradingModule(nn.Module):
    "Uses the Representations to compare and grade them"
    def __init__(self,prepLayer,d_model,max_size):
        super(GradingModule,self).__init__()
        self.max_size = max_size
        self.d_model = d_model
        self.prepLayer = prepLayer
        self.feedForward = FinalFeedForwardLayer(self.d_model*self.max_size*2 + 1)
        self.GetMarks = GetMarks()
        
    def forward(self,stu,ref,full_marks):
        FinalRep = self.prepLayer(stu,ref)
        grade = self.feedForward(FinalRep)
        final_marks = self.GetMarks(grade,full_marks)
        return final_marks
        
        
        

In [39]:
class Upgrader(nn.Module):
    def __init__(self,Representation_Module, Grading_Module):
        super(Upgrader,self).__init__()
        self.RepModule = Representation_Module
        self.GradModule = Grading_Module
    def forward(self, StudentAns,Question,RefAnswer,full_marks):
        stu_rep,ref_rep = self.RepModule( StudentAns,Question,RefAnswer)
        grad = self.GradModule(stu_rep,ref_rep,full_marks)
        return grad

In [32]:
def makeModel(emb_dim = 768,heads = 3,hid_lay_dim = 2304, N = 6,max_size = 100):
    c = copy.deepcopy
    attn = MultiHeadedAttention(h=6,d_model=emb_dim)
    ff = FeedForwardLayer(emb_dim,hid_lay_dim)
    embedding_layer = BertEmbedding()
    positionalLayer = PositionalEncoding(d_model=emb_dim)
    model = Upgrader(
                RepresentationModule(
                    EncoderModule( EncoderBlock( c(attn), c(ff), emb_dim), N),
                    embedding_layer)
                ,GradingModule(
                    PrepLayer(d_model=emb_dim,positionalLayer=positionalLayer,max_size=max_size),
                    d_model=emb_dim,
                    max_size=max_size
                ))
    return model

In [47]:
    model = makeModel()

Some weights of the model checkpoint at bert-base-uncased were not used when initializing BertModel: ['cls.predictions.transform.dense.weight', 'cls.seq_relationship.bias', 'cls.seq_relationship.weight', 'cls.predictions.bias', 'cls.predictions.transform.dense.bias', 'cls.predictions.transform.LayerNorm.bias', 'cls.predictions.transform.LayerNorm.weight', 'cls.predictions.decoder.weight']
- This IS expected if you are initializing BertModel from the checkpoint of a model trained on another task or with another architecture (e.g. initializing a BertForSequenceClassification model from a BertForPreTraining model).
- This IS NOT expected if you are initializing BertModel from the checkpoint of a model that you expect to be exactly identical (initializing a BertForSequenceClassification model from a BertForSequenceClassification model).


In [36]:
student_ans = "Sky is red"
question = "What is the colour of the sky?"
reference_ans ="Sky appears blue"
full_marks = 3

In [48]:
model(student_ans,question,reference_ans,full_marks)

0.4714689254760742
torch.Size([1, 3, 1])
torch.Size([1, 3, 768])
torch.Size([1, 3, 1])
torch.Size([1, 3, 1])
torch.Size([1, 3, 768])
torch.Size([1, 3, 1])
torch.Size([1, 3, 1])
torch.Size([1, 3, 768])
torch.Size([1, 3, 1])
torch.Size([1, 3, 1])
torch.Size([1, 3, 768])
torch.Size([1, 3, 1])
torch.Size([1, 3, 1])
torch.Size([1, 3, 768])
torch.Size([1, 3, 1])
torch.Size([1, 3, 1])
torch.Size([1, 3, 768])
torch.Size([1, 3, 1])
torch.Size([1, 3, 1])
torch.Size([1, 3, 768])
torch.Size([1, 3, 1])
torch.Size([1, 3, 1])
torch.Size([1, 3, 768])
torch.Size([1, 3, 1])
torch.Size([1, 3, 1])
torch.Size([1, 3, 768])
torch.Size([1, 3, 1])
torch.Size([1, 3, 1])
torch.Size([1, 3, 768])
torch.Size([1, 3, 1])
torch.Size([1, 3, 1])
torch.Size([1, 3, 768])
torch.Size([1, 3, 1])
torch.Size([1, 3, 1])
torch.Size([1, 3, 768])
torch.Size([1, 3, 1])
1.8134639263153076
torch.Size([1, 7, 768])
torch.Size([1, 7, 768])




AttributeError: 'int' object has no attribute 'full_marks'