In [1]:
# Required Libraries
import torch
from torch.utils.data import Dataset, DataLoader
from torch import nn, optim
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import LabelEncoder
from sklearn.metrics import mean_squared_error, mean_absolute_error, r2_score, accuracy_score
from nltk.tokenize import word_tokenize
from nltk.corpus import stopwords
from nltk.stem.isri import ISRIStemmer
import nltk
import numpy as np
import pandas as pd

nltk.download('punkt')
nltk.download('stopwords')

# Load Dataset
dataset = pd.read_csv('/kaggle/input/datasetdlarabic/aljazeera_data.csv')  # Update path if necessary
dataset = dataset.dropna()  # Remove any null values

# Preprocessing Pipeline
class TextPreprocessor:
    def __init__(self):
        self.stop_words = set(stopwords.words('arabic'))
        self.stemmer = ISRIStemmer()  # Arabic-specific stemmer

    def preprocess(self, text):
        # Tokenization
        tokens = word_tokenize(text)
        # Remove stop words
        tokens = [t for t in tokens if t not in self.stop_words]
        # Stemming
        tokens = [self.stemmer.stem(t) for t in tokens]
        return tokens

preprocessor = TextPreprocessor()
dataset['Processed_Text'] = dataset['Text'].apply(preprocessor.preprocess)

# Encoding Labels
label_encoder = LabelEncoder()
dataset['Encoded_Score'] = label_encoder.fit_transform(dataset['Score'])

# Split Data
X_train, X_test, y_train, y_test = train_test_split(
    dataset['Processed_Text'], dataset['Encoded_Score'], test_size=0.2, random_state=42
)

# Dataset Class
class NLPDataset(Dataset):
    def __init__(self, texts, labels, vocab=None):
        self.texts = texts
        self.labels = labels
        self.vocab = vocab or self.build_vocab()

    def build_vocab(self):
        vocab = set(token for text in self.texts for token in text)
        return {word: idx for idx, word in enumerate(vocab, start=1)}

    def encode_text(self, text):
        return [self.vocab[token] for token in text if token in self.vocab]

    def __len__(self):
        return len(self.texts)

    def __getitem__(self, idx):
        encoded_text = self.encode_text(self.texts.iloc[idx])
        label = self.labels.iloc[idx]
        return torch.tensor(encoded_text, dtype=torch.long), torch.tensor(label, dtype=torch.long)

train_dataset = NLPDataset(X_train, y_train)
test_dataset = NLPDataset(X_test, y_test, vocab=train_dataset.vocab)

def collate_fn(batch):
    texts, labels = zip(*batch)
    max_len = max(len(text) for text in texts)
    padded_texts = [torch.cat([text, torch.zeros(max_len - len(text))]) for text in texts]
    return torch.stack(padded_texts), torch.tensor(labels)

train_loader = DataLoader(train_dataset, batch_size=32, shuffle=True, collate_fn=collate_fn)
test_loader = DataLoader(test_dataset, batch_size=32, shuffle=False, collate_fn=collate_fn)


# Check for GPU
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
print(f"Using device: {device}")

# LSTM Model with GPU Support
class LSTMModel(nn.Module):
    def __init__(self, vocab_size, embed_dim, hidden_dim, output_dim):
        super(LSTMModel, self).__init__()
        self.embedding = nn.Embedding(vocab_size, embed_dim)
        self.lstm = nn.LSTM(embed_dim, hidden_dim, batch_first=True, bidirectional=True)
        self.fc = nn.Linear(hidden_dim * 2, output_dim)  # Multiply by 2 for bidirectional

    def forward(self, x):
        x = self.embedding(x.long())
        _, (hidden, _) = self.lstm(x)
        # Combine forward and backward hidden states
        hidden = torch.cat((hidden[-2], hidden[-1]), dim=1)
        output = self.fc(hidden)
        return output

# Model Parameters
vocab_size = len(train_dataset.vocab) + 1
embed_dim = 128
hidden_dim = 256
output_dim = len(label_encoder.classes_)

# Instantiate Model and Move to Device
model = LSTMModel(vocab_size, embed_dim, hidden_dim, output_dim).to(device)

# Training Settings
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=0.001)
epochs = 10

# Training Loop with GPU Support
for epoch in range(epochs):
    model.train()
    total_loss = 0
    for texts, labels in train_loader:
        texts, labels = texts.to(device), labels.to(device)  # Move data to GPU
        optimizer.zero_grad()
        outputs = model(texts)
        loss = criterion(outputs, labels)
        loss.backward()
        optimizer.step()
        total_loss += loss.item()
    print(f"Epoch {epoch+1}, Loss: {total_loss / len(train_loader):.4f}")

