In [1]:
%cd "D:\Schoolwork\TERM 3\WORK\visual_prosody"

D:\Schoolwork\TERM 3\WORK\visual_prosody


In [2]:
import os, sys, glob, shutil
import argparse
import json
import yaml
import numpy as np
from pprint import pprint

In [3]:
import logging
import os
import os.path as op

import torch
import torch.nn.functional as F
import numpy as np
import torchaudio
import pandas as pd

In [4]:
from tqdm.notebook import tqdm

# MCD

In [5]:
def antidiag_indices(offset, min_i=0, max_i=None, min_j=0, max_j=None):
    """
    for a (3, 4) matrix with min_i=1, max_i=3, min_j=1, max_j=4, outputs
    offset=2 (1, 1),
    offset=3 (2, 1), (1, 2)
    offset=4 (2, 2), (1, 3)
    offset=5 (2, 3)
    constraints:
        i + j = offset
        min_j <= j < max_j
        min_i <= offset - j < max_i
    """
    if max_i is None:
        max_i = offset + 1
    if max_j is None:
        max_j = offset + 1
    min_j = max(min_j, offset - max_i + 1, 0)
    max_j = min(max_j, offset - min_i + 1, offset + 1)
    j = torch.arange(min_j, max_j)
    i = offset - j
    return torch.stack([i, j])

def batch_dynamic_time_warping(distance, shapes=None):
    """full batched DTW without any constraints
    distance:  (batchsize, max_M, max_N) matrix
    shapes: (batchsize,) vector specifying (M, N) for each entry
    """
    # ptr: 0=left, 1=up-left, 2=up
    ptr2dij = {0: (0, -1), 1: (-1, -1), 2: (-1, 0)}

    bsz, m, n = distance.size()
    cumdist = torch.zeros_like(distance)
    backptr = torch.zeros_like(distance).type(torch.int32) - 1

    # initialize
    cumdist[:, 0, :] = distance[:, 0, :].cumsum(dim=-1)
    cumdist[:, :, 0] = distance[:, :, 0].cumsum(dim=-1)
    backptr[:, 0, :] = 0
    backptr[:, :, 0] = 2

    # DP with optimized anti-diagonal parallelization, O(M+N) steps
    for offset in range(2, m + n - 1):
        ind = antidiag_indices(offset, 1, m, 1, n)
        c = torch.stack(
            [cumdist[:, ind[0], ind[1] - 1], cumdist[:, ind[0] - 1, ind[1] - 1],
             cumdist[:, ind[0] - 1, ind[1]], ],
            dim=2
        )
        v, b = c.min(axis=-1)
        backptr[:, ind[0], ind[1]] = b.int()
        cumdist[:, ind[0], ind[1]] = v + distance[:, ind[0], ind[1]]

    # backtrace
    pathmap = torch.zeros_like(backptr)
    for b in range(bsz):
        i = m - 1 if shapes is None else (shapes[b][0] - 1).item()
        j = n - 1 if shapes is None else (shapes[b][1] - 1).item()
        dtwpath = [(i, j)]
        while (i != 0 or j != 0) and len(dtwpath) < 10000:
            assert (i >= 0 and j >= 0)
            di, dj = ptr2dij[backptr[b, i, j].item()]
            i, j = i + di, j + dj
            dtwpath.append((i, j))
        dtwpath = dtwpath[::-1]
        indices = torch.from_numpy(np.array(dtwpath))
        pathmap[b, indices[:, 0], indices[:, 1]] = 1

    return cumdist, backptr, pathmap


def compute_l2_dist(x1, x2):
    """compute an (m, n) L2 distance matrix from (m, d) and (n, d) matrices"""
    return torch.cdist(x1.unsqueeze(0), x2.unsqueeze(0), p=2).squeeze(0).pow(2)


def compute_rms_dist(x1, x2):
    l2_dist = compute_l2_dist(x1, x2)
    return (l2_dist / x1.size(1)).pow(0.5)


def get_divisor(pathmap, normalize_type):
    if normalize_type is None:
        return 1
    elif normalize_type == "len1":
        return pathmap.size(0)
    elif normalize_type == "len2":
        return pathmap.size(1)
    elif normalize_type == "path":
        return pathmap.sum().item()
    else:
        raise ValueError(f"normalize_type {normalize_type} not supported")


