# Using CNNs

In [1256]:
# Imports
import re
import xml
import xml.etree.ElementTree as ET

import torch
import torch.nn as nn
from torch.nn.functional import one_hot
import torch.nn.functional as F
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader

from sklearn.model_selection import train_test_split

from tqdm.auto import tqdm

## Helper Functions 
- `parse_review` : parses the `.xml` file (dataset) for a given language domain pair and extracts all reviews along with the aspect and sentiment
- `remove_non_alpha` : for all english sentences, removes all non alphabet characters
- `pad_collate` : while loading the data as batches into the dataloader, this deals with the varying sequence lengths (by either padding or truncating)
- `initialize_random_embeddings` : used to create aspect embeddings

In [1257]:
# Helper Functions
def parse_review(review):
    return list(map(lambda sen: (sen.find('text').text,\
                    tuple(map(lambda op: (op.attrib["category"],op.attrib["polarity"]),sen.find("Opinions").findall('Opinion')))),\
                    list(filter(lambda sen: sen.find("Opinions"), review.find("sentences").findall("sentence")))))

def remove_non_alpha(sentence):
    cleaned_sentence = re.sub(r'[^a-zA-Z\s]', '', sentence)
    return cleaned_sentence

def get_all_entities_attributes(all_aspects):
    all_entities = []
    all_attributes = []
    for aspect in all_aspects:
        entity, attribute = aspect.split("#")
        all_entities.append(entity)
        all_attributes.append(attribute)
    return all_entities, all_attributes

def initialize_random_embeddings(entities, attributes, embedding_dim):
    entity_embeddings = nn.Embedding(len(entities), embedding_dim)
    attribute_embeddings = nn.Embedding(len(attributes), embedding_dim)
    return entity_embeddings, attribute_embeddings

def get_aspect_embedding(aspect, entity_embeddings, attribute_embeddings):
    entity, attribute = aspect.split("#")
    entity_embedding = entity_embeddings(torch.LongTensor([all_entities.index(entity)]))
    attribute_embedding = attribute_embeddings(torch.LongTensor([all_attributes.index(attribute)]))
    aspect_embedding = (entity_embedding + attribute_embedding) / 2
    return aspect_embedding

# Aspect Category Detection
- In this task, we pass to our CNN model the embedded sentence as input and the aspect associated with each sentence as the label

## Data Preprocessing
- We use our helper functions to parse the XML file, tokenise all the sentences and create vocabularies (for words and for aspects) 

In [1258]:
# Getting all reviews
tree = ET.parse("./datasets/english-restaurants.xml")
root = tree.getroot()
all_reviews = list(map(parse_review, root.findall("Review")))
train_reviews, test_reviews = train_test_split(total_reviews, train_size=0.95)

In [1259]:
# Getting the sentences and aspects
train_sentences, test_sentences = [], []
train_aspects, test_aspects = [], []

# Train 
for review in train_reviews:
    for sentence in review:
        cleaned_sentence = remove_non_alpha(sentence[0])
        if (len(cleaned_sentence) != 0):
            train_sentences.append(cleaned_sentence)
            train_aspects.append(sentence[1][0][0])

# Test
for review in test_reviews:
    for sentence in review:
        cleaned_sentence = remove_non_alpha(sentence[0])
        if (len(cleaned_sentence) != 0):
            test_sentences.append(cleaned_sentence)
            test_aspects.append(sentence[1][0][0])

In [1260]:
# Tokenising sentences and creating the vocabularies
tokenization = lambda x: x.split()

# for sentences
unique_tokens = set([token for sentence in all_sentences for token in tokenization(sentence)])
vocab = {token: idx for idx, token in enumerate(unique_tokens)}

# for aspects
unique_aspects = set(all_aspects)
aspect_vocab = {aspect: idx for idx, aspect in enumerate(unique_aspects)}
aspect_vocab_reverse = {v: k for k, v in aspect_vocab.items()}
num_unique_aspects = len(unique_aspects)

## Dataset and Dataloader
- We define a custom dataset class to load our tokenised sentences, along with the aspect.

