In [None]:
import pandas as pd
import numpy as np
import torch
import torch.nn as nn
import torch.nn.functional as F
from tokenizers import Tokenizer, models, pre_tokenizers, trainers
import regex as re
import warnings
from torch.utils.data import DataLoader, TensorDataset
import swifter
warnings.filterwarnings("ignore")


vocab_size=30000

batch_size = 32
block_size = 80 # number of tokens in one sentence
learning_rate = 0.001

max_iter = 5

n_embd = 128
n_head = 4
n_layer = 4

total_rows = 310217
train_size = int(total_rows * 0.8)
test_size =  total_rows - train_size

n_classes = 3
pad_idx = 0

text_column = "clean_text"
target_column = "rnk"



uzum_reviews_df = pd.read_parquet("../data/uzum_dataset.parquet", engine='pyarrow')
uzum_reviews_df["len"] = uzum_reviews_df["normalized_review_text"].str.len()
uzum_reviews_filtered_df = uzum_reviews_df[uzum_reviews_df["len"] <= block_size]
rating_map = {
    'very poor' : 0,
    'poor' : 0,
    'fair' : 1,
    'good' : 2,
    'excellent' : 2
}

uzum_reviews_filtered_df["rnk"] = uzum_reviews_filtered_df["rating"].map(rating_map)


def normalize_uzum_reviews(df: pd.DataFrame) -> pd.DataFrame:
    """
    cleans the text based on the following criterias listed below
    :param df: pandas dataframe
    :returns: cleaned pandas dataframe
    """

    latin = r"\p{Latin}"
    cyrillic = r"\p{Cyrillic}"
    digits = r"\p{Number}"


    allowed_re = re.compile(fr"(?:{latin}|{cyrillic}|{digits}|\s)")

    final_clean = {'ø','ʔ','ʕ','ʖ','ᴥ','ᵕ','⅚','ᴗ'}

    latin_map = {
    "à": "a", "á": "a", "â": "a", "ã": "a",
    "ç": "c",
    "è": "e", "é": "e", "ë": "e",
    "ì": "i", "í": "i",
    "ñ": "n",
    "ò": "o", "ó": "o", "ô": "o", "õ": "o", "ö": "o",
    "ù": "u", "ú": "u", "û": "u", "ü": "u",
    "ý": "y", "ÿ": "y",
    "ĝ": "g'", "ğ": "g'", "ġ": "g'", "ģ": "g'",
    "ĥ": "h",
    "ı": "i",
    "ĵ": "j",
    "ķ": "k",
    "ĺ": "l", "ļ": "l",
    "ń": "n", "ň": "n",
    "ō": "o'", "ŏ": "o'", "ő": "o'",
    "ŕ": "r",
    "ś": "s", "ş": "sh",
    "ũ": "u", "ū": "u", "ů": "u",
    "ź": "z", "ž": "j",
    "ǒ": "o'", "ǫ": "q",
    "ǵ": "g'",
    "ɓ": "b",
    "ə": "e",
    '²': '2',
    '³': '3',
    '¹': '1',
    'ď': 'd',
    'ɢ': 'g',
    'ɪ': 'i',
    'ɴ': 'n',
    'ʀ': 'r',
    'ʏ': 'y',
    'ʜ': 'h',
    'ʟ': 'l',
    'ө': 'o',
    'ᴀ': 'a',
    'ᴄ': 'c',
    'ᴅ': 'd',
    'ᴇ': 'e',
    'ᴊ': 'j',
    'ᴋ': 'k',
    'ᴍ': 'm',
    'ᴏ': 'o',
    'ᴘ': 'p',
    'ᴛ': 't',
    'ᴜ': 'u',
    '⁰': '0',
    '⁴': '4',
    '⁵': '5'
}


    def normalize_text(text: str) -> str:
        out = []
        for ch in text:
        # skip unwanted characters
            if ch in final_clean:
               continue

        # keep only allowed characters (latin, cyrillic, digits, spaces)
            if not allowed_re.fullmatch(ch):
               continue

        # map special latin → uzbek letters
            out.append(latin_map.get(ch, ch))

        return "".join(out)


    df['clean_text'] = df["normalized_review_text"].astype(str).swifter.apply(normalize_text)

    return df