In [6]:
def batch_mel_cepstral_distortion(
        y1, y2, sr, normalize_type="path", mfcc_fn=None
):
    # https://huggingface.co/spaces/OFA-Sys/OFA-Generic_Interface/blob/main/fairseq/fairseq/tasks/text_to_speech.py
    """
    https://arxiv.org/pdf/2011.03568.pdf
    The root mean squared error computed on 13-dimensional MFCC using DTW for
    alignment. MFCC features are computed from an 80-channel log-mel
    spectrogram using a 50ms Hann window and hop of 12.5ms.
    y1: list of waveforms
    y2: list of waveforms
    sr: sampling rate
    """

    try:
        import torchaudio
    except ImportError:
        raise ImportError("Please install torchaudio: pip install torchaudio")

    if mfcc_fn is None or mfcc_fn.sample_rate != sr:
        melkwargs = {
            "n_fft": int(0.05 * sr), "win_length": int(0.05 * sr),
            "hop_length": int(0.0125 * sr), "f_min": 20,
            "n_mels": 80, "window_fn": torch.hann_window
        }
        mfcc_fn = torchaudio.transforms.MFCC(
            sr, n_mfcc=13, log_mels=True, melkwargs=melkwargs
        ).to(y1[0].device)
    return batch_compute_distortion(
        y1, y2, sr, lambda y: mfcc_fn(y).transpose(-1, -2), compute_rms_dist,
        normalize_type
    )

    
def batch_compute_distortion(y1, y2, sr, feat_fn, dist_fn, normalize_type):
    d, s, x1, x2 = [], [], [], []
    for cur_y1, cur_y2 in zip(y1, y2):
        assert (cur_y1.ndim == 1 and cur_y2.ndim == 1)
        cur_x1 = feat_fn(cur_y1)
        cur_x2 = feat_fn(cur_y2)
        x1.append(cur_x1)
        x2.append(cur_x2)

        cur_d = dist_fn(cur_x1, cur_x2)
        d.append(cur_d)
        s.append(d[-1].size())
    max_m = max(ss[0] for ss in s)
    max_n = max(ss[1] for ss in s)
    d = torch.stack(
        [F.pad(dd, (0, max_n - dd.size(1), 0, max_m - dd.size(0))) for dd in d]
    )
    s = torch.LongTensor(s).to(d.device)
    cumdists, backptrs, pathmaps = batch_dynamic_time_warping(d, s)

    rets = []
    itr = zip(s, x1, x2, d, cumdists, backptrs, pathmaps)
    for (m, n), cur_x1, cur_x2, dist, cumdist, backptr, pathmap in itr:
        cumdist = cumdist[:m, :n]
        backptr = backptr[:m, :n]
        pathmap = pathmap[:m, :n]
        divisor = get_divisor(pathmap, normalize_type)

        distortion = cumdist[-1, -1] / divisor
        ret = distortion, (cur_x1, cur_x2, dist, cumdist, backptr, pathmap)
        rets.append(ret)
    return rets

In [7]:
split_txt_val_path = r'.\preprocessed_data\LJSpeech\val.txt'
val_uids = []
with open(split_txt_val_path) as file:
    for line in file:
        # print(line.split('|')[0])
        val_uids.append(line.split('|')[0])

In [8]:
len(val_uids)

512

In [39]:
ref_folder = r'.\output\0629lj\result\LJSpeech\wav\reconstructed'
syn_folder = r'.\output\0629lj\result\LJSpeech\wav\synthesized'

In [42]:
uids = []
MCDs = []

In [43]:
for uid in tqdm(val_uids):
    uids.append(uid)
    wav_1, sr = torchaudio.load(op.join(ref_folder, f"{uid}.wav"))
    wav_2, sr = torchaudio.load(op.join(syn_folder, f"{uid}.wav"))
    wav_1 = wav_1.squeeze()
    wav_2 = wav_2.squeeze()
    results = batch_mel_cepstral_distortion(
            [wav_1],
            [wav_2],
            sr=sr,
    )
    MCDs.append(results[0][0].item())

  0%|          | 0/512 [00:00<?, ?it/s]

