# RAVE Latent Space Exploration

In [None]:
import librosa
import torch
import torchaudio
import IPython.display as ipd
import numpy as np
import tempfile
import soundfile as sf
import os

import rave

from pydub import AudioSegment
from pydub.playback import play

In [None]:
tapped_dict = {
    "ac_dc": "../data/tapped/ac_dc-back_in_black-tapped.flac",
    "amy_winehouse": "../data/tapped/Tears Dry On Their Own - Amy Whinehouse-tapped.flac",
    "amália": "../data/tapped/Barco Negro (Mão Preta) - Amália Rodrigues-tapped.flac",
    "arctic_monkeys": "../data/tapped/arctic_monkeys-the_view_from_the_afternoon-tapped.flac",
    "beatles1": "../data/tapped/Come Together - The Beatles-tapped.flac",
    "beatles2": "../data/tapped/the_beatles-in_my_life-tapped.flac",
    "capitão_fausto": "../data/tapped/Morro na Praia - Capitão Fausto-tapped.flac",
    "david_miguel": "../data/tapped/Inatel - David e Miguel-tapped.flac",
    "gorillaz": "../data/tapped/Clint Eastwood - Gorillaz-tapped.flac",
    "ibeyi": "../data/tapped/River - Ibeyi-tapped.flac",
    "linda_martini": "../data/tapped/Febril - Linda Martini-tapped.flac",
    "michael_jackson": "../data/tapped/Billie Jean - Michael Jackson-tapped.flac",
    "qotsa": "../data/tapped/qotsa_no_one_knows-tapped.flac",
    "queen": "../data/tapped/queen-another_one_bites_the_dust-tapped.flac",
    "slowj": "../data/tapped/slowj-mundança-tapped.flac",
    "strokes": "../data/tapped/Reptilia - The Strokes-tapped.flac",
    "white_stripes": "../data/tapped/white_stripes-seven_nation_army-tapped.flac"
}

In [None]:
def read_audio(file_path, trim_interval=None, mono=True, print_it=False):
    audio, sr = librosa.load(file_path, mono=mono)
    audio_dim = len(audio.shape)
    if not mono and audio_dim == 1:
        audio = np.asarray((audio, audio))
    if trim_interval is not None:
        ti = trim_interval[0]
        tf = trim_interval[1]
        audio = trim_audio(audio, sr, ti, tf, mono)
    if print_it:
        print(audio.shape)
        print(sr)
    return audio, sr

def trim_audio(audio, sr, ti, tf, mono=True):
    i = ti * sr
    f = tf * sr
    if mono:
        t_audio = audio[i:f]
    else:
        t_audio = audio[:, i:f]
    return t_audio


def remix_audio(left_audio_array, right_audio_array, sample_rate=44100):
    # Ensure both audio arrays have the same length
    length = min(len(left_audio_array), len(right_audio_array))
    left_audio_array = left_audio_array[:length]
    right_audio_array = right_audio_array[:length]

    # Create a stereo audio array with left channel and right channel
    stereo_audio_array = np.column_stack((left_audio_array, right_audio_array))

    # Save the remixed audio to the specified output path
    #sf.write(output_path, stereo_audio_array, sample_rate)

    return stereo_audio_array

In [None]:
#song = "ac_dc"
#song = "amália"
#song = "amy_winehouse"
#song = "arctic_monkeys"
#song = "beatles1"
#song = "beatles2"
##song = "capitão_fausto"
#song = "david_miguel"
#song = "gorillaz"
#song = "ibeyi"
#song = "linda_martini"
#song = "michael_jackson"
#song = "qotsa"
##song = "queen"
song = "slowj"
#song = "strokes"
#song = "white_stripes"

trim = True

trim_interval = None
# choose the interval in which you want to trim the audio (in seconds)
if trim:
    trim_interval = (15, 17)


In [None]:
tapped_path = tapped_dict[song]
#tapped_path = '../data/KickSequence.wav'

tapped_audio, sr = read_audio(tapped_path, trim_interval)
ipd.Audio(tapped_audio, rate=sr)

