In [None]:
import numpy as np
import pandas as pd
import torch
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader
from collections import Counter
import re
import matplotlib.pyplot as plt
from hazm.utils import stopwords_list
from hazm import Normalizer
from tqdm import tqdm
from torch.optim.lr_scheduler import StepLR
from hazm import Stemmer, WordTokenizer
from sklearn.utils.class_weight import compute_class_weight
from sklearn.model_selection import StratifiedShuffleSplit
import torch.nn as nn


# Dataset

In [None]:
# Load the raw dataset from the working directory into a DataFrame.
df = pd.read_csv(filepath_or_buffer="data.csv")

In [None]:
# Pick the compute device once; the model and batches are moved to it later.
if not torch.cuda.is_available():
    print('No GPU available, using the CPU instead.')
    device = torch.device("cpu")
else:
    device = torch.device("cuda")


In [None]:
# Drop the 'Score' column, which none of the later cells read.
# Reassigning (instead of inplace=True) and ignoring an already-missing column
# keeps this cell safe to re-run without a KeyError.
df = df.drop(columns=['Score'], errors='ignore')

In [None]:
# Single stratified 75/25 train/test split on the 'Suggestion' label.
# random_state is fixed so the same rows land in each split on every run.
split = StratifiedShuffleSplit(n_splits=1, test_size=0.25, random_state=42)
# n_splits=1 yields exactly one (train, test) index pair, so take it directly.
train_idx, test_idx = next(split.split(df["Text"].values, df["Suggestion"].values))

X_train, X_test = df["Text"].values[train_idx], df["Text"].values[test_idx]
# Labels are shifted down by one before being used as class indices.
y_train, y_test = df["Suggestion"].values[train_idx] - 1, df["Suggestion"].values[test_idx] - 1


# Preprocessing

In [None]:
# Shared hazm components used by the preprocessing helpers below.
tokenizer = WordTokenizer()
# Stored as a frozenset so the per-token membership test in remove_stopwords
# is a hash lookup instead of a scan through the whole stop-word collection.
stopwords = frozenset(stopwords_list())
normalizer = Normalizer()
stemmer = Stemmer()

def stem_tokens(tokens):
    """Return a new list holding the stem of every token, in input order."""
    stemmed = []
    for word in tokens:
        stemmed.append(stemmer.stem(word))
    return stemmed

def tokenize(text):
    """Split ``text`` into tokens using the module-level hazm ``tokenizer``."""
    tokens = tokenizer.tokenize(text)
    return tokens

def remove_stopwords(tokens):
    """Return the tokens that are not in the module-level ``stopwords`` collection."""
    kept = []
    for word in tokens:
        if word in stopwords:
            continue
        kept.append(word)
    return kept

def preprocess_text(text):
    """Clean one raw text and return it as a single space-joined token string.

    Steps, in order: hazm normalisation, three regex substitutions, then
    tokenisation, stop-word removal and stemming.
    """
    cleaned = normalizer.normalize(text)
    substitutions = (
        # Delete (not space-replace) anything that is neither a word character,
        # whitespace, nor inside the U+0600-U+06FF block.
        (r"[^\w\s\u0600-\u06FF]", ""),
        # Delete digit runs.
        (r"[\d۰-۹]+", ""),
        # Collapse whitespace runs to a single space.
        (r"\s+", " "),
    )
    for pattern, replacement in substitutions:
        cleaned = re.sub(pattern, replacement, cleaned)
    cleaned = cleaned.strip()
    return ' '.join(stem_tokens(remove_stopwords(tokenize(cleaned))))

def preprocess_series(text_series):
    """Apply ``preprocess_text`` to every element of a pandas Series."""
    def _clean(document):
        return preprocess_text(document)

    return text_series.apply(_clean)

# Clean both splits with the same pipeline; each split is wrapped in a Series
# so the element-wise helper can be applied to it.
X_train = pd.Series(X_train).pipe(preprocess_series)
print('Preprocessing for train done.')

X_test = pd.Series(X_test).pipe(preprocess_series)
print('Preprocessing for test done.')


In [None]:
# Class-index labels for both splits as int64 tensors.
y_train_tensor, y_test_tensor = (
    torch.tensor(labels, dtype=torch.long) for labels in (y_train, y_test)
)

In [None]:
# Whitespace-split the cleaned texts back into token lists.
tokenized_train = X_train.apply(lambda doc: doc.split())
tokenized_test = X_test.apply(lambda doc: doc.split())

# Token frequencies come from the training split only.
all_tokens = [tok for doc in tokenized_train for tok in doc]
word_counts = Counter(all_tokens)

# Ids are handed out by descending frequency starting at 2;
# 0 and 1 are reserved for the padding and unknown-word markers.
vocab = {word: idx for idx, (word, _) in enumerate(word_counts.most_common(), start=2)}
vocab.update({"<PAD>": 0, "<UNK>": 1})


In [None]:
def tokens_to_indices(tokens, vocab):
    """Map each token to its id in ``vocab``, using the ``<UNK>`` id for unseen tokens."""
    indices = []
    for tok in tokens:
        fallback = vocab["<UNK>"]
        indices.append(vocab.get(tok, fallback))
    return indices

