In [34]:
# Данный ноутбук использовал окружение google-colab
# %pip install catboost fasttext -q

Ошибка в формуле Wordpiece на семинаре

$$
\text{score} = \frac{\text{freq\_of\_pair}}{\text{freq\_of\_first\_element}\times \text{freq\_of\_second\_element}}
$$

```bash
conda create -n hw9 python=3.10 -y

conda activate hw9

conda list

conda install numpy matplotlib pandas scipy jupyter notebook ipykernel -y

python -m ipykernel install --user --name=hw9_env --display-name="Python3.10 (hw9_env)"

which python

nvidia-smi

pip3 install torch torchvision torchaudio --index-url https://download.pytorch.org/whl/cu130

python -c "import torch; print(f'PyTorch: {torch.__version__}'); print(f'CUDA: {torch.cuda.is_available()}')"

pip3 install transformers fasttext datasets 
```

# Домашнее задание "NLP. Часть 1"

In [35]:
import math
import re
import os
import random
import json
from collections import Counter, defaultdict
from typing import List, Dict, Tuple, Any

import torch
import numpy as np
import datasets
import fasttext
import fasttext.util
from transformers import BertTokenizer, BertModel

In [36]:
def seed_everything(seed: int):
    random.seed(seed)
    os.environ['PYTHONHASHSEED'] = str(seed)
    np.random.seed(seed)
    torch.manual_seed(seed)
    torch.cuda.manual_seed(seed)
    torch.backends.cudnn.deterministic = True
    torch.backends.cudnn.benchmark = True

seed_everything(52)

In [37]:
def normalize_pretokenize_text(text: str) -> List[str]:
    text = text.lower()
    words = re.findall(r'\b\w+\b', text)
    return words

In [38]:
# This block is for tests only
test_corpus = [
    "the quick brown fox jumps over the lazy dog",
    "never jump over the lazy dog quickly",
    "brown foxes are quick and dogs are lazy"
]

def build_vocab(texts: List[str]) -> Tuple[List[str], Dict[str, int]]:
    all_words = []
    for text in texts:
        words = normalize_pretokenize_text(text)
        all_words.extend(words)
    vocab = sorted(set(all_words))
    vocab_index = {word: idx for idx, word in enumerate(vocab)}
    return vocab, vocab_index

vocab, vocab_index = build_vocab(test_corpus)

Тогда, переопределяем

In [39]:
def build_vocab(texts: List[str], top_k: int = None, min_freq: int = 1) -> Tuple[List[str], Dict[str,int]]:
    cnt = Counter()
    for t in texts:
        cnt.update(normalize_pretokenize_text(t))
    items = [(w,f) for w,f in cnt.items() if f >= min_freq]
    items.sort(key=lambda x: -x[1])
    if top_k:
        items = items[:top_k]
    vocab = [w for w,_ in items]
    vocab_index = {w:i for i,w in enumerate(vocab)}
    return vocab, vocab_index

In [40]:
vocab

['and',
 'are',
 'brown',
 'dog',
 'dogs',
 'fox',
 'foxes',
 'jump',
 'jumps',
 'lazy',
 'never',
 'over',
 'quick',
 'quickly',
 'the']

In [41]:
vocab_index

{'and': 0,
 'are': 1,
 'brown': 2,
 'dog': 3,
 'dogs': 4,
 'fox': 5,
 'foxes': 6,
 'jump': 7,
 'jumps': 8,
 'lazy': 9,
 'never': 10,
 'over': 11,
 'quick': 12,
 'quickly': 13,
 'the': 14}

## Задание 1 (0.5 балла)
Реализовать One-Hot векторизацию текстов

In [42]:
def one_hot_vectorization(
    text: str,
    vocab: List[str] = None,
    vocab_index: Dict[str, int] = None
) -> List[List[int]]:
    words = normalize_pretokenize_text(text)
    vocab_size = len(vocab)
    one_hot_vectors = []

    for word in words:
        vector = [0] * vocab_size
        
        if word in vocab_index:
            idx = vocab_index[word]
            vector[idx] = 1
        
        one_hot_vectors.append(vector)
    
    return one_hot_vectors


