In [10]:
import copy
import librosa
import madmom
import numpy as np
import os
import tensorly
import tensorly.decomposition as tld
from sklearn.decomposition import NMF

In [11]:
def run_algorithm(audio_file, n_templates=[0,0,0], output_savename="extracted_loop"):
    """Complete pipeline of algorithm.

    Parameters
    ----------
    audio_file : string
        Path to audio file to be loaded and analysed.
    n_templates : list of length 3
        The number of sound, rhythm and loop templates.
        Default value (0,0,0) causes the script to estimate reasonable values.
    output_savename: : string
        Base string for saved output filenames.

    Returns
    -------
    A set of files containing the extracted loops.

    Examples
    --------
    >>> run_algorithm("example_song.mp3", [40,20,7], "extracted_loop")
    
    See also
    --------
    tensorly.decomposition.non_negative_tucker
    """
    assert os.path.exists(audio_file)
    assert len(n_templates)==3
    assert type(n_templates) is list
    # Load mono audio:
    signal_mono, fs = librosa.load(audio_file, sr=None, mono=True)
    # Use madmom to estimate the downbeat times:
    downbeat_times = get_downbeats(signal_mono)
    # Convert times to frames so we segment signal:
    downbeat_frames = librosa.time_to_samples(downbeat_times, sr=fs)
    # Create spectral cube out of signal:
    spectral_cube = make_spectral_cube(signal_mono, downbeat_frames)
    # Validate the input n_templates (inventing new ones if any is wrong):
    n_sounds, n_rhythms, n_loops = validate_template_sizes(spectral_cube, n_templates)
    # Use TensorLy to do the non-negative Tucker decomposition:
    core, factors = tld.non_negative_tucker(np.abs(spectral_cube), [n_sounds, n_rhythms, n_loops], n_iter_max=500, verbose=True)
    # Reconstruct each loop:
    for ith_loop in range(n_loops):
        # Multiply templates together to get real loop spectrum:
        loop_spectrum = create_loop_spectrum(factors[0], factors[1], core[:,:,ith_loop])
        # Choose best bar to reconstruct from (we will use its phase):
        bar_ind = choose_bar_to_reconstruct(factors[2], ith_loop)
        # Reconstruct loop signal by masking original spectrum:
        ith_loop_signal = get_loop_signal(loop_spectrum, spectral_cube[:,:,bar_ind])
        # Write signal to disk:
        librosa.output.write_wav("{0}_{1}.wav".format(output_savename,ith_loop), ith_loop_signal, fs)

In [12]:
def get_downbeats(signal):
    """Use madmom package to estimate downbeats for an audio signal.

    Parameters
    ----------
    signal : np.ndarray [shape=(n,), dtype=float]
        Input mono audio signal.

    Returns
    -------
    downbeat_times : np.ndarray [shape=(n,), dtype=float]
        List of estimated downbeat times in seconds.

    Examples
    --------
    >>> signal_mono, fs = librosa.load("example_song.mp3", sr=None, mono=True)
    >>> get_downbeats(signal_mono)
    array([1.000e-02, 1.890e+00, 3.760e+00, 5.630e+00, 7.510e+00, 9.380e+00,
           1.126e+01, 1.313e+01, 1.501e+01, 1.688e+01, 1.876e+01, 2.064e+01,
           2.251e+01, 2.439e+01, 2.626e+01, 2.814e+01, 3.002e+01, 3.189e+01,
           3.376e+01, 3.564e+01, 3.751e+01, 3.939e+01, 4.126e+01, 4.314e+01,
           4.501e+01, 4.689e+01, 4.876e+01, 5.063e+01, 5.251e+01, 5.439e+01,
           5.626e+01, 5.813e+01])
    
    See Also
    --------
    madmom.features.downbeats.RNNDownBeatProcessor
    madmom.features.downbeats.DBNDownBeatTrackingProcessor
    """
    act = madmom.features.downbeats.RNNDownBeatProcessor()(signal)
    proc = madmom.features.downbeats.DBNDownBeatTrackingProcessor(beats_per_bar=[3, 4], fps=100)
    processor_output = proc(act)
    downbeat_times = processor_output[processor_output[:,1]==1,0]
    return downbeat_times