def get_token_data():
    """
    Trains a BPE tokenizer on the text column, encodes + pads the texts
    :returns: X (tensor), y (tensor), tokenizer (trained)
    """

    uzum_df = normalize_uzum_reviews(uzum_reviews_filtered_df)


    tokenizer = Tokenizer(models.BPE())
    tokenizer.pre_tokenizer = pre_tokenizers.Whitespace()


    trainer = trainers.BpeTrainer(
        vocab_size=vocab_size,
        special_tokens=["<pad>", "<unk>"]
    )


    tokenizer.train_from_iterator(uzum_df[text_column].astype(str).tolist(), trainer)

    PAD_ID = tokenizer.token_to_id("<pad>")



    def padding_sentence(ids,pad_id=PAD_ID):
        if len(ids) < block_size:
            ids += [pad_id] * (block_size - len(ids))
        return ids[:block_size]


    X_seq = [padding_sentence(tokenizer.encode(str(t)).ids) for t in uzum_df[text_column]]


    X = torch.tensor(X_seq, dtype=torch.long)
    y = torch.tensor(uzum_df[target_column].values, dtype=torch.long)

    return X, y, tokenizer

In [16]:
X, y, tokenizer = get_token_data()

Pandas Apply:   0%|          | 0/310217 [00:00<?, ?it/s]

In [17]:
X.shape, y.unique().tolist()

(torch.Size([310217, 80]), [0, 1, 2])

In [18]:
dataset = TensorDataset(X, y)

train_data, test_data = torch.utils.data.random_split(dataset, [train_size, test_size])


train_loader = DataLoader(train_data, batch_size=batch_size, shuffle=True)
test_loader = DataLoader(test_data, batch_size=batch_size)


In [19]:
class EmbeddingLinear(nn.Module):
    def __init__(self):
        super().__init__()
        self.embedding = nn.Embedding(vocab_size, n_embd, padding_idx=pad_idx)
        self.fc = nn.Linear(n_embd, n_classes)

    def forward(self, x):
        embedded = self.embedding(x)          # (batch_size, block_size, n_embd)
        pooled = embedded.mean(dim=1)         # mean pooling: (batch_size, n_embd)
        out = self.fc(pooled)                 # (batch_size, n_classes)
        return out

    def predict(self, tokenizer, text, device="cpu"):
        self.eval()
        self.to(device)

        ids = tokenizer.encode(text).ids

        if len(ids) < block_size:
            ids += [pad_idx] * (block_size - len(ids))
        ids = ids[:block_size]

        x = torch.tensor([ids], dtype=torch.long, device=device)

        with torch.no_grad():
            logits = self(x)
            probs = F.softmax(logits, dim=1)

        return probs.squeeze(0)

In [20]:
model = EmbeddingLinear()

criterion = nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(model.parameters(), lr=learning_rate)

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model.to(device)

print("Using: ", device)

Using:  cuda


In [21]:
for epoch in range(max_iter):
    model.train()
    total_loss = 0

    for batch_X, batch_y in train_loader:
        batch_X, batch_y = batch_X.to(device), batch_y.to(device)

        optimizer.zero_grad()
        outputs = model(batch_X)
        loss = criterion(outputs, batch_y)
        loss.backward()
        optimizer.step()

        total_loss += loss.item()

    print(f"Epoch {epoch+1}/{max_iter}, Loss: {total_loss/len(train_loader):.4f}")

Epoch 1/5, Loss: 0.3882
Epoch 2/5, Loss: 0.3093
Epoch 3/5, Loss: 0.2934
Epoch 4/5, Loss: 0.2835
Epoch 5/5, Loss: 0.2760


In [25]:
text = "Rahmat, juda yoqdi!"
probs = model.predict(tokenizer, text, device=device)
print(probs)
out = torch.argmax(probs).tolist()
print("Model: ", out)
print('excellent' if out == 2 else 'fair' if out == 1 else 'poor')

tensor([0.0054, 0.0115, 0.9831], device='cuda:0')
Model:  2
excellent


In [26]:
text = "bomapti"
probs = model.predict(tokenizer, text, device=device)
print(probs)
out = torch.argmax(probs).tolist()
print("Model: ", out)
print('excellent' if out == 2 else 'fair' if out == 1 else 'poor')

tensor([0.8792, 0.0825, 0.0383], device='cuda:0')
Model:  0
poor
