In [2]:
import os
import pandas as pd
import numpy as np
import librosa
import librosa.display
from tqdm import tqdm
import gc

In [6]:
# ========== 1) Audio Preprocessing (MFCC) ==========
from scipy.signal import butter, lfilter

def lowpass_filter(data, sr, cutoff=4000, order=5):
    """
    Apply a Butterworth low-pass filter to the data.

    Parameters
    ----------
    data : array-like
        Samples to filter.
    sr : int or float
        Sampling rate of `data` in Hz.
    cutoff : int or float, default 4000
        Cutoff frequency in Hz.
    order : int, default 5
        Order of the Butterworth filter.

    Returns
    -------
    numpy.ndarray
        The filtered samples. When `cutoff` is at or above the Nyquist
        frequency (sr / 2) the input is returned unfiltered (as an array,
        not a copy), because the signal cannot contain anything above
        Nyquist to remove.
    """
    nyquist = 0.5 * sr
    if cutoff >= nyquist:
        # butter() rejects a normalized cutoff outside (0, 1) with a
        # ValueError, which would abort processing of low-sample-rate files.
        return np.asarray(data)
    normal_cutoff = cutoff / nyquist
    b, a = butter(order, normal_cutoff, btype='low', analog=False)
    return lfilter(b, a, data)

def extract_mfcc(file_path, n_mfcc=20, cutoff=5000):
    """
    Compute the MFCC matrix of one audio file.

    The file is read at its own sampling rate, passed through
    `lowpass_filter` with the given `cutoff`, and analysed with a 25 ms
    Hann window advanced in 10 ms steps.
    Returns a 2D numpy array of shape (n_mfcc, time_frames).
    """
    # sr=None keeps the file's original sampling rate instead of resampling.
    waveform, sample_rate = librosa.load(file_path, sr=None)
    filtered = lowpass_filter(waveform, sample_rate, cutoff=cutoff)

    # Frame geometry expressed in samples: 25 ms window, 10 ms hop.
    frame_len = int(0.025 * sample_rate)
    frame_step = int(0.01 * sample_rate)

    # The FFT size is set equal to the window length.
    return librosa.feature.mfcc(
        y=filtered,
        sr=sample_rate,
        n_mfcc=n_mfcc,
        n_fft=frame_len,
        hop_length=frame_step,
        win_length=frame_len,
        window='hann',
    )  # shape: (n_mfcc, T)

In [1]:
# ========== 2) Text Preprocessing (Tokenization) ==========
from transformers import BertTokenizer

# Name of the pretrained model whose tokenizer is loaded below
BERT_MODEL_NAME = 'bert-base-uncased'

# Load the tokenizer once at module level; tokenize_text() reuses this instance
tokenizer = BertTokenizer.from_pretrained(BERT_MODEL_NAME)

def tokenize_text(text, max_length=32):
    """
    Tokenize `text` with the module-level BERT tokenizer.

    Sequences longer than `max_length` are truncated and shorter ones are
    padded up to `max_length`.
    Returns: (input_ids, attention_mask)
    """
    encoded = tokenizer(
        text,
        max_length=max_length,
        padding='max_length',
        truncation=True,
        return_tensors='np',  # ask for numpy arrays
    )
    # Index 0 selects the first (only requested) sequence of the result.
    ids_row = encoded['input_ids'][0]          # shape: (max_length,)
    mask_row = encoded['attention_mask'][0]    # shape: (max_length,)
    return ids_row, mask_row

In [8]:
# ========== 3) Padding/Truncation for MFCC ==========
def pad_or_truncate_mfcc(mfcc, target_time_frames):
    """
    Force an MFCC matrix to a fixed number of time frames.

    mfcc: array of shape (n_mfcc, T)
    target_time_frames: number of frames T to pad or cut to

    Shorter inputs are zero-padded at the end of the time axis; inputs that
    are already long enough are cut to the first `target_time_frames` frames.
    """
    # Unpacking also enforces that the input is exactly two-dimensional.
    _, n_frames = mfcc.shape
    missing = target_time_frames - n_frames

    if missing <= 0:
        # Long enough already: keep only the leading frames.
        return mfcc[:, :target_time_frames]

    # Too short: append zeros along the time axis only.
    return np.pad(mfcc, ((0, 0), (0, missing)), mode='constant')


In [None]:
import os
import pandas as pd
from sklearn.preprocessing import LabelEncoder

