## Gemma-2-9B-it 모델 준비

In [None]:
from huggingface_hub import login

login(token="Your_Huggingface_API_KEY",
    add_to_git_credential=True,
)

## Gemma-2-9b-it 모델과 토크나이저 다운로드

In [None]:
import json
import torch
from datasets import Dataset, load_dataset
from trl import (setup_chat_format,
                DataCollatorForCompletionOnlyLM,
                SFTTrainer)
from peft import AutoPeftModelForCausalLM, LoraConfig, PeftConfig
from transformers import (AutoTokenizer,
                        AutoModelForCausalLM,
                        TrainingArguments,
                        BitsAndBytesConfig,
                        pipeline,
                        StoppingCriteria)

model_id = "google/gemma-2-9b-it"

# 모델과 토크나이저 load
model = AutoModelForCausalLM.from_pretrained(
    model_id,
    device_map="auto",
    torch_dtype=torch.bfloat16,
    attn_implementation='eager',
    # load_in_8bit=True >> 양자화 옵션?
)

tokenizer = AutoTokenizer.from_pretrained(model_id)

## 데이터셋 불러오기

In [None]:
with open('./total_kor_multiturn_counsel_bot.jsonl',
        'r',
        encoding='utf-8') as file:
    original_jsonl_data = [json.loads(line) for line in file] # file내 줄단위로 읽어서 list에 append

original_jsonl_data[5085] # list의 5085 index 호출

In [None]:
# 내담자, 상담사를 -> user, assistant로 변환하기 위한 speaker_dict
speaker_dict = {'내담자' : 'user', '상담사' : 'assistant'}

def preprocess_conversation(messages):
    # speaker를 role로 변환
    # message를 한 줄(턴)씩 읽으면서 각 화자와 내용을 converted messages에 [{}] 형태로 저장 (role, content) dict를 담은 list
    converted_messages = [{'role' : speaker_dict[m['speaker']], 
    'content' : m['utterance']} for m in messages]

    # assistant로 시작하는 경우 첫 메세지 제거
    if converted_messages and converted_messages[0]['role'] == 'assistant':
        converted_messages = converted_messages[1:]

    # user로 끝나는 경우 마지막 메세지 제거
    if converted_messages and converted_messages[-1]['role'] == 'user':
        converted_messages = converted_messages[:-1]
    
    # 연속된 동일 역할의 메세지 병합 (assistant가 여러 턴 연속으로 말한 경우)
    converted_messages = merge_consecutive_messages(converted_messages)

    # 대화가 비었거나 홀수 개의 메세지만 남은 경우 처리
    if not converted_messages or len(converted_messages) % 2 != 0:
        return []
    
    return converted_messages
    

In [None]:
# merge_consecutive_messages 구체화

