In [1]:
# For tips on running notebooks in Google Colab, see
# https://pytorch.org/tutorials/beginner/colab
%matplotlib inline

LSTMs in Pytorch
----------------

Before getting to the example, note a few things. Pytorch\'s LSTM
expects all of its inputs to be 3D tensors. The semantics of the axes of
these tensors is important. The first axis is the sequence itself, the
second indexes instances in the mini-batch, and the third indexes
elements of the input. We haven\'t discussed mini-batching, so let\'s
just ignore that and assume we will always have just 1 dimension on the
second axis. If we want to run the sequence model over the sentence
\"The cow jumped\", our input should look like

$$\begin{aligned}
\begin{bmatrix}
\overbrace{q_\text{The}}^\text{row vector} \\
q_\text{cow} \\
q_\text{jumped}
\end{bmatrix}
\end{aligned}$$

Except remember there is an additional 2nd dimension with size 1.

In addition, you could go through the sequence one at a time, in which
case the 1st axis will have size 1 also.

Let\'s see a quick example.


In [None]:
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim

torch.manual_seed(1)

In [None]:
lstm = nn.LSTM(3, 3)  # Input dim is 3, output dim is 3
inputs = [torch.randn(1, 3) for _ in range(5)]  # make a sequence of length 5

# initialize the hidden state.
hidden = (torch.randn(1, 1, 3),
          torch.randn(1, 1, 3))
for i in inputs:
    # Step through the sequence one element at a time.
    # after each step, hidden contains the hidden state.
    out, hidden = lstm(i.view(1, 1, -1), hidden)

# alternatively, we can do the entire sequence all at once.
# the first value returned by LSTM is all of the hidden states throughout
# the sequence. the second is just the most recent hidden state
# (compare the last slice of "out" with "hidden" below, they are the same)
# The reason for this is that:
# "out" will give you access to all hidden states in the sequence
# "hidden" will allow you to continue the sequence and backpropagate,
# by passing it as an argument  to the lstm at a later time
# Add the extra 2nd dimension
inputs = torch.cat(inputs).view(len(inputs), 1, -1)
hidden = (torch.randn(1, 1, 3), torch.randn(1, 1, 3))  # clean out hidden state
out, hidden = lstm(inputs, hidden)
print(out)
print(hidden)

tensor([[[-0.0187,  0.1713, -0.2944]],

        [[-0.3521,  0.1026, -0.2971]],

        [[-0.3191,  0.0781, -0.1957]],

        [[-0.1634,  0.0941, -0.1637]],

        [[-0.3368,  0.0959, -0.0538]]], grad_fn=<MkldnnRnnLayerBackward0>)
(tensor([[[-0.3368,  0.0959, -0.0538]]], grad_fn=<StackBackward0>), tensor([[[-0.9825,  0.4715, -0.0633]]], grad_fn=<StackBackward0>))


In [None]:
import pandas as pd
from string import punctuation
from collections import Counter
import numpy as np
import torch
from torch.utils.data import Dataset, DataLoader, TensorDataset
import torch.nn as nn
import torch.optim as optim

# Load your CSV files for training, validation, and test datasets
train_data = pd.read_csv("https://raw.githubusercontent.com/Venkatalakshmikottapalli/LSTM-Sentiment-Classifier/refs/heads/main/data/Train.csv")
valid_data = pd.read_csv("https://raw.githubusercontent.com/Venkatalakshmikottapalli/LSTM-Sentiment-Classifier/refs/heads/main/data/Valid.csv")
test_data = pd.read_csv("https://raw.githubusercontent.com/Venkatalakshmikottapalli/LSTM-Sentiment-Classifier/refs/heads/main/data/Test.csv")

# View data structure
print(train_data.head())


                                                text  label
0  I grew up (b. 1965) watching and loving the Th...      0
1  When I put this movie in my DVD player, and sa...      0
2  Why do people who do not know what a particula...      0
3  Even though I have great interest in Biblical ...      0
4  Im a die hard Dads Army fan and nothing will e...      1


In [None]:
# #Most students don't have access to GPUs so create a tiny version of the dataset that can fit on a CPU
# imdb_dataset = pd.concat([imdb_dataset[imdb_dataset.sentiment=='positive'].head(n=20),
#                           imdb_dataset[imdb_dataset.sentiment=='negative'].head(n=20)])


