In [6]:
import os
os.environ["TORCHAUDIO_BACKEND"] = "sox_io"  # sox_io 백엔드를 사용하도록 지정
import torch
import torch.nn as nn
import torch.optim as optim
import torchaudio
from torchaudio.datasets import SPEECHCOMMANDS
from torch.utils.data import DataLoader

# 1. 데이터셋 클래스 정의 - 데이터의 서브셋만 처리함 
# SPEECHCOMMANDS라는 torchaudio의 기본 데이터셋 클래스를 상속받아 SubsetSC라는 새로운 클래스를 정의
# 이 클래스는 데이터셋의 일부분(예: 훈련, 검증, 테스트)을 선택적으로 로드하는 기능을 추가
# SubsetSC 클래스는 SPEECHCOMMANDS(PyTorch Dataset)을 상속받은 클래스로 (waveform.shape, sample_rate, label)를 반환 
class SubsetSC(SPEECHCOMMANDS):
    def __init__(self, subset: str = None): # 클래스 생성자 (초기화 메서드), subset 매개변수는 어떤 부분집합을 로드할지 지정하는 문자열
        super().__init__("./data", download=True) # 부모 클래스 SPEECHCOMMANDS의 초기화 메소드를 호출, 데이터셋을 "./data" 디렉터리에 다운로드 및 저장하며, 필요시 자동으로 다운로드하도록 설정
        def load_list(filename): # 주어진 파일 이름을 기반으로 파일 목록을 로드하는 내부 함수 
            filepath = os.path.join(self._path, filename) # os.path.join()은 문자열끼리 조합해서 path를 생성함 ({./data}/{valiation.txt})
            if not os.path.exists(filepath): # os.path.exists()는 true/false 반환. 지정한 경로에 파일이 존재하지 않으면 경고 메시지를 출력하고, 빈 리스트를 반환. 즉, 파일이 없으면 건너 뜀
                print(f"File {filepath} not found. Skipping...")
                return [] # filepath가 존재하지 않으면 빈 리스트를 반환하고 load_list함수를 끝냄.
            with open(filepath) as f: # filepath에 해당하는 파일을 오픈함 
                return [os.path.join(self._path, line.strip()) for line in f] # load_list의 최종 리턴 결과 : ({./data/.../xxx.wav})
        if subset == "validation": # subset 매개변수가 "validation"이면, validation_list.txt 파일을 로드하여 _walker에 저장
            self._walker = load_list("validation_list.txt")
        elif subset == "testing": # subset이 "testing"인 경우,testing_list.txt를 로드하여 _walker에 저장
            self._walker = load_list("testing_list.txt")
        elif subset == "training": # subset이 "training"인 경우, 검증(validation)과 테스트(testing) 파일 목록을 로드하여 하나의 리스트로 합침
            excludes = load_list("validation_list.txt") + load_list("testing_list.txt") # 검증 파일 + 평가 파일을 excludes에 저장
            excludes = set(excludes) # 이 목록을 set으로 변환하여 중복 제거 및 빠른 조회가 가능하도록 수정
            self._walker = [w for w in self._walker if w not in excludes] # excludes에 있는 파일 제외한 파일만 _walker

# 요약:
# SubsetSC 클래스는 SpeechCommands 데이터셋의 특정 부분집합을 로드할 수 있게 해줍니다.
# subset 매개변수에 따라 검증, 테스트 또는 훈련 데이터 파일 목록을 _walker에 설정합니다.
# 훈련 데이터셋의 경우, 검증과 테스트 파일 목록을 제외한 나머지 파일들을 선택합니다.

