In [1]:
import pandas as pd
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.utils.data import DataLoader, Dataset
from sklearn.model_selection import train_test_split
from collections import Counter
import numpy as np
from torch.optim import Adam
import time

In [2]:

# Prefer CUDA when PyTorch reports a usable GPU; otherwise fall back to the CPU.
device_name = 'cuda' if torch.cuda.is_available() else 'cpu'
device = torch.device(device_name)
print(f'Using device: {device}')

# Define the CNN model
class SentimentCNN(nn.Module):
    """Convolutional classifier over sequences of token ids.

    Each token id is embedded, several parallel convolutions (one per entry
    in ``filter_sizes``) slide over the embedded sequence, every feature map
    is max-pooled over the sequence axis, and the pooled features are
    concatenated, passed through dropout and projected to ``output_dim``
    logits.

    Args:
        vocab_size: Number of rows in the embedding table.
        embedding_dim: Size of each embedding vector.
        n_filters: Number of output channels of every convolution.
        filter_sizes: Iterable of kernel heights (tokens spanned per filter).
        output_dim: Number of logits produced per input sequence.
        dropout: Dropout probability applied to the pooled features.
        pad_idx: Token id used to right-pad inputs that are shorter than the
            largest kernel height. Defaults to 0.

    Raises:
        ValueError: If ``filter_sizes`` is empty.
    """

    def __init__(self, vocab_size, embedding_dim, n_filters, filter_sizes, output_dim, dropout, pad_idx=0):
        super().__init__()

        # Materialize once so generators/iterators can be passed safely.
        filter_sizes = list(filter_sizes)
        if not filter_sizes:
            raise ValueError("filter_sizes must contain at least one kernel height")

        # Plain attributes (not parameters/buffers), so state_dict keys are unchanged.
        self.pad_idx = pad_idx
        self.min_length = max(filter_sizes)

        self.embedding = nn.Embedding(vocab_size, embedding_dim)

        self.convs = nn.ModuleList([
            nn.Conv2d(in_channels=1, out_channels=n_filters, kernel_size=(fs, embedding_dim))
            for fs in filter_sizes
        ])

        self.fc = nn.Linear(len(filter_sizes) * n_filters, output_dim)
        self.dropout = nn.Dropout(dropout)

    def forward(self, x):
        """Return logits of shape (batch, output_dim) for token ids of shape (batch, seq_len)."""
        x = x.long()

        # A convolution cannot be applied to a sequence shorter than its kernel,
        # so right-pad short inputs up to the largest kernel height.
        shortfall = self.min_length - x.shape[1]
        if shortfall > 0:
            x = F.pad(x, (0, shortfall), value=self.pad_idx)

        embedded = self.embedding(x)          # (batch, seq_len, embedding_dim)
        embedded = embedded.unsqueeze(1)      # add the single input channel Conv2d expects
        # Each kernel spans the full embedding width, so the last axis collapses to 1.
        conved = [F.relu(conv(embedded)).squeeze(3) for conv in self.convs]
        # Max over the whole remaining sequence axis -> (batch, n_filters) per filter size.
        pooled = [F.max_pool1d(feature_map, feature_map.shape[2]).squeeze(2) for feature_map in conved]
        cat = self.dropout(torch.cat(pooled, dim=1))
        return self.fc(cat)

# Dataset class
class ReviewsDataset(Dataset):
    """Pairs each encoded review with its label and serves them as int64 tensors.

    Args:
        reviews: Indexable collection; ``reviews[i]`` must be convertible by
            ``torch.tensor`` (e.g. a list of token ids).
        labels: Indexable collection of labels, aligned with ``reviews``.
            Its length defines the dataset size.
    """

    def __init__(self, reviews, labels):
        self.reviews, self.labels = reviews, labels

    def __len__(self):
        # The dataset size is driven by the labels container.
        return len(self.labels)

    def __getitem__(self, idx):
        """Return ``(review_tensor, label_tensor)`` for position ``idx``, both int64."""
        review_tensor = torch.tensor(self.reviews[idx], dtype=torch.int64)
        label_tensor = torch.tensor(self.labels[idx], dtype=torch.int64)
        return review_tensor, label_tensor

Using device: cuda


In [3]:

# Tokenization and vectorization
def tokenize_and_encode(texts, vocab_size=10000):
    """Whitespace-tokenize ``texts`` and map every token to an integer id.

    The ``vocab_size`` most frequent tokens receive ids 1, 2, ... in order of
    decreasing frequency. Id 0 is shared by padding and by every token that
    did not make it into the vocabulary.

    Args:
        texts: Iterable of strings.
        vocab_size: Maximum number of distinct tokens kept in the vocabulary.

    Returns:
        A tuple ``(encoded_texts, word_to_index)`` where ``encoded_texts`` is a
        list of id lists (one per input text) and ``word_to_index`` maps each
        kept token to its id.
    """
    token_lists = [text.split() for text in texts]

    # Count every token across the whole corpus.
    frequencies = Counter(token for tokens in token_lists for token in tokens)

    # Ids start at 1 because 0 is reserved for padding / unknown tokens.
    word_to_index = {}
    for rank, (word, _count) in enumerate(frequencies.most_common(vocab_size), start=1):
        word_to_index[word] = rank

    encoded_texts = [
        [word_to_index.get(token, 0) for token in tokens]
        for tokens in token_lists
    ]
    return encoded_texts, word_to_index