In [13]:
def make_spectral_cube(signal_mono, downbeat_frames):
    """Convert audio signal into a spectral cube using
    specified downbeat frames.

    An STFT is taken of each segment of audio, and
    these STFTs are stacked into a 3rd dimension.
    
    The STFTs may have different lengths; they are
    zero-padded to the length of the longest STFT.

    Parameters
    ----------
    signal_mono : np.ndarray [shape=(n,), dtype=float]
        one-dimensional audio signal to convert
    downbeat_frames : np.ndarray [shape=(n,), dtype=int]
        list of frames separating downbeats (or whatever
        time interval is desired)

    Returns
    -------
    tensor  : np.ndarray [shape=(n1,n2,n3), dtype=complex64]
        tensor containing spectrum slices

    Examples
    --------
    >>> signal_mono, fs = librosa.load("example_song.mp3", sr=None, mono=True)
    >>> downbeat_times = get_downbeats(signal_mono)
    >>> downbeat_frames = librosa.time_to_samples(downbeat_times, sr=fs)
    >>> spectral_cube = make_spectral_cube(signal_mono, downbeat_frames)
    >>> spectral_cube.shape
    (1025, 162, 31)
    >>> spectral_cube[:2,:2,:2]
    array([[[ 18.08905602+0.00000000e+00j, -20.48682976+0.00000000e+00j],
            [-16.07670403+0.00000000e+00j, -44.98669434+0.00000000e+00j]],

           [[-19.45080566+3.66026653e-15j,  -8.5700922 +3.14418630e-16j],
            [  1.01680577-3.67251587e+01j,  35.03190231-2.13507919e+01j]]])
    """
    assert len(signal_mono.shape) == 1
    # For each span of audio, compute the FFT using librosa defaults.
    fft_per_span = [librosa.core.stft(signal_mono[b1:b2]) for b1,b2 in zip(downbeat_frames[:-1],downbeat_frames[1:])]
    # Tensor size 1: the number of frequency bins
    freq_bins = fft_per_span[0].shape[0]
    # Tensor size 2: the length of the STFTs.
    # This could vary for each span; use the maximum.
    rhyt_bins = np.max([fpb.shape[1] for fpb in fft_per_span])
    # Tensor size 3: the number of spans.
    bar_bins = len(fft_per_span)
    tensor = np.zeros((freq_bins, rhyt_bins, bar_bins)).astype(complex)
    for i in range(bar_bins):
        tensor[:,:fft_per_span[i].shape[1],i] = fft_per_span[i]
    return tensor

In [14]:
def validate_template_sizes(spectral_cube, n_templates):
    """Ensure that specified number of estimated templates are valid.
    Values must be greater than 1 and strictly less than
    the corresponding dimension of the original tensor.
    So, if the tensor has size [1025,100,20], then
    n_templates = [99,99,10] is valid (though unadvised), while
    n_templates = [30,20,20] is invalid.
    
    If ANY of the values for n_templates are invalid, than
    get_recommended_template_sizes() is used to obtain
    replacement values for n_templates.

    Parameters
    ----------
    spectral_cube : np.ndarray [shape=(n1,n2,n3)]
        Original tensor to be modeled.
    n_templates : list [shape=(3,), dtype=int]
        Proposed numbers of templates.

    Returns
    -------
    output_n_templates : np.ndarray [shape=(3,), dtype=int]
        Validated numbers of templates.

    Examples
    --------
    >>> validate_template_sizes(np.zeros((1025, 162, 31)), [100, 50, 20])
    array([100, 50, 20])
    >>> validate_template_sizes(np.zeros((1025, 162, 31)), [0, 0, 0])
    array([63, 21,  7])
    >>> validate_template_sizes(np.zeros((1025, 162, 31)), [100, 50, 40])
    array([63, 21, 7])
    
    See Also
    --------
    get_recommended_template_sizes
    """
    max_template_sizes = np.array(spectral_cube.shape) - 1
    min_template_sizes = np.ones_like(max_template_sizes)
    big_enough = np.all(min_template_sizes <= n_templates)
    small_enough = np.all(n_templates <= max_template_sizes)
    valid = big_enough & small_enough
    if valid:
        return n_templates
    else:
        return get_recommended_template_sizes(spectral_cube)