def test_one_hot_vectorization(
    vocab: List[str],
    vocab_index: Dict[str, int]
) -> bool:
    try:
        text = "the quick brown fox"
        result = one_hot_vectorization(text, vocab, vocab_index)

        if not isinstance(result, list):
            return False

        expected_length = len(vocab)
        if len(result[0]) != expected_length:
            return False

        words_in_text = normalize_pretokenize_text(text)
        for i, word in enumerate(words_in_text):
            if word in vocab_index:
                idx = vocab_index[word]
                if result[i][idx] != 1:
                    return False

        print("One-Hot-Vectors test PASSED")

        return True
    except Exception as e:
        print(f"One-Hot-Vectors test FAILED: {e}")
        return False

In [43]:
text = "the quick brown fox"
one_hot_vectorization(text, vocab, vocab_index)

[[0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1],
 [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 0, 0],
 [0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0],
 [0, 0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0]]

In [44]:
text = "the quick brown fox abc and fox"
one_hot_vectorization(text, vocab, vocab_index)

[[0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1],
 [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 0, 0],
 [0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0],
 [0, 0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0],
 [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0],
 [1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0],
 [0, 0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0]]

In [45]:
assert test_one_hot_vectorization(vocab, vocab_index)

One-Hot-Vectors test PASSED


Придётся переопределить

In [46]:
def one_hot_vectorization(
    text: str,
    vocab: List[str] = None,
    vocab_index: Dict[str, int] = None
) -> List[int]:
    words = set(normalize_pretokenize_text(text))
    vector = [0] * len(vocab)
    for w in words:
        if w in vocab_index:
            vector[vocab_index[w]] = 1
    return vector

In [47]:
text = "the quick brown fox"
one_hot_vectorization(text, vocab, vocab_index)

[0, 0, 1, 0, 0, 1, 0, 0, 0, 0, 0, 0, 1, 0, 1]

In [48]:
text = "the quick fox abc brown fox"
one_hot_vectorization(text, vocab, vocab_index)

[0, 0, 1, 0, 0, 1, 0, 0, 0, 0, 0, 0, 1, 0, 1]

__Минусы:__

* Высокая размерность 
* Нет информации о семантической близости слов
* Разреженные векторы

## Задание 2 (0.5 балла)
Реализовать Bag-of-Words

__BoW - мешок слов:__

* Учитывается частота каждого слова
* Не учитывается порядок слов
* Результат - словарь {слово: количество_вхождений}

__Отличие от One-Hot:__

* One-Hot: 0 или 1 (бинарное представление) для каждого слова
* BoW: подсчёт частот (может быть > 1)

In [49]:
def bag_of_words_vectorization(text: str) -> Dict[str, int]:
    # Не зря же в начале импортировали Counter
    words = normalize_pretokenize_text(text)
    return dict(Counter(words))

def test_bag_of_words_vectorization() -> bool:
    try:
        text = "the the quick brown brown brown"
        result = bag_of_words_vectorization(text)

        if not isinstance(result, dict):
            return False

        if result.get('the', 0) != 2:
            return False
        if result.get('quick', 0) != 1:
            return False
        if result.get('brown', 0) != 3:
            return False
        if result.get('nonexistent', 0) != 0:
            return False

        print("Bad-of-Words test PASSED")
        return True
    except Exception as e:
        print(f"Bag-of-Words test FAILED: {e}")
        return False

In [50]:
text = "the the quick brown brown brown"
bag_of_words_vectorization(text)

{'the': 2, 'quick': 1, 'brown': 3}

In [51]:
assert test_bag_of_words_vectorization()  # Добавить ()text

Bad-of-Words test PASSED


## Задание 3 (0.5 балла)
Реализовать TF-IDF

**TF-IDF (Term Frequency - Inverse Document Frequency)** - это статистическая мера важности слова в документе относительно коллекции документов (корпуса).

**Формулы:**

$$\text{TF-IDF}(t, d) = \text{TF}(t, d) \times \text{IDF}(t)$$

$$\text{TF}(t, d) = \frac{\text{count}(t \text{ in } d)}{\text{total words in } d}$$

$$\text{IDF}(t) = \log \frac{N}{1 + |\{d \in D : t \in d\}|}$$

Где:
- `t` - термин (слово)
- `d` - документ
- `N` - общее количество документов
- `|{d ∈ D : t ∈ d}|` - количество документов, содержащих термин `t`. Опционально - Лаплас сглаживание


