In [None]:
import pandas as pd
import numpy as np
import evaluate
from transformers import AutoTokenizer, AutoModelForSequenceClassification, TrainingArguments, Trainer, AutoConfig
import time
import os
import matplotlib.pyplot as plt
import seaborn as sns
from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score, f1_score
from datasets import Dataset, DatasetDict
import torch

In [None]:
import yaml
import logging
from datetime import datetime

# YAML config
# Plain relative path: resolves next to the notebook on Windows, Linux and
# macOS alike (a backslash-prefixed path only resolves on Windows).
# Any failure to open or parse the file propagates unchanged.
CONFIG_PATH = "config.yaml"
with open(CONFIG_PATH, "r", encoding="utf-8") as f:
    config = yaml.safe_load(f)

# Logger
# One log file per run, named after the start time. The timestamp is built
# outside the f-string because reusing the enclosing quote character inside
# an f-string replacement field is a SyntaxError before Python 3.12.
log_dir = config["log_dir"]
if log_dir:
    # logging.basicConfig does not create missing directories.
    os.makedirs(log_dir, exist_ok=True)
run_timestamp = datetime.now().strftime("%Y-%m-%d_%H-%M-%S")

logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s - %(levelname)s - %(name)s - %(funcName)s - %(message)s",
    # os.path.join inserts the separator when log_dir has no trailing slash.
    filename=os.path.join(log_dir, f"{run_timestamp}.log"),
    filemode="w"
)
logger = logging.getLogger(__name__)

logger.info("Config file and logger setup completed.")

In [None]:
# Prefer the GPU when CUDA is available; otherwise fall back to the CPU.
device = torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu")

In [None]:
# Load the spreadsheet named by config["data_path"], using its first column
# as the index. Every failure is logged and then re-raised; a missing file
# gets its own message.
try:
    df = pd.read_excel(config["data_path"], index_col=0)
except Exception as e:
    if isinstance(e, FileNotFoundError):
        logger.error("File not found.  Ensure it is in the correct location.")
    else:
        logger.error(f"Error reading Excel file: {e}")
    raise

In [None]:
# Data analysis
# Log the number of missing values per column, then continue with a copy of
# the frame from which every row containing a missing value is removed.
null_count = df.isna().sum()
logger.info(f"Null count: {null_count}")

df_copy = df.copy().dropna(axis=0, how="any")

In [None]:
# Plot class distribution
# Bar chart of how many rows carry each label, smallest class first.
label_counts = df_copy["label"].value_counts(ascending=True)
label_ax = label_counts.plot.bar(color=["green", "blue"])
label_ax.set_title("Label Distribution")
plt.show()

In [None]:
# Estimate token counts per row.
# Heuristic: whitespace-separated word count scaled by an assumed 1.5
# sub-word tokens per word. This is not measured with the real tokenizer.
TOKENS_PER_WORD = 1.5

df_copy["title_tokens"] = df_copy["title"].str.split().apply(len) * TOKENS_PER_WORD
df_copy["text_tokens"] = df_copy["text"].str.split().apply(len) * TOKENS_PER_WORD

# A bare expression in the middle of a cell is silently discarded, so the
# mean is printed explicitly.
print(f"Estimated mean tokens per title: {df_copy['title_tokens'].mean():.2f}")

fig, ax = plt.subplots(1, 2, figsize=(15, 5))

ax[0].hist(df_copy["title_tokens"], bins=50, color="green")
ax[0].set(title="Estimated tokens per title",
          xlabel="Estimated tokens", ylabel="Rows")

ax[1].hist(df_copy["text_tokens"], bins=50, color="blue")
ax[1].set(title="Estimated tokens per text",
          xlabel="Estimated tokens", ylabel="Rows")

plt.show()

In [None]:
# Data Process
# Stratified 70 % train / 20 % test / 10 % validation split on "label":
# 30 % is held out first, then that hold-out is divided 2:1 into test and
# validation.
# A fixed seed makes the split identical on every Restart & Run All. Set an
# optional `seed` key in config.yaml to change it.
SPLIT_SEED = config.get("seed", 42)

train, holdout = train_test_split(
    df_copy, test_size=0.3, stratify=df_copy["label"],
    random_state=SPLIT_SEED)
test, validation = train_test_split(
    holdout, test_size=1/3, stratify=holdout["label"],
    random_state=SPLIT_SEED)

