### 0. 환경 구성

전처리에 필요한 패키지 설치

In [7]:
# %pip install datasets>=2.6.1
# %pip install transformers[torch]

### 1. 데이터 불러오기

In [8]:
import os
import shutil

# 원본 데이터 폴더 경로
source_dir = 'data/train_data'
# 타겟 디렉토리 설정
audio_target_dir = 'data/audio'
label_target_dir = 'data/label'

# 타겟 디렉토리가 없으면 생성
if not os.path.exists(audio_target_dir):
    os.makedirs(audio_target_dir)
if not os.path.exists(label_target_dir):
    os.makedirs(label_target_dir)

# source_dir에서 각 폴더를 순회
for folder in os.listdir(source_dir):
    folder_path = os.path.join(source_dir, folder)
    
    # 폴더 내의 파일을 순회
    if os.path.isdir(folder_path):
        for file in os.listdir(folder_path):
            file_path = os.path.join(folder_path, file)
            
            # audio.wav 파일을 audio 폴더로 복사
            if file.endswith('audio.wav'):
                shutil.copy(file_path, os.path.join(audio_target_dir, f"{folder}_audio.wav"))
            
            # alignment.txt 파일 처리
            elif file.endswith('alignment.txt'):
                with open(file_path, 'r', encoding='utf-8') as f:
                    lines = f.readlines()
                    # 두 번째 줄만 처리, 앞뒤 쉼표 및 따옴표 제거, 나머지 쉼표는 띄어쓰기로 대체, 소문자 변환
                    if len(lines) > 1:
                        timing_line = lines[1].strip().lstrip('",').rstrip('",').replace(",", " ").lower()
                
                # 수정된 타이밍 정보를 새 파일(label.txt)에 저장
                label_file_path = os.path.join(label_target_dir, f"{folder}_label.txt")
                with open(label_file_path, 'w', encoding='utf-8') as f:
                    # 양쪽의 따옴표 제거
                    timing_line = timing_line.strip('"')
                    f.write(timing_line)
                    

In [1]:
import os

data_folder = "./data"
audio_folder = os.path.join(data_folder, "audio")
label_folder = os.path.join(data_folder, "label")

# audio 폴더 내의 파일 개수 확인
audio_files = [file for file in os.listdir(audio_folder) if file.endswith(".wav")]
print(f"audio.wav 개수: {len(audio_files)}")

# labels 폴더 내의 파일 개수 확인
label_files = [file for file in os.listdir(label_folder) if file.endswith(".txt")]
print(f"label.txt 개수: {len(label_files)}")

audio.wav 개수: 10025
label.txt 개수: 10025


In [1]:
from datasets import Dataset, DatasetDict
import os

def load_dataset(data_folder):
    audio_folder = os.path.join(data_folder, "audio")
    label_folder = os.path.join(data_folder, "label")

    # 오디오 파일과 레이블 파일 매핑을 위한 리스트 초기화
    audios = []
    sentences = []

    for label_file in os.listdir(label_folder):
        if label_file.endswith(".txt"):
            prefix = label_file.split("_")[0]
            audio_file = f"{prefix}_audio.wav"
            audio_path = os.path.join(audio_folder, audio_file)
            label_path = os.path.join(label_folder, label_file)

            # 레이블 파일 읽기
            with open(label_path, 'r', encoding='utf-8') as file:
                sentence = file.read().strip()

            # 오디오 파일 경로와 문장을 각각의 리스트에 추가
            audios.append(audio_path)
            sentences.append(sentence)

    # Dataset 생성을 위한 사전 구성
    data_dict = {"audio": audios, "sentence": sentences}

    # 데이터셋 분할 (0.95 -> train,test 비율)
    train_split = int(0.95 * len(audios))
    train_dataset = Dataset.from_dict({key: value[:train_split] for key, value in data_dict.items()})
    test_dataset = Dataset.from_dict({key: value[train_split:] for key, value in data_dict.items()})

    # DatasetDict 생성
    dataset_dict = DatasetDict({
        "train": train_dataset,
        "test": test_dataset
    })

    return dataset_dict

data_folder = "./data"
dataset_dict = load_dataset(data_folder)

# DatasetDict 출력
print(dataset_dict)


DatasetDict({
    train: Dataset({
        features: ['audio', 'sentence'],
        num_rows: 9523
    })
    test: Dataset({
        features: ['audio', 'sentence'],
        num_rows: 502
    })
})


