In [17]:
import numpy as np

# Generating a random E matrix to simulate embedded features
np.random.seed(0)  # For reproducibility
E = np.random.rand(5000, 10)  # 500 data points with 10 dimensions

E.shape


(5000, 10)

In [3]:
import librosa

def compute_recurrence_matrix_librosa(E, k=0.04):
    """Compute the recurrence matrix using the original method."""
    N = E.shape[0]
    k_val = int(k * N)
    R = librosa.segment.recurrence_matrix(E.T, k=k_val, width=1, metric="euclidean", sym=True).astype(np.float32)
    return R

# Let's test the original method to ensure it works correctly
R_Librosa = compute_recurrence_matrix_librosa(E)
R_Librosa.shape


(5000, 5000)

In [4]:
from annoy import AnnoyIndex

def compute_recurrence_matrix_annoy(E, k=0.04):
    """Compute the recurrence matrix using Annoy for approximate nearest neighbors."""
    N, D = E.shape
    k_val = int(k * N)
    R = np.zeros((N, N), dtype=np.float32)

    # Build the Annoy index
    t = AnnoyIndex(D, metric='euclidean')
    for i in range(N):
        t.add_item(i, E[i])
    t.build(10)  # 10 trees for the index

    # Find the k nearest neighbors for each point
    for i in range(N):
        neighbors = t.get_nns_by_item(i, k_val)
        R[i, neighbors] = 1

    return R

# Let's test the Annoy method to ensure it works correctly
R_annoy = compute_recurrence_matrix_annoy(E)
R_annoy.shape


(5000, 5000)

In [8]:
from joblib import Parallel, delayed
from scipy.spatial import distance

def pairwise_distance_parallel(i, E):
    """
    Compute pairwise distance for a given row index i.
    """
    return distance.cdist([E[i]], E, 'euclidean').flatten()

def parallel_recurrence_matrix(E, k):
    """
    Compute the recurrence matrix using parallel processing.
    """
    # Compute pairwise distances in parallel
    pairwise_distances = np.array(Parallel(n_jobs=-1)(delayed(pairwise_distance_parallel)(i, E) for i in range(E.shape[0])))
    
    # Create the recurrence matrix based on k-nearest neighbors
    sort_idx = np.argsort(pairwise_distances, axis=1)
    nearest_idx = sort_idx[:, :int(k*E.shape[0])]
    
    R_parallel = np.zeros(pairwise_distances.shape, dtype=np.float32)
    for i in range(E.shape[0]):
        R_parallel[i, nearest_idx[i]] = 1

    return R_parallel

# Let's test the parallel method to ensure it works correctly
R_parallel = parallel_recurrence_matrix(E, k=0.04)
R_parallel.shape

# Run the approximate assertions
assert np.allclose(R_Librosa, R_parallel), "Librosa and parallel methods do not match!"


(5000, 5000)

In [9]:
from scipy.spatial import distance
def vectorized_recurrence_matrix(E, k):
    """
    Compute the recurrence matrix using a vectorized approach.
    """
    # Compute pairwise distances
    pairwise_distances = distance.cdist(E, E, 'euclidean')
    
    # Create the recurrence matrix based on k-nearest neighbors
    sort_idx = np.argsort(pairwise_distances, axis=1)
    nearest_idx = sort_idx[:, :int(k*E.shape[0])]
    
    R_vectorized = np.zeros(pairwise_distances.shape, dtype=np.float32)
    for i in range(E.shape[0]):
        R_vectorized[i, nearest_idx[i]] = 1

    return R_vectorized

# Let's test the vectorized method to ensure it works correctly
R_vectorized = vectorized_recurrence_matrix(E, k=0.04)
R_vectorized.shape



(5000, 5000)

In [10]:
def compute_recurrence_matrix(E, k=0.04):
    """Compute the recurrence matrix using pairwise distance computations."""
    N = E.shape[0]
    k_val = int(k * N)
    R = np.zeros((N, N), dtype=np.float32)
    
    for i in range(N):
        # Compute pairwise distances to all other points
        distances = np.linalg.norm(E[i] - E, axis=1)
        
        # Find the k smallest distances
        k_smallest_indices = np.argpartition(distances, k_val)[:k_val]
        
        # Set the recurrence matrix values for these indices
        R[i, k_smallest_indices] = 1
    
    return R

# Let's test the pairwise distance method to ensure it works correctly
R_pairwise = compute_recurrence_matrix(E)
R_pairwise.shape


