# Домашнее задание "NLP. Часть 1"

In [100]:
import math
import re
import os
import random
import json
from collections import Counter, defaultdict
from typing import List, Dict, Tuple, Any

import torch
import numpy as np
import datasets
#import fasttext
#import fasttext.util Эти библиотеки несовместимы с моей версией python (3.13), заменил их на другие (см. Задание 5)

from transformers import BertTokenizer, BertModel

In [101]:
def seed_everything(seed: int):
    """Seed every random number generator used in this notebook.

    Seeds Python's ``random``, NumPy and PyTorch (CPU and all CUDA devices),
    exports ``PYTHONHASHSEED`` and switches cuDNN into deterministic mode.

    Args:
        seed: The seed value shared by all generators.
    """
    random.seed(seed)
    # Only affects interpreters / subprocesses started after this point.
    os.environ['PYTHONHASHSEED'] = str(seed)
    np.random.seed(seed)
    torch.manual_seed(seed)
    # Seeds every visible GPU; silently ignored when CUDA is unavailable.
    torch.cuda.manual_seed_all(seed)
    torch.backends.cudnn.deterministic = True
    # The cuDNN auto-tuner may pick different kernels from run to run, so it
    # has to be disabled for reproducible results.
    torch.backends.cudnn.benchmark = False


seed_everything(42)

In [102]:
def normalize_pretokenize_text(text: str) -> List[str]:
    """Lower-case *text* and split it into word tokens.

    A token is a maximal run of ``\\w`` characters delimited by word
    boundaries, so punctuation and whitespace are dropped.
    """
    return re.findall(r'\b\w+\b', text.lower())

In [103]:
# Tiny hand-written corpus used only by the self-check functions
# (test_one_hot_vectorization, test_tf_idf_vectorization, test_ppmi_vectorization).
test_corpus = [
    "the quick brown fox jumps over the lazy dog",
    "never jump over the lazy dog quickly",
    "brown foxes are quick and dogs are lazy"
]

def build_vocab(texts: List[str]) -> Tuple[List[str], Dict[str, int]]:
    """Build a sorted vocabulary and a word -> position index from *texts*.

    Every text is tokenized with ``normalize_pretokenize_text``; the unique
    tokens are sorted, and each one is mapped to its position in that order.

    Returns:
        A ``(vocab, vocab_index)`` pair.
    """
    unique_words = set()
    for document in texts:
        unique_words.update(normalize_pretokenize_text(document))

    vocab = sorted(unique_words)
    vocab_index = dict(zip(vocab, range(len(vocab))))
    return vocab, vocab_index

# Vocabulary and index of the test corpus, shared by the self-checks below.
vocab, vocab_index = build_vocab(test_corpus)

## Задание 1 (0.5 балла)
Реализовать One-Hot векторизацию текстов

In [104]:
from typing import List, Dict
def one_hot_vectorization(text: str, vocab: List[str] = None, vocab_index: Dict[str, int] = None) -> List[int]:
    """Encode *text* as a binary presence vector over *vocab*.

    Args:
        text: Raw text; tokenized with ``normalize_pretokenize_text``.
        vocab: Vocabulary that defines the vector length. Required.
        vocab_index: Mapping word -> position in the vector. Derived from
            *vocab* when omitted.

    Returns:
        A list of ``len(vocab)`` ints: 1 where the word occurs in *text*,
        0 elsewhere. Words missing from *vocab_index* are ignored.

    Raises:
        ValueError: If *vocab* is not provided.
    """
    if vocab is None:
        raise ValueError("one_hot_vectorization requires a vocabulary")
    if vocab_index is None:
        vocab_index = {word: idx for idx, word in enumerate(vocab)}

    vector = [0] * len(vocab)
    for word in normalize_pretokenize_text(text):
        position = vocab_index.get(word)
        if position is not None:
            vector[position] = 1

    return vector

def test_one_hot_vectorization(corpus, vocab, vocab_index) -> bool:
    """Self-check for ``one_hot_vectorization``.

    Verifies that the result is a list of ``len(vocab)`` entries and that
    every known word of the sample text is marked with 1. Returns True on
    success and False on any mismatch or exception. *corpus* is unused.
    """
    try:
        sample = "the quick brown fox"
        encoded = one_hot_vectorization(sample, vocab, vocab_index)

        # Type and length must match before positions are inspected.
        if not isinstance(encoded, list) or len(encoded) != len(vocab):
            return False

        known_tokens = [
            token for token in normalize_pretokenize_text(sample)
            if token in vocab_index
        ]
        if any(encoded[vocab_index[token]] != 1 for token in known_tokens):
            return False

        print("One-Hot-Vectors test PASSED")
        return True
    except Exception as e:
        print(f"One-Hot-Vectors test FAILED: {e}")
        return False

In [105]:
# Run the one-hot self-check; the cell fails if it returns False.
assert test_one_hot_vectorization(test_corpus, vocab, vocab_index)

One-Hot-Vectors test PASSED


## Задание 2 (0.5 балла)
Реализовать Bag-of-Words

In [106]:
def bag_of_words_vectorization(text: str) -> Dict[str, int]:
    """Count how often each token occurs in *text*.

    Returns:
        A plain dict mapping token -> number of occurrences, with keys in
        order of first appearance.
    """
    token_counts = Counter(normalize_pretokenize_text(text))
    # Convert to a plain dict so the return type is exactly ``dict``.
    return dict(token_counts)
def test_bag_of_words_vectorization() -> bool:
    """Self-check for ``bag_of_words_vectorization``.

    Verifies that the result is a dict and that the counts for a small
    sample text are exact. Returns True on success and False on any
    mismatch or exception.
    """
    try:
        text = "the the quick brown brown brown"
        result = bag_of_words_vectorization(text)

        if not isinstance(result, dict):
            return False

        expected_counts = {'the': 2, 'quick': 1, 'brown': 3, 'nonexistent': 0}
        for word, expected in expected_counts.items():
            if result.get(word, 0) != expected:
                return False

        # The success message used to read "Bad-of-Words".
        print("Bag-of-Words test PASSED")
        return True
    except Exception as e:
        print(f"Bag-of-Words test FAILED: {e}")
        return False

In [107]:
# Run the bag-of-words self-check; the cell fails if it returns False.
assert test_bag_of_words_vectorization()

Bad-of-Words test PASSED


## Задание 3 (0.5 балла)
Реализовать TF-IDF

In [108]:

# IDF computation
def compute_idf(corpus, vocab):
    """Compute smoothed inverse document frequencies for every word in *vocab*.

    ``idf(word) = log((N + 1) / (df(word) + 1))`` where ``N`` is the number of
    documents and ``df(word)`` is the number of documents containing the word.
    Words that never occur get ``log(N + 1)``.

    Args:
        corpus: Iterable of raw document strings.
        vocab: Words to compute IDF for.

    Returns:
        Dict mapping each vocabulary word to its IDF value.
    """
    total_docs = len(corpus)

    # One pass over the corpus: each document contributes at most 1 to the
    # document frequency of every distinct token it contains.
    doc_freq = Counter()
    for doc in corpus:
        doc_freq.update(set(normalize_pretokenize_text(doc)))

    return {
        word: math.log((total_docs + 1) / (doc_freq[word] + 1))
        for word in vocab
    }
# Recomputing IDF over the whole corpus on every call was far too slow in
# practice, so the IDF table is cached between calls.
def tf_idf_vectorization(text: str, corpus: List[str] = None, vocab: List[str] = None, vocab_index: Dict[str, int] = None) -> List[float]:
    """Encode *text* as a TF-IDF vector over *vocab*.

    TF is the token count divided by the number of tokens in *text*; IDF
    comes from ``compute_idf(corpus, vocab)``.

    The IDF table is cached on the function (``idf_cache``) for the most
    recently used ``(corpus, vocab)`` pair. The cache is keyed by object
    identity and length, so pass the same list objects across calls to benefit
    from it. A different corpus of the same size no longer reuses stale IDF
    values.

    Args:
        text: Raw text to vectorize.
        corpus: Documents used to compute IDF.
        vocab: Vocabulary that defines the vector length.
        vocab_index: Mapping word -> position; derived from *vocab* when omitted.

    Returns:
        List of ``len(vocab)`` floats. An all-zero vector (or ``[]`` when
        *vocab* is missing) is returned for empty input or on any error.
    """
    try:
        # Fast exit on missing input.
        if not text or not corpus or not vocab:
            return [0.0] * len(vocab) if vocab else []

        if vocab_index is None:
            vocab_index = {word: idx for idx, word in enumerate(vocab)}

        if not hasattr(tf_idf_vectorization, 'idf_cache'):
            tf_idf_vectorization.idf_cache = {}
        cache = tf_idf_vectorization.idf_cache

        # Lengths alone collide for different corpora of equal size, so the key
        # also includes object identity. The entry keeps references to the
        # lists, so their ids cannot be recycled while the entry is alive.
        cache_key = (id(corpus), len(corpus), id(vocab), len(vocab))
        entry = cache.get(cache_key)
        if entry is None or entry[0] is not corpus or entry[1] is not vocab:
            cache.clear()  # keep only the most recent corpus in memory
            entry = (corpus, vocab, compute_idf(corpus, vocab))
            cache[cache_key] = entry
        idf_dict = entry[2]

        vector = [0.0] * len(vocab)
        words = normalize_pretokenize_text(text)
        total_words = len(words)
        if total_words == 0:
            return vector

        # Only the tokens present in the text can be non-zero, so iterate over
        # them rather than over the whole vocabulary.
        for word, count in Counter(words).items():
            index = vocab_index.get(word)
            if index is not None:
                vector[index] = (count / total_words) * idf_dict.get(word, 0.0)

        return vector

    except Exception as e:
        # Deliberate best-effort: report the problem and return a zero vector.
        print(f"Ошибка в TF-IDF: {e}")
        return [0.0] * len(vocab) if vocab else []

    
def test_tf_idf_vectorization(corpus, vocab, vocab_index) -> bool:
    """Self-check for ``tf_idf_vectorization``.

    Verifies that the result is a list of ``len(vocab)`` floats. Returns True
    on success and False on any mismatch or exception.
    """
    try:
        sample = "the quick brown"
        weights = tf_idf_vectorization(sample, corpus, vocab, vocab_index)

        # Type and length must match before the element types are inspected.
        if not isinstance(weights, list) or len(weights) != len(vocab):
            return False

        if not all(isinstance(weight, float) for weight in weights):
            return False

        print("TF-IDF test PASSED")
        return True
    except Exception as e:
        print(f"TF-IDF test FAILED: {e}")
        return False

In [109]:
# Run the TF-IDF self-check; the cell fails if it returns False.
assert test_tf_idf_vectorization(test_corpus, vocab, vocab_index)

TF-IDF test PASSED


## Задание 4 (1 балл)
Реализовать Positive Pointwise Mutual Information (PPMI).  
https://en.wikipedia.org/wiki/Pointwise_mutual_information
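$$PPMI(word, context) = \max(0, PMI(word, context))$$
$$PMI(word, context) = \log \frac{P(word, context)}{P(word) P(context)} = \log \frac{N(word, context) \cdot |(word, context)|}{N(word) N(context)}$$
где $N(word, context)$ -- число вхождений слова $word$ в окно $context$ (размер окна -- гиперпараметр), $N(word)$ и $N(context)$ -- число вхождений $word$ и $context$ соответственно, а $|(word, context)|$ -- общее число пар (слово, контекст)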

In [110]:
def ppmi_vectorization(
    text: str,
    corpus: List[str] = None,
    vocab: List[str] = None,
    vocab_index: Dict[str, int] = None,
    window_size: int = 2
) -> List[float]:
    """Encode *text* as the average PPMI vector of its words.

    Co-occurrence statistics are collected from *corpus* with a symmetric
    window of *window_size* tokens on each side. For each in-vocabulary word
    of *text*, ``PPMI(word, context) = max(0, log(N(word, context) * total_pairs
    / (N(word) * N(context))))`` is computed against every context word, and
    the per-word vectors are averaged. Repeated words count once per occurrence.

    Only the co-occurrence rows of words that occur in *text* are stored, so
    memory does not grow with the square of the vocabulary size.

    Args:
        text: Raw text to vectorize.
        corpus: Documents used to collect co-occurrence counts.
        vocab: Vocabulary that defines the vector length.
        vocab_index: Mapping word -> position; derived from *vocab* when omitted.
        window_size: Number of neighbours considered on each side of a word.

    Returns:
        List of ``len(vocab)`` floats. Out-of-vocabulary words in *corpus* or
        *text* are skipped. ``[]`` is returned when *vocab* is empty or missing.
    """
    if not vocab:
        return []

    vocab_size = len(vocab)
    vector = [0.0] * vocab_size
    if vocab_index is None:
        vocab_index = {word: idx for idx, word in enumerate(vocab)}

    # Occurrences of each in-vocabulary word in the input text.
    text_counts = Counter(
        word for word in normalize_pretokenize_text(text) if word in vocab_index
    )
    if not text_counts or not corpus:
        return vector

    word_counts = Counter()  # N(word) over the corpus
    # Sparse co-occurrence rows, kept only for the words we must vectorize.
    cooccurrence = {word: Counter() for word in text_counts}
    total_pairs = 0  # number of (word, context) pairs over the whole corpus

    for doc in corpus:
        words = normalize_pretokenize_text(doc)
        known = [word in vocab_index for word in words]

        for i, target_word in enumerate(words):
            if not known[i]:
                continue
            word_counts[target_word] += 1
            row = cooccurrence.get(target_word)

            window_start = max(0, i - window_size)
            window_end = min(len(words), i + window_size + 1)
            for j in range(window_start, window_end):
                if j == i or not known[j]:  # skip the word itself and OOV context
                    continue
                total_pairs += 1
                if row is not None:
                    row[words[j]] += 1

    for target_word, occurrences in text_counts.items():
        n_i = word_counts[target_word]
        for context_word, together in cooccurrence[target_word].items():
            # Both counts are positive: a stored pair implies that both words
            # were seen as in-vocabulary tokens of the corpus.
            n_j = word_counts[context_word]
            pmi = math.log((together * total_pairs) / (n_i * n_j))
            if pmi > 0.0:
                vector[vocab_index[context_word]] += occurrences * pmi

    n_text_words = sum(text_counts.values())
    return [value / n_text_words for value in vector]

def test_ppmi_vectorization(corpus, vocab, vocab_index) -> bool:
    """Self-check for ``ppmi_vectorization``.

    Verifies that the result is a list of ``len(vocab)`` floats. Returns True
    on success and False on any mismatch or exception.
    """
    try:
        sample = "quick brown fox"
        scores = ppmi_vectorization(sample, corpus, vocab, vocab_index)

        # Type and length must match before the element types are inspected.
        if not isinstance(scores, list) or len(scores) != len(vocab):
            return False

        if not all(isinstance(score, float) for score in scores):
            return False

        print("PPMI test PASSED")
        return True
    except Exception as e:
        print(f"PPMI test FAILED: {e}")
        return False

In [111]:
# Run the PPMI self-check; the cell fails if it returns False.
assert test_ppmi_vectorization(test_corpus, vocab, vocab_index)

PPMI test PASSED


## Задание 5 (1 балл)
Реализовать получение эмбеддингов из fasttext и bert (для bert лучше использовать CLS токен)

In [112]:

import gensim.downloader as api # Использую gensim вместо fasttext

def get_fasttext_embeddings(text: str, model_path: str = None, model: Any = None) -> List[np.ndarray]:
    """Return one fastText vector per whitespace-separated word of *text*.

    Args:
        text: Raw text; lower-cased and split on whitespace.
        model_path: Unused; kept for interface compatibility.
        model: Either a keyed-vectors object (indexable by word, with a
            ``vector_size`` attribute) or a full model exposing one as ``.wv``.
            When omitted, ``fasttext-wiki-news-subwords-300`` is loaded through
            the gensim downloader once and reused by later calls.

    Returns:
        A list of vectors, one per word; words unknown to the model are
        represented by zero vectors. Empty for empty text.
    """
    if model is None:
        # Loading the pretrained vectors is expensive, so do it only once.
        model = getattr(get_fasttext_embeddings, '_cached_model', None)
        if model is None:
            model = api.load('fasttext-wiki-news-subwords-300')
            get_fasttext_embeddings._cached_model = model

    # The gensim downloader returns KeyedVectors, which has no ``.wv``
    # attribute in gensim 4; full models keep their vectors under ``.wv``.
    keyed_vectors = getattr(model, 'wv', model)

    word_vectors = []  # one vector per word
    for word in text.lower().split():
        try:
            word_vectors.append(keyed_vectors[word])
        except KeyError:
            # The word is unknown to the model: fall back to a zero vector.
            word_vectors.append(np.zeros(keyed_vectors.vector_size))
    return word_vectors

In [113]:
# (tokenizer, model) pairs that were already loaded, keyed by requested name.
_BERT_CACHE = {}
_BERT_POOL_METHODS = ('cls', 'mean', 'max')


def _load_bert(model_name: str):
    """Return a cached ``(tokenizer, model)`` pair for *model_name*.

    Falls back to ``bert-base-uncased`` (and reports it) when the requested
    checkpoint cannot be loaded. The model is put into eval mode.
    """
    if model_name not in _BERT_CACHE:
        # Imported here because the notebook only imports BertTokenizer and
        # BertModel at the top level.
        from transformers import AutoModel, AutoTokenizer

        try:
            tokenizer = AutoTokenizer.from_pretrained(model_name)
            model = AutoModel.from_pretrained(model_name)
        except Exception as e:
            print(f"Could not load '{model_name}' ({e}); falling back to 'bert-base-uncased'")
            tokenizer = AutoTokenizer.from_pretrained('bert-base-uncased')
            model = AutoModel.from_pretrained('bert-base-uncased')

        model.eval()
        _BERT_CACHE[model_name] = (tokenizer, model)
    return _BERT_CACHE[model_name]


def get_bert_embeddings(
    text: str,
    model_name: str = 'bert-base-uncased',
    pool_method: str = 'cls'
) -> np.ndarray:
    """Embed *text* with a BERT-like encoder and pool it into one vector.

    Args:
        text: Raw text; truncated to 512 tokens.
        model_name: Hugging Face checkpoint name.
        pool_method: ``'cls'`` returns the last-layer hidden state of the first
            ([CLS]) token; ``'mean'`` and ``'max'`` pool the last-layer hidden
            states over the non-padding tokens.

    Returns:
        1-D NumPy array with the pooled embedding.

    Raises:
        ValueError: If *pool_method* is not one of ``'cls'``, ``'mean'``, ``'max'``.
    """
    # Validate before any model is loaded.
    if pool_method not in _BERT_POOL_METHODS:
        raise ValueError(
            f"Unknown pool_method: {pool_method!r}; expected one of {_BERT_POOL_METHODS}"
        )

    tokenizer, model = _load_bert(model_name)

    inputs = tokenizer(
        text,
        return_tensors='pt',           # return PyTorch tensors
        truncation=True,               # cut texts that are too long
        max_length=512,                # maximum BERT sequence length
        padding=True,                  # pad to a common length
        add_special_tokens=True        # add [CLS] and [SEP]
    )

    with torch.no_grad():
        outputs = model(**inputs)

    last_hidden_state = outputs.last_hidden_state

    if pool_method == 'cls':
        pooled = last_hidden_state[:, 0]
    else:
        # Boolean mask of real (non-padding) tokens, broadcast over features.
        token_mask = inputs['attention_mask'].unsqueeze(-1).bool()
        if pool_method == 'mean':
            summed = last_hidden_state.masked_fill(~token_mask, 0.0).sum(dim=1)
            token_counts = token_mask.sum(dim=1).clamp(min=1)
            pooled = summed / token_counts
        else:
            # Mask by position: genuine zero activations must stay untouched.
            masked = last_hidden_state.masked_fill(~token_mask, float('-inf'))
            pooled = masked.max(dim=1).values

    return pooled[0].numpy()

## Задание 6 (1.5 балла)
Реализовать обучение так, чтобы можно было поверх эмбеддингов, реализованных в предыдущих заданиях, обучить какую-то модель (вероятно неглубокую, например, CatBoost) на задаче классификации текстов ([IMDB](https://huggingface.co/datasets/stanfordnlp/imdb)).

In [114]:
import numpy as np
from typing import List, Dict, Tuple, Any
from datasets import load_dataset
import datasets
from catboost import CatBoostClassifier
from sklearn.metrics import classification_report, accuracy_score, f1_score
from sklearn.model_selection import train_test_split, cross_val_score, KFold
import warnings
warnings.filterwarnings('ignore')

def vectorize_with_vocab(
    texts: List[str],
    labels: List[int],
    vocab: List[str],
    vocab_index: Dict[str, int],
    vectorizer_type: str = "bow"
) -> List[List[float]]:
    """Vectorize *texts* with the selected method and a shared vocabulary.

    Args:
        texts: Raw documents. Also used as the corpus for ``"tfidf"`` and ``"ppmi"``.
        labels: Unused; kept for interface compatibility.
        vocab: Shared vocabulary (used by the vocabulary-based methods).
        vocab_index: Mapping word -> position in *vocab*.
        vectorizer_type: One of ``"one_hot"``, ``"bow"``, ``"tfidf"``,
            ``"ppmi"``, ``"fasttext"``, ``"bert"``.

    Returns:
        One feature vector per text. A text whose vectorization fails is
        reported and replaced by a zero vector of matching width.

    Raises:
        ValueError: If *vectorizer_type* is not supported.
    """
    vocab_based = ("one_hot", "bow", "tfidf", "ppmi")
    # Widths used for the zero fallback until a real embedding has been seen.
    dense_default_dims = {"fasttext": 300, "bert": 768}

    # Fail fast: inside the per-text try/except an unknown type would be
    # swallowed and silently turned into all-zero features.
    if vectorizer_type not in vocab_based and vectorizer_type not in dense_default_dims:
        raise ValueError(f"Unknown vectorizer type: {vectorizer_type}")

    if vectorizer_type in vocab_based:
        vector_size = len(vocab)
    else:
        vector_size = dense_default_dims[vectorizer_type]

    vectorized_data = []

    for i, text in enumerate(texts):
        try:
            if vectorizer_type == "one_hot":
                vector = one_hot_vectorization(text, vocab, vocab_index)

            elif vectorizer_type == "bow":
                bow_dict = bag_of_words_vectorization(text)
                vector = [bow_dict.get(word, 0) for word in vocab]  # shared vocab

            elif vectorizer_type == "tfidf":
                vector = tf_idf_vectorization(text, texts, vocab, vocab_index)

            elif vectorizer_type == "ppmi":
                vector = ppmi_vectorization(text, texts, vocab, vocab_index)

            elif vectorizer_type == "fasttext":
                embeddings = get_fasttext_embeddings(text)
                if embeddings:
                    vector = np.mean(embeddings, axis=0).tolist()
                else:
                    vector = [0.0] * vector_size

            else:  # "bert"
                vector = get_bert_embeddings(text).tolist()

            # Remember the real width so later fallbacks match it.
            vector_size = len(vector)

        except Exception as e:
            # Deliberate best-effort: keep the row count aligned with labels.
            print(f"   Ошибка при векторизации текста {i}: {e}")
            vector = [0.0] * vector_size

        vectorized_data.append(vector)

    return vectorized_data


def load_dataset(
    dataset_name: str = "imdb",
    split: str = "train", 
    sample_size: int = 2500
) -> Tuple[List[str], List[int]]:
    """Load a text-classification split and optionally draw a balanced sample.

    The split is fetched with ``datasets.load_dataset``. When *sample_size* is
    truthy, the first ``sample_size // 2`` examples with label 0 and the first
    ``sample_size // 2`` with label 1 are taken (in dataset order) and shuffled
    with ``random.shuffle``; otherwise the whole split is returned.

    NOTE: this definition shadows the ``load_dataset`` name imported from
    ``datasets`` earlier in the notebook.

    Returns:
        A ``(texts, labels)`` pair of equal length.
    """
    import random

    dataset = datasets.load_dataset(dataset_name, split=split)

    if not sample_size:
        # No sampling requested: return the whole split.
        all_texts = [row['text'] for row in dataset]
        all_labels = [row['label'] for row in dataset]
        return all_texts, all_labels

    # Group the examples by class, preserving dataset order.
    rows_by_label = {0: [], 1: []}
    for row in dataset:
        bucket = rows_by_label.get(row['label'])
        if bucket is not None:
            bucket.append(row)

    # Same number of examples from each class, negatives first, then shuffle.
    per_class = sample_size // 2
    sampled_rows = rows_by_label[0][:per_class] + rows_by_label[1][:per_class]
    random.shuffle(sampled_rows)

    texts = [row['text'] for row in sampled_rows]
    labels = [row['label'] for row in sampled_rows]
    return texts, labels

def vectorize_dataset(
    dataset_name: str = "imdb",
    vectorizer_type: str = "bow",
    split: str = "train",
    sample_size: int = 2500,
    vocab: List[str] = None,
    vocab_index: Dict[str, int] = None
) -> Tuple[List[str], Dict[str, int], List[List[float]], List[int]]:
    """Load a dataset split and vectorize its texts.

    Args:
        dataset_name: Dataset name passed to ``load_dataset``.
        vectorizer_type: Vectorization method passed to ``vectorize_with_vocab``.
        split: Dataset split to load.
        sample_size: Size of the balanced sample (falsy -> whole split).
        vocab: Existing vocabulary to reuse (e.g. the one built on the train
            split). Built from the loaded texts when omitted.
        vocab_index: Index matching *vocab*; derived from it when omitted.

    Returns:
        ``(vocab, vocab_index, vectorized_data, labels)``.
    """
    texts, labels = load_dataset(dataset_name, split, sample_size)

    if vocab is None:
        vocab, vocab_index = build_vocab(texts)
    elif vocab_index is None:
        # A vocabulary without its index would otherwise forward None.
        vocab_index = {word: idx for idx, word in enumerate(vocab)}

    vectorized_data = vectorize_with_vocab(texts, labels, vocab, vocab_index, vectorizer_type)

    return vocab, vocab_index, vectorized_data, labels


In [119]:
def train(
    embeddings_method="bow",
    test_size=0.2,
    val_size=0.2,
    cv_folds=1,
    sample_size=2500
):
    """Train and evaluate a CatBoost classifier on vectorized IMDB reviews.

    The train split is vectorized (building the vocabulary), the test split is
    vectorized with the same vocabulary, a stratified validation part is held
    out of the train data, and the model is fitted with that part as eval set.

    Args:
        embeddings_method: Vectorizer name passed to ``vectorize_dataset``.
        test_size: Unused; the dataset's own test split is evaluated instead.
        val_size: Fraction of the train sample held out for validation.
        cv_folds: Number of cross-validation folds. Values below 2 skip
            cross-validation, since ``KFold`` requires at least two splits.
        sample_size: Balanced sample size taken from each split.

    Returns:
        ``(model, results)`` where *results* is a dict of metrics. The
        cross-validation entries are 0.0 when cross-validation did not run.
    """
    print(f"Обучение с методом {embeddings_method.upper()}")

    vocab, vocab_index, X_train_full, y_train_full = vectorize_dataset(
        "imdb", embeddings_method, "train", sample_size
    )

    # Reuse the train vocabulary so train and test features are aligned.
    _, _, X_test, y_test = vectorize_dataset(
        "imdb", embeddings_method, "test", sample_size, vocab, vocab_index
    )

    X_train_full = np.array(X_train_full)
    y_train_full = np.array(y_train_full)
    X_test = np.array(X_test)
    y_test = np.array(y_test)

    X_train, X_val, y_train, y_val = train_test_split(
        X_train_full, y_train_full,
        test_size=val_size,
        random_state=42,
        stratify=y_train_full
    )

    model = CatBoostClassifier(
        iterations=500,
        learning_rate=0.05,
        depth=4,
        loss_function='Logloss',
        random_seed=42,
        verbose=100,
        early_stopping_rounds=20
    )

    # An empty array means "cross-validation did not run".
    cv_scores = np.array([])
    if cv_folds is not None and cv_folds >= 2:
        try:
            kf = KFold(n_splits=cv_folds, shuffle=True, random_state=42)
            cv_scores = cross_val_score(
                model, X_train, y_train,
                cv=kf,
                scoring='accuracy'
            )
        except Exception as e:
            # Best-effort: report the failure and continue with the final fit.
            print(f"Cross-validation failed and was skipped: {e}")
            cv_scores = np.array([])

    model.fit(
        X_train, y_train,
        eval_set=(X_val, y_val),
    )

    y_val_pred = model.predict(X_val)
    val_accuracy = accuracy_score(y_val, y_val_pred)
    val_f1 = f1_score(y_val, y_val_pred, average='weighted')

    print(f"\nРезультаты на Validation Set:")
    print(f"Accuracy: {val_accuracy:.4f}")
    print(f"F1-Score: {val_f1:.4f}")

    y_test_pred = model.predict(X_test)
    test_accuracy = accuracy_score(y_test, y_test_pred)
    test_f1 = f1_score(y_test, y_test_pred, average='weighted')

    print(f"Результаты на Test Set:")
    print(f"Accuracy: {test_accuracy:.4f}")
    print(f"F1-Score: {test_f1:.4f}")

    results = {
        'method': embeddings_method,
        'cv_mean_accuracy': cv_scores.mean() if len(cv_scores) > 0 else 0.0,
        'cv_std': cv_scores.std() if len(cv_scores) > 0 else 0.0,
        'val_accuracy': val_accuracy,
        'val_f1': val_f1,
        'test_accuracy': test_accuracy,
        'test_f1': test_f1,
        'feature_dim': X_train.shape[1],
        'train_size': X_train.shape[0],
        'vocab_size': len(vocab)
    }

    return model, results

In [124]:
def compare_succesfull_methods():
    """Train every method in the list below and print a comparison.

    One summary row is printed per successfully trained method, followed by
    the method with the highest test accuracy. A failing method is reported
    and skipped instead of aborting the comparison.
    """
    # Unfortunately the last three methods ("ppmi", "fasttext", "bert") could
    # not be made fast enough, so they are left out of the comparison.
    methods = ["bow", "one_hot", "tfidf"]

    all_results = []

    for method in methods:
        try:
            model, results = train(embeddings_method=method)
            all_results.append(results)
        except Exception as e:
            # Keep comparing the remaining methods, but do not hide the failure.
            print(f"Method {method} failed and was skipped: {e}")
            continue

    if not all_results:
        # max() on an empty list would raise ValueError.
        print("No method finished successfully")
        return

    for result in all_results:
        print(f"{result['method']:<12} {result['test_accuracy']:<14.4f} "
              f"{result['test_f1']:<12.4f} {result['cv_mean_accuracy']:<12.4f} "
              f"{result['feature_dim']:<10}")
    best_result = max(all_results, key=lambda x: x['test_accuracy'])
    print(f"\n лучший метод {best_result['method']}")
    print(f"Test Accuracy: {best_result['test_accuracy']:.4f}")
    print(f"Test F1-Score: {best_result['test_f1']:.4f}")


In [125]:
# Train and compare all methods listed inside compare_succesfull_methods.
compare_succesfull_methods()

Обучение с методом BOW
0:	learn: 0.6836860	test: 0.6853198	best: 0.6853198 (0)	total: 54ms	remaining: 27s
100:	learn: 0.4615247	test: 0.5137950	best: 0.5137950 (100)	total: 4.76s	remaining: 18.8s
200:	learn: 0.3631448	test: 0.4518181	best: 0.4518181 (200)	total: 9.56s	remaining: 14.2s
300:	learn: 0.2889382	test: 0.4160813	best: 0.4160813 (300)	total: 14.3s	remaining: 9.48s
400:	learn: 0.2392126	test: 0.3995551	best: 0.3995394 (396)	total: 19.1s	remaining: 4.7s
499:	learn: 0.2043874	test: 0.3889598	best: 0.3889598 (499)	total: 23.5s	remaining: 0us

bestTest = 0.388959803
bestIteration = 499


Результаты на Validation Set:
Accuracy: 0.8540
F1-Score: 0.8537
Результаты на Test Set:
Accuracy: 0.8224
F1-Score: 0.8224
Обучение с методом ONE_HOT
0:	learn: 0.6856473	test: 0.6879025	best: 0.6879025 (0)	total: 65ms	remaining: 32.4s
100:	learn: 0.4672740	test: 0.5044890	best: 0.5044890 (100)	total: 6.71s	remaining: 26.5s
200:	learn: 0.3683094	test: 0.4449037	best: 0.4449037 (200)	total: 13.4s	rema