In [45]:
lj_mcd_df = pd.DataFrame({
    'uid': uids,
    'MCD': MCDs,
})

In [47]:
lj_mcd_df.to_csv(r".\jupyter_walkthrough\metrics\MCD_LJ_recon.csv")

In [52]:
print(f"MCD mean on LJSpeech: {torch.tensor(MCDs).mean()}")
print(f"MCD std on LJSpeech: {torch.tensor(MCDs).std()}")

MCD mean on LJSpeech: 1.684244155883789
MCD std on LJSpeech: 0.253206342458725


In [58]:
import pyworld as pw
import matplotlib.pyplot as plt

In [None]:
def world_extract(
    x: np.ndarray,
    fs: int,
    f0min: int = 40,
    f0max: int = 800,
    n_fft: int = 512,
    n_shift: int = 256,
    mcep_dim: int = 13,
    mcep_alpha: float = 0.41,
) -> np.ndarray:
    """Extract World-based acoustic features.

    Args:
        x (ndarray): 1D waveform array.
        fs (int): Minimum f0 value (default=40).
        f0 (int): Maximum f0 value (default=800).
        n_shift (int): Shift length in point (default=256).
        n_fft (int): FFT length in point (default=512).
        n_shift (int): Shift length in point (default=256).
        mcep_dim (int): Dimension of mel-cepstrum (default=25).
        mcep_alpha (float): All pass filter coefficient (default=0.41).

    Returns:
        ndarray: Mel-cepstrum with the size (N, n_fft).
        ndarray: F0 sequence (N,).

    """
    # extract features
    x = x.astype(np.float64)
    # f0, time_axis = pw.harvest(
    #     x,
    #     fs,
    #     f0_floor=f0min,
    #     f0_ceil=f0max,
    #     frame_period=n_shift / fs * 1000,
    # )
    pitch, t = pw.dio(
        x,
        fs,
         frame_period=n_shift / fs * 1000,
        )
    sp = pw.cheaptrick(x, f0, time_axis, fs, fft_size=n_fft)
    if mcep_dim is None or mcep_alpha is None:
        mcep_dim, mcep_alpha = _get_best_mcep_params(fs)
    mcep = pysptk.sp2mc(sp, 13, mcep_alpha)

    return mcep, f0


def _get_basename(path: str) -> str:
    return os.path.splitext(os.path.split(path)[-1])[0]


def _get_best_mcep_params(fs: int) -> Tuple[int, float]:
    if fs == 16000:
        return 23, 0.42
    elif fs == 22050:
        return 34, 0.45
    elif fs == 24000:
        return 34, 0.46
    elif fs == 44100:
        return 39, 0.53
    elif fs == 48000:
        return 39, 0.55
    else:
        raise ValueError(f"Not found the setting for {fs}.")

In [1]:
%cd "D:\Schoolwork\TERM 3\WORK\visual_prosody"

D:\Schoolwork\TERM 3\WORK\visual_prosody


In [57]:
import os, sys, glob, shutil
import argparse
import json
import yaml
import numpy as np
from pprint import pprint
import logging
import os
import os.path as op

import torch
import torch.nn.functional as F
import numpy as np
import torchaudio
import pandas as pd

from tqdm.notebook import tqdm

In [3]:
import argparse
import fnmatch
import logging
import multiprocessing as mp
import os
from typing import Dict, List, Tuple

import librosa
import numpy as np
import pysptk
import pyworld as pw
import soundfile as sf
from fastdtw import fastdtw
from scipy import spatial

In [10]:
# ref_folder = r'.\output\0629lj\result\LJSpeech\mel\gt'
# syn_folder = r'.\output\0629lj\result\LJSpeech\mel\syn'
ref_folder = r'.\output\0629lj\result\LJSpeech\wav\reconstructed'
syn_folder = r'.\output\0629lj\result\LJSpeech\wav\synthesized'

In [11]:
split_txt_val_path = r'.\preprocessed_data\LJSpeech\val.txt'
val_uids = []
with open(split_txt_val_path) as file:
    for line in file:
        # print(line.split('|')[0])
        val_uids.append(line.split('|')[0])

