In [1]:
import pandas as pd
import re
import string
from sklearn.model_selection import train_test_split
from sklearn.feature_extraction.text import CountVectorizer
from gensim.models import Word2Vec
from nltk.tokenize import word_tokenize
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, Dataset

In [3]:
df = pd.read_csv("/IMDB Dataset 2.csv")

In [10]:
import nltk
from nltk.corpus import stopwords

nltk.download('stopwords')
nltk.download('punkt')
stop_words = set(stopwords.words('english'))

def clean_text(text):
    text = text.lower()
    text = text.translate(str.maketrans('', '', string.punctuation))
    text = re.sub(r'\d+', '', text)
    text = ' '.join(word for word in text.split() if word not in stop_words)
    text = text.strip()
    return text

df['cleaned_text'] = df['review'].apply(clean_text)

[nltk_data] Downloading package stopwords to /root/nltk_data...
[nltk_data]   Package stopwords is already up-to-date!
[nltk_data] Downloading package punkt to /root/nltk_data...
[nltk_data]   Unzipping tokenizers/punkt.zip.


In [7]:
df['label'] = df['sentiment'].apply(lambda x: 1 if x == 'positive' else 0)

In [8]:
train_df, test_df = train_test_split(df, test_size=0.2, random_state=42)

In [11]:
vectorizer = CountVectorizer()
X_train_counts = vectorizer.fit_transform(train_df['cleaned_text'])
X_test_counts = vectorizer.transform(test_df['cleaned_text'])

tokenized_corpus = [word_tokenize(text) for text in train_df['cleaned_text']]
w2v_model = Word2Vec(sentences=tokenized_corpus, vector_size=100, window=5, min_count=1, workers=4)
w2v_model.train(tokenized_corpus, total_examples=len(tokenized_corpus), epochs=10)



(46378463, 48637980)

In [12]:
def text_to_w2v(text, model, vector_size):
    words = word_tokenize(text)
    word_vecs = [model.wv[word] for word in words if word in model.wv]
    if len(word_vecs) == 0:
        return torch.zeros(vector_size)
    return torch.tensor(sum(word_vecs) / len(word_vecs))

train_w2v = torch.stack([text_to_w2v(text, w2v_model, 100) for text in train_df['cleaned_text']])
test_w2v = torch.stack([text_to_w2v(text, w2v_model, 100) for text in test_df['cleaned_text']])

In [13]:
class TextDataset(Dataset):
    def __init__(self, texts, labels):
        self.texts = texts
        self.labels = labels

    def __len__(self):
        return len(self.labels)

    def __getitem__(self, idx):
        return self.texts[idx], self.labels[idx]

train_dataset = TextDataset(train_w2v, torch.tensor(train_df['label'].values))
test_dataset = TextDataset(test_w2v, torch.tensor(test_df['label'].values))

train_loader = DataLoader(train_dataset, batch_size=2, shuffle=True)
test_loader = DataLoader(test_dataset, batch_size=2, shuffle=False)

In [14]:
class RNNModel(nn.Module):
    def __init__(self, input_size, hidden_size, output_size, num_layers=1):
        super(RNNModel, self).__init__()
        self.hidden_size = hidden_size
        self.num_layers = num_layers
        self.rnn = nn.RNN(input_size, hidden_size, num_layers, batch_first=True)
        self.fc = nn.Linear(hidden_size, output_size)

    def forward(self, x):
        h0 = torch.zeros(self.num_layers, x.size(0), self.hidden_size).to(x.device)
        out, _ = self.rnn(x.unsqueeze(1), h0)
        out = self.fc(out[:, -1, :])
        return out


input_size = 100
hidden_size = 50
output_size = 2

model = RNNModel(input_size, hidden_size, output_size)
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=0.001)

In [15]:
def train_model(model, train_loader, criterion, optimizer, epochs=10):
    model.train()
    for epoch in range(epochs):
        running_loss = 0.0
        for inputs, labels in train_loader:
            optimizer.zero_grad()
            outputs = model(inputs.float())
            loss = criterion(outputs, labels)
            loss.backward()
            optimizer.step()
            running_loss += loss.item()
        print(f'epoch : [{epoch+1}/{epochs}], loss : {running_loss/len(train_loader):.4f}')

In [16]:
def test_model(model, test_loader):
    model.eval()
    correct = 0
    total = 0
    with torch.no_grad():
        for inputs, labels in test_loader:
            outputs = model(inputs.float())
            _, predicted = torch.max(outputs.data, 1)
            total += labels.size(0)
            correct += (predicted == labels).sum().item()
    print(f'accuracy of test set: {100 * correct / total:.2f}%')

In [17]:
train_model(model, train_loader, criterion, optimizer, epochs=15)
test_model(model, test_loader)