In [None]:
# model_name = 'GMDrums_v3_29-09_3M_streaming'
#model_name = 'percussion'
#model_name = 'darbouka_onnx'
model_name = 'drumkit_v1'
model = torch.jit.load(f'../models/{model_name}.ts')

desired_length = 44100

# Calculate the amount of padding needed
padding_width = max(0, desired_length - len(tapped_audio))
print(len(tapped_audio))

# Pad the array with zeros
tapped_audio = np.pad(tapped_audio, (0, padding_width), mode='constant', constant_values=0)

with torch.no_grad():
    x = torch.from_numpy(tapped_audio).reshape(1,1,-1)

    # encode and decode the audio with RAVE
    z = model.encode(x)
    latent_space_matrix = torch.squeeze(z, 0)
    #print(latent_space_matrix)


    x_hat = model.decode(z)
waveform_tensor = torch.squeeze(x_hat, 0)

In [None]:
import torch
import torch.nn.functional as F

def calculate_distance(tensor1, tensor2, distance_metric='euclidean'):
    if distance_metric == 'euclidean':
        return F.pairwise_distance(tensor1, tensor2)
    elif distance_metric == 'cosine':
        return 1 - F.cosine_similarity(tensor1, tensor2)
    else:
        raise ValueError("Unsupported distance metric")

desired_length = 44100

# Calculate the amount of padding needed
padding_width = max(0, desired_length - len(tapped_audio))

# Pad the array with zeros
tapped_audio = np.pad(tapped_audio, (0, padding_width), mode='constant', constant_values=0)

# Example usage
with torch.no_grad():
    x = torch.from_numpy(tapped_audio).reshape(1,1,-1)

    # encode and decode the audio with RAVE
    z = model.encode(x)
    tapped_latent_space_matrix = torch.squeeze(z, 0)
    
kick_path = '../data/WAV/Individual Hits/01. Bass Drum/04. Color/Color 04/03. Long/BD 909 Color 04 Long B 01.wav'

kick_audio, sr = read_audio(kick_path)

# Calculate the amount of padding needed
padding_width = max(0, desired_length - len(kick_audio))

# Pad the array with zeros
padded_kick_audio = np.pad(kick_audio, (0, padding_width), mode='constant', constant_values=0)

with torch.no_grad():
    x = torch.from_numpy(padded_kick_audio).reshape(1,1,-1)

    # encode and decode the audio with RAVE
    z = model.encode(x)
    kick_latent_space_matrix = torch.squeeze(z, 0)

distance = calculate_distance(tapped_latent_space_matrix, kick_latent_space_matrix, distance_metric='euclidean')
#print(distance)

In [None]:
ipd.Audio(padded_kick_audio, rate=sr)

In [None]:
ipd.Audio(tapped_audio, rate=sr)

In [None]:
def interpolate_tensors(tensor1, tensor2, alpha):
    return alpha * tensor1 + (1 - alpha) * tensor2

# Example usage
alpha_value = 0.5  # Adjust alpha as needed
interpolated_tensor = interpolate_tensors(kick_latent_space_matrix, tapped_latent_space_matrix, alpha_value)

with torch.no_grad():

    z = torch.unsqueeze(interpolated_tensor, 0)
    print(interpolated_tensor.shape)


    x_hat = model.decode(z)
waveform_tensor = torch.squeeze(x_hat, 0)


In [None]:
output_file = 'output.wav'
torchaudio.save(output_file, waveform_tensor, sr)

interpolated_audio, sr = read_audio(output_file)
print(interpolated_audio.shape)

ipd.Audio(interpolated_audio, rate=sr)

In [None]:
import os

def find_files_with_keyword(folder_path, keyword):
    matching_files = []
    
    # Iterate over all files in the folder
    for filename in os.listdir(folder_path):
        file_path = os.path.join(folder_path, filename)
        
        # Check if the file is a regular file and contains the keyword
        if os.path.isfile(file_path) and keyword in filename:
            matching_files.append(file_path)

    return sorted(matching_files)

# Replace 'your_folder_path' with the path to the folder you want to search
folder_path = '../data/Roland TR-909'
keyword = 'Kick'

kick_files = find_files_with_keyword(folder_path, keyword)