In [15]:
def purify_core_tensor(core, factors, new_rank, dim_to_reduce=2):
    """Reduce the size of the core tensor by modelling repeated content
    across loop recipes. The output is a more "pure" set of loop
    recipes that should be more distinct from each other.

    Parameters
    ----------
    core : np.ndarray [shape=(n1,n2,n3)]
        Core tensor to be compressed.
    factors : list [shape=(3,), dtype=np.ndarray]
        List of estimated templates
    new_rank : int
        The new size for the core tensor
    dim_to_reduce : int
        The dimension along which to compress the core tensor.
        (Default value 2 will reduce the number of loop types.)

    Returns
    -------
    new_core : np.ndarray [shape=(n1,n2,new_rank)]
        Compressed version of the core tensor
    new_factors : list [shape=(3,), dtype=np.ndarray]
        New list of templates.
        Note: two templates will be the same as before;
            only the template for the compressed dimension
            will be different.    
    """
    assert new_rank < core.shape[dim_to_reduce]
    X = tensorly.unfold(core,dim_to_reduce)
    model = NMF(n_components=new_rank, init='nndsvd', random_state=0)
    W = model.fit_transform(X)
    H = model.components_
    # Re-construct core tensor and factors based on NMF factors from core tensor:
    new_shape = list(core.shape)
    new_shape[dim_to_reduce] = new_rank
    new_core = tensorly.fold(H, dim_to_reduce, new_shape)
    new_factors = copy.copy(factors)
    new_factors[dim_to_reduce] = np.dot(factors[dim_to_reduce],W)
    return new_core, new_factors

In [16]:
def get_recommended_template_sizes(spectral_cube):
    """Propose reasonable values for numbers of templates
    to estimate.
    
    If a dimension of the tensor is N, then N^(6/10), rounded
    down, seems to give a reasonable value.

    Parameters
    ----------
    spectral_cube : np.ndarray [shape=(n1,n2,n3)]
        Original tensor to be modeled.

    Returns
    -------
    recommended_sizes : np.ndarray [shape=(len(spectral_cube.shape),), dtype=float]
        Suggested number of templates.

    Examples
    --------
    >>> get_recommended_template_sizes(np.zeros((100,200,300)))
    array([15, 23, 30])
    >>> get_recommended_template_sizes(np.zeros((4,400,40000)))
    array([  1,  36, 577])
    """
    max_template_sizes = np.array(spectral_cube.shape) - 1
    min_template_sizes = np.ones_like(max_template_sizes)
    recommended_sizes = np.floor(max_template_sizes**.6).astype(int)
    recommended_sizes = np.max((recommended_sizes, min_template_sizes),axis=0)
    assert np.all(min_template_sizes <= recommended_sizes)
    assert np.all(recommended_sizes <= max_template_sizes)
    return recommended_sizes

In [17]:
def create_loop_spectrum(sounds, rhythms, core_slice):
    """Recreate loop spectrum from a slice of the core tensor
    and the first two templates, the sounds and rhythms.

    Parameters
    ----------
    sounds : np.ndarray [shape=(n_frequency_bins, n_sounds), dtype=float]
        The sound templates, one spectral template per column.
    rhythms : np.ndarray [shape=(n_time_bins, n_rhythms), dtype=float]
        The rhythm templates, or time-in-bar activations functions.
        One rhythm template per column.
    core_slice : np.ndarray [shape=(n_sounds, n_rhythms)]
        A slice of the core tensor giving the recipe for one loop.

    Returns
    -------
    loop_spectrum : np.ndarray [shape=(n_frequency_bins, n_time_bins), dtype=float]
        Reconstruction of spectrum.

    Examples
    --------
    >>> np.random.seed(0)
    >>> factors = [np.abs(np.random.randn(1025, 63)),
            np.abs(np.random.randn(162, 21)),
            np.abs(np.random.randn(31, 7))]
    >>> core = np.abs(np.random.randn(63,21,7))
    >>> create_loop_spectrum(factors[0], factors[1], core[:,:,0])
    array([[727.4153606 , 728.64591236, 625.76726056, ..., 512.94167141,
            592.2098947 , 607.10457107],
           [782.11991843, 778.09690543, 682.71895323, ..., 550.43525375,
            636.51448493, 666.35600624],
           [733.96209316, 720.17586837, 621.80762807, ..., 501.51192504,
            590.14018676, 605.44147057],
           ...,
           [772.43712078, 758.88473642, 654.35159419, ..., 522.69754588,
            628.84580165, 641.66347072],
           [677.58720601, 666.52484723, 583.92269705, ..., 471.24362278,
            558.17441475, 573.31864635],
           [768.96634561, 758.85553214, 639.21515256, ..., 525.83186141,
            634.04799161, 644.35772338]])
    """
    loop_spectrum = np.dot(np.dot(sounds, core_slice), rhythms.transpose())
    return loop_spectrum