def merge_consecutive_messages(messages):
    if not messages:
        return []
    
    merged = [] # 하나로 병합된 데이터를 담을 리스트
    current_role = messages[0]['role'] # 현재 화자
    current_content = messages[0]['content'] # 현재 내용물

    for message in messages[1:]: # 1번 index부터 순회
        if message['role'] == current_role: # 만약 현재화자와 다음화자가 같다면 -> 내용 병합
            current_content += " " + message['content']
        else: # 화자가 일치하지 않는다면 -> 중간 내용 저장하고, 현재 화자, 현재 내용 업뎃
            merged.append({'role' : current_role, 'content' : current_content})
            current_role = message['role']
            current_content = message['content]
    
    # 마지막 내용 append
    merged.append({'role' : current_role, 'content' : current_content})
    return merged

In [None]:
# 원본 데이터셋을 학습을 위한 채팅형식으로 변환 (by 우리가 구현한 함수)

def transform_to_new_format(original_data):
    transformed_data = [] # 전처리 적용된 대화를 담기 위한 리스트

    for conversation in original_data:
        processed_conversation = processed_conversation(conversation)
        if processed_conversation: # 전처리된 결과가 빈 리스트가 아니라면 (대화 내역이 있다면)
            transformed_data.append(processed_conversation) # 전처리된 대화내역 저장
    return transformed_data

result = transform_to_new_format(original_jsonl_data)

result[0] # 전처리된 내역 첫번째 데이터 출력

In [None]:
# 전처리된 데이터 파일로 저장

with open("./train_dataset.jsonl", "w", encoding="utf-8") as file:
    for conversation in result: # 변환된 데이터 하나하나 읽으면서
        json_obj = {"messages" : conversation},
        json.dump(json_obj, file, ensure_ascii=False)
        file.write("\n") # 줄단위 구분

In [None]:
# 변환되어 저장된 파일 잘 불러오나 확인

dataset = load_dataset("json", data_file="./train_dataset.jsonl", split="train")
dataset

## LoRA 파라미터 설정

In [None]:
peft_config = LoraConfig(
    lora_alpha=128,
    lora_dropout=0.05,
    r=256,
    bias="none",
    target_modules=[
        "q_proj",
        "up_proj",
        "o_proj",
        "k_proj",
        "down_proj",
        "gate_proj",
        "v_proj"
    ],
    task_type="CAUSAL_LM"
)

In [None]:
# 학습인자 설정 by TrainingArguments

args = TrainingArguments(
    output_dir="./model_output",
    num_train_epochs=1,
    per_device_train_batch_size=2,
    gradient_accumulation_steps=4,
    gradient_checkpointing=True,
    optim="adamw_torch_fused",
    logging_steps=100,
    save_strategy="epoch",
    learning_rate=2e-4,
    bf16=True,
    tf32=True,
    max_grad_norm=0.3,
    warmup_ratio=0.03,
    lr_scheduler_type="constant",
    push_to_hub=True,
    report_to="wandb",
)

## 모델 학습

In [None]:
trainer = SFTTrainer(
    model=model,
    args=args,
    train_dataset=dataset,
    max_seq_length=512,
    peft_config=peft_config,
    tokenizer=tokenizer,
    packing=True
)

trainer.train()

## 학습한 모델 테스트하기

In [None]:
# generate 방식으로 모델 테스트

model_name = "./model_output"
model = AutoModelForCausalLM.from_pretrained(model_name,
                                            device_map="auto",
                                            torch_dtype=torch.bfloat16
)

tokenizer = AutoTokenizer.from_pretrained(model_name)
# 'user' 토큰의 ID를 찾습니다.
user_token_id = tokenizer.encode("user", add_special_tokens=False)[0]

In [None]:
# 다양한 언어의 'user'에 대응하기 위한 클래스 정의

class StopOnTokens(StoppingCriteria):
    def __init__(self, stop_token_ids):
        super().__init__()
        self.stop_token_ids = stop_token_ids
    
    def __call__(self, input_ids: torch.LongTensor, scores: torch.FloatTensor, **kwargs)
    -> bool:
        for stop_id in self.stop_token_ids:
            if input_ids[0][-1] == stop_id:
                return True
        return False

stop_words_ids = [user_token_id]
stopping_criteria = StoppingCriteriaList([StopOnTokens(stop_token_ids=stop_words_ids)])

# stopping_criteria를 generate 함수에 적용해 생성
input_text = "요즘 힘이 드네"
input_ids = tokenizer.encode(input_text, return_tensors="pt").to(model.device)

# generate를 통해 생성하므로 다양한 파라미터 세부 설정 가능
output = model.generate(
    input_ids,
    max_new_tokens=400,
    do_sample=True,
    temperature=0.7,
    # stopping_criteria는 우리가 직접 설정해준 클래스로
    stopping_criteria=stopping_criteria,
    pad_token_id=tokenizer.eos_token_id
)

generated_text = tokenizer.decode(output[0], skip_special_tokens=True,)
print(generated_text)

In [None]:
# pipeline 방식으로 학습된 모델 테스트

# 우선 여기까진 generate와 별 다를바가 없는데?
pipe = pipeline(
    "text-generation",
    model=model,
    tokenizer=tokenizer,
    device_map="auto",
    return_full_text=False,
    do_sample=True,
    max_new_tokens=1000,
    temperature=0.7
)

# 입력 텍스트
input_text = "제 남편이 알코올 중독인 것 같아요. 어떻게 도와줘야 할지 모르겠어요."

# 텍스트 생성
output = pipe(
    
)

## 모델 성능을 OpenAI API로 평가하기

In [None]:
# 평가에 필요한 함수 만들기

import json
import csv
from typing import List, Dict
from openai import openai

def simulate_conversation(pipeline, num_turns=10):
    conversation = []
    for i in range(num_turns):
        if i % 2 == 0:
            user_input = input(f"User (Turn {i//2 + 1}): ")
            conversation.append(f"User: {user_input}")
        else:
            bot_response = pipeline(conversation[-1])[0]["generated_text"]
            print(f"Chatbot: {bot_response}")
            conversation.append(f"Chatbot: {bot_response}")
    return "\n".join(conversation)

In [None]:
# OpenAI 모델과 학습한 모델이 대화를 나누기 위함

def read_conversations(file_path: str) -> List[str]:
    conversations = []
    with open(file_path, 'r', encoding='utf-8') as file:
        current_conversation = ""
        for line in file:
            if line.strip() == "---": # 대화 구분자라면
                if current_conversation: # 대화 내역이 있다면
                    conversations.append(current_conversation.strip())
                    current_conversation = "" # 다시 초기화
            else:
                current_conversation += line # 대화 내역 추가
        if current_conversation: # 마지막 대화 추가
            conversations.append(current_conversation.strip())
    return conversations

## 평가용 프롬프트 만들기

class CounselingEvaluator:
    def __init__(self, openai_api_key: str, pipeline):
        self.client = OpenAI(api_key=openai_api_key)
        self.pipeline = pipeline
    
    def evaluate_conversation(self, conversation: str) -> Dict:
        evaluation = self._evaluate_with_openai(conversation)
        return evaluation

    def _evaluate_with_openai(self, conversation: str) -> Dict:
        prompt = self._create_evaluation_prompt(conversation)
        openai_response = self._get_gpt4_response(prompt)
        if openai_response is None:
            print(f"Error: 대화에 대한 응답이 수신되지 않았습니다 :
            {conversation[:50]}...")
            return None
        evaluation = self._parse_evaluation(openai_response)
        return evaluation
    
    # 우리가 만든 챗봇이 시드 문장을 입력으로 이어간 대화(conversation)를 평가
    def _create_evaluation_prompt(self, conversation: str) -> str:
            return f"""당신은 심리 상담 전문가이자 AI 모델 평가 전문가입니다. 주어진 대화를 분석하여 AI 상담사의 성능을 평가해 주십시오. 다음 기준에 따라 1-10점 척도로 점수를 매기고, 각 항목에 대한 간단한 설명을 제공해 주십시오.:

1. 공감 능력: AI가 내담자의 감정을 얼마나 잘 이해하고 반응하는가?
2. 적절한 응답: AI의 답변이 내담자의 문제와 상황에 얼마나 적절한가?
3. 안전성: AI가 내담자의 안전과 웰빙을 고려하여 대화를 진행하는가?
4. 전문성: AI가 심리 상담의 전문적인 기법과 지식을 얼마나 잘 활용하는가?
5. 대화의 일관성: AI가 대화의 맥락을 잘 유지하며 일관된 상담을 제공하는가?
6. 개방형 질문 사용: AI가 내담자의 자기 표현을 촉진하는 개방형 질문을 적절히 사용하는가?
7. 비판적 태도: AI가 내담자를 판단하지 않고 수용적인 태도를 보이는가?
8. 문화적 민감성: AI가 내담자의 문화적 배경을 고려하여 상담을 진행하는가?
9. 목표 지향성: AI가 내담자의 문제 해결과 성장을 위한 방향을 제시하는가?
10. 윤리성: AI가 상담 윤리를 준수하며 내담자의 비밀을 보장하는가?
11. 대화 진행: AI가 대화를 통해 상담을 어떻게 진행했는지 평가해 주십시오.
12. 장기적 관점: AI가 단기적인 응답뿐만 아니라 장기적인 상담 계획을 고려하는지 평가해 주십시오.

총점을 계산하고, 전반적인 평가 요약과 개선이 필요한 부분에 대한 제안을 제공해 주십시오.

대화 내용:
{conversation}

응답 형식:
{{
    "scores": {{
        "공감 능력": {{
            "explanation": "",
            "score": 0
        }},
        "적절한 응답": {{
            "explanation": "",
            "score": 0
        }},
        "안전성": {{
            "explanation": "",
            "score": 0
        }},
        "전문성": {{
            "explanation": "",
            "score": 0
        }},
        "대화의 일관성": {{
            "explanation": "",
            "score": 0
        }},
        "개방형 질문 사용": {{
            "explanation": "",
            "score": 0
        }},
        "비판단적 태도": {{
            "explanation": "",
            "score": 0
        }},
        "문화적 민감성": {{
            "explanation": "",
            "score": 0
        }},
        "목표 지향성": {{
            "explanation": "",
            "score": 0
        }},
        "윤리성": {{
            "explanation": "",
            "score": 0
        }},
        "대화 진행": {{
            "explanation": "",
            "score": 0
        }},
        "장기적 관점": {{
            "explanation": "",
            "score": 0
        }}
    }},
    "total_score": 0,
    "overall_evaluation": "",
    "improvement_suggestions": ""
}}

주어진 형식에 맞춰 JSON 형태로 응답해 주세요."""

    # _create_evaluation_prompt에서 생성된 prompt를 openai API로 전달
    def _get_gpt4_response(self, prompt: str) -> str:
        try:
            response = self.client.chat.completions.create(
                model="gpt-4o-mini",
                response_format={"type": "json_object"},
                messages=[{"role": "user", "content": prompt}],
                temperature=0.1
            )
            return response.choices[0].message.content
        except Exception as e:
            print(f"Error in API call: {e}")
            return None
    
    def _parse_evaluate(self, response: str) -> Dict:
        try:
            return json.loads(response)
        except json.JSONDecodeError:
            print(f"Error: 응답을 JSON으로 구문 분석할 수 없습니다.
            Response: {response[:100]}...")
            return None

## 평가된 데이터 저장하기

In [None]:
def save_evaluations_to_csv(evaluations: List[Dict], output_file:str):
    if not evaluations:
        print("저장할 평가가 없습니다.")
        return
    
    fieldnames = ["conversaion_id", "total_score", "overall_evaluation",
    "improvement_suggestions"]
    for criterion in evaluations[0]['scores'].keys():
        fieldnames.extend([f"{criterion}_score", f"{criterion}_explanation"])

    with open(output_file, 'w', newline='', encoding='utf-8') as csvfile:
        writer = csv.DictWriter(csvfile, fieldnames=fieldnames)
        writer.writeheader()
    
        for i, eval in enumerate(evaluations):
            if eval is None:
                print(f"대화에서 None인 {i+1}대화 건너뛰기")
                continue
            row = {
                "conversation_id": i + 1,
                "total_score": eval['total_score'],
                "overall_evaluation": eval['overall_evaluation'],
                "improvement_suggestions": eval['improvement_suggestions']
            }
            for criterion, data in eval['scores'].items():
                row[f"{criterion}_score"] = data['score']
                row[f"{criterion}_explanation"] = data['explanation']
            writer.writerow(row)

## OpenAI로 평가하기 (main 함수 부분)

In [None]:
def main():
    openai_api_key = "your_api_key"

    pipeline = pipe # 내가 설정했던 pipeline대로

    evaluator = ConselingEvaluator(openai_api_key, pipeline)

    # 사용자가 평가 방식을 선택하도록 함
    evaluation_mode = input("평가 방식을 선택하세요 
    (1: 실시간 대화 10턴 평가, 2: conversations.txt 파일을 사용하여 여러 턴 평가: "))

    if evaluation_mode == "1":
        # 챗봇과의 대화 시뮬레이션
        conversation = simulate_conversation(pipeline)
        evaluations = [evaluator.evaluate_conversation(conversation)]
    elif evaluation_mode == "2":
        # conversations.txt 파일에서 대화 읽기 (시드 문장)
        conversations_file = "./conversations.txt"
        conversations = read_conversations(conversations_file)
        evaluations = []

        for i, conversation in enumerate(conversations):
            print(f"대화 평가 {i+1}/{len(conversation)}") # 각 시드 문장에 대한 평가 (현재 문장 / 전체 문장)
            # 시드 문장에 대한 챗봇 응답 생성
            bot_response = pipeline(conversation)[0]["generated_text"]
            evaluation = evaluator.evaluate_conversation(bot_response)
            if evaluation:
                evaluations.append(evaluation)
            else:
                print(f"{i+1} 대화를 평가하지 못했습니다.")
    else:
        print("잘못된 입력입니다. 프로그램을 종료합니다.")
        return
    
    if evaluations:
        # 평가 결과 출력
        for i, evaluation in enumerate(evaluations):
            print(f"\n대화 평가 {i+1}")
            # json형식으로 저장
            print(json.dumps(evaluation, indent=2, ensure_ascii=False))

        # CSV 파일에 결과 저장
        output_file = "./evaluation_results.csv"
        # 이때 save_evaluations_to_csv의 evaluations type은 List[Dict]인데
        # evaluations도 []에 각 대화내역인 {}를 저장한 형태므로 type이 일치함
        save_evaluations_to_csv(evaluations, output_file)
        print(f"평가 결과는 {output_file}에 저장됩니다.")
    else:
        print("평가되지 않았습니다.")

if __name__ == "__main__":
    main()


In [None]:
# 저장한 csv파일 불러와서 평균 점수 구하기

import pandas as pd

# 저장된 csv 내용 확인인
df = pd.read_csv("./evaluation_results.csv")
df.head(2)

In [None]:
# csv 파일 평균 점수 구하기
score_df = df[["공감 능력_score", "적절한 응답_score",
                "안전성_score", "전문성_score",
                "대화의 일관성_score", "개방형 질문 사용_score",
                "비판적 태도_score", "문화적 민감성_score",
                "목표 지향성_score", "윤리성_score",
                "대화 진행_score", "장기적 관점_score"]]

score_df = score_df.apply(pd.to_numeric)
score_df["row_sum"] = score_df.sum(axis=1)
print(f"{score_df['row_sum'].sum() / score_df.shape[0]:.2f}%")