(5000, 5000)

In [16]:
import timeit

# Define the number of repetitions for benchmarking
repetitions = 3

# Benchmark the original method
time_librosa = timeit.timeit(lambda: compute_recurrence_matrix_librosa(E), number=repetitions)

# Benchmark the Annoy method
time_annoy = timeit.timeit(lambda: compute_recurrence_matrix_annoy(E), number=repetitions)

# Benchmark the numpy-optimized method

time_numpy_optimized = timeit.timeit(lambda: compute_recurrence_matrix(E), number=repetitions)

# Benchmark the parallel method
time_parallel = timeit.timeit(lambda: parallel_recurrence_matrix(E, k=0.04), number=repetitions)

# Benchmark the vectorized method
time_vectorized = timeit.timeit(lambda: vectorized_recurrence_matrix(E, k=0.04), number=repetitions)


print("Librosa method: {:.2f} seconds".format(time_librosa))
print("Annoy method: {:.2f} seconds".format(time_annoy))
print("Numpy method: {:.2f} seconds".format(time_numpy_optimized))
print("Parallel method: {:.2f} seconds".format(time_parallel))
print("Vectorized method: {:.2f} seconds".format(time_vectorized))


Librosa method: 20.10 seconds
Annoy method: 9.23 seconds
Numpy method: 8.54 seconds
Parallel method: 19.16 seconds
Vectorized method: 15.02 seconds


In [18]:
repetitions = 2
E = np.random.rand(10000, 10)  # 500 data points with 10 dimensions

# Benchmark the Annoy method
time_annoy = timeit.timeit(lambda: compute_recurrence_matrix_annoy(E), number=repetitions)

# Benchmark the numpy-optimized method
time_numpy_optimized = timeit.timeit(lambda: compute_recurrence_matrix(E), number=repetitions)

# Benchmark the vectorized method
time_vectorized = timeit.timeit(lambda: vectorized_recurrence_matrix(E, k=0.04), number=repetitions)

print("Annoy method: {:.2f} seconds".format(time_annoy))
print("Numpy method: {:.2f} seconds".format(time_numpy_optimized))
print("Vectorized method: {:.2f} seconds".format(time_vectorized))


Librosa method: 20.10 seconds
Annoy method: 7.53 seconds
Numpy method: 7.02 seconds
Parallel method: 19.16 seconds
Vectorized method: 14.18 seconds


# Novelty Function

In [26]:
def compute_novelty_curve_optimized(X):
    """
    Computes the novelty curve from the structural features using vectorized operations.

    Args:
    - X (np.array): Structural features.

    Returns:
    - nc (np.array): Novelty curve.
    """
    # Compute Euclidean distance between consecutive feature vectors
    diffs = np.diff(X, axis=0)
    nc = np.linalg.norm(diffs, axis=1)

    # Append a zero at the end to match the size of the original function's output
    nc = np.append(nc, 0)

    # Normalize the novelty curve to a range [0, 1]
    if nc.max() != 0:
        nc -= nc.min()
        nc /= nc.max()
    
    return nc


In [27]:
def compute_novelty_curve(X, M=8, axis=0):
    """
    Computes the novelty curve from the structural features.

    Args:
    - X (np.array): Structural features.
    - M (int, optional): Size of Gaussian kernel. Default is 8.
    - axis (int, optional): Axis along which to compute the difference. Default is 0.

    Returns:
    - nc (np.array): Novelty curve.

    Examples:
    --------
        X = np.array([[1, 2], [3, 4], [5, 6]])
        compute_novelty_curve(X)
    array([1., 1., 0.])
    """
        # Determine the size of the feature matrix
    N = X.shape[0]

    # Initialize the novelty curve with zeros
    nc = np.zeros(N)

    # Compute Euclidean distance between consecutive feature vectors to capture novelty
    for i in range(N - 1):
        nc[i] = distance.euclidean(X[i, :], X[i + 1, :])

    # Normalize the novelty curve to a range [0, 1]
    nc += np.abs(nc.min())
    nc /= float(nc.max())
    
    return nc

In [30]:
# Let's test the vectorized method to ensure it works correctly
X = np.array([[1, 2], [3, 4], [5, 6]])
nc_vectorized = compute_novelty_curve_optimized(X)
nc_vectorized

# Let's test the original method to ensure it works correctly
nc_original = compute_novelty_curve(X)

