# 메소드 하나로 작업 원한다 해서 코드 보는 중

# 0. 환경설정

In [None]:
!pip install praat-parselmouth

In [None]:
import parselmouth
from parselmouth.praat import call
import numpy as np
import pandas as pd
import os
import matplotlib.pyplot as plt
import google.colab.drive as drive
drive.mount('/content/drive/')

# 1. Extract and Modulate Pitch Contour: Using Threshold

In [None]:
def stylize_pitch_manually(pitch_values, time_stamps, threshold=10.0):
    """
    Stylize a pitch contour by dropping points that barely move.

    The first point is always kept. Each later point is kept only when its
    pitch differs from the most recently *kept* pitch (not from its immediate
    neighbour) by at least ``threshold``.

    Args:
        pitch_values: Sequence of pitch values.
        time_stamps: Sequence of time values, index-aligned with
            ``pitch_values``.
        threshold: Minimum absolute pitch difference (same unit as
            ``pitch_values``) required to keep a point.

    Returns:
        Tuple ``(stylized_pitch, stylized_time)`` of NumPy arrays. Both are
        empty when ``pitch_values`` is empty.
    """
    # Guard against empty input: seeding the lists with element 0 below
    # would otherwise raise IndexError.
    if len(pitch_values) == 0:
        return np.array([]), np.array([])

    kept_pitch = [pitch_values[0]]
    kept_time = [time_stamps[0]]

    for index, pitch in enumerate(pitch_values[1:], start=1):
        # Compare against the last kept value so slow drifts are still
        # captured once they accumulate to the threshold.
        if abs(pitch - kept_pitch[-1]) >= threshold:
            kept_pitch.append(pitch)
            kept_time.append(time_stamps[index])

    return np.array(kept_pitch), np.array(kept_time)

def _normalize_to_percent(values):
    """Rescale values linearly onto 0-100; a zero-range input maps to all zeros."""
    lowest = np.min(values)
    span = np.max(values) - lowest
    if span == 0:
        # Avoid 0/0 -> NaN when every value is identical.
        return np.zeros(len(values))
    return (values - lowest) / span * 100


def calculate_movement_distance_and_slope(pitch_values, time_stamps):
    """
    Compute the movement distance and slope of each pitch segment.

    Pitch and time are each normalized independently to the range 0-100, and
    every pair of consecutive points is then treated as one segment.

    Args:
        pitch_values: Sequence of pitch values.
        time_stamps: Sequence of time values, index-aligned with
            ``pitch_values``.

    Returns:
        A list with one dict per segment, holding the keys ``start_time``,
        ``end_time``, ``start_pitch``, ``end_pitch``, ``distance`` and
        ``slope`` (all in normalized units). The list is empty when fewer
        than two points are given.
    """
    pitch = np.asarray(pitch_values, dtype=float)
    times = np.asarray(time_stamps, dtype=float)

    # A segment needs two points; this also keeps np.min/np.max away from
    # empty arrays, where they raise ValueError.
    if min(pitch.size, times.size) < 2:
        return []

    normalized_pitch = _normalize_to_percent(pitch)
    normalized_time = _normalize_to_percent(times)

    movement_data = []
    segments = zip(
        normalized_time[:-1],
        normalized_time[1:],
        normalized_pitch[:-1],
        normalized_pitch[1:],
    )
    for start_time, end_time, start_pitch, end_pitch in segments:
        delta_pitch = end_pitch - start_pitch
        delta_time = end_time - start_time
        movement_data.append({
            'start_time': start_time,
            'end_time': end_time,
            'start_pitch': start_pitch,
            'end_pitch': end_pitch,
            # Euclidean length of the segment in the normalized plane.
            'distance': np.sqrt(delta_pitch**2 + delta_time**2),
            # A zero time step is reported as slope 0 rather than dividing.
            'slope': delta_pitch / delta_time if delta_time != 0 else 0,
        })

    return movement_data

def extract_pitch_features_with_stylization_and_distance(audio_file, threshold=10.0):
    """
    Extract stylized pitch-movement features from one audio file.

    The pitch contour is computed with a 0.01 time step between 50 and 500 Hz,
    frames whose frequency is not above 0 are dropped, the contour is stylized
    with ``stylize_pitch_manually``, plotted, and finally converted into
    per-segment distance/slope records.

    Args:
        audio_file: Path of the audio file to analyze.
        threshold: Stylization threshold passed to ``stylize_pitch_manually``.

    Returns:
        The list of segment dicts produced by
        ``calculate_movement_distance_and_slope``, or ``[]`` when no frame has
        a frequency above 0.
    """
    contour = parselmouth.Sound(audio_file).to_pitch_ac(
        time_step=0.01, pitch_floor=50, pitch_ceiling=500
    )

    # Raw per-frame frequencies and their time stamps
    frequencies = contour.selected_array['frequency']
    frame_times = contour.xs()

    # Keep only frames with a positive frequency (unvoiced frames are removed)
    voiced = frequencies > 0
    frequencies, frame_times = frequencies[voiced], frame_times[voiced]

    if len(frequencies) == 0:
        print(f"No valid pitch values found in {audio_file}")
        return []

    # Stylize, show the stylized contour, then measure every segment
    stylized_pitch, stylized_time = stylize_pitch_manually(frequencies, frame_times, threshold)
    visualize_pitch_values(stylized_pitch, stylized_time, title="Stylized Pitch Values")
    return calculate_movement_distance_and_slope(stylized_pitch, stylized_time)


