In [3]:
import gzip
import re
from dataclasses import dataclass
from typing import List, Iterator
from pathlib import Path

import numpy as np
from sklearn.model_selection import train_test_split
from sklearn.svm import LinearSVC
from sklearn.metrics import classification_report, accuracy_score

from gensim.models import Word2Vec
import pymorphy3
from nltk.corpus import stopwords
import nltk

In [4]:
try:
    stop_words = set(stopwords.words('russian'))
except:
    nltk.download('stopwords')
    stop_words = set(stopwords.words('russian'))

morph = pymorphy3.MorphAnalyzer()

In [5]:
@dataclass
class NewsArticle:
    """Класс для представления новостной статьи"""
    category: str
    title: str
    text: str


def load_data(filepath: str) -> List[NewsArticle]:
    articles = []
    
    if filepath.endswith('.gz'):
        open_func = gzip.open
        mode = 'rt'
    else:
        open_func = open
        mode = 'r'
    
    with open_func(filepath, mode, encoding='utf-8') as f:
        for line in f:
            line = line.strip()
            if not line:
                continue
            
            parts = line.split('\t')
            if len(parts) >= 3:
                category, title, text = parts[0], parts[1], parts[2]
                articles.append(NewsArticle(
                    category=category.strip(),
                    title=title.strip(),
                    text=text.strip()
                ))
    
    return articles

In [6]:
def preprocess_text(text: str, normalize: bool = True, remove_stopwords: bool = True) -> List[str]:
    """
    Предобработка текста: токенизация, нормализация, удаление стоп-слов
    """
    # Токенизация (разбиение на слова)
    words = re.findall(r'\b[а-яёa-z]+\b', text.lower())
    
    processed_words = []
    for word in words:
        if normalize:
            parsed = morph.parse(word)[0]
            word = parsed.normal_form
        
        if remove_stopwords and word in stop_words:
            continue
        
        if len(word) < 2:
            continue
        
        processed_words.append(word)
    
    return processed_words

In [7]:
def prepare_corpus(articles: List[NewsArticle], normalize: bool = True, remove_stopwords: bool = True) -> List[List[str]]:
    """
    Подготавливает корпус предложений для обучения Word2Vec
    """
    corpus = []
    for article in articles:
        full_text = f"{article.title} {article.text}"
        tokens = preprocess_text(full_text, normalize, remove_stopwords)
        if tokens:
            corpus.append(tokens)
    return corpus

In [8]:
def train_word2vec(corpus: List[List[str]], vector_size: int = 100, window: int = 5, 
                   min_count: int = 2, workers: int = 4) -> Word2Vec:
    """
    Обучает модель Word2Vec на корпусе текстов
    """
    model = Word2Vec(
        sentences=corpus,
        vector_size=vector_size,
        window=window,
        min_count=min_count,
        workers=workers,
        sg=0
    )
    return model

In [9]:
def document_to_vector_avg(document_tokens: List[str], word2vec_model: Word2Vec) -> np.ndarray:
    """
    Базовый метод: преобразует документ в вектор путем усреднения векторов слов
    """
    vectors = []
    for token in document_tokens:
        if token in word2vec_model.wv:
            vectors.append(word2vec_model.wv[token])
    
    if vectors:
        return np.mean(vectors, axis=0)
    else:
        return np.zeros(word2vec_model.vector_size)

In [10]:
def document_to_vector_weighted(document_tokens: List[str], word2vec_model: Word2Vec, 
                                word_freq: dict = None) -> np.ndarray:
    """
    Альтернативный метод: взвешенное усреднение векторов слов (TF)
    """
    vectors = []
    weights = []
    
    if word_freq is None:
        word_freq = {}
        for token in document_tokens:
            word_freq[token] = word_freq.get(token, 0) + 1
    
    total_freq = sum(word_freq.values())
    if total_freq == 0:
        return np.zeros(word2vec_model.vector_size)
    
    for token in document_tokens:
        if token in word2vec_model.wv:
            vectors.append(word2vec_model.wv[token])
            weights.append(word_freq.get(token, 1) / total_freq)
    
    if vectors:
        weights = np.array(weights)
        weights = weights / weights.sum()
        return np.average(vectors, axis=0, weights=weights)
    else:
        return np.zeros(word2vec_model.vector_size)

