In [1]:
# Данный ноутбук не использовал окружение google-colab
!python3.9 -m pip install catboost fasttext-wheel -q


[notice] A new release of pip is available: 25.2 -> 25.3
[notice] To update, run: C:\Users\USER\AppData\Local\Microsoft\WindowsApps\PythonSoftwareFoundation.Python.3.9_qbz5n2kfra8p0\python.exe -m pip install --upgrade pip


In [2]:
!python3.9 -m pip install datasets




[notice] A new release of pip is available: 25.2 -> 25.3
[notice] To update, run: C:\Users\USER\AppData\Local\Microsoft\WindowsApps\PythonSoftwareFoundation.Python.3.9_qbz5n2kfra8p0\python.exe -m pip install --upgrade pip


# Домашнее задание "NLP. Часть 1"

In [3]:
import math
import re
import os
import random
import json
from collections import Counter, defaultdict
from typing import List, Dict, Tuple, Any

import torch
import numpy as np
import datasets
import fasttext
import fasttext.util
from transformers import BertTokenizer, BertModel

  from .autonotebook import tqdm as notebook_tqdm


In [4]:
def seed_everything(seed: int):
    random.seed(seed)
    os.environ['PYTHONHASHSEED'] = str(seed)
    np.random.seed(seed)
    torch.manual_seed(seed)
    torch.cuda.manual_seed(seed)
    torch.backends.cudnn.deterministic = True
    torch.backends.cudnn.benchmark = True

seed_everything(42)

In [5]:
def normalize_pretokenize_text(text: str) -> List[str]:
    text = text.lower()
    words = re.findall(r'\b\w+\b', text)
    return words

In [6]:
# This block is for tests only
test_corpus = [
    "the quick brown fox jumps over the lazy dog",
    "never jump over the lazy dog quickly",
    "brown foxes are quick and dogs are lazy"
]

def build_vocab(texts: List[str]) -> Tuple[List[str], Dict[str, int]]:
    all_words = []
    for text in texts:
        words = normalize_pretokenize_text(text)
        all_words.extend(words)
    vocab = sorted(set(all_words))
    vocab_index = {word: idx for idx, word in enumerate(vocab)}
    return vocab, vocab_index

vocab, vocab_index = build_vocab(test_corpus)

## Задание 1 (0.5 балла)
Реализовать One-Hot векторизацию текстов

In [7]:
def one_hot_vectorization(
    text: str,
    vocab: List[str] = None,
    vocab_index: Dict[str, int] = None
) -> List[List[int]]:
    words = normalize_pretokenize_text(text)
    
    one_hot_vectors = []
    vocab_size = len(vocab) if vocab is not None else len(vocab_index)
    
    for word in words:
        vector = [0] * vocab_size
        if word in vocab_index:
            idx = vocab_index[word]
            vector[idx] = 1  
        one_hot_vectors.append(vector)
    
    return one_hot_vectors


def test_one_hot_vectorization(
    vocab: List[str],
    vocab_index: Dict[str, int]
) -> bool:
    try:
        text = "the quick brown fox"
        result = one_hot_vectorization(text, vocab, vocab_index)

        if not isinstance(result, list):
            return False

        expected_length = len(vocab)
        if len(result[0]) != expected_length:
            return False

        words_in_text = normalize_pretokenize_text(text)
        for i, word in enumerate(words_in_text):
            if word in vocab_index:
                idx = vocab_index[word]
                if result[i][idx] != 1:
                    return False

        print("One-Hot-Vectors test PASSED")

        return True
    except Exception as e:
        print(f"One-Hot-Vectors test FAILED: {e}")
        return False

In [8]:
assert test_one_hot_vectorization(vocab, vocab_index)

One-Hot-Vectors test PASSED


## Задание 2 (0.5 балла)
Реализовать Bag-of-Words

In [9]:
def bag_of_words_vectorization(text: str) -> Dict[str, int]:
    words = normalize_pretokenize_text(text)
    bag_dict = {}

    for word in words:
        bag_dict[word] = bag_dict.get(word, 0) + 1
    
    return bag_dict
    

