In [None]:
!pip install natasha -q

In [None]:
import pandas as pd
import numpy as np
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.utils.data import DataLoader, Dataset
from sklearn.model_selection import train_test_split
from sklearn.utils.class_weight import compute_class_weight
from sklearn.metrics import classification_report, confusion_matrix
import os
import re
import json
from collections import Counter
from tqdm.notebook import tqdm 

# --- Глобальные настройки ---
# Пути для Kaggle
DATA_PATH = '/kaggle/input/toxic-russian-comments-from-pikabu-and-2ch/russian_comments_from_2ch_pikabu.csv'
WORKING_DIR = '/kaggle/working/'

# Параметры модели
VOCAB_SIZE = 20000      # Размер словаря (увеличено для более качественного словаря)
EMBED_DIM = 256         # Размерность эмбеддингов
MAX_SEQ_LEN = 128       # Максимальная длина последовательности
NUM_FILTERS = 256       # Количество сверточных фильтров
KERNEL_SIZES = [3, 4, 5]# Размеры ядер сверток
NUM_CLASSES = 2
DROPOUT_RATE = 0.3

# Параметры обучения
BATCH_SIZE = 64         
LEARNING_RATE = 1e-5
EPOCHS = 20             

# Выбор устройства (GPU, если доступно)
DEVICE = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(f"Используется устройство: {DEVICE}")

# Пути для сохранения артефактов
VOCAB_PATH = os.path.join(WORKING_DIR, 'vocab.json')
MODEL_FP32_PATH = os.path.join(WORKING_DIR, 'solo_cnn_fp32.pth')
MODEL_INT8_PATH = os.path.join(WORKING_DIR, 'solo_cnn_int8.pth')

In [None]:
df = pd.read_csv(DATA_PATH)
df.dropna(subset=['comment', 'toxic'], inplace=True)
df['toxic'] = df['toxic'].astype(int)
df = df.rename(columns={'toxic': 'label'}) # Переименуем для единообразия

# Разделяем данные
train_df, val_df = train_test_split(df, test_size=0.2, random_state=42, stratify=df['label'])
print(f"Размер обучающей выборки: {len(train_df)}")
print(f"Размер валидационной выборки: {len(val_df)}")

# Рассчитываем веса классов для борьбы с дисбалансом
class_weights = compute_class_weight(
    class_weight='balanced',
    classes=np.unique(train_df['label']),
    y=train_df['label'].to_numpy()
)
class_weights_tensor = torch.tensor(class_weights, dtype=torch.float).to(DEVICE)
print(f"\nРассчитанные веса для классов [0, 1]: {class_weights_tensor}")

In [None]:
from natasha import (Segmenter, Doc)
import re
from collections import Counter
from tqdm.notebook import tqdm


segmenter = Segmenter()

def robust_tokenizer(text):
    """
    Надежная токенизация:
    1. Правильное разделение на токены с помощью Natasha.
    2. Очистка от мусора (не-слов).
    3. Приведение к нижнему регистру.
    Лемматизация НЕ используется для максимальной надежности.
    """
    if not isinstance(text, str) or not text.strip(): 
        return []
    
    text = re.sub(r'http\S+|www\S+|https\S+', '', text, flags=re.MULTILINE)
    text = re.sub(r'\@\w+', '', text)
    
    doc = Doc(text)
    doc.segment(segmenter)
    
    # Просто берем текст токена, если это слово, и приводим к нижнему регистру
    tokens = [
        token.text.lower() for token in doc.tokens 
        if token.text.isalpha() # Оставляем только токены, состоящие из букв
    ]
            
    return tokens

def build_vocab_robust(texts, vocab_size):
    token_counts = Counter()
    for text in tqdm(texts, desc="Построение словаря"):
        token_counts.update(robust_tokenizer(text))
        
    vocab = {'<pad>': 0, '<unk>': 1}
    for token, _ in token_counts.most_common(vocab_size - 2):
        vocab[token] = len(vocab)
    return vocab

