In [None]:
!pip install kagglehub
!pip install numpy
!pip install pandas
!pip install spacy
!pip install scikit-learn

In [None]:
!pip install gensim

In [None]:
import pandas as pd
from tqdm import tqdm
from collections import Counter

# Register tqdm with pandas so that `.progress_apply` (used further down) is available.
tqdm.pandas()

In [None]:
import kagglehub
import os

# Fetch the dataset through kagglehub; `path` is whatever location dataset_download returns.
path = kagglehub.dataset_download("rmisra/news-category-dataset")

# Show the downloaded files, then build the path of the JSON file read in the next cell.
print(os.listdir(path))
file_path = os.path.join(path, "News_Category_Dataset_v3.json")

In [None]:
# lines=True: the file is parsed as JSON Lines (one JSON object per line).
df = pd.read_json(file_path, lines=True)

df.head()

In [None]:
# Count the rows per category and display the 30 most frequent ones.
counter = Counter(df['category'])
counter.most_common(30)

In [None]:
# Keep the six most frequent categories plus "SPORTS", and drop every other row.
top_cats = [name for name, _ in counter.most_common(6)]
top_cats.append("SPORTS")
df = df.loc[df["category"].isin(top_cats)].copy()

In [None]:
# Map each remaining category name to an integer id, numbered in the order
# in which `unique()` returns the names.
CATEGORIES = dict(
    (name, label) for label, name in enumerate(df['category'].unique())
)
CATEGORIES

In [None]:
# Add an integer `label` column by looking each category name up in CATEGORIES.
df['label'] = df['category'].map(CATEGORIES)
df.head()

# Tokenization

In [None]:
# Download the small English spaCy pipeline that is loaded as `nlp` further down.
!python -m spacy download en_core_web_sm

In [None]:
import spacy
from typing import List

# Shared spaCy pipeline object; `tokenize` below relies on this module-level name.
nlp = spacy.load("en_core_web_sm")

In [None]:
def tokenize(headline: str) -> List[str]:
    """Split a text into lowercase, purely alphabetic tokens.

    Args:
        headline: Raw text to tokenize (a headline or a short description).

    Returns:
        The lowercased text of every token that is alphabetic and not
        punctuation, in document order. Empty when nothing qualifies.
    """
    # Only tokenizer-level flags (`is_alpha`, `is_punct`) and the raw token
    # text are read, so running the tagger/parser/NER via `nlp(...)` is wasted
    # work. `make_doc` runs just the tokenizer, which is much cheaper when this
    # function is applied to every row of the dataset.
    doc = nlp.make_doc(headline)
    return [
        token.text.lower()
        for token in doc
        if not token.is_punct and token.is_alpha
    ]

In [None]:
# df['headline_tokens'] = df['headline'].progress_apply(tokenize)

In [None]:
# Tokenize every short description (with a tqdm progress bar) into a new column.
df['description_tokens'] = df['short_description'].progress_apply(tokenize)

# Prepare embeddings (GloVe)

In [None]:
import numpy as np
import torch
from collections import Counter

In [None]:
import gensim.downloader as api

# Load pretrained vectors through gensim's downloader. The object is used below
# for `wv.vector_size`, `token in wv` and `wv[token]`.
wv = api.load("glove-wiki-gigaword-100")

In [None]:
# EMB_DIM = wv.vector_size
# UNK_VEC = np.zeros(EMB_DIM, dtype="float32")
# VOCAB = set(wv.index_to_key)

# def vectorize(tokens: List[str]) -> torch.Tensor:
#     vecs = [wv[token] for token in tokens if token in VOCAB]
#     if not vecs:
#         return torch.from_numpy(UNK_VEC)
    
#     mean_vec = np.mean(vecs, axis=0).astype("float32")
#     return torch.from_numpy(mean_vec)

In [None]:
EMB_DIM = wv.vector_size
PAD_IDX, UNK_IDX = 0, 1

# Vocabulary: two reserved entries, then one id per distinct token in order of
# first appearance in the corpus. Built directly from the token lists so the
# category `counter` defined earlier is not overwritten.
vocab = {"<PAD>": PAD_IDX,  # filler id used to pad shorter sequences in a batch
         "<UNK>": UNK_IDX}  # id for tokens that are not in the vocabulary