# Run the approximate assertions
assert np.allclose(nc_original, nc_vectorized), "Original and vectorized methods do not match!"



In [34]:
# Benchmark the original method with a large feature matrix 
repetitions = 2
X = np.random.rand(100000, 10)  # 500 data points with 10 dimensions

time_original = timeit.timeit(lambda: compute_novelty_curve(X), number=repetitions)

# Benchmark the vectorized method with a large feature matrix
time_vectorized = timeit.timeit(lambda: compute_novelty_curve_optimized(X), number=repetitions)

print("Original method: {:.2f} seconds".format(time_original))
print("Vectorized method: {:.2f} seconds".format(time_vectorized))


Original method: 1.41 seconds
Vectorized method: 0.03 seconds


# Cumculative Sum

In [36]:
from numba import jit
def cummulative_sum_Q(R):
    len_x, len_y = R.shape
    Q = np.zeros((len_x + 2, len_y + 2))
    for i in range(len_x):
        for j in range(len_y):
            Q[i+2, j+2] = max(
                    Q[i+1, j+1],
                    Q[i, j+1],
                    Q[i+1, j]) + R[i, j]
    return np.max(Q)


@jit(nopython=True)
def cummulative_sum_Q_numba(R):
    len_x, len_y = R.shape
    Q = np.zeros((len_x + 2, len_y + 2))
    for i in range(len_x):
        for j in range(len_y):
            Q[i+2, j+2] = max(
                    Q[i+1, j+1],
                    Q[i, j+1],
                    Q[i+1, j]) + R[i, j]
    return np.max(Q)

# Generate a random matrix to test and benchmark the functions
R_test = np.random.rand(100, 100)

# Benchmarking the original function
%timeit cummulative_sum_Q(R_test)

# Benchmarking the optimized function
%timeit cummulative_sum_Q_numba(R_test)


7.5 ms ± 42.1 µs per loop (mean ± std. dev. of 7 runs, 100 loops each)
12.7 µs ± 94.8 ns per loop (mean ± std. dev. of 7 runs, 100000 loops each)


# Normalization

In [56]:


def min_max_normalize(X, floor=0.0):
    """Min-max normalization."""
    X_min = X.min(axis=1, keepdims=True)
    X_max = X.max(axis=1, keepdims=True)
    norm_X = floor + (X - X_min) / (X_max - X_min)
    return norm_X


def lognormalize(X, floor=0.0, min_db=-80):
    """
    Optimized version of the logarithmic scaling function for the feature matrix.

    Parameters:
        X (np.array): Feature matrix where each row represents a feature vector.
        floor (float, optional): Value to replace zeros in the matrix. Default is 0.0.
        min_db (float, optional): Minimum decibel level for scaling. Default is -80.

    Returns:
        np.array: Logarithmically scaled feature matrix.
    """
    # Use numpy's where function to replace zeros with the floor value
    X = np.where(X == 0, floor, X)

    # Convert to dB scale using numpy's built-in functions
    X_db = 10 * np.log10(np.abs(X))

    # Clip to the specified minimum dB value using numpy's maximum function
    X_db = np.maximum(X_db, X_db.max() + min_db)

    return X_db


def normalize(X, norm_type, floor=0.0, min_db=-80):
    """Normalizes the given matrix of features."""
    if isinstance(norm_type, str):
        if norm_type == "min_max":
            return min_max_normalize(X, floor=floor)
        if norm_type == "log":
            return lognormalize(X, floor=floor, min_db=min_db)
    return librosa.util.normalize(X, norm=norm_type, axis=1)



@jit(nopython=True)
def min_max_normalize_numba(X, floor=0.0):
    """Numba-optimized min-max normalization."""
    X_min = np.zeros(X.shape[0])
    X_max = np.zeros(X.shape[0])
    
    for i in range(X.shape[0]):
        X_min[i] = X[i, :].min()
        X_max[i] = X[i, :].max()
    
    norm_X = floor + (X - X_min.reshape(-1, 1)) / (X_max.reshape(-1, 1) - X_min.reshape(-1, 1))
    return norm_X

def normalize_optimized(X, norm_type, floor=0.0, min_db=-80):
    """Optimized normalization of the given matrix of features."""
    if isinstance(norm_type, str):
        if norm_type == "min_max":
            return min_max_normalize_numba(X, floor=floor)
    return librosa.util.normalize(X, norm=norm_type, axis=1)


