In [22]:
# 데이터 절반씩하여 (변환 텍스트, no 변환 텍스트) -> 결과 체크

import os
import torch
import numpy as np
import glob
from transformers import WhisperProcessor, WhisperForConditionalGeneration, Seq2SeqTrainingArguments, Seq2SeqTrainer
from datasets import Dataset, DatasetDict
from dataclasses import dataclass
from typing import Dict, List, Union
import json
import random

os.environ["PYTORCH_CUDA_ALLOC_CONF"] = "expandable_segments:True"

def load_text(file_path):
    """텍스트 파일 로드 (다양한 인코딩 지원)"""
    encodings = ['utf-8', 'cp949', 'euc-kr']
    for encoding in encodings:
        try:
            with open(file_path, 'r', encoding=encoding) as f:
                text = f.read().strip()
            return text
        except UnicodeDecodeError:
            continue
    return "텍스트 로드 실패"

def get_kspon01_files():
    """KsponSpeech_01 모든 파일들 가져오기"""
    print("🔍 KsponSpeech_01 파일 수집 중...")
    
    # 경로 설정 - 수정된 경로
    base_audio_dir = "PreprocessData/KsponSpeech_01"      # NPY 파일들
    base_text_dir = "TrainData/unzipped_Speech/KsponSpeech_01"  # 원본 텍스트
    base_g2p_dir = "PreprocessData/KsponSpeech_01"        # G2P 텍스트 (여기에 있었음!)
    
    all_files = []
    
    # KsponSpeech_0001 ~ KsponSpeech_0018 폴더들 처리
    for i in range(1, 19):
        if i < 10:
            folder_name = f"KsponSpeech_000{i}"
        else:
            folder_name = f"KsponSpeech_00{i}"
        
        print(f"  📁 {folder_name} 처리 중...")
        
        # 폴더 경로들
        audio_folder = os.path.join(base_audio_dir, folder_name)
        original_folder = os.path.join(base_text_dir, folder_name)
        g2p_folder = os.path.join(base_g2p_dir, f"{folder_name}_g2p")  # PreprocessData에서 찾기
        
        # 폴더 존재 확인
        if not all([os.path.exists(audio_folder), os.path.exists(original_folder), os.path.exists(g2p_folder)]):
            print(f"    ⚠️ 폴더 없음: 오디오={os.path.exists(audio_folder)}, 원본={os.path.exists(original_folder)}, G2P={os.path.exists(g2p_folder)}")
            continue
        
        # NPY 파일들 찾기
        npy_files = glob.glob(os.path.join(audio_folder, "*_combined_features.npy"))
        
        folder_files = []
        for npy_file in npy_files:
            # 파일명에서 기본 이름 추출
            base_name = os.path.basename(npy_file).replace('_combined_features.npy', '')
            
            # 대응되는 텍스트 파일들
            original_txt = os.path.join(original_folder, f"{base_name}.txt")
            g2p_txt = os.path.join(g2p_folder, f"{base_name}.txt")
            
            # 모든 파일이 존재하는지 확인
            if all([os.path.exists(npy_file), os.path.exists(original_txt), os.path.exists(g2p_txt)]):
                folder_files.append({
                    'audio': npy_file,
                    'original_txt': original_txt,
                    'g2p_txt': g2p_txt,
                    'base_name': base_name,
                    'folder': folder_name
                })
        
        print(f"    ✅ {len(folder_files)}개 파일 매칭됨")
        all_files.extend(folder_files)
    
    print(f"📊 총 {len(all_files)}개 파일 수집 완료")
    return all_files