for tokens in df["description_tokens"]:
    for token in tokens:
        # setdefault leaves already-registered tokens untouched
        vocab.setdefault(token, len(vocab))

# Seeded generator so the random rows (tokens without a pretrained vector)
# are identical on every Restart & Run All.
rng = np.random.default_rng(42)
emb_matrix = rng.normal(scale=0.6, size=(len(vocab), EMB_DIM)).astype("float32")
emb_matrix[PAD_IDX] = 0.0
emb_matrix[UNK_IDX] = 0.0
for token, idx in vocab.items():
    # Overwrite the random row with the pretrained vector when one exists.
    if token in wv:
        emb_matrix[idx] = wv[token]

def transform_to_indices(tokens):
    """Translate a list of tokens into their vocabulary ids.

    Args:
        tokens: Iterable of token strings.

    Returns:
        A list of integer ids, one per token. Tokens missing from `vocab`
        map to `UNK_IDX` instead of `None`, so the result can always be
        turned into an integer tensor.
    """
    return [vocab.get(token, UNK_IDX) for token in tokens]

# Convert every token list into its list of vocabulary ids (with a progress bar).
df["description_indices"] = df["description_tokens"].progress_apply(transform_to_indices)

In [None]:
# df['headline_vector'] = df['headline_tokens'].progress_apply(vectorize)

In [None]:
# df['description_vector'] = df['description_tokens'].progress_apply(vectorize)

In [None]:
df.head()

# Model

In [None]:
import torch.nn as nn

In [None]:
class FFNN(nn.Module):
    """Feed-forward classifier over a fixed-size input vector.

    Layer stack: Linear(emb_dim, 128) -> ReLU -> Dropout(0.3)
    -> Linear(128, hidden_dim) -> ReLU -> Linear(hidden_dim, n_classes).

    Args:
        emb_dim: Size of the input feature vector.
        n_classes: Number of output logits.
        hidden_dim: Width of the second hidden layer.
    """

    def __init__(self, emb_dim: int, n_classes: int, hidden_dim: int = 64):
        super().__init__()

        # Built as a list first; the positions (0, 3, 5 for the Linear layers)
        # determine the state_dict keys.
        stack = [
            nn.Linear(emb_dim, 128),
            nn.ReLU(),
            nn.Dropout(0.3),
            nn.Linear(128, hidden_dim),
            nn.ReLU(),
            nn.Linear(hidden_dim, n_classes),
        ]
        self.net = nn.Sequential(*stack)

    def forward(self, x):
        """Return the raw class logits for `x`."""
        logits = self.net(x)
        return logits

In [None]:
import torch
import torch.nn as nn
import torch.nn.functional as F