# Evaluation Function with GPU Support
def evaluate_model(model, data_loader):
    model.eval()
    y_true, y_pred = [], []
    with torch.no_grad():
        for texts, labels in data_loader:
            texts, labels = texts.to(device), labels.to(device)  # Move data to GPU
            outputs = model(texts)
            predicted = torch.argmax(outputs, dim=1)
            y_true.extend(labels.cpu().tolist())  # Move back to CPU for metrics
            y_pred.extend(predicted.cpu().tolist())  # Move back to CPU for metrics
    return y_true, y_pred

# Calculate Metrics
y_train_true, y_train_pred = evaluate_model(model, train_loader)
y_test_true, y_test_pred = evaluate_model(model, test_loader)

# Training Metrics
train_mse = mean_squared_error(y_train_true, y_train_pred)
train_mae = mean_absolute_error(y_train_true, y_train_pred)
train_r2 = r2_score(y_train_true, y_train_pred)
train_acc = accuracy_score(y_train_true, y_train_pred)

# Testing Metrics
test_mse = mean_squared_error(y_test_true, y_test_pred)
test_mae = mean_absolute_error(y_test_true, y_test_pred)
test_r2 = r2_score(y_test_true, y_test_pred)
test_acc = accuracy_score(y_test_true, y_test_pred)

# Print Metrics
print("Training Metrics:")
print(f"MSE: {train_mse:.4f}, MAE: {train_mae:.4f}, R2: {train_r2:.4f}, Accuracy: {train_acc:.4f}")
print("Testing Metrics:")
print(f"MSE: {test_mse:.4f}, MAE: {test_mae:.4f}, R2: {test_r2:.4f}, Accuracy: {test_acc:.4f}")


[nltk_data] Downloading package punkt to /usr/share/nltk_data...
[nltk_data]   Package punkt is already up-to-date!
[nltk_data] Downloading package stopwords to /usr/share/nltk_data...
[nltk_data]   Package stopwords is already up-to-date!
Using device: cuda
Epoch 1, Loss: 1.1123
Epoch 2, Loss: 0.6933
Epoch 3, Loss: 0.5175
Epoch 4, Loss: 0.3984
Epoch 5, Loss: 0.3169
Epoch 6, Loss: 0.2243
Epoch 7, Loss: 0.1607
Epoch 8, Loss: 0.1113
Epoch 9, Loss: 0.0952
Epoch 10, Loss: 0.0657
Training Metrics:
MSE: 0.7984, MAE: 0.0522, R2: 0.9684, Accuracy: 0.9929
Testing Metrics:
MSE: 5.4629, MAE: 0.6007, R2: 0.6410, Accuracy: 0.9081


In [3]:
from nltk.translate.bleu_score import sentence_bleu, corpus_bleu

def calculate_bleu_score(y_true, y_pred, label_decoder):
    """
    Calculate BLEU score for true and predicted labels.
    Args:
        y_true: List of true labels (integer or numeric encoded).
        y_pred: List of predicted labels (integer or numeric encoded).
        label_decoder: Function to decode integer labels to text labels.
    Returns:
        Average BLEU score and individual BLEU scores.
    """
    references = []
    candidates = []

    for true, pred in zip(y_true, y_pred):
        # Decode and tokenize true label
        true_decoded = label_decoder(true)
        if isinstance(true_decoded, str):
            references.append([true_decoded.split()])
        else:
            raise ValueError(f"Decoded reference is not a string: {true_decoded}")

        # Decode and tokenize predicted label
        pred_decoded = label_decoder(pred)
        if isinstance(pred_decoded, str):
            candidates.append(pred_decoded.split())
        else:
            raise ValueError(f"Decoded candidate is not a string: {pred_decoded}")

    # Calculate BLEU scores
    individual_bleu_scores = [
        sentence_bleu(reference, candidate, weights=(1.0, 0, 0, 0))  # Unigram BLEU
        for reference, candidate in zip(references, candidates)
    ]
    average_bleu = corpus_bleu(references, candidates, weights=(1.0, 0, 0, 0))  # Unigram BLEU
    return average_bleu, individual_bleu_scores

# Label Decoder Function
def decode_label(encoded_label):
    """
    Decode an integer-encoded label back to its original text representation.
    Args:
        encoded_label: Integer or float representation of the label.
    Returns:
        Decoded text label.
    """
    if isinstance(encoded_label, (int, float)):
        return f"decoded_text_{int(encoded_label)}"
    return str(encoded_label)

# Calculate BLEU scores for training and testing data
train_bleu, train_individual_bleu_scores = calculate_bleu_score(y_train_true, y_train_pred, decode_label)
test_bleu, test_individual_bleu_scores = calculate_bleu_score(y_test_true, y_test_pred, decode_label)

# Print Metrics
print("Training BLEU Score:", train_bleu)
print("Testing BLEU Score:", test_bleu)


Training BLEU Score: 0.9929266136162688
Testing BLEU Score: 0.9081272084805654


Corpus/Sentence contains 0 counts of 2-gram overlaps.
BLEU scores might be undesirable; use SmoothingFunction().