def test_bag_of_words_vectorization() -> bool:
    try:
        text = "the the quick brown brown brown"
        result = bag_of_words_vectorization(text)

        if not isinstance(result, dict):
            return False

        if result.get('the', 0) != 2:
            return False
        if result.get('quick', 0) != 1:
            return False
        if result.get('brown', 0) != 3:
            return False
        if result.get('nonexistent', 0) != 0:
            return False

        print("Bad-of-Words test PASSED")
        return True
    except Exception as e:
        print(f"Bag-of-Words test FAILED: {e}")
        return False

In [10]:
assert test_bag_of_words_vectorization()

Bad-of-Words test PASSED


## Задание 3 (0.5 балла)
Реализовать TF-IDF

In [11]:
def tf_idf_vectorization(text: str, corpus: List[str] = None, vocab: List[str] = None, vocab_index: Dict[str, int] = None) -> List[float]:
    # TF
    words = normalize_pretokenize_text(text)
    word_counts = Counter(words)
    total_words = len(words)
    
    tf_vector = [0.0] * len(vocab)
    for word, count in word_counts.items():
        if word in vocab_index:
            idx = vocab_index[word]
            tf_vector[idx] = count / total_words if total_words > 0 else 0
    
    # IDF
    doc_count = len(corpus)
    idf_vector = [0.0] * len(vocab)

    new_corpus = [normalize_pretokenize_text(doc) for doc in corpus]
    
    for i, word in enumerate(vocab):
        docs_with_word = sum(1 for doc in new_corpus if word in doc)
        idf_vector[i] = math.log(doc_count / docs_with_word) if docs_with_word > 0 else 0
    
    # TF-IDF
    tfidf_vector = [tf_vector[i] * idf_vector[i] for i in range(len(vocab))]
    
    return tfidf_vector


def test_tf_idf_vectorization(corpus, vocab, vocab_index) -> bool:
    try:
        text = "the quick brown"
        result = tf_idf_vectorization(text, corpus, vocab, vocab_index)

        if not isinstance(result, list):
            return False

        expected_length = len(vocab)
        if len(result) != expected_length:
            return False

        for val in result:
            if not isinstance(val, float):
                return False

        print("TF-IDF test PASSED")
        return True
    except Exception as e:
        print(f"TF-IDF test FAILED: {e}")
        return False

In [12]:
assert test_tf_idf_vectorization(test_corpus, vocab, vocab_index)

TF-IDF test PASSED


## Задание 4 (1 балл)
Реализовать Positive Pointwise Mutual Information (PPMI).  
https://en.wikipedia.org/wiki/Pointwise_mutual_information
$$PPMI(word, context) = max(0, PMI(word, context))$$
$$PMI(word, context) = log \frac{P(word, context)}{P(word) P(context)} = log \frac{N(word, context)|(word, context)|}{N(word) N(context)}$$
где $N(word, context)$ -- число вхождений слова $word$ в окно $context$ (размер окна -- гиперпараметр)

In [13]:
def ppmi_vectorization(
    text: str,
    corpus: List[str] = None,
    vocab: List[str] = None,
    vocab_index: Dict[str, int] = None,
    window_size: int = 2
) -> List[float]:

    word_context_counts = defaultdict(int)
    word_counts = defaultdict(int)
    context_counts = defaultdict(int)
    total_pairs = 0
    
    # обработка всего корпуса и сбор статистик
    for doc in corpus:
        words = normalize_pretokenize_text(doc)
        for i, target_word in enumerate(words):
            word_counts[target_word] += 1
            
            # начало и конец контекстного окна вокруг target_word
            start = max(0, i - window_size)
            end = min(len(words), i + window_size + 1)
            
            for j in range(start, end):
                # исключается само слово
                if i != j:
                    context_word = words[j]
                    word_context_counts[(target_word, context_word)] += 1
                    context_counts[context_word] += 1
                    total_pairs += 1
    
    # вычисление PPMI для текста
    words_in_text = normalize_pretokenize_text(text)
    ppmi_vector = [0.0] * len(vocab)
    
    for i, context_word in enumerate(vocab):
        ppmi_sum = 0.0
        count = 0
        
        for target_word in words_in_text:
            pair_count = word_context_counts.get((target_word, context_word), 0)
            if pair_count > 0:
                p_word_context = pair_count / total_pairs
                p_word = word_counts[target_word] / len([w for doc in corpus for w in normalize_pretokenize_text(doc)])
                p_context = context_counts[context_word] / total_pairs
                
                pmi = math.log(p_word_context / (p_word * p_context)) if p_word * p_context > 0 else 0
                ppmi = max(0, pmi)
                ppmi_sum += ppmi
                count += 1
        
        ppmi_vector[i] = ppmi_sum / count if count > 0 else 0.0
    
    return ppmi_vector

