In [25]:
import sys
import os

# Add the parent directory (where the project modules live) to the import
# path, unless it is already registered there.
module_path = os.path.abspath(os.pardir)
if sys.path.count(module_path) == 0:
    sys.path.append(module_path)


In [26]:
import torch
from audio import audio as Audio #Viene del repo de fastspeech
import pyworld as pw
import numpy as np
import librosa
import tgt
from scipy.interpolate import interp1d

def remove_outlier(values):
    """Discard the entries of ``values`` that fall outside the 1.5 * IQR fences.

    Args:
        values: Array-like of numbers.

    Returns:
        A NumPy array with only the entries lying strictly between
        ``Q1 - 1.5 * IQR`` and ``Q3 + 1.5 * IQR``, where ``Q1``/``Q3`` are the
        25th/75th percentiles of ``values``.
    """
    data = np.array(values)
    q1, q3 = np.percentile(data, 25), np.percentile(data, 75)
    iqr = q3 - q1
    low_fence = q1 - 1.5 * iqr
    high_fence = q3 + 1.5 * iqr
    # Strict comparisons: a value sitting exactly on a fence is dropped too.
    keep = (data > low_fence) & (data < high_fence)
    return data[keep]

def normalize(in_dir, mean, std):
    """Standardise every saved ``.npy`` array in ``in_dir`` and report the range.

    Each array is loaded, transformed to ``(values - mean) / std`` and written
    back to the same path. Because the files are overwritten, calling this
    twice on the same directory normalises the data twice.

    Args:
        in_dir: Directory containing the ``.npy`` feature files.
        mean: Value subtracted from every element.
        std: Value every element is divided by.

    Returns:
        ``(min_value, max_value)`` over all normalised elements. When no
        non-empty array is found the initial sentinels are returned unchanged
        (largest finite float64 for the minimum, smallest for the maximum).
    """
    max_value = np.finfo(np.float64).min
    min_value = np.finfo(np.float64).max
    # Sorted for a deterministic processing order (os.listdir order is arbitrary).
    for entry in sorted(os.listdir(in_dir)):
        # Skip anything that is not a saved array (e.g. the .ipynb_checkpoints
        # folder Jupyter may create), which np.load cannot read.
        if not entry.endswith(".npy"):
            continue
        path = os.path.join(in_dir, entry)
        values = (np.load(path) - mean) / std
        np.save(path, values)

        # An empty array has no min/max; it is still saved but not measured.
        if values.size == 0:
            continue
        # Vectorised reductions instead of the Python builtins, which iterate
        # element by element and fail on multi-dimensional arrays.
        max_value = max(max_value, values.max())
        min_value = min(min_value, values.min())

    return min_value, max_value

# Mel-spectrogram extraction
def extract_mel_spectrogram(wav, stft,duration):
    """Compute the mel-spectrogram and energy of ``wav``, cut to the aligned length.

    Args:
        wav: Waveform handed to ``Audio.tools.get_mel_from_wav``.
        stft: STFT object handed to ``Audio.tools.get_mel_from_wav``.
        duration: Iterable of per-phoneme durations; their sum is the number of
            frames kept.

    Returns:
        ``(mel_spectrogram, energy)`` where the mel-spectrogram is truncated
        along its second axis and the energy along its first axis to
        ``sum(duration)`` entries.
    """
    total_frames = sum(duration)
    mel, frame_energy = Audio.tools.get_mel_from_wav(wav, stft)
    return mel[:, :total_frames], frame_energy[:total_frames]

# Pitch extraction
def extract_pitch(wav, sampling_rate, hop_length,duration):
    """Estimate the pitch contour of ``wav`` with pyworld (DIO + StoneMask).

    Args:
        wav: Waveform array; it is cast to float64 before being analysed.
        sampling_rate: Sampling rate passed to pyworld.
        hop_length: Hop size in samples; converted to the ``frame_period``
            given to DIO as ``hop_length / sampling_rate * 1000``.
        duration: Iterable of per-phoneme durations; their sum is the number of
            pitch values kept.

    Returns:
        The StoneMask-refined pitch contour truncated to ``sum(duration)``
        entries.
    """
    signal = wav.astype(np.float64)
    frame_period = hop_length / sampling_rate * 1000
    # DIO gives a first estimate, which StoneMask then refines.
    coarse_f0, time_axis = pw.dio(signal, sampling_rate, frame_period=frame_period)
    refined_f0 = pw.stonemask(signal, coarse_f0, time_axis, sampling_rate)
    return refined_f0[: sum(duration)]

