In [1]:
# Данный ноутбук использовал окружение google-colab
%pip install catboost fasttext -q
%pip install datasets



# Домашнее задание "NLP. Часть 1"

In [2]:
import math
import re
import os
import random
import json
from collections import Counter, defaultdict
from typing import List, Dict, Tuple, Any

import torch
import numpy as np
import datasets
import fasttext
import fasttext.util
from transformers import BertTokenizer, BertModel



In [3]:
def seed_everything(seed: int):
    random.seed(seed)
    os.environ['PYTHONHASHSEED'] = str(seed)
    np.random.seed(seed)
    torch.manual_seed(seed)
    torch.cuda.manual_seed(seed)
    torch.backends.cudnn.deterministic = True
    torch.backends.cudnn.benchmark = True

seed_everything(42)

In [4]:
def normalize_pretokenize_text(text: str) -> List[str]:
    text = text.lower()
    words = re.findall(r'\b\w+\b', text)
    return words

In [5]:
# This block is for tests only
test_corpus = [
    "the quick brown fox jumps over the lazy dog",
    "never jump over the lazy dog quickly",
    "brown foxes are quick and dogs are lazy"
]

def build_vocab(texts: List[str]) -> Tuple[List[str], Dict[str, int]]:
    all_words = []
    for text in texts:
        words = normalize_pretokenize_text(text)
        all_words.extend(words)
    vocab = sorted(set(all_words))
    vocab_index = {word: idx for idx, word in enumerate(vocab)}
    return vocab, vocab_index

vocab, vocab_index = build_vocab(test_corpus)

## Задание 1 (0.5 балла)
Реализовать One-Hot векторизацию текстов

In [6]:
def one_hot_vectorization(
    text: str, vocab: List[str] = None, vocab_index: Dict[str, int] = None
) -> List[List[int]]:
    tokens = normalize_pretokenize_text(text)

    if vocab is None and vocab_index is None:
        vocab = sorted(set(tokens))
        vocab_index = {word: idx for idx, word in enumerate(vocab)}
    elif vocab_index is None and vocab is not None:
        vocab_index = {word: idx for idx, word in enumerate(vocab)}
    elif vocab is None and vocab_index is not None:
        vocab = [None] * len(vocab_index)
        for word, idx in vocab_index.items():
            vocab[idx] = word

    result = []

    for word in tokens:
        vector = [0] * len(vocab)
        idx = vocab_index.get(word)

        if (idx is not None) and (0 <= idx < len(vocab)):
            vector[idx] = 1
        result.append(vector)

    return result


def test_one_hot_vectorization(vocab: List[str], vocab_index: Dict[str, int]) -> bool:
    try:
        text = "the quick brown fox"
        result = one_hot_vectorization(text, vocab, vocab_index)

        if not isinstance(result, list):
            return False

        expected_length = len(vocab)
        if len(result[0]) != expected_length:
            return False

        words_in_text = normalize_pretokenize_text(text)
        for i, word in enumerate(words_in_text):
            if word in vocab_index:
                idx = vocab_index[word]
                if result[i][idx] != 1:
                    return False

        print("One-Hot-Vectors test PASSED")

        return True
    except Exception as e:
        print(f"One-Hot-Vectors test FAILED: {e}")
        return False

In [7]:
assert test_one_hot_vectorization(vocab, vocab_index)

One-Hot-Vectors test PASSED


## Задание 2 (0.5 балла)
Реализовать Bag-of-Words

In [8]:
def bag_of_words_vectorization(text: str) -> Dict[str, int]:
    tokens = normalize_pretokenize_text(text)
    counter = {}
    for token in tokens:
        counter[token] = counter.get(token, 0) + 1
    return counter


def test_bag_of_words_vectorization() -> bool:
    try:
        text = "the the quick brown brown brown"
        result = bag_of_words_vectorization(text)

        if not isinstance(result, dict):
            return False

        if result.get("the", 0) != 2:
            return False
        if result.get("quick", 0) != 1:
            return False
        if result.get("brown", 0) != 3:
            return False
        if result.get("nonexistent", 0) != 0:
            return False

        print("Bad-of-Words test PASSED")
        return True
    except Exception as e:
        print(f"Bag-of-Words test FAILED: {e}")
        return False

In [9]:
assert test_bag_of_words_vectorization()

Bad-of-Words test PASSED


## Задание 3 (0.5 балла)
Реализовать TF-IDF