epoch : [1/15], loss : 0.3297
epoch : [2/15], loss : 0.3150
epoch : [3/15], loss : 0.3106
epoch : [4/15], loss : 0.3076
epoch : [5/15], loss : 0.3041
epoch : [6/15], loss : 0.3023
epoch : [7/15], loss : 0.2994
epoch : [8/15], loss : 0.2973
epoch : [9/15], loss : 0.2958
epoch : [10/15], loss : 0.2946
epoch : [11/15], loss : 0.2940
epoch : [12/15], loss : 0.2921
epoch : [13/15], loss : 0.2916
epoch : [14/15], loss : 0.2905
epoch : [15/15], loss : 0.2895
accuracy of test set: 87.23%


In [18]:
import numpy as np

def load_glove_embeddings(glove_file):
    embeddings_index = {}
    with open(glove_file, 'r', encoding='utf-8') as f:
        for line in f:
            values = line.split()
            word = values[0]
            vector = np.asarray(values[1:], dtype='float32')
            embeddings_index[word] = vector
    return embeddings_index

In [19]:
glove_file = "/mnt/glove.6B.100d.txt"
embeddings_index = load_glove_embeddings(glove_file)

In [20]:
def text_to_glove(text, embeddings_index, vector_size):
    words = word_tokenize(text)
    word_vecs = [embeddings_index[word] for word in words if word in embeddings_index]
    if len(word_vecs) == 0:
        return torch.zeros(vector_size)
    return torch.tensor(sum(word_vecs) / len(word_vecs))

train_glove = torch.stack([text_to_glove(text, embeddings_index, 100) for text in train_df['cleaned_text']])
test_glove = torch.stack([text_to_glove(text, embeddings_index, 100) for text in test_df['cleaned_text']])

In [21]:
class TextDataset(Dataset):
    def __init__(self, texts, labels):
        self.texts = texts
        self.labels = labels

    def __len__(self):
        return len(self.labels)

    def __getitem__(self, idx):
        return self.texts[idx], self.labels[idx]

train_dataset = TextDataset(train_glove, torch.tensor(train_df['label'].values))
test_dataset = TextDataset(test_glove, torch.tensor(test_df['label'].values))

train_loader = DataLoader(train_dataset, batch_size=64, shuffle=True)
test_loader = DataLoader(test_dataset, batch_size=64, shuffle=False)

In [22]:
class LSTMModel(nn.Module):
    def __init__(self, input_size, hidden_size, output_size, num_layers=1):
        super(LSTMModel, self).__init__()
        self.hidden_size = hidden_size
        self.num_layers = num_layers
        self.lstm = nn.LSTM(input_size, hidden_size, num_layers, batch_first=True)
        self.fc = nn.Linear(hidden_size, output_size)

    def forward(self, x):
        h0 = torch.zeros(self.num_layers, x.size(0), self.hidden_size).to(x.device)
        c0 = torch.zeros(self.num_layers, x.size(0), self.hidden_size).to(x.device)
        out, _ = self.lstm(x.unsqueeze(1), (h0, c0))
        out = self.fc(out[:, -1, :])
        return out

In [23]:
input_size = 100
hidden_size = 50
output_size = 2

model = LSTMModel(input_size, hidden_size, output_size)
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=0.001)

In [24]:
def train_model(model, train_loader, criterion, optimizer, epochs=10):
    model.train()
    for epoch in range(epochs):
        running_loss = 0.0
        for inputs, labels in train_loader:
            optimizer.zero_grad()
            outputs = model(inputs.float())
            loss = criterion(outputs, labels)
            loss.backward()
            optimizer.step()
            running_loss += loss.item()
        print(f'Epoch [{epoch+1}/{epochs}], Loss: {running_loss/len(train_loader):.4f}')

In [25]:
def test_model(model, test_loader):
    model.eval()
    correct = 0
    total = 0
    with torch.no_grad():
        for inputs, labels in test_loader:
            outputs = model(inputs.float())
            _, predicted = torch.max(outputs.data, 1)
            total += labels.size(0)
            correct += (predicted == labels).sum().item()
    print(f'accuracy on test set: {100 * correct / total:.2f}%')

train_model(model, train_loader, criterion, optimizer, epochs=10
           )
test_model(model, test_loader)

Epoch [1/10], Loss: 0.5126
Epoch [2/10], Loss: 0.4517
Epoch [3/10], Loss: 0.4482
Epoch [4/10], Loss: 0.4442
Epoch [5/10], Loss: 0.4410
Epoch [6/10], Loss: 0.4386
Epoch [7/10], Loss: 0.4364
Epoch [8/10], Loss: 0.4335
Epoch [9/10], Loss: 0.4327
Epoch [10/10], Loss: 0.4308
accuracy on test set: 80.03%
