In [17]:
%load_ext autoreload
%autoreload 2

import sys
import os

# Add the parent directory to the Python path
parent_dir = os.path.abspath('../..')
if parent_dir not in sys.path:
    sys.path.append(parent_dir)
    
# Import modules
import ddsp_textures.loss.functions
import ddsp_textures.auxiliar.filterbanks

# Import extra packages
import numpy as np
import librosa
import matplotlib.pyplot as plt
from IPython.display import Audio
import torch
import torchaudio

The autoreload extension is already loaded. To reload it, use:
  %reload_ext autoreload


In [18]:
# Load audio to filter -----------------------------------
fire_path  = ".."+"/sounds/fire_sounds/fire.wav"
water_path = ".."+"/sounds/water_sounds/water.wav"
sr     = 44100
new_sr = sr // 4 # for log_bank
fire_audio, _  = librosa.load(fire_path, sr=sr)
water_audio, _ = librosa.load(water_path, sr=sr)
# Make list of segments for fire and water --------------
frame_size     = 2**15
new_frame_size = frame_size // 4
hop_size   = 2**13
fire_segments = []
water_segments = []
for i in range(0, len(fire_audio)-frame_size, hop_size):
    fire_segments.append(fire_audio[i:i+frame_size])
for i in range(0, len(water_audio)-frame_size, hop_size):
    water_segments.append(water_audio[i:i+frame_size])