# 2. Transformer 기반 음성 명령 인식 모델 정의
#   -------------------------------------------------------------------
#    input_dim: 입력 특성 차원 (예: Mel-spectrogram의 멜 밴드 수)            - 128차원
#    num_classes: 분류할 클래스의 개수.
#    d_model: Transformer 내부의 임베딩 차원.                              - 128 차원 
#    nhead: 멀티헤드 어텐션에서의 헤드 수.                                  - 8개 
#    num_layers: Transformer 인코더 레이어의 수.                            - 4개
#    dim_feedforward: 각 Transformer 레이어 내 피드포워드 네트워크의 차원. - 512차원
#    dropout: 드롭아웃 비율.
#   -------------------------------------------------------------------
class SpeechTransformer(nn.Module):
    def __init__(self, input_dim, num_classes, d_model=128, nhead=8, num_layers=4, dim_feedforward=512, dropout=0.1):
        super(SpeechTransformer, self).__init__()
        self.input_projection = nn.Linear(input_dim, d_model) # 입력 특성 차원(input_dim)을 Transformer 모델에서 사용하는 차원(d_model)으로 선형 변환하는 레이어
        # 단일 Transformer 인코더 레이어를 생성. encoder_layer는 init 메서드 외에서는 사용되지 않으므로 지역 변수로 생성함
        encoder_layer = nn.TransformerEncoderLayer(
            d_model=d_model, 
            nhead=nhead, 
            dim_feedforward=dim_feedforward, 
            dropout=dropout
        )
        # 단일 Transformer encoder인 encoder_layer 변수를 num_layers만큼 쌓아서 총 4개의 인코더 인스턴스를 생성
        self.transformer_encoder = nn.TransformerEncoder(encoder_layer, num_layers=num_layers)
        self.classifier = nn.Linear(d_model, num_classes)
        
    def forward(self, x):
        # x shape: (batch_size, seq_len, input_dim)
        #print (x)
        out = self.input_projection(x)             # -> (batch_size, seq_len, d_model)
        #print(out)
        out = out.transpose(0, 1)                    # Transformer 입력 shape: (seq_len, batch_size, d_model)
        out = self.transformer_encoder(out)          # -> (seq_len, batch_size, d_model)
        #print(out)
        out = out.mean(dim=0)                        # 시퀀스 차원 평균 -> (batch_size, d_model)
        #print(out)
        logits = self.classifier(out)              # -> (batch_size, num_classes)
        #print(logits)
        return logits
    
# 3. 평가 함수 정의
def evaluate(model, data_loader):
    model.eval()
    correct = 0
    total = 0
    with torch.no_grad():
        for inputs, targets in data_loader: # DataLoader에서 미니배치를 하나씩 꺼내어 반복
            outputs = model(inputs) # 모델에 입력 데이터를 전달하여 예측값(outputs) 생성
            _, predicted = torch.max(outputs, dim=1) # 가장 높은 값의 인덱스를 예측값으로 사용
            total += targets.size(0)
            correct += (predicted == targets).sum().item()
    return correct / total

