In [5]:
import torch
from torch.nn import MSELoss
import spacy
from nltk.corpus import wordnet as wn
import numpy as np
import re
import json

In [19]:
def get_batches(data, batch_size):
    """Yield consecutive mini-batches of sentence pairs and their scores.

    Parameters
    ----------
    data : object exposing ``sent_1``, ``sent_2`` and ``score`` attributes that
        support ``.iloc`` slicing and ``.values`` (presumably a pandas
        DataFrame -- confirm against the caller).
    batch_size : int
        Maximum number of rows per batch; must be positive.

    Yields
    ------
    tuple
        ``(x1, x2, Y)`` -- the ``.values`` of ``sent_1``, ``sent_2`` and
        ``score`` for one slice of at most ``batch_size`` rows. The final
        batch may be shorter.

    Raises
    ------
    ValueError
        If ``batch_size`` is not a positive integer.
    """
    if batch_size <= 0:
        raise ValueError("batch_size must be a positive integer.")

    # Step by the batch size itself; ``.iloc`` slicing clamps at the end of the
    # data, so the last (possibly shorter) batch needs no special handling.
    for start in range(0, len(data), batch_size):
        stop = start + batch_size

        x1 = data.sent_1.iloc[start:stop].values
        x2 = data.sent_2.iloc[start:stop].values
        Y = data.score.iloc[start:stop].values

        yield x1, x2, Y

In [24]:
class TextSimilarity:
    """Similarity between two texts based on their dependency pairs.

    Every token of a parsed document contributes a ``(head, dependency label,
    token)`` triple. Two documents are compared by summing, over all pairs of
    triples, the word similarity of the triples multiplied by the correlation
    of their dependency labels, and dividing by the total token count.
    """

    def __init__(self, tags_dict = None, correlation_matrix = None):
        """Set up the tag lookup, the tag correlation matrix and the parser.

        Parameters
        ----------
        tags_dict : mapping, optional
            Maps a dependency label to its row/column index in the correlation
            matrix. Loaded from ``data/tags.json`` when omitted.
        correlation_matrix : 2-D array-like, optional
            Indexed as ``matrix[tag_a_id, tag_b_id]``. Defaults to an identity
            matrix with one row per tag.
        """
        self._tags = self._get_tags_dict() if tags_dict is None else tags_dict
        self._no_of_tags = len(self._tags)
        self._tag_correlation_matrix = (
            np.identity(self._no_of_tags) if correlation_matrix is None else correlation_matrix
        )
        self._parser = self._load_parser()

    @staticmethod
    def _load_parser():
        """Load the spaCy English pipeline used to parse documents."""
        try:
            return spacy.load("en")
        except OSError:
            # The "en" shortcut link was removed in spaCy v3; fall back to the
            # full package name of the small English pipeline.
            return spacy.load("en_core_web_sm")

    def _get_tags_dict(self):
        """Read the dependency-label -> index mapping from ``data/tags.json``."""
        with open("data/tags.json","r") as fl:
            tags = json.load(fl)

        return tags

    @staticmethod
    def _wordnet_similarity(word_a, word_b):
        """Similarity of two words: 1 if identical, else WordNet path similarity.

        The path similarity is taken between the first synset of each word.
        Returns 0 when either word has no synset or WordNet reports no path.
        """
        if word_a == word_b:
            return 1

        synsets_a, synsets_b = wn.synsets(word_a), wn.synsets(word_b)

        # Words unknown to WordNet cannot be compared.
        if not synsets_a or not synsets_b:
            return 0

        similarity = synsets_a[0].path_similarity(synsets_b[0])

        return 0 if similarity is None else similarity

    def _similarity_word(self, pair_A, pair_B):
        """Sum of head similarity and dependent similarity of two triples.

        Each pair is a ``(head token, dependency label, dependent token)``
        triple; only the ``.text`` of the two tokens is used here.
        """
        head = self._wordnet_similarity(pair_A[0].text, pair_B[0].text)
        dep = self._wordnet_similarity(pair_A[2].text, pair_B[2].text)

        return head + dep

    def _similarity_tag(self, tag_a, tag_b):
        """Look up the correlation of two dependency labels.

        Raises ``KeyError`` if a label is missing from the tag dictionary.
        """
        tag_a_id, tag_b_id = self._tags[tag_a], self._tags[tag_b]
        score = self._tag_correlation_matrix[tag_a_id,tag_b_id]

        return score

    def semantic_similarity(self, documents_1, documents_2):
        """Score each aligned pair of documents.

        Parameters
        ----------
        documents_1, documents_2 : sequences of str of equal length.

        Returns
        -------
        torch.Tensor
            1-D double tensor with one score per document pair. A pair whose
            documents both parse to zero tokens scores 0.

        Raises
        ------
        AssertionError
            If the two sequences differ in length.
        """
        #checking the sizes of both documents
        assert len(documents_1) == len(documents_2), "Size of both lists should be same."

        #scores vector
        scores = torch.zeros([len(documents_1),],dtype=torch.double)

        for i, (document_1, document_2) in enumerate(zip(documents_1, documents_2)):

            #parsing documents using spaCy English language parser
            tokens_1, tokens_2 = self._parser(document_1), self._parser(document_2)

            total_tokens = len(tokens_1) + len(tokens_2)

            # Nothing to compare (and nothing to divide by): keep the score at 0.
            if total_tokens == 0:
                continue

            #separating dependency pairs and tags from tokens
            pairs_1 = [(token.head, token.dep_, token) for token in tokens_1]
            pairs_2 = [(token.head, token.dep_, token) for token in tokens_2]

            score = 0

            #calculating score
            for pair_A in pairs_1:
                for pair_B in pairs_2:
                    score += self._similarity_word(pair_A, pair_B) * self._similarity_tag(pair_A[1], pair_B[1])

            #averaging score; the in-place add keeps the autograd link to the
            #correlation matrix when that matrix is a tensor requiring grad
            scores[i] += score / total_tokens

        return scores

