In [None]:
# Encoder
class GRUEncoder(nn.Module):
    """Single-step GRU encoder that reads one word followed by the aspect.

    Args:
        hidden_size: Size of the GRU input and hidden state. The embedding
            vectors are fed to the GRU directly, so they are expected to
            have this width as well.
        embedding: Embedding module used for both the word and the aspect.
    """

    def __init__(self, hidden_size, embedding):
        super().__init__()
        self.hidden_size = hidden_size
        self.embedding = embedding
        self.gru = nn.GRU(hidden_size, hidden_size)

    def forward(self, input, hidden, aspect):
        """Run the GRU over the two-step sequence [word, aspect].

        Args:
            input: Index tensor of the current word.
            hidden: Previous hidden state, as accepted by ``nn.GRU``.
            aspect: Index tensor of the aspect term.

        Returns:
            The ``(output, hidden)`` pair produced by the GRU.
        """
        token_step = self.embedding(input).view(1, 1, -1)
        aspect_step = self.embedding(aspect).view(1, 1, -1)
        # Stack along dim 0 (time): the word is consumed first, then the aspect.
        sequence = torch.cat([token_step, aspect_step], dim=0)
        return self.gru(sequence, hidden)

    def init_hidden(self):
        """Return an all-zero initial hidden state of shape (1, 1, hidden_size)."""
        return torch.zeros((1, 1, self.hidden_size))

# Decoder
class GRUDecoder(nn.Module):
    """GRU decoder with dot-product attention over the encoder states.

    Args:
        hidden_size: Size of the GRU input and hidden state.
        output_size: Number of output classes scored by the final layer.
        embedding: Embedding module applied to the decoder input token.
        dropout_p: Dropout probability applied to the embedded input.
    """

    def __init__(self, hidden_size, output_size, embedding, dropout_p=0.1):
        super().__init__()
        self.hidden_size = hidden_size
        self.output_size = output_size
        self.dropout_p = dropout_p
        self.embedding = embedding
        self.dropout = nn.Dropout(dropout_p)
        self.gru = nn.GRU(hidden_size, hidden_size)
        # The projection sees [attention context ; hidden state], hence 2x width.
        self.out = nn.Linear(2 * hidden_size, output_size)

    def cal_attention(self, hidden, encoder_hiddens):
        """Attend over ``encoder_hiddens`` using ``hidden`` as the query.

        Args:
            hidden: Query of shape (1, 1, hidden_size).
            encoder_hiddens: 2-D matrix with one encoder state per row.

        Returns:
            A (1, 2 * hidden_size) tensor: attention context, then the query.

        Note:
            All-zero rows score 0 and therefore still receive a non-zero
            softmax weight, although they add nothing to the context.
        """
        memory = encoder_hiddens.unsqueeze(0)
        scores = torch.bmm(hidden, memory.transpose(1, 2))
        weights = F.softmax(scores, dim=-1)
        context = torch.bmm(weights, memory)
        return torch.cat((context[0], hidden[0]), 1)

    def forward(self, input, hidden, encoder_hiddens):
        """Advance the decoder by one token.

        Args:
            input: Index tensor of the previous output token.
            hidden: Previous decoder hidden state.
            encoder_hiddens: Matrix of encoder states to attend over.

        Returns:
            ``(output, hidden)`` where ``output`` holds log-probabilities
            over ``output_size`` classes and ``hidden`` is the new state.
        """
        embedded = self.dropout(self.embedding(input).view(1, 1, -1))
        # Only the updated hidden state is needed; the GRU output is discarded.
        hidden = self.gru(embedded, hidden)[1]
        features = self.cal_attention(hidden, encoder_hiddens)
        log_probs = F.log_softmax(self.out(features), dim=1)
        return log_probs, hidden

    def init_hidden(self):
        """Return an all-zero initial hidden state of shape (1, 1, hidden_size)."""
        return torch.zeros((1, 1, self.hidden_size))

In [None]:
# Set the seed for reproducibility
seed = 4012
torch.manual_seed(seed)
torch.cuda.manual_seed(seed)
np.random.seed(seed)
random.seed(seed)

num_epochs = 200
display_interval = 20
learning_rate = 0.01
hidden_size = 50
embedding = nn.Embedding(vocab_size, hidden_size)

encoder = GRUEncoder(hidden_size, embedding)
# The decoder scores vocabulary entries: its targets come from target_y_idx
# and the loop below feeds word_to_idx["<BOS>"] as the first input token.
decoder = GRUDecoder(hidden_size, vocab_size, embedding, dropout_p=0.1)