def create_5_5_mixed_dataset():
    """5:5 비율로 혼합 데이터셋 생성"""
    print("🎯 KsponSpeech_01 5:5 혼합 데이터셋 생성")
    
    # 모든 파일 수집
    all_files = get_kspon01_files()
    
    if len(all_files) == 0:
        print("❌ 파일을 찾을 수 없습니다.")
        return None
    
    # 파일들을 셔플링
    random.shuffle(all_files)
    
    # 5:5로 분할
    split_point = len(all_files) // 2
    original_files = all_files[:split_point]
    g2p_files = all_files[split_point:]
    
    print(f"📋 원본 텍스트: {len(original_files)}개, 발음 텍스트: {len(g2p_files)}개")
    
    # 데이터셋 구성
    data = {"audio": [], "text": [], "text_type": []}
    
    # 원본 텍스트 데이터 추가
    for file_info in original_files:
        try:
            original_text = load_text(file_info['original_txt'])
            if original_text != "텍스트 로드 실패":
                data["audio"].append(file_info['audio'])
                data["text"].append(original_text)
                data["text_type"].append("original")
        except Exception as e:
            print(f"원본 텍스트 로드 실패: {e}")
    
    # 발음 텍스트 데이터 추가
    for file_info in g2p_files:
        try:
            g2p_text = load_text(file_info['g2p_txt'])
            if g2p_text != "텍스트 로드 실패":
                data["audio"].append(file_info['audio'])
                data["text"].append(g2p_text)
                data["text_type"].append("g2p")
        except Exception as e:
            print(f"발음 텍스트 로드 실패: {e}")
    
    # 최종 통계
    total_count = len(data['audio'])
    original_count = data['text_type'].count('original')
    g2p_count = data['text_type'].count('g2p')
    
    print(f"📊 총 {total_count}개 - 원본: {original_count}개, 발음: {g2p_count}개")
    
    return Dataset.from_dict(data)

def map_to_array(batch):
    """NPY 파일을 오디오 배열로 변환"""
    arrays = []
    rates = []

    for audio_path in batch["audio"]:
        try:
            audio_array = np.load(audio_path)
            
            if audio_array.dtype in [np.int16, np.int8]:
                max_value = float(2 ** (15 if audio_array.dtype == np.int16 else 7))
                audio_array = audio_array.astype(np.float32) / max_value
            elif audio_array.dtype != np.float32:
                audio_array = audio_array.astype(np.float32)
            
            arrays.append(audio_array)
            rates.append(16000)
            
        except Exception as e:
            print(f"오디오 로드 오류: {e}")
            continue

    batch["audio"] = [{"array": arr, "sampling_rate": sr} for arr, sr in zip(arrays, rates)]
    return batch

@dataclass
class DataCollatorSpeechSeq2SeqWithPadding:
    processor: WhisperProcessor

    def __call__(self, features: List[Dict[str, Union[List[int], torch.Tensor]]]) -> Dict[str, torch.Tensor]:
        batch = self.processor.feature_extractor.pad(
            {"input_features": [feature["input_features"] for feature in features]},
            return_tensors="pt"
        )

        labels_batch = self.processor.tokenizer.pad(
            {"input_ids": [feature["labels"] for feature in features]},
            return_tensors="pt"
        )

        labels = labels_batch["input_ids"].masked_fill(
            labels_batch["input_ids"] == self.processor.tokenizer.pad_token_id,
            -100
        )

        batch["labels"] = labels
        return batch