# Project root.
# NOTE(review): absolute, machine-specific path — anyone else running this
# notebook has to edit it.
BASE_DIR = '/Users/jeonsang-eon/ECE6254-Voice-Feature-Extraction/'
# Directory holding the raw inputs (metadata CSVs and the audio files they list)
RAW_DATA_DIR = os.path.join(BASE_DIR, 'raw_data')
# Directory that receives the generated outputs; created here if missing
OUTPUT_DIR = os.path.join(BASE_DIR, 'data')
os.makedirs(OUTPUT_DIR, exist_ok=True)

# Accents and age groups of interest.
# NOTE(review): AGES_OF_INTEREST is not referenced in the visible part of this
# file, and 'fourties' is presumably the dataset's own spelling — confirm
# before "correcting" it.
ACCENTS_OF_INTEREST = ['us', 'england', 'indian', 'australia']
AGES_OF_INTEREST = ['teens', 'twenties', 'thirties', 'fourties', 'fifties', 'sixties']

# --------------------------------------------------------------------------------
# 1) CSV 로드 및 필터링 함수
# --------------------------------------------------------------------------------
def load_and_filter_csv(csv_path, accents=None, audio_dir=None):
    """
    Load a metadata CSV and keep only the rows usable for training.

    Steps:
      1) read the CSV
      2) drop rows with a missing gender or accent
      3) keep only the requested accents
      4) keep only rows whose audio file actually exists on disk
      5) return just the columns needed downstream

    Parameters
    ----------
    csv_path : str
        Path of the metadata CSV. It must contain the columns
        filename, text, gender, accent and age.
    accents : list-like of str, optional
        Accent labels to keep. None (the default) means ACCENTS_OF_INTEREST.
    audio_dir : str, optional
        Directory that the `filename` column is relative to. None (the
        default) means RAW_DATA_DIR.

    Returns
    -------
    pandas.DataFrame
        A copy with the columns filename, text, gender, accent, age. The
        frame is empty (but keeps these columns) when no row qualifies.
    """
    if accents is None:
        accents = ACCENTS_OF_INTEREST
    if audio_dir is None:
        audio_dir = RAW_DATA_DIR

    df = pd.read_csv(csv_path)

    # Drop rows with a missing gender or accent
    df = df.dropna(subset=['gender', 'accent'])

    # Keep only the accents of interest
    df = df[df['accent'].isin(accents)]

    def _audio_exists(name):
        # A missing (non-string) filename counts as "file not found".
        return isinstance(name, str) and os.path.exists(os.path.join(audio_dir, name))

    # Force a boolean dtype: on an empty frame the mapped Series is not
    # boolean, and pandas would then treat it as a list of column labels
    # instead of a row mask, dropping every column.
    exists_mask = df['filename'].map(_audio_exists).astype(bool)
    df = df[exists_mask]

    # Keep only the needed columns (filename, text, gender, accent, age)
    keep_cols = ['filename', 'text', 'gender', 'accent', 'age']
    return df[keep_cols].copy()

# --------------------------------------------------------------------------------
# 2) (Train 전용) (gender, age)별로 액센트 수를 맞추기 위한 Over/Under 샘플링
# --------------------------------------------------------------------------------
def balance_by_gender_age(df):
    """
    Equalize the number of samples per accent inside each (gender, age) group.

    Only (gender, age) combinations in which every accent of `df` occurs are
    kept. Within such a combination each accent is resampled to a common
    target T = min(largest accent count, int(1.5 * smallest accent count)):
    accents with fewer than T rows are oversampled with replacement, the
    others are undersampled without replacement. Sampling uses a fixed
    random_state of 42.

    Parameters
    ----------
    df : pandas.DataFrame
        Must contain the columns gender, age and accent.

    Returns
    -------
    (pandas.DataFrame, list of str)
        The balanced rows (fresh 0..n-1 index) and one human-readable line
        per resampled (gender, age, accent) group. When no combination
        contains every accent, an empty frame with the columns of `df` and
        an empty list are returned.
    """
    df_temp = df.copy()
    num_accents = len(df_temp['accent'].unique())

    # Find the (gender, age) combinations in which every accent is present
    combo_counts = df_temp.groupby(['gender', 'age'])['accent'].nunique()
    valid_combos = combo_counts[combo_counts == num_accents].index.tolist()

    balanced_groups = []
    resample_info = []

    for gender, age in valid_combos:
        subset = df_temp[(df_temp['gender'] == gender) & (df_temp['age'] == age)]

        # Per-accent sample counts inside this (gender, age) combination
        accent_counts = subset.groupby('accent').size()

        # Target: at most 1.5x the rarest accent, and never above the most common one
        T = int(min(accent_counts.max(), int(1.5 * accent_counts.min())))

        for accent_val, group in subset.groupby('accent'):
            current_count = len(group)
            factor = T / current_count
            # Sampling with replacement is only needed when the group is too small.
            oversample = current_count < T
            sampled = group.sample(n=T, replace=oversample, random_state=42)
            method = 'oversampled' if oversample else 'undersampled'

            balanced_groups.append(sampled)
            resample_info.append(
                f"[{gender}, {age}, {accent_val}]  "
                f"Original={current_count}, Target={T}, "
                f"Factor={factor:.2f}, Method={method}"
            )

    if not balanced_groups:
        # pd.concat([]) raises ValueError; return an empty frame instead.
        return df_temp.iloc[0:0].copy(), resample_info

    df_balanced = pd.concat(balanced_groups, ignore_index=True)

    return df_balanced, resample_info

