In [1]:
import torch
import os
from transformers import AutoTokenizer, AutoModelForCausalLM, BitsAndBytesConfig
from datasets import Dataset
from transformers import DataCollatorForLanguageModeling, TrainingArguments
from trl import SFTTrainer
import pandas as pd

In [2]:
from dotenv import load_dotenv

# Read variables from a local .env file into the process environment.
load_dotenv()

# Base checkpoint, CSV input directory and output directory for the fine-tuned model.
model_id = "google/gemma-2-2b"
data_dir = "./dataset/csv"
output_dir = "./results2"

# Hugging Face access token; None when HUGGINGFACE_TOKEN is not set.
token = os.getenv("HUGGINGFACE_TOKEN")

In [3]:
# Load the model and tokenizer (the optional 4-bit NF4 quantization config below is currently disabled)
# bnb_config = BitsAndBytesConfig(
#     load_in_4bit=True,
#     bnb_4bit_quant_type="nf4",
#     bnb_4bit_compute_dtype=torch.bfloat16
# )

In [None]:
# Fetch the tokenizer for `model_id`, authenticating with the Hugging Face token.
tokenizer = AutoTokenizer.from_pretrained(model_id, token=token)

# Load the causal-LM weights; the "" key of device_map assigns the root module
# (and therefore the whole model) to device index 0.
model = AutoModelForCausalLM.from_pretrained(model_id, token=token, device_map={"": 0})

In [None]:
# Load and preprocess the dataset
def load_qna_files(data_dir):
    """Read every CSV in ``data_dir`` and format its rows as Q&A training strings.

    Each CSV is expected to provide ``Question`` and ``Answer`` columns. A preview
    of the first five rows of every file is printed as it is read.

    Args:
        data_dir: Directory scanned (non-recursively) for files ending in ``.csv``.

    Returns:
        list[str]: One string per usable row, made of a ``Question: <question>``
        line followed by an ``Answer: <answer>`` line. Entries are ordered by
        file path, then by row order within each file. Rows with a missing
        question or answer are skipped.

    Raises:
        KeyError: If a CSV lacks a ``Question`` or ``Answer`` column.
    """
    # os.listdir returns entries in arbitrary order; sort so the resulting
    # corpus is identical from run to run.
    files = sorted(
        os.path.join(data_dir, f) for f in os.listdir(data_dir) if f.endswith('.csv')
    )
    data = []
    for file in files:
        dataset = pd.read_csv(file)
        print(f'Sample data of {file}')
        print(dataset.head(5))
        # Empty cells are read as NaN and would otherwise be rendered as the
        # literal text "nan" inside the training strings, so drop those rows.
        qna = dataset.dropna(subset=['Question', 'Answer'])
        # Column-wise iteration avoids the per-row Series that iterrows() builds.
        data.extend(
            f"Question: {question}\nAnswer: {answer}"
            for question, answer in zip(qna['Question'], qna['Answer'])
        )
    return data

# Build the training corpus and preview the first 30 formatted entries.
data = load_qna_files(data_dir)
print('Sample of refined data', data[:30], sep='\n')

In [6]:
def preprocess_data(texts, tokenizer, max_length=512):
    """Tokenize ``texts`` into fixed-length PyTorch encodings.

    Args:
        texts: The strings to tokenize; passed to ``tokenizer`` as its first argument.
        tokenizer: Callable tokenizer invoked with truncation and ``max_length`` padding.
        max_length: Length every sequence is truncated/padded to (default 512).

    Returns:
        Whatever ``tokenizer`` returns for the given options.
    """
    tokenizer_options = {
        "truncation": True,
        "padding": "max_length",
        "max_length": max_length,
        "return_tensors": "pt",
    }
    return tokenizer(texts, **tokenizer_options)

# Tokenize the whole corpus using the default max_length.
encodings = preprocess_data(texts=data, tokenizer=tokenizer)

In [None]:
# Wrap the token ids and attention masks in a Hugging Face Dataset object.
dataset = Dataset.from_dict(
    {column: encodings[column] for column in ("input_ids", "attention_mask")}
)

# Report the dataset size and show its first entry.
print("Dataset size:", len(dataset))
print("Sample dataset entry:", dataset[0])

