### Note:
Run the **Setup** section first, then restart the runtime and run all cells (the restart is needed so the upgraded packages are loaded).

# Setup

In [1]:
# Mount Google Drive inside the Colab runtime.
# NOTE(review): force_remount=True presumably re-mounts even when the drive is
# already mounted -- confirm against the Colab documentation.
from google.colab import drive
drive.mount('/content/drive', force_remount=True)
# Make the experiment folder the working directory: later cells use paths
# relative to it (e.g. 'PhraseRecordings/...').
%cd /content/drive/MyDrive/AdapExperiments/Performance/

Mounted at /content/drive
/content/drive/MyDrive/AdapExperiments/Performance


In [2]:
import altair

print(altair.__version__)
!pip install --upgrade altair anywidget -q

5.5.0
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m220.7/220.7 kB[0m [31m6.3 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m765.5/765.5 kB[0m [31m11.2 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m1.6/1.6 MB[0m [31m14.2 MB/s[0m eta [36m0:00:00[0m
[?25h

# Functions and Loadings

In [3]:
# @title Import Model
import torch
# Load the two TorchScript checkpoints stored in the RAVE_MODELS folder:
# model1 <- musicnet.ts, model2 <- voice_vocalset_b2048_r48000_z16.ts
model1, model2 = (
    torch.jit.load('/content/drive/MyDrive/AdapExperiments/models/RAVE_MODELS/' + checkpoint)
    for checkpoint in ('musicnet.ts', 'voice_vocalset_b2048_r48000_z16.ts')
)

In [4]:
#@title Get Latent Space
import librosa
import numpy as np
import pandas as pd

def _fit_to_length(signal, length):
  """Return `signal` as a 1-D float array with exactly `length` samples.

  Longer inputs are truncated; shorter ones are zero-padded at the end.
  """
  signal = np.asarray(signal, dtype=float)[:length]
  missing = length - signal.shape[0]
  if missing > 0:
    signal = np.pad(signal, (0, missing), mode='constant', constant_values=0)
  return signal


def get_audio(path, model, sr=48000, samplingSize=None, normalize=False):
  """Load an audio file, encode it with `model` and optionally group the frames.

  Args:
    path: Audio file path handed to `librosa.load`.
    model: Object exposing `encode(x)` and `__call__(x)`; both receive the
      waveform as a tensor built with `audio[None, None, :]`.
    sr: Target sample rate passed to `librosa.load`.
    samplingSize: Number of consecutive latent rows averaged into one group.
      `None` or `1` disables grouping.
    normalize: When true, `librosa.util.normalize` is applied before encoding.

  Returns:
    Without grouping, the tuple `(x, z, reconst, lat)`:
      x        input tensor (`audio[None, None, :]`),
      z        raw output of `model.encode(x)`,
      reconst  `model(x)` with its first axis squeezed, as a numpy array,
      lat      `z` with its first axis squeezed, as a numpy array, transposed.
    With grouping, the tuple `(samples, z, rsamples, lat_ss)`:
      lat_ss   mean of every run of `samplingSize` consecutive rows of `lat`,
      samples  the waveform zero-padded and reshaped to `(len(lat_ss), n)`,
      rsamples the reconstruction truncated/zero-padded to the same length
               and reshaped the same way.
    In both cases index 3 holds the latent rows.
  """
  audio, sr = librosa.load(path, sr=sr)

  if normalize:
    audio = librosa.util.normalize(audio)

  x = torch.from_numpy(audio)
  x = x[None, None, :]

  with torch.no_grad():
    z = model.encode(x)
    reconst = model(x).squeeze(0).detach().numpy()

  lat = z.squeeze(0).detach().numpy().T

  if samplingSize is None or samplingSize == 1:
    return x, z, reconst, lat

  # Average every `samplingSize` consecutive latent rows into one group.
  lat_df = pd.DataFrame(lat)
  lat_ss = lat_df.groupby(np.arange(len(lat_df)) // samplingSize).mean().values

  # Split the waveform into m equally sized chunks, one per latent group.
  m = lat_ss.shape[0]
  n = int(np.ceil(audio.shape[0] / m))
  target_length = m * n

  # Both signals are brought to exactly m*n samples. This also covers the case
  # where the audio already has that length (no padding needed) and the case
  # where the reconstruction is longer than the padded audio (it is truncated).
  samples = _fit_to_length(audio, target_length)
  # assumes the reconstruction has a single leading channel axis -- TODO confirm
  reconstp = _fit_to_length(reconst.squeeze(0), target_length)

  samples = np.reshape(samples, (m, n))
  rsamples = np.reshape(reconstp, (m, n))

  return samples, z, rsamples, lat_ss

In [5]:
#@title Functions
import itertools
import librosa
import numpy as np
import matplotlib.pyplot as plt
from sklearn.decomposition import PCA
from sklearn.manifold import TSNE, MDS, Isomap, LocallyLinearEmbedding

import altair as alt
from altair import datum
print(alt.__version__) # above 5.0

def get_dimensionality_reduction(algorithm, latent_spaces, n_components=2):
    """Project `latent_spaces` to `n_components` dimensions.

    `algorithm` selects the reducer: 'pca' (applied to standard-scaled data),
    'tsne', 'mds', 'isomap' or 'lle'. Any other value falls back to UMAP.
    Returns whatever the reducer's `fit_transform` returns.
    """
    data = latent_spaces

    if algorithm == 'pca':
        from sklearn.preprocessing import StandardScaler
        reducer = PCA(n_components=n_components)
        data = StandardScaler().fit_transform(latent_spaces)
    elif algorithm == 'tsne':
        # perplexity is 30, or one less than the number of rows when there are 30 or fewer
        reducer = TSNE(n_components=n_components,
                       perplexity=min(30, len(latent_spaces) - 1))
        data = np.asarray(latent_spaces)
    elif algorithm == 'mds':
        reducer = MDS(n_components=n_components, normalized_stress="auto", max_iter=100)
    elif algorithm == 'isomap':
        reducer = Isomap(n_components=n_components)
    elif algorithm == 'lle':
        reducer = LocallyLinearEmbedding(n_components=n_components)
    else:
        import umap
        reducer = umap.UMAP(n_components=n_components)

    return reducer.fit_transform(data)

def get_vegas_data(predictions, labels, radius=.01):
  """Build the plotting DataFrame from points and their '<name>_<index>' labels.

  Each label is split on '_': everything before the last piece becomes
  `source`, the first piece becomes `stype`, and the last piece becomes `id_s`
  (string) and `id_s_o` (int). For 1-D `predictions` the x axis is the label
  index and y the prediction; otherwise x/y are columns 0 and 1.
  `radius` is accepted but not used.
  """
  pieces = [label.split('_') for label in labels]
  order = [int(p[-1]) for p in pieces]

  columns = {
      'source': ['_'.join(p[:-1]) for p in pieces],
      'stype': [p[0] for p in pieces],
      'id_s': [p[-1] for p in pieces],
      'id_s_o': order,
  }

  one_dimensional = len(predictions.shape) == 1
  columns['x'] = list(order) if one_dimensional else predictions[:, 0]
  columns['y'] = predictions if one_dimensional else predictions[:, 1]

  return pd.DataFrame(columns)

def get_plot2(latsS, lats0, lats, dimensions=[0, 1], algorithm='lle'):
  """Stack score, original and selected latents and turn them into plot data.

  Rows are stacked as: `latsS` (skipped when empty), `lats0`, then every value
  of `lats` in dict order. `dimensions` picks the columns to plot:
    1 entry   -> that single column (1-D points),
    2 entries -> those two columns as-is,
    more      -> those columns reduced to 2-D with `algorithm`,
    none      -> all columns reduced to 2-D with `algorithm`.
  Labels are 'Score_i', 'Original_i' and '<key>_i'; the result is whatever
  `get_vegas_data` returns for the points and labels.
  """
  blocks = [lats0] + list(lats.values())
  if len(latsS) != 0:
    blocks.insert(0, latsS)
  all_elements = np.vstack(tuple(blocks))

  n_dims = len(dimensions)
  if n_dims == 1:
      points = all_elements[:, dimensions[0]]
  elif n_dims == 2:
      points = all_elements[:, dimensions]
  elif n_dims > 2:
      points = get_dimensionality_reduction(algorithm, all_elements[:, dimensions], 2)
  else:
      points = get_dimensionality_reduction(algorithm, all_elements, 2)

  labels = [f'Score_{i}' for i in range(len(latsS))]
  labels.extend(f'Original_{i}' for i in range(len(lats0)))
  for name, rows in lats.items():
    labels.extend(f'{name}_{i}' for i in range(len(rows)))

  return get_vegas_data(points, labels=labels)

def get_plot_all_equal(lats, dimensions=[0, 1], algorithm='lle'):
  """Stack every latent array in `lats` and turn the rows into plot data.

  Prints the stacked shape, then selects or reduces columns according to
  `dimensions` (1 entry: single column; 2 entries: those columns; more: those
  columns reduced to 2-D with `algorithm`; none: all columns reduced).
  Labels are '<key>_i' per row of each entry; returns `get_vegas_data(...)`.
  """
  all_elements = np.vstack(list(lats.values()))
  print(all_elements.shape)

  n_dims = len(dimensions)
  if n_dims == 1:
      points = all_elements[:, dimensions[0]]
  elif n_dims == 2:
      points = all_elements[:, dimensions]
  elif n_dims > 2:
      points = get_dimensionality_reduction(algorithm, all_elements[:, dimensions], 2)
  else:
      points = get_dimensionality_reduction(algorithm, all_elements, 2)

  labels = []
  for name, rows in lats.items():
    labels.extend(f'{name}_{i}' for i in range(len(rows)))

  return get_vegas_data(points, labels=labels)


def plot_vegas_data(source, titleX="x", titleY="y"):
  """Build the Altair layers for the latent-space plot.

  Rows whose `source` column equals 'Score' go to a separate layer; all other
  rows feed the grain points, the trajectory lines and the start/end markers.

  Returns the tuple `(pointsS, points, lines, circles, arrows)`:
    pointsS  score rows as circles, visibility bound to a checkbox,
    points   non-score rows as circles, highlighted by the search box,
    lines    non-score rows joined in `id_s_o` order, bound to a checkbox,
    circles  size-60 circles, bound to the same checkbox as `lines`,
    arrows   size-60 wedge markers, bound to the same checkbox as `lines`.
  """
  frame_size = dict(width=800, height=500)

  def when(param, shown, hidden):
    # Constant value that switches with the state of `param`.
    return alt.condition(param, alt.value(shown), alt.value(hidden))

  def base_chart(rows, color):
    # Shared x/y/tooltip encoding for a subset of the data.
    return alt.Chart(rows).encode(
      x=alt.X('x:Q', title=titleX),
      y=alt.Y('y:Q', title=titleY),
      color=color,
      tooltip=['source:N', 'id_s:N']
    )

  # --- interactive parameters -------------------------------------------
  search_binding = alt.binding(
      input='search',
      placeholder='Grain',
      name='Search ',
  )
  search_input = alt.selection_point(
      fields=['id_s'],
      empty=False,  # Start with no points selected
      bind=search_binding
  )
  selection = alt.selection_interval(bind='scales')
  highlight = alt.selection_point(
    on="pointerover", fields=["source"], nearest=True
  )

  param_checkbox = alt.param(
      bind=alt.binding_checkbox(name='View Trajectories:'),
      name='Trajectories')
  score_grain_checkbox = alt.param(
      bind=alt.binding_checkbox(name='View Score Grains:'),
      value=True,
      name='ScoreGrain')

  # --- layers built from the non-score rows ------------------------------
  is_score = source['source'] == 'Score'
  chart = base_chart(source[source['source'] != 'Score'], 'stype:N')

  points = chart.mark_circle().encode(
    opacity=when(search_input, 1, 0.3)
  ).add_params(
      selection,
      highlight,
      search_input
  ).properties(
      name='Grains', **frame_size
  )

  lines = chart.mark_line().encode(
      size=alt.condition(~highlight, alt.value(1), alt.value(2)),
      opacity=when(param_checkbox, 1, 0),
      strokeDash='source',
      order="id_s_o:Q",
  ).add_params(
      param_checkbox
  ).properties(
      name='Trajectories', **frame_size
  )

  # Marker layers; the caller filters them down to the first / last grain.
  circles = chart.mark_circle(size=60).encode(
      opacity=when(param_checkbox, 1, 0)
  )
  arrows = chart.mark_point(shape='wedge', size=60).encode(
      opacity=when(param_checkbox, 1, 0)
  )

  # --- layer built from the score rows ------------------------------------
  chartS = base_chart(source[is_score], 'source:N')
  pointsS = chartS.mark_circle().encode(
    opacity=when(score_grain_checkbox, .2, 0)
  ).transform_filter(
      datum.source == 'Score'
  ).add_params(
      selection,
      score_grain_checkbox
  ).properties(
      name='Score', **frame_size
  )

  return pointsS, points, lines, circles, arrows


5.5.0


# Visualizations

In [None]:
#@title Show Graphs

import glob
import time
import tqdm.notebook as tqdm
import ipywidgets as widgets
from IPython.display import display, clear_output
from tqdm.notebook import tqdm
import numpy as np
import matplotlib.pyplot as plt

# State shared with the button callbacks; it stays None until "Get Audios" runs.
score_audio = None
original_audio = None
selected_audios = None
# model2 matches the selector's initial entry ('VocalSet', index=1).
model = model2

model_selector = widgets.Select(options=['MusicNet', 'VocalSet'], description='Model', index=1)
experiment_selector = widgets.IntSlider(value=1, min=1, max=3, step=1, description='Experiment')
phrase_selector = widgets.IntSlider(value=1, min=1, max=3, step=1, description='Phrase')

# Initial recording list: Experiment 1 / phrase 1, without the 'original' take.
opts = sorted(
    wav for wav in glob.glob('PhraseRecordings/Experiment1/phrase_1/*.wav')
    if 'original' not in wav
)
selector = widgets.SelectMultiple(
    options=opts,
    value=opts[0:1],
    description='Phrase Type',
    layout=widgets.Layout(width='75%', height='100px'),
    style={'button_color': 'red'},
)
# Number of latent frames grouped together (passed on as `samplingSize`).
sample_size = widgets.IntSlider(value=4, min=1, max=12, step=1.0, description='Spl Group')
dimension_selector = widgets.SelectMultiple(
    options=list(range(16)),
    description='Dimensions',
    value=list(range(16)),
)

startA = widgets.Button(description='Get Audios')
startb = widgets.Button(description='Start')

def on_change_exp(change):
    """Experiment slider callback: adjust the phrase range and refresh the list.

    Experiment 2 allows phrases up to 4, the others up to 3. The recording
    list is rebuilt from the matching folder, leaving out paths that contain
    'original' or 'electronics'.
    """
    phrase_selector.max = 4 if experiment_selector.value == 2 else 3

    folders = {1: 'Experiment1', 2: 'Experiment2'}
    exp_name = folders.get(experiment_selector.value, 'Experiment1-Multiple')
    pattern = f'PhraseRecordings/{exp_name}/phrase_{phrase_selector.value}/*.wav'

    selector.options = sorted(
        wav for wav in glob.glob(pattern)
        if not ('original' in wav or 'electronics' in wav)
    )

def on_change(change):
    """Phrase slider callback: reload the recording list and select its first entry.

    Paths containing 'original' or 'electronics' are left out of the list.
    """
    folders = {1: 'Experiment1', 2: 'Experiment2'}
    exp_name = folders.get(experiment_selector.value, 'Experiment1-Multiple')
    pattern = f'PhraseRecordings/{exp_name}/phrase_{phrase_selector.value}/*.wav'

    recordings = sorted(
        wav for wav in glob.glob(pattern)
        if not ('original' in wav or 'electronics' in wav)
    )
    selector.options = recordings
    selector.value = recordings[0:1]

def on_change_m(change):
    """Model selector callback: point the global `model` at the chosen checkpoint.

    'MusicNet' selects `model1`; any other entry selects `model2`. The
    dimension list is reset to 0..15 afterwards.
    """
    global model
    model = model1 if model_selector.value == 'MusicNet' else model2
    dimension_selector.options = list(range(16))

# Register the selector callbacks (no trait filter: they fire on any trait change).
model_selector.observe(on_change_m)
experiment_selector.observe(on_change_exp)
phrase_selector.observe(on_change)

# Lay out the controls top to bottom.
display(
    widgets.HBox([model_selector, dimension_selector]),
    widgets.HBox([experiment_selector, phrase_selector]),
    selector,
    sample_size,
    widgets.HBox([startA, startb]),
)

# Output area that the button callbacks print and draw into.
out1 = widgets.Output(layout=dict(border='0px solid black', padding='.5em', width='90%'))

def on_startA(change):
    """"Get Audios" button callback: encode the reference and selected recordings.

    Fills the globals `original_audio`, `score_audio` and `selected_audios`
    with `get_audio(...)` results computed with the current model and the
    current "Spl Group" value. Which reference files are used depends on the
    experiment slider:
      1     -> phrase_<n>_original.wav and Lamento-Take1.wav as score,
      3     -> OR_T02.wav from Experiment1-Multiple and an empty score,
      other -> phrase_<n>_electronics.wav and Lamento_Villa_Rojo.mpeg as score.
    All progress messages are written into `out1`.
    """
    global score_audio, original_audio, selected_audios

    # Import the progress bar locally: the global name `tqdm` is bound to the
    # class in this cell but re-bound to the module by the dataset cell, which
    # would make a bare `tqdm(...)` call fail here afterwards.
    from tqdm.notebook import tqdm as progress_bar

    with out1:
      clear_output(wait=False)
      time.sleep(1)

      experiment = experiment_selector.value
      phrase = phrase_selector.value
      # Always pass the slider's integer value, never the widget itself.
      group_size = sample_size.value

      print(f'\nStarting latent space generation for Phrase {phrase} with samplingSize {group_size}')

      print('Getting original audios')
      if experiment == 1:
          original_audio = get_audio(f'PhraseRecordings/Experiment{experiment}/phrase_{phrase}/phrase_{phrase}_original.wav', model, samplingSize=group_size)
          print('Original Phrase Done')
          score_audio = get_audio('Lamento-Sax-Complete-Audio/Lamento-Take1.wav', model, samplingSize=group_size)
          print('Original Score Done')
      elif experiment == 3:
          original_audio = get_audio(f'PhraseRecordings/Experiment1-Multiple/phrase_{phrase}/OR_T02.wav', model, samplingSize=group_size)
          print('Original Phrase Done')
          # No score recording for this experiment: keep the 4-slot shape with empty entries.
          score_audio = [[],[],[],[]]
          print('Original Score Done')
      else:
          original_audio = get_audio(f'PhraseRecordings/Experiment{experiment}/phrase_{phrase}/phrase_{phrase}_electronics.wav', model, samplingSize=group_size)
          print('Original Phrase Done')
          score_audio = get_audio('Audios/Lamento_Villa_Rojo.mpeg', model, samplingSize=group_size)
          print('Original Score Done')

      print(f'Getting selected audios')
      if experiment == 3:
        # Key: file name without its 4-character extension.
        selected_audios = {v.split('/')[-1][:-4]:get_audio(v, model, samplingSize=group_size) for v in progress_bar(selector.value)}
      else:
        # Key: file name without its first two '_'-separated pieces and without the extension.
        selected_audios = {'_'.join(v.split('/')[-1].split('_')[2:])[:-4]:get_audio(v, model, samplingSize=group_size) for v in progress_bar(selector.value)}

def on_change2(change):
    """"Start" button callback: draw the latent-space chart inside `out1`.

    Uses the globals filled by `on_startA` (index 3 of every `get_audio`
    result, i.e. the latent rows) and the currently selected dimensions.
    One selected dimension is plotted against the grain index, two are plotted
    against each other, anything else goes through t-SNE. A range slider below
    the chart restricts the displayed grains by `id_s_o`.
    """
    with out1:
        clear_output(wait=False)
        time.sleep(1)

        # Nothing to plot until "Get Audios" has populated the shared state.
        if score_audio is None or original_audio is None or selected_audios is None:
            print('No audio loaded yet: click "Get Audios" before "Start".')
            return

        dims = dimension_selector.value
        print(f'Generating latent space visualization for dimensions {dims}')

        source = get_plot2(score_audio[3], original_audio[3], {v:sa[3] for v, sa in selected_audios.items()}, dims, algorithm='tsne')
        print('Extracted Source')

        if len(dims) == 1:
            titleX, titleY = "Unit", f"Latent Dimension {dims[0]}"
        elif len(dims) == 2:
            titleX, titleY = f"Latent Dimension {dims[0]}", f"Latent Dimension {dims[1]}"
        else:
            titleX, titleY = "t-SNE 1", "t-SNE 2"

        # The score layer (`scorep`) is returned by the helper but not drawn here.
        scorep, points, lines, circleSt, arrowEnd = plot_vegas_data(source, titleX, titleY)
        print('Extracted Plot')

        # Highest grain index among the non-score rows (plain int for the slider/filter).
        max_R = int(source.loc[source['source'] != 'Score', 'id_s_o'].max())
        jchart = alt.JupyterChart(alt.layer(points + lines + circleSt.transform_filter((datum.id_s_o == 0)) + arrowEnd.transform_filter((datum.id_s_o == max_R))))
        jchart.chart = jchart.chart.properties(width=800,height=500).interactive()

        rangeR = widgets.IntRangeSlider(value=[0,max_R], min=0, max=max_R)
        def on_change_range_R(change):
          """Rebuild the chart so it only shows grains inside the slider range."""
          m, n = rangeR.value
          in_range = (datum.id_s_o >= m) & (datum.id_s_o <= n)

          ranged_points = points
          # Decide from the dimensions this chart was built with, not from the
          # selector's current state: with a single dimension the x axis is the
          # grain index, so its domain follows the selected range.
          if len(dims) == 1:
            ranged_points = points.encode(
                x=alt.X('x:Q', title=titleX, scale=alt.Scale(domain=[m-1,n+1], nice=False)))

          jchart.chart = (
              ranged_points.transform_filter(in_range)
              + lines.transform_filter(in_range)
              ) + circleSt.transform_filter((datum.id_s_o == m)) + arrowEnd.transform_filter((datum.id_s_o == n))

          jchart.chart = jchart.chart.properties(width=800,height=500).interactive()

        rangeR.observe(on_change_range_R)
        display(rangeR)
        time.sleep(1)
        display(jchart)

# Show the shared output area and hook the two buttons up to their callbacks.
display(out1)
startb.on_click(on_change2)
startA.on_click(on_startA)

HBox(children=(Select(description='Model', index=1, options=('MusicNet', 'VocalSet'), value='VocalSet'), Selec…

HBox(children=(IntSlider(value=1, description='Experiment', max=3, min=1), IntSlider(value=1, description='Phr…

SelectMultiple(description='Phrase Type', index=(0,), layout=Layout(height='100px', width='75%'), options=('Ph…

IntSlider(value=4, description='Spl Group', max=12, min=1)

HBox(children=(Button(description='Get Audios', style=ButtonStyle()), Button(description='Start', style=Button…

Output(layout=Layout(border='0px solid black', padding='.5em', width='90%'))

# Dataset Construction

In [21]:
# Annotations

import io
import pandas as pd

# Annotation table: first CSV row is the header, the first two columns form
# the (row) MultiIndex used for lookups later on.
annotations = pd.read_csv(
    '../AudioMostly/annotations.csv',
    header=0,
    index_col=[0, 1],
)

'Original of Phrase 1'

In [28]:
import glob
import h5py
import librosa
import tqdm.notebook as tqdm
import torch
import numpy as np
import matplotlib.pyplot as plt

# Every recording of the "Experiment1-Multiple" set; sorted so the file is
# always written in the same order.
all_audios = sorted(glob.glob('PhraseRecordings/Experiment1-Multiple/*/*.wav'))
# NOTE: this rebinds the same global `model` that the widget callbacks read.
model = model2

with h5py.File('Motiv.hdf5', 'w') as f:

  for audio_path in tqdm.tqdm(all_audios):
    # File name without directory and without the 4-character extension.
    sample_name = audio_path.split('/')[-1][:-4]

    # The name is split on '_': piece 0 carries the phrase number after its
    # first two characters, piece 1 and piece 2 are the annotation keys.
    name_parts = sample_name.split('_')
    sax = name_parts[1]
    phrase = int(name_parts[0][2:])
    motion = name_parts[2]

    # Load the waveform at 48 kHz
    waveform, sr = librosa.load(audio_path, sr=48000)

    # Latent Vector
    x = torch.from_numpy(waveform)[None, None, :]
    with torch.no_grad():
      z = model.encode(x)
    lat = z.squeeze(0).detach().numpy().T

    # Annotation lookup: rows keyed by (sax, motion), entry number `phrase`.
    # Only a missing key / out-of-range position falls back to 'None'; any
    # other error is a real problem and is allowed to surface.
    try:
      annotation = annotations.loc[(sax, motion)].iloc[phrase-1]
    except (KeyError, IndexError):
      annotation = 'None'

    # Store audio
    f.create_dataset(f"audio_samples/{sample_name}", data=waveform)

    # Store latent space
    f.create_dataset(f"latent_vectors/{sample_name}", data=lat)

    # Store annotation
    f.attrs[f"annotations/{sample_name}"] = annotation

    # Store reference to musical score image (file saved separately)
    f.attrs[f"musical_score/{sample_name}"] = f"{sample_name}.png"

  0%|          | 0/117 [00:00<?, ?it/s]

In [30]:
# Read one sample back from the HDF5 file as a sanity check.
sample_name = "PH01_S01_C02"

with h5py.File("Motiv.hdf5", "r") as f:
    # Datasets: full arrays are copied into memory before the file closes.
    waveform = f["audio_samples"][sample_name][:]
    latent_vectors = f["latent_vectors"][sample_name][:]

    # File-level attributes: annotation text and score image reference.
    annotation = f.attrs[f"annotations/{sample_name}"]
    score_path = f.attrs[f"musical_score/{sample_name}"]

for caption, value in (
    ("Annotation:", annotation),
    ("Audio Shape:", waveform.shape),
    ("Latent Shape:", latent_vectors.shape),
    ("Score Image Path:", score_path),
):
    print(caption, value)

Annotation: Dynamics; Intervals
Audio Shape: (1440000,)
Latent Shape: (703, 16)
Score Image Path: PH01_S01_C02.png
