### **1. 환경 설정 및 함수 정의**

In [None]:
import torch
import torchaudio
import torchcrepe
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import os
from glob import glob
from sklearn.preprocessing import StandardScaler


In [None]:
# 데이터 경로 설정
# Collect the audio (.pcm) and transcript (.txt) paths of one corpus folder.
# Both lists are sorted; later cells pair them up by index.
pcm_files, txt_files = (
    sorted(glob(pattern))
    for pattern in (
        "E:/KsponSpeech/original/KsponSpeech_01/KsponSpeech_0001/*.pcm",
        "E:/KsponSpeech/original/KsponSpeech_01/KsponSpeech_0001/*.txt",
    )
)

In [None]:
# PCM 파일 로드 함수
def load_pcm(file_path, sr=16000, orig_sr=16000):
    """Load a headerless 16-bit PCM file as a float32 waveform in [-1, 1).

    Args:
        file_path: Path to a raw PCM file; it is read as a flat stream of
            native-byte-order int16 samples.
        sr: Sampling rate the returned waveform should have.
        orig_sr: Sampling rate the file was recorded at. Raw PCM carries no
            header, so the rate cannot be detected and must be supplied by the
            caller. Defaults to 16000.

    Returns:
        A 1-D float32 ``torch.Tensor`` holding the (optionally resampled)
        waveform.
    """
    samples = np.fromfile(file_path, dtype=np.int16)

    # Scale by 2**15 rather than by int16 max (32767): the most negative
    # sample (-32768) then maps to exactly -1.0 instead of slightly below it.
    audio = torch.from_numpy(samples.astype(np.float32)) / 32768.0

    if orig_sr != sr:
        resampler = torchaudio.transforms.Resample(orig_freq=orig_sr, new_freq=sr)
        audio = resampler(audio)

    return audio


In [None]:
# 텍스트 전처리
import re

def clean_transcript(text):
    """Strip transcription markup from a transcript and normalize whitespace.

    Steps, in order:
      1. Dual transcriptions ``(A)/(B)`` keep the second variant ``B``.
      2. Tokens starting with ``unk/`` are removed entirely.
      3. Single-letter marker tags ``n/ o/ b/ u/ l/`` are removed.
      4. Punctuation and markup characters ``. , / ! ? ; : - ( ) * +`` are
         removed.
      5. Whitespace runs collapse to one space and both ends are trimmed.

    Args:
        text: Raw transcript string.

    Returns:
        The cleaned transcript without leading or trailing whitespace.
    """
    # Dual transcription: keep the second (pronunciation) form. The parentheses
    # left around it are removed by the punctuation pass below, so
    # "(컴퓨터)/(컴퓨타)" ends up as "컴퓨타".
    text = re.sub(r'\(([^)]+)\)/\(([^)]+)\)', r'(\2)', text)

    # Unintelligible speech: drop "unk/" together with the non-space run
    # attached to it (e.g. "unk/나는" is removed completely).
    text = re.sub(r'unk/\S+', '', text)

    # Marker tags: "n/", "o/", "b/", "u/", "l/".
    text = re.sub(r'[nobul]/', '', text)

    # Punctuation and leftover markup. This also removes every remaining "/",
    # so a token such as "아/" becomes "아".
    text = re.sub(r'[.,/!?;:\-()*+]', '', text)

    # Collapse whitespace runs, then trim both ends so that a removed trailing
    # tag (e.g. a final "b/") does not leave a dangling space in the result.
    text = re.sub(r'\s+', ' ', text)
    return text.strip()


In [None]:
def extract_features(audio, sr=16000):
    """Compute the acoustic features of one waveform.

    Args:
        audio: Waveform tensor; the sample axis is the last dimension.
        sr: Sampling rate passed to the MFCC transform.

    Returns:
        Tuple ``(mfcc, spectrogram_db, pitch_values, energy)``:
          * mfcc: 13 MFCCs (80 mel bins, 400-point FFT, hop of 160 samples)
            after ``normalize_mfcc``.
          * spectrogram_db: spectrogram (same FFT size and hop) converted
            with ``AmplitudeToDB``.
          * pitch_values: placeholder, always ``0`` for now.
          * energy: root-mean-square of the waveform over its last axis.
    """
    n_fft, hop_length = 400, 160

    # Root-mean-square energy over the sample axis.
    energy = (audio ** 2).mean(dim=-1).sqrt()

    # Normalized MFCCs.
    to_mfcc = torchaudio.transforms.MFCC(
        sample_rate=sr,
        n_mfcc=13,
        melkwargs=dict(n_fft=n_fft, hop_length=hop_length, n_mels=80),
    )
    mfcc = normalize_mfcc(to_mfcc(audio))

    # Spectrogram on a decibel scale.
    to_spectrogram = torchaudio.transforms.Spectrogram(n_fft=n_fft, hop_length=hop_length)
    to_db = torchaudio.transforms.AmplitudeToDB()
    spectrogram_db = to_db(to_spectrogram(audio))

    # Pitch extraction is not implemented yet; 0 is a placeholder value.
    pitch_values = 0

    return mfcc, spectrogram_db, pitch_values, energy

