In [None]:
from huggingface_hub import login
import torch
from transformers import AutoModelForCausalLM, AutoTokenizer
import pandas as pd
from datasets import Dataset
from transformers import TrainingArguments, Trainer, DataCollatorWithPadding
from peft import LoraConfig, get_peft_model, prepare_model_for_kbit_training, PeftConfig, PeftModel

In [None]:
from huggingface_hub import login
import os
hf_token = "YOUR_HF_TOKEN"
login(token=hf_token)

In [None]:
from safetensors.torch import load_file  # safetensors 라이브러리에서 load_file 임포트
# 원래 모델 로드
original_model_path = "google/gemma-2-2b-it"  # 원래 모델의 경로
tokenizer = AutoTokenizer.from_pretrained(original_model_path)
model = AutoModelForCausalLM.from_pretrained(
    original_model_path,
    device_map="auto",
    torch_dtype=torch.float16 if torch.backends.mps.is_available() else torch.float32
)

fine_tuned_tokenizer_path = "./results/final_model"  # 파인튜닝된 토크나이저 파일 경로
tokenizer = AutoTokenizer.from_pretrained(fine_tuned_tokenizer_path)

# 이미 파인튜닝된 가중치 로드
final_model_weights_path = "./results/final_model/adapter_model.safetensors"  # 파인튜닝된 가중치 파일 경로
weights = load_file(final_model_weights_path)  # safetensors 파일 로드
# LoRA 가중치 적용
peft_config_path = "./results/final_model"  # adapter_config.json 파일이 있는 경로
peft_config = PeftConfig.from_pretrained(peft_config_path)
model = PeftModel(model, peft_config)

# 모델의 상태 사전에서 'lora_A.weight'와 'lora_B.weight' 경로를 'lora_A.default.weight'와 'lora_B.default.weight'로 수정
new_weights = {}
for key, value in weights.items():
    if 'lora_A.weight' in key:
        new_key = key.replace('lora_A.weight', 'lora_A.default.weight')
    elif 'lora_B.weight' in key:
        new_key = key.replace('lora_B.weight', 'lora_B.default.weight')
    else:
        new_key = key
    new_weights[new_key] = value

missing_keys, unexpected_keys = model.load_state_dict(new_weights, strict=False)

# 로드된 가중치 정보 출력 (디버깅용)
print(f"Missing keys: {missing_keys}")
print(f"Unexpected keys: {unexpected_keys}")

In [13]:

# 1. CSV 파일 읽어오기 (칼 융 스타일의 데이터)
df = pd.read_csv('carl_dataset.csv')

# 2. 챗봇의 답변이 비어있는 행 제거
df = df.dropna(subset=['챗봇'])

# 3. 데이터 토크나이즈하기
def tokenize_row(row):
    if pd.notna(row['유저']) and pd.notna(row['챗봇']):
        inputs = tokenizer(row['유저'], padding='max_length', truncation=True, max_length=128, return_tensors="pt")
        labels = tokenizer(row['챗봇'], padding='max_length', truncation=True, max_length=128, return_tensors="pt")
        return pd.Series([inputs['input_ids'][0].tolist(), labels['input_ids'][0].tolist(), inputs['attention_mask'][0].tolist()])
    return pd.Series([None, None, None])


In [29]:
# 1. CSV 파일 읽어오기 (칼 융 스타일의 데이터)
df = pd.read_csv('carl_dataset.csv')

# 2. 챗봇의 답변이 비어있는 행 제거
df = df.dropna(subset=['챗봇'])

# 3. 데이터 토크나이즈하기
def tokenize_row(row):
    if pd.notna(row['유저']) and pd.notna(row['챗봇']):
        inputs = tokenizer(row['유저'], padding='max_length', truncation=True, max_length=128, return_tensors="pt")
        labels = tokenizer(row['챗봇'], padding='max_length', truncation=True, max_length=128, return_tensors="pt")
        return pd.Series([inputs['input_ids'][0].tolist(), labels['input_ids'][0].tolist(), inputs['attention_mask'][0].tolist()])
    return pd.Series([None, None, None])
df[['input_ids', 'labels', 'attention_mask']] = df.apply(tokenize_row, axis=1)

# 4. tokenized_dataset 생성
tokenized_dataset = df[['input_ids', 'labels', 'attention_mask']].dropna().reset_index(drop=True)
dataset = Dataset.from_pandas(tokenized_dataset)

In [30]:
# 훈련 데이터셋과 평가 데이터셋 분리
train_size = int(0.8 * len(dataset))
train_dataset = dataset.select(range(train_size))
eval_dataset = dataset.select(range(train_size, len(dataset)))