In [10]:
def tf_idf_vectorization(
    text: str,
    corpus: List[str] = None,
    vocab: List[str] = None,
    vocab_index: Dict[str, int] = None,
) -> List[float]:
    tokens = normalize_pretokenize_text(text)

    if vocab is None and vocab_index is None:
        return []
    elif vocab_index is None and vocab is not None:
        vocab_index = {word: idx for idx, word in enumerate(vocab)}
    elif vocab is None and vocab_index is not None:
        vocab = [None] * len(vocab_index)
        for word, idx in vocab_index.items():
            vocab[idx] = word

    document_frequency = {word: 0 for word in vocab}

    for document in corpus:

        token_doc = set(normalize_pretokenize_text(document))

        for tok in token_doc:
            if tok in document_frequency:
                document_frequency[tok] += 1

    terf_frequency = {token: (tokens.count(token) / len(token)) for token in tokens}

    vector = [0.0] * len(vocab)

    for word, idx in vocab_index.items():
        if word in terf_frequency:
            idf = math.log(len(corpus) / (1 + document_frequency[word]))
            vector[idx] = terf_frequency[word] * idf

    return vector


def test_tf_idf_vectorization(corpus, vocab, vocab_index) -> bool:
    try:
        text = "the quick brown"
        result = tf_idf_vectorization(text, corpus, vocab, vocab_index)

        if not isinstance(result, list):
            return False

        expected_length = len(vocab)
        if len(result) != expected_length:
            return False

        for val in result:
            if not isinstance(val, float):
                return False

        print("TF-IDF test PASSED")
        return True
    except Exception as e:
        print(f"TF-IDF test FAILED: {e}")
        return False

In [11]:
assert test_tf_idf_vectorization(test_corpus, vocab, vocab_index)

TF-IDF test PASSED


## Задание 4 (1 балл)
Реализовать Positive Pointwise Mutual Information (PPMI).  
https://en.wikipedia.org/wiki/Pointwise_mutual_information
$$PPMI(word, context) = max(0, PMI(word, context))$$
$$PMI(word, context) = log \frac{P(word, context)}{P(word) P(context)} = log \frac{N(word, context)|(word, context)|}{N(word) N(context)}$$
где $N(word, context)$ -- число вхождений слова $word$ в окно $context$ (размер окна -- гиперпараметр)

In [12]:
def ppmi_vectorization(
    text: str,
    corpus: List[str] = None,
    vocab: List[str] = None,
    vocab_index: Dict[str, int] = None,
    window_size: int = 2,
) -> List[float]:

    if corpus is None:
        corpus = text

    if vocab_index is None and vocab is not None:
        vocab_index = {word: idx for idx, word in enumerate(vocab)}

    elif vocab is None and vocab_index is not None:
        vocab = [None] * len(vocab_index)
        for word, idx in vocab_index.items():
            vocab[idx] = word

    word_in_context = defaultdict(int)
    word_counts = defaultdict(int)
    context_counts = defaultdict(int)

    for document in corpus:
        tokens = normalize_pretokenize_text(document)
        for idx, tok in enumerate(tokens):
            left = max(0, idx - window_size)
            right = min(len(tokens), idx + window_size + 1)

            for j in range(left, right):
                if j == idx:
                    continue
                context = tokens[j]
                if context not in vocab_index:
                    continue
                word_in_context[(tok, context)] += 1
                word_counts[tok] += 1
                context_counts[context] += 1

    total_pairs = sum(word_in_context.values())
    tokens = normalize_pretokenize_text(text)
    ppmi = [0.0] * len(vocab)

    for tok in tokens:
        if tok not in word_counts:
            continue
        tok_count = word_counts[tok]

        for context, idx in vocab_index.items():
            pair = (tok, context)
            word_in_context_count = word_in_context.get(pair, 0)
            if word_in_context_count == 0:
                continue

            context_count = context_counts.get(context, 0)
            if context_count == 0:
                continue

            pmi = math.log(
                (word_in_context_count * total_pairs) / (tok_count * context_count)
            )
            if pmi > 0:
                ppmi[idx] += pmi

    return ppmi


def test_ppmi_vectorization(corpus, vocab, vocab_index) -> bool:
    try:
        text = "quick brown fox"
        result = ppmi_vectorization(text, corpus, vocab, vocab_index)

        if not isinstance(result, list):
            return False

        expected_length = len(vocab)
        if len(result) != expected_length:
            return False

        for val in result:
            if not isinstance(val, float):
                return False

        print("PPMI test PASSED")
        return True
    except Exception as e:
        print(f"PPMI test FAILED: {e}")
        return False

