<a href="https://colab.research.google.com/github/KTFplus/KTFfintune/blob/master/smallwthdrivedata.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
!pip install -q torch transformers datasets librosa evaluate jiwer peft soundfile

In [None]:
# @title Whisper 파인튜닝을 위한 Colab 최적화 코드 (RAM 최적화 버전)
# @markdown ## 1. 필수 패키지 설치 (필요한 경우 실행)
# !pip install -q transformers datasets librosa evaluate jiwer peft soundfile

# @markdown ## 2. Google Drive 마운트
from google.colab import drive
drive.mount('/content/drive')

# @markdown ## 3. 경로 및 기본 설정
import os
import torch
import pandas as pd
import numpy as np
import gc
import logging
from tqdm.auto import tqdm
from datasets import Dataset, Audio, load_from_disk
from transformers import (
    WhisperProcessor,
    WhisperForConditionalGeneration,
    Seq2SeqTrainer,
    Seq2SeqTrainingArguments,
    DataCollatorForSeq2Seq
)
from peft import LoraConfig, get_peft_model
import evaluate

# PyTorch 메모리 단편화 방지 설정
os.environ['PYTORCH_CUDA_ALLOC_CONF'] = 'max_split_size_mb:128'

# 로깅 설정
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# 데이터셋 경로 (Google Drive 내 위치)
dataset_path = "/content/drive/MyDrive/train_B_Normalized"  # @param {type:"string"}
output_dir = "/content/drive/MyDrive/whisper_finetuned"  # @param {type:"string"}
preprocessed_dir = "/content/drive/MyDrive/preprocessed_whisper"  # @param {type:"string"}
os.makedirs(output_dir, exist_ok=True)
os.makedirs(preprocessed_dir, exist_ok=True)

# 메모리 정리 함수 - 더 철저하게
def clear_memory():
    gc.collect()
    torch.cuda.empty_cache()
    import psutil
    process = psutil.Process(os.getpid())
    logger.info(f"메모리 정리 완료. 현재 메모리 사용량: {process.memory_info().rss / 1024 ** 2:.2f} MB")

# 세션 유지 알림 (8시간마다)
from IPython.display import display, Javascript
display(Javascript('''
function alertUser() {
  alert("세션을 유지하기 위해 페이지와 상호작용하세요!");
  setTimeout(alertUser, 28800000); // 8시간마다
}
setTimeout(alertUser, 28800000);
'''))

# @markdown ## 4. 데이터셋 로드 및 준비
# 전처리된 데이터셋이 있는지 확인
train_processed_path = os.path.join(preprocessed_dir, "train_dataset")
eval_processed_path = os.path.join(preprocessed_dir, "eval_dataset")

use_preprocessed = os.path.exists(train_processed_path) and os.path.exists(eval_processed_path)

if use_preprocessed:
    logger.info("전처리된 데이터셋을 로드합니다...")
    train_dataset = load_from_disk(train_processed_path)
    eval_dataset = load_from_disk(eval_processed_path)
    logger.info(f"전처리된 데이터셋 로드 완료: 훈련 {len(train_dataset)}개, 검증 {len(eval_dataset)}개")