def test_ppmi_vectorization(corpus, vocab, vocab_index) -> bool:
    try:
        text = "quick brown fox"
        result = ppmi_vectorization(text, corpus, vocab, vocab_index)

        if not isinstance(result, list):
            return False

        expected_length = len(vocab)
        if len(result) != expected_length:
            return False

        for val in result:
            if not isinstance(val, float):
                return False

        print("PPMI test PASSED")
        return True
    except Exception as e:
        print(f"PPMI test FAILED: {e}")
        return False

In [14]:
assert test_ppmi_vectorization(test_corpus, vocab, vocab_index)

PPMI test PASSED


## Задание 5 (1 балл)
Реализовать получение эмбеддингов из fasttext и bert (для bert лучше использовать CLS токен)

In [None]:
def get_fasttext_embeddings(text: str, model_path: str = None, model: any = None) -> List[np.ndarray]:
    if model is None:
        if not os.path.exists(model_path):
            fasttext.util.download_model('en', if_exists='ignore')
        model = fasttext.load_model(model_path)
        
    words = normalize_pretokenize_text(text)

    embeddings = []
    for word in words:
        embedding = model.get_word_vector(word)
        embeddings.append(embedding)
    
    return embeddings

In [16]:
def get_bert_embeddings(
    text: str,
    model_name: str = 'bert-base-uncased',
    pool_method: str = 'cls'
) -> np.ndarray:
    # Hint: Use CLS token embeddings

    tokenizer = BertTokenizer.from_pretrained(model_name)
    model = BertModel.from_pretrained(model_name)
    
    # токенизация текста
    inputs = tokenizer(text, return_tensors='pt', truncation=True, max_length=512, padding=True)
    
    with torch.no_grad():
        outputs = model(**inputs)
    
    # получение эмбеддингов
    if pool_method == 'cls':
        embedding = outputs.last_hidden_state[:, 0, :].numpy()[0]
    elif pool_method == 'mean':
        embedding = outputs.last_hidden_state.mean(dim=1).numpy()[0]
    else:
        raise ValueError("pool_method должен быть 'cls' или 'mean'")
    
    return embedding

