In [2]:
# Imports
import os
os.environ["CUDA_VISIBLE_DEVICES"] = "2"
import librosa
import sys
import torch
import numpy as np
from sparc import load_model
from denoising_diffusion_pytorch import Trainer1D, Unet1D, GaussianDiffusion1D
import IPython.display as ipd
from concurrent.futures import ProcessPoolExecutor
from multiprocessing import cpu_count

  from .autonotebook import tqdm as notebook_tqdm


In [3]:
# Ask PyTorch to release any cached, currently unused GPU memory before the
# rest of the notebook allocates on the device.
torch.cuda.empty_cache()

# Uncomment to print a CUDA memory report:
# print(torch.cuda.memory_summary())

# Other LibriTTS-R feature files that can be substituted for `sample_file` below:
#986_129388_000015_000001.npy
#986_129388_000019_000001.npy
#986_129388_000022_000001.npy
#986_129388_000026_000001.npy
#986_129388_000029_000001.npy
#986_129388_000036_000004.npy
#986_129388_000059_000000.npy
#986_129388_000060_000005.npy

# Paths
# NOTE(review): these are absolute, machine-specific paths spread over two home
# directories (dagarwal / pmendoza); the notebook only runs where they exist.
# NOTE(review): this sys.path entry is appended after `from sparc import load_model`
# already ran in the import cell, so it cannot influence that import -- confirm
# whether it is still needed.
sys.path.append('/home/dagarwal/Speech-Articulatory-Coding')
# VCTK recordings: `accent_file` is encoded/decoded in the round-trip cell,
# `accent_file_2` is passed to preprocess_audio_file. `accent_file_txt` is not
# referenced by any cell visible in this notebook export.
accent_file = '/data/all_data/VCTK/VCTK_MFA_16k/p225/p225_004.wav'
accent_file_2 = '/data/all_data/VCTK/VCTK_MFA_16k/p279/p279_168.wav'
accent_file_txt = '/data/all_data/VCTK/VCTK_MFA_16k/p225/p225_004.txt'
# Pre-computed feature dict (.npy) fed to the diffusion model; the part of its
# file name before the first "_" is used as the key into the pitch statistics.
sample_file = '/data/common/LibriTTS_R/articulatory_features/986_129388_000060_000005.npy'
# Checkpoint whose 'model' entry is loaded into the GaussianDiffusion1D wrapper.
checkpoint_path = '/home/pmendoza/denoising-diffusion-pytorch/results/model-50.pt'
# Pickled dict of pitch statistics, indexed by the file-name prefix.
pitch_data_file = '/data/common/LibriTTS_R/pitch_stats.npy'
# Audio file from which the speaker embedding ('spk_emb') used for synthesis is taken.
speaker_embedding_file = '/home/pmendoza/Speech-Articulatory-Coding/sample_audio/sample1.wav'
# Not referenced by any cell visible in this notebook export.
denoised_sample_path = "/home/pmendoza/denoising-diffusion-pytorch/results/sample-1.npy"

In [4]:
# Run on the GPU when CUDA is available, otherwise fall back to the CPU.
if torch.cuda.is_available():
    device = torch.device("cuda")
else:
    device = torch.device("cpu")
print("Using device", device)

Using device cuda


In [None]:
# Helpers
# Function to preprocess a single file
def preprocess_data(data, file_pitch_stats):
    """Stack EMA, normalised log-pitch and loudness into one float tensor.

    Args:
        data: mapping with 'ema', 'pitch' and 'loudness' arrays. 'ema' is
            indexed as (frames, channels); the last entry of 'pitch' and of
            'loudness' is dropped before stacking.
        file_pitch_stats: sequence whose first element is the pitch value
            used for normalisation (log(pitch) - log(file_pitch_stats[0])).

    Returns:
        A float32 torch tensor of shape (frames, ema_channels + 2, 1) with
        the EMA channels first, then log-pitch, then loudness.

    Raises:
        ValueError: if the three components do not end up with the same
            number of frames.
    """
    ema = data['ema']
    n_frames = ema.shape[0]

    # Log-pitch relative to the reference pitch, as a column without the last entry.
    log_pitch = np.log(data['pitch']) - np.log(file_pitch_stats[0])
    pitch_col = log_pitch[:-1].reshape(-1, 1)

    # Loudness is additionally truncated to the EMA length; pitch is not.
    loudness_col = data['loudness'][:-1].reshape(-1, 1)[:n_frames]

    if not (n_frames == pitch_col.shape[0] == loudness_col.shape[0]):
        raise ValueError("Shape mismatch in data components")

    stacked = np.concatenate([ema, pitch_col, loudness_col], axis=1)
    # Trailing singleton axis: (frames, channels) -> (frames, channels, 1).
    return torch.from_numpy(stacked[:, :, np.newaxis]).float()

