# Обучение модели для определения фейковых фактов о COVID и вакцинации

In [1]:
import math

import torch
import pandas as pd
import numpy as np

In [2]:
# Pick the compute backend: CUDA wins over Apple MPS, CPU is the fallback.
if torch.cuda.is_available():
    device = 'cuda'
elif torch.backends.mps.is_available():
    device = 'mps'
else:
    device = 'cpu'

In [3]:
from pathlib import Path

# Working directories used throughout the notebook.
DATA_PATH = Path('data/')                       # input data
DATA_CACHE = Path('data/cache_dir/')            # Hugging Face download cache
DATA_PATH_SAVE_MODELS = Path('data/models/')    # checkpoints and the final model
DATA_SYNTHETIC = Path('synthetic/')             # synthetic data

# Create every directory up front; existing directories are left untouched.
for _directory in (DATA_PATH, DATA_CACHE, DATA_PATH_SAVE_MODELS, DATA_SYNTHETIC):
    _directory.mkdir(parents=True, exist_ok=True)

# Show up to 500 characters per cell when displaying DataFrames.
pd.set_option('display.max_colwidth', 500)

In [4]:
# Hub checkpoint loaded in the "original model" demo cells below.
# NOTE(review): the fine-tuning cell passes model_name="roberta-base", not MODEL_NAME.
MODEL_NAME = "Falah/News_Detection"
# Excel file with the training data, read from DATA_PATH.
TRAIN_DF_NAME = "covid_vaccine_fake_clear.xlsx"
# Maximum tokenized sequence length passed to the trainer as "max_length".
MAX_LENGTH = 128
# Per-device batch size used for both training and evaluation.
BATCH_SIZE = 128

# Датасет

In [None]:
# Load the raw dataset from the Excel file and preview the first row.
data_df = pd.read_excel(DATA_PATH / TRAIN_DF_NAME)
data_df.head(1)

In [6]:
# Rename the target column to 'label' (the name the trainer expects) and
# replace missing values with empty strings.
data_df = data_df.rename(columns={'is_fake': 'label'}).fillna("")

# Cast every object/bool column to plain strings in a single astype call.
_stringy_columns = data_df.select_dtypes(include=["object", "bool"]).columns
data_df = data_df.astype({column: str for column in _stringy_columns})

In [7]:
# label_id2idx = {int(key): int(idx) for idx, key in enumerate(data_df['label_id'].unique())}

# idx2label_id = dict([(v, k) for k, v in label_id2idx.items()])

# idx2label = {k: df_messages[df_messages['label_id'] == v]['label'].iloc[0] for k, v in idx2label_id.items()}

# label2idx = dict([(v, k) for k, v in idx2label.items()])

# df_messages['label_idx'] = df_messages['label_id'].apply(lambda x: label_id2idx[x])

# df_messages.head(1)

In [None]:
# Number of target classes, passed to the trainer as num_labels.
NUM_CLASSES = 2
NUM_CLASSES

In [None]:
from sklearn.model_selection import train_test_split

# Two-stage stratified split: 10% of the data is held out for testing, then
# 10% of the remainder is held out for validation.
_split_options = dict(test_size=0.1, random_state=42, shuffle=True)

train_val_df, test_df = train_test_split(
    data_df,
    stratify=data_df["label"],
    **_split_options,
)
train_df, val_df = train_test_split(
    train_val_df,
    stratify=train_val_df["label"],
    **_split_options,
)

print(f"Размер тренировочного набора: {len(train_df)}")
print(f"Размер валидационного набора: {len(val_df)}")
print(f"Размер тестового набора: {len(test_df)}")


In [None]:
# Print column dtypes and non-null counts of the training split.
train_df.info()

In [11]:
from datasets import Dataset

# Wrap each pandas split in a Hugging Face Dataset.
train_dataset, val_dataset, test_dataset = (
    Dataset.from_pandas(split_frame)
    for split_frame in (train_df, val_df, test_df)
)

# Модель

## Пример использования исходной модели

In [None]:
from transformers import pipeline, AutoTokenizer, AutoModelForSequenceClassification