else:
    # 메타데이터 로드
    metadata_file = os.path.join(dataset_path, "filtered_data_B.csv")
    try:
        df = pd.read_csv(metadata_file)
        logger.info(f"메타데이터 로드 완료: {len(df)}개 항목")

        # 데이터셋 크기 제한 (무료 Colab에서 시간 제한 내 완료를 위해)
        max_samples = 5000  # @param {type:"integer"}
        if max_samples and len(df) > max_samples:
            df = df.sample(max_samples, random_state=42)
            logger.info(f"데이터셋 크기 제한: {max_samples}개 샘플만 사용")

        # 필수 컬럼 확인
        required_columns = ['fileName', 'ReadingLabelText']
        missing_columns = [col for col in required_columns if col not in df.columns]

        if missing_columns:
            # 컬럼명 자동 매핑 시도
            if 'fileName' in missing_columns and 'file_name' in df.columns:
                df['fileName'] = df['file_name']
                missing_columns.remove('fileName')

            if 'ReadingLabelText' in missing_columns:
                # 가능한 대체 컬럼명들
                text_column_candidates = ['text', 'transcript', 'ReadingLabelText']
                for col in text_column_candidates:
                    if col in df.columns:
                        df['ReadingLabelText'] = df[col]
                        missing_columns.remove('ReadingLabelText')
                        break

        if missing_columns:
            raise ValueError(f"CSV 파일에 필수 컬럼이 없습니다: {missing_columns}")

    except Exception as e:
        logger.error(f"메타데이터 로드 실패: {e}")
        raise

    # 데이터셋 분할 (훈련:검증 = 95:5)
    from sklearn.model_selection import train_test_split

    train_df, eval_df = train_test_split(df, test_size=0.05, random_state=42)
    logger.info(f"훈련 데이터: {len(train_df)}개, 검증 데이터: {len(eval_df)}개")

    # 메모리 정리
    del df
    clear_memory()

    # 데이터셋 생성 함수
    def create_dataset(dataframe, audio_dir, desc="데이터셋 생성"):
        # 오디오 파일 경로 생성 및 존재 확인
        audio_paths = []
        texts = []

        for idx, row in tqdm(dataframe.iterrows(), total=len(dataframe), desc=desc):
            # 파일명에 확장자가 없으면 추가
            filename = row['fileName']
            if not filename.lower().endswith('.wav'):
                filename += '.wav'

            file_path = os.path.join(audio_dir, filename)

            # 파일 존재 확인
            if os.path.exists(file_path):
                audio_paths.append(file_path)
                texts.append(row['ReadingLabelText'])
            else:
                logger.warning(f"파일을 찾을 수 없습니다: {file_path}")

        if not audio_paths:
            raise ValueError(f"유효한 오디오 파일을 찾을 수 없습니다. 경로를 확인하세요: {audio_dir}")

        # 데이터셋 사전 생성
        dataset_dict = {
            "audio": audio_paths,
            "text": texts
        }

        # 데이터셋 생성
        dataset = Dataset.from_dict(dataset_dict)
        dataset = dataset.cast_column("audio", Audio(sampling_rate=16000))
        return dataset

    # 훈련 및 검증 데이터셋 생성
    audio_dir = dataset_path
    if not os.path.exists(os.path.join(audio_dir, train_df['fileName'].iloc[0])):
        # 확장자 확인
        if not train_df['fileName'].iloc[0].lower().endswith('.wav'):
            logger.info("파일명에 .wav 확장자 추가 필요")
    else:
        logger.info(f"오디오 파일 경로 확인 완료: {audio_dir}")

    try:
        train_dataset = create_dataset(train_df, audio_dir, "훈련 데이터셋 생성")
        # 메모리 정리
        del train_df
        clear_memory()

        eval_dataset = create_dataset(eval_df, audio_dir, "검증 데이터셋 생성")
        # 메모리 정리
        del eval_df
        clear_memory()

        # 데이터셋 확인
        logger.info(f"훈련 데이터셋: {len(train_dataset)}개")
        logger.info(f"검증 데이터셋: {len(eval_dataset)}개")
    except Exception as e:
        logger.error(f"데이터셋 생성 실패: {e}")
        raise

    # @markdown ## 5. 모델 & 프로세서 설정
    model_name = "openai/whisper-small"  # @param ["openai/whisper-tiny", "openai/whisper-base", "openai/whisper-small"]

    # 프로세서 로드
    try:
        processor = WhisperProcessor.from_pretrained(
            model_name,
            language="korean",  # 한국어 설정
            task="transcribe"
        )
        logger.info(f"프로세서 로드 완료: {model_name}")
    except Exception as e:
        logger.error(f"프로세서 로드 실패: {e}")
        raise

    # @markdown ## 6. 데이터 전처리
    def prepare_dataset(batch):
        try:
            # 오디오 특징 추출
            audio = batch["audio"]
            # float16으로 저장하여 메모리 절약
            batch["input_features"] = processor.feature_extractor(
                audio["array"],
                sampling_rate=audio["sampling_rate"]
            ).input_features[0].astype(np.float16)  # float32 대신 float16 사용

            # 텍스트 토큰화
            batch["labels"] = processor.tokenizer(batch["text"]).input_ids
            return batch
        except Exception as e:
            logger.error(f"데이터 전처리 실패: {e}")
            # 오류가 발생해도 계속 진행할 수 있도록 기본값 반환
            return {
                "input_features": np.zeros((80, 3000), dtype=np.float16),  # float16 사용
                "labels": [0]
            }

    # 데이터셋 전처리 (배치 처리로 메모리 효율성 향상)
    logger.info("훈련 데이터셋 전처리 중...")
    train_dataset = train_dataset.map(
        prepare_dataset,
        remove_columns=["audio", "text"],
        batched=False,
        desc="훈련 데이터 전처리",
        num_proc=1  # 단일 프로세스로 메모리 사용량 감소
    )
    # 메모리 정리
    clear_memory()

    logger.info("검증 데이터셋 전처리 중...")
    eval_dataset = eval_dataset.map(
        prepare_dataset,
        remove_columns=["audio", "text"],
        batched=False,
        desc="검증 데이터 전처리",
        num_proc=1  # 단일 프로세스로 메모리 사용량 감소
    )
    # 메모리 정리
    clear_memory()

    # 전처리된 데이터셋 저장
    logger.info("전처리된 데이터셋 저장 중...")
    train_dataset.save_to_disk(train_processed_path)
    eval_dataset.save_to_disk(eval_processed_path)
    logger.info(f"전처리된 데이터셋 저장 완료: {preprocessed_dir}")