In [6]:
uid = val_uids[0]
# gt_mcep = torch.load(op.join(ref_folder, f"{uid}.pt")).numpy()
# gen_mcep = torch.load(op.join(syn_folder, f"{uid}.pt")).numpy()
# print(gt_mcep.shape, gen_mcep.shape)

(80, 691) (80, 691)


In [13]:
sr = 22050
melkwargs = {
    "n_fft": int(0.05 * sr), "win_length": int(0.05 * sr),
    "hop_length": int(0.0125 * sr), "f_min": 20,
    "n_mels": 80, "window_fn": torch.hann_window
}
mfcc_fn = torchaudio.transforms.MFCC(
    sr, n_mfcc=13, log_mels=True, melkwargs=melkwargs
)

In [26]:
wav_1, sr = torchaudio.load(op.join(ref_folder, f"{uid}.wav"))
wav_2, sr = torchaudio.load(op.join(syn_folder, f"{uid}.wav"))
wav_1 = wav_1.squeeze()
wav_2 = wav_2.squeeze()

In [47]:
mel_1 = mfcc_fn(wav_1).T.numpy()
mel_2 = mfcc_fn(wav_2).T.numpy()

In [48]:
mel_1.shape, mel_2.shape

((644, 13), (644, 13))

In [49]:
# DTW
_, path = fastdtw(mel_2, mel_1, dist=spatial.distance.euclidean)
twf = np.array(path).T
mel_2 = mel_2[twf[0]]
mel_1 = mel_1[twf[1]]

In [63]:
# We sum the squared differences over the first K MFCCs, skipping ct,0
mel_1 = mel_1[:, 1:]
mel_2 = mel_2[:, 1:]

In [66]:
mel_2.shape

(656, 12)

In [69]:
(((mel_1 - mel_2) ** 2).sum(axis=1)**0.5).mean()

5.2226214

In [71]:
uids = []
MCDs = []

In [72]:
for uid in tqdm(val_uids):
    uids.append(uid)
    wav_1, sr = torchaudio.load(op.join(ref_folder, f"{uid}.wav"))
    wav_2, sr = torchaudio.load(op.join(syn_folder, f"{uid}.wav"))
    wav_1 = wav_1.squeeze()
    wav_2 = wav_2.squeeze()
    mel_1 = mfcc_fn(wav_1).T.numpy()
    mel_2 = mfcc_fn(wav_2).T.numpy()
    # DTW
    _, path = fastdtw(mel_2, mel_1, dist=spatial.distance.euclidean)
    twf = np.array(path).T
    mel_2 = mel_2[twf[0]]
    mel_1 = mel_1[twf[1]]
    # We sum the squared differences over the first K MFCCs, skipping ct,0
    mel_1 = mel_1[:, 1:]
    mel_2 = mel_2[:, 1:]
    result = (((mel_1 - mel_2) ** 2).sum(axis=1)**0.5).mean()
    MCDs.append(result)

  0%|          | 0/512 [00:00<?, ?it/s]

In [73]:
import pandas as pd

lj_mcd_df = pd.DataFrame({
    'uid': uids,
    'MCD': MCDs,
})

In [74]:
lj_mcd_df.to_csv(r".\jupyter_walkthrough\metrics\MCD_LJ.csv")

In [75]:
print(f"MCD mean on LJSpeech: {torch.tensor(MCDs).mean()}")
print(f"MCD std on LJSpeech: {torch.tensor(MCDs).std()}")

MCD mean on LJSpeech: 5.480271339416504
MCD std on LJSpeech: 0.7322819232940674


# log f0

In [76]:
# https://github.com/espnet/espnet/blob/3e0dad524d62ccd45e067e9b36049f2214ea972a/egs2/TEMPLATE/asr1/pyscripts/utils/evaluate_f0.py

