# Домашнее задание "NLP. Часть 1"

In [None]:
import math
import re
import os
import random
import json
from collections import Counter, defaultdict
from typing import List, Dict, Tuple, Any

import torch
import numpy as np
import datasets
import fasttext
import fasttext.util
from transformers import BertTokenizer, BertModel

In [None]:
def seed_everything(seed: int):
    random.seed(seed)
    os.environ['PYTHONHASHSEED'] = str(seed)
    np.random.seed(seed)
    torch.manual_seed(seed)
    torch.cuda.manual_seed(seed)
    torch.backends.cudnn.deterministic = True
    torch.backends.cudnn.benchmark = True

seed_everything(42)

In [None]:
def normalize_pretokenize_text(text: str) -> List[str]:
    text = text.lower()
    words = re.findall(r'\b\w+\b', text)
    return words

In [None]:
# This block is for tests only
test_corpus = [
    "the quick brown fox jumps over the lazy dog",
    "never jump over the lazy dog quickly",
    "brown foxes are quick and dogs are lazy"
]

def build_vocab(texts: List[str]) -> Tuple[List[str], Dict[str, int]]:
    all_words = []
    for text in texts:
        words = normalize_pretokenize_text(text)
        all_words.extend(words)
    vocab = sorted(set(all_words))
    vocab_index = {word: idx for idx, word in enumerate(vocab)}
    return vocab, vocab_index

vocab, vocab_index = build_vocab(test_corpus)

## Задание 1 (0.5 балла)
Реализовать One-Hot векторизацию текстов

In [None]:
def one_hot_vectorization(
    text: str,
    vocab: List[str] = None,
    vocab_index: Dict[str, int] = None
) -> List[List[int]]:
    words = normalize_pretokenize_text(text)
    if vocab is None and vocab_index is None:
        vocab = sorted(set(words))
        vocab_index = {word: idx for idx, word in enumerate(vocab)}
    elif vocab_index is None:
        vocab_index = {word: idx for idx, word in enumerate(vocab)}
    return [list(map(int, np.eye(1, len(vocab_index), vocab_index[word]).flatten())) for word in words]
    

def test_one_hot_vectorization(
    vocab: List[str],
    vocab_index: Dict[str, int]
) -> bool:
    try:
        text = "the quick brown fox"
        result = one_hot_vectorization(text, vocab, vocab_index)

        if not isinstance(result, list):
            return False

        expected_length = len(vocab)
        if len(result[0]) != expected_length:
            return False

        words_in_text = normalize_pretokenize_text(text)
        for i, word in enumerate(words_in_text):
            if word in vocab_index:
                idx = vocab_index[word]
                if result[i][idx] != 1:
                    return False

        print("One-Hot-Vectors test PASSED")

        return True
    except Exception as e:
        print(f"One-Hot-Vectors test FAILED: {e}")
        return False

In [None]:
assert test_one_hot_vectorization(vocab, vocab_index)

## Задание 2 (0.5 балла)
Реализовать Bag-of-Words

In [None]:
def bag_of_words_vectorization(text: str) -> Dict[str, int]:
    words = normalize_pretokenize_text(text)
    return {word : words.count(word) for word in words}


def test_bag_of_words_vectorization() -> bool:
    try:
        text = "the the quick brown brown brown"
        result = bag_of_words_vectorization(text)

        if not isinstance(result, dict):
            return False

        if result.get('the', 0) != 2:
            return False
        if result.get('quick', 0) != 1:
            return False
        if result.get('brown', 0) != 3:
            return False
        if result.get('nonexistent', 0) != 0:
            return False

        print("Bad-of-Words test PASSED")
        return True
    except Exception as e:
        print(f"Bag-of-Words test FAILED: {e}")
        return False

In [None]:
assert test_bag_of_words_vectorization()

## Задание 3 (0.5 балла)
Реализовать TF-IDF

In [None]:
def tf_idf_vectorization(text: str, corpus: List[str] = None, vocab: List[str] = None, vocab_index: Dict[str, int] = None) -> List[float]:
    # не знаю зачем нужен vocab_index в здесь ибо по нему просто не найти tf-idf
    words = normalize_pretokenize_text(text)
    if vocab is None:
        vocab = sorted(set(words))
    tf = {word: words.count(word) / len(words) for word in set(words)}
    total_docs = len(corpus)
    idf = {
        word: np.log(total_docs / (sum(1 for doc in corpus if word in normalize_pretokenize_text(doc)) + 1)) + 1 
        for word in vocab
    }
    return [tf.get(word, 0) * idf.get(word, 0) for word in vocab]