# The embedding is shared by encoder and decoder. Registering it in both
# optimizers would apply its gradient twice per step, so the decoder
# optimizer only receives parameters the encoder does not already own.
encoder_param_ids = {id(param) for param in encoder.parameters()}
decoder_only_params = [
    param for param in decoder.parameters() if id(param) not in encoder_param_ids
]
encoder_optimizer = optim.SGD(encoder.parameters(), lr=learning_rate)
decoder_optimizer = optim.SGD(decoder_only_params, lr=learning_rate)
criterion = nn.NLLLoss()
plot_losses = []
plot_avg_losses = []
total_loss = 0

# Make sure dropout is active while training, even if the models were
# switched to eval mode by an earlier cell.
encoder.train()
decoder.train()

# NOTE: each "epoch" here is a single SGD step on one randomly drawn example.
for epoch in range(1, num_epochs+1):
    random_idx = random.choice(range(len(train_data)))
    x_index = [[idx] for idx in train_x_idx[random_idx]]
    y_index = [[idx] for idx in target_y_idx[random_idx]]
    a_index = train_a_idx[random_idx]
    
    x_tensor = torch.LongTensor(x_index)
    y_tensor = torch.LongTensor(y_index)
    a_tensor = torch.LongTensor([a_index])
    x_length = x_tensor.size(0)
    y_length = y_tensor.size(0)
    
    loss = 0
    encoder_optimizer.zero_grad()
    decoder_optimizer.zero_grad()
    
    # Feed the x_tensor (sentence) into the encoder
    # encoder_hiddens has MAX_LENGTH rows; rows past x_length stay zero.
    encoder_hiddens = torch.zeros(MAX_LENGTH, encoder.hidden_size) # For attention mechanism
    encoder_hidden = encoder.init_hidden() # Hidden state for encoder
    for i in range(x_length):
        encoder_output, encoder_hidden = encoder(x_tensor[i], encoder_hidden, a_tensor)
        encoder_hiddens[i] = encoder_hidden[0, 0]

    decoder_input = torch.tensor([[word_to_idx["<BOS>"]]]) 
    decoder_hidden = encoder_hidden

    # Feed the y_tensor (polarity) into the decoder with teacher forcing
    for i in range(y_length):
        decoder_output, decoder_hidden = decoder(decoder_input, decoder_hidden, encoder_hiddens)
        loss += criterion(decoder_output, y_tensor[i])
        decoder_input = y_tensor[i]

    # Backpropagation
    loss.backward()
    encoder_optimizer.step()
    decoder_optimizer.step()
    
    # Report the per-target-token loss so examples of different length compare.
    loss = loss.item() / y_length
    total_loss += loss
    plot_losses.append(loss)
    
    # epoch is 1-based, so exactly display_interval steps have been
    # accumulated whenever it is a multiple of display_interval.
    if epoch % display_interval == 0:
        avg_loss = total_loss / display_interval
        plot_avg_losses.append(avg_loss)
        total_loss = 0
        print(f'Epoch [{epoch}/{num_epochs}], Loss: {avg_loss:.4f}')