In [30]:
# Build the model with its default tag dictionary and correlation matrix.
sim = TextSimilarity()

# Swap the NumPy correlation matrix for a tensor that autograd tracks, so the
# score computed below carries gradients back to the matrix.
sim._tag_correlation_matrix = torch.from_numpy(sim._tag_correlation_matrix).requires_grad_(True)

# Two aligned sentence pairs to score.
sent_1 = ["he is boy", "it is dog"]
sent_2 = ["he is girl", "it is cat"]

score = sim.semantic_similarity(sent_1, sent_2)
print(score)

# Last expression: display the (still untrained) correlation matrix.
sim._tag_correlation_matrix

tensor([0.8611, 0.8667], dtype=torch.float64, grad_fn=<CopySlices>)


tensor([[1., 0., 0.,  ..., 0., 0., 0.],
        [0., 1., 0.,  ..., 0., 0., 0.],
        [0., 0., 1.,  ..., 0., 0., 0.],
        ...,
        [0., 0., 0.,  ..., 1., 0., 0.],
        [0., 0., 0.,  ..., 0., 1., 0.],
        [0., 0., 0.,  ..., 0., 0., 1.]],
       dtype=torch.float64, requires_grad=True)

In [31]:
# One manual optimisation step on the tag correlation matrix.
loss = MSELoss()
optimizer = torch.optim.Adam([sim._tag_correlation_matrix], lr=0.001)

# Targets are constant labels, so they must not take part in autograd.
target = torch.tensor([1., 1.], dtype=torch.double)

# Clear any gradient left on the matrix before computing a new one.
optimizer.zero_grad()

loss_score = loss(score, target)
print(loss_score.item())

loss_score.backward()
print(sim._tag_correlation_matrix.grad)

optimizer.step()

# Last expression: display the matrix after the update.
sim._tag_correlation_matrix

0.03706790123456788
tensor([[-0.1815,  0.0000,  0.0000,  ...,  0.0000,  0.0000,  0.0000],
        [ 0.0000,  0.0000,  0.0000,  ...,  0.0000,  0.0000,  0.0000],
        [ 0.0000,  0.0000,  0.0000,  ...,  0.0000,  0.0000,  0.0000],
        ...,
        [ 0.0000,  0.0000,  0.0000,  ...,  0.0000,  0.0000,  0.0000],
        [ 0.0000,  0.0000,  0.0000,  ...,  0.0000,  0.0000,  0.0000],
        [ 0.0000,  0.0000,  0.0000,  ...,  0.0000,  0.0000,  0.0000]],
       dtype=torch.float64)