In [77]:
def world_extract(
    x: np.ndarray,
    fs: int,
    f0min: int = 40,
    f0max: int = 800,
    n_fft: int = 512,
    n_shift: int = 256,
    mcep_dim: int = 25,
    mcep_alpha: float = 0.41,
) -> np.ndarray:
    """Extract World-based acoustic features.

    Args:
        x (ndarray): 1D waveform array.
        fs (int): Minimum f0 value (default=40).
        f0 (int): Maximum f0 value (default=800).
        n_shift (int): Shift length in point (default=256).
        n_fft (int): FFT length in point (default=512).
        n_shift (int): Shift length in point (default=256).
        mcep_dim (int): Dimension of mel-cepstrum (default=25).
        mcep_alpha (float): All pass filter coefficient (default=0.41).

    Returns:
        ndarray: Mel-cepstrum with the size (N, n_fft).
        ndarray: F0 sequence (N,).

    """
    # extract features
    x = x.astype(np.float64)
    f0, time_axis = pw.harvest(
        x,
        fs,
        f0_floor=f0min,
        f0_ceil=f0max,
        frame_period=n_shift / fs * 1000,
    )
    sp = pw.cheaptrick(x, f0, time_axis, fs, fft_size=n_fft)
    if mcep_dim is None or mcep_alpha is None:
        mcep_dim, mcep_alpha = _get_best_mcep_params(fs)
    mcep = pysptk.sp2mc(sp, mcep_dim, mcep_alpha)

    return mcep, f0


def _get_basename(path: str) -> str:
    return os.path.splitext(os.path.split(path)[-1])[0]


def _get_best_mcep_params(fs: int) -> Tuple[int, float]:
    if fs == 16000:
        return 23, 0.42
    elif fs == 22050:
        return 34, 0.45
    elif fs == 24000:
        return 34, 0.46
    elif fs == 44100:
        return 39, 0.53
    elif fs == 48000:
        return 39, 0.55
    else:
        raise ValueError(f"Not found the setting for {fs}.")

In [87]:
uids = []
logf0_rmses = []
logf0_corrs = []

In [88]:
for uid in tqdm(val_uids):
    uids.append(uid)
    # load wav file as int16
    gen_x, gen_fs = sf.read(op.join(syn_folder, f"{uid}.wav"), dtype="int16")
    gt_x, gt_fs = sf.read(op.join(ref_folder, f"{uid}.wav"), dtype="int16")
    fs = gen_fs
    # extract ground truth and converted features
    gen_mcep, gen_f0 = world_extract(
        x=gen_x,
        fs=fs,
        f0min=40,
        f0max=800,
        n_fft=1024,
        n_shift=256,
        mcep_dim=None,
        mcep_alpha=None,
    )
    gt_mcep, gt_f0 = world_extract(
        x=gt_x,
        fs=fs,
        f0min=40,
        f0max=800,
        n_fft=1024,
        n_shift=256,
        mcep_dim=None,
        mcep_alpha=None,
    )
    
    # DTW
    _, path = fastdtw(gen_mcep, gt_mcep, dist=spatial.distance.euclidean)
    twf = np.array(path).T
    gen_f0_dtw = gen_f0[twf[0]]
    gt_f0_dtw = gt_f0[twf[1]]
    
    # Get voiced part
    nonzero_idxs = np.where((gen_f0_dtw != 0) & (gt_f0_dtw != 0))[0]
    gen_f0_dtw_voiced = np.log(gen_f0_dtw[nonzero_idxs])
    gt_f0_dtw_voiced = np.log(gt_f0_dtw[nonzero_idxs])

    # log F0 RMSE
    log_f0_rmse = np.sqrt(np.mean((gen_f0_dtw_voiced - gt_f0_dtw_voiced) ** 2))
    # print(f"{uid} {log_f0_rmse:.4f}")

    # log F0 corr
    log_f0_corr = np.corrcoef(gen_f0_dtw_voiced, gt_f0_dtw_voiced)[0][1]
    # print(f"{uid} {log_f0_corr:.4f}")

    logf0_rmses.append(log_f0_rmse)
    logf0_corrs.append(log_f0_corr)



  0%|          | 0/512 [00:00<?, ?it/s]

In [89]:
lj_logf0_df = pd.DataFrame({
    'uid': uids,
    'logf0_rmse': logf0_rmses,
    'logf0_corr': logf0_corrs,
})
lj_logf0_df.to_csv(r".\jupyter_walkthrough\metrics\logF0_LJ.csv")

In [90]:
print(f"logf0_rmse mean on LJSpeech: {torch.tensor(logf0_rmses).mean()}")
print(f"logf0_rmse std on LJSpeech: {torch.tensor(logf0_rmses).std()}")

