<a href="https://colab.research.google.com/github/Skypouk/NLP/blob/main/Copy_of_Convolutional_Sentiment_Analysis.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

Checking the existence of the tools and packages needed...

In [1]:
!pip3 install torch torchvision torchtext



In [2]:
! python3 -m spacy download en

✔ Download and installation successful
You can now load the model via spacy.load('en_core_web_sm')
✔ Linking successful
/usr/local/lib/python3.6/dist-packages/en_core_web_sm -->
/usr/local/lib/python3.6/dist-packages/spacy/data/en
You can now load the model via spacy.load('en')


In [3]:
!pip install msgpack==0.5.6

Collecting msgpack==0.5.6
  Downloading https://files.pythonhosted.org/packages/22/4e/dcf124fd97e5f5611123d6ad9f40ffd6eb979d1efdc1049e28a795672fcd/msgpack-0.5.6-cp36-cp36m-manylinux1_x86_64.whl (315kB)

# 4 - Convolutional Sentiment Analysis

In this notebook, we will be using a *convolutional neural network* (CNN) to conduct sentiment analysis.


Traditionally, CNNs are used to analyse images and are made up of one or more *convolutional* layers, followed by one or more linear layers. The convolutional layers use filters (also called *kernels*) which scan across an image and produce a processed version of the image; the patch of the image a filter covers at any one position is its *receptive field*. This processed version of the image can be fed into another convolutional layer or a linear layer. Each filter has a shape, e.g. a 3x3 filter covers a 3 pixel wide and 3 pixel high area of the image, and each element of the filter has a weight associated with it, so a 3x3 filter has 9 weights. In traditional image processing these weights were specified by hand by engineers. The main advantage of convolutional layers is that these weights are instead learned via backpropagation.

The intuitive idea behind learning the weights is that your convolutional layers act like *feature extractors*, extracting the parts of the image that are most important for your CNN's goal. For example, if using a CNN to detect faces in an image, the CNN may be looking for features such as the existence of a nose, a mouth or a pair of eyes in the image.

So why use CNNs on text? In the same way that a 3x3 filter can look over a patch of an image, a 1x2 filter can look over 2 sequential words in a piece of text, i.e. a bi-gram. In the previous tutorial we looked at the FastText model, which used bi-grams by explicitly adding them to the end of a text. In this CNN model we will instead use multiple filters of different sizes which will look at the bi-grams (a 1x2 filter), tri-grams (a 1x3 filter) and n-grams (a 1x$n$ filter) within the text.

The intuition here is that the appearance of certain bi-grams, tri-grams and n-grams within the review will be a good indication of the final sentiment.
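
To make the terminology concrete, here is a minimal sketch of what a filter of width $n$ "sees" as it slides over a tokenised review. The helper name `ngrams` and the example sentence are illustrative only and are not part of the tutorial code.

```python
def ngrams(tokens, n):
    """Return every run of n consecutive tokens, in reading order."""
    return [tuple(tokens[i:i + n]) for i in range(len(tokens) - n + 1)]


tokens = "this film is not good".split()

bigrams = ngrams(tokens, 2)   # what a 1x2 filter would look at
trigrams = ngrams(tokens, 3)  # what a 1x3 filter would look at

print(bigrams)
print(trigrams)
```

A sentence of 5 tokens yields 4 bi-grams and 3 tri-grams; a bi-gram such as `("not", "good")` is exactly the kind of pattern a single word on its own would miss.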

## Preparing Data

Let's prepare the data. 


In [4]:
import torch
from torchtext import data
from torchtext import datasets
import random

SEED = 1234

torch.manual_seed(SEED)
torch.cuda.manual_seed(SEED)

TEXT = data.Field(tokenize='spacy')
LABEL = data.LabelField()

train, test = datasets.IMDB.splits(TEXT, LABEL)

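# random.seed() returns None, so seed first and pass the generator state explicitly
random.seed(SEED)
train, valid = train.split(random_state=random.getstate())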


aclImdb_v1.tar.gz:   0%|          | 0.00/84.1M [00:00<?, ?B/s]

downloading aclImdb_v1.tar.gz


aclImdb_v1.tar.gz: 100%|██████████| 84.1M/84.1M [00:03<00:00, 22.9MB/s]


In [5]:
len(train[0].text)

91

Build the vocab and load the pre-trained word embeddings.

In [6]:
TEXT.build_vocab(train, max_size=25000, vectors="glove.6B.100d")
LABEL.build_vocab(train)

.vector_cache/glove.6B.zip: 862MB [07:32, 1.91MB/s]                          
100%|█████████▉| 399154/400000 [00:16<00:00, 23594.57it/s]

In [7]:
test_w=TEXT.vocab.itos[9205]
test_w2=TEXT.vocab.itos[9206]

print(test_w)
print(test_w2)

encouraged
enduring


In [8]:
from torch.nn.functional import cosine_similarity
test_v=TEXT.vocab.vectors[9205].unsqueeze(0)
test_v2=TEXT.vocab.vectors[9206]

cosine_similarity(test_v,TEXT.vocab.vectors,dim=1).sort()




torch.return_types.sort(values=tensor([-0.4001, -0.3987, -0.3701,  ...,  0.7593,  0.7693,  1.0000]), indices=tensor([20166, 14299,  5204,  ...,  5456, 11636,  9205]))
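
The cell above ranks every vocabulary vector by its cosine similarity to the vector at index 9205, which is why the final value is 1.0000 (the word compared with itself). As a rough sketch of the same idea in plain Python, assuming a made-up three-word vocabulary with 3-dimensional vectors (`toy_vectors` and `cosine` are hypothetical names, not part of the notebook):

```python
import math


def cosine(u, v):
    """Cosine similarity: dot product divided by the product of the L2 norms."""
    dot = sum(a * b for a, b in zip(u, v))
    norm_u = math.sqrt(sum(a * a for a in u))
    norm_v = math.sqrt(sum(b * b for b in v))
    return dot / (norm_u * norm_v)


# Invented vectors, purely for illustration
toy_vectors = {
    "good": [0.9, 0.1, 0.0],
    "great": [0.8, 0.2, 0.1],
    "table": [0.0, 0.1, 0.9],
}

query = toy_vectors["good"]
# Ascending order, like tensor.sort(): least similar first, the query itself last
ranked = sorted(toy_vectors, key=lambda w: cosine(query, toy_vectors[w]))
print(ranked)
```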

In [9]:
print(TEXT.vocab.itos[538])

amazing


As before, we create the iterators.

In [10]:
BATCH_SIZE = 64

train_iterator, valid_iterator, test_iterator = data.BucketIterator.splits(
    (train, valid, test), 
    batch_size=BATCH_SIZE, 
    sort_key=lambda x: len(x.text), 
    repeat=False,
    device = torch.device('cuda' if torch.cuda.is_available() else 'cpu'))

## Build the Model

Now to build our model.

The first major hurdle is visualizing how CNNs are used for text. Images are typically 2-dimensional (we'll ignore the fact that there is a third "colour" dimension for now) whereas text is 1-dimensional. However, we know that the first step is converting the words into vectors. This is how we can visualize our words in 2 dimensions: each word lies along one axis and the elements of its vector run across the other. Consider the 2-dimensional representation of the embedded sentence below:

![](https://i.imgur.com/ci1h9hv.png)

We can then use a filter that is **[n x emb_dim]**. This will cover $n$ sequential words entirely, as their width will be `emb_dim` dimensions. Consider the image below. Our word vectors are represented in green, here we have 4 words with 5 dimensional embeddings, creating a 4x5 "image". A filter that covers two words at a time (i.e. bi-grams) will be **[2x5]**, shown in yellow. The output of this filter (shown in red) will be a single real number that is the weighted sum of all elements covered by the filter.

![](https://i.imgur.com/QlXduXu.png)

The filter then moves "down" the image (or across the sentence) to cover the next bi-gram and another output is calculated. 

![](https://i.imgur.com/wuA330x.png)

Finally, the filter moves down again and the final output for this filter is calculated.

![](https://i.imgur.com/gi1GaEz.png)
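
The sliding described in the figures can be sketched in plain Python. This is only an illustration of the arithmetic, not how `nn.Conv2d` is implemented; the function name `conv_over_words` and all numbers are made up. The sentence has 4 words with 5-dimensional embeddings, and the filter is **[2x5]**.

```python
def conv_over_words(embedded, kernel, bias=0.0):
    """Slide a [n x emb_dim] kernel down a [sent_len x emb_dim] matrix.

    Each output is the weighted sum of all elements the kernel covers.
    """
    n = len(kernel)
    outputs = []
    for start in range(len(embedded) - n + 1):
        window = embedded[start:start + n]
        total = bias
        for word_vector, kernel_row in zip(window, kernel):
            total += sum(x * w for x, w in zip(word_vector, kernel_row))
        outputs.append(total)
    return outputs


# 4 words x 5 embedding dimensions (toy one-hot style vectors)
embedded = [
    [1, 0, 0, 0, 0],
    [0, 1, 0, 0, 0],
    [0, 0, 1, 0, 0],
    [0, 0, 0, 1, 0],
]
# A 2x5 filter whose weights happen to match the first bi-gram
kernel = [
    [1, 0, 0, 0, 0],
    [0, 2, 0, 0, 0],
]

outputs = conv_over_words(embedded, kernel)
print(outputs)
```

With 4 words and a filter of height 2 there are 4 - 2 + 1 = 3 positions, so the filter produces 3 numbers. Only the first bi-gram lines up with the filter's weights, so only the first output is non-zero.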


In our model, we will have filters of different sizes, with heights of 3, 4 and 5 and 100 filters of each size. The intuition is that we will be looking for the occurrence of different tri-grams, 4-grams and 5-grams that are relevant for analysing the sentiment of movie reviews.

The next step in our model is to use *pooling* (specifically *max pooling*) on the output of the convolutional layers, i.e. taking the maximum value over a dimension. Below is an example of taking the maximum value (0.9) from the output of the convolutional layer on the example sentence (not shown is the activation function applied to the output of the convolutions).

![](https://i.imgur.com/gzkS3ze.png)

The idea here is that the maximum value is the "most important" feature for determining the sentiment of the review, which corresponds to the "most important" n-gram within the review. How do we know what the "most important" n-gram is? Luckily, we don't have to! Through backpropagation, the weights of the filters are changed so that whenever certain n-grams that correspond to a sentiment are seen, the output of the filter is a "high" value. This "high" value then passes through the max pooling layer if it is the maximum value in the output. 
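
As a small illustration of max pooling over the sentence, here is a plain-Python sketch that applies a ReLU and then keeps the largest value. The numbers are invented (apart from the 0.9 maximum used in the figure), and the helper names are hypothetical.

```python
def relu(values):
    """Replace negative activations with zero."""
    return [max(0.0, v) for v in values]


def max_over_time(values):
    """Keep only the strongest response of one filter across the sentence."""
    return max(values)


short_review = [-0.4, 0.9, 0.3]              # one filter, 3 positions
long_review = [0.1, -0.2, 0.0, 0.6, -0.7]    # same filter, 5 positions

pooled_short = max_over_time(relu(short_review))
pooled_long = max_over_time(relu(long_review))
print(pooled_short, pooled_long)
```

Whatever the length of the review, each filter contributes exactly one number after pooling, which is what lets reviews of different lengths feed the same linear layer.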

As our model has 100 filters of 3 different sizes, that means we have 300 different n-grams the model thinks are important. We concatenate these together into a single vector and pass them through a linear layer to predict the sentiment. We can think of the weights of this linear layer as "weighting up the evidence" from each of the 300 n-grams and making a final decision. 


In [11]:
import torch.nn as nn
import torch.nn.functional as F  # used as F.relu / F.max_pool1d in forward()

class CNN(nn.Module):
    def __init__(self, vocab_size, embedding_dim, n_filters, filter_sizes, output_dim, dropout):
        super().__init__()
        
        self.embedding = nn.Embedding(vocab_size, embedding_dim)
        self.conv_0 = nn.Conv2d(in_channels=1, out_channels=n_filters, kernel_size=(filter_sizes[0],embedding_dim))
        self.conv_1 = nn.Conv2d(in_channels=1, out_channels=n_filters, kernel_size=(filter_sizes[1],embedding_dim))
        self.conv_2 = nn.Conv2d(in_channels=1, out_channels=n_filters, kernel_size=(filter_sizes[2],embedding_dim))
        self.fc = nn.Linear(len(filter_sizes)*n_filters, output_dim)
        self.dropout = nn.Dropout(dropout)
        
    def forward(self, x):
        
        #x = [sent len, batch size]
        
        x = x.permute(1, 0)
                
        #x = [batch size, sent len]
        
        embedded = self.embedding(x)
                
        #embedded = [batch size, sent len, emb dim]
        
        embedded = embedded.unsqueeze(1)
        
        #embedded = [batch size, 1, sent len, emb dim]
        
        conved_0 = F.relu(self.conv_0(embedded).squeeze(3))
        conved_1 = F.relu(self.conv_1(embedded).squeeze(3))
        conved_2 = F.relu(self.conv_2(embedded).squeeze(3))
            
        #conved_n = [batch size, n_filters, sent len - filter_sizes[n] + 1]
        
        pooled_0 = F.max_pool1d(conved_0, conved_0.shape[2]).squeeze(2)
        pooled_1 = F.max_pool1d(conved_1, conved_1.shape[2]).squeeze(2)
        pooled_2 = F.max_pool1d(conved_2, conved_2.shape[2]).squeeze(2)
        
        #pooled_n = [batch size, n_filters]
        
        cat = self.dropout(torch.cat((pooled_0, pooled_1, pooled_2), dim=1))

        #cat = [batch size, n_filters * len(filter_sizes)]
            
        return self.fc(cat)
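
To check the tensor shapes flowing through `forward`, the following sketch pushes a random "embedded" batch through one convolution per filter size. It assumes only that `torch` is installed; the batch size and sentence length are arbitrary values chosen for illustration.

```python
import torch
import torch.nn as nn

batch_size, sent_len, emb_dim, n_filters = 2, 7, 100, 100

# Stand-in for the embedded batch after unsqueeze(1): [batch, 1, sent len, emb dim]
embedded = torch.randn(batch_size, 1, sent_len, emb_dim)

shapes = {}
for filter_size in (3, 4, 5):
    conv = nn.Conv2d(in_channels=1, out_channels=n_filters,
                     kernel_size=(filter_size, emb_dim))
    # The filter spans the full embedding width, so the last dimension collapses to 1
    conved = torch.relu(conv(embedded)).squeeze(3)
    # Max over the sentence dimension leaves one value per filter
    pooled = conved.max(dim=2)[0]
    shapes[filter_size] = (tuple(conved.shape), tuple(pooled.shape))
    print(filter_size, shapes[filter_size])
```

Each convolution output has `sent_len - filter_size + 1` positions. One practical consequence is that an input shorter than the largest filter (5 tokens here) cannot be convolved and raises an error, so very short sentences would need padding first.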

We create an instance of our `CNN` class.

In [12]:
INPUT_DIM = len(TEXT.vocab)
EMBEDDING_DIM = 100
N_FILTERS = 100
FILTER_SIZES = [3,4,5]
OUTPUT_DIM = 1
DROPOUT = 0.5

model = CNN(INPUT_DIM, EMBEDDING_DIM, N_FILTERS, FILTER_SIZES, OUTPUT_DIM, DROPOUT)
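
For a sense of scale, the number of weights in this model can be worked out by hand. The sketch below assumes a vocabulary of 25,002 entries (the 25,000 most frequent words plus the `<unk>` and `<pad>` tokens); the real figure is whatever `len(TEXT.vocab)` returns.

```python
vocab_size = 25_002   # assumption: 25,000 words + <unk> + <pad>
embedding_dim = 100
n_filters = 100
filter_sizes = [3, 4, 5]
output_dim = 1

# Embedding table: one vector per vocabulary entry
embedding_params = vocab_size * embedding_dim

# Each filter has filter_size * embedding_dim weights plus one bias
conv_params = sum(n_filters * (fs * embedding_dim) + n_filters
                  for fs in filter_sizes)

# Linear layer: one weight per pooled feature, plus a bias per output
fc_params = len(filter_sizes) * n_filters * output_dim + output_dim

total_params = embedding_params + conv_params + fc_params
print(embedding_params, conv_params, fc_params, total_params)
```

Under this assumption the embedding table accounts for roughly 95% of the weights, and the convolutional filters and linear layer for the rest.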

And load the pre-trained embeddings.

In [13]:
pretrained_embeddings = TEXT.vocab.vectors

model.embedding.weight.data.copy_(pretrained_embeddings)

tensor([[ 0.0000,  0.0000,  0.0000,  ...,  0.0000,  0.0000,  0.0000],
        [ 0.0000,  0.0000,  0.0000,  ...,  0.0000,  0.0000,  0.0000],
        [-0.0382, -0.2449,  0.7281,  ..., -0.1459,  0.8278,  0.2706],
        ...,
        [ 0.2879, -0.2096,  0.2155,  ..., -0.0165, -0.3002,  0.5226],
        [-0.4165,  0.0542, -0.1853,  ..., -0.2558, -0.2313,  1.0969],
        [ 0.4557,  0.5966,  0.0917,  ..., -0.6968, -0.2632, -0.4583]])

## Train the Model

Training is the same as before. We initialize the optimizer and the loss function (criterion), then place the model and criterion on the GPU (if available).

In [14]:
import torch.optim as optim

optimizer = optim.Adam(model.parameters())

criterion = nn.BCEWithLogitsLoss()

device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

model = model.to(device)
criterion = criterion.to(device)
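
`nn.BCEWithLogitsLoss` expects the raw model output (the logit) and applies the sigmoid itself before computing binary cross-entropy, which is why the `CNN` returns `self.fc(cat)` without an activation. Below is a plain-Python sketch of the per-example quantity. PyTorch's version is numerically more careful and averages over the batch by default; the helper names here are illustrative.

```python
import math


def sigmoid(z):
    return 1.0 / (1.0 + math.exp(-z))


def bce_with_logits(logit, target):
    """Binary cross-entropy of sigmoid(logit) against a 0/1 target."""
    p = sigmoid(logit)
    return -(target * math.log(p) + (1 - target) * math.log(1 - p))


confident_right = bce_with_logits(2.0, 1)   # positive logit, positive label
confident_wrong = bce_with_logits(2.0, 0)   # positive logit, negative label
undecided = bce_with_logits(0.0, 1)         # sigmoid(0) = 0.5

print(confident_right, confident_wrong, undecided)
```

A confident correct prediction costs little, a confident wrong one costs a lot, and a logit of 0 costs exactly $\ln 2$ whichever label is correct.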

We implement the function to calculate accuracy...

In [15]:
import torch.nn.functional as F

def binary_accuracy(preds, y):
    """
    Returns accuracy per batch, i.e. if you get 8/10 right, this returns 0.8, NOT 8
    """

    #round predictions to the closest integer
    rounded_preds = torch.round(torch.sigmoid(preds))
    correct = (rounded_preds == y.float()).float() #convert into float for division 
    acc = correct.sum()/len(correct)
    return acc
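
As a worked example of what `binary_accuracy` computes, here is the same calculation in plain Python for a made-up batch of four logits and labels (the values and the helper name `toy_binary_accuracy` are invented for illustration).

```python
import math


def sigmoid(z):
    return 1.0 / (1.0 + math.exp(-z))


def toy_binary_accuracy(logits, labels):
    """Fraction of examples whose rounded sigmoid output equals the label."""
    rounded = [round(sigmoid(z)) for z in logits]
    correct = [int(r == y) for r, y in zip(rounded, labels)]
    return sum(correct) / len(correct)


logits = [2.0, -1.0, 0.5, -3.0]   # raw model outputs
labels = [1, 0, 0, 0]             # 1 = positive review, 0 = negative

acc = toy_binary_accuracy(logits, labels)
print(acc)
```

The third example has a positive logit but a negative label, so 3 of the 4 predictions are right and the function returns 0.75 rather than 3.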

We define a function for training our model...

**Note**: as we are using dropout again, we must remember to use `model.train()` to ensure the dropout is "turned on" while training.

In [16]:
def train_model(model, iterator, optimizer, criterion):
    
    epoch_loss = 0
    epoch_acc = 0
    
    model.train()
    
    for batch in iterator:
        
        optimizer.zero_grad()
        
        predictions = model(batch.text).squeeze(1)
        
        loss = criterion(predictions, batch.label.float())
        
        acc = binary_accuracy(predictions, batch.label)
        
        loss.backward()
        
        optimizer.step()
        
        epoch_loss += loss.item()
        epoch_acc += acc.item()
        
    return epoch_loss / len(iterator), epoch_acc / len(iterator)

We define a function for testing our model...

**Note**: again, as we are now using dropout, we must remember to use `model.eval()` to ensure the dropout is "turned off" while evaluating.

In [17]:
def evaluate_model(model, iterator, criterion):
    
    epoch_loss = 0
    epoch_acc = 0
    
    model.eval()
    
    with torch.no_grad():
    
        for batch in iterator:

            predictions = model(batch.text).squeeze(1)
            
            loss = criterion(predictions, batch.label.float())
            
            acc = binary_accuracy(predictions, batch.label)

            epoch_loss += loss.item()
            epoch_acc += acc.item()
        
    return epoch_loss / len(iterator), epoch_acc / len(iterator)
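
The two notes about `model.train()` and `model.eval()` can be illustrated with a hand-rolled version of inverted dropout. This is a sketch of the behaviour, not PyTorch's implementation: while training, each value is zeroed with probability `p` and the survivors are scaled by `1 / (1 - p)`; while evaluating, the input passes through unchanged. The function and variable names are hypothetical.

```python
import random


def dropout(values, p, training, rng):
    """Inverted dropout: active only when `training` is True."""
    if not training:
        return list(values)
    keep = 1.0 - p
    return [v / keep if rng.random() < keep else 0.0 for v in values]


rng = random.Random(1234)
features = [1.0] * 8

train_out = dropout(features, p=0.5, training=True, rng=rng)
eval_out = dropout(features, p=0.5, training=False, rng=rng)

print(train_out)
print(eval_out)
```

Forgetting `model.eval()` would therefore make predictions noisy, because a random half of the 300 pooled features would be zeroed on every forward pass.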

Finally, we train our model...

In [18]:
N_EPOCHS = 5
for epoch in range(N_EPOCHS):

    train_loss, train_acc = train_model(model, train_iterator, optimizer, criterion)
    # Validation is skipped in this run, so the validation columns below print as 0.
    # Uncomment the next line (and drop the two placeholders) to evaluate each epoch.
    #valid_loss, valid_acc = evaluate_model(model, valid_iterator, criterion)
    valid_loss=0
    valid_acc=0

    print(f'Epoch: {epoch+1:02}, Train Loss: {train_loss:.3f}, Train Acc: {train_acc*100:.2f}%, Val. Loss: {valid_loss:.3f}, Val. Acc: {valid_acc*100:.2f}%')




Epoch: 01, Train Loss: 0.502, Train Acc: 73.99%, Val. Loss: 0.000, Val. Acc: 0.00%
Epoch: 02, Train Loss: 0.310, Train Acc: 86.93%, Val. Loss: 0.000, Val. Acc: 0.00%
Epoch: 03, Train Loss: 0.224, Train Acc: 91.25%, Val. Loss: 0.000, Val. Acc: 0.00%
Epoch: 04, Train Loss: 0.152, Train Acc: 94.36%, Val. Loss: 0.000, Val. Acc: 0.00%
Epoch: 05, Train Loss: 0.093, Train Acc: 96.95%, Val. Loss: 0.000, Val. Acc: 0.00%


...and get our best test accuracy yet! 

In [19]:
test_loss, test_acc = evaluate_model(model, test_iterator, criterion)

print(f'Test Loss: {test_loss:.3f}, Test Acc: {test_acc*100:.2f}%')

Test Loss: 0.304, Test Acc: 88.34%


In [24]:
import spacy
import numpy as np
from IPython.display import display, HTML
from torch.nn.functional import cosine_similarity

nlp = spacy.load('en')
eps = np.finfo(np.float32).eps.item()
def predict_sentiment(sentence, explain_scores=True,explain_relative_to=1):
  
    tokenized_sentence = [tok.text for tok in nlp.tokenizer(sentence)]
    indexed_sentence = [TEXT.vocab.stoi[t] for t in tokenized_sentence]
    tensor = torch.LongTensor(indexed_sentence).to(device)
    tensor = tensor.unsqueeze(1)
    prediction = torch.sigmoid(model(tensor))
    
    original_input_embedding,input_grad=get_input_gradients(indexed_sentence,prediction,explain_relative_to)
    
    explanation=get_prediction_explanation(tokenized_sentence,original_input_embedding,input_grad,explain_scores)
      
    return {'tokenized_sentence': tokenized_sentence,'prediction': prediction.item(),'explanation': explanation }

def get_input_gradients(original_sentence,prediction,in_relation_to):
    # Target the gradients are computed against (1 = positive, 0 = negative)
    gradient_truth=torch.Tensor([in_relation_to]).unsqueeze(0).to(prediction.device)
    
    # NOTE: `prediction` has already been through a sigmoid and BCEWithLogitsLoss
    # applies another one; for a 0/1 target this only rescales the gradient by a
    # positive factor, so its direction is unchanged.
    loss=criterion(prediction,gradient_truth)
    optimizer.zero_grad()
    loss.backward()
    
    # Gather, on the CPU, the embedding row and its gradient for every token
    token_indices=torch.LongTensor(original_sentence).to(model.embedding.weight.device)
    original_input_embedding=model.embedding.weight.detach()[token_indices].cpu()
    input_grad=model.embedding.weight.grad[token_indices].cpu()
    
    return original_input_embedding,input_grad
    
    

def get_input_scores(input,input_embedding,input_grad):
  
  
  # Take a SGD step using grads
  
  input_after_step=input_embedding-input_grad
  after_grad_norms = torch.norm(input_after_step, 2, 1)
  before_grad_norms = torch.norm(input_embedding, 2, 1)
  variation = after_grad_norms-before_grad_norms
 
  standard_deviation=torch.std(variation)
  mean=torch.mean(variation)
  z_score=(variation-mean)/standard_deviation
 
  return z_score

def old_get_input_scores(input,input_embedding,input_grad):
  
  grad_norms=torch.norm(input_grad,2,1)
  
  return grad_norms/torch.max(grad_norms)

  
def get_prediction_explanation(input,input_embedding,input_grad, explain_scores):
 
  
  input_word_scores=get_input_scores(input,input_embedding,input_grad)
   
  explanation=""
  for i in range(0,len(input)):
    token=input[i]
    if explain_scores:
      str_token="%s (%.3f)"%(token,input_word_scores[i])
    else:
      str_token=token
    
    explanation=explanation+str_token+'&nbsp;'
    if i>0 and i%20==0:
      explanation=explanation+"<br/>"
    
  return {'word_scores': input_word_scores,'input_gradient': input_grad,'textual_explanation':explanation }
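
The scoring in `get_input_scores` can be followed by hand on a tiny example. The sketch below mirrors its steps in plain Python: take one gradient step on each word vector, measure how much the vector's L2 norm changes, and standardise those changes into z-scores. All numbers and the name `word_scores` are invented for illustration; `statistics.stdev` is used because it is the sample standard deviation, which matches the default of `torch.std`.

```python
import math
import statistics


def l2_norm(vec):
    return math.sqrt(sum(x * x for x in vec))


def word_scores(embeddings, gradients):
    """Z-score of the change in norm after one gradient step per word."""
    variation = []
    for emb, grad in zip(embeddings, gradients):
        stepped = [e - g for e, g in zip(emb, grad)]
        variation.append(l2_norm(stepped) - l2_norm(emb))
    mean = statistics.mean(variation)
    std = statistics.stdev(variation)
    return [(v - mean) / std for v in variation]


# Three words with 2-dimensional embeddings and their gradients
embeddings = [[3.0, 4.0], [1.0, 0.0], [0.0, 2.0]]
gradients = [[0.0, 0.0], [-1.0, 0.0], [0.0, 1.0]]

scores = word_scores(embeddings, gradients)
print(scores)
```

The first word has a zero gradient, so its norm does not move; the second grows by 1 and the third shrinks by 1. After standardising, the words that the gradient would change most sit furthest from zero.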

  
  

In [25]:
from torch.nn.functional import cosine_similarity

def get_projected_words(word,word_gradient,num_words=1):
  
  word_index=TEXT.vocab.stoi[word]
  word_embedding=TEXT.vocab.vectors[word_index]
  learning_rate=1
  i=0
  result=[]
  
  while i<100000:
    try: 
      word_embedding=word_embedding-learning_rate*word_gradient
    except:
      # We can have a float overflow here if this process gets out of control
      return result
    similarity_value,similarity_index=cosine_similarity(word_embedding.unsqueeze(0),TEXT.vocab.vectors,dim=1).sort(descending=True)
    if similarity_index[0]!=word_index:
      if  similarity_value[0]<0.5:
        break
      
      result.append({'word':TEXT.vocab.itos[similarity_index[0]],'similarity': similarity_value[0]})
      word_index=similarity_index[0]
      learning_rate=1
      if len(result)>=num_words:
        break
      
    i=i+1
    learning_rate=learning_rate*1.1
      
  return result

  
def get_projected_sentence_word(prediction,word):
  
  sentence=prediction['tokenized_sentence']
  word_index_in_sentence=[i for i in range(0,len(sentence)) if sentence[i]==word][0]
  word_gradient=prediction['explanation']['input_gradient'][word_index_in_sentence]
  
  
  return get_projected_words(word,word_gradient,1)
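
The idea behind `get_projected_words` is to keep stepping a word's embedding against its gradient, with a growing step size, until the nearest vocabulary vector is a different word. Here is a simplified plain-Python sketch of that loop on a made-up 2-dimensional vocabulary. It returns after the first replacement it finds, and every name and number in it is hypothetical.

```python
import math


def cosine(u, v):
    dot = sum(a * b for a, b in zip(u, v))
    return dot / (math.sqrt(sum(a * a for a in u)) * math.sqrt(sum(b * b for b in v)))


def nearest_word(vector, vocab):
    """Vocabulary entry whose vector is most similar to `vector`."""
    return max(vocab, key=lambda w: cosine(vector, vocab[w]))


def project(word, gradient, vocab, max_steps=50, growth=1.1, min_similarity=0.5):
    vector = list(vocab[word])
    learning_rate = 1.0
    for _ in range(max_steps):
        # Gradient step away from the original word
        vector = [x - learning_rate * g for x, g in zip(vector, gradient)]
        candidate = nearest_word(vector, vocab)
        if candidate != word:
            similar_enough = cosine(vector, vocab[candidate]) >= min_similarity
            return candidate if similar_enough else None
        learning_rate *= growth
    return None


toy_vocab = {"bad": [-1.0, 0.2], "okay": [0.1, 1.0], "great": [1.0, 0.3]}
gradient = [-0.5, 0.0]  # stepping against it moves the vector along +x

replacement = project("bad", gradient, toy_vocab)
print(replacement)
```

In this toy setting the first step leaves the vector closest to "bad", and the second, slightly larger step lands nearest to "okay", which is returned as the replacement.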
  
  

In [26]:


prediction=predict_sentiment("This is a ridiculous movie and you should never see it.")
print(prediction['prediction'])
display(HTML(prediction['explanation']['textual_explanation']))
print(get_projected_sentence_word(prediction,'ridiculous'))

0.195659339427948


[{'word': 'amazing', 'similarity': tensor(0.6718)}]


In [28]:

prediction=predict_sentiment("By Timandra Harkness It is a glorious film, but you could not make it now. And that is not just my opinion. ")
print(prediction['prediction'])
display(HTML(prediction['explanation']['textual_explanation']))




0.9711049199104309


In [30]:
print(get_projected_sentence_word(prediction,'glorious'))

[{'word': 'splendid', 'similarity': tensor(0.5050)}]


In [31]:
def display_message(message):
  display(HTML(message))
  
def predict_and_make_it_better(text,better_direction=1):
  
  version=0
  word_to_change=None
  better_word=None
  
  while True:
    
    prediction=predict_sentiment(text,explain_scores=False,explain_relative_to=better_direction)
    display_message("<H2> Version "+str(version)+"</H2>")
    if word_to_change is not None:
      display_message("<H3>"+word_to_change+"->"+better_word+"</H3>")
      
    display_message("<H3> Sentiment: "+str(prediction['prediction'])+"</H3>")
    display(HTML(prediction['explanation']['textual_explanation']))
    
    # Get the word with the highest absolute score
    word_to_change=None
    better_word=None
  
    word_scores=prediction['explanation']['word_scores']
    _,sorted_indices=torch.abs(word_scores).sort(descending=True)
    changed_text=False
    for i in range(0,sorted_indices.size(0)):
      tokenized_sentence=prediction['tokenized_sentence']
      word_to_change=tokenized_sentence[sorted_indices[i]]
      better_words=get_projected_sentence_word(prediction,word_to_change)
      if len(better_words)>0:
        better_word=better_words[0]['word']
        new_tokenized_sentence=[t if t!=word_to_change else better_word for t in tokenized_sentence]
        text=" ".join(new_tokenized_sentence)
        changed_text=True
        break
    
    if not changed_text:
      return
    
    version=version+1
    
               
    
    

In [33]:
predict_and_make_it_better("bad worse stupid not funny",better_direction=1)