In [None]:
from paths import (LJLISTS_DIR, LOGS_DIR, 
                  WAVS_DIR, ENCODED_AUDIO_EN_DIR,
                  DATA_DIR) 

# Checkpoint selection: experiment version and the checkpoint file analysed in this notebook.
version = "v1"
grad_filename = "grad_4750.pt"
# Directory under LOGS_DIR from which the checkpoint is loaded below.
version_dir = LOGS_DIR / version

In [None]:
import torch
from model import GradTTS
from configs import params_v1

# Build the GradTTS model from the v1 hyper-parameters.
# The arguments are passed positionally, so their order must match the GradTTS
# constructor signature (defined in model.py, not visible here).
model = GradTTS(
        params_v1.n_ipa_feats,
        params_v1.n_spks,
        None if params_v1.n_spks == 1 else params_v1.spk_emb_dim, #spk_emb_dim
        params_v1.n_enc_channels,
        params_v1.filter_channels,
        params_v1.filter_channels_dp,
        params_v1.n_heads,
        params_v1.n_enc_layers,
        params_v1.enc_kernel,
        params_v1.enc_dropout,
        params_v1.window_size,
        params_v1.n_feats,
        params_v1.dec_dim,
        params_v1.beta_min,
        params_v1.beta_max,
        params_v1.pe_scale,
    )

# Load the checkpoint onto the CPU and restore the weights.
# NOTE(review): torch.load unpickles the file, so only load trusted checkpoints
# (consider weights_only=True if the installed torch version supports it).
# NOTE(review): model.eval() is not called in this cell; call it before inference.
ckpt_state_dict = torch.load(version_dir / grad_filename,
                  map_location=torch.device('cpu'))
model.load_state_dict(ckpt_state_dict)

In [None]:
from data_phnm import PhnmArticDataset, PhnmArticBatchCollate, PhnmBatchCollate
from torch.utils.data import DataLoader

# NOTE(review): this value is not used by the loaders below, which read
# params_v1.batch_size instead.
batch_size = 2

# Training and validation datasets built from the v1 file lists.
train_dataset = PhnmArticDataset(
        params_v1.train_filelist_path,
        data_root_dir=DATA_DIR,
        load_coder=False,
        merge_diphtongues=params_v1.merge_diphtongues,
    )

valid_dataset = PhnmArticDataset(
    params_v1.valid_filelist_path,
    data_root_dir=DATA_DIR,
    load_coder=False,
    merge_diphtongues=params_v1.merge_diphtongues,
)


# The two lines below are bare expressions: the truncating assignment is commented
# out, so they have no effect. Un-comment the "= ...[:10]" part to debug on a subset.
train_dataset.filepaths_list #= train_dataset.filepaths_list[:10]
valid_dataset.filepaths_list #= valid_dataset.filepaths_list[:10]
print("train_size", len(train_dataset), "valid_size", len(valid_dataset))

# Both loaders share one collate function and iterate in file-list order (shuffle=False).
batch_collate = PhnmArticBatchCollate()
train_loader = DataLoader(
    dataset=train_dataset,
    batch_size=params_v1.batch_size,
    collate_fn=batch_collate,
    drop_last=True,
    num_workers=3,
    shuffle=False,
)
val_loader = DataLoader(
    dataset=valid_dataset,
    batch_size=params_v1.batch_size,
    collate_fn=batch_collate,
    drop_last=False,
    num_workers=3,
    shuffle=False,
)

## arttts_inference

In [None]:
# Names (last four characters dropped, i.e. the extension for e.g. ".npy")
# of every entry found in the "emasrc" directory.
ema_dir = DATA_DIR / "LJSpeech-1.1" / "encoded_audio_en" / "emasrc"
avail_list = [entry.name[:-4] for entry in ema_dir.glob('*')]

# Same name derivation for the validation file list (first element of each entry),
# then keep only the names present in both.
_valid_names = {
    entry[0].split("/")[-1][:-4] for entry in valid_dataset.filepaths_list
}
val_avail_samples = set(avail_list) & _valid_names
print("val_avail_samples", len(val_avail_samples))

In [None]:
from configs import params_v1
import numpy as np

# Set-up for the batched inference loop, which is currently commented out below.
# Feature indices applied along the channel axis of the model outputs in that loop.
reorder_feats = params_v1.reorder_feats