# Load the published checkpoint (weights and tokenizer are cached under
# DATA_CACHE) and wrap both in a text-classification pipeline.
model = AutoModelForSequenceClassification.from_pretrained(
    MODEL_NAME,
    cache_dir=DATA_CACHE,
)
tokenizer = AutoTokenizer.from_pretrained(
    MODEL_NAME,
    cache_dir=DATA_CACHE,
)

classifier = pipeline(
    task="text-classification",
    model=model,
    tokenizer=tokenizer,
)


In [None]:
from transformers import  AutoConfig

# Inspect the label set of the published checkpoint. The same cache directory
# as the tokenizer/model load is used so the config is not fetched into a
# second, default cache location.
config = AutoConfig.from_pretrained(MODEL_NAME, cache_dir=DATA_CACHE)

print("Количество меток:", config.num_labels)
# Fall back to a placeholder message when the config carries no id2label mapping.
print("Имена меток:", getattr(config, "id2label", "Метки не указаны"))


In [None]:

# Two hand-written probes for the baseline classifier: a news-style paragraph
# and an obviously invented statement.
text_real = '''
A fire engulfed a building in the Turkish ski resort of Kartalkaya in Bolu on Tuesday night. The kitchen staff tried to put out the fire for about 40 minutes. 
The flame entered the chimney through the hood, and soon reached the roof of the hotel. The kitchen staff ran out of the hotel, and the guests could not find out about the fire in time because the fire alarm went off.
'''
text_fake = "Nuclear winter is near, the dogs have declared war on the cats and invaded their state."

# Run the baseline pipeline on both probe texts.
result_real = classifier(text_real)
result_fake = classifier(text_fake)

# Print the results
print("Результат для реального текста:", result_real)
print("Результат для фейкового текста:", result_fake)


## Дообучение модели

In [15]:
from typing import Dict, Any

import matplotlib.pyplot as plt
from transformers import (
    RobertaTokenizer,
    RobertaForSequenceClassification,
    Trainer,
    TrainingArguments,
    DataCollatorWithPadding,
)
from sklearn.metrics import accuracy_score, f1_score
from datasets import DatasetDict, Dataset