In [52]:
def tf_idf_vectorization(
        text: str,
        corpus: List[str] = None,
        vocab: List[str] = None,
        vocab_index: Dict[str, int] = None  # не исплользуем
) -> List[float]:
    
    words = normalize_pretokenize_text(text)
    
    word_count = Counter(words)
    total_words = len(words)
    tf = {}
    for word in vocab:
        tf[word] = word_count.get(word, 0) / total_words if total_words > 0 else 0
    
    # Inverse Document Frequency
    N = len(corpus)
    idf = {}
    
    for word in vocab:
        # Считаем в скольких документах встречается слово
        doc_count = 0
        for doc in corpus:
            doc_words = normalize_pretokenize_text(doc)
            if word in doc_words:
                doc_count += 1
        
        # IDF = log(N / (1 + doc_count))
        # +1 в знаменателе для избежания деления на 0 (Лаплас)
        idf[word] = math.log(N / (1 + doc_count))
    
    # Вычисляем TF-IDF
    tfidf_vector = []
    for word in vocab:
        tfidf_value = tf[word] * idf[word]
        tfidf_vector.append(tfidf_value)
    
    return tfidf_vector

def test_tf_idf_vectorization(corpus, vocab, vocab_index) -> bool:
    try:
        text = "the quick brown"
        result = tf_idf_vectorization(text, corpus, vocab, vocab_index)

        if not isinstance(result, list):
            return False

        expected_length = len(vocab)
        if len(result) != expected_length:
            return False

        for val in result:
            if not isinstance(val, float):
                return False

        print("TF-IDF test PASSED")
        return True
    except Exception as e:
        print(f"TF-IDF test FAILED: {e}")
        return False

In [53]:
assert test_tf_idf_vectorization(test_corpus, vocab, vocab_index)

TF-IDF test PASSED


## Задание 4 (1 балл)
Реализовать Positive Pointwise Mutual Information (PPMI).  
https://en.wikipedia.org/wiki/Pointwise_mutual_information

$$
PPMI(word, context) = max(0, PMI(word, context))
$$

<br>

$$
PMI(word, context) = log \frac{P(word, context)}  {P(word) \times P(context)} = log \frac{N(word, context)\times|(word, context)|}{N(word) \times N(context)}
$$

<br>

Где:
- `N(word, context)` - число вхождений слова `word` в окно `context` (размер окна - гиперпараметр)
- `|(word, context)|` — сколько раз встречается context

Выводы:

- Если слова "king" и "queen" часто встречаются рядом -> высокий PMI
- Если "the" встречается со всеми словами -> низкий PMI
- PPMI обрезает отрицательные значения <- (max(0, PMI))

In [54]:
def ppmi_vectorization(
    text: str,
    corpus: List[str] = None,
    vocab: List[str] = None,
    vocab_index: Dict[str, int] = None,
    window_size: int = 2
) -> List[float]:
    
    words = normalize_pretokenize_text(text)
    
    word_counts = Counter()  # Сколько раз встречается каждое слово
    context_counts = Counter()  # Сколько раз слово является контекстом
    pair_counts = defaultdict(int)  # Сколько раз пара (word, context) встречается
    total_pairs = 0
    
    for doc in corpus:
        doc_words = normalize_pretokenize_text(doc)
        
        # Обновляем счётчик слов
        word_counts.update(doc_words)
        
        # Для каждого слова считаем контекст в окне
        for i, target_word in enumerate(doc_words):
            # Левое окно: [i-window_size, i)
            # Правое окно: (i, i+window_size]
            start = max(0, i - window_size)
            end = min(len(doc_words), i + window_size + 1)
            
            for j in range(start, end):
                if i != j:  # Не берём само слово
                    context_word = doc_words[j]
                    pair_counts[(target_word, context_word)] += 1
                    context_counts[context_word] += 1
                    total_pairs += 1
    
    # Вычисляем PPMI для слов из текущего текста
    ppmi_vector = []
    
    for context_word in vocab:
        # Собираем PMI для всех слов в тексте относительно context_word
        pmi_sum = 0.0
        count = 0
        
        for target_word in words:
            if target_word in word_counts:
                # Количество пар (target_word, context_word)
                n_word_context = pair_counts.get((target_word, context_word), 0)
                
                if n_word_context > 0:

                    p_word_context = n_word_context / total_pairs
                    
                    p_word = word_counts[target_word] / sum(word_counts.values())
                    
                    p_context = context_counts[context_word] / sum(context_counts.values())
                    
                    # PMI = log(P(word, context) / (P(word) * P(context)))
                    pmi = math.log(p_word_context / (p_word * p_context))
                    
                    ppmi = max(0, pmi)
                    
                    pmi_sum += ppmi
                    count += 1
        
        # Усредняем PPMI по всем словам в тексте
        avg_ppmi = pmi_sum / count if count > 0 else 0.0
        ppmi_vector.append(avg_ppmi)
    
    return ppmi_vector