# Restrict inference to the first two validation entries.
dataset = valid_dataset
filepaths_list = dataset.filepaths_list[:2]
batch_size = 2
collator = PhnmBatchCollate()

#model.eval()
#with torch.no_grad():
#    for i in range(0, len(filepaths_list), batch_size):
#        batch_filepaths = filepaths_list[i:i + batch_size]
#        phnm3_filepaths = [fp[1] for fp in batch_filepaths]
#        phnm_embs = [{"x" : dataset.get_phnm_emb(phnm3_fp)} 
#                     for phnm3_fp in phnm3_filepaths]
#        batch = collator(phnm_embs)
#        x = batch['x'].to(torch.float32)
#        x_lengths = batch['x_lengths']
#        y_enc, y_dec, attn = model(x, x_lengths, n_timesteps=50)  # (B, 16, T) x 2 , (B,1,T0,T)
#        print("y_enc.shape", y_enc.shape, "y_dec.shape", y_dec.shape, "attn.shape", attn.shape)
#        y_enc_14 = y_enc[:, reorder_feats, :].detach().cpu()
#        y_dec_14 = y_dec[:, reorder_feats, :].detach().cpu()
#        print("y_enc_14.shape", y_enc_14.shape, "y_dec_14.shape", y_dec_14.shape)
#        for j, (filepath, y_enc_j, y_dec_j) in enumerate(zip(batch_filepaths, y_enc_14, y_dec_14)):
#            #save_path = save_dir / f"{filepath[0].split('/')[-1][:-4]}.npy"
#            y_enc_dec_j = np.array([y_enc_j.numpy(),
#                                    y_dec_j.numpy()]) # (2, 14, T)
#            #print(f"Saved {save_path}")

    #for filepaths in tqdm(filepaths_list, desc="Processing files"):
    #    phnm3_filepath = filepaths[1]
    #    phnm_emb = dataset.get_phnm_emb(phnm3_filepath)
    #    print("phnm_emb.shape", phnm_emb.shape)
    #    x = phnm_emb.to(torch.float32).unsqueeze(0)  # Add batch dimension
    #    x_lengths = torch.LongTensor([x.shape[-1]])
    #    y_enc, y_dec, attn = model(x, x_lengths, n_timesteps=50)  # (1, n_feats, T) x 2 , (1,1,T0,T)

## arttts results analysis

In [None]:
# Predictions are read from arttts_pred/<version>/<checkpoint file name minus its
# last three characters (".pt")>; ground-truth features come from "emasrc".
_ljspeech_root = DATA_DIR / "LJSpeech-1.1"
results_dir = _ljspeech_root / "arttts_pred" / version / grad_filename[:-3]
ema_src_dir = _ljspeech_root / "encoded_audio_en" / "emasrc"

# Sorted so that indexing into the list is reproducible across runs.
reslist = sorted(results_dir.glob("*.npy"))
print("Files in reslist", len(reslist))

In [None]:
def normalize_pitch_channel(art: np.ndarray, pitch_idx=12) -> np.ndarray:
    """
    Return a copy of `art` whose pitch column has zero mean and unit variance.

    Must be called after reordering the features, so that `pitch_idx` points
    at the pitch column.

    Parameters:
    - art: 2-D array indexed as art[:, pitch_idx] (time along axis 0).
    - pitch_idx: column index of the pitch channel.

    Returns:
    - a new floating-point array with the same shape as `art`. The input is
      left untouched. If the pitch column has zero variance it is only
      centred (a message is printed) to avoid dividing by zero.
    """
    art = np.asarray(art)
    # Work on a floating-point copy: the caller's array (often a view of a
    # freshly loaded file) is not modified, and integer inputs are not
    # truncated when the normalised values are written back.
    out_dtype = art.dtype if np.issubdtype(art.dtype, np.floating) else np.float64
    normalized = np.array(art, dtype=out_dtype, copy=True)

    pitch = normalized[:, pitch_idx]
    centered = pitch - np.mean(pitch)
    std = np.std(pitch)
    if std > 0:
        normalized[:, pitch_idx] = centered / std
    else:
        print("Zero variance in pitch channel. Centering to zero mean.")
        normalized[:, pitch_idx] = centered
    return normalized