class VaccineFakeClassifierTrainer:
    """Fine-tune a RoBERTa sequence classifier on datasets with 'text' and 'label' columns.

    The constructor validates the datasets, loads the tokenizer and model,
    tokenizes all three splits and builds a Hugging Face ``Trainer``.
    Hyper-parameters are taken from ``**kwargs`` (``max_length``,
    ``learning_rate``, ``per_device_train_batch_size``,
    ``per_device_eval_batch_size``, ``num_train_epochs``, ``weight_decay``,
    ``save_total_limit``, ``metric_for_best_model``, ``logging_dir``,
    ``logging_steps``); missing keys fall back to the defaults visible in
    ``_tokenize_datasets`` and ``_set_training_args``.
    """

    def __init__(
        self,
        model_name: str,
        num_labels: int,
        train_dataset: Dataset,
        val_dataset: Dataset,
        test_dataset: Dataset,
        cache_dir=DATA_CACHE,
        output_dir=DATA_PATH_SAVE_MODELS,
        **kwargs
    ):
        """Validate inputs, load the model and prepare the trainer.

        :param model_name: Checkpoint name or path for the RoBERTa tokenizer/model.
        :param num_labels: Number of output classes.
        :param train_dataset: Training split.
        :param val_dataset: Validation split, evaluated after every epoch.
        :param test_dataset: Held-out split used by ``evaluate``.
        :param cache_dir: Directory for downloaded checkpoints.
        :param output_dir: Directory for checkpoints and the final model
            (``str`` or ``Path``).
        :param kwargs: Hyper-parameter overrides (see the class docstring).
        :raises ValueError: If any dataset lacks a 'text' or 'label' column.
        """
        from pathlib import Path

        # Fail fast: check the cheap precondition before loading any weights.
        for dataset, name in [
            (train_dataset, "train_dataset"),
            (val_dataset, "val_dataset"),
            (test_dataset, "test_dataset"),
        ]:
            if "text" not in dataset.column_names or "label" not in dataset.column_names:
                raise ValueError(f"{name} должен содержать колонки 'text' и 'label'.")

        self.tokenizer: RobertaTokenizer = RobertaTokenizer.from_pretrained(model_name, cache_dir=cache_dir)
        self.model = RobertaForSequenceClassification.from_pretrained(
            model_name, num_labels=num_labels, cache_dir=cache_dir)

        self.train_dataset = train_dataset
        self.val_dataset = val_dataset
        self.test_dataset = test_dataset

        # train() joins sub-paths with "/", so a plain string must become a Path.
        self.output_dir = Path(output_dir)
        self.kwargs = kwargs

        self.tokenized_datasets = self._tokenize_datasets()
        self.training_args = self._set_training_args()
        self.trainer = self._initialize_trainer()

    def _tokenize_datasets(self):
        """Tokenize the three splits and return them as a ``DatasetDict``.

        Texts are truncated and padded to ``max_length`` tokens
        (``kwargs["max_length"]``, default 64).
        """
        max_length = self.kwargs.get("max_length", 64)

        def preprocess_function(
            examples: Dict[str, Any]
        ) -> Dict[str, Any]:
            return self.tokenizer(
                examples["text"],
                max_length=max_length,
                padding="max_length",
                truncation=True,
            )

        return DatasetDict({
            "train": self.train_dataset.map(preprocess_function, batched=True),
            "validation": self.val_dataset.map(preprocess_function, batched=True),
            "test": self.test_dataset.map(preprocess_function, batched=True),
        })

    def _set_training_args(self):
        """Build ``TrainingArguments`` from ``kwargs`` with per-epoch eval and save."""
        return TrainingArguments(
            output_dir=str(self.output_dir),
            eval_strategy="epoch",
            save_strategy="epoch",
            learning_rate=self.kwargs.get("learning_rate", 2e-5),
            per_device_train_batch_size=self.kwargs.get("per_device_train_batch_size", 16),
            per_device_eval_batch_size=self.kwargs.get("per_device_eval_batch_size", 16),
            num_train_epochs=self.kwargs.get("num_train_epochs", 5),
            weight_decay=self.kwargs.get("weight_decay", 0.01),
            save_total_limit=self.kwargs.get("save_total_limit", 2),
            load_best_model_at_end=True,
            metric_for_best_model=self.kwargs.get("metric_for_best_model", "accuracy"),
            logging_dir=self.kwargs.get("logging_dir", "./logs"),
            logging_steps=self.kwargs.get("logging_steps", 10),
        )

    def _compute_metrics(self, eval_pred) -> Dict[str, float]:
        """Return accuracy and weighted F1 for a ``(logits, labels)`` pair."""
        logits, labels = eval_pred
        predictions = logits.argmax(axis=-1)
        return {
            "accuracy": accuracy_score(labels, predictions),
            "f1": f1_score(labels, predictions, average="weighted"),
        }

    def _initialize_trainer(self) -> Trainer:
        """Create the ``Trainer`` over the train and validation splits."""
        data_collator = DataCollatorWithPadding(tokenizer=self.tokenizer)
        # NOTE(review): newer transformers releases reportedly rename the
        # `tokenizer` argument to `processing_class` - confirm against the
        # installed version before upgrading.
        return Trainer(
            model=self.model,
            args=self.training_args,
            train_dataset=self.tokenized_datasets["train"],
            eval_dataset=self.tokenized_datasets["validation"],
            tokenizer=self.tokenizer,
            data_collator=data_collator,
            compute_metrics=self._compute_metrics,
        )

    def train(self) -> None:
        """Run training, then save model and tokenizer to ``output_dir/vaccine_fake_model``."""
        self.trainer.train()
        final_model_dir = self.output_dir / 'vaccine_fake_model'
        self.model.save_pretrained(final_model_dir)
        self.tokenizer.save_pretrained(final_model_dir)

    def evaluate(self) -> Dict[str, float]:
        """Evaluate the current model on the tokenized test split."""
        return self.trainer.evaluate(self.tokenized_datasets["test"])

    @staticmethod
    def _epoch_positions(num_points: int, num_epochs: int) -> list:
        """Spread ``num_points`` x-positions evenly over ``(0, num_epochs]``.

        When ``num_points == num_epochs`` the positions are 1, 2, ..., num_epochs.
        """
        if num_points == 0:
            return []
        span = num_epochs if num_epochs > 0 else num_points
        return [(i + 1) * span / num_points for i in range(num_points)]

    def visualize_metrics(self, logs: Dict[str, Any]) -> None:
        """Plot train/validation loss and accuracy.

        :param logs: Dict with the lists ``"train_loss"``, ``"val_loss"`` and
            ``"accuracy"``.

        The series may differ in length (training loss is logged every
        ``logging_steps`` steps, evaluation metrics once per epoch), so each
        series gets its own x positions scaled onto the epoch axis instead of
        one shared range, which would make matplotlib raise on a length
        mismatch.
        """
        train_loss = logs["train_loss"]
        val_loss = logs["val_loss"]
        accuracy = logs["accuracy"]

        # The per-epoch series define how many epochs the x-axis spans.
        num_epochs = len(val_loss) or len(accuracy) or len(train_loss)

        plt.figure(figsize=(10, 5))

        plt.subplot(1, 2, 1)
        plt.plot(self._epoch_positions(len(train_loss), num_epochs), train_loss, label="Train Loss")
        plt.plot(self._epoch_positions(len(val_loss), num_epochs), val_loss, label="Validation Loss")
        plt.xlabel("Epochs")
        plt.ylabel("Loss")
        plt.legend()
        plt.title("Loss Over Epochs")

        plt.subplot(1, 2, 2)
        plt.plot(self._epoch_positions(len(accuracy), num_epochs), accuracy, label="Accuracy")
        plt.xlabel("Epochs")
        plt.ylabel("Accuracy")
        plt.legend()
        plt.title("Accuracy Over Epochs")

        plt.tight_layout()
        plt.show()