def train_mixed_5_5_whisper():
    """5:5 혼합 데이터로 Whisper 모델 학습"""
    output_dir = "whisper_mixed_5_5_finetuned"
    
    print("🚀 KsponSpeech_01 5:5 혼합 학습 시작")
    
    # 1. 혼합 데이터셋 생성
    dataset = create_5_5_mixed_dataset()
    
    if dataset is None or len(dataset) == 0:
        print("❌ 데이터셋 생성 실패")
        return None, None
    
    # 2. 오디오 데이터 전처리
    print("🔄 오디오 데이터 전처리 중...")
    dataset = dataset.map(map_to_array, batched=True, batch_size=8)
    dataset = dataset.filter(lambda x: len(x["audio"]) > 0)
    
    print(f"✅ 최종 데이터셋 크기: {len(dataset)}개")

    # 3. 데이터셋 분할
    train_test_valid = dataset.train_test_split(test_size=0.2, seed=42)
    test_valid = train_test_valid["test"].train_test_split(test_size=0.5, seed=42)

    datasets = DatasetDict({
        "train": train_test_valid["train"],
        "test": test_valid["test"],
        "validation": test_valid["train"]
    })

    print(f"📊 학습: {len(datasets['train'])}개, 검증: {len(datasets['validation'])}개")

    # 4. 모델 및 프로세서 로드
    print("🤖 모델 로드 중...")
    model = WhisperForConditionalGeneration.from_pretrained(
        "openai/whisper-small", 
        use_cache=False,
        low_cpu_mem_usage=True
    )
    
    processor = WhisperProcessor.from_pretrained("openai/whisper-small")

    def prepare_dataset(batch):
        """데이터 전처리"""
        audio = batch["audio"]
        
        input_features = processor.feature_extractor(
            audio["array"],
            sampling_rate=audio["sampling_rate"]
        ).input_features[0]
        
        if isinstance(input_features, np.ndarray):
            input_features = torch.from_numpy(input_features)
        
        input_features = input_features.to(model.dtype)
        
        labels = processor.tokenizer(
            batch["text"],
            truncation=True,
            max_length=448,
            padding=False
        ).input_ids
        
        return {
            "input_features": input_features,
            "labels": labels
        }

    processed_datasets = DatasetDict({
        split: dataset.map(
            prepare_dataset,
            remove_columns=dataset.column_names
        )
        for split, dataset in datasets.items()
    })

    data_collator = DataCollatorSpeechSeq2SeqWithPadding(processor=processor)

    # 5. 훈련 설정
    device_capability = torch.cuda.get_device_capability() if torch.cuda.is_available() else (0, 0)
    supports_bf16 = device_capability >= (8, 0)
    
    training_args = Seq2SeqTrainingArguments(
        output_dir=output_dir,
        per_device_train_batch_size=4,
        per_device_eval_batch_size=2,
        gradient_accumulation_steps=4,
        learning_rate=1e-6,
        warmup_steps=300,
        max_steps=4000,
        gradient_checkpointing=True,
        bf16=supports_bf16,
        fp16=False if supports_bf16 else True,
        dataloader_pin_memory=False,
        evaluation_strategy="steps",
        eval_steps=400,
        save_steps=400,
        logging_steps=50,
        load_best_model_at_end=True,
        metric_for_best_model="eval_loss",
        greater_is_better=False,
        save_total_limit=3,
        report_to=["tensorboard"],
        push_to_hub=False,
        remove_unused_columns=False,
    )

    trainer = Seq2SeqTrainer(
        model=model,
        args=training_args,
        train_dataset=processed_datasets["train"],
        eval_dataset=processed_datasets["validation"],
        data_collator=data_collator,
        tokenizer=processor.tokenizer,
    )

    print("🚀 학습 시작...")
    trainer.train()

    print("💾 모델 저장 중...")
    trainer.save_model(output_dir)
    processor.save_pretrained(output_dir)

    return model, processor

if __name__ == "__main__":
    print("🎯 KsponSpeech_01 5:5 혼합 학습")
    
    # 시드 설정
    random.seed(42)
    torch.manual_seed(42)
    
    # 학습 실행
    model, processor = train_mixed_5_5_whisper()
    
    if model is None:
        print("❌ 학습 실패")
    else:
        print("✅ 학습 완료!")

🎯 KsponSpeech_01 5:5 혼합 학습
🚀 KsponSpeech_01 5:5 혼합 학습 시작
🎯 KsponSpeech_01 5:5 혼합 데이터셋 생성
🔍 KsponSpeech_01 파일 수집 중...
  📁 KsponSpeech_0001 처리 중...
    ✅ 997개 파일 매칭됨
  📁 KsponSpeech_0002 처리 중...
    ✅ 160개 파일 매칭됨
  📁 KsponSpeech_0003 처리 중...
    ✅ 1000개 파일 매칭됨
  📁 KsponSpeech_0004 처리 중...
    ✅ 1000개 파일 매칭됨
  📁 KsponSpeech_0005 처리 중...
    ✅ 1000개 파일 매칭됨
  📁 KsponSpeech_0006 처리 중...
    ✅ 37개 파일 매칭됨
  📁 KsponSpeech_0007 처리 중...
    ✅ 1000개 파일 매칭됨
  📁 KsponSpeech_0008 처리 중...
    ✅ 937개 파일 매칭됨
  📁 KsponSpeech_0009 처리 중...
    ✅ 834개 파일 매칭됨
  📁 KsponSpeech_0010 처리 중...
    ✅ 376개 파일 매칭됨
  📁 KsponSpeech_0011 처리 중...
    ✅ 1000개 파일 매칭됨
  📁 KsponSpeech_0012 처리 중...
    ✅ 961개 파일 매칭됨
  📁 KsponSpeech_0013 처리 중...
    ✅ 1000개 파일 매칭됨
  📁 KsponSpeech_0014 처리 중...
    ✅ 861개 파일 매칭됨
  📁 KsponSpeech_0015 처리 중...
    ✅ 65개 파일 매칭됨
  📁 KsponSpeech_0016 처리 중...
    ✅ 1개 파일 매칭됨
  📁 KsponSpeech_0017 처리 중...
    ✅ 1000개 파일 매칭됨
  📁 KsponSpeech_0018 처리 중...
    ✅ 484개 파일 매칭됨