kick_files.append('../data/Roland TR-909/TR-909Crash.wav')

# Print the list of matching files
#print(f"Files in '{folder_path}' containing the keyword '{keyword}':")
#for file_path in matching_files:
#    print(file_path)


In [None]:
import os
import glob

def find_wav_files(folder_path):
    wav_files = []
    
    # Use os.walk to traverse the directory tree
    for root, dirs, files in os.walk(folder_path):
        # Use glob to find .wav files in the current directory
        wav_files.extend(glob.glob(os.path.join(root, '*.wav')))
    
    return wav_files

# Example usage
folder_path = '../data/WAV/Individual Hits'
wav_files = find_wav_files(folder_path)

print("Found .wav files:")
#for wav_file in wav_files:
    #print(wav_file)
print(len(wav_files))

In [None]:
with torch.no_grad():

    z = torch.unsqueeze(tapped_latent_space_matrix, 0) 
    x_hat = model.decode(z)
tapped_rave_original = torch.squeeze(x_hat, 0)

In [None]:
ipd.Audio(tapped_rave_original, rate=sr)

In [None]:
with torch.no_grad():

    z = torch.unsqueeze(kick_latent_space_matrix, 0) 
    x_hat = model.decode(z)
kick_rave_original = torch.squeeze(x_hat, 0)

In [None]:
ipd.Audio(kick_rave_original, rate=sr)

In [None]:
import numpy as np
import matplotlib.pyplot as plt

def interpolate_one_dimension(point1, point2, t, dim):
    result = np.copy(point1)
    result[:, dim] = (1 - t) * point1[:, dim] + t * point2[:, dim]
    return result

# Example with random points in a 16x22 latent space
latent_point1 = tapped_latent_space_matrix
latent_point2 = kick_latent_space_matrix

# Choose the dimension to interpolate (e.g., the first dimension, dim=0)
interpolated_dimension = 0

# Interpolate with t in the range [0, 1]
t_values = np.linspace(0, 1, num=10)
interpolated_points = [interpolate_one_dimension(latent_point1, latent_point2, t, interpolated_dimension) for t in t_values]

# Visualization (assuming 16x22 latent space)
plt.plot(latent_point1[:, interpolated_dimension], label='Point 1')
plt.plot(latent_point2[:, interpolated_dimension], label='Point 2')
for i, t in enumerate(t_values):
    plt.plot(interpolated_points[i][:, interpolated_dimension], label=f'Interpolated {t:.2f}')

plt.legend()
plt.title(f'Linear Interpolation along Dimension {interpolated_dimension}')
plt.show()


In [None]:
interpolated_audios=[]
for i in interpolated_points:
    with torch.no_grad():
    
        z = torch.unsqueeze(interpolated_tensor, 0)      
        x_hat = model.decode(z)
    interpolated_audios.append(torch.squeeze(x_hat, 0))

In [None]:
output_file = 'output.wav'

torchaudio.save(output_file, interpolated_audios[9], sr)

interpolated_audio, sr = read_audio(output_file)
print(interpolated_audio.shape)

ipd.Audio(interpolated_audio, rate=sr)

In [None]:
kick_latents = []
kick_filtered_files = []
for kick_path in wav_files:

    kick_audio, sr = read_audio(kick_path, print_it=False)
    #kick_audio = np.pad()
    #kick_audio = kick_audio[:1500]
    if kick_audio.shape[0] < 4410:
        continue
    
    kick_filtered_files.append(kick_path)
    desired_length = 44100

    # Calculate the amount of padding needed
    padding_width = max(0, desired_length - len(kick_audio))

    # Pad the array with zeros
    kick_audio = np.pad(kick_audio, (0, padding_width), mode='constant', constant_values=0)
    #print(kick_audio.shape)

    with torch.no_grad():
        x = torch.from_numpy(kick_audio).reshape(1,1,-1)

        # encode and decode the audio with RAVE
        z = model.encode(x)
        kick_latent_space_matrix = torch.squeeze(z, 0)
        kick_latents.append(kick_latent_space_matrix)

# Check the shapes of all tensors