# Preprocessing function for Libri-TTSR
def preprocess_articulatory_features(file_path, pitch_stats_data):
    """Load a saved feature dict from ``file_path`` and preprocess it.

    The part of the file name before the first underscore selects the entry
    of ``pitch_stats_data`` used for pitch normalisation. The lookup happens
    before the file is opened, so an unknown prefix raises ``KeyError``
    without touching the disk.

    NOTE: the file is read with ``allow_pickle=True``, which can execute
    arbitrary code; only load files from trusted sources.
    """
    prefix = os.path.basename(file_path).partition("_")[0]
    prefix_stats = pitch_stats_data[prefix]

    # The .npy file holds a pickled 0-d object array wrapping a dict.
    features = np.load(file_path, allow_pickle=True).item()
    return preprocess_data(features, prefix_stats)

# Pitch statistics for the VCTK speakers processed so far, stored as
# speaker id -> [mean, standard deviation].
accented_speech_means = {
    'p279': [131.92659509230742, 23.73141518322258],
}

#Preprocessing function for VCTK
def preprocess_audio_file(audio_file, pitch_stats_data):
    """Encode a wav file with a freshly loaded CPU coder and preprocess it.

    The part of the file name before the first underscore selects the entry
    of ``pitch_stats_data`` used for pitch normalisation; this lookup happens
    before the model is loaded, so an unknown prefix fails fast with
    ``KeyError``.
    """
    # basename() already strips every directory component, so the prefix is
    # simply everything before the first "_".
    speaker_id = os.path.basename(audio_file).partition("_")[0]
    speaker_stats = pitch_stats_data[speaker_id]

    sparc_coder = load_model("en", device="cpu")
    features = encode_wav(sparc_coder, audio_file)
    return preprocess_data(features, speaker_stats)
    
# Synthesis function
def perform_synthesis(coder, combined_data, speaker_embedding, ema_data, pitch_data, loudness_data, pitch_stats_data, source_file=None):
    """Denormalise pitch and decode articulatory features into a waveform.

    Args:
        coder: object exposing ``decode(ema=..., pitch=..., loudness=..., spk_emb=...)``.
        combined_data: unused; kept so existing positional calls keep working.
        speaker_embedding: value passed to the decoder as ``spk_emb``.
        ema_data: EMA features, passed to the decoder unchanged.
        pitch_data: normalised log-pitch, i.e. log(pitch) - log(reference),
            as produced by ``preprocess_data``.
        loudness_data: loudness features, passed to the decoder unchanged.
        pitch_stats_data: mapping from file-name prefix to pitch statistics;
            element 0 of the selected entry is the reference pitch.
        source_file: path of the file the features came from. Its base-name
            prefix (text before the first "_") selects the pitch statistics.
            Defaults to the notebook-level ``sample_file`` so existing calls
            behave as before; pass it explicitly when synthesising features
            from any other file, otherwise the wrong speaker's statistics
            would be used.

    Returns:
        Whatever ``coder.decode`` returns.

    Raises:
        KeyError: if the prefix of ``source_file`` is not in ``pitch_stats_data``.
    """
    if source_file is None:
        # Backward-compatible default: the globally configured sample.
        source_file = sample_file

    file_prefix = os.path.basename(source_file).split("_")[0]
    file_pitch_stats = pitch_stats_data[file_prefix]

    # Invert the normalisation: exp(log_pitch + log(reference)).
    denormalized_pitch = np.exp(pitch_data + np.log(file_pitch_stats[0]))
    code_dict = {
        "ema": ema_data,
        "pitch": denormalized_pitch,
        "loudness": loudness_data,
        "spk_emb": speaker_embedding
    }
    wav = coder.decode(**code_dict)
    return wav

# Encode
def encode_wav(coder, wav_file_path):
    """Read a wav file and return ``coder.encode`` applied to its samples.

    The audio is loaded with ``sr=None``, i.e. at the file's own sampling
    rate without resampling; the sampling rate itself is discarded.
    NOTE(review): presumably the coder expects a specific rate -- confirm the
    input files already match it.
    """
    waveform = librosa.load(wav_file_path, sr=None)[0]
    return coder.encode(waveform)