In [57]:
X_test = np.random.rand(100, 100) + 1e-6  # Adding a small value to prevent log(0)

# Test the original function
X_test_original = normalize(X_test, norm_type="min_max")

# Test the optimized function
X_test_optimized = normalize_optimized(X_test, norm_type="min_max")

# Run the approximate assertions
assert np.allclose(X_test_original, X_test_optimized), "Original and optimized methods do not match!"



In [59]:

X_test = np.array([[1, 2, 3], [4, 5, 6], [7, 8, 9]], dtype=float)

# test the original function
X_test_original = normalize(X_test, norm_type="log")


In [39]:

# Benchmark the original function
%timeit normalize(X_test, norm_type="min_max")

# Benchmark the optimized function
%timeit normalize_optimized(X_test, norm_type="min_max")

9.28 µs ± 25.1 ns per loop (mean ± std. dev. of 7 runs, 100000 loops each)
1.5 µs ± 3.81 ns per loop (mean ± std. dev. of 7 runs, 1000000 loops each)


In [47]:

# Benchmark the original function
%timeit normalize(X_test, norm_type=None)


17.9 µs ± 115 ns per loop (mean ± std. dev. of 7 runs, 100000 loops each)


In [None]:

# Benchmark the original function
%timeit normalize(X_test, norm_type="log")

# Benchmark the optimized function
%timeit normalize_optimized(X_test, norm_type="log")

# Shift matrix circularly

In [48]:
def shift_matrix_circularly(X):
    """
    Shifts the matrix X circularly to get a time-lag matrix.

    Args:
    - X (np.array): Square matrix.

    Returns:
    - L (np.array): Time-lag matrix.
    """
    N = X.shape[0]
    L = np.zeros(X.shape)
    for i in range(N):
        L[i, :] = np.asarray([X[(i + j) % N, j] for j in range(N)])
    return L


@jit(nopython=True)
def shift_matrix_circularly_numba(X):
    """Numba-optimized calculation of circular shift to get a time-lag matrix."""
    N = X.shape[0]
    L = np.zeros(X.shape)
    for i in range(N):
        L[i, :] = np.asarray([X[(i + j) % N, j] for j in range(N)])
    return L


In [49]:
# Generate a random matrix to test and benchmark the functions
X_test = np.random.rand(100, 100)

# Test the original function
X_test_original = shift_matrix_circularly(X_test)

# Test the optimized function
X_test_optimized = shift_matrix_circularly_numba(X_test)

# Run the approximate assertions
assert np.allclose(X_test_original, X_test_optimized), "Original and optimized methods do not match!"
# Benchmarking the original function
%timeit shift_matrix_circularly(X_test)

# Benchmarking the optimized function
%timeit shift_matrix_circularly_numba(X_test)

1.77 ms ± 31.7 µs per loop (mean ± std. dev. of 7 runs, 1000 loops each)
163 µs ± 926 ns per loop (mean ± std. dev. of 7 runs, 10000 loops each)


# Librosa PCT

In [62]:
import librosa


In [60]:
def audio_extract_pcp(audio, sr, n_fft=4096, hop_len=int(4096 * 0.75),
                      pcp_bins=84, pcp_norm=np.inf, pcp_f_min=27.5,
                      pcp_n_octaves=6):
    """
    Extract Pitch Class Profiles (PCP) from audio.

    Args:
    - audio (np.array): Audio waveform.
    - sr (int): Sample rate of the audio.
    - n_fft (int, optional): FFT size. Default is 4096.
    - hop_len (int, optional): Hop length. Default is 75% of n_fft.
    - pcp_bins (int, optional): Number of bins for PCP. Default is 84.
    - pcp_norm (float, optional): Norm value for PCP. Default is infinity.
    - pcp_f_min (float, optional): Minimum frequency for PCP. Default is 27.5Hz.
    - pcp_n_octaves (int, optional): Number of octaves for PCP. Default is 6.

    Returns:
    - pcp (np.array): Extracted Pitch Class Profiles.
    """

    # Separate harmonic component from audio
    audio_harmonic, _ = librosa.effects.hpss(audio)

    # Compute Constant-Q transform of the harmonic component
    pcp_cqt = np.abs(librosa.hybrid_cqt(audio_harmonic, sr=sr, hop_length=hop_len,
                                        n_bins=pcp_bins, norm=pcp_norm, fmin=pcp_f_min)) ** 2

    # Compute PCP from the CQT
    pcp = librosa.feature.chroma_cqt(C=pcp_cqt, sr=sr, hop_length=hop_len,
                                     n_octaves=pcp_n_octaves, fmin=pcp_f_min).T

    return pcp