In [None]:
# A single Q&A example used to show what the tokenizer produces from raw text.
sample_texts = ["Question: What is a logistic regression model?\nAnswer: It’s a basic machine learning model for classification."]

# Print each raw string followed by its token sequence.
for text in sample_texts:
    print("Original text:", text)
    tokens = tokenizer.tokenize(text)
    print("Tokenized:", tokens)

In [9]:
# Set up the data collator; mlm=False turns off masked-language-model masking.
data_collator = DataCollatorForLanguageModeling(mlm=False, tokenizer=tokenizer)

In [None]:
# Fine-tuning helper that can be invoked repeatedly
def fine_tune_model(model, dataset, tokenizer, output_dir="./results", epochs=3, data_collator=None):
    """Fine-tune ``model`` on ``dataset`` with SFTTrainer and save the result.

    Args:
        model: The model to train; also saved via ``model.save_pretrained``.
        dataset: Training dataset handed to the trainer as ``train_dataset``.
        tokenizer: Tokenizer saved next to the model and used to build the
            default data collator.
        output_dir: Directory for checkpoints and the final model/tokenizer.
        epochs: Number of training epochs.
        data_collator: Optional collator for batching. When omitted, a
            ``DataCollatorForLanguageModeling(tokenizer=tokenizer, mlm=False)``
            is created, so the function no longer relies on a notebook-global
            ``data_collator`` having been defined by an earlier cell.

    Returns:
        None. The model and tokenizer are written to ``output_dir`` and a
        confirmation line is printed.
    """
    # Build the collator from the tokenizer argument instead of reaching for a
    # global variable (hidden state that breaks when cells run out of order).
    if data_collator is None:
        data_collator = DataCollatorForLanguageModeling(tokenizer=tokenizer, mlm=False)

    training_args = TrainingArguments(
        output_dir=output_dir,
        overwrite_output_dir=True,
        num_train_epochs=epochs,
        per_device_train_batch_size=1,
        save_steps=500,       # write a checkpoint every 500 steps
        save_total_limit=2,   # cap on the number of checkpoints kept
        logging_steps=100,
        learning_rate=5e-5,
        # fp16=True,  # optional setting intended to make fuller use of the GPU
    )

    trainer = SFTTrainer(
        model=model,
        args=training_args,
        train_dataset=dataset,
        data_collator=data_collator,
    )

    trainer.train()

    # Save the model and tokenizer
    model.save_pretrained(output_dir)
    tokenizer.save_pretrained(output_dir)
    print(f"Model saved to {output_dir}")

# Run fine-tuning for three epochs, writing results to `output_dir`.
# (Leaving out `epochs` would fall back to the function's default.)
fine_tune_model(
    model=model,
    dataset=dataset,
    tokenizer=tokenizer,
    output_dir=output_dir,
    epochs=3,
)

In [11]:
def query_model(question, model, tokenizer, max_length=100):
    """Generate a sampled completion for ``question`` and return it as text.

    Args:
        question: Prompt text passed to ``tokenizer``.
        model: Object exposing ``device`` and ``generate``.
        tokenizer: Callable tokenizer that also provides ``decode``.
        max_length: Forwarded to ``model.generate`` as ``max_length``.

    Returns:
        The first generated sequence decoded with special tokens skipped.
    """
    # Encode the prompt and move the encoding to the device the model lives on.
    encoded_prompt = tokenizer(question, return_tensors="pt").to(model.device)

    # Sampling configuration for generation.
    generation_options = dict(
        max_length=max_length,
        num_return_sequences=1,
        no_repeat_ngram_size=2,  # avoid repeating any 2-gram
        do_sample=True,          # sample instead of greedy decoding
        top_p=0.95,              # nucleus sampling threshold
        temperature=0.7,         # sampling temperature
    )
    generated = model.generate(**encoded_prompt, **generation_options)

    # Exactly one sequence is requested, so decode the first entry.
    return tokenizer.decode(generated[0], skip_special_tokens=True)

In [None]:
question = "What is one benefit of using cloud data for model maintenance??"
# Disabled alternative: wrap the question as f"Question: {question}\nAnswer: "
# and pass that prompt to query_model instead of the bare question.
response = query_model(question, model, tokenizer)
print(response)

In [None]:
# Ask a second question with the bare question text as the prompt.
question = "Why do we use CNN?"
response = query_model(question, model, tokenizer)
print(response)