In [19]:
def choose_bar_to_reconstruct(loop_templates, ith_loop):
    """...Choose... bar... to... reconstruct!
    
    For now, it just choose the bar with the largest activation.
    More information could / should be included, like reducing
    cross-talk, which would mean considering the activations (but
    ideally the relative *loudnesses*) of the other loops.

    Parameters
    ----------
    loop_templates : np.ndarray [shape=(n_bars, n_loop_types), dtype=float]
        The loop activation templates, one template per column.
    ith_loop : int
        The index of the loop template.

    Returns
    -------
    bar_ind : int
        The index of the bar to choose.

    Examples
    --------
    >>> np.random.seed(0)
    >>> factors = [np.abs(np.random.randn(1025, 63)),
            np.abs(np.random.randn(162, 21)),
            np.abs(np.random.randn(31, 7))]
    >>> choose_bar_to_reconstruct(factors[2], 0)
    10
    """
    bar_ind = np.argmax(loop_templates[:,ith_loop])
    return bar_ind

In [20]:
def get_loop_signal(loop_spectrum, original_spectrum):
    """Reconstruct the signal for a loop given its spectrum
    and the original spectrum.
    
    The original spectrum is used as the basis, and the reconstructed
    loop spectrum is used to mask the spectrum.

    Parameters
    ----------
    loop_spectrum : np.ndarray [shape=(n_freq_bins, n_time_bins_1), dtype=float]
        Reconstructed loop spectrum (real)
    original_spectrum : np.ndarray [shape=(n_freq_bins, n_time_bins_2), dtype=complex]
        Original spectrum (complex; possibly different length of time)

    Returns
    -------
    signal : np.ndarray [shape=(n,), dtype=float]
        Estimated signal of isolated loop.

    Examples
    --------
    >>> np.random.seed(0)
    >>> random_matrix = np.random.randn(1025,130)
    >>> loop_spectrum = np.abs(random_matrix) / np.max(random_matrix)
    >>> random_matrix_2 = np.random.randn(1025,130)
    >>> loop_spectrum_2 = np.abs(random_matrix_2) / np.max(random_matrix_2)
    >>> get_loop_signal(loop_spectrum, loop_spectrum_2)
    array([-5.7243928e-04, -2.3625907e-04, -3.8087784e-04, ...,
            9.2569360e-05,  3.9195133e-04, -2.4777438e-04], dtype=float32)
        
    See also
    --------
    librosa.util.softmask
    """
    assert loop_spectrum.shape[0] == original_spectrum.shape[0]
    min_length = np.min((loop_spectrum.shape[1], original_spectrum.shape[1]))
    orig_mag, orig_phase = librosa.magphase(original_spectrum)
    mask = librosa.util.softmask(loop_spectrum[:,:min_length], orig_mag[:,:min_length], power=1)
    masked_spectrum = original_spectrum[:,:min_length] * mask
    signal = librosa.core.istft(masked_spectrum)
    return signal

In [22]:
run_algorithm("mix1.wav", n_templates=[0,0,0], output_savename="extracted_loop")

