In [13]:
import os
import re
import json
import numpy as np
import librosa
import parselmouth
from scipy.spatial.distance import cosine
from google.cloud import speech
from dotenv import load_dotenv


# Google Cloud Speech-to-Text API 설정
load_dotenv()
os.environ["GOOGLE_APPLICATION_CREDENTIALS"] = os.getenv("KEY_PATH")

speech_client = speech.SpeechClient()

In [None]:
def preprocess_audio(input_path, target_sr=16000):
    """오디오를 읽고 샘플링 속도를 변환"""
    y, sr = librosa.load(input_path, sr=None)
    if sr != target_sr:
        y = librosa.resample(y, orig_sr=sr, target_sr=target_sr)
    return y, target_sr


def recognize_speech(y, sr, language_code="en-US"):
    """Google Speech-to-Text API를 사용해 단어별 타임스탬프 추출"""
    import soundfile as sf
    from io import BytesIO

    buffer = BytesIO()
    sf.write(buffer, y, sr, format="WAV")
    buffer.seek(0)
    audio = speech.RecognitionAudio(content=buffer.read())
    config = speech.RecognitionConfig(
        encoding=speech.RecognitionConfig.AudioEncoding.LINEAR16,
        sample_rate_hertz=sr,
        language_code=language_code,
        enable_word_time_offsets=True,
    )
    response = speech_client.recognize(config=config, audio=audio)

    word_timestamps = []
    for result in response.results:
        for word_info in result.alternatives[0].words:
            word_timestamps.append(
                {
                    "word": word_info.word,
                    "start_time": word_info.start_time.total_seconds(),
                    "end_time": word_info.end_time.total_seconds(),
                }
            )
    return word_timestamps


def extract_sentence_pitch(y, sr):
    """문장 전체의 피치 데이터를 추출"""
    snd = parselmouth.Sound(y, sr)
    pitch = snd.to_pitch()
    frame_frequencies = pitch.selected_array["frequency"]
    valid_frequencies = frame_frequencies[
        frame_frequencies > 0
    ]  # 유효한 피치 데이터만 추출
    return (
        valid_frequencies if len(valid_frequencies) > 0 else np.array([0])
    )  # 빈 경우 0으로 처리


def extract_sentence_syllable_durations(y, sr):
    """문장 전체에서 음절 길이를 계산"""
    snd = parselmouth.Sound(y, sr)
    pitch = snd.to_pitch()
    frame_frequencies = pitch.selected_array["frequency"]
    valid_frames = frame_frequencies[frame_frequencies > 0]
    syllable_duration = (
        len(valid_frames) / pitch.ceiling if len(valid_frames) > 0 else 0
    )
    return syllable_duration


def calculate_cosine_similarity(vec1, vec2):
    """벡터 간 코사인 유사도 계산 (길이 패딩 포함)"""
    if len(vec1) == 0 or len(vec2) == 0:
        return 0.0
    max_len = max(len(vec1), len(vec2))
    vec1 = np.pad(vec1, (0, max_len - len(vec1)), mode="constant")
    vec2 = np.pad(vec2, (0, max_len - len(vec2)), mode="constant")
    return 1 - cosine(vec1, vec2)


def calculate_pitch_similarity(user_pitch, ref_pitch):
    """문장 전체의 피치 패턴 유사도 계산"""
    return calculate_cosine_similarity(user_pitch, ref_pitch)


def calculate_rhythm_similarity(user_timestamps, ref_timestamps):
    """문장 전체의 리듬 패턴 유사도 계산"""
    user_rhythm = [
        duration for word in user_timestamps for duration in word["syllable_durations"]
    ]
    ref_rhythm = [
        duration for word in ref_timestamps for duration in word["syllable_durations"]
    ]
    return calculate_cosine_similarity(user_rhythm, ref_rhythm)


def calculate_speed_ratio(user_timestamps, ref_timestamps):
    """발화 속도 차이를 계산 (초당 발화 단어 수)"""
    if len(user_timestamps) == 0 or len(ref_timestamps) == 0:
        return 0.0

    user_duration = user_timestamps[-1]["end_time"] - user_timestamps[0]["start_time"]
    ref_duration = ref_timestamps[-1]["end_time"] - ref_timestamps[0]["start_time"]

    user_speed = len(user_timestamps) / user_duration if user_duration > 0 else 0
    ref_speed = len(ref_timestamps) / ref_duration if ref_duration > 0 else 0

    return user_speed / ref_speed if ref_speed > 0 else 0


def analyze_audio(user_audio_path, ref_audio_path):
    """사용자와 기준 음성을 분석하여 결과 반환"""
    y_user, sr_user = preprocess_audio(user_audio_path)
    y_ref, sr_ref = preprocess_audio(ref_audio_path)

    user_timestamps = recognize_speech(y_user, sr_user)
    ref_timestamps = recognize_speech(y_ref, sr_ref)

    # 잘못 인식된 단어 추출
    user_words = [word["word"] for word in user_timestamps]
    ref_words = [word["word"] for word in ref_timestamps]
    mispronounced_words = list(set(user_words) - set(ref_words))
    excluded_words = mispronounced_words

    total_words = len(ref_words)
    mispronounced_ratio = (
        len(mispronounced_words) / total_words if total_words > 0 else 0
    )

    valid_user_timestamps = [
        word for word in user_timestamps if word["word"] not in excluded_words
    ]
    valid_ref_timestamps = [
        word for word in ref_timestamps if word["word"] not in excluded_words
    ]

    # 문장 전체 음절 길이 계산
    user_syllable_duration = extract_sentence_syllable_durations(y_user, sr_user)
    ref_syllable_duration = extract_sentence_syllable_durations(y_ref, sr_ref)
    rhythm_similarity = calculate_cosine_similarity(
        [user_syllable_duration], [ref_syllable_duration]
    )

    # 문장 전체 피치 패턴 계산
    user_pitch = extract_sentence_pitch(y_user, sr_user)
    ref_pitch = extract_sentence_pitch(y_ref, sr_ref)
    pitch_similarity = calculate_pitch_similarity(user_pitch, ref_pitch)

    # 발화 속도 및 기타 계산
    speed_ratio = calculate_speed_ratio(valid_user_timestamps, valid_ref_timestamps)
    pause_similarity = calculate_cosine_similarity(
        [word["end_time"] - word["start_time"] for word in valid_user_timestamps],
        [word["end_time"] - word["start_time"] for word in valid_ref_timestamps],
    )

    return {
        "Pitch Pattern": pitch_similarity,
        "Rhythm Pattern": rhythm_similarity,
        "Speed": speed_ratio,
        "Pause Pattern": pause_similarity,
        "Mispronounced Words": {
            "ratio": mispronounced_ratio,
            "list": mispronounced_words,
        },
    }


