In [None]:
import os

import pandas as pd
import torch
from accelerate import Accelerator
from datasets import Dataset
from peft import LoraConfig, PeftModel, get_peft_model
from transformers import (
    AutoModelForCausalLM,
    AutoTokenizer,
    DataCollatorForLanguageModeling,
    Trainer,
    TrainingArguments,
)

from .utils._logger import logger
from .utils._validation import config_args

In [None]:
# Training configuration: every option handed to the Hugging Face Trainer.
args = TrainingArguments(
    # Checkpointing: where checkpoints go and how often they are written.
    output_dir=config_args.output_dir,
    save_strategy="steps",
    save_steps=25,
    # Optimisation schedule.
    max_steps=100,
    per_device_train_batch_size=2,
    gradient_accumulation_steps=1,
    learning_rate=2.5e-5,
    optim="paged_adamw_8bit",
    # Evaluation runs on the same 25-step cadence as saving and logging.
    do_eval=True,
    eval_strategy="steps",
    eval_steps=25,
    # Logging stays local; no external experiment tracker is used.
    logging_steps=25,
    logging_dir="training-logs",
    report_to="none",
)

In [None]:
# Data Loading and Preprocessing
from pandas import DataFrame


def load_and_preprocess_data(csv_path: str) -> pd.DataFrame:
    """Load the product CSV and reshape it into one long frame of training examples.

    Every source row contributes up to two examples that share its category:
    one whose ``text`` is the product name (``task_type == "Product Name"``)
    and one whose ``text`` is the product description
    (``task_type == "Product Description"``). All name examples come first,
    followed by all description examples.

    Args:
        csv_path: Path to a CSV file containing at least the columns
            ``category``, ``about_product`` and ``product_name``.

    Returns:
        A DataFrame with the columns ``category``, ``text`` and ``task_type``
        and a fresh 0..n-1 index. ``category`` holds only the last segment of
        a ``"A|B|C"`` style path. Examples whose category or text is missing
        are dropped.

    Raises:
        FileNotFoundError: If ``csv_path`` does not exist (logged, then
            re-raised).
    """
    try:
        raw = pd.read_csv(
            csv_path, usecols=["category", "about_product", "product_name"]
        )
    except FileNotFoundError:
        logger.error(f"File not found: {csv_path}")
        raise

    def _leaf(category):
        # A blank CSV cell is read as NaN (a float), which has no ``split``;
        # let non-string values pass through so they can be dropped below.
        return category.split("|")[-1] if isinstance(category, str) else category

    raw["category"] = raw["category"].map(_leaf)

    examples = pd.concat(
        [
            raw[["category", source_column]]
            .rename(columns={source_column: "text"})
            .assign(task_type=task_type)
            for source_column, task_type in (
                ("product_name", "Product Name"),
                ("about_product", "Product Description"),
            )
        ],
        ignore_index=True,
    )

    # A missing category or text would be rendered as the literal "nan" once
    # the example is interpolated into a prompt, so discard those examples.
    return examples.dropna(subset=["category", "text"]).reset_index(drop=True)

In [None]:
# Dataset Creation
def create_datasets(df, test_size=0.1, seed=0):
    """Convert a DataFrame into reproducible train/test splits.

    Args:
        df: pandas DataFrame holding the examples.
        test_size: Passed to ``Dataset.train_test_split``; defaults to 0.1.
        seed: Seed used for both the shuffle and the split, so repeated runs
            (e.g. Restart Kernel -> Run All) yield the same partition.

    Returns:
        The object returned by ``Dataset.train_test_split``; the notebook
        indexes it with ``"train"`` and ``"test"``.
    """
    shuffled = Dataset.from_pandas(df).shuffle(seed=seed)
    # Without an explicit seed the split draws a fresh random permutation on
    # every run, which makes the evaluation set differ between runs.
    return shuffled.train_test_split(test_size=test_size, seed=seed)

In [None]:
# Formatting
def formatting_func(example) -> str:
    """Render one example as the instruction-style prompt used for training.

    ``example`` is indexed with the keys ``"task_type"``, ``"category"`` and
    ``"text"``. The returned string keeps the exact whitespace layout of the
    training template: a leading newline, two lines indented by twelve
    spaces, one line indented by a single space, a blank line, and a final
    run of twelve spaces.
    """
    task = example["task_type"]
    indent = " " * 12
    pieces = [
        "",
        f'{indent}Given the product category, you need to generate a "{task}".',
        f'{indent}### Category: {example["category"]}',
        f" ### {task}: {example['text']}",
        "",
        indent,
    ]
    return "\n".join(pieces)