In [None]:
# Creating dataset
# Wrap each pandas split in a Hugging Face Dataset (dropping the pandas
# index) and collect them under their split names.
split_frames = {"train": train, "test": test, "validation": validation}
dataset = DatasetDict(
    {
        split_name: Dataset.from_pandas(frame, preserve_index=False)
        for split_name, frame in split_frames.items()
    }
)

In [None]:
# Accuracy metric from the `evaluate` library, loaded once at module level
# and used by compute_metrics_evaluate.
accuracy = evaluate.load(path="accuracy")


def compute_metrics_evaluate(eval_pred):
    """Compute accuracy for a ``(logits, labels)`` pair.

    Args:
        eval_pred: Two-item iterable ``(logits, labels)``. The predicted
            class is the argmax over the last axis of ``logits``.

    Returns:
        The result of ``accuracy.compute`` for those predictions and labels.

    Raises:
        Exception: Any failure is logged and then re-raised, so the caller
            never receives a silent ``None`` in place of a metrics result.
    """
    try:
        logits, labels = eval_pred
        predictions = np.argmax(logits, axis=-1)
        return accuracy.compute(predictions=predictions, references=labels)
    except Exception as e:
        logger.error(f"Error while evaluate: {e}")
        raise


def compute_metrics(pred):
    """Score a prediction output with accuracy and weighted F1.

    ``pred`` must expose ``label_ids`` (the reference labels) and
    ``predictions`` (per-class scores). The predicted class is the argmax
    over the last axis of ``predictions``.

    Returns a dict with the keys ``"accuracy"`` and ``"f1"``.
    """
    y_true = pred.label_ids
    y_pred = pred.predictions.argmax(-1)

    weighted_f1 = f1_score(y_true, y_pred, average="weighted")
    return {
        "accuracy": accuracy_score(y_true, y_pred),
        "f1": weighted_f1,
    }

In [None]:
def train_model(model_ckpt):
    """Fine-tune a sequence-classification checkpoint and score the test split.

    Tokenizes the ``title`` column of the module-level ``dataset``, trains
    with the Hugging Face ``Trainer`` (evaluating on the validation split),
    then runs prediction on the test split.

    Args:
        model_ckpt: Model checkpoint name or path passed to the
            ``from_pretrained`` loaders.

    Returns:
        The ``metrics`` of ``trainer.predict`` on the test split.

    Raises:
        Exception: Errors while loading the model, tokenizing or training
            are logged and re-raised, so the run stops at the real cause
            instead of continuing with missing or invalid state.
    """
    try:
        tokenizer = AutoTokenizer.from_pretrained(model_ckpt)
        # This must not be named `config`: assigning that name anywhere in
        # the function makes it local for the whole body, which shadows the
        # YAML settings dict and raises UnboundLocalError on `config[...]`.
        model_config = AutoConfig.from_pretrained(
            model_ckpt, label2id=config["label2id"], id2label=config["id2label"])
        model = AutoModelForSequenceClassification.from_pretrained(
            model_ckpt, config=model_config).to(device)
    except Exception as e:
        logger.error(f"Error while initializing model {e}")
        raise

    def local_tokenizer(batch):
        """Tokenize the ``title`` field of the given batch."""
        try:
            return tokenizer(batch["title"], padding=True, truncation=True)
        except Exception as e:
            logger.error(f"Error while tokenize: {e}")
            # Returning None here would hand dataset.map an invalid result.
            raise

    # batch_size=None passes each split to the tokenizer as a single batch.
    encoded_dataset = dataset.map(
        local_tokenizer, batched=True, batch_size=None)

    training_args = TrainingArguments(
        output_dir=config["output_path"],
        overwrite_output_dir=True,
        max_steps=50,
        learning_rate=2e-5,
        per_device_train_batch_size=config["batch_size"],
        per_device_eval_batch_size=config["batch_size"],
        weight_decay=0.01,
        eval_strategy="epoch",
        report_to="none")

    trainer = Trainer(
        model=model,
        compute_metrics=compute_metrics,
        train_dataset=encoded_dataset["train"],
        eval_dataset=encoded_dataset["validation"],
        tokenizer=tokenizer,
        args=training_args
    )
    try:
        trainer.train()
    except Exception as e:
        logger.error(f"Error while training {e}")
        # Do not fall through and report test metrics for a failed run.
        raise

    preds = trainer.predict(encoded_dataset["test"])

    return preds.metrics

In [None]:
# Train the checkpoint named in the YAML config; the returned test metrics
# are the cell's displayed output.
train_model(model_ckpt=config["model_checkpoint"])