print(f"logf0_corr mean on LJSpeech: {torch.tensor(logf0_corrs).mean()}")
print(f"logf0_corr std on LJSpeech: {torch.tensor(logf0_corrs).std()}")

logf0_rmse mean on LJSpeech: 0.2128476233348171
logf0_rmse std on LJSpeech: 0.07755068307229493
logf0_corr mean on LJSpeech: 0.7322283435658924
logf0_corr std on LJSpeech: 0.16015679184393214


In [None]:
def calculate(
    file_list: List[str],
    gt_file_list: List[str],
    args: argparse.Namespace,
    f0_rmse_dict: Dict[str, float],
):
    """Calculate log-F0 RMSE."""
    for i, gen_path in enumerate(file_list):
        corresponding_list = list(
            filter(lambda gt_path: _get_basename(gt_path) in gen_path, gt_file_list)
        )
        assert len(corresponding_list) == 1
        gt_path = corresponding_list[0]
        gt_basename = _get_basename(gt_path)

        # load wav file as int16
        gen_x, gen_fs = sf.read(gen_path, dtype="int16")
        gt_x, gt_fs = sf.read(gt_path, dtype="int16")

        fs = gen_fs
        if gen_fs != gt_fs:
            gt_x = librosa.resample(gt_x.astype(np.float), gt_fs, gen_fs)

        # extract ground truth and converted features
        gen_mcep, gen_f0 = world_extract(
            x=gen_x,
            fs=fs,
            f0min=args.f0min,
            f0max=args.f0max,
            n_fft=args.n_fft,
            n_shift=args.n_shift,
            mcep_dim=args.mcep_dim,
            mcep_alpha=args.mcep_alpha,
        )
        gt_mcep, gt_f0 = world_extract(
            x=gt_x,
            fs=fs,
            f0min=args.f0min,
            f0max=args.f0max,
            n_fft=args.n_fft,
            n_shift=args.n_shift,
            mcep_dim=args.mcep_dim,
            mcep_alpha=args.mcep_alpha,
        )

        # DTW
        _, path = fastdtw(gen_mcep, gt_mcep, dist=spatial.distance.euclidean)
        twf = np.array(path).T
        gen_f0_dtw = gen_f0[twf[0]]
        gt_f0_dtw = gt_f0[twf[1]]

        # Get voiced part
        nonzero_idxs = np.where((gen_f0_dtw != 0) & (gt_f0_dtw != 0))[0]
        gen_f0_dtw_voiced = np.log(gen_f0_dtw[nonzero_idxs])
        gt_f0_dtw_voiced = np.log(gt_f0_dtw[nonzero_idxs])

        # log F0 RMSE
        log_f0_rmse = np.sqrt(np.mean((gen_f0_dtw_voiced - gt_f0_dtw_voiced) ** 2))
        logging.info(f"{gt_basename} {log_f0_rmse:.4f}")
        f0_rmse_dict[gt_basename] = log_f0_rmse

In [None]:
for uid in tqdm(val_uids):
    uids.append(uid)
            # load wav file as int16
    gen_x, gen_fs = sf.read(gen_path, dtype="int16")
    gt_x, gt_fs = sf.read(gt_path, dtype="int16")
    wav_1, sr = torchaudio.load(op.join(ref_folder, f"{uid}.wav"))
    wav_2, sr = torchaudio.load(op.join(syn_folder, f"{uid}.wav"))
    wav_1 = wav_1.squeeze()
    wav_2 = wav_2.squeeze()
    mel_1 = mfcc_fn(wav_1).T.numpy()
    mel_2 = mfcc_fn(wav_2).T.numpy()
    # DTW
    _, path = fastdtw(mel_2, mel_1, dist=spatial.distance.euclidean)
    twf = np.array(path).T
    mel_2 = mel_2[twf[0]]
    mel_1 = mel_1[twf[1]]
    # We sum the squared differences over the first K MFCCs, skipping ct,0
    mel_1 = mel_1[:, 1:]
    mel_2 = mel_2[:, 1:]
    result = (((mel_1 - mel_2) ** 2).sum(axis=1)**0.5).mean()
    MCDs.append(result)