In [None]:
# Loading model
def load_model(base_model):
    """Load the base causal language model with 8-bit weights.

    Args:
        base_model: Model identifier or local path forwarded to
            ``AutoModelForCausalLM.from_pretrained``.

    Returns:
        The model object returned by ``from_pretrained``.

    Raises:
        Exception: Any error raised while loading is logged and then
            re-raised, so a failed load stops the notebook here instead of
            handing ``None`` to later cells.
    """
    try:
        # NOTE(review): trust_remote_code=True allows Python code shipped with
        # the model repository to run locally; only use trusted checkpoints.
        # NOTE(review): recent transformers releases deprecate passing
        # ``load_in_8bit`` directly in favour of a ``quantization_config``;
        # confirm against the installed version.
        return AutoModelForCausalLM.from_pretrained(
            base_model,
            trust_remote_code=True,
            torch_dtype=torch.float16,
            load_in_8bit=True,
        )
    except Exception as e:
        logger.error(f"Error loading model: {e}")
        raise

In [None]:
# Loading tokenizer
def load_tokenizer(base_model):
    """Load the slow tokenizer for ``base_model`` configured for left padding.

    The tokenizer is asked to add both BOS and EOS tokens, and its padding
    token is set to its EOS token.

    Args:
        base_model: Model identifier or local path forwarded to
            ``AutoTokenizer.from_pretrained``.

    Returns:
        The configured tokenizer.

    Raises:
        Exception: Any error raised while loading is logged and then
            re-raised, so a failed load does not silently yield ``None``.
    """
    try:
        tokenizer = AutoTokenizer.from_pretrained(
            base_model,
            # The keyword is ``padding_side``; the previous spelling
            # ``padding_size`` is not a tokenizer option, so left padding
            # never took effect.
            padding_side="left",
            add_eos_token=True,
            add_bos_token=True,
            use_fast=False,
        )
        tokenizer.pad_token = tokenizer.eos_token
        return tokenizer
    except Exception as e:
        logger.error(f"Error loading tokenizer: {e}")
        raise

In [None]:
# Instantiate the base model first, then the tokenizer for the same checkpoint.
model = load_model(config_args.base_model)
tokenizer = load_tokenizer(config_args.base_model)

In [None]:
# Tokenize function
def tokenize(prompt):
    """Format one example and encode it for causal-LM training.

    The example is rendered with ``formatting_func`` and passed to the
    module-level ``tokenizer`` with truncation enabled and padding to
    ``config_args.max_length``. A copy of ``input_ids`` is stored under
    ``"labels"`` on the encoding before it is returned.
    """
    rendered = formatting_func(prompt)
    encode_options = {
        "truncation": True,
        "max_length": config_args.max_length,
        "padding": "max_length",
    }
    encoded = tokenizer(rendered, **encode_options)
    # Copy so that labels and input_ids are independent objects.
    encoded["labels"] = encoded["input_ids"].copy()
    return encoded

In [None]:
# CSV -> long-format examples -> train/test splits -> tokenized splits.
dataset = (
    create_datasets(load_and_preprocess_data(config_args.data_path))
    .map(tokenize)
)

In [None]:
# LoRA Configuration
def configure_lora(model, target_modules, r=32, lora_alpha=64, lora_dropout=0.05):
    """Wrap ``model`` with LoRA adapters for causal-LM fine-tuning.

    Args:
        model: The model to adapt. If it is already a ``PeftModel`` it is
            returned unchanged, so re-running the calling cell does not stack
            a second PEFT wrapper on top of the first.
        target_modules: Forwarded to ``LoraConfig(target_modules=...)``.
        r: LoRA rank (default 32).
        lora_alpha: LoRA scaling factor (default 64).
        lora_dropout: Dropout applied inside the adapters (default 0.05).

    Returns:
        The model returned by ``get_peft_model``, or ``model`` itself when it
        was already wrapped.
    """
    if isinstance(model, PeftModel):
        return model

    config = LoraConfig(
        r=r,
        lora_alpha=lora_alpha,
        target_modules=target_modules,
        bias="none",
        lora_dropout=lora_dropout,
        task_type="CAUSAL_LM",
    )
    return get_peft_model(model, config)

In [None]:
# Trainable parameters
def print_trainable_parameters(model):
    """Print how many of the model's parameters are trainable.

    Iterates ``model.named_parameters()``, summing ``numel()`` over all
    parameters and, separately, over those with ``requires_grad`` set, then
    prints both totals and the trainable percentage. A model with no
    parameters reports 0.0 percent instead of raising ``ZeroDivisionError``.
    """
    trainable_params = 0
    all_param = 0
    for _, param in model.named_parameters():
        count = param.numel()
        all_param += count
        if param.requires_grad:
            trainable_params += count

    percentage = 100 * trainable_params / all_param if all_param else 0.0
    print(
        f"trainable params: {trainable_params} || all params: {all_param} || trainable%: {percentage}"
    )