# Load and preprocess the data
df = pd.read_csv('./dataset/Final_CompanyReviews.csv')
# Fill missing reviews *before* casting to str: astype(str) turns NaN into the
# literal text "nan", after which fillna("") has nothing left to replace.
reviews = df['review_description'].fillna("").astype(str)
labels = df['rating'].values

# Tokenize and encode the reviews
# NOTE(review): the vocabulary is built from every review, including the ones
# that later land in the test split.
encoded_reviews, word_to_index = tokenize_and_encode(reviews)

# Right-pad every review with 0 so all sequences share the longest review's length.
max_length = max(len(review) for review in encoded_reviews)
padded_reviews = [review + [0] * (max_length - len(review)) for review in encoded_reviews]

# Split the dataset (80% train / 20% test, fixed seed for a repeatable split)
X_train, X_test, y_train, y_test = train_test_split(padded_reviews, labels, test_size=0.2, random_state=42)

# Create Datasets and DataLoaders
train_dataset = ReviewsDataset(X_train, y_train)
test_dataset = ReviewsDataset(X_test, y_test)
# Only the training batches are shuffled; evaluation order stays fixed.
train_loader = DataLoader(train_dataset, batch_size=64, shuffle=True)
test_loader = DataLoader(test_dataset, batch_size=64, shuffle=False)


In [4]:

# Model hyperparameters
vocab_size = 1 + len(word_to_index)  # one extra row for index 0 (padding)
embedding_dim, n_filters = 100, 100
filter_sizes = [3, 4, 5]
output_dim = 3  # 3 classes: positive, negative, neutral
dropout = 0.5

# Build the network, then place its parameters on the selected device
model = SentimentCNN(
    vocab_size,
    embedding_dim,
    n_filters,
    filter_sizes,
    output_dim,
    dropout,
)
model = model.to(device)

# Cross-entropy over the class logits, optimized with Adam at its default settings
criterion = nn.CrossEntropyLoss()
optimizer = Adam(model.parameters())

# Training loop
def train(model, train_loader, optimizer, criterion, device):
    """Run one optimization pass over ``train_loader``.

    Args:
        model: Module to optimize; switched to training mode.
        train_loader: Iterable of ``(inputs, targets)`` batches supporting ``len()``.
        optimizer: Optimizer bound to the model's parameters.
        criterion: Callable returning a scalar loss tensor from ``(outputs, targets)``.
        device: Device each batch is moved to before the forward pass.

    Returns:
        The mean of the per-batch losses (averaged over batches, not samples).
    """
    model.train()
    batch_losses = []
    for batch_inputs, batch_targets in train_loader:
        batch_inputs = batch_inputs.to(device)
        batch_targets = batch_targets.to(device)

        # Clear gradients left over from the previous step before backpropagating.
        optimizer.zero_grad()
        batch_loss = criterion(model(batch_inputs), batch_targets)
        batch_loss.backward()
        optimizer.step()

        batch_losses.append(batch_loss.item())
    return sum(batch_losses) / len(train_loader)

# Evaluation function
def evaluate(model, data_loader, criterion, device):
    """Measure loss and accuracy on ``data_loader`` without updating the model.

    Args:
        model: Module to evaluate; switched to evaluation mode.
        data_loader: Iterable of ``(inputs, targets)`` batches supporting ``len()``.
        criterion: Callable returning a scalar loss tensor from ``(outputs, targets)``.
        device: Device each batch is moved to before the forward pass.

    Returns:
        A tuple ``(avg_loss, accuracy)``: the mean per-batch loss and the
        fraction of samples whose highest-scoring class equals the target.
    """
    model.eval()
    loss_sum = 0
    n_correct = 0
    n_seen = 0

    # Gradients are not needed for evaluation.
    with torch.no_grad():
        for batch_inputs, batch_targets in data_loader:
            batch_inputs = batch_inputs.to(device)
            batch_targets = batch_targets.to(device)

            logits = model(batch_inputs)
            loss_sum += criterion(logits, batch_targets).item()

            # Index of the largest logit along the class axis is the predicted class.
            predicted = logits.max(dim=1).indices
            n_correct += (predicted == batch_targets).sum().item()
            n_seen += batch_targets.size(0)

    return loss_sum / len(data_loader), n_correct / n_seen