class LSTMAttentionClassifier(nn.Module):
    """LSTM encoder with attention pooling and a linear classification head.

    Token ids are embedded with a pretrained matrix, encoded by a (optionally
    bidirectional) LSTM, pooled into one vector by a learned attention over
    the real (non-padded) time steps, and mapped to class logits.

    Args:
        emb_matrix: 2-D array of pretrained embeddings, one row per vocabulary
            id; `emb_matrix.shape[1]` is used as the LSTM input size.
        n_classes: Number of output classes.
        freeze: Forwarded to `nn.Embedding.from_pretrained`; when True the
            embedding weights are not updated during training.
        hidden: Hidden size of the LSTM (per direction).
        num_layers: Number of stacked LSTM layers.
        bidirectional: Whether the LSTM runs in both directions.
        dropout: Dropout probability applied before the final layer and,
            when `num_layers > 1`, between LSTM layers.
    """

    def __init__(self, emb_matrix, n_classes, freeze, hidden=128, num_layers=1, bidirectional=True, dropout=0.3):
        super().__init__()
        self.bidirectional = bidirectional
        self.hidden_size = hidden
        self.num_directions = 2 if bidirectional else 1

        self.embedding = nn.Embedding.from_pretrained(
            # Copy into a float32 tensor so the weights always match the LSTM's
            # parameter dtype, even if a float64 array is passed in.
            torch.tensor(emb_matrix, dtype=torch.float32),
            freeze=freeze
        )

        self.lstm = nn.LSTM(
            input_size=emb_matrix.shape[1],
            hidden_size=hidden,
            num_layers=num_layers,
            batch_first=True,
            bidirectional=bidirectional,
            # nn.LSTM only applies dropout between layers, so it is
            # meaningless (and warns) for a single layer.
            dropout=dropout if num_layers > 1 else 0
        )

        # One scalar attention score per time step.
        self.attention = nn.Linear(hidden * self.num_directions, 1)

        self.dropout = nn.Dropout(dropout)
        self.fc = nn.Linear(hidden * self.num_directions, n_classes)

    def forward(self, x, lengths):
        """Compute class logits for a padded batch of token-id sequences.

        Args:
            x: Integer tensor of token ids, batch first (batch, max_len).
            lengths: 1-D tensor with the true (unpadded) length of each row;
                every length must be at least 1.

        Returns:
            Tensor of shape (batch, n_classes) with unnormalized logits.
        """
        emb = self.embedding(x)

        # Packing makes the LSTM skip padded positions; lengths must be on CPU.
        packed = nn.utils.rnn.pack_padded_sequence(emb, lengths.cpu(), batch_first=True, enforce_sorted=False)
        packed_out, _ = self.lstm(packed)
        lstm_out, _ = nn.utils.rnn.pad_packed_sequence(packed_out, batch_first=True)

        scores = self.attention(lstm_out).squeeze(-1)  # (batch, max_len_in_batch)

        # Padded steps come back from pad_packed_sequence as zero vectors, but
        # the attention bias still gives them a finite score. Mask them out so
        # the softmax distributes weight over real tokens only; otherwise a
        # sample's prediction depends on how much padding its batch added.
        positions = torch.arange(lstm_out.size(1), device=lstm_out.device).unsqueeze(0)
        valid = positions < lengths.to(lstm_out.device).unsqueeze(1)
        scores = scores.masked_fill(~valid, float("-inf"))

        attn_weights = torch.softmax(scores, dim=1).unsqueeze(-1)

        # Attention-weighted sum over time -> one context vector per sample.
        context = torch.sum(lstm_out * attn_weights, dim=1)

        out = self.dropout(context)
        logits = self.fc(out)
        return logits

# Dataset Preparation

In [None]:
from torch.utils.data import Dataset, DataLoader, random_split

In [None]:
# class CustomDataset(Dataset):
#     def __init__(self, vecs, labels):
#         self.vecs = vecs
#         self.labels = labels
#     def __len__(self):
#         return len(self.vecs)
#     def __getitem__(self, idx):
#         return self.vecs[idx], self.labels[idx]

# # dataset = CustomDataset(df["headline_vector"].tolist(), df["label"].tolist())
# dataset = CustomDataset(df["description_vector"].tolist(), df["label"].tolist())
# train_size = int(0.8*len(dataset))
# val_size = len(dataset) - train_size
# train_ds, val_ds = random_split(dataset, [train_size, val_size],
#                                 generator=torch.Generator().manual_seed(42))

# train_dl = DataLoader(train_ds, batch_size=64, shuffle=True)
# val_dl = DataLoader(val_ds, batch_size=64)

In [None]:
# Keep only rows that have at least one token id, then renumber the index from 0.
df = df[df['description_indices'].apply(lambda x: len(x) > 0)].reset_index(drop=True)

In [None]:
from sklearn.model_selection import train_test_split
from torch.utils.data import Dataset, DataLoader
from torch.nn.utils.rnn import pad_sequence
import torch

# Build plain Python lists of inputs (token-id sequences) and labels
X = df["description_indices"].tolist()
y = df["label"].tolist()

# Stratified split: 20% validation, class proportions preserved, fixed seed
X_train, X_val, y_train, y_val = train_test_split(
    X, y, test_size=0.2, stratify=y, random_state=42
)