📊 총 12713개 파일 수집 완료
📋 원본 텍스트: 6356

Map: 100%|█████████████████████████████████████████████████████████████████████████████████████████████████| 12713/12713 [00:06<00:00, 2074.32 examples/s]
Filter: 100%|███████████████████████████████████████████████████████████████████████████████████████████████| 12713/12713 [00:39<00:00, 321.35 examples/s]


✅ 최종 데이터셋 크기: 12713개
📊 학습: 10170개, 검증: 1271개
🤖 모델 로드 중...


Map:  30%|█████████████████████████████▌                                                                      | 3004/10170 [10:20<24:39,  4.84 examples/s]


KeyboardInterrupt: 

TrainData G2P 폴더 상세 확인:
G2P 폴더 18개:
  KsponSpeech_0001_g2p: 1000개 파일
  KsponSpeech_0002_g2p: 0개 파일
  KsponSpeech_0003_g2p: 0개 파일
  KsponSpeech_0004_g2p: 0개 파일
  KsponSpeech_0005_g2p: 0개 파일
  KsponSpeech_0006_g2p: 0개 파일
  KsponSpeech_0007_g2p: 0개 파일
  KsponSpeech_0008_g2p: 0개 파일
  KsponSpeech_0009_g2p: 0개 파일
  KsponSpeech_0010_g2p: 0개 파일
  KsponSpeech_0011_g2p: 0개 파일
  KsponSpeech_0012_g2p: 0개 파일
  KsponSpeech_0013_g2p: 0개 파일
  KsponSpeech_0014_g2p: 0개 파일
  KsponSpeech_0015_g2p: 0개 파일
  KsponSpeech_0016_g2p: 0개 파일
  KsponSpeech_0017_g2p: 0개 파일
  KsponSpeech_0018_g2p: 0개 파일

총 G2P 텍스트 파일: 1000개

매칭 가능한 데이터 확인:
  KsponSpeech_0001: 오디오=997, 원본=1000, G2P=1000 → 매칭=997

실제 매칭 가능한 전체 데이터: 997개


In [2]:
'''
실시간 CER/WER 300스텝마다 출력
최종 테스트 결과 자동 저장
PT 파일 재사용을 위한 저장
구조 보존 강화된 생성 설정
'''

import os
import torch
import numpy as np
import glob
from transformers import WhisperProcessor, WhisperForConditionalGeneration, Seq2SeqTrainingArguments, Seq2SeqTrainer
from datasets import Dataset, DatasetDict
from dataclasses import dataclass
from typing import Dict, List, Union
import json
import random
import re
from jiwer import wer, cer

os.environ["PYTORCH_CUDA_ALLOC_CONF"] = "expandable_segments:True"

def load_text(file_path):
    """텍스트 파일 로드 (다양한 인코딩 지원)"""
    encodings = ['utf-8', 'cp949', 'euc-kr']
    for encoding in encodings:
        try:
            with open(file_path, 'r', encoding=encoding) as f:
                text = f.read().strip()
            return text
        except UnicodeDecodeError:
            continue
    return "텍스트 로드 실패"

def preprocess_g2p_text(text):
    """구조 보존을 위한 G2P 텍스트 전처리"""
    # 공백 정리
    text = re.sub(r'\s+', ' ', text)
    
    # 문장 경계 명시화 (선택적)
    # text = re.sub(r'([.!?])', r'\1 /', text)
    
    # 반복되는 특수문자 정리
    text = re.sub(r'([+*/])\1+', r'\1', text)
    
    return text.strip()