In [None]:
# GRU evaluate
def GRUevaluate(encoder, decoder, sentence, aspect, max_length=MAX_LENGTH):
    """Greedily decode the output tokens for one (sentence, aspect) pair.

    Both models are switched to eval mode for the duration of the call so
    that dropout in the decoder does not make predictions random; their
    previous train/eval mode is restored afterwards.

    Args:
        encoder: Encoder exposing ``init_hidden()``, ``hidden_size`` and a
            ``forward(input, hidden, aspect)`` returning ``(output, hidden)``.
        decoder: Decoder whose ``forward(input, hidden, encoder_hiddens)``
            returns ``(log_probs, hidden)``.
        sentence: Raw sentence; it is passed through ``preprocess_data``.
        aspect: Aspect term, looked up in ``word_to_idx``.
        max_length: Number of rows in the attention memory and the maximum
            number of decoding steps. A sentence with more tokens than this
            will fail when its states are written into the memory.

    Returns:
        List of decoded words. Decoding stops after ``max_length`` words or
        as soon as ``"<EOS>"`` is produced (which is included in the list).
    """
    encoder_was_training = encoder.training
    decoder_was_training = decoder.training
    encoder.eval()
    decoder.eval()
    try:
        with torch.no_grad():
            tokens = preprocess_data([sentence])[0]
            input_idx = [word_to_idx[word] for word in tokens]
            input_tensor = torch.LongTensor([[ind] for ind in input_idx])

            input_length = input_tensor.size(0)
            encoder_hidden = encoder.init_hidden()

            asp_idx = word_to_idx[aspect]
            asp_tensor = torch.LongTensor([[asp_idx]])

            # One row per input position; unused rows stay zero.
            encoder_hiddens = torch.zeros(max_length, encoder.hidden_size)

            for ei in range(input_length):
                _, encoder_hidden = encoder(input_tensor[ei], encoder_hidden, asp_tensor)
                encoder_hiddens[ei] += encoder_hidden[0, 0]

            decoder_input = torch.LongTensor([[word_to_idx["<BOS>"]]])
            decoder_hidden = encoder_hidden
            decoded_words = []
            eos_idx = word_to_idx["<EOS>"]

            for _ in range(max_length):
                decoder_output, decoder_hidden = decoder(decoder_input, decoder_hidden, encoder_hiddens)
                # Greedy decoding: keep only the most likely token.
                _, topi = decoder_output.topk(1)
                if topi.item() == eos_idx:
                    decoded_words.append("<EOS>")
                    break
                decoded_words.append(word_list[topi.item()])
                decoder_input = topi.squeeze().detach()

            return decoded_words
    finally:
        # Leave the models in whatever mode the caller had them in.
        encoder.train(encoder_was_training)
        decoder.train(decoder_was_training)

In [None]:
# Show predicted vs. actual polarity for the first (up to) ten test examples
preview_count = len(test_x[:10])
for i in range(preview_count):
    decoded = GRUevaluate(encoder, decoder, test_x[i], test_a[i])
    # Only the first decoded token is treated as the predicted polarity.
    polarity = decoded[0]
    print(f"Predicted polarity: {polarity}, Actual polarity: {test_y[i]}")

In [None]:
# Measure accuracy of GRU model on test set
num_test_examples = len(test_x)
correct = 0
for i in range(num_test_examples):
    # The first decoded token is taken as the predicted polarity.
    polarity = GRUevaluate(encoder, decoder, test_x[i], test_a[i])[0]
    if polarity == test_y[i]:
        correct += 1
# Divide by the number of examples actually evaluated (the loop runs over
# test_x), and avoid a ZeroDivisionError on an empty test set.
accuracy = correct / num_test_examples if num_test_examples else 0.0
print(f"Accuracy: {accuracy:.4f}")

In [None]:
# Encoder
class Encoder(nn.Module):
    """Single-step GRU encoder that reads the aspect followed by one word.

    Args:
        hidden_size: Size of the GRU input and hidden state. The embedding
            vectors are fed to the GRU directly, so they are expected to
            have this width as well.
        embedding: Embedding module used for both the aspect and the word.
    """

    def __init__(self, hidden_size, embedding):
        super().__init__()
        self.hidden_size = hidden_size
        self.embedding = embedding
        self.gru = nn.GRU(hidden_size, hidden_size)

    def forward(self, input, hidden, aspect):
        """Run the GRU over the two-step sequence [aspect, word].

        Args:
            input: Index tensor of the current word.
            hidden: Previous hidden state, as accepted by ``nn.GRU``.
            aspect: Index tensor of the aspect term.

        Returns:
            The ``(output, hidden)`` pair produced by the GRU.
        """
        # Aspect first, then the word; each becomes one time step.
        steps = [self.embedding(token).view(1, 1, -1) for token in (aspect, input)]
        sequence = torch.cat(steps, dim=0)
        return self.gru(sequence, hidden)

    def init_hidden(self):
        """Return an all-zero initial hidden state of shape (1, 1, hidden_size)."""
        return torch.zeros((1, 1, self.hidden_size))