tensor_shapes = [tensor.shape for tensor in kick_latents]
if all(shape == tensor_shapes[0] for shape in tensor_shapes):
    print("All have same shape")
print(tensor_shapes)

kick_latents.append(tapped_latent_space_matrix)
kick_latents.extend(interpolated_points)


# Ensure all tensors have the same shape
if not all(shape == tensor_shapes[0] for shape in tensor_shapes):
    raise ValueError("All input tensors must have the same shape.")

# Convert each PyTorch tensor to a NumPy array
#latent_space_numpy = np.stack([tensor.numpy() for tensor in kick_latents], axis=0)
latent_space_numpy = np.array(kick_latents)
#print(latent_space_numpy)

# Now, latent_space_numpy is a NumPy array that you can use as input to UMAP
#kick_tensor = torch.cat(kick_latents, axis=1)
#kick_tensor = kick_tensor.permute(-1, 0)
#print(kick_tensor.shape)

In [None]:
import umap

# Reshape each tensor to a 1D array while preserving temporal order
flattened_tensors = [tensor.flatten() for tensor in kick_latents]

# Stack the flattened tensors into a 2D array
data_matrix = np.vstack(flattened_tensors)
print(data_matrix.shape)

# Apply UMAP
n_components = 2
umap_model = umap.UMAP(n_components=n_components)
umap_result = umap_model.fit_transform(data_matrix)

print(umap_result.shape)


In [None]:
def remove_common_part(file_names):
    # Find the common prefix
    common_prefix = os.path.commonprefix(file_names)
    
    # Find the common suffix (if needed)
    common_suffix = os.path.commonprefix([name[::-1] for name in file_names])[::-1]

    # Remove the common part from each file name
    modified_file_names = [name[len(common_prefix):] for name in file_names]

    return modified_file_names


modified_file_names = remove_common_part(kick_filtered_files)
modified_file_names.append(tapped_path)
kick_filtered_files.append(tapped_path)

for i in interpolated_points:
    modified_file_names.append(tapped_path)
    kick_filtered_files.append(tapped_path)

#print("Modified file names:", modified_file_names)

In [None]:
#import umap.plot
#umap.plot.points(umap_result)

In [None]:
import matplotlib.pyplot as plt

# Visualize the UMAP result
for i, file_name in enumerate(modified_file_names):
    if "BD" in file_name:
        color = 'red'
    elif "SD" in file_name:
        color = 'green'
    elif "tapped" in file_name:
        color = 'orange'
    else:
        color = 'blue'
    plt.scatter(umap_result[i, 0], umap_result[i, 1], c=color, marker='o', s=10)
print(umap_result.shape)
print(len(modified_file_names))
    
plt.title('UMAP Visualization of Latent Space')
plt.xlabel('UMAP Component 1')
plt.ylabel('UMAP Component 2')
plt.show()


In [None]:
output_file = 'output.wav'
torchaudio.save(output_file, waveform_tensor, sr)

rave_audio, sr = read_audio(output_file)
print(rave_audio.shape)

ipd.Audio(rave_audio, rate=sr)

In [None]:
#!pip install fiftyone

In [None]:
#!pip install torch torchvision umap-learn

In [None]:
#!pip install 'ipywidgets>=8,<9'

In [None]:
import fiftyone as fo
import librosa

# Example list of audio file paths
audio_files = kick_filtered_files
audio_names = modified_file_names

# Create a FiftyOne SampleCollection
dataset = fo.Dataset()

# Iterate over audio files and create samples
for audio_file, audio_name in zip(audio_files, audio_names):
    # Load audio data using librosa or your preferred library
    audio_data, sample_rate = librosa.load(audio_file, sr=None)  # Adjust parameters as needed

    if "BD" in audio_name: label = 'BD'
    elif "SD" in audio_name: label = 'SD'
    elif "mundança" in audio_name: label = 'tap'
    else: label = 'other'
    
    # Create a FiftyOne Sample
    sample = fo.Sample(
        filepath=audio_file,
        id=audio_name,
        audio=audio_data,
        sample_rate=sample_rate,
        label=label,
        audio_name=audio_name,  # Add the audio name as a custom field
    )

    # Add the sample to the dataset
    dataset.add_sample(sample)