# --------------------------------------------------------------------------------
# 3) (Train 전용) 검증 세트(valid) 분리
# --------------------------------------------------------------------------------
def split_train_valid_by_accent(df_balanced, df_train_original, valid_size=500):
    """
    Build a validation set from training rows the balanced set does not use.

    Parameters
    ----------
    df_balanced : pandas.DataFrame
        The already over/under-sampled training set (needs a filename column).
    df_train_original : pandas.DataFrame
        The filtered training set before balancing (needs filename and
        accent columns).
    valid_size : int, default 500
        Maximum number of validation rows drawn per accent.

    Steps:
      1) keep only rows whose filename does not occur in df_balanced
      2) sample up to `valid_size` of them per accent (random_state=42)
      3) concatenate the samples and return them with a fresh index

    Returns
    -------
    pandas.DataFrame
        The validation rows; an empty frame with the columns of
        `df_train_original` when there are no unused rows.
    """
    # Candidates are the files that the balanced set never uses
    used_files = df_balanced['filename']
    df_candidates = df_train_original[~df_train_original['filename'].isin(used_files)]

    valid_groups = [
        group.sample(n=min(valid_size, len(group)), random_state=42)
        for _, group in df_candidates.groupby('accent')
    ]

    if not valid_groups:
        # pd.concat([]) raises ValueError; return an empty frame instead.
        return df_candidates.iloc[0:0].reset_index(drop=True)

    return pd.concat(valid_groups, ignore_index=True)




In [10]:
# ========== 4) Main Function to create NPZ ==========
from sklearn.preprocessing import LabelEncoder
from tensorflow.keras.utils import to_categorical