In [61]:
def optimized_audio_extract_pcp(audio, sr, n_fft=4096, hop_len=int(4096 * 0.75),
                                pcp_bins=84, pcp_norm=np.inf, pcp_f_min=27.5,
                                pcp_n_octaves=6):
    """
    Extract Pitch Class Profiles (PCP) from audio using optimized methods.

    Args:
    - audio (np.array): Audio waveform.
    - sr (int): Sample rate of the audio.
    - n_fft (int, optional): FFT size. Default is 4096.
    - hop_len (int, optional): Hop length. Default is 75% of n_fft.
    - pcp_bins (int, optional): Number of bins for PCP. Default is 84.
    - pcp_norm (float, optional): Norm value for PCP. Default is infinity.
    - pcp_f_min (float, optional): Minimum frequency for PCP. Default is 27.5Hz.
    - pcp_n_octaves (int, optional): Number of octaves for PCP. Default is 6.

    Returns:
    - pcp (np.array): Extracted Pitch Class Profiles.
    """

    # Separate harmonic component from audio
    audio_harmonic, _ = librosa.effects.hpss(audio)

    # Compute Constant-Q transform of the harmonic component
    pcp_cqt = np.abs(librosa.hybrid_cqt(audio_harmonic, sr=sr, hop_length=hop_len,
                                        n_bins=pcp_bins, norm=pcp_norm, fmin=pcp_f_min)) ** 2

    # Compute PCP from the CQT
    pcp = librosa.feature.chroma_cqt(C=pcp_cqt, sr=sr, hop_length=hop_len,
                                     n_octaves=pcp_n_octaves, fmin=pcp_f_min).T

    return pcp


In [64]:
audio_path = "/Users/nimamanaf/Library/CloudStorage/GoogleDrive-ndizbin14@ku.edu.tr/My Drive/Techno/technob/docs/examples/cse.WAV"
audio_data, sr = librosa.load(audio_path)
    
    

In [70]:
def optimized_audio_extract_pcp(audio, sr, n_fft=4096, hop_len=int(4096 * 0.75),
                                pcp_bins=84, pcp_norm=np.inf, pcp_f_min=27.5,
                                pcp_n_octaves=6):
    """
    Extract Pitch Class Profiles (PCP) from audio with optimizations.

    Args:
    - audio (np.array): Audio waveform.
    - sr (int): Sample rate of the audio.
    - n_fft (int, optional): FFT size. Default is 4096.
    - hop_len (int, optional): Hop length. Default is 75% of n_fft.
    - pcp_bins (int, optional): Number of bins for PCP. Default is 84.
    - pcp_norm (float, optional): Norm value for PCP. Default is infinity.
    - pcp_f_min (float, optional): Minimum frequency for PCP. Default is 27.5Hz.
    - pcp_n_octaves (int, optional): Number of octaves for PCP. Default is 6.

    Returns:
    - pcp (np.array): Extracted Pitch Class Profiles.
    """

    # Separate harmonic component from audio with reduced margin
    audio_harmonic, _ = librosa.effects.hpss(audio, margin=(1, 4))

    # Compute Constant-Q transform of the harmonic component with increased hop length
    pcp_cqt = np.abs(librosa.cqt(audio_harmonic, sr=sr, hop_length=hop_len * 2,
                                 n_bins=pcp_bins, norm=pcp_norm, fmin=pcp_f_min)) ** 2

    # Compute PCP from the CQT
    pcp = librosa.feature.chroma_cqt(C=pcp_cqt, sr=sr, hop_length=hop_len * 2,
                                     n_octaves=pcp_n_octaves, fmin=pcp_f_min).T

    return pcp

In [71]:
repetitions = 1

# Benchmark the original method
time_original = timeit.timeit(lambda: audio_extract_pcp(audio_data, sr), number=repetitions)
# Benchmark the optimized method
time_optimized = timeit.timeit(lambda: optimized_audio_extract_pcp(audio_data, sr), number=repetitions)

print("Original method: {:.2f} seconds".format(time_original))
print("Optimized method: {:.2f} seconds".format(time_optimized))

Original method: 18.75 seconds
Optimized method: 18.15 seconds