# @markdown ## 5. 모델 & 프로세서 설정 (전처리된 데이터셋 사용 시에도 필요)
model_name = "openai/whisper-small"  # @param ["openai/whisper-tiny", "openai/whisper-base", "openai/whisper-small"]

# 프로세서 로드
try:
    processor = WhisperProcessor.from_pretrained(
        model_name,
        language="korean",  # 한국어 설정
        task="transcribe"
    )
    logger.info(f"프로세서 로드 완료: {model_name}")
except Exception as e:
    logger.error(f"프로세서 로드 실패: {e}")
    raise

# 모델 로드 (양자화 없이)
try:
    model = WhisperForConditionalGeneration.from_pretrained(
        model_name,
        device_map="auto",  # 양자화 없이 로드
        torch_dtype=torch.float16  # float16 정밀도 사용
    )
    logger.info(f"모델 로드 완료: {model_name}")
except Exception as e:
    logger.error(f"모델 로드 실패: {e}")
    raise

# LoRA 설정
lora_config = LoraConfig(
    r=16,
    lora_alpha=32,
    target_modules=["q_proj", "v_proj"],
    lora_dropout=0.05,
    bias="none"
)

# 모델 준비
try:
    model = get_peft_model(model, lora_config)
    model.print_trainable_parameters()
except Exception as e:
    logger.error(f"LoRA 설정 실패: {e}")
    raise

# 메모리 정리
clear_memory()

# @markdown ## 7. 훈련 설정
# 메모리 최적화를 위한 배치 설정
batch_size = 2  # @param {type:"integer"}
gradient_accumulation_steps = 16  # @param {type:"integer"}
max_steps = 2000  # @param {type:"integer"}

training_args = Seq2SeqTrainingArguments(
    output_dir=output_dir,
    per_device_train_batch_size=batch_size,
    per_device_eval_batch_size=batch_size,
    gradient_accumulation_steps=gradient_accumulation_steps,
    learning_rate=1e-5,
    warmup_ratio=0.1,
    max_steps=max_steps,  # 세션 제한 내 완료를 위해 조정
    fp16=True,
    eval_strategy="steps",  # 'evaluation_strategy' 대신 'eval_strategy' 사용
    eval_steps=500,
    save_steps=500,
    logging_steps=100,
    save_total_limit=2,  # 최근 2개 체크포인트만 저장
    load_best_model_at_end=True,
    metric_for_best_model="cer",
    greater_is_better=False,
    predict_with_generate=True,
    generation_max_length=225,
    push_to_hub=False,
    report_to=["tensorboard"],
    dataloader_num_workers=1,  # 메모리 사용량 감소를 위해 1로 설정
    dataloader_pin_memory=True,  # 메모리 효율성 향상
)

# @markdown ## 8. 평가 메트릭
cer_metric = evaluate.load("cer")

def compute_metrics(pred):
    try:
        pred_ids = pred.predictions
        label_ids = pred.label_ids

        # 특수 토큰 처리
        label_ids[label_ids == -100] = processor.tokenizer.pad_token_id

        # 디코딩
        pred_str = processor.batch_decode(pred_ids, skip_special_tokens=True)
        label_str = processor.batch_decode(label_ids, skip_special_tokens=True)

        # CER 계산
        cer = cer_metric.compute(predictions=pred_str, references=label_str)
        return {"cer": cer}
    except Exception as e:
        logger.error(f"메트릭 계산 실패: {e}")
        return {"cer": 1.0}  # 오류 시 최악의 점수 반환

# @markdown ## 9. 트레이너 설정
# Whisper 모델용 데이터 콜레이터
from dataclasses import dataclass
from typing import Dict, List, Union, Any
import torch

@dataclass
class DataCollatorSpeechSeq2SeqWithPadding:
    processor: Any

    def __call__(self, features: List[Dict[str, Union[List[int], torch.Tensor]]]) -> Dict[str, torch.Tensor]:
        # 입력 특성 처리
        input_features = [{"input_features": feature["input_features"]} for feature in features]
        batch = self.processor.feature_extractor.pad(input_features, return_tensors="pt")

        # 레이블 처리
        label_features = [{"input_ids": feature["labels"]} for feature in features]
        labels_batch = self.processor.tokenizer.pad(label_features, return_tensors="pt")

        # -100으로 패딩된 레이블 대체 (손실 계산에서 무시됨)
        labels = labels_batch["input_ids"].masked_fill(labels_batch["input_ids"] == self.processor.tokenizer.pad_token_id, -100)

        batch["labels"] = labels
        return batch

# 새 데이터 콜레이터 사용
data_collator = DataCollatorSpeechSeq2SeqWithPadding(processor=processor)

