In [1]:
# Данный ноутбук использовал окружение google-colab
# %pip install catboost fasttext -q

# Домашнее задание "NLP. Часть 1"

In [2]:
import math
import re
import os
import random
import json
from collections import Counter, defaultdict
from typing import List, Dict, Tuple, Any

import torch
import numpy as np
import datasets
import fasttext
import fasttext.util
from transformers import BertTokenizer, BertModel

2025-11-04 15:41:17.345110: I tensorflow/core/util/port.cc:153] oneDNN custom operations are on. You may see slightly different numerical results due to floating-point round-off errors from different computation orders. To turn them off, set the environment variable `TF_ENABLE_ONEDNN_OPTS=0`.
2025-11-04 15:41:17.832686: I tensorflow/core/platform/cpu_feature_guard.cc:210] This TensorFlow binary is optimized to use available CPU instructions in performance-critical operations.
To enable the following instructions: AVX2 AVX_VNNI FMA, in other operations, rebuild TensorFlow with the appropriate compiler flags.
2025-11-04 15:41:19.225975: I tensorflow/core/util/port.cc:153] oneDNN custom operations are on. You may see slightly different numerical results due to floating-point round-off errors from different computation orders. To turn them off, set the environment variable `TF_ENABLE_ONEDNN_OPTS=0`.


In [3]:
def seed_everything(seed: int):
    random.seed(seed)
    os.environ['PYTHONHASHSEED'] = str(seed)
    np.random.seed(seed)
    torch.manual_seed(seed)
    torch.cuda.manual_seed(seed)
    torch.backends.cudnn.deterministic = True
    torch.backends.cudnn.benchmark = True

seed_everything(42)

In [4]:
def normalize_pretokenize_text(text: str) -> List[str]:
    text = text.lower()
    words = re.findall(r'\b\w+\b', text)
    return words

In [5]:
# This block is for tests only
test_corpus = [
    "the quick brown fox jumps over the lazy dog",
    "never jump over the lazy dog quickly",
    "brown foxes are quick and dogs are lazy"
]

def build_vocab(texts: List[str]) -> Tuple[List[str], Dict[str, int]]:
    all_words = []
    for text in texts:
        words = normalize_pretokenize_text(text)
        all_words.extend(words)
    vocab = sorted(set(all_words))
    vocab_index = {word: idx for idx, word in enumerate(vocab)}
    return vocab, vocab_index

vocab, vocab_index = build_vocab(test_corpus)

## Задание 1 (0.5 балла)
Реализовать One-Hot векторизацию текстов

In [6]:
def one_hot_vectorization(text: str, vocab: List[str] = None, vocab_index: Dict[str, int] = None) -> List[int]:
    vector_size = len(vocab)
    res = []
    words = normalize_pretokenize_text(text)
    for word in words:
        vec = [0] * vector_size
        vec[vocab_index[word]] = 1
        res.append(vec)
    return res

def test_one_hot_vectorization(corpus, vocab, vocab_index) -> bool:
    try:
        text = "the quick brown fox"
        result = one_hot_vectorization(text, vocab, vocab_index)

        if not isinstance(result, list):
            return False

        expected_length = len(vocab)
        if len(result[0]) != expected_length:
            return False

        words_in_text = normalize_pretokenize_text(text)
        for i, word in enumerate(words_in_text):
            if word in vocab_index:
                idx = vocab_index[word]
                if result[i][idx] != 1:
                    return False

        print("One-Hot-Vectors test PASSED")

        return True
    except Exception as e:
        print(f"One-Hot-Vectors test FAILED: {e}")
        return False

In [7]:
assert test_one_hot_vectorization(test_corpus, vocab, vocab_index)


One-Hot-Vectors test PASSED


## Задание 2 (0.5 балла)
Реализовать Bag-of-Words

In [8]:
def bag_of_words_vectorization(text: str) -> Dict[str, int]:
    return dict(Counter(normalize_pretokenize_text(text)))

def test_bag_of_words_vectorization() -> bool:
    try:
        text = "the the quick brown brown brown"
        result = bag_of_words_vectorization(text)

        if not isinstance(result, dict):
            return False

        if result.get('the', 0) != 2:
            return False
        if result.get('quick', 0) != 1:
            return False
        if result.get('brown', 0) != 3:
            return False
        if result.get('nonexistent', 0) != 0:
            return False

        print("Bad-of-Words test PASSED")
        return True
    except Exception as e:
        print(f"Bag-of-Words test FAILED: {e}")
        return False

In [9]:
assert test_bag_of_words_vectorization()

Bad-of-Words test PASSED


## Задание 3 (0.5 балла)
Реализовать TF-IDF

In [10]:
from math import log

def tf_transform(vector):
    doc_terms_count = sum(vector)
    print(doc_terms_count)
    if sum(vector) == 0:
        return vector
    else:
        return list(map(lambda k : round(k / doc_terms_count, 3), vector))