In [11]:
def compute_idf(corpus: List[List[str]], word2vec_model: Word2Vec) -> dict:
    """
    Вычисляет IDF (Inverse Document Frequency) для слов в корпусе
    
    Args:
        corpus: корпус документов
        word2vec_model: модель Word2Vec (для фильтрации слов)
    
    Returns:
        Словарь {слово: idf}
    """
    from math import log
    
    # Подсчитываем количество документов, содержащих каждое слово
    doc_freq = {}
    total_docs = len(corpus)
    
    for doc_tokens in corpus:
        unique_tokens = set(doc_tokens)
        for token in unique_tokens:
            if token in word2vec_model.wv:
                doc_freq[token] = doc_freq.get(token, 0) + 1
    
    # Вычисляем IDF
    idf = {}
    for token, df in doc_freq.items():
        # IDF = log(общее_количество_документов / количество_документов_с_словом)
        idf[token] = log(total_docs / (df + 1))  # +1 для избежания деления на 0
    
    return idf

In [12]:
def document_to_vector_tfidf(document_tokens: List[str], word2vec_model: Word2Vec,
                             word_freq: dict, idf_dict: dict) -> np.ndarray:
    """
    Улучшенный метод: взвешенное усреднение с использованием TF-IDF
    """
    vectors = []
    weights = []
    
    total_freq = sum(word_freq.values())
    if total_freq == 0:
        return np.zeros(word2vec_model.vector_size)
    
    for token in document_tokens:
        if token in word2vec_model.wv and token in idf_dict:
            vectors.append(word2vec_model.wv[token])
            tf = word_freq.get(token, 0) / total_freq
            idf = idf_dict[token]
            weights.append(tf * idf)
    
    if vectors:
        weights = np.array(weights)
        weights = weights / (weights.sum() + 1e-10)
        return np.average(vectors, axis=0, weights=weights)
    else:
        return np.zeros(word2vec_model.vector_size)

In [13]:
def document_to_vector_maxpool(document_tokens: List[str], word2vec_model: Word2Vec) -> np.ndarray:
    """
    Метод максимального пулинга: берет максимальное значение по каждой размерности
    """
    vectors = []
    for token in document_tokens:
        if token in word2vec_model.wv:
            vectors.append(word2vec_model.wv[token])
    
    if vectors:
        return np.max(vectors, axis=0)
    else:
        return np.zeros(word2vec_model.vector_size)

In [14]:
def document_to_vector_combined(document_tokens: List[str], word2vec_model: Word2Vec) -> np.ndarray:
    """
    Комбинированный метод: конкатенация усреднения и максимального пулинга
    """
    avg_vec = document_to_vector_avg(document_tokens, word2vec_model)
    max_vec = document_to_vector_maxpool(document_tokens, word2vec_model)
    return np.concatenate([avg_vec, max_vec])

In [15]:
def vectorize_documents(corpus: List[List[str]], word2vec_model: Word2Vec, 
                       method: str = 'avg', idf_dict: dict = None) -> np.ndarray:
    """
    Преобразует корпус документов в матрицу векторов
    """
    vectors = []
    for doc_tokens in corpus:
        if method == 'weighted':
            word_freq = {}
            for token in doc_tokens:
                word_freq[token] = word_freq.get(token, 0) + 1
            vec = document_to_vector_weighted(doc_tokens, word2vec_model, word_freq)
        elif method == 'tfidf':
            if idf_dict is None:
                raise ValueError("idf_dict required for tfidf method")
            word_freq = {}
            for token in doc_tokens:
                word_freq[token] = word_freq.get(token, 0) + 1
            vec = document_to_vector_tfidf(doc_tokens, word2vec_model, word_freq, idf_dict)
        elif method == 'maxpool':
            vec = document_to_vector_maxpool(doc_tokens, word2vec_model)
        elif method == 'combined':
            vec = document_to_vector_combined(doc_tokens, word2vec_model)
        else:  # 'avg'
            vec = document_to_vector_avg(doc_tokens, word2vec_model)
        vectors.append(vec)
    
    return np.array(vectors)

