In [None]:
%load_ext autoreload
%autoreload 2

### Load the model

In [None]:
import os
import re

import yaml

from src.utils import load_model_from_config

device = "cpu"
cfg_path = "checkpoints/almtokenizer/config.yaml"

with open(cfg_path) as f:
    cfg = yaml.safe_load(f)

# Point every device entry in the config at the device selected above.
cfg["device"] = device
cfg["model"]["patchify_args"]["device"] = device
cfg["model"]["unpatchify_args"]["device"] = device

model = load_model_from_config(cfg)

# Find and load the last epoch in the checkpoint directory.
# Only names of the exact form "epoch_<int>.pth" are considered, so stray files
# such as "epoch_best.pth" are skipped instead of crashing the int() conversion.
# (The directory variable is not called `dir`, to avoid shadowing the builtin.)
checkpoint_dir = os.path.join(cfg["training"]["checkpoint_dir"], "model")
epoch_pattern = re.compile(r"epoch_(\d+)\.pth")
epoch_numbers = [
    int(match.group(1))
    for match in map(epoch_pattern.fullmatch, os.listdir(checkpoint_dir))
    if match
]
# With no matching checkpoint this falls back to epoch 0, so the load below
# targets "epoch_0.pth".
last_epoch = max(epoch_numbers, default=0)
model.load_model(os.path.join(checkpoint_dir, f"epoch_{last_epoch}.pth"))
print(last_epoch)

model.eval()

### Sound Reconstruction

In [None]:
import torchaudio
import torch
import IPython.display as ipd

wav_path = "docs/audio/speech-female.wav"  # Replace with your wav file path
start_sec = 0  # NOTE(review): not referenced anywhere else in this cell

waveform, sr = torchaudio.load(wav_path, normalize=True)

# Resample to 24 kHz, the rate used for playback and for the saved files below.
waveform = torchaudio.functional.resample(waveform, orig_freq=sr, new_freq=24000)

# Down-mix to mono keeping the channel dimension, then add a leading batch axis.
# `waveform` is already a tensor, so it is used directly instead of being
# re-wrapped with torch.tensor(), which copy-constructs and emits a UserWarning.
mono = waveform.mean(dim=0, keepdim=True)
x = mono[None, :]
print(x.shape)

ipd.display(ipd.Audio(waveform, rate=24000))

for w in [3, 6, 10]:
    model.window_size = w
    # The forward pass must sit inside no_grad(); otherwise autograd tracks the
    # whole reconstruction for no reason.
    with torch.no_grad():
        audio = x.to(device)
        reconstructed = model(audio)["x_hat"]

    reconstructed_cpu = reconstructed.detach().cpu()
    ipd.display(ipd.Audio(reconstructed_cpu.numpy().flatten(), rate=24000))
    # Drop the batch axis before saving.
    torchaudio.save(f"docs/audio/reconstructed_window_{w}.wav", reconstructed_cpu.squeeze(0), 24000)
# Leave the model at window size 3 for the cells that follow.
model.window_size = 3

### Filter the Database

In [None]:
import pandas as pd

# Both metadata files are transposed right after loading.
# NOTE(review): presumably the JSON is keyed by record id, so .T yields one row per record - confirm.
json_df = pd.read_json("../good_sounds_dataset/sounds.json").T
takes = pd.read_json("../good_sounds_dataset/takes.json").T

# Keep only the sounds whose class is exactly "good-sound".
mask3 = json_df["klass"].eq("good-sound")
valid_subset = json_df.loc[mask3]

# Attach the take records to each retained sound (inner join on sound id).
db = pd.merge(valid_subset, takes, left_on="id", right_on="sound_id")

db

In [None]:
from encodec import EncodecModel
import torch
from utils import process_good_sounds_dataset

# NOTE(review): `device` is already set to "cpu" in the model-loading cell; this
# assignment repeats that value.
device = "cpu"

# Run the filtered database through the project helper, which returns latents and
# attributes for two sources (named EnCodec and ALMTokenizer by the variables below).
# NOTE(review): the meaning of `trim=True` is defined in utils - confirm there.
encodec_latents, encodec_attributes, alm_latents, alm_attributes = process_good_sounds_dataset(db, model, device=device, trim=True)


In [None]:
import numpy as np
from utils import aggregate_latents

# Combine the per-source latents and attributes into one feature table `X` and one
# metadata table `df`. Later cells index both by method name via `.loc[method]`.
# NOTE(review): the exact index layout is produced inside utils.aggregate_latents - confirm there.
X, df = aggregate_latents(encodec_latents, encodec_attributes, alm_latents, alm_attributes)