def idf_transform(count_matrix):
    D = len(count_matrix)

    n = len(count_matrix[0])
    
    counter_list = [0] * n
    for line in count_matrix:
        for j in range(n):
            if line[j] > 0:
                counter_list[j] += 1

    return [round(log((D + 1) / (el + 1)) + 1, 1) for el in counter_list]

In [11]:
from CountVectorizer import CountVectorizer
from typing import List, Dict

# Изначально была такая сигнатура, но из-за созданного класса Count Vectorizer не нужны параметры vocab и vocab_index: def tf_idf_vectorization(text: str, corpus: List[str] = None, vocab: List[str] = None, vocab_index: Dict[str, int] = None) -> List[float]:
def tf_idf_vectorization(text: str, corpus: List[str] = None) -> List[float]:
    vectorizer = CountVectorizer()
    count_matrix = vectorizer.fit_transform(corpus)
    res_idf = idf_transform(count_matrix)
    
    vector = vectorizer.transform(text)
    res_tf = tf_transform(vector)
    return (np.array(res_tf) * np.array(res_idf)).tolist()

# Аналогично: def test_tf_idf_vectorization(corpus, vocab, vocab_index) -> bool:
def test_tf_idf_vectorization(corpus) -> bool:
    try:
        text = "the quick brown"
        result = tf_idf_vectorization(text, corpus)
        if not isinstance(result, list):
            return False

        expected_length = len(vocab)
        if len(result) != expected_length:
            return False

        for val in result:
            if not isinstance(val, float):
                return False

        print("TF-IDF test PASSED")
        return True
    except Exception as e:
        print(f"TF-IDF test FAILED: {e}")
        return False

In [12]:
assert test_tf_idf_vectorization(test_corpus)

3
TF-IDF test PASSED


Реализация всего TF-IDF можно найти в файле `tfidf.ipynb`

## Задание 4 (1 балл)
Реализовать Positive Pointwise Mutual Information (PPMI).  
https://en.wikipedia.org/wiki/Pointwise_mutual_information
$$PPMI(word, context) = max(0, PMI(word, context))$$
$$PMI(word, context) = log \frac{P(word, context)}{P(word) P(context)} = log \frac{N(word, context)|(word, context)|}{N(word) N(context)}$$
где $N(word, context)$ -- число вхождений слова $word$ в окно $context$ (размер окна -- гиперпараметр)

In [13]:
def ppmi_vectorization(
    text: str,
    corpus: List[str] = None,
    vocab: List[str] = None,
    vocab_index: Dict[str, int] = None,
    window_size: int = 3
) -> List[float]:

    # Сначала обучение модели, где результат PPMI матрица
    total_windows = 0 
    matrix = np.zeros((len(vocab), len(vocab)))
    for sent in corpus:
        tokens = sent.lower().split()

        for i, center_word in enumerate(tokens):
            center_word_idx = vocab_index[center_word]
            
            start = max(0, i - window_size)
            end = min(len(tokens), i + window_size + 1)
            for j in range(start, end):
                if i == j: 
                    continue
                context_word = tokens[j]
                context_word_idx = vocab_index[context_word]

                matrix[center_word_idx, context_word_idx] += 1
                total_windows += 1

    N_center = matrix.sum(axis= 1)
    N_context = matrix.sum(axis= 0)
    ppmi_matrix = np.maximum(0, np.log((matrix * total_windows) / (N_center.reshape(-1, 1) * N_context.reshape(1, -1) + 1e-6)))

    # Инференс - просто берем слова из ppmi и усредняем их
    inf_tokens = text.lower().split()
    vectors = []
    for token in inf_tokens:
        vectors.append(ppmi_matrix[vocab_index[token]])
    
    return np.mean(vectors, axis= 0).tolist()

def test_ppmi_vectorization(corpus, vocab, vocab_index) -> bool:
    try:
        text = "quick brown fox"
        result = ppmi_vectorization(text, corpus, vocab, vocab_index)

        if not isinstance(result, list):
            return False

        expected_length = len(vocab)
        if len(result) != expected_length:
            return False

        for val in result:
            if not isinstance(val, float):
                return False

        print("PPMI test PASSED")
        return True
    except Exception as e:
        print(f"PPMI test FAILED: {e}")
        return False

In [14]:
assert test_ppmi_vectorization(test_corpus, vocab, vocab_index)

PPMI test PASSED


  ppmi_matrix = np.maximum(0, np.log((matrix * total_windows) / (N_center.reshape(-1, 1) * N_context.reshape(1, -1) + 1e-6)))


## Задание 5 (1 балл)
Реализовать получение эмбеддингов из fasttext и bert (для bert лучше использовать CLS токен)