def get_kspon01_files():
    """KsponSpeech_01 모든 파일들 가져오기"""
    print("🔍 KsponSpeech_01 파일 수집 중...")
    
    base_audio_dir = "PreprocessData/KsponSpeech_01"
    base_text_dir = "TrainData/unzipped_Speech/KsponSpeech_01"
    base_g2p_dir = "PreprocessData/KsponSpeech_01"
    
    all_files = []
    
    for i in range(1, 19):
        folder_name = f"KsponSpeech_000{i}" if i < 10 else f"KsponSpeech_00{i}"
        print(f"  📁 {folder_name} 처리 중...")
        
        audio_folder = os.path.join(base_audio_dir, folder_name)
        original_folder = os.path.join(base_text_dir, folder_name)
        g2p_folder = os.path.join(base_g2p_dir, f"{folder_name}_g2p")
        
        if not all([os.path.exists(audio_folder), os.path.exists(original_folder), os.path.exists(g2p_folder)]):
            print(f"    ⚠️ 폴더 없음")
            continue
        
        npy_files = glob.glob(os.path.join(audio_folder, "*_combined_features.npy"))
        
        folder_files = []
        for npy_file in npy_files:
            base_name = os.path.basename(npy_file).replace('_combined_features.npy', '')
            original_txt = os.path.join(original_folder, f"{base_name}.txt")
            g2p_txt = os.path.join(g2p_folder, f"{base_name}.txt")
            
            if all([os.path.exists(npy_file), os.path.exists(original_txt), os.path.exists(g2p_txt)]):
                folder_files.append({
                    'audio': npy_file,
                    'original_txt': original_txt,
                    'g2p_txt': g2p_txt,
                    'base_name': base_name,
                    'folder': folder_name
                })
        
        print(f"    ✅ {len(folder_files)}개 파일 매칭됨")
        all_files.extend(folder_files)
    
    print(f"📊 총 {len(all_files)}개 파일 수집 완료")
    return all_files

def create_phonetic_only_dataset():
    """100% 발음 텍스트 데이터셋 생성"""
    print("🎯 100% 발음 텍스트 데이터셋 생성 (구조 보존 강화)")
    
    all_files = get_kspon01_files()
    if len(all_files) == 0:
        return None
    
    random.shuffle(all_files)
    
    data = {"audio": [], "text": [], "original_text": []}
    
    # 100% G2P 텍스트만 사용
    for file_info in all_files:
        try:
            g2p_text = load_text(file_info['g2p_txt'])
            original_text = load_text(file_info['original_txt'])
            
            if g2p_text != "텍스트 로드 실패" and original_text != "텍스트 로드 실패":
                # G2P 텍스트 전처리 (구조 보존)
                processed_g2p = preprocess_g2p_text(g2p_text)
                
                data["audio"].append(file_info['audio'])
                data["text"].append(processed_g2p)  # 학습용 (발음 텍스트)
                data["original_text"].append(original_text)  # 평가용 (원본 텍스트)
                
        except Exception as e:
            print(f"텍스트 로드 실패: {e}")
    
    total_count = len(data['audio'])
    print(f"📊 총 {total_count}개 발음 텍스트 데이터 생성")
    
    return Dataset.from_dict(data)

def map_to_array(batch):
    """NPY 파일을 오디오 배열로 변환"""
    arrays = []
    rates = []

    for audio_path in batch["audio"]:
        try:
            audio_array = np.load(audio_path)
            
            if audio_array.dtype in [np.int16, np.int8]:
                max_value = float(2 ** (15 if audio_array.dtype == np.int16 else 7))
                audio_array = audio_array.astype(np.float32) / max_value
            elif audio_array.dtype != np.float32:
                audio_array = audio_array.astype(np.float32)
            
            arrays.append(audio_array)
            rates.append(16000)
            
        except Exception as e:
            print(f"오디오 로드 오류: {e}")
            continue

    batch["audio"] = [{"array": arr, "sampling_rate": sr} for arr, sr in zip(arrays, rates)]
    return batch

@dataclass
class DataCollatorSpeechSeq2SeqWithPadding:
    processor: WhisperProcessor

    def __call__(self, features: List[Dict[str, Union[List[int], torch.Tensor]]]) -> Dict[str, torch.Tensor]:
        batch = self.processor.feature_extractor.pad(
            {"input_features": [feature["input_features"] for feature in features]},
            return_tensors="pt"
        )

        labels_batch = self.processor.tokenizer.pad(
            {"input_ids": [feature["labels"] for feature in features]},
            return_tensors="pt"
        )

        labels = labels_batch["input_ids"].masked_fill(
            labels_batch["input_ids"] == self.processor.tokenizer.pad_token_id,
            -100
        )

        batch["labels"] = labels
        return batch