# Convert every token list to a list of vocabulary ids.
X_train_indices = tokenized_train.apply(tokens_to_indices, args=(vocab,))
X_test_indices = tokenized_test.apply(tokens_to_indices, args=(vocab,))

In [None]:
# Fixed sequence length every text is truncated/padded to (see pad_seq below).
# NOTE(review): 123 is hard-coded with no derivation visible here — presumably
# chosen from the training-length distribution; confirm. The same value is also
# passed as the embedding dimension when the model is constructed.
max_len = 123

def pad_seq(seq, max_len, pad_value=0):
    """Truncate or right-pad ``seq`` so the result has exactly ``max_len`` items.

    Args:
        seq: a sliceable sequence of ids (list, tuple, ...).
        max_len: target length.
        pad_value: filler appended to short sequences; defaults to 0, the id
            the vocabulary assigns to ``<PAD>``.

    Returns:
        A new list of length ``max_len``.
    """
    clipped = list(seq[:max_len])
    # The multiplier is 0 when the sequence was already long enough.
    return clipped + [pad_value] * (max_len - len(clipped))

# Stack the fixed-length id lists of each split into one 2-D tensor.
X_train_padded = torch.tensor(list(map(lambda ids: pad_seq(ids, max_len), X_train_indices)))
X_test_padded = torch.tensor(list(map(lambda ids: pad_seq(ids, max_len), X_test_indices)))


In [None]:
class CommentDataset(Dataset):
    """Pairs an indexable collection of inputs with a parallel collection of labels."""

    def __init__(self, inputs, labels):
        self.inputs, self.labels = inputs, labels

    def __len__(self):
        """Number of samples, taken from the inputs collection."""
        n_samples = len(self.inputs)
        return n_samples

    def __getitem__(self, idx):
        """Return the ``(input, label)`` pair stored at position ``idx``."""
        sample = self.inputs[idx]
        target = self.labels[idx]
        return sample, target

# Wrap the padded id tensors and label tensors for each split.
train_dataset = CommentDataset(inputs=X_train_padded, labels=y_train_tensor)
test_dataset = CommentDataset(inputs=X_test_padded, labels=y_test_tensor)

# Training batches are reshuffled every epoch; test batches keep dataset order.
train_loader = DataLoader(dataset=train_dataset, batch_size=32, shuffle=True)
test_loader = DataLoader(dataset=test_dataset, batch_size=32, shuffle=False)


In [None]:
import torch
import torch.nn as nn

class Attention(nn.Module):
    """Attention pooling over the time axis of a bidirectional LSTM output.

    A single linear layer scores each timestep; the scores are softmax-normalised
    along the sequence dimension and used to take a weighted sum of the timesteps.
    """

    def __init__(self, hidden_dim):
        super().__init__()
        # Input width is hidden_dim * 2 because the LSTM feeding this layer is bidirectional.
        self.attn = nn.Linear(hidden_dim * 2, 1)

    def forward(self, lstm_output, mask=None):
        """Pool ``lstm_output`` into one vector per sequence.

        Args:
            lstm_output: tensor of shape (batch, seq_len, hidden_dim * 2).
            mask: optional bool tensor of shape (batch, seq_len); True marks a
                real token, False marks padding. When omitted, every timestep
                takes part in the softmax (the previous behaviour).

        Returns:
            Tensor of shape (batch, hidden_dim * 2).
        """
        scores = self.attn(lstm_output)  # (batch, seq_len, 1)
        if mask is not None:
            # Use the dtype's most negative finite value rather than -inf so a row
            # made only of padding yields uniform weights instead of NaN.
            scores = scores.masked_fill(~mask.unsqueeze(-1), torch.finfo(scores.dtype).min)
        attn_weights = torch.softmax(scores, dim=1)
        context = torch.sum(attn_weights * lstm_output, dim=1)
        return context


class TextClassificationModel(nn.Module):
    """Embedding -> bidirectional LSTM -> attention pooling -> dropout -> linear classifier."""

    def __init__(self, vocab_size, embedding_dim, hidden_dim, output_dim, padding_idx=0):
        super().__init__()
        self.embedding = nn.Embedding(vocab_size, embedding_dim, padding_idx=padding_idx)
        self.lstm = nn.LSTM(input_size=embedding_dim, hidden_size=hidden_dim, batch_first=True, bidirectional=True)
        self.attention = Attention(hidden_dim)
        self.dropout = nn.Dropout(0.5)
        self.fc = nn.Linear(hidden_dim * 2, output_dim)

    def forward(self, x):
        """Return unnormalised class scores of shape (batch, output_dim) for id tensor ``x``."""
        # Keep padded positions out of the attention softmax so they receive no weight.
        pad_idx = self.embedding.padding_idx
        mask = None if pad_idx is None else (x != pad_idx)

        embedded = self.embedding(x)
        lstm_output, _ = self.lstm(embedded)
        context = self.attention(lstm_output, mask)
        output = self.fc(self.dropout(context))
        return output


In [None]:
# Seed torch's global RNG so weight initialisation is reproducible.
torch.manual_seed(42)