reconstruction error=0.6179850611230522, variation=0.005703809007929328.
reconstruction error=0.610230558500448, variation=0.007754502622604131.
reconstruction error=0.6000840642037207, variation=0.010146494296727315.
reconstruction error=0.587761143866124, variation=0.012322920337596743.
reconstruction error=0.5741594468162394, variation=0.013601697049884565.
reconstruction error=0.5603048675313641, variation=0.013854579284875324.
reconstruction error=0.5464938662356071, variation=0.013811001295756964.
reconstruction error=0.5324755203830667, variation=0.01401834585254047.
reconstruction error=0.5188389772704957, variation=0.013636543112571009.
reconstruction error=0.5069801633450972, variation=0.011858813925398448.
reconstruction error=0.49766928384637515, variation=0.009310879498722058.
reconstruction error=0.4907136122031824, variation=0.006955671643192773.
reconstruction error=0.48546888638326435, variation=0.005244725819918028.
reconstruction error=0.4813199449925478, variation=0

reconstruction error=0.36588526852545283, variation=0.0003156719310917211.
reconstruction error=0.3655728244605631, variation=0.00031244406488972754.
reconstruction error=0.3652636002515636, variation=0.0003092242089994812.
reconstruction error=0.3649575847231432, variation=0.00030601552842041135.
reconstruction error=0.3646547597496466, variation=0.000302824973496596.
reconstruction error=0.36435509798825305, variation=0.00029966176139356593.
reconstruction error=0.3640585621264168, variation=0.0002965358618362557.
reconstruction error=0.36376510535944073, variation=0.0002934567669760635.
reconstruction error=0.36347467267906763, variation=0.00029043268037309833.
reconstruction error=0.3631872025571468, variation=0.00028747012192081867.
reconstruction error=0.36290262870150275, variation=0.0002845738556440658.
reconstruction error=0.362620881684184, variation=0.0002817470173187564.
reconstruction error=0.3623418903509574, variation=0.0002789913332266192.
reconstruction error=0.3620655

reconstruction error=0.3440068165589094, variation=0.00011409512019050494.
reconstruction error=0.3438940290817652, variation=0.00011278747714421611.
reconstruction error=0.34378251076956495, variation=0.00011151831220024278.
reconstruction error=0.34367222449978446, variation=0.00011028626978049072.
reconstruction error=0.3435631346556499, variation=0.0001090898441345467.
reconstruction error=0.34345520716992384, variation=0.00010792748572607813.
reconstruction error=0.3433484094964471, variation=0.00010679767347671065.
reconstruction error=0.34324271053618405, variation=0.00010569896026307868.
reconstruction error=0.34313808053769546, variation=0.00010462999848859234.
reconstruction error=0.34303449098622096, variation=0.00010358955147449223.
reconstruction error=0.342931914491034, variation=0.00010257649518696788.
reconstruction error=0.34283032467741587, variation=0.00010158981361813035.
reconstruction error=0.34272969608697995, variation=0.00010062859043591388.
reconstruction erro

# Prototype

In [1]:
import pygame

pygame 2.0.1 (SDL 2.0.14, Python 3.8.5)
Hello from the pygame community. https://www.pygame.org/contribute.html


In [42]:
pygame.mixer.init()
sound = pygame.mixer.Sound("extracted_loop_1.wav")
sound2 = pygame.mixer.Sound("extracted_loop_12.wav")
sound3 = pygame.mixer.Sound("extracted_loop_15.wav")
sound.play(-1)
sound2.play(-1)
sound3.play(-1)

<Channel at 0x20d0a430310>

In [43]:
pygame.quit()

In [2]:
# import pygame
# import pygame
# pygame.init()
# pygame.display.set_mode(pygame.display.list_modes()[-1]) # smallest resolution available
# pygame.mixer.init()
# pygame.mixer.music.load("sample1.wav")
# pygame.mixer.music.play(5) # repeat 5 times
# pygame.mixer.music.queue("sample2.wav")   # queue test2.wav after test.wav plays 5 times
# clock = pygame.time.Clock()
# clock.tick(10)
# while pygame.mixer.music.get_busy():
#     pygame.event.poll()
#     clock.tick(10)


pygame 2.0.1 (SDL 2.0.14, Python 3.8.5)
Hello from the pygame community. https://www.pygame.org/contribute.html