In [None]:
# Load one prediction file (the second entry of the sorted result list) together
# with the ground-truth file of the same name.
fp = reslist[1]
art_res = np.load(fp)
print("art_res.shape", art_res.shape)
# Index 0 / 1 of the saved array are used as encoder / decoder outputs
# (presumably saved as (2, 14, T) by the inference loop; confirm against the writer).
y_enc_14 = art_res[0].T  # (T, 14)
y_dec_14 = art_res[1].T  # (T, 14)
print("y_enc_14.shape", y_enc_14.shape, "y_dec_14.shape", y_dec_14.shape)
# Keep only the first 14 columns of the ground truth, then normalise column 12.
y_gt_ = np.load(ema_src_dir / fp.name)[:,:14]
y_gt = normalize_pitch_channel(y_gt_, pitch_idx=12)
print("y_gt.shape", y_gt.shape)

In [None]:
import matplotlib.pyplot as plt
from metrics import signals_from_path, normalized_dtw_score
import numpy as np
from utils import plot_art_14, plot_tensor

# Render ground truth, encoder output and decoder output as three separate
# figures (inputs are transposed to (14, T) before plotting).
# `im_data` is overwritten by each call and is not used in this cell.
fig1, im_data = plot_art_14([y_gt.T], title = "y_gt",)
fig2, im_data = plot_art_14([y_enc_14.T], title = "y_enc_14",)
fig3, im_data = plot_art_14([y_dec_14.T], title = "y_dec_14",)

fig, axes = plt.subplots(1, 3, figsize=(12, 10))

# Show the rendered pixel buffer of each figure as an image in one panel.
# NOTE(review): `canvas.renderer` is assumed to exist, i.e. an Agg-based canvas that
# has already been drawn (presumably done inside plot_art_14); confirm.
axes[0].imshow(fig1.canvas.renderer.buffer_rgba())
axes[1].imshow(fig2.canvas.renderer.buffer_rgba())
axes[2].imshow(fig3.canvas.renderer.buffer_rgba())

plt.tight_layout()
plt.show()

In [None]:
# Stack the transposed encoder and decoder predictions into one array and plot
# them in a single figure.
art_feats = np.array([y_enc_14.T, y_dec_14.T])

fig, im_data = plot_art_14(art_feats, title="y_enc, y_dec")
fig

In [None]:
# Plot the ground-truth features alone, for visual comparison with the previous figure.
fig, im_data = plot_art_14(np.array([y_gt.T]), title="y_gt")
fig

### DTW on enc and dec articulatory features

In [None]:
# DTW between the ground truth and each prediction. normalized_dtw_score is
# defined in metrics (not visible here); judging from the names it is unpacked
# into, it returns (score, aligned first signal, aligned second signal).
dist_gt_enc, y_gt_enc_ada, y_enc_14_ada = normalized_dtw_score(y_gt, y_enc_14)
dist_gt_dec, y_gt_dec_ada, y_dec_14_ada = normalized_dtw_score(y_gt, y_dec_14)

print("dist_gt_enc", dist_gt_enc, "dist_gt_dec", dist_gt_dec)

In [None]:
# Plot the DTW-aligned ground truth and encoder output together (unsmoothed).
fig, im_data = plot_art_14([y_gt_enc_ada.T,
            y_enc_14_ada.T,],
            title = "y_gt vs y_enc",)
fig

In [None]:
# Plot the DTW-aligned ground truth and decoder output together (unsmoothed).
fig, im_data = plot_art_14([y_gt_dec_ada.T,
            y_dec_14_ada.T,],
            title = "y_gt vs y_dec",)
fig

### DTW on smoothed articulatory features

In [None]:
def smooth_multivariate_signal(signal: np.ndarray, window_size: int = 5) -> np.ndarray:
    """
    Apply a moving-average (box) filter along the time axis of a signal.

    Parameters:
    - signal: numpy array of shape (T, d); every 1-D slice along axis 0 is
      filtered independently.
    - window_size: number of taps of the box filter.

    Returns:
    - smoothed_signal: the filtered array. Each slice is convolved in 'same'
      mode, so the shape matches the input when window_size <= T; samples
      near both ends are averaged against implicit zeros.
    """
    box_filter = np.full(window_size, 1.0 / window_size)

    def _moving_average(channel):
        # 'same' keeps the output centred on the input samples.
        return np.convolve(channel, box_filter, mode='same')

    return np.apply_along_axis(_moving_average, 0, signal)


