In [108]:
# Model parameters (main)
# -----------------------

# Audio file to analyse; handed to initialize() further down in this notebook.
audio_filename = 'Monk2_master.mp3'
# Forwarded as `penalty=` to libfmp.c4.compute_sm_from_filename inside initialize().
penalty = -2
# Length (in seconds) of the candidate thumbnail segment.
thumbnail_duration_sec = 30
# Start (in seconds) of the candidate thumbnail segment; the segment evaluated is
# [thumbnail_start_sec, thumbnail_start_sec + thumbnail_duration_sec].
thumbnail_start_sec = 5

# Model parameters (fine-tuning)
# ------------------------------

# Tempo parameters for path smoothing
# (passed, in the order min, max, num, to libfmp.c4.compute_tempo_rel_set inside initialize())
tempo_num = 5
tempo_rel_min = 0.66
tempo_rel_max = 1.5

# Self-similarity matrix computation parameters
# ("relative" threshold strategy is applied; see https://www.audiolabs-erlangen.de/resources/MIR/FMP/C4/C4S2_SSM-Thresholding.html)
# These are forwarded to libfmp.c4.compute_sm_from_filename as
# thresh=threshold, L=smoothing_filter_length, H=smoothing_filter_downsampling_factor
# and L_smooth=smoothing_length.
threshold = 0.15
smoothing_filter_length = 21
smoothing_filter_downsampling_factor = 5
smoothing_length = 12

In [109]:
import numpy as np
import os, math
from numba import jit
import libfmp.c4
import json
import pickle
import argparse

In [110]:
# Ignore warnings
# NOTE(review): with no category or module filter this installs a blanket 'ignore' filter,
# so deprecation and numerical warnings are hidden as well as noisy ones.

import warnings
warnings.filterwarnings('ignore')

In [111]:
# Display all results of each cell (not just the last produced)
# Setting ast_node_interactivity to "all" on the class makes every expression statement
# in a cell produce output, not only the final one.

from IPython.core.interactiveshell import InteractiveShell
InteractiveShell.ast_node_interactivity = "all"

In [112]:
def normalization_properties_ssm(ssm):
    """Return a copy of a self-similarity matrix whose main diagonal is set to 1 (S(n,n)=1).

    The input matrix is not modified. After the diagonal has been set, a message is
    printed if the largest entry of the matrix exceeds 1; the matrix is returned either way.

    Args:
        ssm (np.ndarray): Self-similarity matrix (SSM)

    Returns:
        ssm_normalized (np.ndarray): Normalized self-similarity matrix
    """
    ssm_normalized = ssm.copy()
    size = ssm_normalized.shape[0]

    # Set the whole diagonal in one indexed assignment (one entry per row index).
    diagonal = np.arange(size)
    ssm_normalized[diagonal, diagonal] = 1

    # The maximum is taken once, after the diagonal is in place. An empty matrix has
    # no maximum, so the check is skipped instead of failing.
    if size > 0 and np.max(ssm_normalized) > 1:
        print('Normalization condition for SSM not fulfill (max > 1)')
    return ssm_normalized

In [113]:
def compute_induced_segment_family_coverage(path_family):
    """Induces a segment family and computes its absolute coverage given a path family

    Each path contributes one segment ``[start, end]``, where ``start`` is the first index
    of the path's first cell and ``end`` is the first index of the path's last cell.
    The coverage is the sum of ``end - start + 1`` over all segments.

    Args:
        path_family (list): Path family (list of paths, each a sequence of index pairs)

    Returns:
        segment_family (np.ndarray): Induced segment family, integer array of shape
            ``(len(path_family), 2)``; an empty ``(0, 2)`` array for an empty path family
        coverage (int): Absolute coverage of path family (0 for an empty path family)
    """
    num_path = len(path_family)
    if num_path == 0:
        # Return a real (empty) array so callers can still index, divide or call .tolist().
        return np.empty((0, 2), dtype=int), 0

    segment_family = np.zeros((num_path, 2), dtype=int)
    coverage = 0
    for n, path in enumerate(path_family):
        segment_family[n, 0] = path[0][0]
        segment_family[n, 1] = path[-1][0]
        coverage = coverage + segment_family[n, 1] - segment_family[n, 0] + 1

    return segment_family, coverage

