In [1]:
from pathlib import Path
import json

from rich import print as rprint

import midii

import src.preprocess_svs as ps
from src.preprocess_svs import gv

# GV File Correction

In [2]:
# Root directory of the full guide-vocal (GV) dataset.
# NOTE(review): absolute, machine-specific path - adjust for your environment.
gv_path = "D:/dataset/177.다음색 가이드보컬 데이터"
# In-repo sample directories used by the demonstration cells of this notebook.
gv_json_sample = "sample/gv/json"
gv_mid_sample = "sample/gv/midi"
gv_sample_preprocessed = "sample/gv/json_preprocessed"
# Presumably output directories for full-dataset runs (also machine-specific).
gv_json_time_adjusted = "D:/dataset/다음색 가이드보컬 데이터 time_adjusted"
gv_json_preprocessed = "D:/dataset/다음색 가이드보컬 데이터 json preprocessed"
# Files of one sample song, used to walk through the individual correction steps.
midi_filepath = "sample/gv/midi/SINGER_16_10TO29_CLEAR_FEMALE_BALLAD_C0632.mid"
time_adjusted_json_filepath = "sample/gv/json_time_adjusted/SINGER_16_10TO29_CLEAR_FEMALE_BALLAD_C0632.json"
filled_time_gaps_json_filepath = "sample/gv/json_filled_time_gaps/SINGER_16_10TO29_CLEAR_FEMALE_BALLAD_C0632.json"

In [None]:
# Count the MIDI files under the dataset root without building a list first.
print(sum(1 for _ in ps.get_files(gv_path, "mid")))

In [None]:
# Load the sample MIDI file and inspect how its tempo values are distributed.
# NOTE(review): `convert_1_to_0` presumably converts a type-1 file to type-0 - confirm in midii.
mid = midii.MidiFile(midi_filepath, convert_1_to_0=True)
tempo_rank = mid.tempo_rank()
print(tempo_rank)
# Judging by its name, reports the share taken by the top-ranked tempo - confirm in ps.
print(ps.calculate_top_tempo_percentage(tempo_rank))

## Analysis Tempo Deviation

- json 을 처리하려면 quantize 를 위한 tempo 가 필요한데 json 에는 tempo 정보가 없음 
- -> tempo rank 검사 
- -> tempo 가 변하지 않는다는 충분한 보장
- -> dominant tempo 를 채택하여 quantize 해도 된다

In [5]:
# ps.tempo_statistics(gv_path, parallel=True, verbose=True)

- -> 이전 end_time 이 현재 start_time 보다 큰 경우가 있음 
- -> 이전 end_time 에 현재 start_time 을 맞추면, 뒤따라오는 메시지들의 sync 가 다 틀어짐 
- -> 이전 end_time 을 현재 start_time 에 맞춰주는 게 더 나음

## Verify notes sorted by time 

In [6]:
# gv.verify_json_notes_sorted_by_time(gv_path, parallel=True)

In [7]:
def adjust_note_times_sample(json_dir="sample/gv/json"):
    """Run ``gv.adjust_note_times`` on every JSON file found under ``json_dir``.

    For each input file the ``"notes"`` entry is read, passed through
    ``gv.adjust_note_times`` and the result is written as JSON into a
    ``json_time_adjusted`` directory that sits next to the input file's own
    directory, keeping the original file name.

    Args:
        json_dir: Directory handed to ``ps.get_files(json_dir, "json")``.
            Defaults to the in-repo sample directory, so calling the function
            without arguments behaves exactly as before. Using a parameter
            also avoids shadowing the notebook-level ``gv_path`` (the dataset
            root) with a different value inside the function.
    """
    for json_path in ps.get_files(json_dir, "json"):
        src_path = Path(json_path)
        out_path = src_path.parent.parent / "json_time_adjusted" / src_path.name
        out_path.parent.mkdir(exist_ok=True, parents=True)
        print(f"adjust time of \n{json_path}")
        with open(json_path, "r", encoding="utf-8") as f:
            data = json.load(f)
        # Only the note list is processed and saved; other top-level keys of
        # the source JSON are not carried over into the output file.
        notes = data.get("notes")
        processed_notes = gv.adjust_note_times(notes)
        with open(out_path, "w", encoding="utf-8") as f:
            json.dump(processed_notes, f, ensure_ascii=False, indent=4)
        print(f"saved to \n{out_path}")