In [16]:
data_path = './data/news.txt.gz'
articles = load_data(data_path)
print(f"Загружено статей: {len(articles)}")
print(f"Пример категорий: {set([a.category for a in articles[:10]])}")

Загружено статей: 10000
Пример категорий: {'economics', 'style', 'media', 'forces', 'culture', 'sport'}


In [17]:
corpus = prepare_corpus(articles, normalize=True, remove_stopwords=True)
print(f"Подготовлено предложений для обучения: {len(corpus)}")

Подготовлено предложений для обучения: 10000


In [18]:
print("Обучение Word2Vec модели...")
word2vec_model = train_word2vec(corpus, vector_size=100, window=5, min_count=2)
print(f"Размер словаря модели: {len(word2vec_model.wv)}")
print(f"Примеры слов в модели: {list(word2vec_model.wv.key_to_index.keys())[:10]}")

Обучение Word2Vec модели...
Размер словаря модели: 39046
Примеры слов в модели: ['год', 'это', 'который', 'россия', 'свой', 'также', 'компания', 'сообщать', 'стать', 'российский']


In [19]:
train_articles, test_articles = train_test_split(
    articles, test_size=0.2, random_state=42, stratify=[a.category for a in articles]
)
print(f"Обучающая выборка: {len(train_articles)} статей")
print(f"Тестовая выборка: {len(test_articles)} статей")

Обучающая выборка: 8000 статей
Тестовая выборка: 2000 статей


In [20]:
train_corpus = prepare_corpus(train_articles, normalize=True, remove_stopwords=True)
test_corpus = prepare_corpus(test_articles, normalize=True, remove_stopwords=True)

print("Векторизация документов (метод: усреднение)...")
X_train = vectorize_documents(train_corpus, word2vec_model, method='avg')
X_test = vectorize_documents(test_corpus, word2vec_model, method='avg')
y_train = [article.category for article in train_articles]
y_test = [article.category for article in test_articles]
print(f"Размерность векторов: {X_train.shape}")

Векторизация документов (метод: усреднение)...
Размерность векторов: (8000, 100)


In [21]:
svm_classifier = LinearSVC(random_state=42, max_iter=1000)
svm_classifier.fit(X_train, y_train)
y_pred = svm_classifier.predict(X_test)

accuracy = accuracy_score(y_test, y_pred)
print(f"\nТочность классификации (базовый метод): {accuracy:.4f}")
print("\nОтчет о классификации:")
print(classification_report(y_test, y_pred))

results = {}
results['avg'] = accuracy


Точность классификации (базовый метод): 0.8065

Отчет о классификации:
              precision    recall  f1-score   support

    business       0.67      0.19      0.30        72
     culture       0.86      0.91      0.89       279
   economics       0.76      0.89      0.82       275
      forces       0.72      0.76      0.74       154
        life       0.72      0.79      0.76       273
       media       0.80      0.75      0.77       295
     science       0.82      0.79      0.80       286
       sport       0.96      0.97      0.96       288
       style       0.79      0.69      0.74        39
      travel       0.79      0.38      0.52        39

    accuracy                           0.81      2000
   macro avg       0.79      0.71      0.73      2000
weighted avg       0.80      0.81      0.80      2000



In [22]:
print("\n--- Метод 1: Взвешенное усреднение по TF ---")
X_train_weighted = vectorize_documents(train_corpus, word2vec_model, method='weighted')
X_test_weighted = vectorize_documents(test_corpus, word2vec_model, method='weighted')
svm_weighted = LinearSVC(random_state=42, max_iter=1000)
svm_weighted.fit(X_train_weighted, y_train)
y_pred_weighted = svm_weighted.predict(X_test_weighted)
accuracy_weighted = accuracy_score(y_test, y_pred_weighted)
results['weighted'] = accuracy_weighted
print(f"Точность: {accuracy_weighted:.4f}")


--- Метод 1: Взвешенное усреднение по TF ---
Точность: 0.7855


In [23]:
print("\n--- Метод 2: TF-IDF взвешивание ---")
print("Вычисление IDF...")
idf_dict = compute_idf(train_corpus, word2vec_model)
print(f"Вычислено IDF для {len(idf_dict)} слов")

