## Setup {.smaller}

- Download the code at [github.com/deepcharles/music-segmentation](https://github.com/deepcharles/music-segmentation).

![QR code](qr-code.png)

- Install the librairies in the `requirements.txt`.

In [None]:
from typing import List

import librosa
import librosa.display
import matplotlib.pyplot as plt
import numpy as np
import numpy.typing as npt
import ruptures as rpt  # our package
from IPython.display import Audio, display
from sklearn.base import BaseEstimator, TransformerMixin
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import FunctionTransformer

SAMPLING_RATE = 22050  # Hz
HOP_LENGTH_TEMPO = 256

In [None]:
def fig_ax(figsize=(15, 5), dpi=150):
    """Return a (matplotlib) figure and ax objects with given size."""
    return plt.subplots(figsize=figsize, dpi=dpi)

### Load the data {.smaller .scrollable}

- A number of music files are available in Librosa.
- Here, we choose the Dance of the _Sugar Plum Fairy_ from _The Nutcracker_ by Tchaikovsky.

Listen to the music as well as display the sound envelope.



In [None]:
#| echo: true

duration = 30  # in seconds
signal, _ = librosa.load(librosa.ex("nutcracker"), duration=duration)

# listen to the music
display(Audio(data=signal, rate=SAMPLING_RATE))

# look at the envelope
fig, ax = fig_ax()
ax.plot(np.arange(signal.size) / SAMPLING_RATE, signal)
ax.set_xlim(0, signal.size / SAMPLING_RATE)
ax.set_xlabel("Time (s)")
_ = ax.set(title="Sound envelope")

## Signal transformation

- Transform the signal into a tempogram

- The tempogram measures the tempo (measured in Beats Per Minute, BPM) profile along the time axis.


In [None]:
#| echo: true

def get_tempogram(signal: np.ndarray,
                  hop_length_tempo: int=HOP_LENGTH_TEMPO)->np.ndarray:
    """Return the tempogram of a sound signal."""
    # Compute the onset strength
    oenv = librosa.onset.onset_strength(
        y=signal, sr=SAMPLING_RATE, hop_length=hop_length_tempo
    )
    # Compute the tempogram
    tempogram = librosa.feature.tempogram(
        onset_envelope=oenv,
        sr=SAMPLING_RATE,
        hop_length=hop_length_tempo,
    )
    return tempogram.T

tempogram = get_tempogram(signal=signal, hop_length_tempo=256)

# Display the tempogram
fig, ax = fig_ax()
_ = librosa.display.specshow(
    tempogram.T,
    ax=ax,
    hop_length=HOP_LENGTH_TEMPO,
    sr=SAMPLING_RATE,
    x_axis="s",
    y_axis="tempo",
)

## Detection algorithm

- Search method: dynamic programming (exact).
- We choose to detect changes in the mean of the tempogram, which is a multivariate signal. Associated cost function: $$c(y_{a..b}) = \sum_{t=a}^{b-1} \|y_t - \bar{y}_{a..b} \|^2$$
- What about the number of change-points?

## Number of change-points {.smaller .scrollable}

Observe the sum of costs w.r.t. the number of change-points

In [None]:
#| echo: true

# Choose detection method
algo = rpt.KernelCPD(kernel="linear").fit(tempogram)

# Choose the number of changes (elbow heuristic)
n_bkps_max = 20  # K_max
# Start by computing the segmentation with most changes.
# After start, all segmentations with 1, 2,..., K_max-1 changes are also available for free.
_ = algo.predict(n_bkps_max)

array_of_n_bkps = np.arange(1, n_bkps_max + 1)

def get_bkps(signal: np.ndarray, n_bkps: int)->List[int]:
    algo = rpt.KernelCPD(kernel="linear").fit(signal)
    bkps = algo.predict(n_bkps=n_bkps)
    return bkps

def get_sum_of_cost(algo, n_bkps) -> float:
    """Return the sum of costs for the change points `bkps`"""
    bkps = algo.predict(n_bkps=n_bkps)
    return algo.cost.sum_of_costs(bkps)


fig, ax = fig_ax((7, 4))
ax.plot(
    array_of_n_bkps,
    [get_sum_of_cost(algo=algo, n_bkps=n_bkps) for n_bkps in array_of_n_bkps],
    "-*",
    alpha=0.5,
)
ax.set_xticks(array_of_n_bkps)
ax.set_xlabel("Number of change points")
ax.set_title("Sum of costs")
ax.grid(axis="x")
ax.set_xlim(0, n_bkps_max + 1)

# Visually we choose n_bkps=5 (highlighted in red on the elbow plot)
n_bkps = 5
_ = ax.scatter([5], [get_sum_of_cost(algo=algo, n_bkps=5)], color="r", s=100)


**Visually, we choose 5 change points** (highlighted in red on the elbow plot).

## Combining everything together {.smaller}

Our pipeline has two blocks:

- signal transformation,
- change-point detection.

We use the scikit-learn `Pipeline` pattern. We can use the rich ecosystem to calibrate our algorithms.

In [None]:
#| echo: true

class TempogramTransformer(TransformerMixin, BaseEstimator):
    def __init__(self, hop_length_tempo: int=HOP_LENGTH_TEMPO):
        self.hop_length_tempo = hop_length_tempo

    def fit(self, X=None, y=None, **kwargs):
        """Nothing happens in the .fit()"""
        return self

    def transform(self, X: List[npt.NDArray]) -> List[npt.NDArray]:
        out = list()
        for signal in X:
            tempogram = get_tempogram(
                signal=signal,
                hop_length_tempo=self.hop_length_tempo)
            out.append(tempogram)
        return out


class ChangePointDetector(BaseEstimator):
    def __init__(self, n_bkps: int = 2):
        self.n_bkps = n_bkps

    def fit(self, X=None, y=None, **kwargs):
        """Nothing happens in the .fit()"""
        return self

    def predict(self, X: List[npt.NDArray], **kwargs) -> List[List[int]]:
        """Return a list of segmentations.

        A segmentation itself is a list of breakpoint indexes.
        """
        out = list()
        for signal in X:
            bkps = get_bkps(signal=signal, n_bkps=self.n_bkps)
            out.append(bkps)
        return out
    
steps = [("tempogram_transformer", TempogramTransformer()),
         ("change_detector", ChangePointDetector(n_bkps=5))]
pipeline = Pipeline(steps=steps)
pipeline

## Detecting change-points

- We estimate changes with the defined pipeline.

In [None]:
#| echo: true

# Segmentation
bkps, = pipeline.predict([signal])
# Convert the estimated change points (frame counts) to actual timestamps
bkps_times = librosa.frames_to_time(
    bkps, sr=SAMPLING_RATE, hop_length=pipeline["tempogram_transformer"].hop_length_tempo)

# Displaying results
fig, ax = fig_ax()
_ = librosa.display.specshow(
    tempogram.T,
    ax=ax,
    x_axis="s",
    y_axis="tempo",
    hop_length=pipeline["tempogram_transformer"].hop_length_tempo,
    sr=SAMPLING_RATE,
)

for b in bkps_times[:-1]:
    ax.axvline(b, ls="--", color="white", lw=4)

Visually, the estimated change points indeed separate portions of signal with a relatively constant tempo profile.

## Listening to the change-points

In [None]:
# Compute change points corresponding indexes in original signal
bkps_time_indexes = (SAMPLING_RATE * bkps_times).astype(int).tolist()

for (segment_number, (start, end)) in enumerate(
    rpt.utils.pairwise([0] + bkps_time_indexes), start=1
):
    segment = signal[start:end]
    print(f"Segment n°{segment_number} (duration: {segment.size/SAMPLING_RATE:.2f} s)")
    display(Audio(data=segment, rate=SAMPLING_RATE))


## Some remarks on how to create and maintain Python librairies {.smaller}

:::: {.columns}

::: {.column width="50%"}

- Having a [documentation](https://centre-borelli.github.io/ruptures-docs/).
    - generated from the code comments,
    - add examples
- Clean code:
    - use code formatter ([black](https://black.readthedocs.io/en/stable/)),
    - testing ([pytest](https://docs.pytest.org/en/7.2.x/)) and code coverage ([Codecov](https://about.codecov.io/)),
    - lots of comments

:::

::: {.column width="50%"}

- Automatize everything with GitHub Actions.
    - packaging and release,
    - issue/PR management and changelog.

More info [scikit-hep](https://scikit-hep.org/).

:::

::::


    