#### API model fine-tuning

In [None]:
# Mount Google Drive and install the required libraries.
# The mount exposes Drive under /content/drive; `google.colab` is imported here,
# so this cell is meant to run inside a Colab runtime.
from google.colab import drive
drive.mount('/content/drive')

# Uncomment the lines below to install / upgrade the dependencies.
# !pip install anthropic tqdm seaborn pandas numpy matplotlib scikit-learn
# !pip install --upgrade anthropic

import os
import json
import numpy as np
import pandas as pd
from pathlib import Path
from tqdm import tqdm
from sklearn.metrics import confusion_matrix, classification_report
import seaborn as sns
import matplotlib.pyplot as plt
from anthropic import Anthropic
import time

# Working directory inside the mounted Google Drive.
DRIVE_PATH = "/content/drive/MyDrive/kt/edusmile"
DATA_PATH = DRIVE_PATH + "/label_data"

# Make sure the data root and one sub-directory per dataset split exist.
os.makedirs(DATA_PATH, exist_ok=True)
for split in ('train', 'test', 'val'):
    os.makedirs("/".join((DATA_PATH, split)), exist_ok=True)

class LLMClassifier:
    """Zero-shot text classifier backed by the Anthropic Messages API.

    Every text is sent as a single user message together with a Korean system
    prompt asking the model to answer with exactly one of ``self.categories``.
    The raw (stripped) model reply is returned as the prediction.
    """

    def __init__(self, api_key, model="claude-3-sonnet-20240229", max_tokens=10):
        """Create the API client and store the classification settings.

        Args:
            api_key: Anthropic API key handed to the ``Anthropic`` client.
            model: Model identifier used for every request.
            max_tokens: Upper bound on generated tokens per reply; the reply is
                expected to be a bare category name, so this stays small.
        """
        self.anthropic = Anthropic(api_key=api_key)
        self.model = model
        self.max_tokens = max_tokens
        self.categories = ['수업내용', '공지사항', '잡담']
        self.system_prompt = """
        당신은 텍스트 분류 모델입니다. 주어진 텍스트를 다음 카테고리 중 하나로 분류해야 합니다:
        - 수업내용: 강의나 수업과 관련된 내용
        - 공지사항: 공지나 안내사항
        - 잡담: 일상적인 대화나 잡담

        카테고리명만 정확히 응답해주세요.
        """

    def predict_single(self, text):
        """Classify one text.

        Args:
            text: The text to classify; sent verbatim as the user message.

        Returns:
            The model reply with surrounding whitespace stripped, or ``None``
            when the request (or reading the reply) raised an exception. The
            reply is not validated against ``self.categories``.
        """
        try:
            response = self.anthropic.messages.create(
                model=self.model,
                max_tokens=self.max_tokens,
                temperature=0,
                system=self.system_prompt,
                messages=[{"role": "user", "content": text}],
            )
            return response.content[0].text.strip()
        except Exception as e:
            # Best effort: a failed request must not abort a long evaluation
            # run, so the failure is reported and signalled with None.
            print(f"예측 중 오류 발생: {e}")
            return None

    def predict_batch(self, texts, batch_size=5, delay=1.0):
        """Classify many texts sequentially, throttling between requests.

        Args:
            texts: Sequence of texts to classify.
            batch_size: Number of texts per progress-bar step. Requests are
                still issued one at a time.
            delay: Seconds to sleep between consecutive requests (rate-limit
                protection). ``0`` disables throttling.

        Returns:
            A list with one entry per input text, in order; entries are
            ``None`` for requests that failed.

        Raises:
            ValueError: If ``batch_size`` is smaller than 1.
        """
        if batch_size < 1:
            raise ValueError("batch_size must be at least 1")

        total = len(texts)
        predictions = []
        for start in tqdm(range(0, total, batch_size)):
            for text in texts[start:start + batch_size]:
                predictions.append(self.predict_single(text))
                # No need to wait once the final request has been sent.
                if delay > 0 and len(predictions) < total:
                    time.sleep(delay)
        return predictions

    def plot_confusion_matrix(self, y_true, y_pred, output_path=None):
        """Draw the confusion matrix as a heat map.

        Pairs whose prediction is ``None`` are dropped first. The matrix is
        built over ``self.categories`` only, so predictions outside that list
        do not appear in it.

        Args:
            y_true: Ground-truth labels.
            y_pred: Predicted labels, aligned with ``y_true``.
            output_path: If given, the figure is saved there and closed;
                otherwise it is shown interactively.
        """
        # Drop the samples whose prediction failed.
        kept = [(truth, pred) for truth, pred in zip(y_true, y_pred) if pred is not None]
        y_true = [truth for truth, _ in kept]
        y_pred = [pred for _, pred in kept]

        cm = confusion_matrix(y_true, y_pred, labels=self.categories)
        plt.figure(figsize=(10, 8))
        sns.heatmap(cm, annot=True, fmt='d', cmap='Blues',
                    xticklabels=self.categories,
                    yticklabels=self.categories)
        plt.title('Confusion Matrix')
        plt.ylabel('True Label')
        plt.xlabel('Predicted Label')

        if output_path:
            plt.savefig(output_path)
            plt.close()
        else:
            plt.show()