# Run the full training schedule; checkpoint whenever test accuracy improves
print("Training the model...")
num_epochs = 30
best_accuracy = 0
init_time = time.time()
for epoch in range(num_epochs):
    train_loss = train(model, train_loader, optimizer, criterion, device)
    test_loss, test_accuracy = evaluate(model, test_loader, criterion, device)
    print(f'Epoch: {epoch+1}, Training Loss: {train_loss:.4f}, Test Loss: {test_loss:.4f}, Test Accuracy: {test_accuracy:.4f}')
    # Nothing to save unless this epoch strictly beats the best accuracy so far.
    if test_accuracy <= best_accuracy:
        continue
    best_accuracy = test_accuracy
    torch.save(model.state_dict(), 'best_model.pt')
print(f'Best test accuracy: {best_accuracy:.4f}')
print(f"Training time: {time.time() - init_time}s")


Training the model...
Epoch: 1, Training Loss: 0.6701, Test Loss: 0.5827, Test Accuracy: 0.7735
Epoch: 2, Training Loss: 0.5345, Test Loss: 0.5369, Test Accuracy: 0.7989
Epoch: 3, Training Loss: 0.4701, Test Loss: 0.5232, Test Accuracy: 0.8105
Epoch: 4, Training Loss: 0.4217, Test Loss: 0.5473, Test Accuracy: 0.8087
Epoch: 5, Training Loss: 0.3813, Test Loss: 0.5372, Test Accuracy: 0.8120
Epoch: 6, Training Loss: 0.3513, Test Loss: 0.5631, Test Accuracy: 0.8157
Epoch: 7, Training Loss: 0.3164, Test Loss: 0.5943, Test Accuracy: 0.8170
Epoch: 8, Training Loss: 0.2984, Test Loss: 0.6205, Test Accuracy: 0.8149
Epoch: 9, Training Loss: 0.2686, Test Loss: 0.6554, Test Accuracy: 0.8137
Epoch: 10, Training Loss: 0.2535, Test Loss: 0.6777, Test Accuracy: 0.8099
Epoch: 11, Training Loss: 0.2385, Test Loss: 0.7556, Test Accuracy: 0.8181
Epoch: 12, Training Loss: 0.2195, Test Loss: 0.7838, Test Accuracy: 0.8147
Epoch: 13, Training Loss: 0.2114, Test Loss: 0.8455, Test Accuracy: 0.8155
Epoch: 14, T

In [5]:

# Prediction function
def predict(model, text, word_to_index, max_length, device):
    """Classify a single raw text and return the predicted class index.

    Args:
        model: Module mapping a ``(1, seq_len)`` int64 tensor to class logits.
        text: Raw string; tokenized by splitting on whitespace.
        word_to_index: Mapping from token to integer id; missing tokens map to 0.
        max_length: Length the id sequence is right-padded to with zeros.
            Longer sequences are passed through unpadded and untruncated.
        device: Device the input tensor is moved to.

    Returns:
        The index of the highest-scoring class as a Python int.
    """
    model.eval()

    token_ids = [word_to_index.get(token, 0) for token in text.split()]
    # A negative pad count yields an empty list, so long inputs are left as-is.
    pad_count = max_length - len(token_ids)
    padded_ids = token_ids + [0] * pad_count

    # Wrap in an outer list to add the batch dimension: shape (1, seq_len).
    batch = torch.tensor([padded_ids], dtype=torch.int64).to(device)

    with torch.no_grad():
        logits = model(batch)
        best_class = logits.max(dim=1).indices

    return best_class.item()


In [6]:
# Three hand-written sample reviews used as a quick sanity check of the model
list_of_reviews = [
    'الجو حلو والمكان جميل',
    'المكان سيء والخدمة سيئة',
    'المكان عادي والخدمة عادية',
]
# Score every sample first, then report each review next to its predicted class.
predicted_labels = [
    predict(model, text, word_to_index, max_length, device)
    for text in list_of_reviews
]
for review, label in zip(list_of_reviews, predicted_labels):
    print(f'Review: ({review}), Predicted Label: {label}')


Review: (الجو حلو والمكان جميل), Predicted Label: 1
Review: (المكان سيء والخدمة سيئة), Predicted Label: 0
Review: (المكان عادي والخدمة عادية), Predicted Label: 1


In [7]:
print("Loading the best model...")
# map_location remaps the saved tensors onto the current device, so a checkpoint
# written from a CUDA model still loads when the kernel only has a CPU.
best_state = torch.load('best_model.pt', map_location=device)
model.load_state_dict(best_state)
model.to(device)
# Re-run the sample reviews with the restored best-epoch weights.
for review in list_of_reviews:
    label = predict(model, review, word_to_index, max_length, device)
    print(f'Review: ({review}), Predicted Label: {label}')

Loading the best model...
Review: (الجو حلو والمكان جميل), Predicted Label: 1
Review: (المكان سيء والخدمة سيئة), Predicted Label: 0
Review: (المكان عادي والخدمة عادية), Predicted Label: 1