# Обучение

In [16]:
# Hyper-parameters forwarded to VaccineFakeClassifierTrainer via **kwargs.
training_params = {
    # "evaluation_strategy": "epoch",               # Evaluate after every epoch (disabled: the trainer class hard-codes eval_strategy="epoch")
    "learning_rate": 5e-5,                        # Learning rate
    "per_device_train_batch_size": BATCH_SIZE,             # Training batch size
    "per_device_eval_batch_size": BATCH_SIZE,              # Validation batch size
    "num_train_epochs": 1,                        # Number of training epochs
    "weight_decay": 0.01,                         # Weight-decay (regularisation) coefficient
    "logging_dir": DATA_PATH / "logs",            # Directory for logs
    "max_length": MAX_LENGTH,                            # Maximum tokenized text length
}


In [None]:
# Build the fine-tuning wrapper: loads roberta-base, tokenizes the three
# splits and prepares the Hugging Face Trainer.
# NOTE(review): the base checkpoint is hard-coded here; MODEL_NAME is not used.
trainer = VaccineFakeClassifierTrainer(
    model_name="roberta-base",
    num_labels=NUM_CLASSES,
    train_dataset=train_dataset,
    val_dataset=val_dataset,
    test_dataset=test_dataset,
    cache_dir=DATA_CACHE,
    output_dir=DATA_PATH_SAVE_MODELS,
    **training_params,
)

In [18]:
# Fine-tune, then save model and tokenizer under DATA_PATH_SAVE_MODELS / 'vaccine_fake_model'.
trainer.train()

In [None]:
# Evaluate the fine-tuned model on the held-out test split.
results = trainer.evaluate()
print("Результаты на тестовом наборе:", results)

In [None]:
# logs = trainer.trainer.state.log_history

# train_loss = [log["loss"] for log in logs if "loss" in log]
# val_loss = [log["eval_loss"] for log in logs if "eval_loss" in log]
# accuracy = [log["eval_accuracy"] for log in logs if "eval_accuracy" in log]