def decode_wav(coder, encoded):
    """Decode an encoded feature mapping back into audio.

    Only the 'ema', 'pitch', 'loudness' and 'spk_emb' entries of ``encoded``
    are forwarded (as keyword arguments) to ``coder.decode``; any other keys
    are ignored. A missing key raises ``KeyError``.
    """
    decoder_keys = ("ema", "pitch", "loudness", "spk_emb")
    decoder_inputs = {key: encoded[key] for key in decoder_keys}
    return coder.decode(**decoder_inputs)

# Retrieves all wav files in a directory
def find_wav_files(folder_name):
    """Return the paths of all files ending in ".wav" under ``folder_name``.

    The directory tree is walked recursively with ``os.walk``; the match is
    case-sensitive. A folder that does not exist yields an empty list.
    """
    return [
        os.path.join(dirpath, filename)
        for dirpath, _, filenames in os.walk(folder_name)
        for filename in filenames
        if filename.endswith(".wav")
    ]

# Gets the pitch from a wav file
def get_audio_file_pitch(wav_file):
    """Return the flattened pitch track of ``wav_file``.

    A new CPU coder is loaded on every call, the file is encoded with it,
    and the path is printed once encoding has finished (used as a progress
    trace when many files are processed).
    """
    sparc_coder = load_model("en", device="cpu")
    pitch_track = encode_wav(sparc_coder, wav_file)['pitch']
    flat_pitch = pitch_track.flatten()
    print(wav_file)
    return flat_pitch