In [None]:
# Repeat the DTW comparison after smoothing the predictions with a 3-tap moving
# average. Only the predictions are smoothed; y_gt is used as is.
# NOTE: this overwrites dist_gt_* and the *_ada variables produced by the
# unsmoothed DTW cell, so the plots below show the smoothed alignment.
y_dec_14_smoothed = smooth_multivariate_signal(y_dec_14, window_size=3)
dist_gt_dec, y_gt_dec_ada, y_dec_14_ada = normalized_dtw_score(y_gt, y_dec_14_smoothed)
y_enc_14_smoothed = smooth_multivariate_signal(y_enc_14, window_size=3)
dist_gt_enc, y_gt_enc_ada, y_enc_14_ada = normalized_dtw_score(y_gt, y_enc_14_smoothed)

print("dist_gt_enc (smoothed)", dist_gt_enc, "dist_gt_dec (smoothed)", dist_gt_dec)

In [None]:
# Aligned ground truth vs. smoothed encoder output. The title carries
# "(smoothed)" so this figure can be told apart from the unsmoothed one above.
fig, im_data = plot_art_14(
    [y_gt_enc_ada.T, y_enc_14_ada.T],
    title="y_gt vs y_enc (smoothed)",
)
fig

In [None]:
# Aligned ground truth vs. smoothed decoder output. The title carries
# "(smoothed)" so this figure can be told apart from the unsmoothed one above.
fig, im_data = plot_art_14(
    [y_gt_dec_ada.T, y_dec_14_ada.T],
    title="y_gt vs y_dec (smoothed)",
)
fig

## Standard DTW

In [None]:
from metrics import signals_from_path
from tslearn.metrics import dtw_path

In [None]:
# `data` was never defined in this notebook (it only existed as leftover kernel
# state), so it is rebuilt here exactly the way the single-sample cell above
# loads one prediction and its ground truth.
# NOTE(review): confirm this matches how `data` was originally constructed.
N_DTW_SAMPLES = 10  # upper bound; fewer are used if fewer results are available

data = []
for res_fp in reslist:
    gt_fp = ema_src_dir / res_fp.name
    if not gt_fp.exists():
        # No ground truth for this prediction: nothing to compare against.
        continue
    art_pred = np.load(res_fp)
    data.append({
        "y_gt": normalize_pitch_channel(np.load(gt_fp)[:, :14], pitch_idx=12),
        "y_enc": art_pred[0].T,  # (T, 14)
        "y_dec": art_pred[1].T,  # (T, 14)
    })
    if len(data) == N_DTW_SAMPLES:
        break

# Unconstrained DTW distance of each prediction to its ground truth.
# Loop-local names are used so the globals y_gt / y_enc_14 / y_dec_14 are not
# silently replaced by the last sample.
enc_scores = []
dec_scores = []
for sample in data:
    _, sample_dist_enc = dtw_path(sample["y_gt"], sample["y_enc"])
    _, sample_dist_dec = dtw_path(sample["y_gt"], sample["y_dec"])
    enc_scores.append(sample_dist_enc)
    dec_scores.append(sample_dist_dec)

plt.plot(enc_scores, label="y_enc")
plt.plot(dec_scores, label="y_dec")
plt.xlabel("Sample index")
plt.ylabel("DTW distance")
plt.title("DTW distance between GT and predicted features")
plt.legend()
plt.show()


In [None]:
# Select one sample for a detailed look. `data` must already exist in the kernel
# (it is not defined in this cell); each entry is a dict with the keys
# "y_gt", "y_enc" and "y_dec".
idx=0
y_gt = data[idx]["y_gt"]
y_enc_14 = data[idx]["y_enc"]
y_dec_14 = data[idx]["y_dec"]

In [None]:
# Unconstrained DTW between the ground truth and each prediction for the
# selected sample.
path_gt_enc, dist_gt_enc = dtw_path(y_gt, y_enc_14)
path_gt_dec, dist_gt_dec = dtw_path(y_gt, y_dec_14)

print("dist_gt_enc", dist_gt_enc, "dist_gt_dec", dist_gt_dec)