In [None]:
# Write time-adjusted copies of the sample JSON files to sample/gv/json_time_adjusted.
adjust_note_times_sample()

## fill silence note between notes

In [None]:
# Show the input and output paths of the gap-filling step, one per line.
print(time_adjusted_json_filepath, filled_time_gaps_json_filepath, sep="\n")

In [None]:
# Read the time-adjusted JSON and save the gap-filled result to the second path.
# Per this section's heading, gaps between notes are filled with silence notes.
gv.fill_time_gaps_save(time_adjusted_json_filepath, filled_time_gaps_json_filepath)

## verify correspondence json vs wav vs mid 

In [None]:
# Each file collection is compared twice below. Materialise them as lists so
# that a one-shot iterator returned by `ps.get_files` is not already exhausted
# when it is used the second time (which would make the later checks compare
# against an empty sequence).
jsons = list(ps.get_files(gv_path, "json", sort=True))
mids = list(ps.get_files(gv_path, "mid", sort=True))
wavs = list(ps.get_files(gv_path, "wav", sort=True))
# Pairwise checks: json vs mid, wav vs mid, json vs wav.
rprint(gv.verify_files_coherent(jsons, mids))
rprint(gv.verify_files_coherent(wavs, mids))
rprint(gv.verify_files_coherent(jsons, wavs))

## Remove abnormal files

In [None]:
# NOTE(review): judging by its name this removes files under the dataset root
# in place - confirm what counts as "abnormal" and back up before running.
gv.remove_abnormal_file(gv_path)

# GV Preprocessing

In [3]:
# Input of Step 2: the preprocessed JSON of the sample song.
json_filepath = "sample/gv/json_preprocessed/SINGER_16_10TO29_CLEAR_FEMALE_BALLAD_C0632.json"
# Output of Step 2; rebound to a pathlib.Path by the Step 2 cell.
split_json_filepath = "sample/gv/split_json/SINGER_16_10TO29_CLEAR_FEMALE_BALLAD_C0632.json"
# Output root of Step 4 and its per-kind sub-directories.
# `preprocessed_gv_path` is rebound to a pathlib.Path by the Step 4 cell.
preprocessed_gv_path = "preprocessed_gv/"
preprocessed_gv_duration_path = "preprocessed_gv/duration"
preprocessed_gv_pitch_path = "preprocessed_gv/pitch"
preprocessed_gv_wav_path = "preprocessed_gv/wav"

## Step 1 - preprocess gv json

- gv json -> adjust note times + fill time gaps + quantization + frames
- embed coherent json format(sharing with mssv)

In [4]:
# Show the sample input (json, midi) and output directories used by Step 1.
print(gv_json_sample, gv_mid_sample, gv_sample_preprocessed)

sample/gv/json sample/gv/midi sample/gv/json_preprocessed


In [5]:
# Step 1 on the sample data: arguments are the JSON input directory, the MIDI
# input directory and the output directory for the preprocessed JSON files.
gv.preprocess_json(
    gv_json_sample, gv_mid_sample, gv_sample_preprocessed, parallel=True
)

In [6]:
# Full-dataset run (json and mid files both live under gv_path):
# gv.preprocess_json(
#     gv_path,
#     gv_path,
#     gv_json_preprocessed,
#     parallel=True,
# )

## Step 2 - split notes by silence

In [7]:
# Step 2: split the preprocessed notes at silences and store the result as JSON.
# NOTE(review): the unit of `min_length` is not visible here - confirm in ps.
split_json = ps.split_json_by_silence(json_filepath, min_length=6)
split_json_filepath = Path(split_json_filepath)
split_json_filepath.parent.mkdir(parents=True, exist_ok=True)
with split_json_filepath.open("w", encoding="utf-8") as f:
    f.write(json.dumps(split_json, ensure_ascii=False, indent=4))

## Step 3 or Step 4

- Korean lyric normalization (metadata.txt 의 가사의 글자 개수가 split 된 duration/pitch/wav 의 개수와 일치해야 하는지? 만약 일치하지 않아도 된다면, step 4 에서 해도 되고, json 이 아니라 kor seq/pitch seq/GT 만 받아서 해도 된다)