# Custom Dataset
class CustomDataset(Dataset):
    """Index-aligned pairing of token-id sequences with their labels.

    Args:
        idxs: Sequence of token-id lists, one per sample.
        labels: Sequence of labels, aligned with `idxs`.
    """

    def __init__(self, idxs, labels):
        self.idxs, self.labels = idxs, labels

    def __len__(self):
        """Number of samples (taken from `idxs`)."""
        n_samples = len(self.idxs)
        return n_samples

    def __getitem__(self, idx):
        """Return the `(token_ids, label)` pair stored at position `idx`."""
        sample = self.idxs[idx]
        target = self.labels[idx]
        return sample, target

# Padding (collate) function
def pad(batch):
    """Collate `(token_ids, label)` samples into padded batch tensors.

    Args:
        batch: Iterable of `(token_ids, label)` pairs as produced by
            `CustomDataset.__getitem__`.

    Returns:
        A tuple `(pads, labels, lens)`:
        * pads   - long tensor (batch, max_len), right-padded with `PAD_IDX`;
        * labels - long tensor (batch,) of class indices;
        * lens   - long tensor (batch,) with the unpadded length of each row.
    """
    sequences, labels = zip(*batch)
    sequences = [torch.tensor(seq, dtype=torch.long) for seq in sequences]
    lens = torch.tensor([len(seq) for seq in sequences], dtype=torch.long)
    pads = pad_sequence(sequences, batch_first=True, padding_value=PAD_IDX)
    # Labels are class indices, so emit them as int64 (what CrossEntropyLoss
    # expects) rather than float32; existing `.long()` casts become no-ops.
    return pads, torch.tensor(labels, dtype=torch.long), lens

# Create the datasets and data loaders
train_ds = CustomDataset(X_train, y_train)
val_ds = CustomDataset(X_val, y_val)

# `pad` is the collate function: it pads each batch to its longest sequence.
# Only the training loader shuffles.
train_dl = DataLoader(train_ds, batch_size=32, shuffle=True, collate_fn=pad)
val_dl = DataLoader(val_ds, batch_size=64, shuffle=False, collate_fn=pad)

# Learning

In [None]:
from sklearn.metrics import accuracy_score, precision_recall_fscore_support

In [None]:
from torch.optim import Adam

# Use the GPU when CUDA is available, otherwise fall back to the CPU.
device = "cuda" if torch.cuda.is_available() else "cpu"

In [None]:
from torch.optim import Adam

# model = FFNN(EMB_DIM, len(CATEGORIES), hidden_dim=256)
# optimizer = Adam(model.parameters())

# loss_function = nn.CrossEntropyLoss()

In [None]:
# EPOCHS = 20

# for epoch in range(EPOCHS):
#     epoch_losses = 0
    
#     model.train()
#     for X, y in train_dl:
#         y = y.to(device).long()
#         X = X.to(device).float()
#         optimizer.zero_grad()
#         out = model(X)
#         loss = loss_function(out, y)
#         epoch_losses += loss.detach().cpu().item()
#         loss.backward()
#         optimizer.step()
        
#     print("Loss function:", epoch_losses / len(train_dl))

#     model.eval()
#     preds, true_labels = [], []
#     with torch.no_grad():
#         for X, y in val_dl:
#             X = X.to(device).float()
#             logits = model(X)
#             probs = torch.softmax(logits, dim=1).cpu()
#             preds.extend(torch.argmax(probs, dim=1).cpu().numpy())
#             true_labels.extend(y.numpy())

#     acc = accuracy_score(true_labels, preds)
#     p, r, f1, _ = precision_recall_fscore_support(true_labels, preds, average="macro")
#     if epoch % 5 == 0:
#         print(f"Epoch {epoch:02d} | Acc {acc:.3f} · P {p:.3f} · R {r:.3f} · F1 {f1:.3f}")

In [None]:
# The class defined above is `LSTMAttentionClassifier`; `LSTMClassifier` does not
# exist in this notebook and raised a NameError on a fresh kernel.
# freeze=False: the pretrained embeddings are fine-tuned during training.
model = LSTMAttentionClassifier(emb_matrix, len(CATEGORIES), False, num_layers=2)
# The training loop moves every batch to `device`, so the model must live there
# too; move it before the optimizer is created from its parameters.
model = model.to(device)
optimizer = Adam(model.parameters())
loss_function = nn.CrossEntropyLoss()