def normalize_mfcc(mfcc):
    """Standardize MFCCs to zero mean and unit variance along dimension 1.

    For the ``(n_mfcc, time)`` tensors this notebook feeds in, dimension 1 is
    the time axis, so each coefficient track is normalized independently.

    Args:
        mfcc: Tensor with at least two dimensions.

    Returns:
        Tensor of the same shape. When dimension 1 holds fewer than two
        entries the sample standard deviation is undefined (NaN), so only the
        mean is removed and the result contains no NaN.
    """
    mean = torch.mean(mfcc, dim=1, keepdim=True)
    centered = mfcc - mean

    # torch.std uses Bessel's correction; with a single frame it returns NaN,
    # which would turn the whole feature matrix into NaN.
    if mfcc.shape[1] < 2:
        return centered

    std = torch.std(mfcc, dim=1, keepdim=True)
    return centered / (std + 1e-8)


In [None]:
# 데이터 로드 예시

# Load every waveform and its raw (uncleaned) transcript into memory.
audio_data = [load_pcm(pcm_path) for pcm_path in pcm_files]

texts = []
for txt_path in txt_files:
    # Use a context manager so each transcript file handle is closed promptly.
    with open(txt_path, "r", encoding="cp949") as fh:
        texts.append(fh.read().strip())

In [None]:
# 텍스트 전처리 출력 테스트
# Show each raw transcript followed by its cleaned form, one per line.
for text in texts:
    print(text, clean_transcript(text), sep="\n")

In [None]:
# 예제 데이터로 특징 추출
# Extract features for the first utterance and print a labelled summary.
mfcc, spectrogram_db, pitch_mean, energy = extract_features(audio_data[0])
for label, value in (
    ("Text: ", texts[0]),
    ("MFCC Raw:", mfcc),
    ("MFCC Shape:", mfcc.shape),
    ("Spectrogram Shape:", spectrogram_db.shape),
    ("Pitch Mean:", pitch_mean),
    ("Energy:", energy),
):
    print(label, value)

Text:  아/ 몬 소리야, 그건 또. b/
MFCC Raw: tensor([[-1.3372e+00, -1.2596e+00, -1.2233e+00,  ..., -1.0976e+00,
         -1.0822e+00, -1.2013e+00],
        [-1.3993e+00, -8.7625e-01, -8.3103e-01,  ..., -4.9156e-01,
         -5.2939e-01, -4.6778e-01],
        [ 5.7234e-01,  9.0763e-01,  5.2310e-01,  ...,  5.3884e-01,
          7.1024e-01,  1.2474e+00],
        ...,
        [ 1.5656e+00, -4.1436e-01, -2.1543e-01,  ...,  8.1051e-01,
          1.9598e-01,  5.7674e-04],
        [ 9.5650e-02, -1.0997e+00, -2.7613e-01,  ...,  4.5922e-01,
          1.1356e+00,  1.1126e+00],
        [ 1.3364e+00,  1.5560e+00,  1.7766e+00,  ...,  1.4341e+00,
          1.4761e+00,  2.3825e+00]])
MFCC Shape: torch.Size([13, 315])
Spectrogram Shape: torch.Size([201, 315])
Pitch Mean: 0
Energy: tensor(0.0379)


### **2. 딥러닝 학습**

#### **2.1 데이터셋 클래스 정의**

In [None]:
from torch.utils.data import Dataset, DataLoader
from torch.nn.utils.rnn import pad_sequence
import torchaudio.transforms as transforms