### Define the labels

In [None]:
from utils import get_good_bad, generate_label_encoder

# Transform labels to integer indices
# NOTE(review): df["klass"] is overwritten in place, so re-running this cell applies
# get_good_bad to already-converted values - confirm get_good_bad tolerates that.
df["klass"] = df["klass"].apply(get_good_bad)
# Attribute columns handed to the label-encoder helper.
attributes = ["instrument", "note", "octave", "klass"]
# `num_labels` is later indexed per attribute and per method (see the separability
# and classification cells); `label_encoders` is not used again in this notebook.
label_encoders, num_labels = generate_label_encoder(df, attributes)

### Define subsets for projections

In [None]:
from utils import stratified_sample

# Draw a fixed-seed sample of row labels from `df`.
# NOTE(review): presumably n=10000 rows per value of `source`, stratified over the
# three group columns - confirm against utils.stratified_sample.
indices = stratified_sample(
    df, 
    source_col="source",
    n=10000,
    group_cols=("instrument", "note", "octave"),
    random_state=123
)

# Restrict metadata, features and integer labels to the same sampled rows so they
# stay aligned in the projection, clustering and classification cells.
df_subset = df.loc[indices]
x_subset = X.loc[indices]
num_labels_subset = num_labels.loc[indices]

In [None]:
from sklearn.decomposition import PCA
from utils import projection

# Axis labels shared by every PCA figure; the per-method title is filled in inside the loop.
plot_kwargs = dict(xlabel="PCA 1", ylabel="PCA 2")

attributes = ["instrument", "note", "octave"]

# One figure per entry of the first index level of x_subset (used as the method name).
for method in x_subset.index.levels[0]:
    x_grp, df_grp = x_subset.loc[method], df_subset.loc[method]
    plot_kwargs["suptitles"] = f"{method}"
    fig = projection(
        x_grp,
        df_grp,
        attributes,
        PCA,
        plot_kwargs=plot_kwargs,
    )
    fig.savefig(f"figs/{method}_pca.pdf", bbox_inches='tight')

In [None]:
from MulticoreTSNE import MulticoreTSNE as TSNE

plot_kwargs = {
    "xlabel": "t-SNE 1",
    "ylabel": "t-SNE 2",
}

# Declared explicitly so the figures do not depend on whichever cell last assigned
# the global `attributes` (other cells set it to different lists).
attributes = ["instrument", "note", "octave"]

# t-SNE is stochastic; a fixed seed keeps the saved figures reproducible between runs.
# NOTE(review): assumes `projection` forwards proj_fn_kwargs to the TSNE constructor - confirm in utils.
tsne_kwargs = {'n_jobs': -1, 'random_state': 42}

for method in x_subset.index.levels[0]:
    plot_kwargs["suptitles"] = f"{method}"
    x_grp = x_subset.loc[method]
    df_grp = df_subset.loc[method]
    # Pass a fresh copy of the kwargs on every call, as the original literal did.
    fig = projection(x_grp, df_grp, attributes, TSNE, proj_fn_kwargs=dict(tsne_kwargs), plot_kwargs=plot_kwargs)
    fig.savefig(f"figs/{method}_tsne.pdf", bbox_inches='tight')

In [None]:
import matplotlib.pyplot as plt

from sklearn.discriminant_analysis import LinearDiscriminantAnalysis as LDA

# Axis labels shared by every LDA figure; the per-method title is filled in inside the loop.
plot_kwargs = dict(xlabel="LDA 1", ylabel="LDA 2")

attributes = ["instrument", "note", "octave"]
for method in x_subset.index.levels[0]:
    x_grp, df_grp = x_subset.loc[method], df_subset.loc[method]
    plot_kwargs["suptitles"] = f"{method}"
    # LDA is supervised, so the attribute columns are also passed as targets via `y`.
    fig = projection(
        x_grp,
        df_grp,
        attributes,
        LDA,
        y=df_grp[attributes],
        plot_kwargs=plot_kwargs,
    )
    fig.savefig(f"figs/{method}_lda.pdf", bbox_inches='tight')


# Clustering