# Warp both signals of each pair onto their common alignment path.
y_gt_enc_ada, y_enc_14_ada = signals_from_path(y_gt, y_enc_14, path_gt_enc)
y_gt_dec_ada, y_dec_14_ada = signals_from_path(y_gt, y_dec_14, path_gt_dec)

# Compare channel 0 after alignment: encoder on the left, decoder on the right.
# (The figure handle was previously misnamed `fid`, and the line labels were
# never shown because no legend was drawn.)
fig, ax = plt.subplots(1, 2, figsize=(10, 3))
ax[0].plot(y_gt_enc_ada[:, 0], label="y_gt_ada")
ax[0].plot(y_enc_14_ada[:, 0], label="y_enc_14_ada")
ax[0].set_title("GT vs y_enc (channel 0, aligned)")
ax[0].legend()

ax[1].plot(y_gt_dec_ada[:, 0], label="y_gt_ada")
ax[1].plot(y_dec_14_ada[:, 0], label="y_dec_14_ada")
ax[1].set_title("GT vs y_dec (channel 0, aligned)")
ax[1].legend();

In [None]:
# Plot the aligned ground truth next to the aligned encoder / decoder outputs.
# NOTE(review): the arrays are passed time-major here (no `.T`), whereas the other
# plot_art_14 calls in this notebook pass transposed arrays; confirm which
# orientation plot_art_14 expects.
plot_art_14(np.array([y_gt_enc_ada,
                      y_enc_14_ada,]), title="y_gt_adapted, y_enc_14_adapted")
plot_art_14(np.array([y_gt_dec_ada,
                      y_dec_14_ada,]), title="y_gt_adapted, y_dec_14_adapted")

## Wavelet DTW

In [None]:
import pywt
from tslearn.metrics import dtw_path_from_metric


def wavelet_dtw_path(
    s1: np.ndarray,
    s2: np.ndarray,
    wavelet: str = "db4",
    level: int = 0,
    diff_order: int = 0,
    sakoe_chiba_radius: int = 50,
) -> tuple[list[tuple[int, int]], float]:
    """
    Compute the DTW path between two signals on their wavelet approximations.

    Each channel is standardised (zero mean, unit variance), optionally
    differenced, and reduced to its wavelet approximation coefficients. DTW is
    then run on the pairwise Euclidean distances between the resulting frames.

    Parameters:
        - s1: First signal (shape: [n_frames, n_features])
        - s2: Second signal (shape: [n_frames, n_features])
        - wavelet: Wavelet type to use for the transform
        - level: Decomposition level passed to pywt.wavedec. With the default 0
                 (the value previously hard-coded) no decomposition step is
                 applied; raise it to align on a coarser approximation.
        - diff_order: Order of the temporal difference applied before the
                 transform (0, the previous behaviour, means no differencing).
        - sakoe_chiba_radius: Radius of the Sakoe-Chiba band (previously
                 hard-coded to 50).

    Returns:
        - path_s1_s2: List of tuples representing the DTW path
                    (i, j) where i is the index in s1 and j in s2. The indices
                    refer to the transformed sequences, which are shorter than
                    the inputs when level > 0 or diff_order > 0.
        - distance_s1_s2: The DTW distance between the two signals
    """
    # Local import: scipy is not imported at this point of the notebook.
    from scipy.spatial.distance import cdist

    def _prepare(signal):
        """Standardise per channel, difference, and keep the wavelet approximation."""
        channels = np.asarray(signal, dtype=float).T  # (n_features, n_frames)
        channels = (channels - channels.mean(axis=1, keepdims=True)) / (
            channels.std(axis=1, keepdims=True) + 1e-6
        )
        channels = np.diff(channels, n=diff_order, axis=1)
        approx = np.array([
            pywt.wavedec(channel, wavelet, mode='constant', level=level)[0]
            for channel in channels
        ])
        return approx.T  # (n_timesteps, n_features)

    X_seq = _prepare(s1)
    Y_seq = _prepare(s2)

    # Pairwise Euclidean distance between every frame of X_seq and Y_seq.
    cost = cdist(X_seq, Y_seq, metric="euclidean")

    # metric='precomputed' expects the pairwise distance matrix itself; the DTW
    # routine does the accumulation. The previous code passed an already
    # accumulated cost matrix, so the costs were accumulated twice.
    path_s1_s2, distance_s1_s2 = dtw_path_from_metric(
        cost,
        metric='precomputed',
        global_constraint="sakoe_chiba",
        sakoe_chiba_radius=sakoe_chiba_radius,
    )
    return path_s1_s2, distance_s1_s2