In [None]:
# Training configuration: number of epochs, best validation macro-F1 seen so far,
# and where the best checkpoint is written.
EPOCHS = 20
best_f1 = 0.0
best_model_path = "best_model.pt"

# NOTE(review): the loops below rebind the module-level names `X` and `y`
# (previously the full input/label lists) to the last processed batch.
for epoch in range(EPOCHS):
    epoch_losses = 0
    model.train()

    # ---- training pass ----
    for X, y, lengths in train_dl:
        X = X.to(device).long()
        y = y.to(device).long()
        lengths = lengths.to(device)

        optimizer.zero_grad()
        out = model(X, lengths)
        loss = loss_function(out, y)
        loss.backward()
        optimizer.step()

        # Weight the batch loss by the batch size so that dividing by the dataset
        # size below gives a per-sample average (assumes the loss is a batch mean,
        # which is the nn.CrossEntropyLoss default).
        epoch_losses += loss.item() * X.size(0)

    avg_loss = epoch_losses / len(train_dl.dataset)
    print(f"Epoch {epoch:02d} | Train Loss: {avg_loss:.4f}")

    # ---- validation pass (no gradients) ----
    model.eval()
    preds, true_labels = [], []

    with torch.no_grad():
        for X, y, lengths in val_dl:
            X = X.to(device).long()
            y = y.to(device).long()
            lengths = lengths.to(device)

            logits = model(X, lengths)
            # Predicted class = index of the highest softmax probability.
            pred_labels = torch.argmax(torch.softmax(logits, dim=1), dim=1)

            preds.extend(pred_labels.cpu().numpy())
            true_labels.extend(y.cpu().numpy())

    # Accuracy plus macro-averaged precision / recall / F1 over the whole validation set.
    acc = accuracy_score(true_labels, preds)
    p, r, f1, _ = precision_recall_fscore_support(true_labels, preds, average="macro")

    # Checkpoint whenever the validation macro-F1 improves.
    if f1 > best_f1:
        best_f1 = f1
        torch.save(model.state_dict(), best_model_path)
        print(f"Saved new best model at epoch {epoch:02d} with F1 = {f1:.3f}")

    # Report metrics on every second epoch and on the last one.
    if epoch % 2 == 0 or epoch == EPOCHS - 1:
        print(f"Eval  | Acc {acc:.3f} · P {p:.3f} · R {r:.3f} · F1 {f1:.3f}")

In [None]:
import torch
from torch.nn.utils.rnn import pad_sequence

def predict_category(text: str, model, vocab, categories, device, tokenize, PAD_IDX=0):
    """Classify a single text and return the matching entry of `categories`.

    Args:
        text: Raw input text.
        model: Module called as `model(ids, lengths)` that returns class logits.
        vocab: Mapping from token to integer id; tokens that are missing fall
            back to the id stored under "<UNK>" (or 1 if that key is absent).
        categories: Mapping from predicted class index to the value returned.
        device: Device the input tensors are moved to.
        tokenize: Callable turning `text` into a list of tokens.
        PAD_IDX: Accepted for interface compatibility; not used here.

    Returns:
        `categories[predicted_index]`, or the string "Unknown" when the text
        yields no tokens.
    """
    model.eval()

    token_list = tokenize(text)
    if len(token_list) == 0:
        return "Unknown"

    unk_id = vocab.get("<UNK>", 1)
    ids = [vocab.get(tok, unk_id) for tok in token_list]

    # A batch holding exactly one sequence: ids -> [1, seq_len], lengths -> [1]
    batch = torch.tensor([ids], dtype=torch.long).to(device)
    seq_lens = torch.tensor([len(ids)]).to(device)

    with torch.no_grad():
        class_probs = model(batch, seq_lens).softmax(dim=1)
        best = class_probs.argmax(dim=1).item()

    return categories[best]

# Tests

In [None]:
# Rebuild the architecture used for training (`LSTMAttentionClassifier`; the name
# `LSTMClassifier` is not defined in this notebook) and load the best checkpoint.
model = LSTMAttentionClassifier(emb_matrix, len(CATEGORIES), False, num_layers=2)

# map_location lets a checkpoint saved on a GPU be loaded on a CPU-only machine.
model.load_state_dict(torch.load("best_model.pt", map_location=device))