In [None]:
from sklearn.mixture import GaussianMixture
from sklearn.metrics import adjusted_rand_score, normalized_mutual_info_score, homogeneity_score
def scan_k(x, range):
    """Fit one Gaussian mixture per candidate component count and pick the lowest AIC.

    Args:
        x: Data passed to ``GaussianMixture.fit`` and ``GaussianMixture.aic``.
        range: Iterable of candidate component counts. It is consumed exactly once,
            so one-shot iterators and progress-bar wrappers are safe to pass.
            (The name shadows the builtin inside this function; it is kept so
            existing callers are unaffected.)

    Returns:
        ``(aics, best_k)``: the AIC for each candidate in iteration order, and the
        candidate with the smallest AIC.

    Raises:
        ValueError: If ``range`` yields no candidates.
    """
    candidates = []
    aics = []
    # Record each candidate while iterating, so the iterable is never walked twice.
    for k in range:
        gmm = GaussianMixture(n_components=k, random_state=42).fit(x)
        candidates.append(k)
        aics.append(gmm.aic(x))
    if not candidates:
        raise ValueError("scan_k needs at least one candidate number of components")
    best_k = candidates[int(np.argmin(aics))]
    return aics, best_k

def compute_external_metrics(x, true_labels, k):
    """Cluster ``x`` with a k-component Gaussian mixture and score it against known labels.

    Args:
        x: Data used both to fit the mixture and to predict cluster assignments.
        true_labels: Reference labels aligned with the rows of ``x``.
        k: Number of mixture components.

    Returns:
        Dict with keys ``"ARI"``, ``"mutual_info"`` and ``"homogeneity"``, each the
        corresponding score of the predicted clusters against ``true_labels``.
    """
    fitted = GaussianMixture(random_state=42, n_components=k).fit(x)
    assignments = fitted.predict(x)
    scorers = (
        ("ARI", adjusted_rand_score),
        ("mutual_info", normalized_mutual_info_score),
        ("homogeneity", homogeneity_score),
    )
    return {name: scorer(true_labels, assignments) for name, scorer in scorers}

from tqdm import trange

plot_kwargs = {
    "xlabel": "t-SNE 1",
    "ylabel": "t-SNE 2",
}

for method in ["EnCodec", "ALMTokenizer"]:
    x_method = x_subset.loc[method]
    # Candidate numbers of mixture components; kept as a plain range so it can be
    # reused for plotting without re-iterating a progress bar.
    k_values = range(20, 60)

    # Find the best number of clusters on a fixed-seed sub-sample. The sample size
    # is capped at the available rows so small subsets do not raise.
    scan_sample = x_method.sample(min(2000, len(x_method)), random_state=42)
    aics, best_k = scan_k(scan_sample, trange(k_values.start, k_values.stop))
    plt.figure()
    plt.plot(list(k_values), aics)
    plt.xlabel("Number of clusters")
    plt.ylabel("AIC")
    plt.axvline(best_k, color='r', linestyle='--')
    plt.title(f"Model Selection: {method}")
    plt.tight_layout()
    plt.savefig(f"figs/{method}_k_scan.pdf", bbox_inches='tight')
    plt.show()

    # Cluster the full subset with the selected number of components
    gmm = GaussianMixture(n_components=best_k, random_state=42)
    clusters = gmm.fit_predict(x_method)

    cluster_df = df_subset.loc[method].copy()
    cluster_df["cluster"] = clusters

    # The column list is passed inline so the notebook-wide `attributes` variable
    # used by other cells is not overwritten. t-SNE is seeded for reproducible figures.
    # NOTE(review): assumes `projection` forwards proj_fn_kwargs to the TSNE constructor - confirm in utils.
    fig = projection(
        x_method,
        cluster_df,
        ["cluster"],
        proj_fn=TSNE,
        proj_fn_kwargs={'n_jobs': -1, 'random_state': 42},
        plot_kwargs=plot_kwargs,
    )
    fig.savefig(f"figs/{method}_clusters.pdf", bbox_inches='tight')

    # Reference label: one string per row joining instrument, note and octave.
    three_attr = df_subset.loc[method, ["instrument", "note", "octave"]].astype(str).agg("_".join, axis=1)
    metrics = compute_external_metrics(x_method, three_attr, best_k)
    print(f"External metrics for {method}:")
    for metric, value in metrics.items():
        print(f"  {metric}: {value:.4f}")


# Linear Separability Tests

In [None]:
from sklearn.svm import SVC

from sklearn.model_selection import train_test_split
from sklearn.utils._testing import ignore_warnings
from sklearn.exceptions import ConvergenceWarning

