In [1]:
# Данный ноутбук использовал окружение google-colab
%pip install catboost fasttext -q

[?25l     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m0.0/73.4 kB[0m [31m?[0m eta [36m-:--:--[0m[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m73.4/73.4 kB[0m [31m2.3 MB/s[0m eta [36m0:00:00[0m
[?25h  Installing build dependencies ... [?25l[?25hdone
  Getting requirements to build wheel ... [?25l[?25hdone
  Preparing metadata (pyproject.toml) ... [?25l[?25hdone
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m99.2/99.2 MB[0m [31m7.7 MB/s[0m eta [36m0:00:00[0m
[?25h  Building wheel for fasttext (pyproject.toml) ... [?25l[?25hdone


# Домашнее задание "NLP. Часть 1"

In [2]:
import math
import re
import os
import random
import json
from collections import Counter, defaultdict
from typing import List, Dict, Tuple, Any

import torch
import numpy as np
import datasets
import fasttext
import fasttext.util
from transformers import BertTokenizer, BertModel

from catboost import CatBoostClassifier
from sklearn.metrics import classification_report, accuracy_score, f1_score
from sklearn.model_selection import train_test_split, cross_val_score, KFold

In [3]:
def seed_everything(seed: int):
    random.seed(seed)
    os.environ['PYTHONHASHSEED'] = str(seed)
    np.random.seed(seed)
    torch.manual_seed(seed)
    torch.cuda.manual_seed(seed)
    torch.backends.cudnn.deterministic = True
    torch.backends.cudnn.benchmark = True

seed_everything(42)

In [4]:
def normalize_pretokenize_text(text: str) -> List[str]:
    text = text.lower()
    words = re.findall(r'\b\w+\b', text)
    return words

In [5]:
# This block is for tests only
test_corpus = [
    "the quick brown fox jumps over the lazy dog",
    "never jump over the lazy dog quickly",
    "brown foxes are quick and dogs are lazy"
]

def build_vocab(texts: List[str]) -> Tuple[List[str], Dict[str, int]]:
    all_words = []
    for text in texts:
        words = normalize_pretokenize_text(text)
        all_words.extend(words)
    vocab = ['<UNK>'] + sorted(set(all_words))
    vocab_index = {word: idx for idx, word in enumerate(vocab)}
    return vocab, vocab_index

vocab, vocab_index = build_vocab(test_corpus)

## Задание 1 (0.5 балла)
Реализовать One-Hot векторизацию текстов

In [6]:
def one_hot_vectorization(
    text: str,
    vocab: List[str] = None,
    vocab_index: Dict[str, int] = None
) -> List[List[int]]:
    tokens = normalize_pretokenize_text(text)
    if vocab is None:
        vocab = sorted(set(tokens))
    if vocab_index is None:
        vocab_index = {word: idx for idx, word in enumerate(vocab)}

    vocab_size = len(vocab)
    tokens_index = [vocab_index.get(token, 0) for token in tokens]
    one_hot_vectors = np.eye(vocab_size, dtype=int)
    return one_hot_vectors[tokens_index].tolist()

def test_one_hot_vectorization(
    vocab: List[str],
    vocab_index: Dict[str, int]
) -> bool:
    try:
        text = "the quick brown fox"
        result = one_hot_vectorization(text, vocab, vocab_index)

        if not isinstance(result, list):
            return False

        expected_length = len(vocab)
        if len(result[0]) != expected_length:
            return False

        words_in_text = normalize_pretokenize_text(text)
        for i, word in enumerate(words_in_text):
            if word in vocab_index:
                idx = vocab_index[word]
                if result[i][idx] != 1:
                    return False

        print("One-Hot-Vectors test PASSED")

        return True
    except Exception as e:
        print(f"One-Hot-Vectors test FAILED: {e}")
        return False

In [7]:
assert test_one_hot_vectorization(vocab, vocab_index)

One-Hot-Vectors test PASSED


## Задание 2 (0.5 балла)
Реализовать Bag-of-Words

In [8]:
def bag_of_words_vectorization(text: str) -> Dict[str, int]:
    tokens = normalize_pretokenize_text(text)

    counter = {}
    for token in tokens:
        counter[token] = counter.get(token, 0) + 1

    return counter

def test_bag_of_words_vectorization() -> bool:
    try:
        text = "the the quick brown brown brown"
        result = bag_of_words_vectorization(text)

        if not isinstance(result, dict):
            return False

        if result.get('the', 0) != 2:
            return False
        if result.get('quick', 0) != 1:
            return False
        if result.get('brown', 0) != 3:
            return False
        if result.get('nonexistent', 0) != 0:
            return False

        print("Bad-of-Words test PASSED")
        return True
    except Exception as e:
        print(f"Bag-of-Words test FAILED: {e}")
        return False

In [9]:
assert test_bag_of_words_vectorization()

Bad-of-Words test PASSED


## Задание 3 (0.5 балла)
Реализовать TF-IDF

In [10]:
def tf_idf_vectorization(text: str, corpus: List[str] = None, vocab: List[str] = None, vocab_index: Dict[str, int] = None) -> List[float]:
    tokens = normalize_pretokenize_text(text)

    if corpus is None:
        corpus = [text]
    if vocab is None:
        vocab, _ = build_vocab(corpus)
    if vocab_index is None:
        vocab_index = {word: idx for idx, word in enumerate(vocab)}

    vocab_size = len(vocab)

    token_counts = Counter(tokens)
    count_tokens = len(tokens)
    tf_vector = np.array([token_counts.get(token, 0) / count_tokens for token in vocab])

    num_docs = len(corpus)
    doc_frequency = np.zeros(vocab_size)

    for doc in corpus:
        doc_tokens = set(normalize_pretokenize_text(doc))
        for i, token in enumerate(vocab):
            if token in doc_tokens:
                doc_frequency[i] += 1

    idf_vector = np.log(num_docs / (doc_frequency + 1))
    tf_idf_vector = tf_vector * idf_vector

    return tf_idf_vector.tolist()

def test_tf_idf_vectorization(corpus, vocab, vocab_index) -> bool:
    try:
        text = "the quick brown"
        result = tf_idf_vectorization(text, corpus, vocab, vocab_index)

        if not isinstance(result, list):
            return False

        expected_length = len(vocab)
        if len(result) != expected_length:
            return False

        for val in result:
            if not isinstance(val, float):
                return False

        print("TF-IDF test PASSED")
        return True
    except Exception as e:
        print(f"TF-IDF test FAILED: {e}")
        return False

In [11]:
assert test_tf_idf_vectorization(test_corpus, vocab, vocab_index)

TF-IDF test PASSED


## Задание 4 (1 балл)
Реализовать Positive Pointwise Mutual Information (PPMI).  
https://en.wikipedia.org/wiki/Pointwise_mutual_information
$$PPMI(word, context) = max(0, PMI(word, context))$$
$$PMI(word, context) = log \frac{P(word, context)}{P(word) P(context)} = log \frac{N(word, context)|(word, context)|}{N(word) N(context)}$$
где $N(word, context)$ -- число вхождений слова $word$ в окно $context$ (размер окна -- гиперпараметр)

In [12]:
def ppmi_vectorization(
    text: str,
    corpus: List[str] = None,
    vocab: List[str] = None,
    vocab_index: Dict[str, int] = None,
    window_size: int = 2
) -> List[float]:
    tokens = normalize_pretokenize_text(text)

    if corpus is None:
        corpus = [text]
    if vocab is None:
        vocab, _ = build_vocab(corpus)
    if vocab_index is None:
        vocab_index = {word: idx for idx, word in enumerate(vocab)}

    vocab_size = len(vocab)
    corpus_stats = defaultdict(int)
    word_counts = Counter()
    total_pairs = 0

    for doc in corpus:
        doc_tokens = normalize_pretokenize_text(doc)
        for i, word in enumerate(doc_tokens):
            word_counts[word] += 1

            start = max(0, i - window_size)
            end = min(len(doc_tokens), i + window_size + 1)

            for j in range(start, end):
                if i != j:
                    context = doc_tokens[j]
                    corpus_stats[(word, context)] += 1
                    total_pairs += 1

    ppmi_vector = [0.0] * vocab_size

    for token in set(tokens):
        for i, context_word in enumerate(vocab):
            n_word_context = corpus_stats[(token, context_word)]

            if n_word_context == 0:
                continue

            n_word = word_counts[token]
            n_context = word_counts[context_word]

            pmi = math.log((n_word_context * total_pairs) / (n_word * n_context))
            ppmi = max(0, pmi)

            ppmi_vector[i] += ppmi

    return ppmi_vector

def test_ppmi_vectorization(corpus, vocab, vocab_index) -> bool:
    try:
        text = "quick brown fox"
        result = ppmi_vectorization(text, corpus, vocab, vocab_index)

        if not isinstance(result, list):
            return False

        expected_length = len(vocab)
        if len(result) != expected_length:
            return False

        for val in result:
            if not isinstance(val, float):
                return False

        print("PPMI test PASSED")
        return True
    except Exception as e:
        print(f"PPMI test FAILED: {e}")
        return False

In [13]:
assert test_ppmi_vectorization(test_corpus, vocab, vocab_index)

PPMI test PASSED


## Задание 5 (1 балл)
Реализовать получение эмбеддингов из fasttext и bert (для bert лучше использовать CLS токен)

In [14]:
fasttext.util.download_model('en', if_exists='ignore')
fasttext_model = fasttext.load_model('cc.en.300.bin')

Downloading https://dl.fbaipublicfiles.com/fasttext/vectors-crawl/cc.en.300.bin.gz



In [15]:
def get_fasttext_embeddings(text: str, model: any = None) -> List[np.ndarray]:
    tokens = normalize_pretokenize_text(text)
    embeddings = [model.get_word_vector(token) for token in tokens]

    return embeddings

In [16]:
def get_bert_embeddings(
    text: str,
    model_name: str = 'bert-base-uncased',
    pool_method: str = 'cls'
) -> np.ndarray:
    tokenizer = BertTokenizer.from_pretrained(model_name)
    model = BertModel.from_pretrained(model_name)
    model.eval()

    inputs = tokenizer(text, truncation=True, max_length=512, return_tensors='pt')

    with torch.no_grad():
        outputs = model(**inputs)

    last_hidden_state = outputs.last_hidden_state
    embedding = last_hidden_state[:, 0, :].squeeze()

    return embedding.numpy()

## Задание 6 (1.5 балла)
Реализовать обучение так, чтобы можно было поверх эмбеддингов, реализованных в предыдущих заданиях, обучить какую-то модель (вероятно неглубокую, например, CatBoost) на задаче классификации текстов ([IMDB](https://huggingface.co/datasets/stanfordnlp/imdb)).

Вынесем создание датасетов и словаря за пределы функции vectorize_texts - это экономит память, которой почти не хватает, если не использовать батчи. 

In [32]:
train_dataset = datasets.load_dataset("imdb", split="train")
test_dataset = datasets.load_dataset("imdb", split="test")

train_dataset = train_dataset.shuffle(seed=42).select(range(100))
test_dataset = test_dataset.shuffle(seed=42).select(range(100))

train_texts = [item['text'] for item in train_dataset]
train_labels = [item['label'] for item in train_dataset]
test_texts = [item['text'] for item in test_dataset]
test_labels = [item['label'] for item in test_dataset]

vocab, vocab_index = build_vocab(train_texts)

Generating train split:   0%|          | 0/25000 [00:00<?, ? examples/s]

Generating test split:   0%|          | 0/25000 [00:00<?, ? examples/s]

Generating unsupervised split:   0%|          | 0/50000 [00:00<?, ? examples/s]

Для обучения вместо one-hot будет использовать multi-hot. По сути это такой же one-hot, но вместо матрицы будем выдавать одну строку - массив размера vocab_size, в котором стоит 1 стоят на позициях токенов, которые есть в тексте. Сделано это было для того, чтобы решить проблему разных размерностей one-hot матриц, т.к. тексты, содержащие разное кол-во слов, будут иметь разное кол-во строк в one-hot матрице. 

In [33]:
def multi_hot_vectorization(text, vocab, vocab_index):
    tokens = normalize_pretokenize_text(text)
    vocab_size = len(vocab)

    multi_hot_vector = [0] * vocab_size

    for token in tokens:
        idx = vocab_index.get(token, 0)
        multi_hot_vector[idx] = 1

    return multi_hot_vector

In [34]:
def vectorize_texts(texts, vectorizer_type):
    vectorized_data = []

    for text in texts:
        if vectorizer_type == "one_hot":
            vectorized_data.append(multi_hot_vectorization(text, vocab, vocab_index))
        elif vectorizer_type == "bow":
            bow_dict = bag_of_words_vectorization(text)
            vector = [bow_dict.get(word, 0) for word in vocab]
            vectorized_data.append(vector)
        elif vectorizer_type == "tfidf":
            vectorized_data.append(tf_idf_vectorization(text, texts, vocab, vocab_index))
        elif vectorizer_type == "ppmi":
            vectorized_data.append(ppmi_vectorization(text, texts, vocab, vocab_index))
        elif vectorizer_type == "fasttext":
            embeddings = get_fasttext_embeddings(text, fasttext_model)
            if embeddings:
                avg_embedding = np.mean(embeddings, axis=0)
                vectorized_data.append(avg_embedding.tolist())
            else:
                vectorized_data.append([0] * 300)
        elif vectorizer_type == "bert":
            embedding = get_bert_embeddings(text)
            vectorized_data.append(embedding.tolist())
        else:
            raise ValueError(f"Unknown vectorizer type: {vectorizer_type}")

    return vectorized_data

In [35]:
def train(embeddings_method):
    print(f"{embeddings_method} start.")
    X = vectorize_texts(train_texts, embeddings_method)
    X_test = vectorize_texts(test_texts, embeddings_method)

    X = np.array(X)
    y = np.array(train_labels)
    X_test = np.array(X_test)
    y_test = np.array(test_labels)

    X_train, X_val, y_train, y_val = train_test_split(
        X, y,
        test_size=0.2,
        random_state=42,
        stratify=y
    )

    model = CatBoostClassifier(
        iterations=500,
        learning_rate=0.1,
        depth=6,
        verbose=500,
        early_stopping_rounds=50
    )

    print(f"{embeddings_method} fit.")
    model.fit(X_train, y_train)

    y_test_pred = model.predict(X_test)
    test_acc = accuracy_score(y_test, y_test_pred)
    test_f1 = f1_score(y_test, y_test_pred)

    print(f"{embeddings_method}: test accuracy={test_acc}, test f1={test_f1}")

    del X_train, X_val, y_train, y_val, y_test_pred

    return test_acc, test_f1

In [36]:
results = {}

for embeddings_method in ["bow", "one_hot", "tfidf", "ppmi", "fasttext", "bert"]:
    acc, f1 = train(embeddings_method)
    results[embeddings_method] = {'accuracy': acc, 'f1': f1}

bow start.
bow fit.
0:	learn: 0.6428838	total: 41.2ms	remaining: 20.6s
499:	learn: 0.0016805	total: 12.8s	remaining: 0us
bow: test accuracy=0.6, test f1=0.5918367346938775
one_hot start.
one_hot fit.
0:	learn: 0.6566856	total: 25.5ms	remaining: 12.7s
499:	learn: 0.0020489	total: 13s	remaining: 0us
one_hot: test accuracy=0.63, test f1=0.6105263157894737
tfidf start.
tfidf fit.
0:	learn: 0.6621224	total: 31.6ms	remaining: 15.8s
499:	learn: 0.0016805	total: 16.5s	remaining: 0us
tfidf: test accuracy=0.64, test f1=0.5714285714285714
ppmi start.
ppmi fit.
0:	learn: 0.6348993	total: 200ms	remaining: 1m 39s
499:	learn: 0.0015972	total: 1m 25s	remaining: 0us
ppmi: test accuracy=0.52, test f1=0.6521739130434783
fasttext start.
fasttext fit.
0:	learn: 0.6276676	total: 35.3ms	remaining: 17.6s
499:	learn: 0.0018762	total: 14.4s	remaining: 0us
fasttext: test accuracy=0.63, test f1=0.5934065934065934
bert start.


tokenizer_config.json:   0%|          | 0.00/48.0 [00:00<?, ?B/s]

vocab.txt:   0%|          | 0.00/232k [00:00<?, ?B/s]

tokenizer.json:   0%|          | 0.00/466k [00:00<?, ?B/s]

config.json:   0%|          | 0.00/570 [00:00<?, ?B/s]

model.safetensors:   0%|          | 0.00/440M [00:00<?, ?B/s]

bert fit.
0:	learn: 0.6287975	total: 172ms	remaining: 1m 25s
499:	learn: 0.0011828	total: 36.9s	remaining: 0us
bert: test accuracy=0.64, test f1=0.64


In [37]:
print(results)

{'bow': {'accuracy': 0.6, 'f1': 0.5918367346938775}, 'one_hot': {'accuracy': 0.63, 'f1': 0.6105263157894737}, 'tfidf': {'accuracy': 0.64, 'f1': 0.5714285714285714}, 'ppmi': {'accuracy': 0.52, 'f1': 0.6521739130434783}, 'fasttext': {'accuracy': 0.63, 'f1': 0.5934065934065934}, 'bert': {'accuracy': 0.64, 'f1': 0.64}}


### Выводы
Хоть на большом корпусе обучить и не получилось из-за слишком долгой векторизации, поэтому результаты не совсем достоверные. Но даже на небольшом корпусе видно преимущество bert перед остальными видами векторизации. Достаточно долго векторизуются tf-idf и ppmi. bow векторизуется быстрее всего. bert и fasttext как будто векторизуются быстрее чем tf-idf