In [12]:
#------------------------------------------------------------------------------------------------
# 03. Corpus-based analysis and synthesis
#
# This notebook demonstrates how to:
#  - extract features from a pre-recorded corpus of audio (here, a folder of FLAC files joined into one buffer)
#  - perform dimensionality reduction
#  - transform feature arrays into a pandas dataframe
#  - create interactive plots to play back grains in latent feature space
#  - use audio-rate modulation to modulate playback of grains
#  - use timeline-based sequencing to trigger playback of grains
#
# TODO: Different segmentation methods
#
# Requirements:
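#   pip3 install numpy librosa scikit-learn ipython matplotlib altair pandas anywidget isobar signalflow torchaudio tqdm
#   (the signalflow_analysis module imported further down must also be importable)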
#------------------------------------------------------------------------------------------------

import librosa
import warnings
import numpy as np
import pandas as pd
import altair as alt
import sklearn.cluster
import IPython.display
import sklearn.decomposition
import sklearn.preprocessing
import matplotlib.pyplot as plt


# Hide numerical warnings and permit large datasets
warnings.filterwarnings('ignore')
alt.data_transformers.disable_max_rows()

from signalflow import *
graph = AudioGraph()

GraphAlreadyCreatedException: AudioGraph has already been created

In [None]:
import os
import torchaudio
import random as r
from tqdm import tqdm

path = "/Volumes/T7_Touch/ocd/generation/data_output/staging/rendered_audio/"
files = os.listdir(path)
r.shuffle(files)
fs = []
embeddings = []
audio_list = []

for file in tqdm(files[:500]):
    if ".flac" in file:
        try:
            full_path = path + file
            audio, _ = torchaudio.load(full_path)
            audio_list.append( audio.squeeze(0).numpy())

            
            fs.append(full_path)
        except Exception as e:
            print("error", e)

In [None]:
buffer = np.concatenate(audio_list, axis=0)


In [None]:
#--------------------------------------------------------------------------------
# Set global variables: FFT size, hop size, sample rate
#--------------------------------------------------------------------------------
# fft_size = 16384
fft_size = 4096
hop_size = fft_size // 2
# sample_rate = buffer.sample_rate
sample_rate = 44100

#--------------------------------------------------------------------------------
# Extract MFCC, and rescale to zero-mean, unit-variance.
# FluCoMa has a nice interactive explainer that gives some intuition:
# https://learn.flucoma.org/reference/mfcc/
#--------------------------------------------------------------------------------
# X = librosa.feature.mfcc(y=buffer.data[0], sr=sample_rate, n_fft=fft_size, hop_length=hop_size, n_mfcc=20)
X = librosa.feature.mfcc(y=buffer, sr=sample_rate, n_fft=fft_size, hop_length=hop_size, n_mfcc=20)
X = sklearn.preprocessing.scale(X)
X = X.T
print("MFCC coefficient shape: %s" % str(X.shape))
print(np.round(X[:8,:8], 4))

In [None]:
#--------------------------------------------------------------------------------
# Perform Principal Component Analysis (PCA) to reduce the dimensionality of
# each input MFCC frame, and extract various manually-specified features.
#--------------------------------------------------------------------------------
model = sklearn.decomposition.PCA(n_components=3, whiten=True)
model.fit(X)
Y = model.transform(X)
print("Y shape: %s" % str(Y.shape))

#--------------------------------------------------------------------------------
# Create data series, containing per-segment properties which are later needed
# for display and playback:
#  - ordinal index
#  - timestamp (in seconds)
#  - duration (the same for every block, as we're using identically-sized blocks)
#--------------------------------------------------------------------------------
index = np.arange(len(Y))
timestamp = index * hop_size / sample_rate
duration = fft_size / sample_rate
duration_array = np.array([duration] * len(Y))

def floats_to_ordinals(floats):
    sorted_indices = np.argsort(floats)
    positions = np.argsort(sorted_indices)
    return positions.tolist()

#--------------------------------------------------------------------------------
# Manually extract a few features to add to the data frame:
#  - spectral centroid
#  - spectral flatness
#  - k-means cluster
#--------------------------------------------------------------------------------
centroid = librosa.feature.spectral_centroid(y=buffer, sr=44100, n_fft=fft_size, hop_length=hop_size)[0]
flatness = librosa.feature.spectral_flatness(y=buffer, n_fft=fft_size, hop_length=hop_size)[0]
flatness = floats_to_ordinals(flatness)
kmeans = sklearn.cluster.KMeans(n_clusters=4)
labels = kmeans.fit_predict(Y)

#--------------------------------------------------------------------------------
# Aggregate all features into a pandas DataFrame, with columns for each feature
# and a row for each segment
#--------------------------------------------------------------------------------
df = pd.DataFrame({
    "a": Y[:,0],
    "b": Y[:,1],
    "c": Y[:,2],
    "centroid": centroid,
    "flatness": flatness,
    "index": index,
    "timestamp": timestamp,
    "duration": duration_array,
    "cluster": labels,
})
df

In [6]:
#--------------------------------------------------------------------------------
# Altair scatter plot displaying each grain's location within feature space,
# with a point-based selector to trigger playback of grains
#--------------------------------------------------------------------------------

buffer = Buffer(buffer)
chart = alt.Chart(df, width=800, height=500)
chart = chart.mark_circle(size=40)
chart = chart.encode(x=alt.X("a"),
                     y=alt.Y("b"),
                     color=alt.Color('timestamp').scale(scheme="plasma"),
                     tooltip=["index", "timestamp"])

selector = alt.selection_point(name="point",
                               on='mouseover',
                               nearest=True)
chart = chart.add_params(selector)

#--------------------------------------------------------------------------------
# SegmentedGranulator plays back a segment of the input file when triggered
#--------------------------------------------------------------------------------
granulator = SegmentedGranulator(buffer,
                                 df.timestamp,
                                 df.duration)
# granulator.set_buffer("envelope", EnvelopeBuffer("hanning"))
granulator.set_buffer("envelope", EnvelopeBuffer("linear-decay"))
#granulator.set_buffer("envelope", EnvelopeBuffer("triangle"))
attenuated = granulator * 0.75
attenuated.play()

#--------------------------------------------------------------------------------
# Altair callback to trigger a grain on hover
#--------------------------------------------------------------------------------
def on_select(change):
    value = change["new"].value
    if value:
        index = value[0]
        granulator.trigger("trigger", index)
        granulator.index = index

#--------------------------------------------------------------------------------
# Add Jupyter interactivity
#--------------------------------------------------------------------------------
jchart = alt.JupyterChart(chart)
jchart.selections.observe(on_select, ["point"])
jchart

JupyterChart(spec={'config': {'view': {'continuousWidth': 300, 'continuousHeight': 300}}, 'data': {'name': 'da…

In [7]:
#--------------------------------------------------------------------------------
# This example demonstrates playing features in real-time, using SignalFlow
# LFO objects to modulate the X/Y position in feature space.
#
# The NearestNeighbour node performs a search for the nearest datapoint to
# the specified `target` coordinate.
# 
# The signalflow_analysis library contains the AudioFeatureBuffer object,
# which encodes N-dimensional frame-wise feature properties. AudioFeatureBuffer
# is a subclass of the generic "Buffer" signalflow class.
#
#  - Each channel of the buffer corresponds to a feature
#  - Each sample in the buffer corresponds to a segment (block) of the input
#--------------------------------------------------------------------------------

from signalflow_analysis import *

xpos = SineLFO(0.5, -1, 1)
ypos = SineLFO(0.71, -1, 1)
feature_buffer = AudioFeatureBuffer([df.a, df.b])
nearest_index = NearestNeighbour(feature_buffer, target=[xpos, ypos])
player = SegmentedGranulator(buffer=buffer,
                             onset_times=df.timestamp,
                             durations=df.duration,
                             index=nearest_index,
                             clock=RandomImpulse(5))
attenuated = player * 0.5
attenuated.play()

In [8]:
#--------------------------------------------------------------------------------
# Modulate playback parameters interactively.
#--------------------------------------------------------------------------------
player.clock = RandomImpulse(20)
nearest_index.target = [xpos, ypos]

In [9]:
#--------------------------------------------------------------------------------
# Example using the isobar sister library for sequencing note events.
#--------------------------------------------------------------------------------

from isobar import *

class NearestNeighbourTrigger (Patch):
    #--------------------------------------------------------------------------------
    # This patch encapsulates a granulator that plays back grains selected via
    # a feature buffer, when triggered.
    #--------------------------------------------------------------------------------
    def __init__(self):
        super().__init__()
        x = self.add_input("centroid")
        y = self.add_input("flatness")
        feature_buffer = AudioFeatureBuffer([df.centroid, df.flatness])
        nn = NearestNeighbour(feature_buffer, [x, y])
        granulator = SegmentedGranulator(buffer,
                                         df.timestamp,
                                         df.duration * 0.2,
                                         index=nn)
        # how to create envelope buffer from segments / shape?
        granulator.set_buffer("envelope", EnvelopeBuffer("linear-decay"))
        delay = AllpassDelay(granulator, 0.1, feedback=0.9)
        output = granulator + delay * 0.5
        self.set_output(output)
        self.set_trigger_node(granulator)

#--------------------------------------------------------------------------------
# Create the patch and connect it to the graph.
#--------------------------------------------------------------------------------
nnpatch = NearestNeighbourTrigger()
nnpatch.play()

#--------------------------------------------------------------------------------
# Create a 150bpm timeline.
#--------------------------------------------------------------------------------
timeline = Timeline(150, output_device=SignalFlowOutputDevice(graph=graph))
timeline.background()

In [10]:
timeline.schedule({
    "patch": nnpatch,
    "type": "trigger",
    "duration": 0.25,
    "params": {
        "centroid": PSequence([300, 2000, 4000, 3000, 700]) + PWhite(-200, 1000),
        "flatness": PSequence([100, 3000, 2000, 3000]) + PWhite(-500, 500)
    }
}, name="track", replace=True)

<isobar.timeline.track.Track at 0x13d983430>

In [11]:
# timeline.clear()
graph.clear()