# logs_to_visualize = {
#     "train_loss": train_loss,
#     "val_loss": val_loss,
#     "accuracy": accuracy,
# }

# trainer.visualize_metrics(logs_to_visualize)


# Тестирование

In [20]:
from typing import Dict, Tuple
import pandas as pd
from transformers import RobertaTokenizer, RobertaForSequenceClassification


class VaccineFakeClassifier:
    """Inference wrapper around a fine-tuned RoBERTa sequence classifier."""

    def __init__(self, model_path: str, idx2label: Dict[int, str], max_length: int = 128, device=None):
        """
        Load the tokenizer and model for inference.

        :param model_path: Path to the fine-tuned model (string or path-like).
        :param idx2label: Mapping from class index to text label.
        :param max_length: Maximum number of tokens per text; longer texts are truncated.
        :param device: Device name or ``torch.device``. When ``None`` the best
            available backend is chosen: CUDA, then MPS, then CPU.
        """
        self.tokenizer: RobertaTokenizer = RobertaTokenizer.from_pretrained(model_path)
        self.model: RobertaForSequenceClassification = RobertaForSequenceClassification.from_pretrained(model_path)
        self.idx2label = idx2label
        self.max_length = max_length
        if device is None:
            # Same priority as the notebook's device cell: cuda > mps > cpu.
            if torch.cuda.is_available():
                device = "cuda"
            elif torch.backends.mps.is_available():
                device = "mps"
            else:
                device = "cpu"
        self.device = torch.device(device)
        self.model.to(self.device)

    def _predict_batch(self, batch_texts: list) -> Tuple[list, list]:
        """Run one forward pass and return ``(class indices, top probabilities)``."""
        if not batch_texts:
            # The tokenizer cannot build tensors from an empty batch.
            return [], []

        inputs = self.tokenizer(
            batch_texts,
            return_tensors="pt",
            truncation=True,
            padding=True,
            max_length=self.max_length,
        )
        # Move the input tensors to the model's device.
        inputs = {key: val.to(self.device) for key, val in inputs.items()}

        self.model.eval()
        with torch.no_grad():
            logits = self.model(**inputs).logits
            indices = logits.argmax(dim=-1).tolist()
            # Probability of the predicted class.
            scores = logits.softmax(dim=-1).max(dim=-1).values.tolist()
        return indices, scores

    def _to_frame(self, indices: list, scores: list) -> pd.DataFrame:
        """Build the result frame; indices missing from ``idx2label`` become "Unknown"."""
        return pd.DataFrame({
            "predicted_label": [self.idx2label.get(idx, "Unknown") for idx in indices],
            "score": scores
        })

    def predict_class(self, text: str) -> Tuple[str, float]:
        """
        Predict the class of a single text.

        :param text: Input text.
        :return: ``(label, probability)`` of the predicted class.
        """
        indices, scores = self._predict_batch([text])
        return self.idx2label.get(indices[0], "Unknown"), scores[0]

    def list_batch_predict(self, texts: list) -> pd.DataFrame:
        """
        Predict classes for all texts in a single forward pass.

        :param texts: Iterable of texts (list, pandas Series, ...).
        :return: DataFrame with columns ``predicted_label`` and ``score``;
            empty when ``texts`` is empty.
        """
        indices, scores = self._predict_batch(list(texts))
        return self._to_frame(indices, scores)

    def batch_predict(self, texts: pd.Series, batch_size: int = 16) -> pd.DataFrame:
        """
        Predict classes for many texts, processing ``batch_size`` texts at a time.

        :param texts: pandas Series (or any iterable) of texts.
        :param batch_size: Number of texts per forward pass; must be >= 1.
        :return: DataFrame with columns ``predicted_label`` and ``score`` in
            input order, indexed 0..n-1.
        :raises ValueError: If ``batch_size`` is less than 1.
        """
        if batch_size < 1:
            raise ValueError("batch_size must be >= 1")

        # Materialise positionally so Series with any index and plain lists behave alike.
        all_texts = list(texts)
        predictions = []
        scores = []

        for start in range(0, len(all_texts), batch_size):
            batch_predictions, batch_scores = self._predict_batch(all_texts[start:start + batch_size])
            predictions.extend(batch_predictions)
            scores.extend(batch_scores)

        return self._to_frame(predictions, scores)

    def test_model(self, test_df: pd.DataFrame, batch_size: int = 16) -> Dict[str, object]:
        """
        Evaluate the model on a labelled DataFrame, batch by batch.

        :param test_df: DataFrame with columns 'text' and 'label'.
        :param batch_size: Number of texts per forward pass.
        :return: Dict with keys ``"accuracy"``, ``"f1"`` (weighted) and
            ``"results_df"`` (a copy of ``test_df`` with ``predicted_label``,
            ``score`` and ``correct`` columns).
        :raises ValueError: If a required column is missing.
        """
        from sklearn.metrics import accuracy_score, f1_score

        # Make sure the required columns are present.
        if "text" not in test_df.columns or "label" not in test_df.columns:
            raise ValueError("DataFrame должен содержать колонки 'text' и 'label'.")

        predictions_df = self.batch_predict(test_df["text"], batch_size=batch_size)

        # Inverse mapping: text label -> class index, built once.
        label2idx = {label: idx for idx, label in self.idx2label.items()}

        results_df = test_df.copy()
        # Assign positionally: predictions carry a fresh 0..n-1 index while
        # test_df usually keeps the shuffled index from train_test_split, so
        # assigning the Series directly would align on index and scramble rows.
        results_df["predicted_label"] = predictions_df["predicted_label"].to_numpy()
        results_df["score"] = predictions_df["score"].to_numpy()

        predicted_idx = results_df["predicted_label"].map(label2idx)
        results_df["correct"] = results_df["label"] == predicted_idx

        accuracy = accuracy_score(results_df["label"], predicted_idx)
        f1 = f1_score(results_df["label"], predicted_idx, average="weighted")

        return {"accuracy": accuracy, "f1": f1, "results_df": results_df}