def create_npz_from_csv(
    csv_path,
    audio_base_dir,
    output_npz_path,
    n_mfcc=13,
    cutoff=5000,
    max_length_text=32,
    time_frames_target=None,
    label_col='accent'
):
    """
    Convert a metadata CSV into one compressed .npz dataset.

    Pipeline: 1) load the CSV -> 2) extract audio features (MFCC) and text
    tokens -> 3) encode the labels -> 4) save everything to an npz archive.

    csv_path: path of the CSV (must contain filename, text and `label_col`)
    audio_base_dir: directory that the `filename` column is relative to
    output_npz_path: where the resulting npz is written
    n_mfcc: number of MFCC coefficients
    cutoff: cutoff frequency of the low-pass filter
    max_length_text: maximum token length of the text
    time_frames_target: number of MFCC time frames; None picks the median
        frame count of this CSV's samples
    label_col: name of the target label column (e.g. 'accent')

    Saved arrays: X_audio (n, 1, n_mfcc, time_frames), X_input_ids and
    X_attention_mask (n, max_length_text), y (one-hot labels),
    label_encoder_classes (class names as strings) and time_frames.

    Raises ValueError when the CSV has no row with both filename and text.

    NOTE: the label encoder is fitted on this CSV only, so label indices and
    the one-hot width can differ between archives built from different CSVs.
    Every MFCC matrix is also held in memory until the archive is written.
    """
    df = pd.read_csv(csv_path)

    # 1) Filtering: rows without a filename or a text are dropped
    df = df.dropna(subset=['filename', 'text'])
    if df.empty:
        # Fail early with a clear message; otherwise the median / stack
        # steps below fail with an unrelated-looking ValueError.
        raise ValueError(f"No rows with both filename and text in {csv_path}")

    # 2) Label encoding; missing labels are temporarily mapped to 'unknown'
    le = LabelEncoder()
    df[label_col] = df[label_col].fillna('unknown')
    df['label_encoded'] = le.fit_transform(df[label_col])

    # 3) Audio and text preprocessing
    mfcc_list = []
    input_ids_list = []
    attention_mask_list = []

    print(f"Extracting MFCC & tokens from {len(df)} samples...")
    # Iterating over the two needed columns avoids building a Series per row.
    samples = zip(df['filename'], df['text'])
    for filename, text in tqdm(samples, total=len(df)):
        # (A) audio -> MFCC
        audio_path = os.path.join(audio_base_dir, filename)
        mfcc_list.append(extract_mfcc(audio_path, n_mfcc=n_mfcc, cutoff=cutoff))

        # (B) text -> tokens
        input_ids, att_mask = tokenize_text(str(text), max_length=max_length_text)
        input_ids_list.append(input_ids)
        attention_mask_list.append(att_mask)

    # 4) Decide the common number of time frames (median when not given)
    if time_frames_target is None:
        all_time_frames = [mf.shape[1] for mf in mfcc_list]
        time_frames_target = int(np.median(all_time_frames))
        print("Auto-selected median time frames:", time_frames_target)
    else:
        print("Using user-defined time frames:", time_frames_target)

    # 5) Pad or cut every MFCC matrix to that fixed length
    padded_mfcc_list = [pad_or_truncate_mfcc(m, time_frames_target) for m in mfcc_list]

    # 6) Stack into one array with a channel axis: (n, 1, n_mfcc, time_frames)
    X_audio = np.stack([np.expand_dims(m, axis=0) for m in padded_mfcc_list])

    # Text tokens: (n, max_length)
    X_input_ids = np.stack(input_ids_list)
    X_attention_mask = np.stack(attention_mask_list)

    # Labels as one-hot vectors: (n, num_classes)
    labels = df['label_encoded'].values
    y = to_categorical(labels)

    # 7) Save as npz. The class names are stored as a plain string array: an
    # object array would be pickled, and np.load refuses to read pickled
    # members unless allow_pickle=True is passed.
    np.savez_compressed(
        output_npz_path,
        X_audio=X_audio,
        X_input_ids=X_input_ids,
        X_attention_mask=X_attention_mask,
        y=y,
        label_encoder_classes=np.asarray(le.classes_).astype(str),
        time_frames=time_frames_target  # metadata
    )
    print(f"Saved dataset to {output_npz_path}")
    print(f"Shape of X_audio: {X_audio.shape}")
    print(f"Shape of X_input_ids: {X_input_ids.shape}, X_attention_mask: {X_attention_mask.shape}")
    print(f"Shape of y (one-hot): {y.shape}")



In [11]:
# ========== 5) 실제 함수 호출 예시 ==========