# Save the SampleCollection (optional)
#dataset.save("../data")

In [None]:
#import fiftyone as fo
#import fiftyone.core.fields as fof
#import numpy as np

# Assume you have a latent tensor
#extra_latent_tensor = tapped_latent_space_matrix
#print(tapped_latent_space_matrix.shape)

# Create a new sample with the latent tensor
#sample = fo.Sample(filepath=tapped_path, label="input")

# Add the array field to the sample
#sample["latent_tensor"] = fof.ArrayField()

# Assign the latent tensor to the array field
#sample["latent_tensor"].value = extra_latent_tensor

# Add the sample to the dataset
#dataset.add_sample(sample)

In [None]:
import cv2
import numpy as np

import fiftyone.brain as fob

# Construct a ``num_samples x num_pixels`` array of images
embeddings = data_matrix

# Compute 2D representation
results = fob.compute_visualization(
    dataset,
    embeddings=embeddings,
    num_dims=2,
    method="tsne",
    brain_key="mnist_test",
    verbose=True,
    seed=51,
)

In [None]:
print(type(results))
print(results.points.shape)

In [None]:
session = fo.launch_app(dataset)

In [None]:
plot = results.visualize(labels="label")
plot.show(height=720)

session.plots.attach(plot)

In [None]:
fo.close_app()

In [None]:
import numpy as np
from sklearn.decomposition import PCA
import matplotlib.pyplot as plt


# Apply PCA
pca = PCA()
latent_space_pca = pca.fit_transform(latent_space_matrix)
print(latent_space_pca)

# Explained variance ratio
explained_variance_ratio = pca.explained_variance_ratio_

# Plot the cumulative explained variance
cumulative_variance_ratio = np.cumsum(explained_variance_ratio)
plt.plot(cumulative_variance_ratio, marker='o')
plt.xlabel('Number of Principal Components')
plt.ylabel('Cumulative Explained Variance')
plt.title('Cumulative Explained Variance vs. Number of Principal Components')
plt.show()

# Print the principal components
print('Principal Components:')
print(pca.components_.shape)
print(pca.components_)

In [None]:
pca = PCA(n_components=2)
latent_space_2d = pca.fit_transform(latent_space_matrix)

# Scatter plot
plt.scatter(latent_space_2d[:, 0], latent_space_2d[:, 1])
plt.title('Latent Space Matrix in 2D')
plt.xlabel('Principal Component 1')
plt.ylabel('Principal Component 2')
plt.show()

In [None]:
ipd.Audio(tapped_audio, rate=sr)

In [None]:
remixed_audio = remix_audio(tapped_audio, rave_audio, sample_rate=sr)

normalized_audio = remixed_audio / np.max(np.abs(remixed_audio))

# Convert to 16-bit PCM format
pcm_audio = (normalized_audio * 32767).astype(np.int16)

# Save the PCM audio data to a temporary WAV file
with tempfile.NamedTemporaryFile(suffix=".wav", delete=False) as temp_wav:
    sf.write(temp_wav.name, pcm_audio, sr)

# Display the audio using ipd.Audio
ipd.display(ipd.Audio(temp_wav.name))

# Clean up the temporary file
os.remove(temp_wav.name)

In [None]:
import numpy as np
import librosa
from scipy.signal import correlate

# Assuming rave_audio and tapped_audio are your stereo audio data with shape (length, 2)
# Replace them with your actual stereo audio data

# Choose one channel for onset detection (e.g., left channel)
rave_channel = rave_audio
tapped_channel = tapped_audio

# Normalize the audio to the range [-1, 1]
rave_channel_normalized = rave_channel / np.max(np.abs(rave_channel))
tapped_channel_normalized = tapped_channel / np.max(np.abs(tapped_channel))

# Compute onsets using librosa onset detection
rave_onsets = librosa.onset.onset_detect(rave_channel_normalized, sr=sr)
tapped_onsets = librosa.onset.onset_detect(tapped_channel_normalized, sr=sr)

# Compute cross-correlation
cross_corr = correlate(rave_channel_normalized, tapped_channel_normalized, mode='full')

