In [2]:
!pip install catboost fasttext -q

[?25l     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m0.0/73.4 kB[0m [31m?[0m eta [36m-:--:--[0m[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m73.4/73.4 kB[0m [31m3.1 MB/s[0m eta [36m0:00:00[0m
[?25h  Installing build dependencies ... [?25l[?25hdone
  Getting requirements to build wheel ... [?25l[?25hdone
  Preparing metadata (pyproject.toml) ... [?25l[?25hdone
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m99.2/99.2 MB[0m [31m9.0 MB/s[0m eta [36m0:00:00[0m
[?25h  Building wheel for fasttext (pyproject.toml) ... [?25l[?25hdone


In [3]:
import math
import re
import os
import random
import json
from collections import Counter, defaultdict
from typing import List, Dict, Tuple, Any
from functools import cache

import torch
import numpy as np
import datasets
import fasttext
import fasttext.util
from transformers import BertTokenizer, BertModel

In [4]:
def seed_everything(seed: int):
    random.seed(seed)
    os.environ['PYTHONHASHSEED'] = str(seed)
    np.random.seed(seed)
    torch.manual_seed(seed)
    torch.cuda.manual_seed(seed)
    torch.backends.cudnn.deterministic = True
    torch.backends.cudnn.benchmark = True

seed_everything(42)

In [5]:
def normalize_pretokenize_text(text: str) -> List[str]:
    text = text.lower()
    words = re.findall(r'\b\w+\b', text)
    return words

In [10]:
normalize_pretokenize_text('foo, BAr bAzz!')

['foo', 'bar', 'bazz']

In [14]:
# This block is for tests only
test_corpus = [
    "the quick brown fox jumps over the lazy dog",
    "never jump over the lazy dog quickly",
    "brown foxes are quick and dogs are lazy"
]

def build_vocab(texts: List[str]) -> Tuple[List[str], Dict[str, int]]:
    all_words = []
    for text in texts:
        words = normalize_pretokenize_text(text)
        all_words.extend(words)
    vocab = sorted(set(all_words))
    vocab_index = {word: idx for idx, word in enumerate(vocab)}
    return vocab, vocab_index

vocab, vocab_index = build_vocab(test_corpus)
vocab_index

{'and': 0,
 'are': 1,
 'brown': 2,
 'dog': 3,
 'dogs': 4,
 'fox': 5,
 'foxes': 6,
 'jump': 7,
 'jumps': 8,
 'lazy': 9,
 'never': 10,
 'over': 11,
 'quick': 12,
 'quickly': 13,
 'the': 14}

Задание 1. Реализация One-Hot векторизации:

In [6]:
def one_hot_vectorization(
    text: str,
    vocab: List[str] = None,
    vocab_index: Dict[str, int] = None
) -> List[List[int]]:
    words = normalize_pretokenize_text(text)
    result = []
    for word in words:
        if word in vocab:
            idx = vocab_index[word]
            one_hotted = np.zeros(len(vocab))
            one_hotted[idx] = 1
            result.append(one_hotted)
    return result

def test_one_hot_vectorization(
    vocab: List[str],
    vocab_index: Dict[str, int]
) -> bool:
    try:
        text = "the quick brown fox"
        result = one_hot_vectorization(text, vocab, vocab_index)

        if not isinstance(result, list):
            return False

        expected_length = len(vocab)
        if len(result[0]) != expected_length:
            return False

        words_in_text = normalize_pretokenize_text(text)
        for i, word in enumerate(words_in_text):
            if word in vocab_index:
                idx = vocab_index[word]
                if result[i][idx] != 1:
                    return False

        print("One-Hot-Vectors test PASSED")

        return True
    except Exception as e:
        print(f"One-Hot-Vectors test FAILED: {e}")
        return False

In [23]:
assert test_one_hot_vectorization(vocab, vocab_index)

One-Hot-Vectors test PASSED


Задание 2. Реализация Bag-of-words

In [7]:
def bag_of_words_vectorization(text: str) -> Dict[str, int]:
    result = {}
    words = normalize_pretokenize_text(text)
    for word in words:
        result[word] = words.count(word)
    return result

def test_bag_of_words_vectorization() -> bool:
    try:
        text = "the the quick brown brown brown"
        result = bag_of_words_vectorization(text)

        if not isinstance(result, dict):
            return False

        if result.get('the', 0) != 2:
            return False
        if result.get('quick', 0) != 1:
            return False
        if result.get('brown', 0) != 3:
            return False
        if result.get('nonexistent', 0) != 0:
            return False

        print("Bad-of-Words test PASSED")
        return True
    except Exception as e:
        print(f"Bag-of-Words test FAILED: {e}")
        return False

In [30]:
assert test_bag_of_words_vectorization()

Bad-of-Words test PASSED


Задание 3. Реализация TF-IDF

In [8]:
def tf_idf_vectorization(text: str, corpus: List[str] = None, vocab: List[str] = None, vocab_index: Dict[str, int] = None) -> List[float]:
    words = normalize_pretokenize_text(text)
    result = []

    for word in vocab:
        tf = words.count(word) / len(words)

        docs_with_word = [doc for doc in corpus if word in normalize_pretokenize_text(doc)]
        idf = math.log( len(corpus) / len(docs_with_word) )

        result.append(tf * idf)
    return result

def test_tf_idf_vectorization(corpus, vocab, vocab_index) -> bool:
    try:
        text = "the quick brown"
        result = tf_idf_vectorization(text, corpus, vocab, vocab_index)

        if not isinstance(result, list):
            return False

        expected_length = len(vocab)
        if len(result) != expected_length:
            return False

        for val in result:
            if not isinstance(val, float):
                return False

        print("TF-IDF test PASSED")
        return True
    except Exception as e:
        print(f"TF-IDF test FAILED: {e}")
        return False

In [54]:
assert test_tf_idf_vectorization(test_corpus, vocab, vocab_index)

TF-IDF test PASSED


Задание 4. Реализация Positive Pointwise Mutual Information (PPMI)

In [9]:
def ppmi_vectorization(
    text: str,
    corpus: List[str] = None,
    vocab: List[str] = None,
    vocab_index: Dict[str, int] = None,
    window_size: int = 2
) -> List[float]:
    words = normalize_pretokenize_text(text)
    result = []

    word_freq = {}
    for word in words:
        if word in vocab_index:
            word_freq[word] = word_freq[word] + 1 if word in word_freq else 1

    pairs = {}
    total_pairs = 0

    for i, target_word in enumerate(words):
        start = max(0, i - window_size)
        end = min(len(words), i + window_size + 1)

        for j in range(start, end):
            if words[j] in vocab_index and i != j:
                context_word = words[j]
                pair = (target_word, context_word)
                pairs[pair] = pairs[pair] + 1 if pair in pairs else 1
                total_pairs += 1

    result = [0.0] * len(vocab)

    for (word, context), N_word_context in pairs.items():
        P_word = word_freq[word] / len(words)

        P_context = word_freq[context] / len(words)

        P_word_context = N_word_context / total_pairs

        if P_word > 0 and P_context > 0:
            pmi = math.log(P_word_context / (P_word * P_context))
            ppmi = max(0, pmi)

            word_idx = vocab_index[word]
            result[word_idx] += ppmi
    return result

def test_ppmi_vectorization(corpus, vocab, vocab_index) -> bool:
    try:
        text = "quick brown fox"
        result = ppmi_vectorization(text, corpus, vocab, vocab_index)

        if not isinstance(result, list):
            return False

        expected_length = len(vocab)
        if len(result) != expected_length:
            return False

        for val in result:
            if not isinstance(val, float):
                return False

        print("PPMI test PASSED")
        return True
    except Exception as e:
        print(f"PPMI test FAILED: {e}")
        return False

In [69]:
assert test_ppmi_vectorization(test_corpus, vocab, vocab_index)

PPMI test PASSED


Задание 5. Реализация получения эмбеддингов из fasttext и bert

In [10]:
def get_fasttext_embeddings(text: str, model_path: str = None, model: any = None) -> List[np.ndarray]:
    words = normalize_pretokenize_text(text)
    embeddings = []

    if model is None: model = fasttext.load_model(model_path)

    for word in words:
        word_embedding = model.get_word_vector(word)
        embeddings.append(word_embedding)
    return embeddings

@cache
def get_bert_model_and_tokenizer(model_name: str = 'bert-base-uncased'):
    tokenizer = BertTokenizer.from_pretrained(model_name)
    model = BertModel.from_pretrained(model_name)
    return tokenizer, model

def get_bert_embeddings(
    text: str,
    model_name: str = 'bert-base-uncased',
    pool_method: str = 'cls'
) -> np.ndarray:

    tokenizer, model = get_bert_model_and_tokenizer(model_name)

    inputs = tokenizer(text, return_tensors='pt', truncation=True, padding=True)
    with torch.no_grad(): outputs = model(**inputs)

    last_hidden_states = outputs.last_hidden_state

    embeddings = last_hidden_states[:, 0, :].numpy()[0]

    return embeddings

In [12]:
get_bert_embeddings('dog')

The secret `HF_TOKEN` does not exist in your Colab secrets.
To authenticate with the Hugging Face Hub, create a token in your settings tab (https://huggingface.co/settings/tokens), set it as secret in your Google Colab and restart your session.
You will be able to reuse this secret in all of your notebooks.
Please note that authentication is recommended but still optional to access public models or datasets.


tokenizer_config.json:   0%|          | 0.00/48.0 [00:00<?, ?B/s]

vocab.txt:   0%|          | 0.00/232k [00:00<?, ?B/s]

tokenizer.json:   0%|          | 0.00/466k [00:00<?, ?B/s]

config.json:   0%|          | 0.00/570 [00:00<?, ?B/s]

model.safetensors:   0%|          | 0.00/440M [00:00<?, ?B/s]

array([-3.49031001e-01,  2.51017064e-01, -8.05725753e-02, -9.32775438e-02,
       -1.42596230e-01,  2.12208368e-02,  2.83211410e-01,  1.72544390e-01,
       -3.20468843e-01,  1.23535302e-02,  1.53326178e-02,  4.00188118e-02,
       -7.95494691e-02,  2.74241745e-01,  3.93711068e-02, -1.47185430e-01,
       -3.26915026e-01,  4.87013400e-01,  2.70929545e-01, -3.03769678e-01,
       -1.23750336e-01, -2.69255191e-01, -2.86421537e-01, -1.46041602e-01,
        1.03357621e-01,  7.58455247e-02, -2.34735111e-04,  9.65043083e-02,
        3.94507498e-02, -7.09338933e-02,  9.79221091e-02,  7.94589799e-03,
       -1.13117211e-01,  9.06939432e-02,  4.23097573e-02,  2.36044805e-02,
        9.08403546e-02, -1.89302206e-01,  1.07727990e-01,  7.62883648e-02,
        2.25753844e-01,  9.55467373e-02,  1.60798654e-01,  4.06255350e-02,
        3.96400169e-02, -2.46499553e-01, -1.88536489e+00, -1.79066598e-01,
       -1.97380245e-01, -1.30142733e-01,  1.54616132e-01, -7.34338537e-02,
        3.62376124e-01, -

Задание 6. Обучение CatBoost на задаче классификации текстов IMDB поверх реализованных эмбеддингов

In [11]:
@cache
def load_cached_dataset(dataset_name: str, split: str, sample_size: int):
    dataset = datasets.load_dataset(dataset_name, split=split)
    dataset = dataset.shuffle(seed=42)
    if sample_size:
        dataset = dataset.select(range(min(sample_size, len(dataset))))
    return dataset

def vectorize_dataset(
    dataset_name: str = "imdb",
    vectorizer_type: str = "bow",
    split: str = "train",
    sample_size: int = 2500,
    bert_tokenizer=None,
    bert_model=None,
    vocab: List[str] = None,
    vocab_index: Dict[str, int] = None
) -> Tuple[Any, List, List]:

    dataset = load_cached_dataset(dataset_name, split, sample_size)

    if sample_size:
        dataset = dataset.select(range(min(sample_size, len(dataset))))

    texts = [item['text'] for item in dataset if 'text' in item and item['text'].strip()]
    labels = [item['label'] for item in dataset if 'label' in item]

    def build_vocab(texts: List[str]) -> Tuple[List[str], Dict[str, int]]:
        all_words = []
        for text in texts:
            words = normalize_pretokenize_text(text)
            all_words.extend(words)
        vocab = sorted(set(all_words))
        vocab_index = {word: idx for idx, word in enumerate(vocab)}
        return vocab, vocab_index

    if vectorizer_type in ["one_hot", "bow", "tfidf", "ppmi"]:
        vocab, vocab_index = build_vocab(texts)
    else:
        vocab, vocab_index = None, None

    vectorized_data = []
    for i, text in enumerate(texts):
        if i % 100 == 0:
            print(i, '/', len(texts), 'texts done')
        if vectorizer_type == "one_hot":
            word_vectors = one_hot_vectorization(text, vocab, vocab_index)
            if word_vectors:
                doc_vector = np.sum(word_vectors, axis=0)
                vectorized_data.append(doc_vector.tolist())
            else:
                vectorized_data.append([0] * len(vocab))
        elif vectorizer_type == "bow":
            bow_dict = bag_of_words_vectorization(text)
            vector = [bow_dict.get(word, 0) for word in vocab]
            vectorized_data.append(vector)
        elif vectorizer_type == "tfidf":
            vectorized_data.append(tf_idf_vectorization(text, texts, vocab, vocab_index))
        elif vectorizer_type == "ppmi":
            vectorized_data.append(ppmi_vectorization(text, texts, vocab, vocab_index))
        elif vectorizer_type == "bert":
            embedding = get_bert_embeddings(text)
            vectorized_data.append(embedding.tolist())
        else:
            raise ValueError(f"Unknown vectorizer type: {vectorizer_type}")
    return vocab, vectorized_data, labels

In [None]:
from catboost import CatBoostClassifier
from sklearn.metrics import classification_report, accuracy_score
from sklearn.model_selection import train_test_split, cross_val_score, KFold

def train(
    embeddings_method="bow",
    test_size=0.2,
    val_size=0.2,
    cv_folds=3
):
    if embeddings_method in ["one_hot", "bow", "tfidf", "ppmi"]:
        vocab, X, y = vectorize_dataset("imdb", embeddings_method, "train", sample_size=1000)

        vocab_index = {word: idx for idx, word in enumerate(vocab)}

        _, X_test, y_test = vectorize_dataset(
            "imdb", embeddings_method, "test", sample_size=250,
            vocab=vocab, vocab_index=vocab_index
        )
    else:
        vocab, X, y = vectorize_dataset("imdb", embeddings_method, "train", sample_size=1000)
        _, X_test, y_test = vectorize_dataset("imdb", embeddings_method, "test", sample_size=250)

    if len(X[0]) != len(X_test[0]): # костыль, был конфликт размерностей
        min_dim = min(len(X[0]), len(X_test[0]))
        X = [x[:min_dim] for x in X]
        X_test = [x[:min_dim] for x in X_test]

    X, y = np.array(X), np.array(y)
    X_test, y_test = np.array(X_test), np.array(y_test)

    X_train, X_val, y_train, y_val = train_test_split(
        X, y, test_size=val_size, random_state=42, stratify=y
    )

    model = CatBoostClassifier(
        iterations=500,
        learning_rate=0.1,
        depth=6,
        loss_function='Logloss',
        random_seed=42,
        verbose=100,
        early_stopping_rounds=50
    )

    print("Training...")
    model.fit(
        X_train, y_train,
        eval_set=(X_val, y_val),
        verbose=100
    )

    print("Cross-validation...")
    cv = KFold(n_splits=cv_folds, shuffle=True, random_state=42)
    cv_scores = cross_val_score(model, X, y, cv=cv, scoring='accuracy')
    print(f"CV Accuracy: {cv_scores.mean():.4f}")

    y_pred = model.predict(X_test)
    y_pred_proba = model.predict_proba(X_test)[:, 1]

    accuracy = accuracy_score(y_test, y_pred)

    print(f"Results for {embeddings_method}:")
    print(f"Accuracy: {accuracy:.4f}")
    print("Classification Report:")
    print(classification_report(y_test, y_pred))

    return model, accuracy

Обучаем:

In [14]:
# подсчет tf-idf оказался слишком долгим, а с fasttext проблема: не помещается у меня в памяти, а в colab 7.5 ГБ загружаются неприлично долго... :(
results = {}
for embeddings_method in ["one_hot", "bow", "ppmi", "bert"]:
    try:
        model, accuracy = train(embeddings_method=embeddings_method)
        results[embeddings_method] = {
            'model': model,
            'accuracy': accuracy
        }
    except Exception as e:
        print(f"Error with {embeddings_method}: {e}")
        results[embeddings_method] = None

print("FINAL:")
for method, result in results.items():
    if result is not None:
        print(f"{method:10} | Accuracy: {result['accuracy']:.4f}")

0 / 1000 texts done
100 / 1000 texts done
200 / 1000 texts done
300 / 1000 texts done
400 / 1000 texts done
500 / 1000 texts done
600 / 1000 texts done
700 / 1000 texts done
800 / 1000 texts done
900 / 1000 texts done
0 / 250 texts done
100 / 250 texts done
200 / 250 texts done
Training...
0:	learn: 0.6785358	test: 0.6812431	best: 0.6812431 (0)	total: 74.6ms	remaining: 37.2s
100:	learn: 0.3069737	test: 0.5116942	best: 0.5107432 (99)	total: 2.7s	remaining: 10.7s
200:	learn: 0.1543643	test: 0.4800938	best: 0.4765730 (191)	total: 6.43s	remaining: 9.57s
300:	learn: 0.0990950	test: 0.4669781	best: 0.4666417 (296)	total: 10.2s	remaining: 6.72s
400:	learn: 0.0677991	test: 0.4652631	best: 0.4594003 (367)	total: 12.8s	remaining: 3.16s
Stopped by overfitting detector  (50 iterations wait)

bestTest = 0.4594002796
bestIteration = 367

Shrink model to first 368 iterations.
Cross-validation...
0:	learn: 0.6812362	total: 25.5ms	remaining: 12.7s
100:	learn: 0.2757939	total: 2.46s	remaining: 9.72s
200

tokenizer_config.json:   0%|          | 0.00/48.0 [00:00<?, ?B/s]

vocab.txt:   0%|          | 0.00/232k [00:00<?, ?B/s]

tokenizer.json:   0%|          | 0.00/466k [00:00<?, ?B/s]

config.json:   0%|          | 0.00/570 [00:00<?, ?B/s]

model.safetensors:   0%|          | 0.00/440M [00:00<?, ?B/s]

100 / 1000 texts done
200 / 1000 texts done
300 / 1000 texts done
400 / 1000 texts done
500 / 1000 texts done
600 / 1000 texts done
700 / 1000 texts done
800 / 1000 texts done
900 / 1000 texts done
0 / 250 texts done
100 / 250 texts done
200 / 250 texts done
Training...
0:	learn: 0.6658904	test: 0.6786301	best: 0.6786301 (0)	total: 405ms	remaining: 3m 21s
100:	learn: 0.0620269	test: 0.4354091	best: 0.4335890 (99)	total: 26.4s	remaining: 1m 44s
Stopped by overfitting detector  (50 iterations wait)

bestTest = 0.4335890295
bestIteration = 99

Shrink model to first 100 iterations.
Cross-validation...
0:	learn: 0.6589944	total: 387ms	remaining: 3m 13s
100:	learn: 0.0439015	total: 24.2s	remaining: 1m 35s
200:	learn: 0.0085480	total: 50s	remaining: 1m 14s
300:	learn: 0.0037553	total: 1m 15s	remaining: 49.7s
400:	learn: 0.0023701	total: 1m 40s	remaining: 24.9s
499:	learn: 0.0019270	total: 2m 6s	remaining: 0us
0:	learn: 0.6572140	total: 260ms	remaining: 2m 9s
100:	learn: 0.0490899	total: 24.2s

Итак, нормальную метрику получили только при BERT-эмбеддингах