In [None]:
def visualize_pitch_values(pitch_values, time_stamps, title="Pitch Values"):
    """
    Show a line plot of pitch against time.

    Args:
        pitch_values: Values drawn on the y axis.
        time_stamps: Values drawn on the x axis.
        title: Title shown above the plot.
    """
    _, axes = plt.subplots(figsize=(10, 5))
    axes.plot(time_stamps, pitch_values, label='Pitch Curve')
    axes.set_title(title)
    axes.set_xlabel('Time (s)')
    axes.set_ylabel('Pitch (Hz or Normalized)')
    # Fixed y range of 50-500, the same bounds used for pitch extraction
    axes.set_ylim(50, 500)
    axes.legend()
    axes.grid(True)
    plt.show()

def save_to_csv(data, output_file):
    """
    Write extracted pitch data to a single CSV file.

    Args:
        data: Any input accepted by ``pd.DataFrame`` (e.g. a list of dicts).
        output_file: Destination path of the CSV file; the index is not written.
    """
    pd.DataFrame(data).to_csv(output_file, index=False)

In [None]:
def save2csv_pitch_threshold_process(input_folder, output_file, threshold=10):
    """
    Extract pitch features from every .wav file in a folder and save them to one CSV.

    Files are processed in sorted name order. A file that raises during
    extraction is reported and skipped so one bad file does not abort the
    whole batch.

    Args:
        input_folder: Path to the folder containing .wav files.
        output_file: Path of the CSV file to write.
        threshold: Threshold for pitch stylization.

    Returns:
        A DataFrame with one row per pitch segment plus a ``file_name``
        column. It is empty (and no CSV is written) when nothing was
        extracted.
    """
    all_pitch_data = []  # Segment records collected from all files

    # os.listdir returns names in arbitrary order; sorting makes the row
    # order of the CSV reproducible between runs.
    for file_name in sorted(os.listdir(input_folder)):
        if not file_name.endswith(".wav"):  # Process only .wav files
            continue

        audio_file = os.path.join(input_folder, file_name)
        try:
            pitch_data = extract_pitch_features_with_stylization_and_distance(audio_file, threshold)

            # Tag every record with its source file
            for entry in pitch_data:
                entry['file_name'] = file_name

            all_pitch_data.extend(pitch_data)
            print(f"Processed {file_name}.")
        except Exception as e:
            # Batch boundary: report the failure and continue with the next file
            print(f"Error processing {file_name}: {e}")

    if not all_pitch_data:
        print("No pitch data was extracted.")
        return pd.DataFrame()

    save_to_csv(all_pitch_data, output_file)
    print(f"All features saved to {output_file}.")
    # Return the data on success as well, so callers can use the result
    # directly instead of receiving None.
    return pd.DataFrame(all_pitch_data)

In [None]:
# Define the folder containing the input .wav files and the path of the output CSV file
input_folder = "/content/drive/MyDrive/AIFFELthon/Data/Sample/literature_100/sample"
output_file = "/content/all_features.csv"

In [None]:
df = save2csv_pitch_threshold_process(input_folder, output_file)

# Display the first few rows if data exists. The None check keeps this cell
# from raising AttributeError when the call does not hand back a DataFrame.
if df is not None and not df.empty:
    print(df.head())

# 2. Generate Pitch Movement Slope Data
- Intonation Curve Standardization
- Physical features of pitch movement: pitch range, movement time, movement distance, and slope

**Reference**
  - Jeahyuk Oh. (2014). *A Study of Methods of Standardization for Korean Intonation Curve*. 한국어학, 62, 395-420.
  - Jeahyuk Oh. (2024). *Improving the objectivity of intonation transcription*. 한말연구, 65(25), 1-20.

In [None]:
def group_pitch_data(df):
    """
    Split a pitch-feature table into per-file groups.

    Args:
        df: DataFrame of pitch features with a 'file_name' column.

    Returns:
        The result of ``df.groupby('file_name')``, or ``None`` (after printing
        a notice) when ``df`` is empty.
    """
    # Guard clause: nothing to group in an empty table
    if df.empty:
        print("DataFrame is empty. Cannot group data.")
        return None

    return df.groupby('file_name')