print("Построение словаря на обучающих данных (надежный режим)...")
# Убедитесь, что в Ячейке 2 вы переименовали 'comment' в 'text'
vocab = build_vocab_robust(train_df['comment'].to_list(), VOCAB_SIZE)
print(f"Размер построенного словаря: {len(vocab)}")

if len(vocab) < 10:
    print("\n!!! КРИТИЧЕСКАЯ ОШИБКА: Словарь все еще пуст.")
else:
    print("\nСловарь успешно построен.")
    with open(VOCAB_PATH, 'w', encoding='utf-8') as f:
        json.dump(vocab, f, ensure_ascii=False, indent=4)
    print(f"Словарь сохранен в: {VOCAB_PATH}")

In [None]:
def tokenize_text_robust(text, vocab, max_len):
    # Вызываем правильную функцию
    tokens = robust_tokenizer(text)
    
    # Преобразуем токены в ID
    indexed_tokens = [vocab.get(token, vocab['<unk>']) for token in tokens]
    
    # Обрезка или дополнение (Padding)
    padding = [vocab['<pad>']] * (max_len - len(indexed_tokens))
    indexed_tokens = indexed_tokens[:max_len] + padding[:max(0, max_len - len(indexed_tokens))]
    
    return torch.tensor(indexed_tokens, dtype=torch.long)

class SoloCNNDataset(Dataset):
    def __init__(self, texts, labels, vocab, max_len):
        self.texts = texts
        self.labels = torch.tensor(labels, dtype=torch.long)
        self.vocab = vocab
        self.max_len = max_len

    def __len__(self): 
        return len(self.texts)

    def __getitem__(self, idx):
        text = str(self.texts[idx])
        return {
            'input_ids': tokenize_text_robust(text, self.vocab, self.max_len),
            'labels': self.labels[idx]
        }


train_dataset = SoloCNNDataset(train_df['comment'].to_list(), train_df['label'].to_list(), vocab, MAX_SEQ_LEN)
val_dataset = SoloCNNDataset(val_df['comment'].to_list(), val_df['label'].to_list(), vocab, MAX_SEQ_LEN)

train_loader = DataLoader(train_dataset, batch_size=BATCH_SIZE, shuffle=True, num_workers=0)
val_loader = DataLoader(val_dataset, batch_size=BATCH_SIZE, shuffle=False, num_workers=0)

print("DataLoader'ы успешно созданы и синхронизированы с Ячейкой 3.")

In [None]:
class SoloCNNTextClassifier(nn.Module):
    def __init__(self, vocab_size, embed_dim, num_filters, kernel_sizes, num_classes, dropout_rate, pad_idx=0):
        super(SoloCNNTextClassifier, self).__init__()
        
        self.quant = torch.quantization.QuantStub()
        self.dequant = torch.quantization.DeQuantStub()
        
        self.embedding = nn.Embedding(vocab_size, embed_dim, padding_idx=pad_idx)
        
        self.convs = nn.ModuleList([
            nn.Sequential(
                nn.Conv1d(in_channels=embed_dim, out_channels=num_filters, kernel_size=k),
                nn.BatchNorm1d(num_filters),
                nn.ReLU(inplace=True)
            )
            for k in kernel_sizes
        ])
        
        self.dropout = nn.Dropout(dropout_rate)
        self.fc = nn.Linear(len(kernel_sizes) * num_filters, num_classes)

    def forward(self, x):
        # 1. Мир FP32: Embedding работает с float
        embedded = self.embedding(x).permute(0, 2, 1)
        
        # 2. Пересекаем "мост" в мир INT8
        quantized_embedded = self.quant(embedded)
        
        # 3. Мир INT8: Все операции здесь будут квантованы
        # Используем квантованный тензор на входе
        conved = [conv(quantized_embedded) for conv in self.convs]
        
        pooled = [F.max_pool1d(c, c.shape[2]).squeeze(2) for c in conved]
        concatenated = self.dropout(torch.cat(pooled, dim=1))
        
        # 4. Проходим через последний квантованный слой (Linear)
        quantized_logits = self.fc(concatenated)
        
        # 5. Пересекаем "мост" обратно в мир FP32 для вывода
        output_logits = self.dequant(quantized_logits)
        
        return output_logits