# This function extracts the phonemes and their durations
# from a tier of the .TextGrid file
def get_alignment(textgridsfile, sampling_rate, hop_length):
    """Read phonemes and their frame durations from an alignment tier.

    Leading and trailing silence intervals are trimmed; silence intervals in
    between two ordinary phonemes are kept.

    Args:
        textgridsfile: Tier-like object whose ``_objects`` attribute is a
            sequence of intervals exposing ``start_time``, ``end_time`` and
            ``text``.
        sampling_rate: Sampling rate used to convert times to sample indices.
        hop_length: Hop size in samples used to convert samples to frames.

    Returns:
        ``(phones, durations, start_time, end_time)``: the kept phoneme labels,
        their durations in frames, the start time of the first non-silent
        interval and the end time of the last non-silent interval. If the tier
        holds no non-silent interval the result is ``([], [], 0, 0)``.
    """
    silence_labels = ("sil", "sp", "spn")

    def to_frame(time):
        # Time -> (rounded) frame index.
        return np.round(time * sampling_rate / hop_length)

    phones, durations = [], []
    start_time = end_time = 0
    last_spoken = 0  # number of entries up to and including the last ordinary phoneme

    for interval in textgridsfile._objects:
        begin, finish, label = interval.start_time, interval.end_time, interval.text
        is_silence = label in silence_labels

        if not phones:
            # Nothing kept so far: drop leading silences, otherwise mark the start.
            if is_silence:
                continue
            start_time = begin

        phones.append(label)
        if not is_silence:
            # Ordinary phoneme: remember where the spoken part currently ends.
            end_time = finish
            last_spoken = len(phones)

        # Duration of the interval in frames
        durations.append(int(to_frame(finish) - to_frame(begin)))

    # Drop the silences left at the end
    return phones[:last_spoken], durations[:last_spoken], start_time, end_time

# Preprocess one utterance to obtain its essential features:
# mel-spectrogram, pitch, energy and duration.

def _phoneme_average(frames, duration):
    """Average a frame-level contour over each phoneme's span of frames.

    Phonemes with a non-positive duration get the value 0. The result is a new
    array with one entry per phoneme, so the input is never read after being
    overwritten.
    """
    frames = np.asarray(frames)
    averaged = np.zeros(len(duration), dtype=frames.dtype)
    pos = 0
    for i, d in enumerate(duration):
        if d > 0:
            averaged[i] = np.mean(frames[pos : pos + d])
        # Always advance to the next phoneme's first frame.
        pos += d
    return averaged


def process_audio(wav_path,text_path,basename,out_dir,tg_path, sampling_rate, hop_length, stft):
    """Extract and save duration, pitch, energy and mel features of one utterance.

    Args:
        wav_path: Path of the audio file.
        text_path: Path of the text file; its first line is the raw transcript.
        basename: Utterance identifier used to build the output file names.
        out_dir: Output root; the ``duration``, ``pitch``, ``energy`` and
            ``mel`` sub-directories must already exist.
        tg_path: Path of the TextGrid file; its ``"phones"`` tier is used.
        sampling_rate: Sampling rate the audio is loaded at and used for all
            time/sample conversions.
        hop_length: Hop size in samples.
        stft: STFT object forwarded to ``extract_mel_spectrogram``.

    Returns:
        ``None`` when the alignment holds no speech (``start >= end``) or when
        fewer than two voiced pitch frames are found (nothing is saved in
        either case). Otherwise a tuple ``(info, pitch, energy, n_frames)``:
        the metadata line ``"basename|{phones}|raw_text"``, the phoneme-level
        pitch and energy with outliers removed, and ``mel_spectrogram.shape[1]``.
    """
    # Load at the requested rate: the slicing below converts seconds to samples
    # with `sampling_rate`, so the waveform has to be sampled at that rate.
    wav, _ = librosa.load(wav_path, sr=sampling_rate)

    # Read the TextGrid and get the alignment
    textgrid = tgt.io.read_textgrid(tg_path)
    phone, duration, start, end = get_alignment(
        textgrid.get_tier_by_name("phones"), sampling_rate, hop_length
    )
    text = "{" + " ".join(phone) + "}"
    if start >= end:
        return None

    # Cut the audio to the spoken part, based on the start and end times
    wav = wav[int(sampling_rate * start) : int(sampling_rate * end)].astype(np.float32)

    # Read raw text (UTF-8, matching the encoding the metadata is written with)
    with open(text_path, "r", encoding="utf-8") as f:
        raw_text = f.readline().strip("\n")

    # Extract mel-spectrogram and energy
    mel_spectrogram, energy = extract_mel_spectrogram(wav, stft, duration)

    # Extract pitch
    pitch = extract_pitch(wav, sampling_rate, hop_length, duration)

    # Interpolating over unvoiced (zero) frames needs at least two voiced
    # points. Without them no phoneme-level pitch can be built, so the
    # utterance is skipped instead of saving a frame-level contour.
    nonzero_ids = np.where(pitch != 0)[0]
    if len(nonzero_ids) < 2:
        return None
    interp_fn = interp1d(
        nonzero_ids,
        pitch[nonzero_ids],
        # Outside the voiced range, hold the first / last voiced value.
        fill_value=(pitch[nonzero_ids[0]], pitch[nonzero_ids[-1]]),
        bounds_error=False,
    )
    pitch = interp_fn(np.arange(0, len(pitch)))

    # Phoneme-level averages of pitch and energy
    pitch = _phoneme_average(pitch, duration)
    energy = _phoneme_average(energy, duration)

    # Save the duration, pitch, energy and mel-spectrogram files
    dur_filename = "{}-duration.npy".format(basename)
    np.save(os.path.join(out_dir, "duration", dur_filename), duration)

    pitch_filename = "{}-pitch.npy".format(basename)
    np.save(os.path.join(out_dir, "pitch", pitch_filename), pitch)

    energy_filename = "{}-energy.npy".format(basename)
    np.save(os.path.join(out_dir, "energy", energy_filename), energy)

    mel_filename = "{}-mel.npy".format(basename)
    np.save(os.path.join(out_dir, "mel", mel_filename), mel_spectrogram.T)

    return (
        "|".join([basename, text, raw_text]),
        remove_outlier(pitch),
        remove_outlier(energy),
        mel_spectrogram.shape[1],
    )