trainer = Seq2SeqTrainer(
    model=model,
    args=training_args,
    train_dataset=train_dataset,
    eval_dataset=eval_dataset,
    data_collator=data_collator,
    compute_metrics=compute_metrics,
    tokenizer=processor.tokenizer,
)

# @markdown ## 10. 훈련 실행 (체크포인트 복구 지원)
import glob

def find_latest_checkpoint(output_dir):
    """가장 최근 체크포인트 폴더 찾기"""
    checkpoints = glob.glob(os.path.join(output_dir, "checkpoint-*"))
    if not checkpoints:
        return None

    # 체크포인트 번호 기준으로 정렬
    checkpoints = sorted(checkpoints, key=lambda x: int(x.split("-")[-1]))
    return checkpoints[-1]  # 가장 큰 숫자(최신) 체크포인트 반환

# 체크포인트 확인
latest_checkpoint = find_latest_checkpoint(output_dir)

if latest_checkpoint:
    logger.info(f"최신 체크포인트 발견: {latest_checkpoint}")
    try:
        trainer.train(resume_from_checkpoint=latest_checkpoint)
        logger.info(f"체크포인트 {os.path.basename(latest_checkpoint)}에서 훈련 재개 성공")
    except Exception as e:
        logger.warning(f"체크포인트에서 재개 실패: {e}")
        logger.info("처음부터 훈련을 시작합니다.")
        trainer.train()
else:
    logger.info("체크포인트가 없습니다. 처음부터 훈련을 시작합니다.")
    trainer.train()

# @markdown ## 11. 최종 모델 저장
final_model_path = os.path.join(output_dir, "final_model")

# LoRA 어댑터 병합 (선택사항)
merge_lora = True  # @param {type:"boolean"}

try:
    if merge_lora:
        # LoRA 어댑터를 원본 모델과 병합
        # 먼저 현재 모델 저장
        adapter_path = os.path.join(output_dir, "lora_adapter")
        model.save_pretrained(adapter_path)

        # 메모리 정리
        del model
        clear_memory()

        # 원본 모델 로드
        logger.info("원본 모델 로드 중...")
        from peft import PeftModel
        base_model = WhisperForConditionalGeneration.from_pretrained(
            model_name,
            device_map="auto",
            torch_dtype=torch.float16  # float16 정밀도 사용
        )

        # LoRA 어댑터 로드 및 병합
        logger.info("LoRA 어댑터 병합 중...")
        peft_model = PeftModel.from_pretrained(base_model, adapter_path)
        merged_model = peft_model.merge_and_unload()  # 어댑터 병합

        # 메모리 정리
        del base_model, peft_model
        clear_memory()

        # 병합된 모델 저장
        logger.info(f"병합된 모델 저장 중: {final_model_path}")
        merged_model.save_pretrained(final_model_path)
        processor.save_pretrained(final_model_path)
        logger.info(f"LoRA 어댑터가 병합된 모델 저장 완료: {final_model_path}")

        # 메모리 정리
        del merged_model
        clear_memory()
    else:
        # LoRA 어댑터만 저장
        logger.info(f"LoRA 어댑터 모델 저장 중: {final_model_path}")
        model.save_pretrained(final_model_path)
        processor.save_pretrained(final_model_path)
        logger.info(f"LoRA 어댑터 모델 저장 완료: {final_model_path}")
        logger.warning("주의: 이 모델을 로드할 때는 LoRA 설정이 필요합니다.")

        # 메모리 정리
        del model
        clear_memory()
except Exception as e:
    logger.error(f"모델 저장 실패: {e}")
    # 최소한 LoRA 어댑터라도 저장
    emergency_path = os.path.join(output_dir, "emergency_save")
    os.makedirs(emergency_path, exist_ok=True)
    model.save_pretrained(emergency_path)
    processor.save_pretrained(emergency_path)
    logger.info(f"비상 저장 완료: {emergency_path}")

# @markdown ## 12. 모델 테스트 (선택사항)
test_model = True  # @param {type:"boolean"}

if test_model:
    from transformers import pipeline

    # 테스트할 오디오 파일 경로
    test_audio = "/content/drive/MyDrive/sample.wav"  # @param {type:"string"}

    if not os.path.exists(test_audio):
        logger.warning(f"테스트 오디오 파일을 찾을 수 없습니다: {test_audio}")
    else:
        try:
            # 파이프라인 생성
            pipe = pipeline(
                "automatic-speech-recognition",
                model=final_model_path,
                chunk_length_s=30,
                device=0 if torch.cuda.is_available() else -1,
            )

            # 테스트 실행
            result = pipe(test_audio, return_timestamps=True)
            logger.info(f"인식 결과: {result['text']}")
        except Exception as e:
            logger.error(f"모델 테스트 실패: {e}")

logger.info("파인튜닝 프로세스 완료!")