vocab_size = len(vocab)
# Embedding width gets its own name: it is a model hyperparameter, not a
# sequence length. It is kept numerically equal to max_len so the model size
# does not change.
embedding_dim = max_len
hidden_dim = 16
output_dim = 3

# NOTE(review): this random matrix is never loaded into the model's embedding
# layer in the code visible here; it is kept in case something else reads it.
embedding_matrix = np.random.normal(0, 1, (vocab_size, embedding_dim)).astype(np.float32)

model = TextClassificationModel(vocab_size, embedding_dim, hidden_dim, output_dim)
model.to(device)
print(model)


In [None]:
# Training hyperparameters.
n_epochs = 40
val_per_epoch = 0
optimizer = optim.Adam(params=model.parameters(), lr=2e-4)

# 'balanced' class weights computed from the training labels, passed to the
# loss so each class's contribution is scaled by its weight.
class_weights = compute_class_weight(class_weight='balanced', classes=np.unique(y_train), y=y_train)
class_weights_tensor = torch.tensor(class_weights, dtype=torch.float, device=device)

criterion = nn.CrossEntropyLoss(weight=class_weights_tensor)

In [None]:
# Mean training loss of each completed epoch, appended by the loop below.
train_loss = []

class EarlyStopping:
    """Track a loss across calls and flag when it stops improving.

    Each call compares the new loss with the best one seen so far. A strictly
    lower loss resets the patience counter; otherwise the counter grows and,
    once it reaches ``patience``, ``early_stop`` is set to True.
    """

    def __init__(self, patience=5, verbose=False):
        self.patience, self.verbose = patience, verbose
        self.counter = 0
        self.best_loss = None
        self.early_stop = False

    def __call__(self, val_loss):
        """Record one loss value and update the counter and stop flag."""
        improved = self.best_loss is None or val_loss < self.best_loss
        if improved:
            self.best_loss, self.counter = val_loss, 0
            return

        self.counter += 1
        if self.counter < self.patience:
            return

        self.early_stop = True
        if self.verbose:
            print("Early stopping")

    def should_stop(self):
        """Return True once the patience budget has been used up."""
        return self.early_stop


early_stopping = EarlyStopping(patience=3, verbose=True)

# NOTE(review): no validation split exists in this notebook, so early stopping
# below monitors the *training* loss; it reacts to a plateau, not to overfitting.
for epoch in range(n_epochs):
    running_loss = 0
    n_correct = 0
    n_seen = 0

    model.train()
    for X, y in tqdm(train_loader):
        optimizer.zero_grad()
        X, y = X.to(device), y.to(device)
        y_hat = model(X)
        loss = criterion(y_hat, y)

        loss.backward()
        optimizer.step()

        running_loss += loss.item()
        # Count correct predictions so running_acc is a real per-sample accuracy
        # (it was previously never accumulated and always reported 0).
        n_correct += (y_hat.argmax(dim=1) == y).sum().item()
        n_seen += y.size(0)

    # Mean of the per-batch losses for this epoch.
    running_loss = running_loss / len(train_loader)
    running_acc = n_correct / max(n_seen, 1)
    train_loss.append(running_loss)
    print(f'Epoch {epoch+1}/{n_epochs} : training loss: {round(running_loss,3)}, training accuracy: {round(running_acc,3)}')

    early_stopping(running_loss)
    if early_stopping.should_stop():
        print("Early stopping triggered")
        break

In [None]:
# Per-epoch mean training loss recorded by the training loop.
plt.plot(train_loss)
plt.xlabel("epochs")
plt.ylabel("loss")
plt.title('train loss')
plt.show()

In [None]:
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score, classification_report

# Evaluate in mini-batches through test_loader rather than pushing the whole
# test set through the model in one forward pass, which can exhaust device
# memory. test_loader does not shuffle, so predictions stay aligned with
# y_test_tensor.
model.eval()
logit_chunks = []
with torch.no_grad():
    for X_batch, _ in test_loader:
        logit_chunks.append(model(X_batch.to(device)).cpu())

outputs = torch.cat(logit_chunks)
predicted_labels = outputs.argmax(dim=1).numpy()
true_labels = y_test_tensor.cpu().numpy()

print("Accuracy:", accuracy_score(true_labels, predicted_labels))
print("Precision:", precision_score(true_labels, predicted_labels, average='weighted'))
print("Recall:", recall_score(true_labels, predicted_labels, average='weighted'))
print("F1 Score:", f1_score(true_labels, predicted_labels, average='weighted'))

print("\nClassification Report:")
print(classification_report(true_labels, predicted_labels))


In [None]:
print("\nSample predictions on test data:")
# Show at most 7 examples; bounding by the number of available rows avoids an
# indexing error on very small test sets.
# Note: X_test holds the preprocessed text at this point, not the raw comment.
for i in range(min(7, len(X_test))):
    print(f"Text: {X_test[i]}")
    # +1 undoes the label shift applied when the splits were created.
    print(f"True Label: {true_labels[i]+1}, Predicted: {predicted_labels[i]+1}")
    print("------")