@ignore_warnings(category=ConvergenceWarning)
def linear_separability_test(label_encoders, x, num_labels, **kwargs):
    """Train one SVC per attribute and print its held-out accuracy.

    Args:
        label_encoders: Iterable whose items are the attribute names to test.
            Despite the name, the notebook passes a list of attribute names; a
            mapping keyed by attribute name also works because iterating it
            yields the keys.
        x: Feature table passed to ``train_test_split``.
        num_labels: Table indexable by attribute name, giving the integer labels
            for that attribute.
        **kwargs: Forwarded to the ``SVC`` constructor.

    ConvergenceWarning is suppressed for the duration of the call.
    """
    # Iterate the argument itself rather than the notebook-global `attributes`,
    # so the result depends only on what the caller passed in.
    for key in label_encoders:
        labels = num_labels[key]
        # Stratified 80/20 split with a fixed seed.
        X_train, X_test, y_train, y_test = train_test_split(x, labels, test_size=0.2, random_state=42, stratify=labels)
        svm = SVC(**kwargs)
        svm.fit(X_train, y_train)
        accuracy = svm.score(X_test, y_test)
        print(f"SVC accuracy for {key}: {accuracy:.2f}")

# Linear kernel with a very large C approximates a hard-margin SVM.
# max_iter is written as an integer: SVC documents it as an int, and recent
# scikit-learn releases reject a float such as 1e5 during parameter validation.
svc_kwargs = {'kernel': 'linear', 'C': 1e6, 'random_state': 42, 'max_iter': 100_000}
attributes = ["instrument", "note", "octave"]
for method in ["EnCodec", "ALMTokenizer"]:
    print(f"Testing linear separability for {method}")
    linear_separability_test(attributes, x_subset.loc[method], num_labels_subset.loc[method], **svc_kwargs)
    print()

# Classification

In [None]:
from sklearn.ensemble import RandomForestClassifier
from sklearn.metrics import precision_score

def classification_test(attributes, x, num_labels, classifier, **classifier_kwargs):
    """Fit one classifier per attribute and print held-out accuracy and macro precision.

    Args:
        attributes: Iterable of attribute names to evaluate.
        x: Feature table passed to ``train_test_split``.
        num_labels: Table indexable by attribute name, giving that attribute's labels.
        classifier: Estimator class; instantiated afresh for every attribute.
        **classifier_kwargs: Forwarded to the ``classifier`` constructor.
    """
    for key in attributes:
        # Stratified 80/20 split with a fixed seed.
        X_train, X_test, y_train, y_test = train_test_split(
            x,
            num_labels[key],
            test_size=0.2,
            random_state=42,
            stratify=num_labels[key],
        )

        estimator = classifier(**classifier_kwargs)
        estimator.fit(X_train, y_train)

        predictions = estimator.predict(X_test)
        accuracy = estimator.score(X_test, y_test)
        precision = precision_score(y_test, predictions, average='macro')

        print(f"Classifier accuracy for {key}: {accuracy:.2f}")
        print(f"Classifier precision for {key}: {precision:.2f}")

# Random-forest settings: 100 trees, fixed seed, all available cores.
rf_kwargs = dict(n_estimators=100, random_state=42, n_jobs=-1)
attributes = ["instrument", "note", "octave"]

for method in ["EnCodec", "ALMTokenizer"]:
    print(f"Testing classification for {method}")
    classification_test(
        attributes,
        x_subset.loc[method],
        num_labels_subset.loc[method],
        RandomForestClassifier,
        **rf_kwargs,
    )
    print()

# Interpolation test

In [None]:
from utils import process_good_sounds_dataset, aggregate_latents

# A sound is kept when at least one of sustain/decay/attack is present, at least one
# of release/offset is present, and its class is "good-sound".
mask1 = json_df[["sustain", "decay", "attack"]].notna().any(axis=1)
mask2 = json_df[["release", "offset"]].notna().any(axis=1)
mask3 = json_df["klass"].eq("good-sound")

valid_subset = json_df.loc[mask1 & mask2 & mask3]
db = pd.merge(valid_subset, takes, left_on="id", right_on="sound_id")

# Recompute latents on the stricter subset; this overwrites `db` and the latent and
# attribute variables produced by the earlier processing cell.
(
    encodec_latents,
    encodec_attributes,
    alm_latents,
    alm_attributes,
) = process_good_sounds_dataset(db, model, device=device, trim=True)
X_synth, df_synth = aggregate_latents(encodec_latents, encodec_attributes, alm_latents, alm_attributes)
# Octaves are stored as strings so they can be matched against string octave values.
df_synth["octave"] = df_synth["octave"].astype(str)

In [None]:

import torch
import numpy as np
from collections import defaultdict
from utils import interpolate_latent, create_vectors
from encodec import EncodecModel


# Each trajectory lists, per attribute, the successive way-points to interpolate between.
trajectory1 = {
    "instrument": ["flute", "flute", "flute"],
    "note": ["C", "E", "G"],
    "octave": ["5", "5", "5"],
}