In [None]:
# all_pitch_data exists only as a local variable inside
# save2csv_pitch_threshold_process, so referencing it here raises NameError.
# Reload the features from the CSV that the function wrote to output_file.
df_pitch_movement = pd.read_csv(output_file)
df_pitch_movement

In [None]:
# Group the segment rows per source audio file (one group per 'file_name' value)
grouped = df_pitch_movement.groupby('file_name')

In [None]:
# Disabled plotting cell, kept as a string literal so it does not execute.
# It would draw one slope-vs-start_time plot per file in `grouped`.
# NOTE(review): start_time values produced by
# calculate_movement_distance_and_slope are normalized to 0-100, so the
# 'Start Time (s)' axis label below would not be in seconds — confirm before re-enabling.
'''
for file_name, group in grouped:
    plt.figure(figsize=(10, 6))
    plt.plot(group['start_time'], group['slope'], marker='o', label=f'{file_name}')
    plt.title(f'Slope Values for {file_name}', fontsize=14)
    plt.xlabel('Start Time (s)', fontsize=12)
    plt.ylabel('Slope', fontsize=12)
    plt.legend()
    plt.grid(True)
    plt.show()
'''

In [None]:
# Load the label data from Google Drive
drive_csv_path = "/content/drive/MyDrive/AIFFELthon/Data/Literature/label_lit.csv"  # file path to be supplied by the user
full_data = pd.read_csv(drive_csv_path)

In [None]:
# Label columns carried over into the merged table
columns_to_keep = ['voice_piece_filename', 'styles', 'emotions', 'gender', 'age', 'disagree']

# Keep only the label rows whose file name also appears in the pitch-movement
# data, restricted to the columns listed above
matching_filenames = df_pitch_movement['file_name'].unique()
is_matching_row = full_data['voice_piece_filename'].isin(matching_filenames)
filtered_data = full_data.loc[is_matching_row, columns_to_keep]

# Attach the labels to every pitch-movement row (inner join on the file name)
merged_data = pd.merge(
    df_pitch_movement,
    filtered_data,
    how='inner',
    left_on='file_name',
    right_on='voice_piece_filename',
)

# 3. Intonation Curve Standardization

In [None]:
def extract_pitch_from_sound(sound_file, pitch_floor=50, pitch_ceiling=500):
    """
    Extract the pitch contour of a sound file.

    Args:
        sound_file: Path of the sound file to analyze.
        pitch_floor: Lower pitch bound passed to the pitch analysis.
        pitch_ceiling: Upper pitch bound passed to the pitch analysis.

    Returns:
        Tuple ``(time_stamps, pitch_values)`` restricted to frames whose
        frequency is above 0.
    """
    contour = parselmouth.Sound(sound_file).to_pitch_ac(
        time_step=0.01, pitch_floor=pitch_floor, pitch_ceiling=pitch_ceiling
    )

    frequencies = contour.selected_array['frequency']
    frame_times = contour.xs()

    # Drop unvoiced frames (frequency not above 0)
    voiced = frequencies > 0
    return frame_times[voiced], frequencies[voiced]

def stylize_pitch_tier_from_sound(sound_file, frequency_resolution=2.0, pitch_floor=50, pitch_ceiling=500):
    """
    Build a PitchTier from a sound file, stylize it, and read it back as arrays.

    Args:
        sound_file: Path of the sound file to analyze.
        frequency_resolution: Resolution passed to Praat's "Stylize..."
            command, expressed in semitones.
        pitch_floor: Lower pitch bound used for the Manipulation object and
            for the final conversion back to a Pitch object.
        pitch_ceiling: Upper pitch bound used for the same two steps.

    Returns:
        Tuple ``(stylized_time, stylized_pitch)`` of NumPy arrays. Any failure
        is printed rather than raised, and two empty arrays are returned.
    """
    try:
        sound = parselmouth.Sound(sound_file)

        # The PitchTier is taken from a Manipulation object. The separate
        # to_pitch_ac() analysis that used to run here was never used, so it
        # has been dropped to avoid a redundant pitch analysis per file.
        manipulation = call(sound, "To Manipulation", 0.01, pitch_floor, pitch_ceiling)
        pitch_tier = call(manipulation, "Extract pitch tier")

        # Stylize the tier in place
        call(pitch_tier, "Stylize...", frequency_resolution, "semitones")

        # Convert the stylized PitchTier back to a Pitch object so its values
        # can be read as arrays.
        # NOTE(review): the 0.01 argument presumably resamples the tier into
        # fixed frames, so the output would be frame-level rather than only the
        # stylized turning points — confirm this is the intended granularity.
        close_copy_pitch = call(pitch_tier, "To Pitch...", 0.01, pitch_floor, pitch_ceiling)

        stylized_pitch = close_copy_pitch.selected_array['frequency']
        stylized_time = close_copy_pitch.xs()

        if len(stylized_time) == 0 or len(stylized_pitch) == 0:
            raise ValueError("Stylized time or pitch is empty.")

        return np.array(stylized_time), np.array(stylized_pitch)

    except Exception as e:
        # Best-effort: report the problem and signal failure with empty arrays
        print(f"Failed to process {sound_file}: {e}")
        return np.array([]), np.array([])