In [None]:
import torch
from torch.utils.data import Dataset, DataLoader

class TextDataset(Dataset):
    def __init__(self, texts, labels):
        self.texts = texts
        self.labels = labels

    def __len__(self):
        return len(self.texts)

    def __getitem__(self, idx):
        text = self.texts[idx]
        label = self.labels[idx]
        return text, label

### Preprocessing
 Remove Punctuation and get all the words from review dataset. Count all the words and sort it based on counts



In [None]:
# Preprocessing text data
all_reviews = list()
for text in train_data['text'].to_list():
    text = text.lower()
    text = "".join([ch for ch in text if ch not in punctuation])
    all_reviews.append(text)


In [None]:
# Combine all reviews into a single string and count word frequencies
all_text = " ".join(all_reviews)
all_words = all_text.split()
# Count all words using Counter
count_words = Counter(all_words)
total_words = len(all_words)
sorted_words = count_words.most_common(total_words)

### Tokenization
 Create a dictionary to convert words to Integers based on the number of occurrence of the word

In [None]:
'''
we will start creating dictionary with index 1 because 0 is reserved for padding
'''

vocab_to_int={w:i+1 for i,(w,c) in enumerate(sorted_words)}
print(vocab_to_int)



In [None]:
# Create vocabulary
vocab_to_int = {w: i+1 for i, (w, c) in enumerate(sorted_words)}

# Encode reviews
encoded_reviews = list()
for review in all_reviews:
    encoded_review = list()
    for word in review.split():
        encoded_review.append(vocab_to_int.get(word, 0))
    encoded_reviews.append(encoded_review)

In [None]:
# Use the 'label' column, which already contains 0 or 1 as values
labels = train_data['label'].to_list()

# Pad or truncate reviews to a fixed length
sequence_length = 250
features = np.zeros((len(encoded_reviews), sequence_length), dtype=int)
for i, review in enumerate(encoded_reviews):
    review_len = len(review)
    if review_len <= sequence_length:
        zeros = list(np.zeros(sequence_length - review_len))
        new = zeros + review
    else:
        new = review[:sequence_length]
    features[i, :] = np.array(new)

In [None]:
#Our dataset has ‘positive’ and ‘negative’ as a label, it will be easy if we have 1 and 0, instead of ‘positive’ and ‘negative’
#labels=[1 if label.strip()=='positive' else 0 for label in imdb_dataset.sentiment.to_list()]

### Train, validation, and test set splits

In [None]:
# Split datasets into training, validation, and testing sets
train_x = features[:int(0.6 * len(features))]
train_y = labels[:int(0.6 * len(features))]
valid_x = features[int(0.6 * len(features)):int(0.8 * len(features))]
valid_y = labels[int(0.6 * len(features)):int(0.8 * len(features))]
test_x = features[int(0.8 * len(features)):]
test_y = labels[int(0.8 * len(features)):]

In [None]:
from collections import Counter

print("\nLabel distribution in Training set:", Counter(train_y))
print("\nLabel distribution in Validation set:", Counter(valid_y))
print("\nLabel distribution in Test set:", Counter(test_y))



Label distribution in Training set: Counter({0: 12062, 1: 11938})

Label distribution in Validation set: Counter({1: 4029, 0: 3971})

Label distribution in Test set: Counter({1: 4014, 0: 3986})


In [None]:
# Convert data to PyTorch tensors
train_data = TensorDataset(torch.LongTensor(train_x), torch.FloatTensor(train_y))
valid_data = TensorDataset(torch.LongTensor(valid_x), torch.FloatTensor(valid_y))
test_data = TensorDataset(torch.LongTensor(test_x), torch.FloatTensor(test_y))

# Create DataLoader for batching
batch_size = 24
train_loader = DataLoader(train_data, batch_size=batch_size, shuffle=True)
valid_loader = DataLoader(valid_data, batch_size=batch_size, shuffle=True)
test_loader = DataLoader(test_data, batch_size=batch_size, shuffle=True)

### LSTM model specification