def main():
    """Evaluate the Anthropic-based classifier on the val and test splits.

    Loads ``{split}_dataset.jsonl`` for both splits, classifies every
    ``instruction`` field, prints a classification report per split, saves the
    test confusion matrix as a PNG and writes both reports to
    ``evaluation_results.json`` under ``DATA_PATH``.
    """
    # API key (Colab form field).
    api_key = ""  # @param {type:"string"}

    classifier = LLMClassifier(api_key)

    # Fixing the label set keeps `target_names` aligned with the rows even when
    # the model replies with something outside the three categories or a
    # category is absent from a split (otherwise scikit-learn raises).
    report_kwargs = {
        'labels': classifier.categories,
        'target_names': classifier.categories,
        'zero_division': 0,
    }

    def evaluate(title, items):
        """Classify one split; return (labels, raw predictions, metrics dict or None)."""
        print(f"\n{title} 데이터 평가 중...")
        texts = [item['instruction'] for item in items]
        labels = [item['category'] for item in items]
        predictions = classifier.predict_batch(texts)

        # Drop samples whose API call failed (prediction is None).
        kept = [(label, pred) for label, pred in zip(labels, predictions) if pred is not None]
        clean_labels = [label for label, _ in kept]
        clean_predictions = [pred for _, pred in kept]

        print(f"\n{title} 결과:")
        if not kept:
            print("유효한 예측이 없습니다.")
            return labels, predictions, None

        print(classification_report(clean_labels, clean_predictions, **report_kwargs))
        metrics = classification_report(
            clean_labels, clean_predictions, output_dict=True, **report_kwargs
        )
        return labels, predictions, metrics

    # Load the data.
    splits = {}
    for split in ['val', 'test']:
        file_path = f"{DATA_PATH}/{split}/{split}_dataset.jsonl"
        with open(file_path, 'r', encoding='utf-8') as f:
            # Skip blank lines (e.g. a trailing newline) instead of crashing.
            splits[split] = [json.loads(line) for line in f if line.strip()]
        print(f"{split} 데이터 로드 완료: {len(splits[split])} 샘플")

    _, _, val_metrics = evaluate("Validation", splits['val'])
    test_labels, test_predictions, test_metrics = evaluate("Test", splits['test'])

    # Save the confusion matrix (the plot helper filters None predictions itself).
    output_path = f"{DATA_PATH}/confusion_matrix.png"
    classifier.plot_confusion_matrix(
        test_labels,
        test_predictions,
        output_path=output_path
    )
    print(f"\n혼동 행렬이 저장되었습니다: {output_path}")

    # Save the metrics.
    results = {
        'test_metrics': test_metrics,
        'val_metrics': val_metrics,
    }

    results_path = f"{DATA_PATH}/evaluation_results.json"
    with open(results_path, 'w', encoding='utf-8') as f:
        json.dump(results, f, indent=2, ensure_ascii=False)
    print(f"\n평가 결과가 저장되었습니다: {results_path}")

if __name__ == "__main__":
    main()

In [None]:
# gpt_training.py

import os
import json
import time
import numpy as np
import pandas as pd
from pathlib import Path
from tqdm import tqdm
from sklearn.metrics import confusion_matrix, classification_report
import seaborn as sns
import matplotlib.pyplot as plt
from openai import OpenAI