def calculate_slopes(stylized_time, stylized_pitch):
    """
    Compute the slope of every segment between consecutive stylized points.

    Args:
        stylized_time: Time values of the stylized contour.
        stylized_pitch: Pitch values, index-aligned with ``stylized_time``.

    Returns:
        NumPy array with one slope per consecutive pair of points (one element
        fewer than the input). A segment with zero time difference gets
        slope 0.
    """
    def segment_slope(end_index):
        # Slope of the segment that ends at end_index
        rise = stylized_pitch[end_index] - stylized_pitch[end_index - 1]
        run = stylized_time[end_index] - stylized_time[end_index - 1]
        if run == 0:
            return 0
        return rise / run

    return np.array([segment_slope(end_index) for end_index in range(1, len(stylized_time))])


def save_stylized_pitch(stylized_time, stylized_pitch, slopes, output_file):
    """
    Save a stylized pitch contour and its slopes to a CSV file.

    Args:
        stylized_time: Time values, written to the "Time (s)" column.
        stylized_pitch: Pitch values, written to the "Pitch (Hz)" column.
        slopes: Segment slopes, written to the "Slope" column.
        output_file: Destination path of the CSV file.
    """
    # There is one slope per segment, i.e. one fewer than the number of points.
    # When slopes is shorter, append a single None so the last point has no slope.
    slope_column = slopes
    if len(stylized_time) > len(slopes):
        slope_column = [*slopes, None]

    table = pd.DataFrame({
        "Time (s)": stylized_time,
        "Pitch (Hz)": stylized_pitch,
        "Slope": slope_column,
    })
    table.to_csv(output_file, index=False)
    print(f"Stylized pitch data saved to {output_file}")

def process_audio_files(input_folder, output_folder):
    """
    Stylize every .wav file in a folder and write one CSV and one plot per file.

    For ``name.wav`` the outputs are ``name_pitch.csv`` and ``name_plot.png``
    inside ``output_folder``, which is created if missing. Files are processed
    in sorted name order. A file that fails is reported and skipped.

    Args:
        input_folder: Folder containing the .wav files.
        output_folder: Folder that receives the result files.
    """
    os.makedirs(output_folder, exist_ok=True)

    # os.listdir order is arbitrary; sort for a reproducible processing order
    for file_name in sorted(os.listdir(input_folder)):
        if not file_name.endswith(".wav"):
            continue

        sound_file = os.path.join(input_folder, file_name)
        base_name = os.path.splitext(file_name)[0]
        output_plot = os.path.join(output_folder, f"{base_name}_plot.png")
        output_csv = os.path.join(output_folder, f"{base_name}_pitch.csv")

        try:
            # Stylize the PitchTier
            stylized_time, stylized_pitch = stylize_pitch_tier_from_sound(sound_file, frequency_resolution=2.0)

            # stylize_pitch_tier_from_sound signals failure by returning empty
            # arrays; skip instead of writing an empty CSV and reporting success.
            if len(stylized_time) == 0:
                print(f"Skipped {file_name}: no stylized pitch data.")
                continue

            # Slope of each segment
            slopes = calculate_slopes(stylized_time, stylized_pitch)

            # Write the CSV before plotting so a plotting failure cannot
            # discard data that was already computed.
            save_stylized_pitch(stylized_time, stylized_pitch, slopes, output_csv)

            # NOTE(review): plot_pitch_stylization is not defined in the visible
            # part of this file, and the stylized arrays are passed for both
            # pairs of arguments — confirm the helper exists and that this is
            # the intended input.
            plot_pitch_stylization(stylized_time, stylized_pitch, stylized_time, stylized_pitch, output_path=output_plot)

            print(f"Processed {file_name}: Plot saved to {output_plot}, CSV saved to {output_csv}.")
        except Exception as e:
            # Batch boundary: report the failure and continue with the next file
            print(f"Failed to process {file_name}: {e}")

In [None]:
if __name__ == "__main__":
    # Set the input and output folder paths
    input_folder = "/content/drive/MyDrive/AIFFELthon/Data/Sample/literature_100/sample"  # folder containing the .wav files
    output_folder = "/content/processed_results_res2.0"  # folder where the result files are saved

    process_audio_files(input_folder, output_folder)