X_train_tfidf = vectorize_documents(train_corpus, word2vec_model, method='tfidf', idf_dict=idf_dict)
X_test_tfidf = vectorize_documents(test_corpus, word2vec_model, method='tfidf', idf_dict=idf_dict)
svm_tfidf = LinearSVC(random_state=42, max_iter=1000)
svm_tfidf.fit(X_train_tfidf, y_train)
y_pred_tfidf = svm_tfidf.predict(X_test_tfidf)
accuracy_tfidf = accuracy_score(y_test, y_pred_tfidf)
results['tfidf'] = accuracy_tfidf
print(f"Точность: {accuracy_tfidf:.4f}")


--- Метод 2: TF-IDF взвешивание ---
Вычисление IDF...
Вычислено IDF для 37362 слов
Точность: 0.7870


In [24]:
print("\n--- Метод 3: Максимальный пулинг ---")
X_train_maxpool = vectorize_documents(train_corpus, word2vec_model, method='maxpool')
X_test_maxpool = vectorize_documents(test_corpus, word2vec_model, method='maxpool')
svm_maxpool = LinearSVC(random_state=42, max_iter=1000)
svm_maxpool.fit(X_train_maxpool, y_train)
y_pred_maxpool = svm_maxpool.predict(X_test_maxpool)
accuracy_maxpool = accuracy_score(y_test, y_pred_maxpool)
results['maxpool'] = accuracy_maxpool
print(f"Точность: {accuracy_maxpool:.4f}")


--- Метод 3: Максимальный пулинг ---
Точность: 0.6815


In [25]:
print("\n--- Метод 4: Комбинированный (avg + maxpool) ---")
X_train_combined = vectorize_documents(train_corpus, word2vec_model, method='combined')
X_test_combined = vectorize_documents(test_corpus, word2vec_model, method='combined')
svm_combined = LinearSVC(random_state=42, max_iter=1000)
svm_combined.fit(X_train_combined, y_train)
y_pred_combined = svm_combined.predict(X_test_combined)
accuracy_combined = accuracy_score(y_test, y_pred_combined)
results['combined'] = accuracy_combined
print(f"Точность: {accuracy_combined:.4f}")


--- Метод 4: Комбинированный (avg + maxpool) ---
Точность: 0.8100


In [26]:
best_method = max(results, key=results.get)
best_accuracy = results[best_method]
print(f"\n--- Детальный отчет для лучшего метода: {best_method} ---")
if best_method == 'avg':
    print(classification_report(y_test, y_pred))
elif best_method == 'weighted':
    print(classification_report(y_test, y_pred_weighted))
elif best_method == 'tfidf':
    print(classification_report(y_test, y_pred_tfidf))
elif best_method == 'maxpool':
    print(classification_report(y_test, y_pred_maxpool))
elif best_method == 'combined':
    print(classification_report(y_test, y_pred_combined))


--- Детальный отчет для лучшего метода: combined ---
              precision    recall  f1-score   support

    business       0.52      0.32      0.40        72
     culture       0.87      0.90      0.89       279
   economics       0.77      0.85      0.81       275
      forces       0.74      0.77      0.76       154
        life       0.72      0.81      0.76       273
       media       0.82      0.76      0.79       295
     science       0.82      0.79      0.80       286
       sport       0.95      0.97      0.96       288
       style       0.80      0.72      0.76        39
      travel       0.68      0.38      0.49        39

    accuracy                           0.81      2000
   macro avg       0.77      0.73      0.74      2000
weighted avg       0.81      0.81      0.81      2000



In [27]:
print("\n" + "=" * 60)
print("Сравнение всех методов:")
print("=" * 60)
sorted_results = sorted(results.items(), key=lambda x: x[1], reverse=True)
for i, (method, acc) in enumerate(sorted_results, 1):
    improvement = acc - accuracy
    marker = " ← ЛУЧШИЙ" if method == best_method else ""
    print(f"{i}. {method:15s}: {acc:.4f} ({improvement:+.4f}){marker}")
print("=" * 60)


Сравнение всех методов:
1. combined       : 0.8100 (+0.0035) ← ЛУЧШИЙ
2. avg            : 0.8065 (+0.0000)
3. tfidf          : 0.7870 (-0.0195)
4. weighted       : 0.7855 (-0.0210)
5. maxpool        : 0.6815 (-0.1250)