def test_ppmi_vectorization(corpus, vocab, vocab_index) -> bool:
    try:
        text = "quick brown fox"
        result = ppmi_vectorization(text, corpus, vocab, vocab_index)

        if not isinstance(result, list):
            return False

        expected_length = len(vocab)
        if len(result) != expected_length:
            return False

        for val in result:
            if not isinstance(val, float):
                return False

        print("PPMI test PASSED")
        return True
    except Exception as e:
        print(f"PPMI test FAILED: {e}")
        return False

In [55]:
assert test_ppmi_vectorization(test_corpus, vocab, vocab_index)

PPMI test PASSED


## Задание 5 (1 балл)
Реализовать получение эмбеддингов из fasttext и bert (для bert лучше использовать CLS токен)

__FastText__

* Основан на Word2Vec, но учитывает подслова (subwords)
* Может работать с незнакомыми словами (out-of-vocabulary)


__BERT (Bidirectional Encoder Representations from Transformers)__
* Основан на архитектуре Transformer
* Контекстуальные эмбеддинги: одно слово имеет __разные векторы в разных контекстах__

In [56]:
def get_fasttext_embeddings(
        text: str, 
        model_path: str = None, 
        model: any = None
) -> List[np.ndarray]:
    
    # Загружаем модель, если она не передана
    if model is None:
        if model_path is None:
            # Скачиваем предобученную модель (английский язык)
            fasttext.util.download_model('en', if_exists='ignore')
            model_path = 'cc.en.300.bin'
        
        model = fasttext.load_model(model_path)
    
    words = normalize_pretokenize_text(text)
    
    embeddings = []
    for word in words:
        # get_word_vector возвращает numpy array размером (300,)
        word_embedding = model.get_word_vector(word)
        embeddings.append(word_embedding)
    
    return embeddings

In [57]:
%%time
sh = np.array([get_fasttext_embeddings('I am ml engineer')])
sh.shape

CPU times: user 1.26 s, sys: 14.2 s, total: 15.4 s
Wall time: 16.4 s


(1, 4, 300)

`pool_method: str = 'cls'`  
- mean: для семантического поиска, сравнения документов или max - для выделения ключевых признаков, semantic similarity, поиск
- max: вытягивает самые «сильные» активации, иногда полезно для извлечения маркерных признаков (ключевых слов, эмоций)

In [58]:
def get_bert_embeddings(
    text: str,
    model_name: str = 'bert-base-uncased',
    pool_method: str = 'cls' 
) -> np.ndarray:
    """
    Возвращает один эмбеддинг текста на основе выбранного метода пуллинга.
    """

    # Загружаем токенизатор и модель
    tokenizer = BertTokenizer.from_pretrained(model_name)
    model = BertModel.from_pretrained(model_name)

    device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
    model.to(device)
    
    # Переводим модель в режим inference (отключаем dropout)
    model.eval()    
    
    # см ниже
    inputs = tokenizer(
        text,
        return_tensors='pt',  # возвращает PyTorch тензоры
        padding=True,
        truncation=True,
        max_length=512  # Лимит длина для BERT
    )

    inputs = {k: v.to(device) for k, v in inputs.items()}
    # print(inputs['input_ids'].shape)  
    # print(inputs['attention_mask'].shape) 

    with torch.no_grad():
        outputs = model(**inputs)
    
    # Извлекаем эмбеддинги
    last_hidden_state = outputs.last_hidden_state  # (1, seq_len, 768) # hidden_size = 768 для bert-base-uncased
    
    # Применяем pooling
    if pool_method == 'cls':
        # Берём эмбеддинг токена [CLS] (первый токен)
        embedding = last_hidden_state[0, 0, :].detach().cpu().numpy()  # (768,)
        
    else:
        raise ValueError(f"Unk: {pool_method}")
    
    
    return embedding

In [59]:
%%time
sh = np.array([get_bert_embeddings('I am ml engineer')])
sh.shape