model = SoloCNNTextClassifier(
    vocab_size=len(vocab), embed_dim=EMBED_DIM, num_filters=NUM_FILTERS,
    kernel_sizes=KERNEL_SIZES, num_classes=NUM_CLASSES, dropout_rate=DROPOUT_RATE
).to(DEVICE)

print(model)
print(f"\nКоличество обучаемых параметров: {sum(p.numel() for p in model.parameters() if p.requires_grad):,}")

In [None]:
print("--- Проверка одного батча из train_loader ---")
# Получаем один батч данных
sample_batch = next(iter(train_loader))
input_ids = sample_batch['input_ids']
labels = sample_batch['labels']

print(f"Размер тензора input_ids: {input_ids.shape}")
print(f"Размер тензора labels: {labels.shape}")
print(f"Примеры меток в батче: {labels[:10]}")
print(f'Соотношение классов в батче (примерно): {labels.float().mean():.2f} (0=нетокс, 1=токс)')

In [None]:
optimizer = torch.optim.AdamW(model.parameters(), lr=LEARNING_RATE)
criterion = nn.CrossEntropyLoss() # Оставляем без весов


scheduler = torch.optim.lr_scheduler.CosineAnnealingLR(optimizer, T_max=len(train_loader) * EPOCHS)

best_f1_score = 0.0
GRADIENT_CLIP_VALUE = 1.0

for epoch in range(1, EPOCHS + 1):
    model.train()
    total_train_loss = 0
    train_pbar = tqdm(train_loader, desc=f"Эпоха {epoch}/{EPOCHS} [Обучение]")
    for batch in train_pbar:
        input_ids, labels = batch['input_ids'].to(DEVICE), batch['labels'].to(DEVICE)
        optimizer.zero_grad()
        logits = model(input_ids)
        loss = criterion(logits, labels)
        total_train_loss += loss.item()
        loss.backward()
        torch.nn.utils.clip_grad_norm_(model.parameters(), GRADIENT_CLIP_VALUE)
        optimizer.step()
        
        # Шаг планировщика после каждого батча
        scheduler.step()
        
        train_pbar.set_postfix({'loss': loss.item(), 'lr': scheduler.get_last_lr()[0]})

    
    model.eval()
    all_preds, all_labels = [], []
    with torch.no_grad():
        val_pbar = tqdm(val_loader, desc=f"Эпоха {epoch}/{EPOCHS} [Валидация]")
        for batch in val_pbar:
            input_ids, labels = batch['input_ids'].to(DEVICE), batch['labels'].to(DEVICE)
            logits = model(input_ids)
            preds = torch.argmax(logits, dim=1)
            all_preds.extend(preds.cpu().numpy())
            all_labels.extend(labels.cpu().numpy())

    report = classification_report(all_labels, all_preds, output_dict=True, zero_division=0)
    f1_toxic = report['1']['f1-score']
    print(f"\nЭпоха {epoch}: Train Loss: {total_train_loss / len(train_loader):.4f}, F1-score (Токсичный): {f1_toxic:.4f}")

    if f1_toxic > best_f1_score:
        best_f1_score = f1_toxic
        torch.save(model.state_dict(), MODEL_FP32_PATH)
        print(f"  -> Новая лучшая модель сохранена! F1-score: {best_f1_score:.4f}")

In [None]:
# Загружаем лучшую модель
model.load_state_dict(torch.load(MODEL_FP32_PATH))
model.to(DEVICE)
model.eval()

all_preds, all_labels = [], []
with torch.no_grad():
    for batch in tqdm(val_loader, desc="Финальная оценка"):
        input_ids, labels = batch['input_ids'].to(DEVICE), batch['labels'].to(DEVICE)
        logits = model(input_ids)
        preds = torch.argmax(logits, dim=1)
        all_preds.extend(preds.cpu().numpy())
        all_labels.extend(labels.cpu().numpy())