def test_tf_idf_vectorization(corpus, vocab, vocab_index) -> bool:
    try:
        text = "the quick brown"
        result = tf_idf_vectorization(text, corpus, vocab, vocab_index)

        if not isinstance(result, list):
            return False

        expected_length = len(vocab)
        if len(result) != expected_length:
            return False

        for val in result:
            if not isinstance(val, float):
                return False

        print("TF-IDF test PASSED")
        return True
    except Exception as e:
        print(f"TF-IDF test FAILED: {e}")
        return False

In [None]:
assert test_tf_idf_vectorization(test_corpus, vocab, vocab_index)

## Задание 4 (1 балл)
Реализовать Positive Pointwise Mutual Information (PPMI).  
https://en.wikipedia.org/wiki/Pointwise_mutual_information
$$PPMI(word, context) = max(0, PMI(word, context))$$
$$PMI(word, context) = log \frac{P(word, context)}{P(word) P(context)} = log \frac{N(word, context)|(word, context)|}{N(word) N(context)}$$
где $N(word, context)$ -- число вхождений слова $word$ в окно $context$ (размер окна -- гиперпараметр)

In [None]:
def ppmi_vectorization(
    text: str,
    corpus: List[str] = None,
    vocab: List[str] = None,
    vocab_index: Dict[str, int] = None,
    window_size: int = 2
) -> List[float]:
    words = normalize_pretokenize_text(text)
    if vocab_index is None:
        vocab_index = {word: idx for idx, word in enumerate(vocab)}
    co_occur = defaultdict(lambda: defaultdict(int))
    word_freq = defaultdict(int)
    for doc in corpus:
        doc_words = normalize_pretokenize_text(doc)
        for i, target in enumerate(doc_words):
            word_freq[target] += 1
            for j in range(max(0, i - window_size), min(len(doc_words), i + window_size + 1)):
                if i != j:
                    context = doc_words[j]
                    co_occur[target][context] += 1
    total_pairs = sum(sum(context.values()) for context in co_occur.values())
    vector = [0.0] * len(vocab)
    for word in vocab:
        if word not in word_freq:
            continue  
        p_word = word_freq[word] / len(corpus)
        ppmi_values = []
        for context_word in words:
            if context_word not in word_freq:
                continue
            p_context = word_freq[context_word] / len(corpus)
            p_joint = co_occur[word][context_word] / total_pairs
            if p_joint > 0:
                pmi = math.log(p_joint / (p_word * p_context))
                ppmi_values.append(max(0, pmi))
        if ppmi_values:
            vector[vocab_index[word]] = sum(ppmi_values) / len(ppmi_values)
    return vector

def test_ppmi_vectorization(corpus, vocab, vocab_index) -> bool:
    try:
        text = "quick brown fox"
        result = ppmi_vectorization(text, corpus, vocab, vocab_index)

        if not isinstance(result, list):
            return False

        expected_length = len(vocab)
        if len(result) != expected_length:
            return False

        for val in result:
            if not isinstance(val, float):
                return False

        print("PPMI test PASSED")
        return True
    except Exception as e:
        print(f"PPMI test FAILED: {e}")
        return False

In [None]:
assert test_ppmi_vectorization(test_corpus, vocab, vocab_index)


## Задание 5 (1 балл)
Реализовать получение эмбеддингов из fasttext и bert (для bert лучше использовать CLS токен)

In [None]:
def get_fasttext_embeddings(text: str, model_path: str = None, model: any = None) -> List[np.ndarray]:
    if model is None:
        if model_path is None:
            raise ValueError("Ни импорта ни пути")
        model = fasttext.load_model(model_path)
    words = text.split()
    embeddings = []
    for word in words:
        if word in model.words:
            embeddings.append(model[word])
    return embeddings

In [None]:
def get_bert_embeddings(
    text: str,
    model_name: str = 'bert-base-uncased',
    pool_method: str = 'cls'
) -> np.ndarray:
    tokenizer = BertTokenizer.from_pretrained(model_name)
    model = BertModel.from_pretrained(model_name)
    inputs = tokenizer(text, return_tensors='pt', truncation=True, padding=True)
    with torch.no_grad():
        outputs = model(**inputs)
    last_hidden_states = outputs.last_hidden_state
    return last_hidden_states[:, 0, :].numpy().squeeze()