CPU times: user 219 ms, sys: 431 ms, total: 650 ms
Wall time: 2.23 s


(1, 768)

`inputs = tokenizer():`

- tokenizer разбивает текст на WordPiece токены, добавляет специальные [CLS] и [SEP]
- Формирует input_ids (индексы токенов) и attention_mask (1 — реальный токен, 0 — паддинг)


В стандартном BertModel (из transformers) кроме last_hidden_state возвращается (если не выключено) ещё pooler_output:

* Берётся вектор CLS: h_cls = last_hidden_state[:, 0] (shape: (batch, 768))
* Прогоняется через линейный слой + tanh:
* pooler_output = tanh(W * h_cls + b)
* Обучалось в pretraining для задачи Next Sentence Prediction (NSP)
* Этот слой заточен под NSP

#### Оптимизация get_bert_embeddings (кэширование модели + избегаем повторной загрузки)

Проблема: сейчас модель и токенизатор грузятся при каждом вызове огромные накладные расходы (IO + cuda init)

In [60]:
_DEVICE = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
_BERT_TOKENIZER = None
_BERT_MODEL = None

def init_bert(model_name: str = 'bert-base-uncased'):
    global _BERT_TOKENIZER, _BERT_MODEL
    if _BERT_MODEL is None:
        _BERT_TOKENIZER = BertTokenizer.from_pretrained(model_name)
        _BERT_MODEL = BertModel.from_pretrained(model_name)
        _BERT_MODEL.to(_DEVICE).eval()

def get_bert_embeddings(
    text: str,
    pool_method: str = 'cls'
) -> np.ndarray:
    init_bert()
    enc = _BERT_TOKENIZER(
        text,
        return_tensors='pt',
        padding=True,
        truncation=True,
        max_length=512
    )
    enc = {k: v.to(_DEVICE) for k, v in enc.items()}
    with torch.no_grad():
        out = _BERT_MODEL(**enc)
    hidden = out.last_hidden_state  # (1, L, 768)
    if pool_method == 'cls':
        emb = hidden[0, 0]

    else:
        raise ValueError(pool_method)
    return emb.detach().cpu().numpy()

Теперь:

* `init_bert` обеспечивает однократную загрузку
* Сокращает время векторизации корпуса в разы

Аналогично для Fasttext

In [61]:
_FASTTEXT_MODEL = None

def init_fasttext(lang: str = 'en'):
    global _FASTTEXT_MODEL
    if _FASTTEXT_MODEL is None:
        fasttext.util.download_model(lang, if_exists='ignore')
        _FASTTEXT_MODEL = fasttext.load_model(f'cc.{lang}.300.bin')

def get_fasttext_mean_embedding(text: str) -> List[float]:
    init_fasttext()
    words = normalize_pretokenize_text(text)
    if not words:
        return [0.0]*300
    vecs = [_FASTTEXT_MODEL.get_word_vector(w) for w in words]
    return np.mean(vecs, axis=0).tolist()