In [None]:
from tslearn.metrics import dtw_path

# Band-constrained DTW: alignments may deviate from the diagonal by at most
# `sakoe_rad` frames.
sakoe_rad = 5  # Sakoe-Chiba radius for DTW

# Perform DTW between y_enc_14 and y_dec_14
path_enc_dec, distance_enc_dec = dtw_path(y_enc_14, y_dec_14, global_constraint="sakoe_chiba", sakoe_chiba_radius=sakoe_rad)

# Perform DTW between y_gt and y_dec_14
# (only the first 14 columns of each signal are compared here and below)
path_gt_dec, distance_gt_dec = dtw_path(y_gt[:,:14], y_dec_14[:,:14], global_constraint="sakoe_chiba", sakoe_chiba_radius=sakoe_rad)

# Perform DTW between y_gt and y_enc_14
path_gt_enc, distance_gt_enc = dtw_path(y_gt[:,:14], y_enc_14[:,:14], global_constraint="sakoe_chiba", sakoe_chiba_radius=sakoe_rad)

# Print the distances
print("DTW distance (y_enc_14, y_dec_14):", distance_enc_dec)
print("DTW distance (y_gt, y_enc_14):", distance_gt_enc)
print("DTW distance (y_gt, y_dec_14):", distance_gt_dec)
# At first sight better score on enc than dec

In [None]:
# Warp each prediction onto its DTW path with the ground truth.
# `signal_from_path` is not defined or imported anywhere in this notebook, so
# the path is applied directly: in dtw_path(y_gt, y_pred) every path element is
# (index into y_gt, index into y_pred), hence the second index selects the
# prediction frames.
y_enc_14_adapted = y_enc_14[[j for _, j in path_gt_enc]]
y_dec_14_adapted = y_dec_14[[j for _, j in path_gt_dec]]

# Verify the shapes
print("y_enc_14_adapted shape:", y_enc_14_adapted.shape)
print("y_dec_14_adapted shape:", y_dec_14_adapted.shape)

In [None]:
# Ground truth warped along the (y_gt, y_dec) path (first index of every path
# element), stacked with the warped decoder output.
# NOTE(review): the array order is [y_gt, y_dec] while the title reads "y_dec, y_gt",
# and the arrays are passed time-major (no `.T`) unlike earlier plot_art_14 calls.
art_feats = np.array([y_gt[[e[0] for e in path_gt_dec]],
                      y_dec_14_adapted,])

plot_art_14(art_feats, title="y_dec, y_gt")

In [None]:
# Ground truth warped along the (y_gt, y_enc) path (first index of every path
# element), stacked with the warped encoder output.
# NOTE(review): the array order is [y_gt, y_enc] while the title reads "y_enc, y_gt",
# and the arrays are passed time-major (no `.T`) unlike earlier plot_art_14 calls.
art_feats = np.array([y_gt[[e[0] for e in path_gt_enc]],
                      y_enc_14_adapted,])

plot_art_14(art_feats, title="y_enc, y_gt")

In [None]:
from scipy.ndimage import gaussian_filter1d

def gaussian_smooth_signal(signal, sigma=1):
    """
    Smooth every channel of a signal along time with a Gaussian kernel.

    Parameters:
    - signal: numpy array of shape (n_timesteps, n_channels)
    - sigma: standard deviation for Gaussian kernel

    Returns:
    - smoothed_signal: new numpy array of the same shape as input. Floating
      dtypes are preserved; any other dtype (e.g. integers) is promoted to
      float64 so the smoothed values are not truncated.
    """
    signal = np.asarray(signal)
    if not np.issubdtype(signal.dtype, np.floating):
        # The filter output takes the dtype of its input, so an integer signal
        # would come back truncated to whole numbers.
        signal = signal.astype(np.float64)
    # Filtering along axis 0 handles all channels in one call.
    return gaussian_filter1d(signal, sigma=sigma, axis=0)