In [None]:
# Define the model class
class SentimentLSTM(nn.Module):
    def __init__(self, vocab_size, output_size, embedding_dim, hidden_dim, n_layers, drop_prob=0.5):
        super().__init__()
        self.output_size = output_size
        self.n_layers = n_layers
        self.hidden_dim = hidden_dim
        self.embedding = nn.Embedding(vocab_size, embedding_dim)
        self.lstm = nn.LSTM(embedding_dim, hidden_dim, n_layers, dropout=drop_prob, batch_first=True)
        self.dropout = nn.Dropout(0.3)
        self.fc1 = nn.Linear(hidden_dim, 64)
        self.fc2 = nn.Linear(64, 16)
        self.fc3 = nn.Linear(16, output_size)
        self.sigmoid = nn.Sigmoid()

    def forward(self, x, hidden):
      batch_size = x.size(0)  

      embedd = self.embedding(x)
      lstm_out, hidden = self.lstm(embedd, hidden)

      # Take the last output from the sequence
      lstm_out = lstm_out[:, -1, :]

      out = self.dropout(lstm_out)
      out = self.fc1(out)
      out = self.dropout(out)
      out = self.fc2(out)
      out = self.dropout(out)
      out = self.fc3(out)

      sig_out = self.sigmoid(out)
      return sig_out.squeeze(), hidden 


    def init_hidden(self, batch_size):
        weight = next(self.parameters()).data
        if torch.cuda.is_available():
            hidden = (weight.new(self.n_layers, batch_size, self.hidden_dim).zero_().cuda(),
                      weight.new(self.n_layers, batch_size, self.hidden_dim).zero_().cuda())
        else:
            hidden = (weight.new(self.n_layers, batch_size, self.hidden_dim).zero_(),
                      weight.new(self.n_layers, batch_size, self.hidden_dim).zero_())
        return hidden


### Instantiate the model with hyperparameters

In [None]:
# Initialize model
vocab_size = len(vocab_to_int) + 1  # Add 1 for padding
output_size = 1
embedding_dim = 128
hidden_dim = 256
n_layers = 2
model = SentimentLSTM(vocab_size, output_size, embedding_dim, hidden_dim, n_layers)


### Training

In [None]:
# Loss and optimizer
criterion = nn.BCELoss()
optimizer = optim.Adam(model.parameters(), lr=0.001)

# Training the model
epochs = 3
train_on_gpu = torch.cuda.is_available()
if train_on_gpu:
    model.cuda()

for epoch in range(epochs):
    model.train()
    h = model.init_hidden(batch_size)
    for inputs, labels in train_loader:
        if train_on_gpu:
            inputs, labels = inputs.cuda(), labels.cuda()
        h = tuple([each.data for each in h])
        optimizer.zero_grad()
        output, h = model(inputs, h)
        loss = criterion(output.squeeze(), labels.float())
        loss.backward()
        optimizer.step()
    # Validate the model
model.eval()
val_losses = []
correct = 0
total = 0
with torch.no_grad():
    for inputs, labels in valid_loader:
        batch_size = inputs.shape[0]
        h = model.init_hidden(batch_size)

        if train_on_gpu:
            inputs, labels = inputs.cuda(), labels.cuda()
            h = tuple([each.cuda() for each in h])

        output, h = model(inputs, h)
        val_loss = criterion(output.squeeze(), labels.float())
        val_losses.append(val_loss.item())

        pred = torch.round(output.squeeze())
        correct += (pred == labels).sum().item()
        total += labels.size(0)

val_loss_avg = np.mean(val_losses)
val_accuracy = correct / total
print(f"Validation Loss: {val_loss_avg}, Validation Accuracy: {val_accuracy}")

### Evaluation


In [None]:
# Test the model
model.eval()
test_losses = []
correct = 0
total = 0
with torch.no_grad():
    for inputs, labels in test_loader:
        batch_size = inputs.shape[0]  
        h = model.init_hidden(batch_size)  

        if train_on_gpu:
            inputs, labels = inputs.cuda(), labels.cuda()
            h = tuple([each.cuda() for each in h])  

        output, h = model(inputs, h)
        test_loss = criterion(output.squeeze(), labels.float())
        test_losses.append(test_loss.item())

        pred = torch.round(output.squeeze())
        correct += (pred == labels).sum().item()
        total += labels.size(0)

test_loss_avg = np.mean(test_losses)
test_accuracy = correct / total
print(f"Test Loss: {test_loss_avg}, Test Accuracy: {test_accuracy}")

Test Loss: 0.3671612872959611, Test Accuracy: 0.853875