# Decoder
class Decoder(nn.Module):
    """Attention-based classifier head over the encoder states.

    The final encoder hidden state is used as the attention query; the
    attention context and the query are concatenated and projected to
    ``output_size`` log-probabilities.

    Args:
        hidden_size: Width of the hidden states being attended over.
        output_size: Number of output classes.
        attention: One of ``'dot_product'``, ``'scaled_dot_product'`` or
            ``'cosine_similarity'``.

    Raises:
        ValueError: If ``attention`` is not a supported attention type.
    """

    SUPPORTED_ATTENTION = ('dot_product', 'scaled_dot_product', 'cosine_similarity')

    def __init__(self, hidden_size, output_size, attention='dot_product'):
        super(Decoder, self).__init__()
        if attention not in self.SUPPORTED_ATTENTION:
            raise ValueError(
                f"Unknown attention type {attention!r}; "
                f"expected one of {self.SUPPORTED_ATTENTION}"
            )
        self.attention_type = attention
        self.hidden_size = hidden_size
        self.output_size = output_size
        # Input is [attention context ; query], hence twice the hidden width.
        self.linear = nn.Linear(self.hidden_size*2, self.output_size)
    
    def cal_attention(self, hidden, encoder_hiddens):
        """Attend over ``encoder_hiddens`` using ``hidden`` as the query.

        Args:
            hidden: Query of shape (1, 1, hidden_size).
            encoder_hiddens: 2-D matrix with one encoder state per row.

        Returns:
            A (1, 2 * hidden_size) tensor: attention context, then the query.

        Raises:
            ValueError: If ``self.attention_type`` is not supported.
        """
        if self.attention_type == 'dot_product':            # Dot product attention
            scores = torch.bmm(hidden, encoder_hiddens.T.unsqueeze(0))
        elif self.attention_type == 'scaled_dot_product':   # Scaled dot product attention
            scores = torch.bmm(hidden, encoder_hiddens.T.unsqueeze(0)) * self.hidden_size ** -0.5
        elif self.attention_type == 'cosine_similarity':    # Cosine similarity attention
            # Normalise the query and every key ROW to unit length.
            # F.normalize clamps the norm with a small eps, so all-zero
            # (padding) rows yield similarity 0 instead of NaN.
            query = F.normalize(hidden, dim=-1)
            keys = F.normalize(encoder_hiddens, dim=-1)
            scores = torch.bmm(query, keys.T.unsqueeze(0))
        else:
            # attention_type may have been reassigned after construction.
            raise ValueError(
                f"Unknown attention type {self.attention_type!r}; "
                f"expected one of {self.SUPPORTED_ATTENTION}"
            )

        attn_weights = F.softmax(scores, dim=-1)
        attn_output = torch.bmm(attn_weights, encoder_hiddens.unsqueeze(0))
        concat_output = torch.cat((attn_output[0], hidden[0]), 1)
        return concat_output
    
    def forward(self, hidden, encoder_hiddens):
        """Return log-probabilities over the output classes, shape (1, output_size)."""
        concat_output = self.cal_attention(hidden, encoder_hiddens)
        output = F.log_softmax(self.linear(concat_output), dim=1)
        return output
    
    def init_hidden(self):
        """Return an all-zero hidden state of shape (1, 1, hidden_size)."""
        return torch.zeros(1, 1, self.hidden_size)

In [None]:
# Seeding is currently disabled for this run; uncomment for reproducibility
# seed = 4012
# torch.manual_seed(seed)
# torch.cuda.manual_seed(seed)
# np.random.seed(seed)
# random.seed(seed)

num_epochs = 10000
display_interval = 500
learning_rate = 0.01
hidden_size = 50
attention_type = 'dot_product' # 'dot_product', 'scaled_dot_product', 'cosine_similarity'

# embedding = nn.Embedding.from_pretrained(glove_weights)
embedding = nn.Embedding(vocab_size, hidden_size)

encoder = Encoder(hidden_size, embedding)
# 3 output classes, one per polarity label -- presumably matching
# idx_to_polarity used at evaluation time; confirm if the label set changes.
decoder = Decoder(hidden_size, 3, attention=attention_type)

encoder_optimizer = optim.SGD(encoder.parameters(), lr=learning_rate)
decoder_optimizer = optim.SGD(decoder.parameters(), lr=learning_rate)
criterion = nn.NLLLoss()
plot_losses = []
plot_avg_losses = []
total_loss = 0