class SpeechDataset(Dataset):
    """Dataset of (normalized MFCC, token-id tensor) pairs.

    ``pcm_files`` and ``txt_files`` are paired by index, so both lists must be
    in the same order. A character-level tokenizer is built from the
    transcript files when the dataset is constructed.
    """

    def __init__(self, pcm_files, txt_files, sr=16000):
        """Store the file lists and build the tokenizer.

        Args:
            pcm_files: Paths of the raw PCM audio files.
            txt_files: Paths of the cp949-encoded transcripts, index-aligned
                with ``pcm_files``.
            sr: Sampling rate forwarded to ``load_pcm`` / ``extract_features``.
        """
        self.pcm_files = pcm_files
        self.txt_files = txt_files
        self.sr = sr
        self.tokenizer = self.create_tokenizer()

    def __len__(self):
        return len(self.pcm_files)

    def __getitem__(self, idx):
        """Return ``(mfcc, token_ids)`` for sample ``idx``.

        Returns ``None`` (after printing the error) when the sample cannot be
        loaded, so one bad file does not abort an epoch; ``collate_fn`` drops
        ``None`` entries.
        """
        try:
            audio = load_pcm(self.pcm_files[idx], self.sr)
            # Context manager: the transcript handle is closed right away
            # instead of lingering until garbage collection.
            with open(self.txt_files[idx], "r", encoding="cp949") as f:
                text = clean_transcript(f.read().strip())
            mfcc, _spec, _pitch, _energy = extract_features(audio, self.sr)
            token_ids = self.tokenizer.encode(text)
            return mfcc, torch.tensor(token_ids, dtype=torch.long)
        except Exception as e:
            print(f"Error loading file {self.pcm_files[idx]}: {e}")
            return None

    def create_tokenizer(self):
        """Build a character-level tokenizer from all transcript files.

        Ids start at 1; ``SimpleTokenizer`` adds 0 (blank) and -1 (padding).

        NOTE: the vocabulary is collected from the raw transcripts, so it also
        contains markup characters that ``clean_transcript`` removes before
        encoding.
        """
        chars = set()
        for txt_file in self.txt_files:
            with open(txt_file, "r", encoding="cp949") as f:
                chars.update(f.read())

        # Sort once so both mappings are derived from the same ordering.
        vocab = sorted(chars)
        char_to_int = {char: i for i, char in enumerate(vocab, start=1)}
        int_to_char = {i: char for char, i in char_to_int.items()}
        return SimpleTokenizer(char_to_int, int_to_char)

class SimpleTokenizer:
    """Character-level tokenizer for CTC training.

    Id 0 is reserved for the CTC blank and -1 for padding; the caller-supplied
    mappings are expected to use ids starting at 1.
    """

    PAD_ID = -1
    BLANK_ID = 0

    def __init__(self, char_to_int, int_to_char):
        """Merge the special tokens into the given character/id mappings."""
        self.char_to_int = {'<PAD>': self.PAD_ID, '<BLANK>': self.BLANK_ID, **char_to_int}
        self.int_to_char = {self.PAD_ID: '<PAD>', self.BLANK_ID: '<BLANK>', **int_to_char}

    def encode(self, text):
        """Return the ids of the characters in ``text``.

        Characters missing from the vocabulary are dropped. They must not be
        mapped to 0, because 0 is the CTC blank and a blank inside a target
        sequence is invalid for CTC loss.
        """
        lookup = self.char_to_int
        return [lookup[char] for char in text if char in lookup]

    def decode(self, tokens):
        """Join the characters for ``tokens``, skipping blank and padding ids.

        Raises:
            KeyError: if a token id is not in the vocabulary.
        """
        special = (self.PAD_ID, self.BLANK_ID)
        return "".join(self.int_to_char[token] for token in tokens if token not in special)