In [114]:
@jit(nopython=True)
def compute_accumulated_score_matrix(S_seg):
    """Computes the accumulated score matrix for a self-similarity sub-matrix

    The returned matrix ``D`` has one more column than ``S_seg``: column 0 is the
    "elevator" column, and column ``m >= 1`` of ``D`` corresponds to column ``m - 1``
    of ``S_seg``.

    Args:
        S_seg (np.ndarray): Sub-matrix of an enhanced and normalized SSM ``S``
            (columns restricted to the potential thumbnail segment)
            Note: ``S`` must satisfy ``S(n,m) <= 1 and S(n,n) = 1``

    Returns:
        D (np.ndarray): Accumulated score matrix of shape ``(N, M)`` with
            ``N = S_seg.shape[0]`` and ``M = S_seg.shape[1] + 1``
        score (float): Score of optimal path family
    """
    inf = math.inf
    N = S_seg.shape[0]
    M = S_seg.shape[1]+1

    # Initializing score matrix: everything unreachable (-inf) except the start cell
    D = -inf * np.ones((N, M), dtype=np.float64)
    D[0, 0] = 0.
    D[0, 1] = D[0, 0] + S_seg[0, 0]

    # Dynamic programming
    for n in range(1, N):
        # Elevator column: either stay in it or arrive from the end of the previous row
        D[n, 0] = max(D[n-1, 0], D[n-1, -1])
        # First segment column can only be entered from the elevator column of the same row
        D[n, 1] = D[n, 0] + S_seg[n, 0]
        for m in range(2, M):
            best = max(D[n-1, m-1], D[n-1, m-2])
            # The (n-2, m-1) predecessor only exists from the third row on. Without this
            # guard, n == 1 would index row -1, i.e. wrap around to the LAST row; for
            # N == 2 that is the row currently being filled, which would allow an
            # illegal horizontal step.
            if n >= 2:
                best = max(best, D[n-2, m-1])
            D[n, m] = S_seg[n, m-1] + best

    # Score of optimal path family
    score = np.maximum(D[N-1, 0], D[N-1, M-1])

    return D, score