class GPTClassifier:
    """Fine-tune an OpenAI chat model as a three-way text classifier and evaluate it.

    Wraps the OpenAI client for: converting labelled JSONL data to the chat
    fine-tuning format, uploading it, creating and monitoring a fine-tuning
    job, and running predictions with the resulting (or base) model.
    """

    # One system message for both the training examples and inference requests,
    # so the fine-tuned model is queried with the prompt it was trained on.
    SYSTEM_PROMPT = (
        "You are a text classifier that categorizes Korean educational text "
        "into three categories: '수업내용' (class content), '공지사항' "
        "(announcements), and '잡담' (chat)."
    )

    # Statuses of jobs that have not reached a terminal state. 'pending' is
    # kept only for backward compatibility with the previous filter.
    ACTIVE_JOB_STATUSES = ('validating_files', 'queued', 'running', 'pending')

    # Base model used for fine-tuning and as the fallback for predictions.
    BASE_MODEL = "gpt-3.5-turbo"

    def __init__(self, api_key):
        """Create the OpenAI client and define the category names."""
        self.client = OpenAI(api_key=api_key)
        self.categories = ['수업내용', '공지사항', '잡담']

    def list_active_jobs(self):
        """Print and return the not-yet-finished jobs among the 10 most recent ones."""
        jobs = self.client.fine_tuning.jobs.list(limit=10)
        active_jobs = []
        for job in jobs:
            if job.status in self.ACTIVE_JOB_STATUSES:
                active_jobs.append(job)
                print(f"Active Job ID: {job.id}")
                print(f"Status: {job.status}")
                print(f"Model: {job.model}")
                print(f"Created at: {job.created_at}")
                print("---")
        return active_jobs

    def cancel_job(self, job_id):
        """Cancel a fine-tuning job.

        Returns:
            ``True`` if the cancel request succeeded, ``False`` if it raised.
        """
        try:
            self.client.fine_tuning.jobs.cancel(job_id)
            print(f"Job {job_id} cancelled successfully")
            return True
        except Exception as e:
            print(f"Error cancelling job: {e}")
            return False

    def load_existing_data(self, file_path):
        """Load a labelled JSONL file and convert it to the chat fine-tuning format.

        Each input line must be a JSON object with ``instruction`` (the text)
        and ``category`` (the label). Blank lines are skipped.

        Returns:
            A list of ``{"messages": [system, user, assistant]}`` dicts.
        """
        data = []
        with open(file_path, 'r', encoding='utf-8') as f:
            for line in f:
                if not line.strip():
                    continue
                item = json.loads(line)
                messages = [
                    {"role": "system", "content": self.SYSTEM_PROMPT},
                    {"role": "user", "content": item['instruction']},
                    {"role": "assistant", "content": item['category']},
                ]
                data.append({"messages": messages})
        return data

    def save_training_data(self, data, output_file):
        """Write the fine-tuning examples to ``output_file`` as JSONL and report its size."""
        with open(output_file, 'w', encoding='utf-8') as f:
            for item in data:
                f.write(json.dumps(item, ensure_ascii=False) + '\n')
        print(f"Training data saved to: {output_file}")
        print(f"File size: {os.path.getsize(output_file) / 1024:.2f} KB")

    def create_fine_tuning_job(self, training_file):
        """Upload the training file and start a fine-tuning job.

        If three or more jobs are already active, the user is asked whether
        the oldest one should be cancelled first.

        Args:
            training_file: Path to the JSONL file in fine-tuning format.

        Returns:
            The new job id, or ``None`` if the user declined, the upload
            failed processing, or any API call raised.
        """
        try:
            # Check the active jobs first.
            active_jobs = self.list_active_jobs()
            if len(active_jobs) >= 3:
                print("\n현재 3개의 활성 작업이 실행 중입니다.")
                cancel = input("오래된 작업을 취소하시겠습니까? (y/n): ")
                if cancel.lower() != 'y':
                    return None
                # Cancel the oldest job.
                oldest_job = min(active_jobs, key=lambda x: x.created_at)
                if not self.cancel_job(oldest_job.id):
                    return None
                time.sleep(10)  # give the cancellation time to take effect

            print("\nUploading training file...")
            # Context manager so the file handle is closed after the upload.
            with open(training_file, 'rb') as fh:
                uploaded = self.client.files.create(
                    file=fh,
                    purpose="fine-tune"
                )
            print(f"File uploaded with ID: {uploaded.id}")

            # Wait until the uploaded file has been processed.
            while True:
                file_status = self.client.files.retrieve(uploaded.id)
                if file_status.status == "processed":
                    break
                if file_status.status == "error":
                    # Terminal state: polling further would never finish.
                    details = getattr(file_status, 'status_details', None)
                    raise RuntimeError(f"file processing failed: {details}")
                print(f"Waiting for file processing... Status: {file_status.status}")
                time.sleep(5)

            print("\nCreating fine-tuning job...")
            job = self.client.fine_tuning.jobs.create(
                training_file=uploaded.id,
                model=self.BASE_MODEL,
                hyperparameters={
                    "n_epochs": 3
                }
            )

            return job.id
        except Exception as e:
            print(f"파인튜닝 작업 생성 중 오류 발생: {e}")
            return None

    def monitor_fine_tuning_job(self, job_id):
        """Poll a fine-tuning job every 30 seconds until it reaches a terminal state.

        Returns:
            ``(True, fine_tuned_model_id)`` when the job succeeded,
            ``(False, None)`` when it failed or was cancelled. Errors raised
            while polling are printed and polling continues.
        """
        print(f"\nMonitoring fine-tuning job: {job_id}")
        previous_status = None
        previous_tokens = None

        while True:
            try:
                job = self.client.fine_tuning.jobs.retrieve(job_id)
                current_status = job.status

                if current_status != previous_status:
                    print(f"\nStatus changed: {current_status}")
                    previous_status = current_status

                # Report the token count only when it is known and has changed.
                trained_tokens = getattr(job, 'trained_tokens', None)
                if trained_tokens is not None and trained_tokens != previous_tokens:
                    print(f"Trained tokens: {trained_tokens}")
                    previous_tokens = trained_tokens

                if current_status == "succeeded":
                    print(f"\nFine-tuning completed successfully!")
                    print(f"Fine-tuned model ID: {job.fine_tuned_model}")
                    return True, job.fine_tuned_model
                if current_status in ("failed", "cancelled"):
                    # 'cancelled' is terminal too; without this the loop
                    # would poll a cancelled job forever.
                    print(f"\nFine-tuning {current_status}!")
                    error = getattr(job, 'error', None)
                    if error is not None:
                        print(f"Error: {error}")
                    return False, None
            except Exception as e:
                print(f"Monitoring error: {e}")

            time.sleep(30)

    def predict_single(self, text, model_id=None):
        """Classify one text.

        Args:
            text: The text to classify.
            model_id: Model to query; falls back to the base model when falsy.

        Returns:
            The stripped model reply, or ``None`` if the request (or reading
            the reply) raised an exception.
        """
        try:
            response = self.client.chat.completions.create(
                model=model_id if model_id else self.BASE_MODEL,
                messages=[
                    {"role": "system", "content": self.SYSTEM_PROMPT},
                    {"role": "user", "content": text},
                ],
                temperature=0,
                max_tokens=10
            )
            return response.choices[0].message.content.strip()
        except Exception as e:
            print(f"예측 중 오류 발생: {e}")
            return None

    def predict_batch(self, texts, model_id=None, batch_size=5, delay=1.0):
        """Classify many texts sequentially, throttling between requests.

        Args:
            texts: Sequence of texts to classify.
            model_id: Model to query; see ``predict_single``.
            batch_size: Number of texts per progress-bar step.
            delay: Seconds to sleep between consecutive requests; ``0``
                disables throttling.

        Returns:
            One prediction per input text, in order (``None`` on failure).

        Raises:
            ValueError: If ``batch_size`` is smaller than 1.
        """
        if batch_size < 1:
            raise ValueError("batch_size must be at least 1")

        total = len(texts)
        predictions = []
        for start in tqdm(range(0, total, batch_size)):
            for text in texts[start:start + batch_size]:
                predictions.append(self.predict_single(text, model_id))
                # No need to wait once the final request has been sent.
                if delay > 0 and len(predictions) < total:
                    time.sleep(delay)
        return predictions

    def plot_confusion_matrix(self, y_true, y_pred, output_path=None):
        """Draw the confusion matrix over ``self.categories`` as a heat map.

        Pairs whose prediction is ``None`` are dropped first. The figure is
        saved to ``output_path`` when given, otherwise shown interactively.
        """
        # Drop the samples whose prediction failed.
        kept = [(truth, pred) for truth, pred in zip(y_true, y_pred) if pred is not None]
        y_true = [truth for truth, _ in kept]
        y_pred = [pred for _, pred in kept]

        cm = confusion_matrix(y_true, y_pred, labels=self.categories)
        plt.figure(figsize=(10, 8))
        sns.heatmap(cm, annot=True, fmt='d', cmap='Blues',
                    xticklabels=self.categories,
                    yticklabels=self.categories)
        plt.title('Confusion Matrix')
        plt.ylabel('True Label')
        plt.xlabel('Predicted Label')

        if output_path:
            plt.savefig(output_path)
            plt.close()
        else:
            plt.show()