In [8]:
# Display the split-JSON path (a pathlib.Path once the Step 2 cell has run).
split_json_filepath

PosixPath('sample/gv/split_json/SINGER_16_10TO29_CLEAR_FEMALE_BALLAD_C0632.json')

## Step 4 - save duration, pitch as npy file, split audio, save metadata

In [9]:
# Full-length recording of the sample song; passed to gv.preprocess_one in Step 4.
wav_filepath = "sample/gv/wav/SINGER_16_10TO29_CLEAR_FEMALE_BALLAD_C0632.wav"

In [10]:
# Step 4 for the single sample song: gv.preprocess_one receives the wav, the
# split JSON and the pitch/duration/wav output directories; its return value
# is collected as the metadata text for this song.
metadata_list = [
    gv.preprocess_one(
        wav_filepath,
        split_json_filepath,
        preprocessed_gv_pitch_path,
        preprocessed_gv_duration_path,
        preprocessed_gv_wav_path,
    )
]

# Make sure the output root exists, then write all collected metadata text
# into metadata.txt inside it.
preprocessed_gv_path = Path(preprocessed_gv_path)
preprocessed_gv_path.mkdir(parents=True, exist_ok=True)
with (preprocessed_gv_path / "metadata.txt").open("w", encoding="utf-8") as f:
    f.writelines(metadata_list)

## Normalizer 사용 설명

### 1. lyric_normalizer.py의 LyricNormalizer 클래스 import
### 2. LyricNormalizer 객체 생성
### 3. LyricNormalizer.normalize_lyrics() 함수 사용
#### &emsp; Input: GT(whisper result), 원본 가사, pitch sequence, duration sequence
#### &emsp; Output: 정규화 가사, pitch sequence, duration sequence, 정규화 정보를 담은 dictionary


In [12]:
import numpy as np
import whisper
import time
from datetime import datetime

from src.preprocess_svs.lyric_normalizer import LyricNormalizer

# Resolve every input/output location from the Step 4 output root. Wrapping in
# Path() keeps this cell runnable even when `preprocessed_gv_path` is still the
# plain string from the configuration cell (str / str would raise TypeError).
preprocessed_gv_path = Path(preprocessed_gv_path)
metadata_filepath = preprocessed_gv_path / "metadata.txt"
wav_base_path = preprocessed_gv_path / "wav"
pitch_base_path = preprocessed_gv_path / "pitch"
duration_base_path = preprocessed_gv_path / "duration"
normalized_metadata_filepath = preprocessed_gv_path / "normalized_metadata.txt"

lyric_normalizer = LyricNormalizer()
processed_lines = []

print("------------------------------------------")
print(f"Loading Whisper model at {datetime.now()}")
start_time = time.time()
# NOTE(review): the device is pinned to CUDA, so this cell needs a GPU.
model = whisper.load_model("large-v3", device="cuda")
end_time = time.time()
print(f"Model loaded in {end_time - start_time:.2f} seconds")
print("------------------------------------------")