tensor([[1.0010, 0.0000, 0.0000,  ..., 0.0000, 0.0000, 0.0000],
        [0.0000, 1.0000, 0.0000,  ..., 0.0000, 0.0000, 0.0000],
        [0.0000, 0.0000, 1.0000,  ..., 0.0000, 0.0000, 0.0000],
        ...,
        [0.0000, 0.0000, 0.0000,  ..., 1.0000, 0.0000, 0.0000],
        [0.0000, 0.0000, 0.0000,  ..., 0.0000, 1.0000, 0.0000],
        [0.0000, 0.0000, 0.0000,  ..., 0.0000, 0.0000, 1.0000]],
       dtype=torch.float64, requires_grad=True)

In [None]:
def train(data, model, epochs = 25, lr = 0.001, validation_thresh = 0.2, batch_size = 10, print_every = 5):
    """Fit ``model._tag_correlation_matrix`` with Adam on an MSE objective.

    Parameters
    ----------
    data : object supporting ``len`` and ``.iloc`` slicing, in the form
        ``get_batches`` consumes (presumably a pandas DataFrame with
        ``sent_1``, ``sent_2`` and ``score`` columns -- confirm against caller).
    model : object exposing ``_tag_correlation_matrix`` and
        ``semantic_similarity(x1, x2)``.
    epochs : int
        Number of passes over the training split.
    lr : float
        Adam learning rate.
    validation_thresh : float or int
        A float strictly between 0 and 1 is read as the fraction of rows held
        out for validation (taken from the end of ``data``). Any other value
        is used as the row index at which ``data`` is split into training rows
        (before) and validation rows (after).
        NOTE(review): the fraction reading is an assumption based on the
        parameter name and its 0.2 default -- confirm the intended meaning.
    batch_size : int
        Rows per mini-batch.
    print_every : int
        Run validation and print losses every this many training batches.

    Returns
    -------
    tuple of list
        ``(train_losses, valid_losses)`` -- one entry per reporting step.
    """
    # Accept either a NumPy matrix (fresh model) or a tensor (model already
    # prepared or trained before); converting a tensor again would fail.
    if not isinstance(model._tag_correlation_matrix, torch.Tensor):
        model._tag_correlation_matrix = torch.as_tensor(model._tag_correlation_matrix, dtype=torch.double)
    model._tag_correlation_matrix.requires_grad_(True)

    criterion = MSELoss()
    optimizer = torch.optim.Adam([model._tag_correlation_matrix], lr=lr)

    # ``.iloc`` needs an integer position, so turn a fraction into an index.
    if isinstance(validation_thresh, float) and 0.0 < validation_thresh < 1.0:
        split_at = int(len(data) * (1.0 - validation_thresh))
    else:
        split_at = int(validation_thresh)

    train_data = data.iloc[:split_at]
    valid_data = data.iloc[split_at:]

    train_losses = []
    valid_losses = []
    count = 0

    for epoch in range(epochs):

        batch_losses = []

        for x1, x2, Y in get_batches(train_data, batch_size):

            count += 1

            # Gradients accumulate across backward() calls unless cleared.
            optimizer.zero_grad()

            scores = model.semantic_similarity(x1, x2)

            # The criterion needs a tensor target matching the double scores.
            loss = criterion(scores, torch.as_tensor(Y, dtype=torch.double))

            batch_losses.append(loss.item())

            loss.backward()

            optimizer.step()

            if count % print_every == 0:

                train_losses.append(np.mean(batch_losses))
                batch_losses = []

                # Validation uses its own names and list so it cannot leak
                # into the running training statistics; no graph is needed.
                valid_batch_losses = []

                with torch.no_grad():

                    for v1, v2, v_Y in get_batches(valid_data, batch_size):

                        v_scores = model.semantic_similarity(v1, v2)

                        v_loss = criterion(v_scores, torch.as_tensor(v_Y, dtype=torch.double))

                        valid_batch_losses.append(v_loss.item())

                # An empty validation split reports NaN rather than warning.
                valid_losses.append(np.mean(valid_batch_losses) if valid_batch_losses else float("nan"))

                print(f"{count} {epoch}/{epochs}\ttraining loss:{train_losses[-1]}\tvalididation loss:{valid_losses[-1]}")

    return train_losses, valid_losses
            