model.to(device)

model.eval()

In [None]:
# Inverse lookup: label id -> category name. Built from the ids actually stored in
# CATEGORIES rather than from enumeration order, so it stays correct however
# CATEGORIES was assembled.
inv_cats = {idx: name for name, idx in CATEGORIES.items()}

In [None]:
# Hand-written example descriptions used to spot-check the trained classifier.
descriptions = [
    "President addresses the nation on upcoming policy changes and budget reforms.",
    "10 mindfulness techniques to reduce daily stress and improve sleep.",
    "Top 5 destinations in Europe to visit this summer on a budget.",
    "Actor surprises fans with unexpected appearance at film festival.",
    "Senate debates controversial law affecting voting rights across states.",
    "Healthy breakfast recipes to kickstart your metabolism every morning.",
    "Behind the scenes of the latest Hollywood blockbuster release.",
    "Best hiking trails in Southeast Asia for nature lovers and adventurers.",
    "Government releases new guidelines for international trade relations.",
    "Yoga and breathing exercises that can ease anxiety and boost focus.",
    "Trump's voters want to see the Epstein files - but have faith in their president",
    "Hidden in a quiet Italian town is one of the world's most unique art schools – and a rewarding destination for curious travellers."
]

In [None]:
# Classify each example and print it next to the predicted category.
for desc in descriptions:
    predicted = predict_category(
        desc, model=model, vocab=vocab, categories=inv_cats,
        device=device, tokenize=tokenize, PAD_IDX=PAD_IDX,
    )
    print(f"Description: {desc}")
    print(f"Predicted category: {predicted}")
    print("-----------------")

In [None]:
# Second batch of examples; the comment above each group names the category
# the author expects for it.
descs = [
    # PARENTING
    "Tips for new parents to help toddlers sleep through the night.",
    "How to talk to your teenager about social media and mental health.",

    # ENTERTAINMENT
    "A behind-the-scenes look at the latest Marvel movie production.",
    "Famous pop singer drops surprise album and breaks streaming records.",

    # POLITICS
    "The Senate passes a new climate bill after weeks of negotiations.",
    "President addresses economic recovery in latest press conference.",

    # WELLNESS
    "10 easy yoga poses to reduce anxiety and improve posture.",
    "Experts share strategies to maintain emotional well-being during winter.",

    # STYLE & BEAUTY
    "Fall fashion trends to refresh your wardrobe this season.",
    "The rise of sustainable beauty brands in the cosmetics industry.",
    "Ready to refresh your wardrobe for this summer with the president of the USA?",

    # TRAVEL
    "Top 10 hidden islands to explore in Southeast Asia.",
    "A guide to experiencing Paris like a local on a weekend trip."
]


# Classify each example and print it next to the predicted category.
for desc in descs:
    predicted = predict_category(
        desc, model=model, vocab=vocab, categories=inv_cats,
        device=device, tokenize=tokenize, PAD_IDX=PAD_IDX,
    )
    print(f"Description: {desc}")
    print(f"Predicted category: {predicted}")
    print("-----------------")

In [None]:
CATEGORIES

In [None]:
# Sports-themed examples; several also touch on workouts, yoga and nutrition.
sports_descriptions = [
    "Local football team wins the championship after dramatic penalty shootout.",
    "Olympic sprinter sets new world record in 100-meter dash.",
    "Top 10 workouts professional athletes use to stay in peak condition.",
    "Basketball legend announces retirement after two decades in the sport.",
    "How to train for your first marathon: tips from elite runners.",
    "The science behind muscle recovery and post-workout nutrition.",
    "Why yoga is becoming a staple in NFL players' training routines.",
    "Highlights from last night's thrilling NBA playoff game.",
    "How mental resilience separates top athletes from the rest.",
    "Teen gymnast stuns judges with flawless Olympic routine."
]

# Classify each example and print it next to the predicted category.
for desc in sports_descriptions:
    predicted = predict_category(
        desc, model=model, vocab=vocab, categories=inv_cats,
        device=device, tokenize=tokenize, PAD_IDX=PAD_IDX,
    )
    print(f"Description: {desc}")
    print(f"Predicted category: {predicted}")
    print("-----------------")