In [None]:
def main():
    """Fine-tune the OpenAI classifier and evaluate it on the val and test splits.

    Converts the training JSONL to the fine-tuning format, starts and monitors
    a fine-tuning job, then classifies the val/test texts with the resulting
    model. Prints a classification report per split, saves the test confusion
    matrix as a PNG and writes the model id plus both reports to
    ``evaluation_results.json`` under ``OUTPUT_PATH``.
    """
    # Paths.
    TRAIN_PATH = "/content/drive/MyDrive/kt/edusmile/label_data/train/train_dataset.jsonl"
    VAL_PATH = "/content/drive/MyDrive/kt/edusmile/label_data/val/val_dataset.jsonl"
    TEST_PATH = "/content/drive/MyDrive/kt/edusmile/label_data/test/test_dataset.jsonl"
    OUTPUT_PATH = "/content/drive/MyDrive/kt/edusmile/label_data"

    # API key (Colab form field).
    api_key = ""  # @param {type:"string"}

    classifier = GPTClassifier(api_key)

    # Fixing the label set keeps `target_names` aligned with the rows even when
    # the model replies with something outside the three categories or a
    # category is absent from a split (otherwise scikit-learn raises).
    report_kwargs = {
        'labels': classifier.categories,
        'target_names': classifier.categories,
        'zero_division': 0,
    }

    def evaluate(title, dataset_path, model_id):
        """Classify one split; return (labels, raw predictions, metrics dict or None)."""
        print(f"\n{title} 데이터 평가 중...")
        data = classifier.load_existing_data(dataset_path)
        # In the converted format, message 1 is the user text and message 2 the label.
        texts = [item['messages'][1]['content'] for item in data]
        labels = [item['messages'][2]['content'] for item in data]
        predictions = classifier.predict_batch(texts, model_id)

        # Failed API calls yield None; mixing None with string labels makes
        # classification_report raise, so drop those samples first.
        kept = [(label, pred) for label, pred in zip(labels, predictions) if pred is not None]
        clean_labels = [label for label, _ in kept]
        clean_predictions = [pred for _, pred in kept]

        print(f"\n{title} 결과:")
        if not kept:
            print("유효한 예측이 없습니다.")
            return labels, predictions, None

        print(classification_report(clean_labels, clean_predictions, **report_kwargs))
        metrics = classification_report(
            clean_labels, clean_predictions, output_dict=True, **report_kwargs
        )
        return labels, predictions, metrics

    # Load the existing data and convert it.
    print("\n기존 데이터 로드 및 변환 중...")
    training_data = classifier.load_existing_data(TRAIN_PATH)
    print(f"학습 데이터 로드 완료: {len(training_data)} 샘플")

    # Save the data in OpenAI fine-tuning format.
    finetune_file = os.path.join(OUTPUT_PATH, "openai_finetune.jsonl")
    classifier.save_training_data(training_data, finetune_file)

    # Run the fine-tuning.
    print("\n파인튜닝 시작...")
    job_id = classifier.create_fine_tuning_job(finetune_file)
    if not job_id:
        print("\n파인튜닝 작업 생성 실패")
        return

    print(f"\n파인튜닝 작업 ID: {job_id}")
    success, model_id = classifier.monitor_fine_tuning_job(job_id)
    if not success:
        print("\n파인튜닝 실패")
        return

    print(f"\n파인튜닝 완료! 모델 ID: {model_id}")

    _, _, val_metrics = evaluate("Validation", VAL_PATH, model_id)
    test_labels, test_predictions, test_metrics = evaluate("Test", TEST_PATH, model_id)

    # Save the confusion matrix (the plot helper filters None predictions itself).
    output_path = os.path.join(OUTPUT_PATH, "confusion_matrix.png")
    classifier.plot_confusion_matrix(
        test_labels,
        test_predictions,
        output_path=output_path
    )
    print(f"\n혼동 행렬이 저장되었습니다: {output_path}")

    # Save the results.
    results = {
        'model_id': model_id,
        'test_metrics': test_metrics,
        'val_metrics': val_metrics,
    }

    results_path = os.path.join(OUTPUT_PATH, "evaluation_results.json")
    with open(results_path, 'w', encoding='utf-8') as f:
        json.dump(results, f, indent=2, ensure_ascii=False)
    print(f"\n평가 결과가 저장되었습니다: {results_path}")

if __name__ == "__main__":
    main()