In [115]:
@jit(nopython=True)
def compute_optimal_path_family(D):
    """Compute an optimal path family given an accumulated score matrix

    Backtracks through ``D`` starting in the last row and stopping at cell ``(0, 0)``.
    Column 0 of ``D`` is treated as the "elevator" column. A position in column
    ``m >= 1`` of ``D`` is recorded in a path with column index ``m - 1``.

    Args:
        D (np.ndarray): Accumulated score matrix

    Returns:
        path_family (list): Optimal path family consisting of list of paths
            (each path being a list of index pairs)
    """
    # Initialization
    inf = math.inf
    N = int(D.shape[0])
    M = int(D.shape[1])

    path_family = []
    path = []

    # Start in the last row: in the elevator column if it holds the strictly larger
    # score, otherwise in the last column (which also opens the first path).
    n = N - 1
    if D[n, M-1] < D[n, 0]:
        m = 0
    else:
        m = M-1
        path_point = (N-1, M-2)
        path.append(path_point)

    # Backtracking
    while n > 0 or m > 0:

        # obtaining the set of possible predecessors given our current position
        # (only consulted in the "regular case" branch below)
        if n <= 2 and m <= 2:
            predecessors = [(n-1, m-1)]
        elif n <= 2 and m > 2:
            predecessors = [(n-1, m-1), (n-1, m-2)]
        elif n > 2 and m <= 2:
            predecessors = [(n-1, m-1), (n-2, m-1)]
        else:
            predecessors = [(n-1, m-1), (n-2, m-1), (n-1, m-2)]

        # case for the first row. Only horizontal movements allowed
        # (no path point is recorded for this step)
        if n == 0:
            cell = (0, m-1)
        # case for the elevator column: we can keep going down the column or jumping to the end of the next row
        elif m == 0:
            if D[n-1, M-1] > D[n-1, 0]:
                # jump to the last column of the previous row: close the path collected
                # so far (if any) and start a new one at that cell
                cell = (n-1, M-1)
                path_point = (n-1, M-2)
                if len(path) > 0:
                    path.reverse()
                    path_family.append(path)
                path = [path_point]
            else:
                cell = (n-1, 0)
        # case for m=1, only horizontal steps to the elevator column are allowed
        elif m == 1:
            cell = (n, 0)
        # regular case
        else:

            # obtaining the best of the possible predecessors
            # (strict comparison: on ties the earlier entry of `predecessors` wins)
            # NOTE(review): if every candidate holds -inf, `cell` is not reassigned in
            # this loop and keeps its previous value -- confirm this cannot occur for
            # matrices produced by compute_accumulated_score_matrix.
            # The enumerate index `i` is unused.
            max_val = -inf
            for i, cur_predecessor in enumerate(predecessors):
                if max_val < D[cur_predecessor[0], cur_predecessor[1]]:
                    max_val = D[cur_predecessor[0], cur_predecessor[1]]
                    cell = cur_predecessor

            # saving the point in the current path
            # (column shifted by one because column 0 of D is the elevator column)
            path_point = (cell[0], cell[1]-1)
            path.append(path_point)

        (n, m) = cell

    # adding last path to the path family
    # Paths and the family were collected back-to-front, so both are reversed.
    path.reverse()
    path_family.append(path)
    path_family.reverse()

    return path_family

In [116]:
def compute_fitness(path_family, score, N):
    """Derive the fitness value and related metrics for a path family

    The segment length ``M`` is taken as the second index of the last cell of the
    first path, plus one. Score and coverage are both reduced by ``M`` before being
    normalized, and the fitness combines the two normalized values as
    ``2 * a * b / (a + b)`` (a small epsilon guards each division).

    Args:
        path_family (list): Path family
        score (float): Score
        N (int): Length of feature sequence

    Returns:
        fitness (float): Fitness
        score (float): Score (returned unchanged)
        score_n (float): Normalized score
        coverage (float): Coverage
        coverage_n (float): Normalized coverage
        path_family_length (int): Length of path family (total number of cells)
    """
    eps = 1e-16
    M = path_family[0][-1][1] + 1

    # Total number of cells over all paths
    path_family_length = sum(len(path) for path in path_family)

    # Score, normalized by the number of path cells
    score_n = (score - M) / (path_family_length + eps)

    # Coverage, normalized by the sequence length (the induced segments are not needed here)
    _, coverage = compute_induced_segment_family_coverage(path_family)
    coverage_n = (coverage - M) / (N + eps)

    # Fitness measure
    numerator = 2 * score_n * coverage_n
    denominator = score_n + coverage_n + eps
    fitness = numerator / denominator

    return fitness, score, score_n, coverage, coverage_n, path_family_length

In [117]:
# Self-similarity matrix computation from audio

def initialize(audio_fn):
    """Build a normalized self-similarity matrix for an audio file

    Uses the notebook-level parameters (tempo_*, smoothing_*, penalty, threshold)
    to configure ``libfmp.c4.compute_sm_from_filename`` and then passes the resulting
    matrix through ``normalization_properties_ssm``.

    Args:
        audio_fn (string): Audio file name

    Returns:
        audio: Audio data
        audio_duration: Audio duration (in sec)
        ssm: Normalized self-similarity matrix of input audio
        fs_feature: Feature rate
    """
    relative_tempi = libfmp.c4.compute_tempo_rel_set(tempo_rel_min, tempo_rel_max, tempo_num)

    sm_result = libfmp.c4.compute_sm_from_filename(
        audio_fn,
        L=smoothing_filter_length,
        H=smoothing_filter_downsampling_factor,
        L_smooth=smoothing_length,
        tempo_rel_set=relative_tempi,
        penalty=penalty,
        thresh=threshold,
    )

    # Six values come back; the third and the sixth are not used here.
    audio, audio_duration, _, fs_feature, raw_ssm, _ = sm_result

    return audio, audio_duration, normalization_properties_ssm(raw_ssm), fs_feature