def collate_fn(batch):
    """Collate ``(mfcc, token_ids)`` samples into padded batch tensors.

    Samples that failed to load (``None``) or whose transcript is empty are
    dropped *as a whole*, so the MFCC batch and the target batch always stay
    aligned row by row.

    Args:
        batch: List of ``(mfcc, token_ids)`` tuples or ``None`` entries;
            ``mfcc`` is ``(n_mfcc, time)`` and ``token_ids`` is a 1-D long
            tensor.

    Returns:
        Tuple ``(mfccs_padded, texts_padded, text_lengths)``:
          * mfccs_padded: ``(batch, n_mfcc, max_time)``, zero-padded in time.
          * texts_padded: ``(batch, max_text_len)``, padded with -1.
          * text_lengths: ``(batch,)`` unpadded transcript lengths.
        When no usable sample remains, three empty tensors are returned and
        the caller is expected to skip the batch.
    """
    samples = [item for item in batch if item is not None and len(item[1]) > 0]
    if not samples:
        return torch.empty(0), torch.empty(0), torch.empty(0)

    mfccs = [item[0] for item in samples]
    texts = [item[1] for item in samples]

    # Right-pad every MFCC along the time axis to the longest one in the batch.
    max_len = max(mfcc.shape[1] for mfcc in mfccs)
    mfccs_padded = torch.stack(
        [torch.nn.functional.pad(mfcc, (0, max_len - mfcc.shape[1]), value=0) for mfcc in mfccs],
        dim=0,
    )

    text_lengths = torch.tensor([len(t) for t in texts], dtype=torch.long)
    # -1 marks padding so it cannot collide with a real token id or the blank (0).
    texts_padded = pad_sequence(texts, batch_first=True, padding_value=-1)

    return mfccs_padded, texts_padded, text_lengths

# 데이터셋 생성
# Build the dataset and a shuffling loader that yields batches of 8 samples.
dataset = SpeechDataset(pcm_files, txt_files)
dataloader = DataLoader(
    dataset=dataset,
    batch_size=8,
    shuffle=True,
    num_workers=2,
    collate_fn=collate_fn,
)

# Print the character -> id dictionary of the tokenizer.
print("Tokenization Dictionary:", dataset.tokenizer.char_to_int, sep="\n")