In [1261]:
class AspectExtractionDataset(Dataset):
    def __init__(self, sentences, aspects, tokenizer, vocab, aspect_vocab):
        self.sentences = sentences
        self.aspects = aspects
        self.tokenizer = tokenizer
        self.vocab = vocab
        self.aspect_vocab = aspect_vocab

    def __len__(self):
        return len(self.sentences)

    def __getitem__(self, index):
        sentence = self.tokenizer(self.sentences[index])
        aspect = [self.aspect_vocab[self.aspects[index]]]
        return sentence, aspect

In [1262]:
def pad_collate_aspect_extraction(batch):
    sentences, aspects = zip(*batch)
    padded_sentences = torch.nn.utils.rnn.pad_sequence([torch.tensor([vocab[token] for token in sentence]) for sentence in sentences], batch_first=True)
    one_hot_aspects = one_hot(torch.tensor(aspects), num_classes=num_unique_aspects)
    return padded_sentences, one_hot_aspects.float()

In [1263]:
# Loading the dataset and dataloader

# Training
BATCH_SIZE = 1
aspect_extraction_train_dataset = AspectExtractionDataset(train_sentences, train_aspects, tokenization, vocab, aspect_vocab)
aspect_extraction_train_dataloader = DataLoader(aspect_extraction_train_dataset, batch_size=BATCH_SIZE, collate_fn=pad_collate_aspect_extraction, shuffle=True)

# Testing
# aspect_extraction_test_dataset = AspectExtractionDataset(test_sentences, test_aspects, tokenization, vocab, aspect_vocab)
# aspect_extraction_test_dataloader = DataLoader(aspect_extraction_test_dataset, batch_size=BATCH_SIZE, collate_fn=pad_collate_aspect_extraction, shuffle=True)

## CNN Model
- Here, we use a simple CNN to learn and predict the aspects for a given sentence

In [1264]:
class AspectExtractionCNN(nn.Module):
    def __init__(self, vocab_size, embedding_dim, aspect_vocab_size):
        super().__init__()
        self.embedding = nn.Embedding(vocab_size, embedding_dim)
        self.conv1 = nn.Conv1d(embedding_dim, 128, kernel_size=3, padding=3)
        self.conv2 = nn.Conv1d(128, 512, kernel_size=3, padding=3)
        self.fc = nn.Linear(512, aspect_vocab_size)

    def forward(self, x):
        x = self.embedding(x.long())
        x = x.permute(0, 2, 1)
        x = self.conv2(self.conv1(x))
        x = torch.max(x, dim=2)[0]
        x = self.fc(x)
        return x

## Train and Eval
- We define functions to train the model and evaluate it by making predictions

In [1265]:
def train(model, dataloader, criterion, optimizer, device):
    model.train()
    for inputs, targets in dataloader:
        inputs, targets = inputs.to(device), targets.to(device)
        optimizer.zero_grad()
        outputs = model(inputs)
        loss = criterion(outputs.squeeze(), targets.squeeze())
        loss.backward()
        optimizer.step()
    return loss.item()

In [1267]:
def predict_aspects(outputs, aspect_vocab, threshold):
    outputs = torch.softmax(outputs, dim=1)
    predicted_aspects = [aspect for aspect, prob in aspect_vocab.items() if outputs[0][prob] >= threshold]
    return predicted_aspects

In [1270]:
# Training the model

# Hyperparameters
VOCAB_SIZE = len(vocab)
EMBEDDING_DIM = 50
ASPECT_VOCAB_SIZE = len(aspect_vocab)
DEVICE = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
LEARNING_RATE = 0.001
NUM_EPOCHS = 10

model_aspect_extraction = AspectExtractionCNN(VOCAB_SIZE, EMBEDDING_DIM, ASPECT_VOCAB_SIZE).to(device)
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model_aspect_extraction.parameters(), lr=LEARNING_RATE)

for epoch in tqdm(range(NUM_EPOCHS), desc="Training Epochs"):
    epoch_loss = train(model_aspect_extraction, aspect_extraction_train_dataloader, criterion, optimizer, device)
    print(f'Epoch {epoch+1}/{NUM_EPOCHS}, Loss: {epoch_loss:4f}')

Training Epochs:   0%|          | 0/10 [00:00<?, ?it/s]