## Задание 6 (1.5 балла)
Реализовать обучение так, чтобы можно было поверх эмбеддингов, реализованных в предыдущих заданиях, обучить какую-то модель (вероятно неглубокую, например, CatBoost) на задаче классификации текстов ([IMDB](https://huggingface.co/datasets/stanfordnlp/imdb)).

In [None]:
def my_load_dataset(dataset_name: str, split: str, sample_size: int):
    dataset = datasets.load_dataset(dataset_name, split=split)
    dataset = dataset.shuffle(seed=42)
    if sample_size:
        dataset = dataset.select(range(min(sample_size, len(dataset))))
    return dataset

In [None]:
def vectorize_dataset(
    dataset_name: str = "imdb",
    vectorizer_type: str = "bow",
    split: str = "train",
    sample_size: int = 2500,
    vocab: List[str] = None,
    vocab_index: Dict[str, int] = None
) -> Tuple[Any, List, List]:

    dataset = my_load_dataset(dataset_name, split, sample_size)

    if sample_size:
        dataset = dataset.select(range(min(sample_size, len(dataset))))

    texts = [item['text'] for item in dataset if 'text' in item and item['text'].strip()]
    labels = [item['label'] for item in dataset if 'label' in item]

    def build_vocab(texts: List[str]) -> Tuple[List[str], Dict[str, int]]:
        all_words = []
        for text in texts:
            words = normalize_pretokenize_text(text)
            all_words.extend(words)
        vocab = sorted(set(all_words))
        vocab_index = {word: idx for idx, word in enumerate(vocab)}
        return vocab, vocab_index

    # всегда создается vocab_index, если нужен для методов со словарем
    if vectorizer_type in ["one_hot", "bow", "tfidf", "ppmi"]:
        if vocab is None:
            vocab, vocab_index = build_vocab(texts)
            print(f"Created new vocabulary for {split}: {len(vocab)} words")
        else:
            vocab_index = {word: idx for idx, word in enumerate(vocab)}
            print(f"Using external vocabulary for {split}: {len(vocab)} words")

    vectorized_data = []
    processed_texts = []
    
    for i, text in enumerate(texts):
        if i % 100 == 0:
            print(i, '/', len(texts), 'texts done')
        if i == 5:
            break
            
        processed_texts.append(text)
        
        if vectorizer_type == "one_hot":
            word_vectors = one_hot_vectorization(text, vocab, vocab_index)
            if word_vectors:
                doc_vector = np.sum(word_vectors, axis=0)
                vectorized_data.append(doc_vector.tolist())
            else:
                vectorized_data.append([0] * len(vocab))
        elif vectorizer_type == "bow":
            bow_dict = bag_of_words_vectorization(text)
            vector = [bow_dict.get(word, 0) for word in vocab]
            vectorized_data.append(vector)
        elif vectorizer_type == "tfidf":
            vector = tf_idf_vectorization(text, processed_texts, vocab, vocab_index)
            vectorized_data.append(vector)
        elif vectorizer_type == "ppmi":
            vector = ppmi_vectorization(text, processed_texts, vocab, vocab_index)
            vectorized_data.append(vector)
        elif vectorizer_type == "fasttext":
            embeddings = get_fasttext_embeddings(text)
            if embeddings:
                avg_embedding = np.mean(embeddings, axis=0)
                vectorized_data.append(avg_embedding.tolist())
            else:
                vectorized_data.append([0] * 300)
        elif vectorizer_type == "bert":
            embedding = get_bert_embeddings(text)
            vectorized_data.append(embedding.tolist())
        else:
            raise ValueError(f"Unknown vectorizer type: {vectorizer_type}")
    
    return vocab, vectorized_data, labels[:len(vectorized_data)]

In [None]:
from catboost import CatBoostClassifier
from sklearn.metrics import classification_report, accuracy_score, f1_score
from sklearn.model_selection import cross_val_score, StratifiedKFold
import numpy as np

def train(
    embeddings_method="bow",
    test_size=0.2,
    val_size=0.2,
    cv_folds=5
):
    vocab_train, X_train, y_train = vectorize_dataset("imdb", embeddings_method, "train")
    vocab_test, X_test, y_test = vectorize_dataset("imdb", embeddings_method, "test", 
                                                  vocab=vocab_train)

    print(f"Training with {embeddings_method} embeddings")
    print(f"Train data: {len(X_train)} samples, Test data: {len(X_test)} samples")

    X_train = np.array(X_train)
    y_train = np.array(y_train)
    X_test = np.array(X_test)
    y_test = np.array(y_test)

    # проверка, что представлено больше 1 класса
    unique_train = np.unique(y_train)
    unique_test = np.unique(y_test)
    if len(unique_train) < 2:
        print(f"Warning: Only one class present in training data: {unique_train}")
        return None, 0, 0
    if len(unique_test) < 2:
        print(f"Warning: Only one class present in test data: {unique_test}")
        return None, 0, 0

    # вывод в виде словаря: сколько представителей какого класса
    print(f"Train classes: {dict(zip(*np.unique(y_train, return_counts=True)))}")
    print(f"Test classes: {dict(zip(*np.unique(y_test, return_counts=True)))}")
    
    model = CatBoostClassifier(
        iterations=100,
        learning_rate=0.1,
        depth=6,
        random_seed=42,
        verbose=False
    )

    try:
        kf = StratifiedKFold(n_splits=min(cv_folds, len(unique_train)), shuffle=True, random_state=42)
        cv_scores = cross_val_score(model, X_train, y_train, cv=kf, scoring='accuracy')
        print(f"Cross-validation scores: {cv_scores}")
        print(f"Mean CV accuracy: {cv_scores.mean():.4f} (+/- {cv_scores.std() * 2:.4f})")
    except Exception as e:
        print(f"Cross-validation failed: {e}")
        cv_scores = [0]
    
    model.fit(X_train, y_train, verbose=False)
    
    y_pred = model.predict(X_test)
    accuracy = accuracy_score(y_test, y_pred)
    f1 = f1_score(y_test, y_pred, average='binary')
    
    print(f"Test Accuracy: {accuracy:.4f}")
    print(f"Test F1-score: {f1:.4f}")
    print("\nClassification Report:")
    print(classification_report(y_test, y_pred))
    
    return model, accuracy, f1

In [24]:
for embeddings_method in ["bow", "one_hot", "tfidf", "ppmi", "fasttext", "bert"]:
    print('=' * 100)
    train(embeddings_method=embeddings_method)

Created new vocabulary for train: 27892 words
0 / 2500 texts done
Using external vocabulary for test: 27892 words
0 / 2500 texts done
Training with bow embeddings
Train data: 5 samples, Test data: 5 samples
Train classes: {np.int64(0): np.int64(2), np.int64(1): np.int64(3)}
Test classes: {np.int64(0): np.int64(2), np.int64(1): np.int64(3)}
Cross-validation scores: [0.66666667 0.5       ]
Mean CV accuracy: 0.5833 (+/- 0.1667)
Test Accuracy: 0.6000
Test F1-score: 0.7500

Classification Report:
              precision    recall  f1-score   support

           0       0.00      0.00      0.00         2
           1       0.60      1.00      0.75         3

    accuracy                           0.60         5
   macro avg       0.30      0.50      0.38         5
weighted avg       0.36      0.60      0.45         5



  _warn_prf(average, modifier, f"{metric.capitalize()} is", len(result))
  _warn_prf(average, modifier, f"{metric.capitalize()} is", len(result))
  _warn_prf(average, modifier, f"{metric.capitalize()} is", len(result))


Created new vocabulary for train: 27892 words
0 / 2500 texts done
Using external vocabulary for test: 27892 words
0 / 2500 texts done
Training with one_hot embeddings
Train data: 5 samples, Test data: 5 samples
Train classes: {np.int64(0): np.int64(2), np.int64(1): np.int64(3)}
Test classes: {np.int64(0): np.int64(2), np.int64(1): np.int64(3)}
Cross-validation scores: [0.66666667 0.5       ]
Mean CV accuracy: 0.5833 (+/- 0.1667)
Test Accuracy: 0.6000
Test F1-score: 0.7500

Classification Report:
              precision    recall  f1-score   support

           0       0.00      0.00      0.00         2
           1       0.60      1.00      0.75         3

    accuracy                           0.60         5
   macro avg       0.30      0.50      0.38         5
weighted avg       0.36      0.60      0.45         5



  _warn_prf(average, modifier, f"{metric.capitalize()} is", len(result))
  _warn_prf(average, modifier, f"{metric.capitalize()} is", len(result))
  _warn_prf(average, modifier, f"{metric.capitalize()} is", len(result))


Created new vocabulary for train: 27892 words
0 / 2500 texts done
Using external vocabulary for test: 27892 words
0 / 2500 texts done
Training with tfidf embeddings
Train data: 5 samples, Test data: 5 samples
Train classes: {np.int64(0): np.int64(2), np.int64(1): np.int64(3)}
Test classes: {np.int64(0): np.int64(2), np.int64(1): np.int64(3)}
Cross-validation scores: [0.66666667 0.5       ]
Mean CV accuracy: 0.5833 (+/- 0.1667)
Test Accuracy: 0.6000
Test F1-score: 0.7500

Classification Report:
              precision    recall  f1-score   support

           0       0.00      0.00      0.00         2
           1       0.60      1.00      0.75         3

    accuracy                           0.60         5
   macro avg       0.30      0.50      0.38         5
weighted avg       0.36      0.60      0.45         5



  _warn_prf(average, modifier, f"{metric.capitalize()} is", len(result))
  _warn_prf(average, modifier, f"{metric.capitalize()} is", len(result))
  _warn_prf(average, modifier, f"{metric.capitalize()} is", len(result))


Created new vocabulary for train: 27892 words
0 / 2500 texts done
Using external vocabulary for test: 27892 words
0 / 2500 texts done
Training with ppmi embeddings
Train data: 5 samples, Test data: 5 samples
Train classes: {np.int64(0): np.int64(2), np.int64(1): np.int64(3)}
Test classes: {np.int64(0): np.int64(2), np.int64(1): np.int64(3)}
Cross-validation scores: [0.  0.5]
Mean CV accuracy: 0.2500 (+/- 0.5000)
Test Accuracy: 0.6000
Test F1-score: 0.7500

Classification Report:
              precision    recall  f1-score   support

           0       0.00      0.00      0.00         2
           1       0.60      1.00      0.75         3

    accuracy                           0.60         5
   macro avg       0.30      0.50      0.38         5
weighted avg       0.36      0.60      0.45         5

0 / 2500 texts done
Downloading https://dl.fbaipublicfiles.com/fasttext/vectors-crawl/cc.en.300.bin.gz


  _warn_prf(average, modifier, f"{metric.capitalize()} is", len(result))
  _warn_prf(average, modifier, f"{metric.capitalize()} is", len(result))
  _warn_prf(average, modifier, f"{metric.capitalize()} is", len(result))






0 / 2500 texts done




Training with fasttext embeddings
Train data: 5 samples, Test data: 5 samples
Train classes: {np.int64(0): np.int64(2), np.int64(1): np.int64(3)}
Test classes: {np.int64(0): np.int64(2), np.int64(1): np.int64(3)}
Cross-validation scores: [0.33333333 0.        ]
Mean CV accuracy: 0.1667 (+/- 0.3333)
Test Accuracy: 0.4000
Test F1-score: 0.4000

Classification Report:
              precision    recall  f1-score   support

           0       0.33      0.50      0.40         2
           1       0.50      0.33      0.40         3

    accuracy                           0.40         5
   macro avg       0.42      0.42      0.40         5
weighted avg       0.43      0.40      0.40         5

0 / 2500 texts done


Xet Storage is enabled for this repo, but the 'hf_xet' package is not installed. Falling back to regular HTTP download. For better performance, install the package with: `pip install huggingface_hub[hf_xet]` or `pip install hf_xet`


0 / 2500 texts done
Training with bert embeddings
Train data: 5 samples, Test data: 5 samples
Train classes: {np.int64(0): np.int64(2), np.int64(1): np.int64(3)}
Test classes: {np.int64(0): np.int64(2), np.int64(1): np.int64(3)}
Cross-validation scores: [0.  0.5]
Mean CV accuracy: 0.2500 (+/- 0.5000)
Test Accuracy: 0.6000
Test F1-score: 0.7500

Classification Report:
              precision    recall  f1-score   support

           0       0.00      0.00      0.00         2
           1       0.60      1.00      0.75         3

    accuracy                           0.60         5
   macro avg       0.30      0.50      0.38         5
weighted avg       0.36      0.60      0.45         5



  _warn_prf(average, modifier, f"{metric.capitalize()} is", len(result))
  _warn_prf(average, modifier, f"{metric.capitalize()} is", len(result))
  _warn_prf(average, modifier, f"{metric.capitalize()} is", len(result))