# --------------------------------------------------------------------------------
# 4) 메인 실행부
# --------------------------------------------------------------------------------
# Driver: filters the train/test metadata, balances the train set, carves out
# a validation set, encodes the accent labels and writes the three CSVs plus
# two informational text files into OUTPUT_DIR.
if __name__ == "__main__":

    # (A) Load & filter the train CSV
    TRAIN_CSV_PATH = os.path.join(RAW_DATA_DIR, "cv-valid-train.csv")
    df_train_filtered = load_and_filter_csv(TRAIN_CSV_PATH, accents=ACCENTS_OF_INTEREST)
    print("Filtered train accent counts:")
    print(df_train_filtered['accent'].value_counts())

    # (B) Balance the train set
    df_train_balanced, info_lines = balance_by_gender_age(df_train_filtered)
    
    # Save the resampling information as a text file
    txt_path = os.path.join(OUTPUT_DIR, "train_dataset_info.txt")
    with open(txt_path, "w") as f:
        f.write("Resample Factor Information:\n")
        for line in info_lines:
            f.write(line + "\n")
    print("\nResample info saved to", txt_path)

    # (C) Build the validation set from train rows the balanced set does not use
    df_valid = split_train_valid_by_accent(df_train_balanced, df_train_filtered, valid_size=500)
    print("\nBalanced train shape:", df_train_balanced.shape)
    print("Valid shape:", df_valid.shape)

    # (D) Load & filter the test CSV (no balancing for the test set)
    TEST_CSV_PATH = os.path.join(RAW_DATA_DIR, "cv-valid-test.csv")
    df_test_filtered = load_and_filter_csv(TEST_CSV_PATH, accents=ACCENTS_OF_INTEREST)
    print("\nFiltered test accent counts:")
    print(df_test_filtered['accent'].value_counts())

    # (E) Label encoding
    #  - fit the encoder on the balanced train accents, then apply that same
    #    encoder to valid and test so all three share one mapping
    label_encoder = LabelEncoder()
    df_train_balanced['accent_encoded'] = label_encoder.fit_transform(df_train_balanced['accent'])
    df_valid['accent_encoded'] = label_encoder.transform(df_valid['accent'])
    df_test_filtered['accent_encoded'] = label_encoder.transform(df_test_filtered['accent'])

    # (F) Save the three splits as CSV
    train_csv_out = os.path.join(OUTPUT_DIR, "df_train_balanced.csv")
    valid_csv_out = os.path.join(OUTPUT_DIR, "df_valid.csv")
    test_csv_out  = os.path.join(OUTPUT_DIR, "df_test.csv")

    df_train_balanced.to_csv(train_csv_out, index=False)
    df_valid.to_csv(valid_csv_out, index=False)
    df_test_filtered.to_csv(test_csv_out, index=False)

    print("\nSaved balanced train to:", train_csv_out)
    print("Saved valid to:", valid_csv_out)
    print("Saved test to:", test_csv_out)

    # (G) Also record the label index -> accent mapping as text
    label_map_txt = os.path.join(OUTPUT_DIR, "label_mapping_info.txt")
    with open(label_map_txt, "w") as f:
        f.write("Accent Label Mapping:\n")
        for idx, accent in enumerate(label_encoder.classes_):
            f.write(f"{idx}: {accent}\n")
    print("Accent label mapping saved to", label_map_txt)

# Build the train / valid / test .npz datasets.
# NOTE(review): absolute, machine-specific path — edit before running elsewhere.
BASE_DIR = '/Users/jeonsang-eon/ECE6254-Voice-Feature-Extraction/'
RAW_DATA_DIR = os.path.join(BASE_DIR, "raw_data")

# Input metadata for the three splits.
# NOTE(review): these are the cv-other-* CSVs, not the balanced CSVs that the
# __main__ block writes to the data directory — confirm this is intended.
TRAIN_CSV_PATH = os.path.join(RAW_DATA_DIR, "cv-other-train.csv")
VALID_CSV_PATH = os.path.join(RAW_DATA_DIR, "cv-other-dev.csv")
TEST_CSV_PATH  = os.path.join(RAW_DATA_DIR, "cv-other-test.csv")

# Output location
output_dir = os.path.join(BASE_DIR, "data")
os.makedirs(output_dir, exist_ok=True)

train_npz_path = os.path.join(output_dir, "train_dataset.npz")
valid_npz_path = os.path.join(output_dir, "valid_dataset.npz")
test_npz_path = os.path.join(output_dir, "test_dataset.npz")

# Settings shared by all three splits
npz_kwargs = dict(
    audio_base_dir=RAW_DATA_DIR,
    n_mfcc=20,
    cutoff=5000,
    max_length_text=32,
    label_col='accent',
)

# 1) Train split: the frame count is chosen automatically (median)
create_npz_from_csv(
    csv_path=TRAIN_CSV_PATH,
    output_npz_path=train_npz_path,
    time_frames_target=None,
    **npz_kwargs
)

# Read back the frame count chosen for train so that valid and test are
# padded/cut to the same length. Letting each split pick its own median
# produces X_audio arrays whose time dimension differs between splits.
# Only the numeric 'time_frames' member is read from the archive.
with np.load(train_npz_path) as train_npz:
    train_time_frames = int(train_npz['time_frames'])

# 2) Valid split and 3) test split, using the train frame count.
# NOTE(review): create_npz_from_csv fits its label encoder per CSV, so the
# label indices are only comparable across splits if every split contains
# the same set of labels — verify before training.
for split_csv_path, split_npz_path in (
    (VALID_CSV_PATH, valid_npz_path),
    (TEST_CSV_PATH, test_npz_path),
):
    create_npz_from_csv(
        csv_path=split_csv_path,
        output_npz_path=split_npz_path,
        time_frames_target=train_time_frames,
        **npz_kwargs
    )

gc.collect()



Extracting MFCC & tokens from 145133 samples...


  0%|          | 104/145133 [00:09<3:40:38, 10.95it/s]


KeyboardInterrupt: 