## Задание 6 (1.5 балла)
Реализовать обучение так, чтобы можно было поверх эмбеддингов, реализованных в предыдущих заданиях, обучить какую-то модель (вероятно неглубокую, например, CatBoost) на задаче классификации текстов ([IMDB](https://huggingface.co/datasets/stanfordnlp/imdb)).

In [None]:
def vectorize_dataset(
    dataset_name: str = "imdb",
    vectorizer_type: str = "bow",
    split: str = "train",
    sample_size: int = 2500
) -> Tuple[Any, List, List]:

    dataset = datasets.load_dataset(dataset_name, split=split)

    if sample_size:
        dataset = dataset.select(range(min(sample_size, len(dataset))))

    texts = [item['text'] for item in dataset if 'text' in item and item['text'].strip()]
    labels = [item['label'] for item in dataset if 'label' in item]

    def build_vocab(texts: List[str]) -> Tuple[List[str], Dict[str, int]]:
        all_words = []
        for text in texts:
            words = normalize_pretokenize_text(text)
            all_words.extend(words)
        vocab = sorted(set(all_words))
        vocab_index = {word: idx for idx, word in enumerate(vocab)}
        return vocab, vocab_index

    vocab, vocab_index = build_vocab(texts)

    vectorized_data = []
    for text in texts:
        if vectorizer_type == "one_hot":
            vectorized_data.append(one_hot_vectorization(text, vocab, vocab_index))
        elif vectorizer_type == "bow":
            bow_dict = bag_of_words_vectorization(text)
            vector = [bow_dict.get(word, 0) for word in vocab]
            vectorized_data.append(vector)
        elif vectorizer_type == "tfidf":
            vectorized_data.append(tf_idf_vectorization(text, texts, vocab, vocab_index))
        elif vectorizer_type == "ppmi":
            vectorized_data.append(ppmi_vectorization(text, texts, vocab, vocab_index))
        elif vectorizer_type == "fasttext":
            embeddings = get_fasttext_embeddings(text)
            if embeddings:
                avg_embedding = np.mean(embeddings, axis=0)
                vectorized_data.append(avg_embedding.tolist())
            else:
                vectorized_data.append([0] * 300)
        elif vectorizer_type == "bert":
            embedding = get_bert_embeddings(text)
            vectorized_data.append(embedding.tolist())
        else:
            raise ValueError(f"Unknown vectorizer type: {vectorizer_type}")
    print(vectorized_data, labels)
    return vectorized_data, labels

In [None]:
from catboost import CatBoostClassifier
from sklearn.metrics import classification_report, accuracy_score, f1_score
from sklearn.model_selection import train_test_split, cross_val_score, KFold

def train(
    embeddings_method="bow",
    test_size=0.2,
    val_size=0.2,
    cv_folds=5
):
    X_train, y_train = vectorize_dataset("imdb", embeddings_method, "train", sample_size=2500)
    X_test, y_test = vectorize_dataset("imdb", embeddings_method, "test", sample_size=500)
    X_train = np.array(X_train)
    X_test = np.array(X_test)
    y_train = np.array(y_train)
    y_test = np.array(y_test)
    X_train, X_val, y_train, y_val = train_test_split(
        X_train, y_train, 
        test_size=val_size, 
        random_state=42,
        stratify=y_train
    )
    model = CatBoostClassifier(iterations=1000, learning_rate=0.05, depth=5, random_seed=42, verbose=False)
    model.fit(X_train, y_train, eval_set=(X_val, y_val), early_stopping_rounds=50, verbose=True)
    kf = KFold(n_splits=cv_folds, shuffle=True, random_state=42)
    cv_scores = cross_val_score(model, X_train, y_train, cv=kf, scoring='accuracy')
    print(f"CV Accuracy: {cv_scores.mean():.4f} (+/- {cv_scores.std() * 2:.4f})")
    y_pred = model.predict(X_test)
    accuracy = accuracy_score(y_test, y_pred)
    f1 = f1_score(y_test, y_pred)
    
    print(f"\nMethod: {embeddings_method}")
    print(f"Test Accuracy: {accuracy:.4f}")
    print(f"Test F1-score: {f1:.4f}")
    print("\nClassification Report:")
    print(classification_report(y_test, y_pred))

In [None]:
for embeddings_method in ["bow", "one_hot", "tfidf", "ppmi", "fasttext", "bert"]:
    train(embeddings_method=embeddings_method)