# Gets the pitch mean and standard deviation across multiple wav files in a directory
def get_pitch_mean_stddev(folder_name):
    """Compute the pitch mean and standard deviation over a folder of wav files.

    Every ".wav" file found under ``folder_name`` is encoded in a worker
    process and its pitch values are merged into running statistics, so the
    individual pitch tracks never have to be held in memory together.

    Args:
        folder_name: directory that is searched recursively for wav files.

    Returns:
        ``(mean, stddev)`` where ``stddev`` is the population standard
        deviation (the sum of squared deviations is divided by N, not N - 1).
        Both are 0 when no pitch values were found.
    """
    wav_files = find_wav_files(folder_name)
    mean_total = 0
    m2 = 0
    total_entries = 0

    # ProcessPoolExecutor raises ValueError for max_workers <= 0, which the
    # plain "half the cores" rule produces on a single-core machine.
    num_workers = max(1, cpu_count() // 2)

    with ProcessPoolExecutor(max_workers=num_workers) as executor:
        futures = [executor.submit(get_audio_file_pitch, wav_file) for wav_file in wav_files]
        for future in futures:
            cur_pitches = future.result()
            n = len(cur_pitches)
            if n == 0:
                # Nothing to merge; also avoids 0 / 0 (NaN mean) when the
                # very first file yields no pitch values.
                continue
            # Batched form of Welford's update: the mean is shifted by the
            # batch's total deviation, and m2 accumulates
            # sum((x - old_mean) * (x - new_mean)) over the batch.
            total_entries += n
            delta = cur_pitches - mean_total
            mean_total += np.sum(delta) / total_entries
            delta2 = cur_pitches - mean_total
            m2 += np.sum(delta * delta2)

    variance = m2 / total_entries if total_entries > 1 else 0
    stddev = np.sqrt(variance)
    return mean_total, stddev

# Example usage
# mean, stddev = get_pitch_mean_stddev('/data/all_data/VCTK/VCTK_MFA_16k/p279')
# print(f"Mean: {mean}, Standard Deviation: {stddev}")

In [None]:
# Pitch mean / standard deviation over every .wav file found under the p225
# speaker folder. Each file is encoded in a worker process, so this cell is slow.
mean, stddev = get_pitch_mean_stddev('/data/all_data/VCTK/VCTK_MFA_16k/p225')
print(f"Mean: {mean}, Standard Deviation: {stddev}")

In [None]:
# Encode and preprocess the p279 recording using the hand-entered VCTK pitch
# statistics; the returned tensor is displayed as the cell output.
preprocess_audio_file(accent_file_2, accented_speech_means)

  ckpt = torch.load(ckpt)
  WeightNorm.apply(module, name, dim)


183 183 183


torch.Size([183, 14, 1])

In [None]:
# Load pitch stats (a pickled dict stored inside the .npy file; trusted input only).
pitch_stats_data = np.load(pitch_data_file, allow_pickle=True).item()

coder = load_model("en", device="cpu")

# Model / diffusion configuration (same configuration as training).
SEQUENCE_LENGTH = 64
SAMPLE_SIZE = 128
TIMESTEPS = 1000

new_sample = preprocess_articulatory_features(sample_file, pitch_stats_data)

# Cut the sample into consecutive, NON-overlapping windows of SEQUENCE_LENGTH
# frames; a trailing partial window is dropped. For SEQUENCE_LENGTH == 64,
# T // 64 is the same number the earlier expression (T - 128) // 64 + 2 gave.
num_chunks = new_sample.shape[0] // SEQUENCE_LENGTH
if num_chunks == 0:
    raise ValueError(
        f"Sample has {new_sample.shape[0]} frames; at least {SEQUENCE_LENGTH} are required"
    )

new_samples = []
for i in range(num_chunks):
    start = i * SEQUENCE_LENGTH
    end = start + SEQUENCE_LENGTH
    # Reverse the axes so each chunk is laid out as (1, channels, frames).
    chunk = new_sample[start:end].permute(2, 1, 0).to(device)
    new_samples.append(chunk)

# From here on `new_sample` is the re-assembled (1, channels, frames) tensor.
new_sample = torch.cat(new_samples, dim=-1)

model = Unet1D(
    seq_length = SEQUENCE_LENGTH,
    dim = 64,
    dim_mults = (1, 2, 4, 8),
    channels = 14,
)

diffusion = GaussianDiffusion1D(
    model,
    seq_length = SEQUENCE_LENGTH,
    timesteps = TIMESTEPS,
    objective = 'pred_x0',
    auto_normalize = False
)

print(checkpoint_path)
# map_location remaps the stored tensors onto the device in use, so the
# checkpoint also loads when it was saved from a different GPU index or when
# only the CPU is available.
data = torch.load(checkpoint_path, map_location=device, weights_only=True)
diffusion.load_state_dict(data['model'])
diffusion.to(device)
diffusion.eval()

# One random timestep shared by all chunks.
# NOTE(review): no seed is set, so the denoised output differs between runs.
t = torch.randint(0, TIMESTEPS, (1,), device=device).long()

out = []
# Inference only: skip autograd bookkeeping so no graph is kept per chunk.
with torch.no_grad():
    for sample in new_samples:
        t_ = t.clone()
        res = diffusion.model_predictions(sample, t_)
        # Second field of the prediction result -- presumably the predicted
        # clean sample (x0); confirm against the library version in use.
        out.append(res[1])

denoised_sample = torch.cat(out, dim=-1)

# Load SPARC model and encode speaker embedding
speaker_embedding = coder.encode(speaker_embedding_file)['spk_emb']

# Perform synthesis and display audio.
# Channel layout used below: 0-11 -> EMA, 12 -> pitch, 13 -> loudness.
synthesized_audio = perform_synthesis(coder, None, speaker_embedding, 
                                denoised_sample[:, :12, :].detach().squeeze().cpu().numpy().transpose(),
                                denoised_sample[:, 12, :].detach().squeeze().cpu().numpy(),
                                denoised_sample[:, 13, :].detach().squeeze().cpu().numpy(),
                                pitch_stats_data
                                )

clean_audio = perform_synthesis(coder, None, speaker_embedding, 
                                new_sample[:, :12, :].squeeze().cpu().numpy().transpose(),
                                new_sample[:, 12, :].squeeze().cpu().numpy(),
                                new_sample[:, 13, :].squeeze().cpu().numpy(),
                                pitch_stats_data
                                )

# First player: resynthesis of the unmodified features; second: model output.
ipd.display(ipd.Audio(clean_audio, rate=coder.sr))
ipd.display(ipd.Audio(synthesized_audio, rate=coder.sr))

/home/pmendoza/denoising-diffusion-pytorch/results/model-50.pt


In [222]:
# Round-trip sanity check: encode `accent_file` with the coder, decode it
# straight back, and play the reconstruction (the diffusion model is not involved).
coder = load_model("en", device="cpu")
encoded = encode_wav(coder, accent_file)
decoded = decode_wav(coder, encoded)
ipd.display(ipd.Audio(decoded, rate=coder.sr))

  ckpt = torch.load(ckpt)
  WeightNorm.apply(module, name, dim)


In [230]:
# Scratch check: shape of the loudness track after dropping its last element
# and adding a trailing axis (np.delete without `axis` operates on the
# flattened array). Relies on `encoded` from the round-trip cell.
np.delete(encoded['loudness'], -1)[:, np.newaxis].shape

(220, 1)