In [None]:
# Report the trainable-parameter footprint once LoRA adapters are attached.
# NOTE(review): the wrapped model returned by configure_lora is discarded
# here, and configure_lora is called on `model` again in a later cell. PEFT
# documents get_peft_model as modifying the base model in place — confirm
# that applying it twice is intended.
print_trainable_parameters(
    model=configure_lora(model=model, target_modules=config_args.target_modules)
)

In [None]:
# Training Function
def train_model(model, tokenizer, train_dataset, eval_dataset, args):
    """Fine-tune ``model`` on ``train_dataset`` with the Hugging Face Trainer.

    Args:
        model: The model to train; it must expose ``config.use_cache``.
        tokenizer: Tokenizer handed to ``DataCollatorForLanguageModeling``.
        train_dataset: Dataset used for training.
        eval_dataset: Dataset used for evaluation.
        args: ``TrainingArguments`` for the Trainer. Its
            ``gradient_accumulation_steps`` (1 when absent) also configures
            the Accelerator so the two cannot drift apart.

    Returns:
        The model returned by ``Accelerator.prepare_model`` after training.
    """
    # Turn the generation cache off while `model` is still the raw object:
    # prepare_model may hand back a wrapper, so `.config` is set first.
    model.config.use_cache = False
    # Training, not evaluation, mode (the function previously called eval()).
    model.train()

    accelerator = Accelerator(
        gradient_accumulation_steps=getattr(args, "gradient_accumulation_steps", 1)
    )
    model = accelerator.prepare_model(model)

    data_collator = DataCollatorForLanguageModeling(tokenizer, mlm=False)

    trainer = Trainer(
        model=model,
        args=args,
        train_dataset=train_dataset,
        eval_dataset=eval_dataset,
        data_collator=data_collator,
    )
    trainer.train()
    return model

In [None]:
# Attach the LoRA adapters to the model that will be trained.
model = configure_lora(
    model=model,
    target_modules=config_args.target_modules,
)

In [None]:
# Fine-tune on the "train" split and evaluate on the "test" split.
model = train_model(
    model=model,
    tokenizer=tokenizer,
    train_dataset=dataset["train"],
    eval_dataset=dataset["test"],
    args=args,
)

In [None]:
# Inference
def generate_text(model, tokenizer, prompt, max_new_tokens, repetition_penalty):
    """Generate a completion for ``prompt`` with the given model.

    Args:
        model: Model exposing ``eval()`` and ``generate()``. Inputs are moved
            to ``model.device``; if the model has no ``device`` attribute the
            previous default of ``"cuda"`` is used.
        tokenizer: Tokenizer used to encode the prompt and decode the output.
        prompt: Text to condition the generation on.
        max_new_tokens: Forwarded to ``model.generate``.
        repetition_penalty: Forwarded to ``model.generate``.

    Returns:
        The first generated sequence decoded with special tokens skipped.
    """
    model.eval()

    # Follow the model's own device rather than assuming a GPU is present.
    device = getattr(model, "device", "cuda")
    inputs = tokenizer(prompt, return_tensors="pt").to(device)

    with torch.no_grad():
        output = model.generate(
            **inputs,
            max_new_tokens=max_new_tokens,
            repetition_penalty=repetition_penalty,
        )
    return tokenizer.decode(output[0], skip_special_tokens=True)

In [None]:
# Load a fresh copy of the base model and attach the fine-tuned adapter.
# os.path.join inserts the path separator when output_dir has no trailing
# slash; plain string concatenation would fuse the two names together.
# NOTE(review): "checkpoint_folder" looks like a placeholder — the training
# arguments use save_strategy="steps", and the Trainer presumably writes
# directories named "checkpoint-<step>"; confirm the real folder name.
trained_model = PeftModel.from_pretrained(
    load_model(config_args.base_model),
    os.path.join(config_args.output_dir, "checkpoint_folder"),
)

In [None]:
# Placeholder prompt passed to generate_text in the next cell.
# NOTE(review): presumably this should follow the same template that
# formatting_func produces for training examples — confirm and replace.
prompt = """ 
prompt 
"""

In [None]:
# Run the fine-tuned model on the prompt and show the decoded completion.
# NOTE(review): `tokenizer` is the training tokenizer, which was requested
# with add_eos_token=True, so the encoded prompt presumably ends with an EOS
# token — confirm that is intended for generation.
generated_text = generate_text(
    model=trained_model,
    tokenizer=tokenizer,
    prompt=prompt,
    max_new_tokens=config_args.max_new_tokens,
    repetition_penalty=config_args.repetition_penalty,
)
print(generated_text)