In [None]:
# Directory the trainer's train() saved the fine-tuned model and tokenizer to.
model_path = DATA_PATH_SAVE_MODELS / "vaccine_fake_model"

# Class index -> human-readable label.
# NOTE(review): assumes label 1 in the dataset (former 'is_fake' column) means fake - confirm.
idx2label = {
    0: "Real",
    1: "Fake"
}

classifier = VaccineFakeClassifier(model_path=model_path, idx2label=idx2label)

In [None]:
# Classify a single text. predict_class returns a (label, probability) tuple,
# so the printed value is that tuple rather than the bare label.
text = "About one in three adults used household cleaners and disinfectants unsafely to prevent such as using bleach on food products and improperly using household cleaners and disinfectants on hands or skin. Read more in."
predicted_class = classifier.predict_class(text)
print(f"Класс текста: {predicted_class}")


In [None]:
# Определение классов для нескольких текстов
# Classify several texts in one forward pass; the result is a DataFrame with
# 'predicted_label' and 'score' columns.
texts = [
    "About one in three adults used household cleaners and disinfectants unsafely to prevent such as using bleach on food products and improperly using household cleaners and disinfectants on hands or skin. Read more in",
    "Drinking boiled hikaw-hikaw after doing suob/tuob or steam inhalation therapy will cure anyone with COVID-19",
    "2nd week of Jan 🇨🇳 had mapped the genome &amp; shared it with WHO &amp; with wider 🌍. We rapidly published a “how to” on building a PCR test for from our partner lab in 🇩🇪. In the 3rd week WHO identified &amp; began contracting for validated production of these tests-"
]
predicted_classes = classifier.list_batch_predict(texts)
print(f"Классы текстов: {predicted_classes}")


In [None]:
# Evaluate on the held-out pandas test split; test_model returns a dict with
# the keys 'accuracy', 'f1' and 'results_df'.
test_results = classifier.test_model(test_df)

print("Метрики:", test_results['accuracy'],  test_results['f1'])


In [None]:
# Show ten random rows of the per-example results.
test_results['results_df'].sample(10)