Epoch 1/10, Loss: 1.094722
Epoch 2/10, Loss: 0.664143
Epoch 3/10, Loss: 0.045833
Epoch 4/10, Loss: 0.000037
Epoch 5/10, Loss: 0.000253
Epoch 6/10, Loss: 0.106663
Epoch 7/10, Loss: 5.612749
Epoch 8/10, Loss: 0.000566
Epoch 9/10, Loss: 0.000001
Epoch 10/10, Loss: -0.000000


In [1272]:
# Evaluation
model.eval()
test_dataset = AspectExtractionDataset(test_sentences, test_aspects, tokenization, vocab, aspect_vocab)
test_dataloader = DataLoader(test_dataset, batch_size=1, collate_fn=pad_collate_aspect_extraction, shuffle=True)
correct = 0
total = 0
for inputs, targets in test_dataloader:
    inputs = inputs.to(device)
    outputs = model_aspect_extraction(inputs)
    predicted_aspects = predict_aspects(outputs, aspect_vocab, threshold=0.25)
    print(f"True Aspect(s) : {aspect_vocab_reverse[torch.argmax(targets).item()]} | Predicted Aspect(s): {predicted_aspects}")
    if (predicted_aspects[0] == aspect_vocab_reverse[torch.argmax(targets).item()]):
        correct += 1
    total +=1

True Aspect(s) : FOOD#QUALITY | Predicted Aspect(s): ['FOOD#QUALITY']
True Aspect(s) : RESTAURANT#GENERAL | Predicted Aspect(s): ['AMBIENCE#GENERAL']
True Aspect(s) : SERVICE#GENERAL | Predicted Aspect(s): ['AMBIENCE#GENERAL']
True Aspect(s) : FOOD#QUALITY | Predicted Aspect(s): ['FOOD#QUALITY']
True Aspect(s) : RESTAURANT#GENERAL | Predicted Aspect(s): ['AMBIENCE#GENERAL']
True Aspect(s) : SERVICE#GENERAL | Predicted Aspect(s): ['RESTAURANT#GENERAL']
True Aspect(s) : AMBIENCE#GENERAL | Predicted Aspect(s): ['AMBIENCE#GENERAL']
True Aspect(s) : FOOD#QUALITY | Predicted Aspect(s): ['FOOD#STYLE_OPTIONS']
True Aspect(s) : FOOD#PRICES | Predicted Aspect(s): ['FOOD#QUALITY']
True Aspect(s) : AMBIENCE#GENERAL | Predicted Aspect(s): ['AMBIENCE#GENERAL']
True Aspect(s) : RESTAURANT#GENERAL | Predicted Aspect(s): ['RESTAURANT#GENERAL']
True Aspect(s) : SERVICE#GENERAL | Predicted Aspect(s): ['RESTAURANT#GENERAL', 'SERVICE#GENERAL']
True Aspect(s) : SERVICE#GENERAL | Predicted Aspect(s): ['SERVI

In [1273]:
print(f"Accuracy : {correct*100/total:.4f}%")

Accuracy : 58.0247%


# Sentiment Polarity
- In this task, we pass to our model the sentence embeddings and along with it the aspect embeddings. The associated sentiment is passed as the label. 

## Data Preprocessing

In [1274]:
# Getting all reviews
tree = ET.parse("./datasets/english-restaurants.xml")
root = tree.getroot()
all_reviews = list(map(parse_review, root.findall("Review")))
train_reviews, test_reviews = train_test_split(total_reviews, train_size=0.95)

In [1275]:
# Getting the sentences and aspects
train_sentences, test_sentences = [], []
train_aspects, test_aspects = [], []
train_sentiments, test_sentiments = [], []

# Train 
for review in train_reviews:
    for sentence in review:
        cleaned_sentence = remove_non_alpha(sentence[0])
        if (len(cleaned_sentence) != 0):
            train_sentences.append(cleaned_sentence)
            train_aspects.append(sentence[1][0][0])
            if (sentence[1][0][-1] == 'positive'):
                train_sentiments.append(1)
            elif (sentence[1][0][-1] == 'negative'):
                train_sentiments.append(2)
            elif (sentence[1][0][-1] == 'neutral'):
                train_sentiments.append(3)