# Initialize erb_bank and log_bank for statistics loss --
N_filter_bank = 16
M_filter_bank = 6
erb_bank    = ddsp_textures.auxiliar.filterbanks.EqualRectangularBandwidth(frame_size, sr, N_filter_bank, 20, sr // 2)
log_bank    = ddsp_textures.auxiliar.filterbanks.Logarithmic(new_frame_size,       new_sr, M_filter_bank, 10, new_sr // 4)
downsampler = torchaudio.transforms.Resample(sr, new_sr)

# Statistics computation 1

In [19]:
#Testing Statistics functions --------------------------------------------------

fire_segment  = fire_segments[10]
fire_segment  = torch.tensor(fire_segment)
fire_stats  = ddsp_textures.loss.functions.statistics(fire_segment,  N_filter_bank, M_filter_bank, erb_bank, log_bank, downsampler)
print("Fire statistics:\n")
print("stats_11:\n", fire_stats[0])
print("stats_12:\n", fire_stats[1])
print("stats_13:\n", fire_stats[2])
print("stats_14:\n", fire_stats[3])
print("stats_2:\n", fire_stats[4])
print("stats_3:\n", fire_stats[5])
print("stats_4:\n", fire_stats[6])
print("stats_5:\n", fire_stats[7])

Fire statistics:

stats_11:
 tensor([0.0537, 0.0155, 0.0042, 0.0024, 0.0028, 0.0022, 0.0019, 0.0011, 0.0010,
        0.0008, 0.0007, 0.0008, 0.0010, 0.0012, 0.0014, 0.0007])
stats_12:
 tensor([ 0.2743,  0.2284,  0.3165,  0.7013,  1.4983,  3.3608,  9.8653,  6.7365,
         9.5678,  6.0444,  5.7156, 12.7234, 13.8057, 13.9685, 21.7117, 17.8238])
stats_13:
 tensor([ 0.7110,  0.2236,  0.8583,  2.0186,  4.2870,  7.1669, 10.7351, 11.8428,
        11.9904,  9.9097, 11.2044, 14.5283, 16.0173, 15.4713, 18.1143, 13.7290])
stats_14:
 tensor([  3.3320,   2.6130,   3.1892,   7.4373,  26.9978,  66.9166, 133.0751,
        182.5117, 172.2676, 123.7882, 169.5770, 257.0317, 325.7110, 293.0002,
        396.8578, 224.5481])
stats_2:
 tensor([ 0.1724,  0.1014,  0.1575,  0.0959,  0.0071, -0.0457, -0.0205, -0.0197,
        -0.0229, -0.0102, -0.0342, -0.0282, -0.0242, -0.0341, -0.0567,  0.1012,
        -0.0046, -0.0594, -0.0631, -0.0908, -0.0905, -0.1060, -0.1239, -0.1213,
        -0.1094, -0.1098, -0.1003, -

In [20]:
import time

random_seed_1 = np.random.randint(0, min(len(fire_segments),len(water_segments)), 32)
random_seed_2 = np.random.randint(0, min(len(fire_segments),len(water_segments)), 32)
random_seed_3 = np.random.randint(0, min(len(fire_segments),len(water_segments)), 32)

def experiment_1(seed):
    #Take 32 segments and make a batch
    fire_batch = []
    for i in range(32):
        fire_segment = fire_segments[seed[i]]
        fire_segment = torch.tensor(fire_segment)
        fire_batch.append(fire_segment)
    fire_batch = torch.stack(fire_batch, dim=0)

    water_batch = []
    for i in range(32):
        water_segment = water_segments[seed[i]]
        water_segment = torch.tensor(water_segment)
        water_batch.append(water_segment)
    water_batch = torch.stack(water_batch, dim=0)

    start = time.time()
    loss = ddsp_textures.loss.functions.batch_statistics_loss(fire_batch, water_batch, N_filter_bank, M_filter_bank, erb_bank, log_bank, downsampler)
    end = time.time()
    print("Time taken: ", end-start)
    print("Batch loss value: ", loss)

experiment_1(random_seed_1)
experiment_1(random_seed_2)
experiment_1(random_seed_3)

Time taken:  1.7373864650726318
Batch loss value:  tensor(3.9055)
Time taken:  1.7523078918457031
Batch loss value:  tensor(3.6720)
Time taken:  1.7207884788513184
Batch loss value:  tensor(3.3965)


# Statistics computation 2

In [21]:
def correlation_coefficient_2(tensor1, tensor2):
    mean1 = tensor1.mean(dim=-1, keepdim=True)
    mean2 = tensor2.mean(dim=-1, keepdim=True)
    
    tensor1 = tensor1 - mean1
    tensor2 = tensor2 - mean2
    
    std1 = tensor1.norm(dim=-1) / (tensor1.shape[-1] ** 0.5)  # Equivalent to std but avoids computing mean again
    std2 = tensor2.norm(dim=-1) / (tensor2.shape[-1] ** 0.5)
    
    corr = (tensor1 * tensor2).mean(dim=-1) / (std1 * std2 + 1e-8)
    return corr

def compute_statistics_2(signals, N_filter_bank, M_filter_bank, erb_bank, log_bank, downsampler):
    device = signals.device
    if signals.dim() == 1:  # Single signal case
        signals = signals.unsqueeze(0)  # Add batch dimension (1, Size)
        was_single = True
    else:
        was_single = False
    batch_size = signals.shape[0]

    erb_subbands = erb_bank.generate_subbands(signals)[:, 1:-1, :]

    N_filter_bank = erb_subbands.shape[1]

    env_subbands = torch.abs(ddsp_textures.auxiliar.seeds.hilbert(erb_subbands))
    env_subbands_downsampled = downsampler(env_subbands.float())
    length_downsampled       = env_subbands_downsampled.shape[-1]

    subenvelopes = torch.zeros((batch_size, N_filter_bank, M_filter_bank, length_downsampled), device=device)
    for i in range(N_filter_bank):
        banda     = env_subbands_downsampled[:, i, :]
        subbandas = log_bank.generate_subbands(banda)[:, 1:-1, :]
        subenvelopes[:, i, :, :] = subbandas

    mu = env_subbands.mean(dim=-1)
    sigma = env_subbands.std(dim=-1)
    # print("sigma ** 4:", sigma ** 4)

    stats_11 = mu
    stats_12 = (sigma ** 2) / (mu ** 2)
    normalized_env_subbands = (env_subbands - mu.unsqueeze(-1))
    stats_13 = (normalized_env_subbands ** 3).mean(dim=-1) / (sigma ** 3)
    stats_14 = (normalized_env_subbands ** 4).mean(dim=-1) / (sigma ** 4)

    corr_pairs = torch.triu_indices(N_filter_bank, N_filter_bank, 1)
    stats_2 = correlation_coefficient_2(env_subbands[:, corr_pairs[0]], env_subbands[:, corr_pairs[1]])

    subenv_sigma = subenvelopes.std(dim=-1)
    stats_3 = (subenv_sigma / (env_subbands_downsampled.std(dim=-1, keepdim=True))).reshape(-1)

    cross_corr_across_subbands = correlation_coefficient_2(subenvelopes[:, None, :, :, :], subenvelopes[:, :, None, :, :])
    stats_4 = cross_corr_across_subbands[:, torch.triu_indices(N_filter_bank, N_filter_bank, 1)[0], torch.triu_indices(N_filter_bank, N_filter_bank, 1)[1]].reshape(-1)

    cross_corr_subenvs = correlation_coefficient_2(subenvelopes[:, :, None, :, :], subenvelopes[:, :, :, None, :])
    stats_5 = cross_corr_subenvs[:, :, torch.triu_indices(M_filter_bank, M_filter_bank, 1)[0], torch.triu_indices(M_filter_bank, M_filter_bank, 1)[1]].reshape(-1)

    return [stats_11, stats_12, stats_13, stats_14, stats_2, stats_3, stats_4, stats_5]

def batch_statistics_loss_2(original_signals, reconstructed_signals, N_filter_bank, M_filter_bank, erb_bank, log_bank, downsampler, alpha=torch.tensor([0.3, 0.1, 0.1, 0.1, 0.1, 0.1, 0.1, 0.1])):
    original_stats      = compute_statistics_2(original_signals, N_filter_bank, M_filter_bank, erb_bank, log_bank, downsampler)
    reconstructed_stats = compute_statistics_2(reconstructed_signals, N_filter_bank, M_filter_bank, erb_bank, log_bank, downsampler)

    losses = [torch.sqrt((o - r).pow(2).sum(dim=-1)) / o.shape[-1] for o, r in zip(original_stats, reconstructed_stats)]
    losses = torch.stack([l.mean() for l in losses])

    return (losses * alpha.to(losses.device)).sum()


In [22]:
#Testing Statistics functions --------------------------------------------------

fire_segment  = fire_segments[10]
fire_segment  = torch.tensor(fire_segment)
fire_stats    = compute_statistics_2(fire_segment,  N_filter_bank, M_filter_bank, erb_bank, log_bank, downsampler)
print("Fire statistics:\n")
print("stats_11:\n", fire_stats[0])
print("stats_12:\n", fire_stats[1])
print("stats_13:\n", fire_stats[2])
print("stats_14:\n", fire_stats[3])
print("stats_2:\n", fire_stats[4])
print("stats_3:\n", fire_stats[5])
print("stats_4:\n", fire_stats[6])
print("stats_5:\n", fire_stats[7])

Fire statistics:

stats_11:
 tensor([[0.0537, 0.0155, 0.0042, 0.0024, 0.0028, 0.0022, 0.0019, 0.0011, 0.0010,
         0.0008, 0.0007, 0.0008, 0.0010, 0.0012, 0.0014, 0.0007]])
stats_12:
 tensor([[ 0.2743,  0.2284,  0.3165,  0.7013,  1.4983,  3.3608,  9.8653,  6.7365,
          9.5678,  6.0444,  5.7156, 12.7234, 13.8057, 13.9685, 21.7117, 17.8238]])
stats_13:
 tensor([[ 0.7110,  0.2236,  0.8583,  2.0186,  4.2870,  7.1669, 10.7351, 11.8428,
         11.9904,  9.9097, 11.2044, 14.5283, 16.0173, 15.4713, 18.1143, 13.7290]])
stats_14:
 tensor([[  3.3320,   2.6130,   3.1892,   7.4373,  26.9978,  66.9166, 133.0751,
         182.5117, 172.2676, 123.7882, 169.5770, 257.0317, 325.7110, 293.0002,
         396.8578, 224.5481]])
stats_2:
 tensor([[ 0.1724,  0.1014,  0.1575,  0.0959,  0.0071, -0.0457, -0.0205, -0.0197,
         -0.0229, -0.0102, -0.0342, -0.0282, -0.0242, -0.0341, -0.0567,  0.1012,
         -0.0046, -0.0594, -0.0631, -0.0908, -0.0905, -0.1060, -0.1239, -0.1213,
         -0.1094, -0

In [23]:
def experiment_2(seed):
    #Take 32 segments and make a batch
    fire_batch = []
    for i in range(32):
        fire_segment = fire_segments[seed[i]]
        fire_segment = torch.tensor(fire_segment)
        fire_batch.append(fire_segment)
    fire_batch = torch.stack(fire_batch, dim=0)

    water_batch = []
    for i in range(32):
        water_segment = fire_segments[seed[i]-1]
        # water_segment = water_segments[seed[i]]
        water_segment = torch.tensor(water_segment)
        water_batch.append(water_segment)
    water_batch = torch.stack(water_batch, dim=0)

    start = time.time()
    loss = batch_statistics_loss_2(fire_batch, water_batch, N_filter_bank, M_filter_bank, erb_bank, log_bank, downsampler)
    end = time.time()
    print("Time taken: ", end-start)
    print("Batch loss value: ", loss)

experiment_2(random_seed_1)
experiment_2(random_seed_2)
experiment_2(random_seed_3)

Time taken:  1.733560562133789
Batch loss value:  tensor(2.1289)
Time taken:  1.7220184803009033
Batch loss value:  tensor(2.3315)
Time taken:  1.7147324085235596
Batch loss value:  tensor(1.5985)


In [24]:
def experiment_3(seed):
    #Take 32 segments and make a batch
    fire_batch = []
    for i in range(32):
        fire_segment = fire_segments[seed[i]]
        fire_segment = torch.tensor(fire_segment)
        fire_batch.append(fire_segment)
    fire_batch = torch.stack(fire_batch, dim=0)

    water_batch = []
    for i in range(32):
        water_segment = water_segments[seed[i]]
        water_segment = torch.tensor(water_segment)
        water_batch.append(water_segment)
    water_batch = torch.stack(water_batch, dim=0)

    start = time.time()
    loss = ddsp_textures.loss.functions.multiscale_spectrogram_loss(fire_batch, water_batch)
    end = time.time()
    print("Time taken: ", end-start)
    print("Batch loss value: ", loss)

experiment_3(random_seed_1)
experiment_3(random_seed_2)
experiment_3(random_seed_3)

Time taken:  0.10750055313110352
Batch loss value:  tensor(14.1983)
Time taken:  0.10199260711669922
Batch loss value:  tensor(13.8998)
Time taken:  0.09697246551513672
Batch loss value:  tensor(14.6349)