In [27]:
# Create the folders where the extracted features are saved

out_dir = "/home/dereck125/Documentos/dereckpreprocessed"
for feature_name in ("mel", "pitch", "energy", "duration"):
    # exist_ok=True: re-running the cell does not fail on existing folders.
    os.makedirs(os.path.join(out_dir, feature_name), exist_ok=True)

In [28]:
from sklearn.preprocessing import StandardScaler
import json
import random

# Configuration
sr = 22050       # sampling rate
hl = 256         # hop length in samples
val_size=100     # number of utterances written to val.txt
seed = 1234      # seed of the train/val shuffle, so the split is reproducible
stft = Audio.stft.TacotronSTFT(
    filter_length=1024,
    hop_length=hl,
    win_length=1024,
    n_mel_channels=80,
    sampling_rate=sr,
    mel_fmin=0.0,
    mel_fmax=8000.0,
)

print("Procesando datos ...")
out = list()
n_frames = 0
pitch_scaler = StandardScaler()
energy_scaler = StandardScaler()

# Get pitch, energy, duration and mel-spectrogram of every aligned utterance
wav_directory = "/home/dereck125/Documentos/dataset1/fvoice/"
tg_directory = "/home/dereck125/Documentos/dataset1/dereck_align/"
# Sorted so that the processing order (and hence the seeded split) is deterministic.
for wav_name in sorted(os.listdir(wav_directory)):
    if not wav_name.endswith(".wav"):
        continue
    basename = wav_name.split(".")[0]
    tg_path = os.path.join(tg_directory, "{}.TextGrid".format(basename))
    text_path = os.path.join(wav_directory, "{}.lab".format(basename))
    # Skip utterances without an alignment. Everything below must only run for
    # an utterance that was actually processed, otherwise the previous
    # utterance's pitch/energy/n would be counted again.
    if not os.path.exists(tg_path):
        continue
    ret = process_audio(
        os.path.join(wav_directory, wav_name), text_path, basename, out_dir, tg_path, sr, hl, stft
    )
    if ret is None:
        continue
    info, pitch, energy, n = ret
    out.append(info)

    # Accumulate the running mean / standard deviation of the features
    if len(pitch) > 0:
        pitch_scaler.partial_fit(pitch.reshape((-1, 1)))
    if len(energy) > 0:
        energy_scaler.partial_fit(energy.reshape((-1, 1)))

    n_frames += n

# The scalers only get `mean_` / `scale_` once they have seen data.
if not hasattr(pitch_scaler, "mean_") or not hasattr(energy_scaler, "mean_"):
    raise RuntimeError(
        "No utterance produced pitch/energy values; check wav_directory and tg_directory."
    )

print("Computing statistic quantities ...")
# Pitch and energy normalization
pitch_mean = pitch_scaler.mean_[0]
pitch_std = pitch_scaler.scale_[0]
energy_mean = energy_scaler.mean_[0]
energy_std = energy_scaler.scale_[0]

pitch_min, pitch_max = normalize(os.path.join(out_dir, "pitch"), pitch_mean, pitch_std)
energy_min, energy_max = normalize(os.path.join(out_dir, "energy"), energy_mean, energy_std)

# Each entry is [min, max, mean, std]; min/max refer to the normalized values.
stats = {
    "pitch": [
        float(pitch_min),
        float(pitch_max),
        float(pitch_mean),
        float(pitch_std),
    ],
    "energy": [
        float(energy_min),
        float(energy_max),
        float(energy_mean),
        float(energy_std),
    ],
}
with open(os.path.join(out_dir, "stats.json"), "w") as f:
    f.write(json.dumps(stats))

print("Total time: {} hours".format(n_frames * hl / sr / 3600))

# Build the train and validation sets
random.Random(seed).shuffle(out)

# Write the metadata: the first `val_size` shuffled entries go to validation
with open(os.path.join(out_dir, "train.txt"), "w", encoding="utf-8") as f:
    for m in out[val_size:]:
        f.write(m + "\n")
with open(os.path.join(out_dir, "val.txt"), "w", encoding="utf-8") as f:
    for m in out[:val_size]:
        f.write(m + "\n")


Procesando datos ...
Computing statistic quantities ...
Total time: 0.35468803224993706 hours