In [None]:
def smooth_signal(signal, window_size=5):
    """
    Smooth every channel of a signal along time with a moving average.

    Parameters:
    - signal: numpy array of shape (n_timesteps, n_channels)
    - window_size: size of the moving average window (expected to be at most
      n_timesteps; a longer window changes the convolution output length)

    Returns:
    - smoothed_signal: new numpy array of the same shape as input. Floating
      dtypes are preserved; any other dtype (e.g. integers) is promoted to
      float64 so the averaged values are not truncated.
    """
    signal = np.asarray(signal)
    # Writing averages into an integer buffer would silently drop the fraction.
    out_dtype = signal.dtype if np.issubdtype(signal.dtype, np.floating) else np.float64
    smoothed_signal = np.zeros(signal.shape, dtype=out_dtype)
    # The box kernel is identical for every channel, so build it once.
    kernel = np.ones(window_size) / window_size
    for i in range(signal.shape[1]):  # Iterate over channels
        smoothed_signal[:, i] = np.convolve(signal[:, i], kernel, mode='same')
    return smoothed_signal

In [None]:
from tslearn.metrics import dtw_path_from_metric

eps = 4e-3  # only used by the commented-out jitter below
n=1         # order of the temporal difference


#normalize the signal
x = (y_gt - y_gt.mean(axis=0)) / (y_gt.std(axis=0) + 1e-6)
y = (y_dec_14 - y_dec_14.mean(axis=0)) / (y_dec_14.std(axis=0) + 1e-6)

x = gaussian_smooth_signal(x, sigma=3) # + np.random.uniform(-eps, eps, (x.shape[0], 1))
y = gaussian_smooth_signal(y, sigma=3) #+ np.random.uniform(-eps, eps, (y.shape[0], 1))

# Align on the temporal derivative of the smoothed signals (first 14 channels).
x = np.diff(x, axis=0, n=n)[:,:14]
y = np.diff(y, axis=0, n=n)[:,:14]

#x *= local_smoothness_gradient(x, window=11)
#y *= local_smoothness_gradient(y, window=11)
#
#x = np.sign(x) * (np.abs(x))
#y = np.sign(y) * (np.abs(y))
#
#ord=2
#distance_matrix = np.linalg.norm(x[:, None, :] - y[None, :, :], axis=2, ord=ord)

#path_gt_enc, distance_gt_enc = dtw_path_from_metric(distance_matrix, metric='precomputed', global_constraint="itakura", itakura_max_slope=1.3)
#path_gt_dec, distance_gt_dec = dtw_path(x, y, global_constraint="sakoe_chiba", sakoe_chiba_radius=10)
path_gt_dec, distance_gt_dec = dtw_path_from_metric(x, y, metric='cosine', global_constraint="sakoe_chiba", sakoe_chiba_radius=10)

fig, ax = plt.subplots(1,2, figsize=(10, 5))

# Left: channel 0 of both derivative signals before alignment.
ax[0].plot(x[:, 0], label="x")
ax[0].plot(y[:, 0], label="y_dec_14")
ax[0].set_title("Before alignment (channel 0)")
ax[0].legend()

# Right: the same channel after warping both signals along the DTW path.
# `signal_from_path` is not defined or imported anywhere in this notebook, so
# the path is applied directly: each element is (index into x, index into y).
y_gt_adapted = x[[e[0] for e in path_gt_dec]]
y_dec_14_adapted = y[[e[1] for e in path_gt_dec]]
ax[1].plot(y_gt_adapted[:,0], label='y_gt')
ax[1].plot(y_dec_14_adapted[:, 0], label='y_dec_14_adapted')
ax[1].set_title("After alignment (channel 0)")
ax[1].legend()
print("DTW distance (y_gt, y_dec_14):", distance_gt_dec)

In [None]:
# Apply the path found on the smoothed derivatives to the original signals.
# `signal_from_path` is not defined or imported anywhere in this notebook, so
# the path is applied directly: each element is (index into y_gt, index into
# y_dec_14). The derivative signals are shorter than the originals, so every
# path index is valid here.
y_dec_14_adapted = y_dec_14[[e[1] for e in path_gt_dec]]

y_gt_adapted = y_gt[[e[0] for e in path_gt_dec]]
art_feats = np.array([y_gt_adapted,
                      y_dec_14_adapted,])

# The title now matches the plotted arrays (it used to read "y_enc, y_enc, y_gt").
plot_art_14(art_feats, title="y_gt, y_dec")