trajectory2 = {
    "instrument": ["clarinet", "trumpet"],
    "note": ["G", "G"],
    "octave": ["5", "5"],
}

trajectory3 = {
    "instrument": ["clarinet", "flute"],
    "note": ["A", "A"],
    "octave": ["5", "6"],
}

# Only the decoder half of the pretrained 24 kHz EnCodec model is needed here.
encodec_dec = EncodecModel.encodec_model_24khz().decoder.to(device)

# NOTE(review): 750 is presumably 10 s at EnCodec's 75 Hz latent rate - confirm.
enc_n_latents = 750
# The ALMTokenizer sequence is shorter by a factor of the model's window size.
alm_n_latents = enc_n_latents // (model.window_size)

# audios[method][trajectory_index] -> 1-D numpy waveform
audios = defaultdict(dict)

trajectories = [trajectory1, trajectory2, trajectory3]

with torch.no_grad():
    for i, trajectory in enumerate(trajectories):
        # Compute EnCodec audio
        vector_list = create_vectors(trajectory, df_synth.loc["EnCodec"], X_synth.loc["EnCodec"])
        # Sanity check: print, per way-point vector, whether it contains NaNs.
        print([np.isnan(v).any().item() for v in vector_list])
        seq = interpolate_latent(vector_list, n=enc_n_latents)
        seq = seq.unsqueeze(0)
        # `seq` is already a tensor; cast and move it with .to() instead of re-wrapping
        # it in torch.tensor(), which copy-constructs and emits a UserWarning.
        seq = seq.detach().to(device=device, dtype=torch.float32)
        # Swap the last two axes before decoding.
        seq = seq.permute(0, 2, 1)
        x_hat = encodec_dec(seq).flatten()
        audios["EnCodec"][i] = x_hat.cpu().detach().numpy()

        # Compute ALMTokenizer audio
        vector_list = create_vectors(trajectory, df_synth.loc["ALMTokenizer"], X_synth.loc["ALMTokenizer"])
        seq = interpolate_latent(vector_list, n=alm_n_latents)
        seq = seq.unsqueeze(0)
        seq = seq.detach().to(model.device)
        x_hat = model.decode(seq).flatten()
        audios["ALMTokenizer"][i] = x_hat.cpu().detach().numpy()


In [None]:
from IPython.display import Audio, display
from torchaudio.transforms import MelSpectrogram
import matplotlib.pyplot as plt
import torch
import torchaudio

# The audio is played and saved at 24 kHz, so the mel filterbank must be built for
# the same rate (MelSpectrogram defaults to 16 kHz otherwise).
mel_transform = MelSpectrogram(sample_rate=24000)

for method, audiodict in audios.items():
    for i, audio in audiodict.items():
        print(f"Playing audio {i} for {method}:")
        display(Audio(audio, autoplay=True, rate=24000))

        wav = torch.tensor(audio)
        # Save audio (torchaudio expects a leading channel axis)
        torchaudio.save(f"docs/audio/{method}_trajectory_{i}.wav", wav.unsqueeze(0), 24000)

        # For a 1-D waveform the transform already returns a 2-D (mel bin, frame)
        # spectrogram, so no squeeze is needed before plotting.
        spec = mel_transform(wav)
        plt.imshow(spec.detach().cpu(), aspect='auto', origin='lower')
        plt.title(f"{method} - trajectory {i}")
        plt.xlabel("Frame")
        plt.ylabel("Mel bin")
        plt.show()

# Timbre Transfer

In [None]:
import os
import torchaudio
from utils import timbre_transfer


# Target attributes handed to timbre_transfer.
move_to = {
    "instrument": ["trumpet"],
    "note": ["A"],
}

# Run the same transfer on both speech samples; the 1-based index is the suffix of
# the saved file names.
for idx, wav_name in enumerate(("speech-male.wav", "speech-female.wav"), start=1):
    before_encodec, wav_encodec, before_alm, wav_alm = timbre_transfer(
        model,
        os.path.join("docs/audio", wav_name),
        move_to,
        X_synth,
        df_synth,
        device=device,
    )
    torchaudio.save(f"docs/audio/before_encodec_{idx}.wav", before_encodec.cpu(), 24000)
    torchaudio.save(f"docs/audio/after_encodec_{idx}.wav", wav_encodec.cpu(), 24000)
    torchaudio.save(f"docs/audio/before_alm_{idx}.wav", before_alm.cpu(), 24000)
    torchaudio.save(f"docs/audio/after_alm_{idx}.wav", wav_alm.cpu(), 24000)