In [24]:
def get_fasttext_embeddings(text: str, model_path: str = 'cc.ru.300.bin', model: any = None) -> List[np.ndarray]:
    tokens = normalize_pretokenize_text(text)
    if model is None:
        if model_path:
            model = fasttext.load_model(model_path)
        else:
            print("Нужно передать саму модель или путь к ней")
    model : fasttext.FastText
    
    vectors = []
    for token in tokens:
        vectors.append(model.get_word_vector(token))

    return vectors

In [16]:
get_fasttext_embeddings("Как дела ?", 'cc.ru.300.bin')

[array([-0.02120868, -0.05426134, -0.03531325,  0.06754214,  0.0250404 ,
         0.01246092, -0.01154722, -0.00579364, -0.02345451,  0.03992337,
         0.12516876,  0.09337895,  0.00602307, -0.04591477,  0.04416794,
         0.00277691,  0.0861214 , -0.0433574 ,  0.04291598,  0.03403168,
        -0.03611547, -0.0012368 , -0.06990376,  0.04342816,  0.00067888,
        -0.02622069, -0.00836486, -0.06487729,  0.01786926, -0.00451234,
        -0.14011997,  0.05188541, -0.1190051 , -0.05928254,  0.06616351,
        -0.03893173,  0.11439636, -0.41910422, -0.10590064,  0.00920938,
        -0.05634374,  0.00225484,  0.00622392,  0.06729869, -0.11813046,
         0.02609018, -0.12659658,  0.03402869,  0.05733868,  0.00780046,
         0.00788761,  0.05200024, -0.03592969,  0.14234933, -0.00050532,
         0.05685663, -0.05067255, -0.04757975, -0.04882739, -0.01509907,
        -0.26460168, -0.01719018,  0.02710446, -0.11518938,  0.04558586,
         0.01119858, -0.00612038,  0.08909725,  0.0

In [17]:
def get_bert_embeddings(
    text: str,
    model_name: str = 'bert-base-uncased',
    pool_method: str = 'cls'
) -> np.ndarray:
    tokenizer = BertTokenizer.from_pretrained(model_name)
    model = BertModel.from_pretrained(model_name)

    model.eval()
    inputs = tokenizer(normalize_pretokenize_text(text), return_tensors= 'pt', truncation= True, padding= True, max_length= 512)
    with torch.no_grad():
        outputs = model(**inputs)
    last_hidden_state = outputs.last_hidden_state
    if pool_method == 'cls':
        sentence_embedding = last_hidden_state[0, 0, :]
    return sentence_embedding.cpu().numpy()

get_bert_embeddings("How many tokens")

array([-2.11662680e-01,  1.05942465e-01, -9.13959071e-02,  3.88246588e-02,
        4.02548164e-02, -9.39927995e-02,  1.00112945e-01,  1.49683088e-01,
        8.21771380e-03, -2.24750638e-01, -3.08279507e-02, -1.32319078e-01,
       -4.91843149e-02,  2.67478794e-01,  4.93385792e-02,  4.20542397e-02,
       -2.13875383e-01,  3.60601954e-02,  1.09178804e-01, -1.12872906e-01,
       -4.41864803e-02, -7.53838615e-03, -2.10865125e-01,  1.78922825e-02,
        9.60436314e-02,  2.21718960e-02,  4.33516242e-02, -3.18819851e-01,
        7.56375492e-02, -2.98691168e-02,  6.72985017e-02,  6.21551834e-02,
       -5.56395911e-02,  1.18431300e-01, -3.26865703e-01, -1.15149096e-02,
       -7.18370313e-04,  2.31557377e-02, -2.00062916e-01,  2.31627543e-02,
        9.22629088e-02, -1.53513283e-01,  2.75566250e-01, -1.05859660e-01,
       -7.10882246e-02, -4.85530645e-02, -1.68746352e+00, -2.08344217e-02,
       -2.86590874e-01, -2.08238006e-01, -3.52022871e-02,  3.82979847e-02,
        3.51018786e-01,  

## Задание 6 (1.5 балла)
Реализовать обучение так, чтобы можно было поверх эмбеддингов, реализованных в предыдущих заданиях, обучить какую-то модель (вероятно неглубокую, например, CatBoost) на задаче классификации текстов ([IMDB](https://huggingface.co/datasets/stanfordnlp/imdb)).

In [18]:
def vectorize_dataset(
    dataset_name: str = "stanfordnlp/imdb",
    vectorizer_type: str = "bow",
    split: str = "train",
    sample_size: int = 2500,
    vocab: List[str] = None,
    vocab_index: Dict[str, int] = None
) -> Tuple[Any, List, List]:

    dataset = datasets.load_dataset(dataset_name, split=split)

    if sample_size:
        dataset = dataset.shuffle(seed=42)
        dataset = dataset.select(range(min(sample_size, len(dataset))))

    texts = [item['text'] for item in dataset if 'text' in item and item['text'].strip()]
    labels = [item['label'] for item in dataset if 'label' in item]

    def build_vocab(texts: List[str]) -> Tuple[List[str], Dict[str, int]]:
        all_words = []
        for text in texts:
            words = normalize_pretokenize_text(text)
            all_words.extend(words)
        vocab = sorted(set(all_words))
        vocab_index = {word: idx for idx, word in enumerate(vocab)}
        return vocab, vocab_index
    
    if vocab is None or vocab_index is None:
        print("Словарь не передан, строим новый на основе текущих данных...")    
        vocab, vocab_index = build_vocab(texts)
    else:
        print("Используем существующий словарь...")

    vectorized_data = []
    for text in texts:
        if vectorizer_type == "one_hot":
            vectorized_data.append(one_hot_vectorization(text, vocab, vocab_index))
        elif vectorizer_type == "bow":
            bow_dict = bag_of_words_vectorization(text)
            vector = [bow_dict.get(word, 0) for word in vocab]
            vectorized_data.append(vector)
        elif vectorizer_type == "tfidf":
            vectorized_data.append(tf_idf_vectorization(text, texts))
        elif vectorizer_type == "ppmi":
            vectorized_data.append(ppmi_vectorization(text, texts, vocab, vocab_index))
        elif vectorizer_type == "fasttext":
            embeddings = get_fasttext_embeddings(text)
            if embeddings:
                avg_embedding = np.mean(embeddings, axis=0)
                vectorized_data.append(avg_embedding.tolist())
            else:
                vectorized_data.append([0] * 300)
        elif vectorizer_type == "bert":
            embedding = get_bert_embeddings(text)
            vectorized_data.append(embedding.tolist())
        else:
            raise ValueError(f"Unknown vectorizer type: {vectorizer_type}")
    return vocab, vocab_index, vectorized_data, labels

In [19]:
from catboost import CatBoostClassifier
from sklearn.metrics import classification_report, accuracy_score, f1_score
from sklearn.model_selection import train_test_split, cross_val_score, KFold

def train(
    embeddings_method="bow",
    test_size=0.2,
    val_size=0.2,
    cv_folds=5
):
    print(f"=== Метод {embeddings_method} ===")
    vocab, vocab_index, X, y = vectorize_dataset("stanfordnlp/imdb", embeddings_method, "train")

    model_for_cv = CatBoostClassifier(iterations=100,
                                      verbose=0,
                                      random_state=42,
                                      task_type='GPU')


    kfold = KFold(n_splits=cv_folds, shuffle=True, random_state=42)

    scores = cross_val_score(model_for_cv, X, y, cv=kfold, scoring='accuracy')

    print("\n--- Результаты кросс-валидации ---")
    print(f"Точность на каждом фолде: {np.round(scores, 4)}")
    print(f"Средняя точность (CV Accuracy): {scores.mean():.4f} (+/- {scores.std():.4f})")
    print("=" * 50)

    X_train, X_val, y_train, y_val = train_test_split(X, y, test_size= val_size, stratify= y)
    _, _, X_test, y_test = vectorize_dataset("stanfordnlp/imdb", embeddings_method, "test", vocab= vocab, vocab_index= vocab_index)

    model = CatBoostClassifier(iterations= 100,
                               verbose= 0,
                               random_state= 42,
                               task_type= 'GPU')
    model.fit(X_train, 
              y_train, 
              eval_set= (X_val, y_val))

    y_pred = model.predict(X_test)
    accuracy = accuracy_score(y_test, y_pred)
    f1 = f1_score(y_test, y_pred, average= 'weighted')
    report = classification_report(y_test, y_pred)
    
    print("\n--- Результаты ---")
    print(f"Точность (Accuracy) на тестовой выборке: {accuracy:.4f}")
    print(f"F1-мера (Weighted) на тестовой выборке: {f1:.4f}")
    print("\nПолный отчет по классификации:")
    print(report)
    print(f"{'='*50}")

In [22]:
train(embeddings_method='bow')

=== Метод bow ===
Словарь не передан, строим новый на основе текущих данных...

--- Результаты кросс-валидации ---
Точность на каждом фолде: [0.782 0.836 0.792 0.778 0.812]
Средняя точность (CV Accuracy): 0.8000 (+/- 0.0215)
Используем существующий словарь...

--- Результаты ---
Точность (Accuracy) на тестовой выборке: 0.8196
F1-мера (Weighted) на тестовой выборке: 0.8191

Полный отчет по классификации:
              precision    recall  f1-score   support

           0       0.85      0.77      0.81      1243
           1       0.79      0.87      0.83      1257

    accuracy                           0.82      2500
   macro avg       0.82      0.82      0.82      2500
weighted avg       0.82      0.82      0.82      2500