# NOTE: each "epoch" here is a single SGD step on one randomly drawn example.
for epoch in range(1, num_epochs+1):
    random_idx = random.choice(range(len(train_data)))
    x_index = [[idx] for idx in train_x_idx[random_idx]]
    y_index = train_y_idx[random_idx]
    a_index = train_a_idx[random_idx]
    
    x_tensor = torch.LongTensor(x_index)
    y_tensor = torch.LongTensor([y_index])
    a_tensor = torch.LongTensor([a_index])
    x_length = x_tensor.size(0)

    loss = 0
    encoder.train()
    decoder.train()
    encoder_optimizer.zero_grad()
    decoder_optimizer.zero_grad()
    
    # Feed the x_tensor (sentence) into the encoder
    # encoder_hiddens has MAX_LENGTH rows; rows past x_length stay zero.
    encoder_hiddens = torch.zeros(MAX_LENGTH, encoder.hidden_size) # For attention mechanism
    encoder_hidden = encoder.init_hidden() # Hidden state for encoder
    for i in range(x_length):
        encoder_output, encoder_hidden = encoder(x_tensor[i], encoder_hidden, a_tensor)
        encoder_hiddens[i] = encoder_hidden[0, 0]

    # The final encoder state is the attention query for the classifier head.
    y_output = decoder(encoder_hidden, encoder_hiddens)
    loss += criterion(y_output, y_tensor)

    # Backpropagation
    loss.backward()
    encoder_optimizer.step()
    decoder_optimizer.step()
    
    loss = loss.item()
    total_loss += loss
    plot_losses.append(loss)
    
    # epoch is 1-based, so exactly display_interval steps have been
    # accumulated whenever it is a multiple of display_interval.
    if epoch % display_interval == 0:
        avg_loss = total_loss / display_interval
        plot_avg_losses.append(avg_loss)
        total_loss = 0
        print(f'Epoch [{epoch}/{num_epochs}], Loss: {avg_loss:.4f}')

In [None]:
# Save the model
# torch.save(encoder, 'Model/encoder1.pt')
# torch.save(decoder, 'Model1/decoder1.pt')

# Plot loss over number of epochs
# Derive the x-axis from the recorded values so x and y always have the
# same length, even if training was interrupted.
plt.plot(range(1, len(plot_losses) + 1), plot_losses)
plt.xlabel('Epochs')
plt.ylabel('Loss')
plt.title('Loss over number of epochs')
plt.show()

# Each averaged point summarises one display_interval-sized window, so it is
# placed at the end of that window rather than at its first step. Building
# the positions from len(plot_avg_losses) also avoids a length mismatch when
# num_epochs is not a multiple of display_interval.
avg_loss_steps = [display_interval * (k + 1) for k in range(len(plot_avg_losses))]
plt.plot(avg_loss_steps, plot_avg_losses)
plt.xlabel('Epochs')
plt.ylabel('Loss')
plt.title('Loss over number of epochs')
plt.show()

In [None]:
# GRU evaluate
def GRUevaluate(encoder, decoder, sentence, aspect, max_length=MAX_LENGTH):
    """Predict the polarity label for one (sentence, aspect) pair.

    Args:
        encoder: Encoder exposing ``init_hidden()``, ``hidden_size`` and a
            ``forward(input, hidden, aspect)`` returning ``(output, hidden)``.
        decoder: Classifier called as ``decoder(hidden, encoder_hiddens)``.
        sentence: Raw sentence; it is passed through ``preprocess_data``.
        aspect: Aspect term, looked up in ``word_to_idx``.
        max_length: Number of rows allocated for the attention memory.

    Returns:
        The ``idx_to_polarity`` entry for the highest-scoring class.
    """
    with torch.no_grad():
        tokens = preprocess_data([sentence])[0]
        token_ids = torch.LongTensor([[word_to_idx[token]] for token in tokens])

        state = encoder.init_hidden()
        aspect_ids = torch.LongTensor([[word_to_idx[aspect]]])

        # One row per input position; rows that are never written stay zero.
        memory = torch.zeros(max_length, encoder.hidden_size)
        for position in range(token_ids.size(0)):
            state = encoder(token_ids[position], state, aspect_ids)[1]
            memory[position] += state[0, 0]

        scores = decoder(state, memory)
        # topk(1) returns (values, indices); only the winning index is used.
        best_index = scores.data.topk(1)[1]
        return idx_to_polarity[best_index.item()]

In [None]:
# Measure accuracy of the GRU encoder + attention classifier on the test set
num_test_examples = len(test_x)
correct = 0
for i in range(num_test_examples):
    polarity = GRUevaluate(encoder, decoder, test_x[i], test_a[i])
    if polarity == test_y[i]:
        correct += 1
# Divide by the number of examples actually evaluated (the loop runs over
# test_x), and avoid a ZeroDivisionError on an empty test set.
accuracy = correct / num_test_examples if num_test_examples else 0.0
print(f"Accuracy: {accuracy:.4f}")