In [13]:
assert test_ppmi_vectorization(test_corpus, vocab, vocab_index)

PPMI test PASSED


## Задание 5 (1 балл)
Реализовать получение эмбеддингов из fasttext и bert (для bert лучше использовать CLS токен)

In [14]:
import fasttext
import fasttext.util

FASTTEXT_MODEL = None


def get_fasttext_embeddings(
    text: str, model_path: str = None, model: any = None
) -> List[np.ndarray]:

    global FASTTEXT_MODEL

    tokens = normalize_pretokenize_text(text)

    if not tokens:
      return []

    if model is not None:
        load_model = model
    else:
        if FASTTEXT_MODEL is not None:
            load_model = FASTTEXT_MODEL
        else:
            if model_path is None:
                fasttext.util.download_model("en", if_exists="ignore")
                model_path = "cc.en.300.bin"

            load_model = fasttext.load_model(model_path)
            FASTTEXT_MODEL = load_model

    embeddings = []
    for tok in tokens:
        vector = load_model.get_word_vector(tok)
        embeddings.append(vector)

    return embeddings

In [15]:
BERT_MODEL = {}


def get_bert_embeddings(
    text: str, model_name: str = "bert-base-uncased", pool_method: str = "cls"
) -> np.ndarray:

    global BERT_MODEL

    if text is None:
        text = ""
    text = text.strip()

    if model_name in BERT_MODEL:
        tokenizer, model = BERT_MODEL[model_name]
    else:
        tokenizer = BertTokenizer.from_pretrained(model_name)
        model = BertModel.from_pretrained(model_name)
        model.eval()
        BERT_MODEL[model_name] = (tokenizer, model)

    encoded = tokenizer(text, return_tensors="pt", truncation=True, padding=True)

    with torch.no_grad():
        outputs = model(**encoded)

    last_hidden = outputs.last_hidden_state  # тензор последнего слоя

    if pool_method == "cls":
        # last_hidden[батч, длина послед, размер эмбеддинга]
        vec = last_hidden[:, 0, :]  # извлекли CLC

    vec = vec.squeeze(0).numpy()

    return vec.astype(np.float32)