## Задание 6 (1.5 балла)
Реализовать обучение так, чтобы можно было поверх эмбеддингов, реализованных в предыдущих заданиях, обучить какую-то модель (вероятно неглубокую, например, CatBoost) на задаче классификации текстов ([IMDB](https://huggingface.co/datasets/stanfordnlp/imdb)).

### Единая длина признаков внутри одного метода

CatBoost (и большинство ML-моделей) требует одинаковую длину feature в каждой строке


Поэтому:
- строим vocab только по train, затем при векторизации test использем тот же vocab (а неизвестные слова игнор)
- PPMI/TF-IDF  важна консистентность корпуса (для честности лучше вычислять IDF только на train, а test использовать предрасчитанные значения)


In [62]:
def compute_idf(corpus: List[str], vocab: List[str]) -> Dict[str,float]:
    N = len(corpus)
    df = {w:0 for w in vocab}
    for doc in corpus:
        seen = set(normalize_pretokenize_text(doc))
        for w in seen:
            if w in df:
                df[w] += 1
    idf = {w: math.log(N / (1 + df[w])) for w in vocab}
    return idf

def tf_idf_vector(text: str, vocab: List[str], idf: Dict[str,float]) -> List[float]:
    words = normalize_pretokenize_text(text)
    cnt = Counter(words)
    total = len(words) or 1
    vec = []
    for w in vocab:
        tf = cnt.get(w,0)/total
        vec.append(tf * idf[w])
    return vec

In [63]:
def compute_ppmi_stats(corpus: List[str], vocab: List[str], window_size: int = 2):
    pair_counts = Counter()
    word_counts = Counter()
    context_counts = Counter()
    for doc in corpus:
        doc_words = normalize_pretokenize_text(doc)
        word_counts.update(doc_words)
        for i,w in enumerate(doc_words):
            start = max(0, i-window_size)
            end = min(len(doc_words), i+window_size+1)
            for j in range(start,end):
                if i==j: continue
                c = doc_words[j]
                pair_counts[(w,c)] += 1
                context_counts[c] += 1
    total_pairs = sum(pair_counts.values()) or 1
    return pair_counts, word_counts, context_counts, total_pairs

def ppmi_vector_fast(text: str,
                     vocab: List[str],
                     pair_counts: Counter,
                     word_counts: Counter,
                     context_counts: Counter,
                     total_pairs: int) -> List[float]:
    words = set(normalize_pretokenize_text(text))
    sum_words = sum(word_counts.values()) or 1
    sum_context = sum(context_counts.values()) or 1
    vec=[]
    for ctx in vocab:
        pmi_sum=0.0
        cnt=0
        for w in words:
            n = pair_counts.get((w,ctx),0)
            if n==0: continue
            p_wc = n/total_pairs
            p_w = word_counts[w]/sum_words
            p_c = context_counts[ctx]/sum_context
            pmi = math.log(p_wc/(p_w*p_c))
            pmi_sum += max(0,pmi)
            cnt +=1
        vec.append(pmi_sum/cnt if cnt else 0.0)
    return vec

vectorize_dataset тоже обновлю

как минимум отсутсвует `return vocab, vectorized_data, labels`

In [64]:
def vectorize_dataset(
    dataset_name: str = "imdb",
    vectorizer_type: str = "bow",
    split: str = "train",
    sample_size: int = 2500,
    vocab: List[str] = None,
    vocab_index: Dict[str, int] = None,
    idf: Dict[str, float] = None,
    ppmi_stats: Tuple = None,
    top_k: int = 5000
) -> Tuple[List[str], Dict[str, int], Any, List[List[float]], List[int]]:
    """
    Возвращает:
      vocab, vocab_index,
      stats (idf для tfidf или (pair_counts, word_counts, context_counts, total_pairs) для ppmi),
      vectors, labels
    """
    dataset = datasets.load_dataset("imdb", split=split).shuffle(seed=42)  # CatBoost не обучается, когда y содержит единственное уникальное значение
    if sample_size:
        dataset = dataset.select(range(sample_size))

    texts = [item['text'] for item in dataset if 'text' in item and item['text'].strip()]
    labels = [item['label'] for item in dataset if 'label' in item]

    # Строим vocab только если не передан (т.е. train)
    if vocab is None or vocab_index is None:
        vocab, vocab_index = build_vocab(texts, top_k=top_k)

    # Предрасчёт статистик только на train
    if split == "train":
        if vectorizer_type == "tfidf" and idf is None:
            idf = compute_idf(texts, vocab)
        if vectorizer_type == "ppmi" and ppmi_stats is None:
            ppmi_stats = compute_ppmi_stats(texts, vocab)

    vectors = []
    for text in texts:
        if vectorizer_type == "one_hot":
            vectors.append(one_hot_vectorization(text, vocab, vocab_index))
        elif vectorizer_type == "bow":
            bow_dict = bag_of_words_vectorization(text)
            vectors.append([bow_dict.get(w, 0) for w in vocab])
        elif vectorizer_type == "tfidf":
            if idf is None:
                raise ValueError("idf must be provided for test split")
            vectors.append(tf_idf_vector(text, vocab, idf))
        elif vectorizer_type == "ppmi":
            if ppmi_stats is None:
                raise ValueError("ppmi_stats must be provided for test split")
            vectors.append(ppmi_vector_fast(text, vocab, *ppmi_stats))
        elif vectorizer_type == "fasttext":
            vectors.append(get_fasttext_mean_embedding(text))
        elif vectorizer_type == "bert":
            emb = get_bert_embeddings(text)
            vectors.append(emb.tolist())
        else:
            raise ValueError(f"Unknown vectorizer_type: {vectorizer_type}")

    # Проверка одинаковой длины
    if vectors:
        base_len = len(vectors[0])
        for v in vectors:
            assert len(v) == base_len, "Feature length mismatch"

    stats = idf if vectorizer_type == "tfidf" else ppmi_stats
    return vocab, vocab_index, stats, vectors, labels

1. после теста переопределю `one_hot_vectorization`

In [65]:
from catboost import CatBoostClassifier
from sklearn.metrics import classification_report, accuracy_score, f1_score
from sklearn.model_selection import train_test_split, cross_val_score, KFold

def train(
    embeddings_method: str = "bow",
    sample_size: int = 1500,
    random_state: int = 42
):
    print(f"\n=== {embeddings_method.upper()} ===")
    vocab, vocab_index, stats, X_train_raw, y_train = vectorize_dataset(
        "imdb",
        embeddings_method,
        "train",
        sample_size=sample_size
    )

    # Подставляем нужные статистики для test
    idf = stats if embeddings_method == "tfidf" else None
    ppmi_stats = stats if embeddings_method == "ppmi" else None

    _, _, _, X_test_raw, y_test = vectorize_dataset(
        "imdb",
        embeddings_method,
        "test",
        sample_size=sample_size,
        vocab=vocab,
        vocab_index=vocab_index,
        idf=idf,
        ppmi_stats=ppmi_stats
    )

    X_train_raw = np.array(X_train_raw, dtype=np.float32)
    y_train = np.array(y_train)
    X_test_raw = np.array(X_test_raw, dtype=np.float32)
    y_test = np.array(y_test)

    X_tr, X_val, y_tr, y_val = train_test_split(
        X_train_raw,
        y_train,
        test_size=0.2,
        random_state=random_state,
        stratify=y_train
    )

    model = CatBoostClassifier(
        iterations=250,
        depth=4,
        learning_rate=0.1,
        loss_function="Logloss",
        verbose=50,
        random_seed=random_state
    )
    model.fit(X_tr, y_tr, eval_set=(X_val, y_val), use_best_model=True)

    val_pred = model.predict(X_val)
    test_pred = model.predict(X_test_raw)

    val_acc = accuracy_score(y_val, val_pred)
    val_f1 = f1_score(y_val, val_pred)
    test_acc = accuracy_score(y_test, test_pred)
    test_f1 = f1_score(y_test, test_pred)

    print(f"Val: acc={val_acc:.4f} f1={val_f1:.4f}")
    print(f"Test: acc={test_acc:.4f} f1={test_f1:.4f}")

    return dict(val_acc=val_acc, val_f1=val_f1, test_acc=test_acc, test_f1=test_f1)

In [66]:
results = {}
for m in ["bow", "one_hot", "tfidf", "fasttext", "bert"]:
    results[m] = train(m, sample_size=800)
print(results)


=== BOW ===
0:	learn: 0.6808517	test: 0.6876472	best: 0.6876472 (0)	total: 15.8ms	remaining: 3.94s
50:	learn: 0.4210989	test: 0.5632028	best: 0.5617851 (49)	total: 222ms	remaining: 868ms
100:	learn: 0.2666398	test: 0.5254912	best: 0.5240647 (94)	total: 380ms	remaining: 560ms
150:	learn: 0.1845556	test: 0.5176552	best: 0.5137885 (145)	total: 539ms	remaining: 353ms
200:	learn: 0.1321614	test: 0.5112358	best: 0.5112358 (200)	total: 687ms	remaining: 167ms
249:	learn: 0.1026263	test: 0.5112262	best: 0.5091709 (237)	total: 854ms	remaining: 0us

bestTest = 0.5091709188
bestIteration = 237

Shrink model to first 238 iterations.
Val: acc=0.7500 f1=0.7531
Test: acc=0.7875 f1=0.7942

=== ONE_HOT ===
0:	learn: 0.6764145	test: 0.6883271	best: 0.6883271 (0)	total: 10.5ms	remaining: 2.61s
50:	learn: 0.4119559	test: 0.5670598	best: 0.5670598 (50)	total: 183ms	remaining: 713ms
100:	learn: 0.2752558	test: 0.5352554	best: 0.5346894 (97)	total: 413ms	remaining: 609ms
150:	learn: 0.1889934	test: 0.5230152

Не ожидал что будут такие хорошие метрики даже с такой реализацией

Более того, ohe не сильно хуже bert, но это скорее всего из-за маленткой выборки