In [118]:
# Disabled command-line scaffold (not executed; the file name comes from `audio_filename`).
# TODO: either wire this up or delete it.
# parser = argparse.ArgumentParser(description='Audio file thumbnailing.')
# parser.add_argument('files', metavar='file', type=str, nargs='+', help='A file to identify an optimal thumbnail in.')
# args, unknown = parser.parse_known_args(['-h', 'file1', 'file2', 'file3'])

# Load the audio and compute its normalized SSM:
# x = audio data, x_duration = duration in seconds, s = SSM, Fs_feature = feature rate.
x, x_duration, s, Fs_feature = initialize(audio_filename)

In [119]:
# Candidate thumbnail segment in seconds: [start, start + duration]
seg_sec = [thumbnail_start_sec, thumbnail_start_sec+thumbnail_duration_sec]
# Same segment in feature frames (seconds * feature rate, truncated to int)
seg = [int(seg_sec[0]*Fs_feature), int(seg_sec[1]*Fs_feature)]
# All rows of the SSM, columns restricted to the segment (end frame inclusive)
s_seg = s[:,seg[0]:seg[1]+1]
# Accumulated score matrix and score of the optimal path family for this segment
d, score = compute_accumulated_score_matrix(s_seg)
# Backtrack through the accumulated score matrix to obtain the path family
path_family = compute_optimal_path_family(d)

In [121]:
# Number of SSM rows; passed to compute_fitness as the feature-sequence length N
n = s.shape[0]

# Induced segments (in frames) for the report below.
# NOTE(review): the `coverage` returned here is immediately overwritten by the
# compute_fitness call on the next statement.
segment_family, coverage = compute_induced_segment_family_coverage(path_family)
fitness, score, score_n, coverage, coverage_n, path_family_length = compute_fitness(
    path_family, score, n)

# Summary of the thumbnail analysis, printed as JSON below.
# Frame-based quantities are converted to seconds by dividing by Fs_feature.
thumbnail = {
    "filename" : audio_filename,
    "thumbnail": {
        "nominal_duration_seconds": thumbnail_duration_sec,
        # NOTE(review): this value is itself a JSON string, so it appears as a quoted
        # string (not a nested array) in the final output -- presumably to keep the
        # list on one line; confirm that consumers expect a string.
        "instances_seconds": json.dumps((segment_family / Fs_feature).tolist()),
        "instances_count": len(path_family),
        # "instances_total_length": '%d'%path_family_length,
        "score": '%0.3f'%score,
        "coverage_seconds": coverage / Fs_feature,
        "normalized_score": '%0.3f'%score_n,
        "normalized_coverage": '%0.3f'%coverage_n,

        "fitness": '%0.3f'%fitness
    },
    "context": {
        "audio_duration_seconds": '{:.2f}'.format(x_duration),
        "feature_rate_hz" : Fs_feature,
        "ssm_dimensions": {
            "x": s.shape[0],
            "y": s.shape[1]
        }
    }
}

print(json.dumps(thumbnail, indent=2))

{
  "filename": "Monk2_master.mp3",
  "thumbnail": {
    "nominal_duration_seconds": 30,
    "instances_seconds": "[[5.0, 35.0], [140.0, 175.0]]",
    "instances_count": 2,
    "score": "71.031",
    "coverage_seconds": 66.0,
    "normalized_score": "0.086",
    "normalized_coverage": "0.196",
    "fitness": "0.119"
  },
  "context": {
    "audio_duration_seconds": "181.16",
    "feature_rate_hz": 2.0,
    "ssm_dimensions": {
      "x": 363,
      "y": 363
    }
  }
}