# Test
for review in test_reviews:
    for sentence in review:
        cleaned_sentence = remove_non_alpha(sentence[0])
        if (len(cleaned_sentence) != 0):
            test_sentences.append(cleaned_sentence)
            test_aspects.append(sentence[1][0][0])
            if (sentence[1][0][-1] == 'positive'):
                train_sentiments.append(1)
            elif (sentence[1][0][-1] == 'negative'):
                train_sentiments.append(2)
            elif (sentence[1][0][-1] == 'neutral'):
                train_sentiments.append(3)

In [1286]:
idx_to_sentiment = {0: 'positive', 1: 'negative', 2: 'neutral'}

In [1276]:
# Tokenising sentences and creating the vocabularies
tokenization = lambda x: x.split()

# for sentences
unique_tokens = set([token for sentence in train_sentences for token in tokenization(sentence)])
vocab = {token: idx for idx, token in enumerate(unique_tokens)}

# tokenising sentences
tokenised_sentences_train = [torch.tensor([vocab[token] for token in tokenization(sentence)]) for sentence in train_sentences]

In [1277]:
all_entities, all_attributes = get_all_entities_attributes(train_aspects)
entity_embeddings, attribute_embeddings = initialize_random_embeddings(all_entities, all_attributes, embedding_dim=15)
embedded_aspects = {}
for aspect in train_aspects:
    aspect_embedding = get_aspect_embedding(aspect, entity_embeddings, attribute_embeddings)
    embedded_aspects[aspect] = aspect_embedding

In [1278]:
tokenised_sentences = []
aspect_embeddings = []
for i, sentence in enumerate(train_sentences):
    sent_tokenised = torch.tensor([vocab[token] for token in tokenization(sentence)])
    tokenised_sentences.append(sent_tokenised)
    aspect_embeddings.append(embedded_aspects[train_aspects[i]])

## Dataset and Dataloader

In [1279]:
class SentimentPolarityDataset(Dataset):
    def __init__(self, tokenised_sentences, aspect_embeddings, sentiments):
        self.tokenised_sentences = tokenised_sentences
        self.aspect_embeddings = aspect_embeddings
        self.sentiments = sentiments
        self.label_encoder = LabelEncoder()
        self.sentiments_labels_encoded = self.label_encoder.fit_transform(self.sentiments)

    def __len__(self):
        return len(self.tokenised_sentences)

    def __getitem__(self, index):
        return {
            'tokenised_sentence': self.tokenised_sentences[index],
            'aspect_embedding': self.aspect_embeddings[index][0],
            'label': self.sentiments_labels_encoded[index]
        }
sentiment_polarity_dataset = SentimentPolarityDataset(tokenised_sentences, aspect_embeddings, train_sentiments)

In [1280]:
def pad_collate_sentiment_polarity(batch):
    sentences = [torch.LongTensor(item['tokenised_sentence']) for item in batch]
    aspects = [item['aspect_embedding'] for item in batch]
    labels = torch.LongTensor([item['label'] for item in batch])
    padded_sentences = pad_sequence(sentences, batch_first=True, padding_value=0)
    return {
        'padded_sentence': padded_sentences,
        'aspect_embedding': torch.stack(aspects),
        'label': labels
    }
# Create DataLoader instances with collate function
sentiment_polarity_dataloader = DataLoader(sentiment_polarity_dataset, batch_size=1, shuffle=True, collate_fn=pad_collate_sentiment_polarity)

## CNN Model

In [1281]:
class SentimentPolarityCNN(nn.Module):
    def __init__(self, vocab_size, embedding_dim, aspect_input_size, num_classes):
        super().__init__()
        self.embedding = nn.Embedding(vocab_size, embedding_dim)
        self.aspect_linear = nn.Linear(aspect_input_size, embedding_dim)
        self.conv1 = nn.Conv1d(embedding_dim, 128, kernel_size=3, padding=1)
        self.fc = nn.Linear(128, num_classes)
        self.softmax = nn.Softmax(dim=1)

    def forward(self, x_sentence, x_aspect):
        x_sentence = self.embedding(x_sentence)
        x_sentence = x_sentence.permute(0, 2, 1)
        x_aspect = self.aspect_linear(x_aspect)
        x_aspect = x_aspect.unsqueeze(2)
        x_concat = torch.cat((x_sentence, x_aspect), dim=2)
        x_conv = self.conv1(x_concat)
        x_pool = torch.max(x_conv, dim=2)[0]
        output = self.fc(x_pool)
        output = self.softmax(output)
        return output