def compare_audio_folders(user_folder, ref_folder):
    user_files = sorted(os.listdir(user_folder), key=extract_number_from_filename)
    ref_files = sorted(os.listdir(ref_folder), key=extract_number_from_filename)

    results = {}
    for idx, (user_file, ref_file) in enumerate(zip(user_files, ref_files), start=1):
        user_audio_path = os.path.join(user_folder, user_file)
        ref_audio_path = os.path.join(ref_folder, ref_file)

        print(f"Comparing: {user_file} vs {ref_file}")
        result = analyze_audio(user_audio_path, ref_audio_path)
        results[f"comparison_{idx}"] = result

    return results


def main():
    user_folder = "./test_data/child_audio_100"
    ref_folder = "./test_data/tts_tortoise_audio_100"
    results = compare_audio_folders(user_folder, ref_folder)

    with open("results5.json", "w") as f:
        json.dump(results, f, indent=4)

    print(json.dumps(results, indent=4))


def extract_number_from_filename(filename):
    """파일 이름에서 숫자를 추출"""
    match = re.search(r"(\d+)", filename)
    return int(match.group(1)) if match else float("inf")

In [20]:
# 실행
if __name__ == "__main__":
    main()

Comparing: E0001A671-BFG33-L1N2D1-E-F5NX-04822489.wav vs sentence_1.wav
Comparing: E0001A671-BFG33-L1N2D1-E-F5NX-04822502.wav vs sentence_2.wav
Comparing: E0001A671-BFG33-L1N2D1-E-F5NX-04822529.wav vs sentence_3.wav
Comparing: E0001A671-BFG33-L1N2D1-E-F5NX-04822580.wav vs sentence_4.wav
Comparing: E0001A671-BFG33-L1N2D1-E-F5NX-04822594.wav vs sentence_5.wav
Comparing: E0001A671-BFG33-L1N2D1-E-F5NX-04822603.wav vs sentence_6.wav
Comparing: E0001A671-BFG33-L1N2D1-E-F5NX-04822635.wav vs sentence_7.wav
Comparing: E0001A671-BFG33-L1N2D1-E-F5NX-04822655.wav vs sentence_8.wav
Comparing: E0001A671-BFG33-L1N2D1-E-F5NX-04822681.wav vs sentence_9.wav
Comparing: E0001A671-BFG33-L1N2D1-E-F5NX-04822693.wav vs sentence_10.wav
Comparing: E0001A671-BFG33-L1N2D1-E-F5NX-04822705.wav vs sentence_11.wav
Comparing: E0001A671-BFG33-L1N2D1-E-F5NX-04822723.wav vs sentence_12.wav
Comparing: E0001A671-BFG33-L1N2D1-E-F5NX-04822746.wav vs sentence_13.wav
Comparing: E0001A671-BFG33-L1N2D1-E-F5NX-04822759.wav vs sen

In [None]:
import json
import numpy as np
import pandas as pd


def calculate_statistics(json_file_path):
    # JSON 파일 불러오기
    with open(json_file_path, "r") as file:
        data = json.load(file)

    # 비교 항목 데이터를 추출하여 DataFrame 생성
    comparisons = []
    for key, values in data.items():
        comparisons.append(
            {
                "Pitch Pattern": values["Pitch Pattern"],
                "Rhythm Pattern": values["Rhythm Pattern"],
                "Speed": values["Speed"],
                "Pause Pattern": values["Pause Pattern"],
                "Mispronounced Words Ratio": values["Mispronounced Words"]["ratio"],
            }
        )

    df = pd.DataFrame(comparisons)

    # 통계량 계산
    stats = (
        df.describe(percentiles=[0.25, 0.5, 0.75])
        .loc[["min", "25%", "50%", "75%", "max", "mean"]]
        .T
    )
    stats.columns = ["Min", "Q1", "Median", "Q3", "Max", "Mean"]

    # 결과 출력
    print(stats)


# 실행
json_file_path = "pronunciation_evaluation_result.json"
calculate_statistics(json_file_path)

                                Min        Q1    Median        Q3       Max  \
Pitch Pattern              0.575492  0.808059  0.869744  0.920034  0.992826   
Rhythm Pattern             1.000000  1.000000  1.000000  1.000000  1.000000   
Speed                      0.378378  0.694153  0.875484  1.056140  5.333333   
Pause Pattern              0.293117  0.687836  0.799097  0.903662  0.997037   
Mispronounced Words Ratio  0.000000  0.000000  0.090909  0.250000  1.500000   

                               Mean  
Pitch Pattern              0.857399  
Rhythm Pattern             1.000000  
Speed                      0.957689  
Pause Pattern              0.766998  
Mispronounced Words Ratio  0.157405  
