In [1]:
# Данный ноутбук использовал окружение google-colabф
%pip install catboost fasttext -q

[?25l     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m0.0/73.4 kB[0m [31m?[0m eta [36m-:--:--[0m[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m73.4/73.4 kB[0m [31m4.8 MB/s[0m eta [36m0:00:00[0m
[?25h  Installing build dependencies ... [?25l[?25hdone
  Getting requirements to build wheel ... [?25l[?25hdone
  Preparing metadata (pyproject.toml) ... [?25l[?25hdone
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m99.2/99.2 MB[0m [31m8.9 MB/s[0m eta [36m0:00:00[0m
[?25h  Building wheel for fasttext (pyproject.toml) ... [?25l[?25hdone


# Домашнее задание "NLP. Часть 1"

In [2]:
import math
import re
import os
import random
import json
from collections import Counter, defaultdict
from typing import List, Dict, Tuple, Any

import torch
import numpy as np
import datasets
import fasttext
import fasttext.util
from transformers import BertTokenizer, BertModel

In [3]:
def seed_everything(seed: int):
    random.seed(seed)
    os.environ['PYTHONHASHSEED'] = str(seed)
    np.random.seed(seed)
    torch.manual_seed(seed)
    torch.cuda.manual_seed(seed)
    torch.backends.cudnn.deterministic = True
    torch.backends.cudnn.benchmark = True

seed_everything(42)

In [4]:
def normalize_pretokenize_text(text: str) -> List[str]:
    text = text.lower()
    words = re.findall(r'\b\w+\b', text)
    return words

In [5]:
# This block is for tests only
test_corpus = [
    "the quick brown fox jumps over the lazy dog",
    "never jump over the lazy dog quickly",
    "brown foxes are quick and dogs are lazy"
]

def build_vocab(texts: List[str]) -> Tuple[List[str], Dict[str, int]]:
    all_words = []
    for text in texts:
        words = normalize_pretokenize_text(text)
        all_words.extend(words)
    vocab = sorted(set(all_words))
    vocab_index = {word: idx for idx, word in enumerate(vocab)}
    return vocab, vocab_index

vocab, vocab_index = build_vocab(test_corpus)

## Задание 1 (0.5 балла)
Реализовать One-Hot векторизацию текстов

In [6]:
def one_hot_vectorization(
    text: str,
    vocab: List[str] = None,
    vocab_index: Dict[str, int] = None
) -> List[List[int]]:

    if vocab is None or vocab_index is None:
        raise ValueError("vocab/vacab_index = None")

    words = normalize_pretokenize_text(text)
    one_hot = []
    for word in words:
        vector = [0] * len(vocab)
        if word in vocab_index:
            idx = vocab_index[word]
            vector[idx] = 1
        one_hot.append(vector)
    return one_hot

def test_one_hot_vectorization(
    vocab: List[str],
    vocab_index: Dict[str, int]
) -> bool:
    try:
        text = "the quick brown fox"
        result = one_hot_vectorization(text, vocab, vocab_index)

        if not isinstance(result, list):
            return False

        expected_length = len(vocab)
        if len(result[0]) != expected_length:
            return False

        words_in_text = normalize_pretokenize_text(text)
        for i, word in enumerate(words_in_text):
            if word in vocab_index:
                idx = vocab_index[word]
                if result[i][idx] != 1:
                    return False

        print("One-Hot-Vectors test PASSED")

        return True
    except Exception as e:
        print(f"One-Hot-Vectors test FAILED: {e}")
        return False

In [7]:
assert test_one_hot_vectorization(vocab, vocab_index)

One-Hot-Vectors test PASSED


## Задание 2 (0.5 балла)
Реализовать Bag-of-Words

In [8]:
def bag_of_words_vectorization(text: str) -> Dict[str, int]:
    words = normalize_pretokenize_text(text)
    word_counts = {}
    for word in words:
        if word in word_counts:
            word_counts[word] += 1
        else:
            word_counts[word] = 1
    return word_counts

def test_bag_of_words_vectorization() -> bool:
    try:
        text = "the the quick brown brown brown"
        result = bag_of_words_vectorization(text)

        if not isinstance(result, dict):
            return False

        if result.get('the', 0) != 2:
            return False
        if result.get('quick', 0) != 1:
            return False
        if result.get('brown', 0) != 3:
            return False
        if result.get('nonexistent', 0) != 0:
            return False

        print("Bad-of-Words test PASSED")
        return True
    except Exception as e:
        print(f"Bag-of-Words test FAILED: {e}")
        return False

In [9]:
assert test_bag_of_words_vectorization()

Bad-of-Words test PASSED


## Задание 3 (0.5 балла)
Реализовать TF-IDF

In [10]:
def tf_idf_vectorization(text: str, corpus: List[str] = None, vocab: List[str] = None, vocab_index: Dict[str, int] = None) -> List[float]:

    if corpus is None or vocab is None or vocab_index is None:
        raise ValueError("_None_")

    tfidf_vector = [0.0] * len(vocab)
    words = normalize_pretokenize_text(text)

    tf_counts = {}
    for word in words:
        if word in tf_counts:
            tf_counts[word] += 1
        else:
            tf_counts[word] = 1

    doc_freq = {}
    for word_in_vocab in vocab:
        doc_freq[word_in_vocab] = 0

    num_documents = len(corpus)
    for one_document_from_corpus in corpus:
        unique_words_in_doc = set(normalize_pretokenize_text(one_document_from_corpus))
        for word in unique_words_in_doc:
            if word in doc_freq:
                doc_freq[word] += 1

    idf_scores = {}
    for word_in_vocab in vocab:
        df_t = doc_freq[word_in_vocab]
        idf_scores[word_in_vocab] = math.log(num_documents / (df_t + 1))

    for word_from_our_text, tf_value in tf_counts.items():
        if word_from_our_text in vocab_index:
            word_idx = vocab_index[word_from_our_text]
            idf_value = idf_scores[word_from_our_text]
            tfidf_vector[word_idx] = tf_value * idf_value

    return tfidf_vector

def test_tf_idf_vectorization(corpus, vocab, vocab_index) -> bool:
    try:
        text = "the quick brown"
        result = tf_idf_vectorization(text, corpus, vocab, vocab_index)

        if not isinstance(result, list):
            return False

        expected_length = len(vocab)
        if len(result) != expected_length:
            return False

        for val in result:
            if not isinstance(val, float):
                return False

        print("TF-IDF test PASSED")
        return True
    except Exception as e:
        print(f"TF-IDF test FAILED: {e}")
        return False

In [11]:
assert test_tf_idf_vectorization(test_corpus, vocab, vocab_index)

TF-IDF test PASSED


## Задание 4 (1 балл)
Реализовать Positive Pointwise Mutual Information (PPMI).  
https://en.wikipedia.org/wiki/Pointwise_mutual_information
$$PPMI(word, context) = max(0, PMI(word, context))$$
$$PMI(word, context) = log \frac{P(word, context)}{P(word) P(context)} = log \frac{N(word, context)|(word, context)|}{N(word) N(context)}$$
где $N(word, context)$ -- число вхождений слова $word$ в окно $context$ (размер окна -- гиперпараметр)

In [12]:
def ppmi_vectorization(
    text: str,
    corpus: List[str] = None,
    vocab: List[str] = None,
    vocab_index: Dict[str, int] = None,
    window_size: int = 2
) -> List[float]:

    if corpus is None or vocab is None or vocab_index is None:
        raise ValueError("corpus, vocab, and vocab_index must be provided")

    vocab_size = len(vocab)
    ppmi_matrix = np.zeros((vocab_size, vocab_size), dtype=np.float32)

    word_freq = Counter() # N(word) - сколько раз слово встречается в корпусе
    pair_freq = Counter() # N(word, context) - сколько раз пара встречается в окне
    total_pairs = 0       # |(word, context)| - общее количество всех пар в окнах

    for doc in corpus:
        words = normalize_pretokenize_text(doc)

        for i, word in enumerate(words):
            word_freq[word] += 1 # частота каждого слова

            start_window = max(0, i - window_size)
            end_window = min(len(words), i + window_size + 1)

            for j in range(start_window, end_window):
                if i != j:
                    context_word = words[j]
                    pair_freq[(word, context_word)] += 1
                    total_pairs += 1

    epsilon = 1e-10

    for target_word_str in vocab:
        target_idx = vocab_index[target_word_str]
        N_word = word_freq[target_word_str]

        for context_word_str in vocab:
            context_idx = vocab_index[context_word_str]
            N_context = word_freq[context_word_str]

            N_word_context = pair_freq[(target_word_str, context_word_str)]

            pmi = 0.0
            if N_word_context > 0 and N_word > 0 and N_context > 0 and total_pairs > 0:
                numerator = N_word_context * total_pairs
                denominator = N_word * N_context

                if denominator > epsilon:
                    pmi = math.log(numerator / denominator)

            ppmi_matrix[target_idx, context_idx] = max(0.0, pmi)

    words = normalize_pretokenize_text(text)
    text_ppmi_vector = np.zeros(vocab_size, dtype=np.float32)
    word_count_in_input_text_in_vocab = 0

    for word in words:
        if word in vocab_index:
            word_idx = vocab_index[word]
            text_ppmi_vector += ppmi_matrix[word_idx, :]
            word_count_in_input_text_in_vocab += 1

    if word_count_in_input_text_in_vocab > 0:
        text_ppmi_vector /= word_count_in_input_text_in_vocab

    return text_ppmi_vector.tolist()

def test_ppmi_vectorization(corpus, vocab, vocab_index) -> bool:
    try:
        text = "quick brown fox"
        result = ppmi_vectorization(text, corpus, vocab, vocab_index)

        if not isinstance(result, list):
            return False

        expected_length = len(vocab)
        if len(result) != expected_length:
            return False

        for val in result:
            if not isinstance(val, float):
                return False

        print("PPMI test PASSED")
        return True
    except Exception as e:
        print(f"PPMI test FAILED: {e}")
        return False

In [13]:
assert test_ppmi_vectorization(test_corpus, vocab, vocab_index)

PPMI test PASSED


## Задание 5 (1 балл)
Реализовать получение эмбеддингов из fasttext и bert (для bert лучше использовать CLS токен)

In [None]:
def get_fasttext_embeddings(text: str, model: any) -> List[np.ndarray]:

    if model is None:
        raise ValueError("Передай модель в фукнцию")

    words = normalize_pretokenize_text(text)

    embeddings = []

    for word in words:
        word_vector = model.get_word_vector(word)
        embeddings.append(word_vector)

    return embeddings

In [None]:
def get_bert_embeddings(
    text: str,
    bert_tokenizer: BertTokenizer,
    bert_model: BertModel
) -> np.ndarray:

    if bert_tokenizer is None or bert_model is None:
        raise ValueError("Передай модель в фукнцию и токенизатор")

    inputs = bert_tokenizer(text, return_tensors='pt', padding=True, truncation=True)

    with torch.no_grad():
        outputs = bert_model(**inputs)

    cls_embedding = outputs.last_hidden_state[:, 0, :]

    return cls_embedding.cpu().numpy()[0]

In [None]:
fasttext.util.download_model('en', if_exists='ignore')
model_fast = fasttext.load_model('cc.en.300.bin')

bert_tokenizer = BertTokenizer.from_pretrained('bert-base-uncased')
bert_model = BertModel.from_pretrained('bert-base-uncased')

## Задание 6 (1.5 балла)
Реализовать обучение так, чтобы можно было поверх эмбеддингов, реализованных в предыдущих заданиях, обучить какую-то модель (вероятно неглубокую, например, CatBoost) на задаче классификации текстов ([IMDB](https://huggingface.co/datasets/stanfordnlp/imdb)).

In [None]:
def vectorize_dataset(
    dataset_name: str = "imdb",
    vectorizer_type: str = "bow",
    split: str = "train",
    sample_size: int = 2500,
    ft_model: Any = None,  # модели FastText
    bert_tokenizer: Any = None, # для BERT токенизатора
    bert_model: Any = None      # для BERT модели
) -> Tuple[List[str], List[Any], List[int]]:

    dataset = datasets.load_dataset(dataset_name, split=split)

    if sample_size:
        dataset = dataset.select(range(min(sample_size, len(dataset))))

    texts = [item['text'] for item in dataset if 'text' in item and item['text'].strip()]
    labels = [item['label'] for item in dataset if 'label' in item]

    def build_vocab(texts: List[str]) -> Tuple[List[str], Dict[str, int]]:
        all_words = []
        for text in texts:
            words = normalize_pretokenize_text(text)
            all_words.extend(words)
        vocab = sorted(set(all_words))
        vocab_index = {word: idx for idx, word in enumerate(vocab)}
        return vocab, vocab_index

    vocab, vocab_index = build_vocab(texts)

    vectorized_data = []
    for text in texts:
        if vectorizer_type == "one_hot":
            vectorized_data.append(one_hot_vectorization(text, vocab, vocab_index))
        elif vectorizer_type == "bow":
            bow_dict = bag_of_words_vectorization(text)
            vector = [bow_dict.get(word, 0) for word in vocab]
            vectorized_data.append(vector)
        elif vectorizer_type == "tfidf":
            vectorized_data.append(tf_idf_vectorization(text, texts, vocab, vocab_index))
        elif vectorizer_type == "ppmi":
            vectorized_data.append(ppmi_vectorization(text, texts, vocab, vocab_index))
        elif vectorizer_type == "fasttext":
            embeddings = get_fasttext_embeddings(text, ft_model)
            if embeddings:
                avg_embedding = np.mean(embeddings, axis=0)
                vectorized_data.append(avg_embedding.tolist())
            else:
                vectorized_data.append([0] * 300)
        elif vectorizer_type == "bert":
            embedding = get_bert_embeddings(text, bert_tokenizer, bert_model)
            vectorized_data.append(embedding.tolist())
        else:
            raise ValueError(f"Unknown vectorizer type: {vectorizer_type}")

    return vocab, vectorized_data, labels

In [None]:
from catboost import CatBoostClassifier
from sklearn.metrics import classification_report, accuracy_score, f1_score
from sklearn.model_selection import train_test_split, cross_val_score, KFold

def train(
    embeddings_method="bow",
    test_size=0.2,
    val_size=0.2,
    cv_folds=5
):
    vocab, X, y = vectorize_dataset("imdb", embeddings_method, "train")
    _, X_test, y_test = vectorize_dataset("imdb", embeddings_method, "test")

    # Your code here

In [None]:
for embeddings_method in ["bow", "one_hot", "tfidf", "ppmi", "fasttext", "bert"]:
    train(embeddings_method=embeddings_method)