## Train and Eval

In [1283]:
vocab_size = len(vocab)
embedding_dim = 50 
aspect_input_size = 15
num_classes = len(sentiment_polarity_dataset.label_encoder.classes_)
model_sentiment_polarity = SentimentPolarityCNN(vocab_size, embedding_dim, aspect_input_size, num_classes)

criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model_sentiment_polarity.parameters(), lr=0.001)

num_epochs = 15
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

# Performance does NOT improve beyond 10 epochs -> early stopping
for epoch in tqdm(range(num_epochs), desc="Training Epochs"):
    for batch in train_dataloader:
        padded_sentence, aspect_embedding, labels = (batch['padded_sentence'],batch['aspect_embedding'],batch['label'])
        padded_sentence, aspect_embedding, labels = (padded_sentence.to(device),aspect_embedding.to(device),labels.to(device))
        optimizer.zero_grad()
        outputs = model_sentiment_polarity(padded_sentence, aspect_embedding)

        loss = criterion(outputs, labels)
        loss.backward()
        optimizer.step()

    print(f"Epoch {epoch + 1}/{num_epochs}, Loss: {loss.item():.4f}")

Training Epochs:   0%|          | 0/15 [00:00<?, ?it/s]

Epoch 1/15, Loss: 1.2227
Epoch 2/15, Loss: 1.1146
Epoch 3/15, Loss: 0.7416
Epoch 4/15, Loss: 0.5937
Epoch 5/15, Loss: 0.6082
Epoch 6/15, Loss: 0.5591
Epoch 7/15, Loss: 0.5654
Epoch 8/15, Loss: 0.5571
Epoch 9/15, Loss: 0.5595
Epoch 10/15, Loss: 0.5574
Epoch 11/15, Loss: 0.5534
Epoch 12/15, Loss: 0.5545
Epoch 13/15, Loss: 0.5532
Epoch 14/15, Loss: 0.5539
Epoch 15/15, Loss: 0.5531


In [1289]:
correct = 0
total = 0
for batch in train_dataloader:
    padded_sentence, aspect_embedding, labels = (batch['padded_sentence'],batch['aspect_embedding'],batch['label'])
    padded_sentence, aspect_embedding, labels = (padded_sentence.to(device),aspect_embedding.to(device),labels.to(device))
    outputs = model_sentiment_polarity(padded_sentence, aspect_embedding)
    prediction = idx_to_sentiment[torch.argmax(outputs).item()]
    label = idx_to_sentiment[labels.item()]
    print(f"Actual Sentiment : {label} | Predicted Sentiment : {prediction}")
    if (prediction == label):
        correct += 1
    total += 1

Actual Sentiment : positive | Predicted Sentiment : positive
Actual Sentiment : positive | Predicted Sentiment : positive
Actual Sentiment : negative | Predicted Sentiment : negative
Actual Sentiment : negative | Predicted Sentiment : negative
Actual Sentiment : neutral | Predicted Sentiment : neutral
Actual Sentiment : negative | Predicted Sentiment : negative
Actual Sentiment : neutral | Predicted Sentiment : neutral
Actual Sentiment : neutral | Predicted Sentiment : neutral
Actual Sentiment : negative | Predicted Sentiment : negative
Actual Sentiment : positive | Predicted Sentiment : positive
Actual Sentiment : negative | Predicted Sentiment : negative
Actual Sentiment : negative | Predicted Sentiment : negative
Actual Sentiment : positive | Predicted Sentiment : positive
Actual Sentiment : positive | Predicted Sentiment : positive
Actual Sentiment : negative | Predicted Sentiment : negative
Actual Sentiment : negative | Predicted Sentiment : negative
Actual Sentiment : neutral | P

In [1290]:
print(f"Accuracy : {correct*100/total:.4f}%")

Accuracy : 100.0000%