# 4. main 함수 정의
def main():
    transform = torchaudio.transforms.MelSpectrogram(sample_rate=16000, n_mels=128)

    # [데이터셋 로드]
    # 1️. train_set은 SubsetSC 클래스의 인스턴스
    # 2️. train_set._walker에 훈련용 .wav 파일 경로 리스트가 저장됨
    # 3️. train_set[0]을 호출하면 __getitem__()이 실행되어 실제 오디오 데이터(waveform), 샘플링 레이트, 라벨을 반환
    # >> 이제 train_set을 DataLoader에 넘겨 학습에 활용할 수 있음음
    train_set = SubsetSC("training") # 훈련 데이터셋 생성 (waveform.shape, sample_rate, label)
    test_set = SubsetSC("testing") # 테스트 데이터셋 생성 (waveform.shape, sample_rate, label)

    # 라벨 목록 및 인덱스 매핑 생성 
    labels = sorted(list(set(dat[2] for dat in train_set))) # 데이터셋에서 라벨 목록을 추출하고 정렬
    # train_set에서 dat마다 반복하여 dat[2]에 담겨있는 label을 추출 
    # set()을 사용하여 중복된 label을 제거
    # list()는 집합을 리스트로 변환
    # sorted() 알파벳 순서로 정렬
    label_to_idx = {label: idx for idx, label in enumerate(labels)} 
    # enumerate()는 반복(iterable) 가능한 객체(리스트, 튜플 등)를 순회할 때, 자동으로 인덱스(index) 값을 함께 제공하는 Python 내장 함수
    # label_to_idx는 딕셔너리 형태가 된다. 
    # 예시: labels = ["cat", "dog", "yes", "no"] 라면,
    # label_to_idx = {
    #                 "cat": 0,
    #                 "dog": 1,
    #                 "yes": 2,
    #                 "no": 3
    #                }
    def collate_fn(batch):
        waveforms = []
        targets = []
        for waveform, sample_rate, label, *_ in batch:
            mel_spec = transform(waveform) # waveform을 Mel-spectrogram으로 변환
            mel_spec = mel_spec.mean(dim=0)
            # 채널끼리 평균을 구함 -> 채널의 차원이 1로 됨 
            # 원래 데이터 (2채널, n_mels=128, 시간=201)
            # [    [채널1: 128 x 201],
            #     [채널2: 128 x 201]
            # ]
            # 
            # mean(dim=0) 적용 후
            # [    평균(채널1, 채널2): 128 x 201]
            mel_spec = mel_spec.transpose(0, 1) # (n_mels, time) -> (time, n_mels)
            waveforms.append(mel_spec) # (time, n_mels)을 waveforms에 추가
            targets.append(label_to_idx[label]) # 라벨을 인덱스로 변환하여 targets에 추가
        waveforms = nn.utils.rnn.pad_sequence(waveforms, batch_first=True) # waveforms를 패딩하여 하나의 텐서로 변환
        targets = torch.tensor(targets) # lables의 인덱스를 텐서로 변환
        return waveforms, targets

    train_loader = DataLoader(train_set, batch_size=32, shuffle=True, collate_fn=collate_fn)
    test_loader = DataLoader(test_set, batch_size=32, shuffle=False, collate_fn=collate_fn)
    # waveforms, targets를 반환하는 collate_fn 함수를 사용하여 DataLoader 생성
    # waveforms의 shape는 (batch_size, seq_len, n_mels)
    # targets의 shape는 (batch_size)

    input_dim = 128
    num_classes = len(labels)
    model = SpeechTransformer(input_dim=input_dim, num_classes=num_classes)
    # input_dim은 Mel-spectrogram의 멜 밴드 수와 같은 입력 특성의 차원 (128)
    # model(inputs)와 같이 inputs는 waveforms로 마지막 차원이 128이므로 model의 입력 차원에 맞음

    criterion = nn.CrossEntropyLoss()
    optimizer = optim.Adam(model.parameters(), lr=1e-3)

    # 훈련 단계
    num_epochs = 10
    for epoch in range(num_epochs):
        model.train() # 모델을 학습 모드로 설정
        total_loss = 0 # 1 epoch 내에서 발생한 전체 손실을 누적하는 변수
        for inputs, targets in train_loader:
            optimizer.zero_grad()  # 기울기(Gradient) 초기화 → 이전 스텝의 기울기를 제거하여 새로운 업데이트 계산
            outputs = model(inputs)  # 모델에 입력 데이터를 전달하여 예측값(outputs) 생성
            loss = criterion(outputs, targets)  # 손실 함수(criterion) 계산 → 실제 값(targets)과 예측값(outputs) 비교
            loss.backward()  # 역전파(Backpropagation) 수행 → 기울기(Gradient) 계산
            optimizer.step()  # 옵티마이저(Optimizer) 업데이트 → 기울기를 기반으로 가중치(W) 조정
            total_loss += loss.item()  # 손실 값(loss)을 누적하여 전체 손실(total_loss) 계산
        avg_loss = total_loss / len(train_loader) # 전체 손실을 데이터셋 크기로 나누어 평균 손실 계산
        print(f"Epoch {epoch+1}/{num_epochs}, Loss: {avg_loss:.4f}")

    # 평가 단계
    accuracy = evaluate(model, test_loader)
    print(f"Test Accuracy: {accuracy * 100:.2f}%")

if __name__ == '__main__':
    main()




Epoch 1/10, Loss: 1.9380
Epoch 2/10, Loss: 1.2769
Epoch 3/10, Loss: 1.0876
Epoch 4/10, Loss: 0.9995
Epoch 5/10, Loss: 0.9530
Epoch 6/10, Loss: 0.8923
Epoch 7/10, Loss: 0.8470
Epoch 8/10, Loss: 0.8180
Epoch 9/10, Loss: 0.7867
Epoch 10/10, Loss: 0.7556
Test Accuracy: 80.36%