def compute_metrics(eval_pred, processor, original_texts):
    """CER/WER 계산"""
    predictions, labels = eval_pred
    
    # 라벨에서 -100을 제거
    labels = np.where(labels != -100, labels, processor.tokenizer.pad_token_id)
    
    # 디코딩
    decoded_preds = processor.tokenizer.batch_decode(predictions, skip_special_tokens=True)
    decoded_labels = processor.tokenizer.batch_decode(labels, skip_special_tokens=True)
    
    # CER/WER 계산 (오류 처리 포함)
    try:
        # 빈 예측 처리
        filtered_preds = []
        filtered_labels = []
        
        for pred, label in zip(decoded_preds, decoded_labels):
            if pred.strip() and label.strip():
                filtered_preds.append(pred.strip())
                filtered_labels.append(label.strip())
        
        if len(filtered_preds) > 0:
            cer_score = cer(filtered_labels, filtered_preds) * 100
            wer_score = wer(filtered_labels, filtered_preds) * 100
        else:
            cer_score = 100.0
            wer_score = 100.0
            
    except Exception as e:
        print(f"메트릭 계산 오류: {e}")
        cer_score = 100.0
        wer_score = 100.0
    
    return {
        "cer": cer_score,
        "wer": wer_score
    }

class PhoneticWhisperTrainer(Seq2SeqTrainer):
    """CER/WER 평가가 포함된 커스텀 트레이너"""
    
    def __init__(self, original_texts=None, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.original_texts = original_texts
        
    def evaluate(self, eval_dataset=None, ignore_keys=None, metric_key_prefix="eval"):
        """평가 시 CER/WER 계산"""
        print("📊 CER/WER 평가 중...")
        
        eval_dataset = eval_dataset if eval_dataset is not None else self.eval_dataset
        
        # 기본 평가
        output = super().evaluate(eval_dataset, ignore_keys, metric_key_prefix)
        
        # CER/WER 계산을 위한 샘플 예측
        try:
            eval_dataloader = self.get_eval_dataloader(eval_dataset)
            sample_batch = next(iter(eval_dataloader))
            
            # GPU로 이동
            if torch.cuda.is_available():
                sample_batch = {k: v.cuda() if isinstance(v, torch.Tensor) else v 
                              for k, v in sample_batch.items()}
            
            # 예측 생성
            self.model.eval()
            with torch.no_grad():
                generated_ids = self.model.generate(
                    sample_batch["input_features"],
                    max_length=448,
                    num_beams=2,
                    repetition_penalty=2.5,  # 구조 보존 강화
                    no_repeat_ngram_size=4,
                    length_penalty=1.2,
                    early_stopping=True,
                    do_sample=False,
                )
            
            # 메트릭 계산
            metrics = compute_metrics(
                (generated_ids.cpu().numpy(), sample_batch["labels"].cpu().numpy()),
                self.tokenizer,
                self.original_texts
            )
            
            # 결과 추가
            output.update({f"{metric_key_prefix}_{k}": v for k, v in metrics.items()})
            
            print(f"📈 CER: {metrics['cer']:.2f}%, WER: {metrics['wer']:.2f}%")
            
        except Exception as e:
            print(f"⚠️ CER/WER 계산 오류: {e}")
        
        return output

def train_phonetic_whisper():
    """100% 발음 텍스트로 Whisper 모델 학습"""
    output_dir = "whisper_phonetic_only_finetuned"
    pt_save_path = "whisper_phonetic_model.pt"
    
    print("🚀 100% 발음 텍스트 Whisper 학습 시작")
    print("🎯 목표: 구조 보존 + 발음 패턴 학습")
    
    # 1. 발음 전용 데이터셋 생성
    dataset = create_phonetic_only_dataset()
    
    if dataset is None or len(dataset) == 0:
        print("❌ 데이터셋 생성 실패")
        return None, None
    
    # 2. 오디오 데이터 전처리
    print("🔄 오디오 데이터 전처리 중...")
    dataset = dataset.map(map_to_array, batched=True, batch_size=8)
    dataset = dataset.filter(lambda x: len(x["audio"]) > 0)
    
    print(f"✅ 최종 데이터셋 크기: {len(dataset)}개")

    # 3. 데이터셋 분할
    train_test_valid = dataset.train_test_split(test_size=0.2, seed=42)
    test_valid = train_test_valid["test"].train_test_split(test_size=0.5, seed=42)

    datasets = DatasetDict({
        "train": train_test_valid["train"],
        "test": test_valid["test"],
        "validation": test_valid["train"]
    })

    print(f"📊 학습: {len(datasets['train'])}개, 검증: {len(datasets['validation'])}개, 테스트: {len(datasets['test'])}개")

    # 4. 모델 및 프로세서 로드
    print("🤖 모델 로드 중...")
    model = WhisperForConditionalGeneration.from_pretrained(
        "openai/whisper-small", 
        use_cache=False,
        low_cpu_mem_usage=True
    )
    
    processor = WhisperProcessor.from_pretrained("openai/whisper-small")

    def prepare_dataset(batch):
        """데이터 전처리"""
        audio = batch["audio"]
        
        input_features = processor.feature_extractor(
            audio["array"],
            sampling_rate=audio["sampling_rate"]
        ).input_features[0]
        
        if isinstance(input_features, np.ndarray):
            input_features = torch.from_numpy(input_features)
        
        input_features = input_features.to(model.dtype)
        
        # 발음 텍스트로 라벨 생성
        labels = processor.tokenizer(
            batch["text"],
            truncation=True,
            max_length=448,
            padding=False
        ).input_ids
        
        return {
            "input_features": input_features,
            "labels": labels
        }

    processed_datasets = DatasetDict({
        split: dataset.map(
            prepare_dataset,
            remove_columns=dataset.column_names
        )
        for split, dataset in datasets.items()
    })

    data_collator = DataCollatorSpeechSeq2SeqWithPadding(processor=processor)

    # 5. 훈련 설정 (구조 보존 강화)
    device_capability = torch.cuda.get_device_capability() if torch.cuda.is_available() else (0, 0)
    supports_bf16 = device_capability >= (8, 0)
    
    training_args = Seq2SeqTrainingArguments(
        output_dir=output_dir,
        per_device_train_batch_size=4,
        per_device_eval_batch_size=2,
        gradient_accumulation_steps=4,
        learning_rate=5e-7,              # 매우 낮은 학습률 (구조 보존)
        warmup_steps=500,                # 긴 워밍업
        max_steps=3000,                  # 적절한 스텝 수
        gradient_checkpointing=True,
        bf16=supports_bf16,
        fp16=False if supports_bf16 else True,
        dataloader_pin_memory=False,
        evaluation_strategy="steps",
        eval_steps=300,                  # 300스텝마다 평가 (CER/WER 계산)
        save_steps=300,
        logging_steps=50,
        load_best_model_at_end=True,
        metric_for_best_model="eval_cer", # CER 기준으로 최적 모델 선택
        greater_is_better=False,
        save_total_limit=3,
        report_to=["tensorboard"],
        push_to_hub=False,
        remove_unused_columns=False,
        # 구조 보존을 위한 생성 설정
        generation_max_length=448,
        generation_num_beams=2,
        predict_with_generate=True,
    )

    # 6. 커스텀 트레이너 (CER/WER 포함)
    trainer = PhoneticWhisperTrainer(
        model=model,
        args=training_args,
        train_dataset=processed_datasets["train"],
        eval_dataset=processed_datasets["validation"],
        data_collator=data_collator,
        tokenizer=processor.tokenizer,
        original_texts=[item["original_text"] for item in datasets["validation"]],
    )

    print("🚀 발음 전용 학습 시작...")
    print("📊 평가 주기: 300스텝마다 CER/WER 계산")
    trainer.train()

    print("💾 모델 저장 중...")
    
    # 1. Hugging Face 형식으로 저장
    trainer.save_model(output_dir)
    processor.save_pretrained(output_dir)
    
    # 2. PT 파일로 저장 (재사용 용이)
    print(f"💾 PT 파일 저장 중: {pt_save_path}")
    torch.save({
        'model_state_dict': model.state_dict(),
        'processor_config': processor.to_dict() if hasattr(processor, 'to_dict') else None,
        'model_config': model.config.to_dict(),
        'training_args': training_args.to_dict(),
        'vocab_size': len(processor.tokenizer),
        'training_type': 'phonetic_only_structure_preserved'
    }, pt_save_path)
    
    # 3. 최종 테스트 (CER/WER)
    print("🧪 최종 테스트 중...")
    test_results = trainer.evaluate(processed_datasets["test"], metric_key_prefix="test")
    
    # 4. 결과 저장
    final_stats = {
        "training_type": "phonetic_only_structure_preserved",
        "total_samples": len(dataset),
        "learning_rate": training_args.learning_rate,
        "max_steps": training_args.max_steps,
        "final_test_cer": test_results.get("test_cer", "N/A"),
        "final_test_wer": test_results.get("test_wer", "N/A"),
        "model_path": output_dir,
        "pt_path": pt_save_path
    }
    
    with open(os.path.join(output_dir, "phonetic_training_results.json"), "w", encoding='utf-8') as f:
        json.dump(final_stats, f, indent=2, ensure_ascii=False)
    
    print("✅ 발음 전용 학습 완료!")
    print(f"📊 최종 결과:")
    print(f"   CER: {final_stats['final_test_cer']}")
    print(f"   WER: {final_stats['final_test_wer']}")
    print(f"   HF 모델: {output_dir}")
    print(f"   PT 파일: {pt_save_path}")
    
    return model, processor

def load_phonetic_model(pt_path="whisper_phonetic_model.pt"):
    """저장된 PT 파일에서 모델 로드"""
    print(f"📁 PT 파일에서 모델 로드: {pt_path}")
    
    if not os.path.exists(pt_path):
        print(f"❌ PT 파일이 없습니다: {pt_path}")
        return None, None
    
    # PT 파일 로드
    checkpoint = torch.load(pt_path, map_location='cpu')
    
    # 모델 초기화
    model = WhisperForConditionalGeneration.from_pretrained("openai/whisper-small")
    processor = WhisperProcessor.from_pretrained("openai/whisper-small")
    
    # 상태 로드
    model.load_state_dict(checkpoint['model_state_dict'])
    
    if torch.cuda.is_available():
        model = model.cuda()
    
    print("✅ PT 파일에서 모델 로드 완료!")
    print(f"   훈련 타입: {checkpoint.get('training_type', 'unknown')}")
    print(f"   학습률: {checkpoint.get('training_args', {}).get('learning_rate', 'unknown')}")
    
    return model, processor

if __name__ == "__main__":
    print("🎯 100% 발음 텍스트 Whisper 학습 (구조 보존 + CER/WER)")
    
    # 시드 설정
    random.seed(42)
    torch.manual_seed(42)
    
    # 학습 실행
    model, processor = train_phonetic_whisper()
    
    if model is None:
        print("❌ 학습 실패")
    else:
        print("✅ 학습 완료!")

🎯 100% 발음 텍스트 Whisper 학습 (구조 보존 + CER/WER)
🚀 100% 발음 텍스트 Whisper 학습 시작
🎯 목표: 구조 보존 + 발음 패턴 학습
🎯 100% 발음 텍스트 데이터셋 생성 (구조 보존 강화)
🔍 KsponSpeech_01 파일 수집 중...
  📁 KsponSpeech_0001 처리 중...
    ✅ 997개 파일 매칭됨
  📁 KsponSpeech_0002 처리 중...
    ✅ 160개 파일 매칭됨
  📁 KsponSpeech_0003 처리 중...
    ✅ 1000개 파일 매칭됨
  📁 KsponSpeech_0004 처리 중...
    ✅ 1000개 파일 매칭됨
  📁 KsponSpeech_0005 처리 중...
    ✅ 1000개 파일 매칭됨
  📁 KsponSpeech_0006 처리 중...
    ✅ 37개 파일 매칭됨
  📁 KsponSpeech_0007 처리 중...
    ✅ 1000개 파일 매칭됨
  📁 KsponSpeech_0008 처리 중...
    ✅ 937개 파일 매칭됨
  📁 KsponSpeech_0009 처리 중...
    ✅ 834개 파일 매칭됨
  📁 KsponSpeech_0010 처리 중...
    ✅ 376개 파일 매칭됨
  📁 KsponSpeech_0011 처리 중...
    ✅ 1000개 파일 매칭됨
  📁 KsponSpeech_0012 처리 중...
    ✅ 961개 파일 매칭됨
  📁 KsponSpeech_0013 처리 중...
    ✅ 1000개 파일 매칭됨
  📁 KsponSpeech_0014 처리 중...
    ✅ 861개 파일 매칭됨
  📁 KsponSpeech_0015 처리 중...
    ✅ 65개 파일 매칭됨
  📁 KsponSpeech_0016 처리 중...
    ✅ 1개 파일 매칭됨
  📁 KsponSpeech_0017 처리 중...
    ✅ 1000개 파일 매칭됨
  📁 KsponSpeech_0018 처리 중...
    ✅ 484개 파일 

Map: 100%|█████████████████████████████████████████████████████████████████████████████████████████████████| 12713/12713 [00:05<00:00, 2127.90 examples/s]
Filter:  16%|███████████████                                                                                 | 2000/12713 [00:06<00:35, 301.80 examples/s]


KeyboardInterrupt: 