print("\n--- Отчет по классификации для лучшей FP32 модели ---")
print(classification_report(all_labels, all_preds, target_names=['Нетоксичный (0)', 'Токсичный (1)']))

print("\n--- Матрица ошибок ---")
# [[TN, FP],
#  [FN, TP]]
print(confusion_matrix(all_labels, all_preds))

In [None]:
import torch.quantization
from torch.ao.quantization import QConfigMapping, float_qparams_weight_only_qconfig

# 1. Загружаем FP32 модель на CPU
cpu_model = SoloCNNTextClassifier(
    vocab_size=len(vocab), embed_dim=EMBED_DIM, num_filters=NUM_FILTERS,
    kernel_sizes=KERNEL_SIZES, num_classes=NUM_CLASSES, dropout_rate=DROPOUT_RATE
)
cpu_model.load_state_dict(torch.load(MODEL_FP32_PATH, map_location="cpu"))

# 2. Переводим модель в режим инференса (ОБЯЗАТЕЛЬНО перед слиянием)
cpu_model.eval()

# Мы проходим по каждому нашему сверточному блоку и сливаем Conv, BN и ReLU в один модуль.
print("Слияние модулей (fusing)...")
for conv_module in cpu_model.convs:
    # Имена '0', '1', '2' соответствуют порядку слоев в nn.Sequential
    torch.quantization.fuse_modules(conv_module, ['0', '1', '2'], inplace=True)
print("Слияние завершено.")

# 3. Настраиваем конфигурацию квантизации (как и раньше)
cpu_model.qconfig = torch.quantization.get_default_qconfig('fbgemm')
cpu_model.embedding.qconfig = torch.quantization.float_qparams_weight_only_qconfig

# 4. Готовим модель к калибровке. Теперь она увидит слитые модули.
torch.quantization.prepare(cpu_model, inplace=True)

# 5. Калибровка (остается без изменений)
print("Калибровка модели...")
with torch.no_grad():
    train_loader_iter = iter(train_loader)
    for i in range(10): 
        try:
            batch = next(train_loader_iter)
            cpu_model(batch['input_ids'])
        except StopIteration:
            break

# 6. Конвертация (остается без изменений)
torch.quantization.convert(cpu_model, inplace=True)
print("Модель успешно квантована!")

# 7. Сохраняем (остается без изменений)
scripted_quantized_model = torch.jit.script(cpu_model)
torch.jit.save(scripted_quantized_model, MODEL_INT8_PATH)
print(f"Квантованная INT8 модель сохранена в: {MODEL_INT8_PATH}")

In [None]:
# Загружаем артефакты (модель и словарь)
quantized_model = torch.jit.load(MODEL_INT8_PATH)
quantized_model.eval()

with open(VOCAB_PATH, 'r', encoding='utf-8') as f:
    loaded_vocab = json.load(f)

# Создаем функцию-пайплайн для предсказания
def predict_toxicity(text, model, vocab, max_len=128, threshold=0.5):
    input_ids = tokenize_text_robust(text, vocab, max_len).unsqueeze(0)
    
    with torch.no_grad():
        logits = model(input_ids)
        probabilities = F.softmax(logits, dim=1).squeeze()
    
    score_toxic = probabilities[1].item()
    is_toxic = score_toxic > threshold
    
    return {
        "text": text,
        "is_toxic": is_toxic,
        "toxic_score": f"{score_toxic:.4f}"
    }

# Тестовые предложения
test_sentences = [
    "какой прекрасный сегодня день, желаю всем счастья!",
    "ты ведешь себя как полный дурак и придурок",
    "Отличная работа, команда!",
    "автор, ты неправ и пишешь какую-то чушь",
    "Это просто худший сервис из всех, что я пробовал."
]

print("\n--- Тест инференса на новых данных ---")
for sentence in test_sentences:
    prediction = predict_toxicity(sentence, quantized_model, loaded_vocab)
    print(prediction)