# Find the time lag (shift) with maximum cross-correlation
time_lag = np.argmax(cross_corr) - len(rave_channel_normalized) + 1

# Shift the tapped_audio to align with rave_audio
aligned_tapped_audio = np.roll(tapped_audio, time_lag, axis=0)

# Now, aligned_tapped_audio and rave_audio are aligned based on onset detection

# Display the aligned audio using ipd.Audio
ipd.display(ipd.Audio(aligned_tapped_audio, rate=sr))

In [None]:
# Compute 2D representation
results = fob.compute_visualization(
    dataset,
    embeddings=embeddings,
    num_dims=2,
    method="tsne",
    brain_key="mnist_test",
    verbose=True,
    seed=51,
)

In [None]:
import pandas as pd

df = pd.DataFrame(results.points, columns=['x', 'y'])
df['label'] = dataset.values('label')
df['audio_name'] = dataset.values('audio_name')
df['filepath'] = dataset.values('filepath')

df.head()

In [None]:
import plotly.graph_objs as go

# Create a scatter plot of the latent space
data = []
for label in df['label'].unique():
    df_label = df[df['label'] == label]
    scatter = go.Scatter(
        x=df_label['x'], 
        y=df_label['y'], 
        mode='markers',
        text=df_label['audio_name'],  # Add the audio name as text
        hovertemplate='%{text}<extra></extra>',  # Customize the hover template
        name=label,  # Use the label as the name of the trace
        customdata=df_label['filepath']  # Add the file path as custom data
    )
    scatter.on_click(lambda x: print(x.points[0].hovertext))  # Print the name of the point when it's clicked
    data.append(scatter)

fig = go.Figure(data=data)

# Adjust the margins (l, r, t, b stand for left, right, top, and bottom)
fig.update_layout(
    autosize=False,
    width=1000,
    height=400,
    margin=dict(l=24,r=24,b=24,t=24,pad=0)
)

import dash
import dash_core_components as dcc
import dash_html_components as html
from dash.dependencies import Input, Output
import base64
import io

def get_relative_path(absolute_path):
    base_path = os.getcwd()
    return os.path.relpath(absolute_path, base_path)

app = dash.Dash(__name__)

app.layout = html.Div([
    dcc.Graph(id='scatter-plot', figure=fig),
    html.Pre(id='click-data', style={'padding': '10px', 'color': 'white'}),
    html.Audio(id='input-player', controls=True, autoPlay=True),
    html.Audio(id='output-player', controls=True, autoPlay=False)
])

@app.callback(Output('click-data', 'children'), [Input('scatter-plot', 'clickData')])
def display_click_data(clickData):
    if clickData is None: return 'None'
    absolute_path = clickData["points"][0]["customdata"]
    base_path = os.getcwd()
    return os.path.relpath(absolute_path, base_path)

@app.callback(Output('input-player', 'src'), [Input('scatter-plot', 'clickData')])
def play_input(clickData):
    if clickData is None: return ''

    relative_path = get_relative_path(clickData["points"][0]["customdata"])
    with open(relative_path, 'rb') as audio_file:
        encoded_audio = base64.b64encode(audio_file.read()).decode('ascii')
    src = f'data:audio/mp3;base64,{encoded_audio}'
    
    return src

@app.callback(Output('output-player', 'src'), [Input('scatter-plot', 'clickData')])
def play_output(clickData):
    if clickData is None: return ''
    
    relative_path = get_relative_path(clickData["points"][0]["customdata"])

    audio, sr = read_audio(relative_path)
    with torch.no_grad():
        x = torch.from_numpy(audio).reshape(1 ,1, -1)
        z = model.encode(x)
        x_hat = model.decode(z)
    waveform_tensor = torch.squeeze(x_hat, 0)
    output_path = '../output/output.wav'
    torchaudio.save(output_path, waveform_tensor, sr)

    with open(output_path, 'rb') as audio_file:
        encoded_audio = base64.b64encode(audio_file.read()).decode('ascii')
    src = f'data:audio/mp3;base64,{encoded_audio}'

    return src

if __name__ == '__main__': app.run_server(debug=True)