## Задание 6 (1.5 балла)
Реализовать обучение так, чтобы можно было поверх эмбеддингов, реализованных в предыдущих заданиях, обучить какую-то модель (вероятно неглубокую, например, CatBoost) на задаче классификации текстов ([IMDB](https://huggingface.co/datasets/stanfordnlp/imdb)).

In [16]:
def vectorize_dataset(
    dataset_name: str = "imdb",
    vectorizer_type: str = "bow",
    split: str = "train",
    sample_size: int = 2500,
    vocab: List[str] = None,
    vocab_index: Dict[str, int] = None
) -> Tuple[Any, List, List]:

    dataset = datasets.load_dataset(dataset_name, split=split)
    dataset = dataset.shuffle(seed=21).select(range(min(sample_size, len(dataset))))

    if sample_size:
        dataset = dataset.select(range(min(sample_size, len(dataset))))

    texts = [item['text'] for item in dataset if 'text' in item and item['text'].strip()]
    labels = [item['label'] for item in dataset if 'label' in item]

    def build_vocab(texts: List[str]) -> Tuple[List[str], Dict[str, int]]:
        all_words = []
        for text in texts:
            words = normalize_pretokenize_text(text)
            all_words.extend(words)
        vocab = sorted(set(all_words))
        vocab_index = {word: idx for idx, word in enumerate(vocab)}
        return vocab, vocab_index

    if vocab is None or vocab_index is None:
      vocab, vocab_index = build_vocab(texts)

    vectorized_data = []
    for text in texts:
        if vectorizer_type == "one_hot":
            word_vectors = one_hot_vectorization(text, vocab, vocab_index)

            if word_vectors:
              doc_vector = np.max(word_vectors, axis=0)
            else:
              doc_vector = np.zeros(len(vocab), dtype=int)

            vectorized_data.append(doc_vector.tolist())

        elif vectorizer_type == "bow":
            bow_dict = bag_of_words_vectorization(text)
            vector = [bow_dict.get(word, 0) for word in vocab]
            vectorized_data.append(vector)
        elif vectorizer_type == "tfidf":
            vectorized_data.append(tf_idf_vectorization(text, texts, vocab, vocab_index))
        elif vectorizer_type == "ppmi":
            vectorized_data.append(ppmi_vectorization(text, texts, vocab, vocab_index))
        elif vectorizer_type == "fasttext":
            embeddings = get_fasttext_embeddings(text)
            if embeddings:
                avg_embedding = np.mean(embeddings, axis=0)
                vectorized_data.append(avg_embedding.tolist())
            else:
                vectorized_data.append([0] * 300)
        elif vectorizer_type == "bert":
            embedding = get_bert_embeddings(text)
            vectorized_data.append(embedding.tolist())
        else:
            raise ValueError(f"Unknown vectorizer type: {vectorizer_type}")

    return vocab, vectorized_data, labels

In [17]:
from catboost import CatBoostClassifier
from sklearn.metrics import classification_report, accuracy_score, f1_score
from sklearn.model_selection import train_test_split, cross_val_score, KFold


def train(embeddings_method="bow", test_size=0.2, val_size=0.2, cv_folds=5, sample_size=2500):

    vocab, x, y = vectorize_dataset("imdb", embeddings_method, "train", sample_size=sample_size,vocab=None, vocab_index=None)

    vocab_index = {word: idx for idx, word in enumerate(vocab)}

    _, x_test, y_test = vectorize_dataset("imdb", embeddings_method, "test", sample_size=sample_size, vocab=vocab, vocab_index=vocab_index)

    x = np.array(x, dtype=np.float32)
    y = np.array(y, dtype=int)
    x_test = np.array(x_test, dtype=np.float32)
    y_test = np.array(y_test, dtype=int)

    x_train, x_val, y_train, y_val = train_test_split(x, y, test_size=val_size, stratify=y,random_state=21)

    model = CatBoostClassifier(
        iterations=200,
        depth=6,
        learning_rate=0.1,
        loss_function="Logloss",
        eval_metric="Accuracy",
        verbose=True,
        random_state=21,
    )

    model.fit(x_train, y_train, eval_set=(x_val, y_val), verbose=False)

    y_val_pred = model.predict(x_val)
    val_acc = accuracy_score(y_val, y_val_pred)
    val_f1 = f1_score(y_val, y_val_pred)

    print(f"[{embeddings_method}] Val accuracy: {val_acc:.4f}, F1: {val_f1:.4f}" )

    y_test_pred = model.predict(x_test)
    test_acc = accuracy_score(y_test, y_test_pred)
    test_f1 = f1_score(y_test, y_test_pred)

    print(f"[{embeddings_method}] Test accuracy: {test_acc:.4f}, F1: {test_f1:.4f}")
    print("Classification report (test):")
    print(classification_report(y_test, y_test_pred, digits=4))

    kf = KFold(n_splits=cv_folds, shuffle=True, random_state=21)
    cv_model = CatBoostClassifier(
        iterations=200,
        depth=6,
        learning_rate=0.1,
        loss_function="Logloss",
        eval_metric="Accuracy",
        verbose=False,
        random_state=21,
    )
    cv_scores = cross_val_score(cv_model, x, y, cv=kf, scoring="accuracy")

    print( f"[{embeddings_method}] CV accuracy: {cv_scores.mean():.4f} ± {cv_scores.std():.4f}")
    print("=" * 80)

    return model

In [18]:
for embeddings_method in ["one_hot", "bow", "tfidf", "ppmi", "fasttext", "bert"]:
    train(embeddings_method=embeddings_method,sample_size=500)

The secret `HF_TOKEN` does not exist in your Colab secrets.
To authenticate with the Hugging Face Hub, create a token in your settings tab (https://huggingface.co/settings/tokens), set it as secret in your Google Colab and restart your session.
You will be able to reuse this secret in all of your notebooks.
Please note that authentication is recommended but still optional to access public models or datasets.


[one_hot] Val accuracy: 0.7700, F1: 0.7810
[one_hot] Test accuracy: 0.6860, F1: 0.7270
Classification report (test):
              precision    recall  f1-score   support

           0     0.7571    0.5403    0.6306       248
           1     0.6471    0.8294    0.7270       252

    accuracy                         0.6860       500
   macro avg     0.7021    0.6848    0.6788       500
weighted avg     0.7016    0.6860    0.6792       500

[one_hot] CV accuracy: 0.7500 ± 0.0219
[bow] Val accuracy: 0.7900, F1: 0.7921
[bow] Test accuracy: 0.7080, F1: 0.7256
Classification report (test):
              precision    recall  f1-score   support

           0     0.7318    0.6492    0.6880       248
           1     0.6893    0.7659    0.7256       252

    accuracy                         0.7080       500
   macro avg     0.7106    0.7075    0.7068       500
weighted avg     0.7104    0.7080    0.7069       500

[bow] CV accuracy: 0.7360 ± 0.0427
[tfidf] Val accuracy: 0.7500, F1: 0.7573
[tfid

tokenizer_config.json:   0%|          | 0.00/48.0 [00:00<?, ?B/s]

vocab.txt:   0%|          | 0.00/232k [00:00<?, ?B/s]

tokenizer.json:   0%|          | 0.00/466k [00:00<?, ?B/s]

config.json:   0%|          | 0.00/570 [00:00<?, ?B/s]

model.safetensors:   0%|          | 0.00/440M [00:00<?, ?B/s]

[bert] Val accuracy: 0.7800, F1: 0.7556
[bert] Test accuracy: 0.7940, F1: 0.7867
Classification report (test):
              precision    recall  f1-score   support

           0     0.7695    0.8347    0.8008       248
           1     0.8225    0.7540    0.7867       252

    accuracy                         0.7940       500
   macro avg     0.7960    0.7943    0.7938       500
weighted avg     0.7962    0.7940    0.7937       500

[bert] CV accuracy: 0.8060 ± 0.0258


### Вывод

1. One-hot, BOW, TF-IDF 
	- На валидационном наборе: accuracy = 0.75–0.79, F1 = 0.75–0.79 — выглядит адекватно;
	- На тесте: accuracy=0.68–0.73, F1=0.72–0.75;
	- для one-hot есть перекос в сторону класса 1 (recall(1) ~0.83, recall(0) ~0.54);
	- у BOW/TF-IDF баланс по классам лучше, оба класса ловятся, но всё равно немного перетягивается positive;

В целом разреженные представления (one-hot, BOW, TF-IDF) на тесте работоспособны, заметного переобучения нет, но чувствительность к дисбалансу классов сохраняется.

2. PPMI
	- Валид: ~0.69 accuracy.
	- Тест: accuracy ~0.53, F1 для класса 1 заметно проседает (0.34), при этом класс 0 модель ловит намного лучше (F1=0.63).

То есть PPMI даёт почти случайное качество на тесте и сильно смещён к предсказанию одного класса. В отличие от BOW/TF-IDF/One-hot, здесь обобщающая способность заметно хуже — на тесте модель предсказывает случайно.

3. fastText
	- Валид: 0.74 accuracy, F1 = 0.72.
	- Тест: 0.742 accuracy, F1 = 0.74, классы 0 и 1 предсказываются почти симметрично (precision/recall ~0.73–0.75).

Это первый метод, который даёт устойчивое и ровное качество и на валидации, и на тесте. Эмбеддинги fastText хорошо обобщаются на маленькой подвыборке IMDB и заметно стабильнее PPMI и базовых разреженных представлений.

4. BERT
	- Валидация: 0.78 accuracy, F1 = 0.76.
	- Тест: 0.794 accuracy, F1 = 0.79, оба класса распознаются примерно одинаково хорошо — лучший результат среди всех методов по accuracy/F1 и по стабильности (CV = 0.81).

Причина: BERT даёт контекстные эмбеддинги предложений. CLS-вектор учитывает порядок слов и контекст, поэтому на вход CatBoost приходит гораздо более информативное и «плотное» представление текста.

Итог:
Для этой настройки ДЗ (малый sample_size=500, простой CatBoost):

1.	Разреженные классические представления (one-hot, BOW, TF-IDF, PPMI): по CV/валидации выглядят неплохо, и на тесте (кроме PPMI) дают вполне приемлемое качество. Однако PPMI сильно отстаёт и на тесте уходит в перекос по классам.

2.	fastText: даёт уже хорошие, устойчивые результаты и адекватное обобщение train -> test, по качеству находится на уровне или чуть выше лучших разреженных методов, при этом сохраняет баланс по классам.

3.	BERT (CLS): показывает лучшее качество и на CV, и на тесте, даёт хороший баланс по классам и остаётся самым стабильным методом на малой выборке за счёт сильной предобученной языковой модели и контекстных признаков.