### 3. 데이터 전처리

prepare_dataset 함수에서 필요한 feature_extractor, tokenizer 구하기

In [2]:
from transformers import WhisperFeatureExtractor, WhisperTokenizer
from datasets import Audio, load_dataset

# Whisper 모델을 위한 FeatureExtractor와 Tokenizer 로드
feature_extractor = WhisperFeatureExtractor.from_pretrained("openai/whisper-medium")
tokenizer = WhisperTokenizer.from_pretrained("openai/whisper-medium", language="english", task="transcribe")

def prepare_dataset(batch):
    """
    Prepare audio data to be suitable for Whisper AI model.
    """
    # 오디오 데이터를 로드 (이미 16kHz로 다운샘플링 되었다고 가정)
    audio = batch["audio"]

    # 입력 오디오 배열로부터 log-Mel 입력 특성을 계산
    batch["input_features"] = feature_extractor(audio["array"], sampling_rate=audio["sampling_rate"]).input_features[0]

    # 대상 텍스트를 토큰화하고 라벨 ID로 인코딩
    batch["labels"] = tokenizer(batch["sentence"]).input_ids
    return batch

# 데이터셋에서 오디오 컬럼을 16kHz로 다운샘플링
custom_dataset = dataset_dict.cast_column("audio", Audio(sampling_rate=16000))

# 준비된 데이터셋 확인
print('| Check the audio example')
print(f'{custom_dataset["train"][0]}\n')

# 데이터를 Whisper AI 모델에 맞게 준비
custom_dataset = custom_dataset.map(
    prepare_dataset,
    remove_columns=custom_dataset.column_names["train"],
    # num_proc=2 # 멀티프로세싱 활성화
)

# 다운샘플링 효과 확인
print('| Check the effect of downsampling:')
print(f'{custom_dataset["train"][0]}\n')


| Check the audio example
{'audio': {'path': './data\\audio\\100038_audio.wav', 'array': array([0., 0., 0., ..., 0., 0., 0.]), 'sampling_rate': 16000}, 'sentence': 'and i like to have it'}



Map:   0%|          | 0/9523 [00:00<?, ? examples/s]

Map:   0%|          | 0/502 [00:00<?, ? examples/s]

| Check the effect of downsampling:
{'input_features': [[-1.1130571365356445, -1.1130571365356445, -1.1130571365356445, -1.0433225631713867, -1.1130571365356445, -1.1130571365356445, -1.1130571365356445, -1.0632929801940918, -1.1130571365356445, -1.1130571365356445, -1.1130571365356445, -1.1130571365356445, -1.1130571365356445, -1.1130571365356445, -1.1130571365356445, -1.1130571365356445, -1.1130571365356445, -1.1130571365356445, -1.1130571365356445, -1.1095075607299805, -1.1130571365356445, -1.1130571365356445, -1.0816056728363037, -1.111069679260254, -1.1130571365356445, -1.1130571365356445, -1.1130571365356445, -0.7644461393356323, -0.846001148223877, -1.0278089046478271, -0.9870222806930542, -1.1130571365356445, -1.1130571365356445, -1.1130571365356445, -1.1130571365356445, -1.1130571365356445, -1.1130571365356445, -1.1130571365356445, -1.1130571365356445, -1.1130571365356445, -1.1130571365356445, -1.1130571365356445, -1.1130571365356445, -1.1130571365356445, -1.1130571365356445, 

In [3]:
# 데이터셋 저장 경로 지정
dataset_path = "./data/map_dataset_medium"

# 데이터셋을 디스크에 저장
custom_dataset.save_to_disk(dataset_path)

print(f"데이터셋이 저장되었습니다: {dataset_path}")

Saving the dataset (0/19 shards):   0%|          | 0/9523 [00:00<?, ? examples/s]

Saving the dataset (0/1 shards):   0%|          | 0/502 [00:00<?, ? examples/s]

데이터셋이 저장되었습니다: ./data/map_dataset_medium


In [4]:
from datasets import load_from_disk

# 저장된 데이터셋 로드
loaded_dataset = load_from_disk(dataset_path)

print(loaded_dataset)


DatasetDict({
    train: Dataset({
        features: ['input_features', 'labels'],
        num_rows: 9523
    })
    test: Dataset({
        features: ['input_features', 'labels'],
        num_rows: 502
    })
})