with open(metadata_filepath, "r", encoding="utf-8") as f_meta:
    for line in f_meta:
        line = line.strip()
        if not line:
            continue

        # Each metadata row is "<file stem>|<lyrics>[|<extra columns>...]".
        parts = line.split("|")
        if len(parts) < 2:
            print(f"Skipping malformed line: {line}")
            continue

        # Extra columns (if any) are carried over unchanged to the new file.
        original_filename_stem, original_lyrics, *other_columns = parts

        # A segment-specific name is used for the wav path so the notebook-level
        # `wav_filepath` (the full-song input of Step 4) is not overwritten.
        segment_wav_filepath = wav_base_path / f"{original_filename_stem}.wav"
        pitch_filepath = pitch_base_path / f"{original_filename_stem}.npy"
        duration_filepath = duration_base_path / f"{original_filename_stem}.npy"
        print("----------------------------------------------")
        print(f"Processing: {original_filename_stem}")
        print(f"  Original Lyrics: '{original_lyrics}' ({len(original_lyrics)})")

        # --- 2. Whisper STT ---
        stt_start_time = time.time()
        result = model.transcribe(str(segment_wav_filepath), language="ko")
        stt_end_time = time.time()
        GT_text = result["text"]
        print(f"  STT Result: '{GT_text[:50]}' ({stt_end_time - stt_start_time:.2f} seconds)")

        # --- 3. Load pitch and duration sequences ---
        pitch_sequence = np.load(pitch_filepath).tolist()
        duration_sequence = np.load(duration_filepath).tolist()
        print(f"  Original Pitch Sequence: {pitch_sequence}, {len(pitch_sequence)}")
        print(f"  Original Duration Sequence: {duration_sequence}, {len(duration_sequence)}")

        # --- 4. Text normalization ---
        # Best effort: on failure the original lyrics are kept for this row so
        # the remaining rows are still processed.
        try:
            normalization_result = lyric_normalizer.normalize_lyrics(
                gt_lyrics=GT_text,
                raw_lyrics=original_lyrics,
                pitch_sequence=pitch_sequence,
                duration_sequence=duration_sequence,
                normalize_spaces=True,
            )

            normalized_lyrics_list = normalization_result.get('normalized_texts', [])
            normalized_lyrics = "".join(normalized_lyrics_list)
            normalized_durations = normalization_result.get('normalized_durations', [])
            normalized_pitches = normalization_result.get('normalized_pitches', [])
            print(f"  Normalized Lyrics: '{normalized_lyrics}' ({len(normalized_lyrics)})")
            print(f"  Normalized Durations: {normalized_durations}, {len(normalized_durations)}")
            print(f"  Normalized Pitch Sequence: {normalized_pitches}, {len(normalized_pitches)}\n")

        except Exception as e:
            print(f"  Error during text normalization for {original_filename_stem}: {e}")
            normalized_lyrics = original_lyrics  # Fallback to original lyrics on error

        # --- 5. Store for new metadata file ---
        # NOTE(review): only the lyrics are replaced here; the normalized
        # pitch/duration sequences are printed above but not written back to
        # the .npy files - confirm whether that is intended.
        new_line_parts = [original_filename_stem, normalized_lyrics, *other_columns]
        processed_lines.append("|".join(new_line_parts))

# --- Write to normalized_metadata.txt ---
# Done after the input metadata file has been closed.
if processed_lines:
    print(f"\nWriting normalized metadata to: {normalized_metadata_filepath}")
    with open(normalized_metadata_filepath, "w", encoding="utf-8") as f_out:
        for line in processed_lines:
            f_out.write(line + "\n")
    print("Done.")
else:
    print("No lines were processed to write to normalized_metadata.txt.")


------------------------------------------
Loading Whisper model at 2025-06-02 16:26:49.133318
Model loaded in 27.27 seconds
------------------------------------------
----------------------------------------------
Processing: SINGER_16_10TO29_CLEAR_FEMALE_BALLAD_C0632_00
  Original Lyrics: '사아라앙해앤나요오나아보오다도오 내애가아아니인그으녀어와이인느은니이' (35)
  STT Result: ' 사랑했나요 나보다도 내가 아닌 그녀와 있는 이' (1.13 seconds)
  Original Pitch Sequence: [60, 60, 60, 60, 61, 61, 60, 58, 58, 60, 60, 58, 58, 56, 56, 56, 0, 56, 56, 65, 65, 64, 65, 65, 56, 56, 55, 55, 56, 58, 58, 63, 63, 61, 61], 35
  Original Duration Sequence: [9, 14, 9, 9, 9, 9, 9, 9, 19, 9, 5, 14, 5, 9, 14, 42, 51, 9, 5, 14, 9, 9, 14, 9, 9, 14, 9, 9, 9, 9, 19, 9, 9, 9, 28], 35
  Normalized Lyrics: '사랑했나요나보다도 내가아아닌그녀와있는이' (21)
  Normalized Durations: [23, 18, 18, 9, 28, 14, 19, 9, 56, 51, 14, 14, 9, 9, 23, 23, 18, 9, 28, 18, 37], 21
  Normalized Pitch Sequence: [60, 60, 61, 60, 58, 60, 58, 56, 56, 0, 56, 65, 65, 64, 65, 56, 55, 56, 58, 63, 61], 21

---------