In [33]:
# LoRA 설정
lora_config = LoraConfig(
    r=16,
    lora_alpha=32,
    lora_dropout=0.1,
    bias="none",
)
# 모델을 LoRA로 준비
tmodel = prepare_model_for_kbit_training(model)
tmodel = get_peft_model(model, lora_config)

In [None]:
# 모델을 올바른 장치로 이동
device = torch.device("mps" if torch.backends.mps.is_available() else "cpu")
tmodel.to(device)

In [35]:
# 평가 메트릭 계산 함수
def compute_metrics(eval_pred):
    logits, labels = eval_pred
    predictions = torch.argmax(logits, dim=-1)
    loss = torch.nn.functional.mse_loss(predictions.float(), labels.float())
    return {"eval_loss": loss.item()}

In [47]:
# TrainingArguments 설정
training_args = TrainingArguments(
    output_dir="./results_carl/final_model",  # 모델 저장 경로
    eval_strategy="no",  # evaluation_strategy를 steps로 변경
    save_strategy="steps",  # save_strategy를 steps로 변경
    #eval_steps=100,  # 500 스텝마다 평가
    logging_strategy="steps",
    save_steps=100,  # 500 스텝마다 체크포인트 저장
    logging_steps=10,  # 로그를 출력할 스텝 간격
    learning_rate=2e-5,
    per_device_train_batch_size=1,  # 배치 크기를 더 줄임
    num_train_epochs=3,
    weight_decay=0.01,
    remove_unused_columns=False,  # 이 부분을 추가하여 오류 해결
    bf16=torch.backends.mps.is_available(),  # Mixed Precision Training 활성화 (MPS에서만)
    save_total_limit=3,  # 최대 저장할 체크포인트 수
    #load_best_model_at_end=True,  # 훈련 종료 시 가장 좋은 모델 로드
    #metric_for_best_model="eval_loss",  # 가장 좋은 모델을 선택할 메트릭
    #greater_is_better=False,  # 낮은 eval_loss가 더 좋음
    gradient_accumulation_steps=8,  # Gradient Accumulation 사용
    logging_dir='./logs',  # 로그 디렉토리 설정
)

In [None]:
from transformers import DataCollatorWithPadding
data_collator = DataCollatorWithPadding(tokenizer)
from transformers import TrainerCallback
# TrainerCallback 설정
class LogCallback(TrainerCallback):
    def on_log(self, args, state, control, logs=None, **kwargs):
        if logs is not None:
            print(f"Step: {state.global_step}, Logs: {logs}")
# Trainer 설정
trainer = Trainer(
    model=tmodel,
    args=training_args,
    train_dataset=train_dataset,
    #eval_dataset=eval_dataset,  # 평가 데이터셋 추가
    tokenizer=tokenizer,
    #compute_metrics=compute_metrics,  # 평가 메트릭 추가
    data_collator=data_collator,  # 데이터 콜레이터 추가
    callbacks=[LogCallback()]  # 콜백 추가
)
# 모델 훈련 시작
checkpoint_path = "./results_carl/"
"""
if os.path.exists(checkpoint_path):
    trainer.train(resume_from_checkpoint=checkpoint_path)  # checkpoint-500에서 재개
else:
    trainer.train()  # 처음부터 훈련 시작
    """
trainer.train()
# 모델 저장
trainer.save_model("./results_carl/final")

In [49]:
def generate_response(prompt, max_length=256):
    inputs = tokenizer(prompt, return_tensors="pt", padding=True, truncation=True, max_length=max_length).to(device)
    with torch.no_grad():
        outputs = tmodel.generate(
            input_ids=inputs.input_ids,
            attention_mask=inputs.attention_mask,
            max_length=max_length,
            num_return_sequences=1,
            no_repeat_ngram_size=2,
            early_stopping=False,
            pad_token_id=tokenizer.eos_token_id,
            repetition_penalty=1.2,
        )
    response = tokenizer.decode(outputs[0], skip_special_tokens=True)
    response = response[len(prompt):].strip()
    response = response.replace("**", "").strip()
    return response

In [None]:
# 테스트 입력에 대한 모델 출력 확인
test_inputs = ["화가 안 참아져.", "아, 내가 길 가다가 갑자기 확 쓰러지는 건 아닐까요? "]
for test_input in test_inputs:
    response = generate_response(test_input, max_length=512)
    print(f"Input: {test_input}")
    print(f"Response: {response}")

In [None]:
while True:
    user_input = input("You: ")
    if user_input.lower() in ["exit", "quit"]:
        break
    response = generate_response(user_input, max_length=512)  # max_length를 늘려서 응답이 짧게 끊기지 않도록 함
    print(f"Bot: {response}")