Tokenization Dictionary:
{'<PAD>': -1, '<BLANK>': 0, '\n': 1, ' ': 2, '!': 3, '%': 4, '(': 5, ')': 6, '*': 7, '+': 8, ',': 9, '.': 10, '/': 11, '0': 12, '1': 13, '2': 14, '3': 15, '4': 16, '5': 17, '6': 18, '7': 19, '8': 20, '9': 21, '?': 22, 'A': 23, 'B': 24, 'C': 25, 'D': 26, 'F': 27, 'G': 28, 'L': 29, 'M': 30, 'N': 31, 'O': 32, 'P': 33, 'R': 34, 'S': 35, 'T': 36, 'V': 37, 'X': 38, 'b': 39, 'c': 40, 'g': 41, 'k': 42, 'l': 43, 'n': 44, 'o': 45, 'u': 46, '가': 47, '각': 48, '간': 49, '갈': 50, '감': 51, '갑': 52, '값': 53, '갔': 54, '강': 55, '갖': 56, '같': 57, '갚': 58, '개': 59, '객': 60, '걍': 61, '걔': 62, '걘': 63, '거': 64, '걱': 65, '건': 66, '걷': 67, '걸': 68, '검': 69, '것': 70, '겄': 71, '게': 72, '겠': 73, '겨': 74, '격': 75, '겪': 76, '견': 77, '결': 78, '겸': 79, '겹': 80, '겼': 81, '경': 82, '계': 83, '고': 84, '곡': 85, '곤': 86, '골': 87, '곰': 88, '곱': 89, '곳': 90, '공': 91, '과': 92, '곽': 93, '관': 94, '광': 95, '괜': 96, '괴': 97, '굉': 98, '교': 99, '구': 100, '국': 101, '군': 102, '굳': 103, '굴': 104, '굿': 105, '궁':

#### **2.2 모델 구축 및 학습**

In [None]:
# Set Device
# Prefer the GPU when CUDA is available, otherwise fall back to the CPU.
device = torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu")

In [None]:
import torch.nn as nn
import torch.optim as optim

class SpeechRecognitionModel(nn.Module):
    """Bidirectional LSTM that emits per-frame log-probabilities for CTC.

    Args:
        input_dim: Number of features per frame (e.g. MFCC coefficients).
        hidden_dim: LSTM hidden size per direction.
        output_dim: Number of output classes, including the CTC blank.
    """

    def __init__(self, input_dim=13, hidden_dim=256, output_dim=100):
        super().__init__()
        # forward() feeds the LSTM time-major input (time, batch, feature), so
        # batch_first must be False. With batch_first=True the recurrence
        # would run across the samples of a batch instead of across time.
        self.lstm = nn.LSTM(input_dim, hidden_dim, batch_first=False, bidirectional=True)
        self.fc = nn.Linear(hidden_dim * 2, output_dim)

    def forward(self, x):
        """Map ``(batch, feature, time)`` input to ``(time, batch, class)`` log-probs.

        The time-major output layout is what ``nn.CTCLoss`` expects.
        """
        x = x.permute(2, 0, 1)  # (batch, feature, time) -> (time, batch, feature)
        x, _ = self.lstm(x)
        x = self.fc(x)
        x = torch.nn.functional.log_softmax(x, dim=2)

        # Debugging aid: report numerical problems as soon as they appear.
        if torch.isnan(x).any():
            print("Warning: NaN detected in model forward!")
        elif torch.isinf(x).any():
            print("Warning: Inf detected in model forward!")

        return x

    def reset_parameters(self):
        """Re-initialize every direct child layer that supports it."""
        for layer in self.children():
            if hasattr(layer, 'reset_parameters'):
                layer.reset_parameters()


# 모델 생성
# Size the output layer from the tokenizer instead of relying on the default
# of 100 classes: token ids run from 0 (CTC blank) up to the largest id in the
# vocabulary, and every target id must be a valid class index for CTC loss.
num_classes = max(dataset.tokenizer.int_to_char) + 1

model = SpeechRecognitionModel(output_dim=num_classes).to(device)
criterion = nn.CTCLoss(blank=0, zero_infinity=True)

# Earlier experiments tried RMSprop, Adam and SGD with much smaller learning
# rates, and a OneCycleLR schedule; AdamW with StepLR is the current setup.
optimizer = optim.AdamW(model.parameters(), lr=1e-3, weight_decay=1e-3)
# NOTE: scheduler.step() is called once per epoch, so with step_size=10 the
# learning rate first drops after the 10th epoch.
scheduler = optim.lr_scheduler.StepLR(optimizer, step_size=10, gamma=0.1)


In [None]:
from torch.nn.utils.rnn import pad_sequence
import matplotlib.pyplot as plt

# Anomaly detection makes autograd report the operation that produced a
# NaN/Inf gradient. It slows training down noticeably, so switch it off once
# training is numerically stable.
torch.autograd.set_detect_anomaly(True)

NUM_EPOCHS = 10
last_loss = float("nan")  # loss of the most recent batch that was applied

# Training loop
for epoch in range(NUM_EPOCHS):
    model.train()
    total_loss = 0.0
    completed_steps = 0  # batches that actually contributed an update

    for i, (mfcc, text, text_lengths) in enumerate(dataloader, start=1):
        # collate_fn returns empty tensors when no sample of the batch is usable.
        if mfcc.numel() == 0 or text.numel() == 0:
            print("Empty batch! Skipping batch.")
            continue

        # Every MFCC needs exactly one transcript row; otherwise the CTC
        # lengths would not line up with the batch dimension.
        if mfcc.size(0) != text.size(0):
            print(f"Batch size mismatch (mfcc={mfcc.size(0)}, text={text.size(0)})! Skipping batch.")
            continue

        mfcc = mfcc.to(device)
        text = text.to(device)

        # Only the float features can hold NaN/Inf; the integer token tensors cannot.
        if not torch.isfinite(mfcc).all():
            print("NaN/Inf detected in input MFCC! Skipping batch.")
            continue

        # Reset gradients, then run the forward pass.
        optimizer.zero_grad()
        outputs = model(mfcc)  # (time, batch, classes)

        if not torch.isfinite(outputs).all():
            print("Warning: NaN/Inf detected in model outputs! Skipping batch.")
            continue

        # Every sample is treated as spanning the full padded output length.
        input_lengths = torch.full(
            (outputs.size(1),), outputs.size(0), dtype=torch.long, device=device
        )

        # Strip the -1 padding: per-sample target lengths plus the flat,
        # concatenated target sequence that CTCLoss accepts.
        pad_mask = text != -1
        target_lengths = pad_mask.sum(dim=1)
        targets = text[pad_mask]

        # CTC cannot align a target that is longer than its input. Inflating
        # input_lengths beyond the real output length would be invalid, so
        # such batches are skipped instead.
        if (input_lengths < target_lengths).any():
            print(
                f"Error: input_lengths {input_lengths.tolist()} shorter than "
                f"target_lengths {target_lengths.tolist()}! Skipping batch."
            )
            continue

        # CTC loss (inputs: log-probs, flat targets, input lengths, target lengths).
        batch_loss = criterion(outputs, targets, input_lengths, target_lengths)

        if not torch.isfinite(batch_loss):
            print("Warning: NaN/Inf detected in loss computation!")
            continue

        # `loss` always refers to the last batch that was actually applied;
        # later cells read it when writing the checkpoint.
        loss = batch_loss

        # Backward pass, gradient clipping, parameter update.
        loss.backward()
        torch.nn.utils.clip_grad_norm_(model.parameters(), max_norm=1.0)
        optimizer.step()

        last_loss = loss.item()
        total_loss += last_loss
        completed_steps += 1

        print(f"Epoch [{epoch+1}/{NUM_EPOCHS}], Step [{i}/{len(dataloader)}], Loss: {last_loss:.4f}")

    scheduler.step()

    # Average over the batches that were really trained on, not over all
    # batches, so skipped batches do not deflate the reported loss.
    avg_loss = total_loss / max(completed_steps, 1)
    print(f"Epoch {epoch+1}: Avg Loss = {avg_loss:.4f} Current Loss = {last_loss:.4f}")


Epoch [1/10], Step [1/250], Loss: 202.7964
Epoch [1/10], Step [2/250], Loss: 328.9559
Epoch [1/10], Step [3/250], Loss: 272.9131
Epoch [1/10], Step [4/250], Loss: 125.5943
Epoch [1/10], Step [5/250], Loss: 136.9655
Epoch [1/10], Step [6/250], Loss: 174.7416
Epoch [1/10], Step [7/250], Loss: 682.9363
Epoch [1/10], Step [8/250], Loss: 280.3847
Epoch [1/10], Step [9/250], Loss: 762.5361
Epoch [1/10], Step [10/250], Loss: 319.1257
Epoch [1/10], Step [11/250], Loss: 1344.4042
Epoch [1/10], Step [12/250], Loss: 198.5763
Epoch [1/10], Step [13/250], Loss: 1000.0983
Epoch [1/10], Step [14/250], Loss: 293.5516
Epoch [1/10], Step [15/250], Loss: 229.5682
Epoch [1/10], Step [16/250], Loss: 411.0673
Epoch [1/10], Step [17/250], Loss: 161.9754
Epoch [1/10], Step [18/250], Loss: 58.0655
Epoch [1/10], Step [19/250], Loss: 15.3333
Epoch [1/10], Step [20/250], Loss: 10.8804
Epoch [1/10], Step [21/250], Loss: 8.9048
Epoch [1/10], Step [22/250], Loss: 4.8591
Epoch [1/10], Step [23/250], Loss: 4.8716
Epoc

In [None]:
# 전체 모델 저장
# Write two files: the pickled module object, and its state dict only.
for payload, destination in (
    (model, 'drive/MyDrive/ai-dataset/speech_model_full4.pth'),
    (model.state_dict(), 'drive/MyDrive/ai-dataset/speech_model4.pth'),
):
    torch.save(payload, destination)

In [None]:
# 학습 진행 상태 저장
# Bundle what is needed to resume training: the last epoch index, the model
# and optimizer states, and the most recent loss value.
checkpoint = dict(
    epoch=epoch,
    model_state_dict=model.state_dict(),
    optimizer_state_dict=optimizer.state_dict(),
    loss=loss.item(),
)
torch.save(checkpoint, '/content/drive/MyDrive/ai-dataset/speech_checkpoint4.pth')

In [None]:
# NOTE(review): the resume-from-checkpoint snippet below is disabled by being
# wrapped in a bare string literal, so none of it is executed.
# It loads '/content/drive/MyDrive/speech_checkpoint.pth', whereas the
# checkpoint cell above saves to
# '/content/drive/MyDrive/ai-dataset/speech_checkpoint4.pth' -- confirm the
# path before enabling it.
"""
# 학습 진행 상태 불러오기
checkpoint = torch.load('/content/drive/MyDrive/speech_checkpoint.pth')

# 모델, 옵티마이저 로드
model.load_state_dict(checkpoint['model_state_dict'])
optimizer.load_state_dict(checkpoint['optimizer_state_dict'])

# 마지막 에포크 번호 확인
start_epoch = checkpoint['epoch'] + 1
print(f"Resuming training from epoch {start_epoch}")
"""

#### **2.3 평가**

In [None]:
import Levenshtein

def _ctc_greedy_collapse(tokens, blank=0):
    """Collapse a frame-level CTC path: merge repeated ids, then drop blanks."""
    collapsed = []
    previous = None
    for token in tokens:
        if token != previous and token != blank:
            collapsed.append(token)
        previous = token
    return collapsed


def _error_rate(reference, hypothesis):
    """Return the edit distance normalized by the reference length (at least 1)."""
    return Levenshtein.distance(reference, hypothesis) / max(len(reference), 1)


def evaluate_model(model, audio_file, text_file, tokenizer, sr=16000):
    """Transcribe one audio file with the model and score it against its transcript.

    Decoding is greedy CTC: the most likely class per frame is taken, repeated
    ids are merged and blanks (id 0) are dropped.

    Args:
        model: Trained speech recognition model returning ``(T, N, C)``
            log-probabilities.
        audio_file (str): Path of the PCM audio file to evaluate.
        text_file (str): Path of the cp949-encoded reference transcript.
        tokenizer: Character-level tokenizer providing ``decode``.
        sr (int): Sampling rate (default 16 kHz).

    Returns:
        dict: ``{"predicted_text": str, "wer": float, "cer": float}``, or
        ``None`` when the audio cannot be loaded or featurized.
    """
    # Load the audio and extract features.
    try:
        audio = load_pcm(audio_file, sr)
        mfcc, _, _, _ = extract_features(audio, sr)
        # Add the batch dimension and move to the model's device.
        mfcc = mfcc.unsqueeze(0).to(next(model.parameters()).device)
    except Exception as e:
        print(f"Error loading audio file {audio_file}: {e}")
        return None

    # Run inference in eval mode, then restore the caller's train/eval mode so
    # that evaluating in the middle of training does not silently change it.
    was_training = model.training
    model.eval()
    try:
        with torch.no_grad():
            log_probs = model(mfcc)  # (T, N, C)
    finally:
        model.train(was_training)

    # Greedy CTC decoding of the only sample in the batch.
    frame_ids = log_probs[:, 0, :].argmax(dim=-1).tolist()
    predicted_text = tokenizer.decode(_ctc_greedy_collapse(frame_ids))

    # Load and clean the reference transcript.
    with open(text_file, "r", encoding="cp949") as f:
        ground_truth_text = clean_transcript(f.read().strip())

    # WER compares whitespace-separated words, CER compares characters.
    wer = _error_rate(ground_truth_text.split(), predicted_text.split())
    cer = _error_rate(ground_truth_text, predicted_text)

    # Report the result.
    print("\n📌 Evaluation Result")
    print(f"🔹 Reference Text  : {ground_truth_text}")
    print(f"🔹 Predicted Text  : {predicted_text}")
    print(f"✅ WER: {wer:.4f}, CER: {cer:.4f}")

    return {"predicted_text": predicted_text, "wer": wer, "cer": cer}

In [None]:
#실제 평가

# Evaluate the first utterance and show the returned metrics
# (evaluate_model returns None when the audio could not be loaded).
result = evaluate_model(
    model=model,
    audio_file=pcm_files[0],
    text_file=txt_files[0],
    tokenizer=dataset.tokenizer,
)
print(result)


📌 Evaluation Result
🔹 Reference Text  : 아 몬 소리야 그건 또 
🔹 Predicted Text  :            !   !! ! ! ! D! ! !  !갑!  갑 갑   갑 갑 갑 !  
✅ WER: 3.0000, CER: 3.6154
{'predicted_text': '           !   !! ! ! ! D! ! !  !갑!  갑 갑   갑 갑 갑 !  ', 'wer': 3.0, 'cer': 3.6153846153846154}


In [None]:
# NOTE(review): the corpus-level evaluation snippet below is disabled by being
# wrapped in a bare string literal, so none of it is executed.
# It relies on `evaluate_single`, `eval_pcm_files` and `eval_txt_files`, none
# of which are defined in the visible part of this file; `evaluate_model`
# above returns a dict (or None) rather than a (wer, cer) tuple, so the loop
# needs adapting before it can be enabled.
"""
total_wer = 0
total_cer = 0
num_samples = 0

for pcm_file, txt_file in zip(eval_pcm_files, eval_txt_files):
    wer, cer = evaluate_single(model, pcm_file, txt_file, device, dataset.tokenizer) # dataset.tokenizer 추가
    total_wer += wer
    total_cer += cer
    num_samples += 1

avg_wer = total_wer / num_samples
avg_cer = total_cer / num_samples
print(f"WER: {avg_wer:.4f}, CER: {avg_cer:.4f}")
"""