In [1]:
import os
import shutil
import math
import librosa
import matplotlib.pyplot as plt
import numpy as np
import scipy as sp
import soundfile as sf
from sklearn.cluster import AgglomerativeClustering
from sklearn import metrics

# List of the music files used for the mixing
musicFiles = ["musics/G-Sus - Eruption (Extended Mix).wav",
              "musics/Ragunde & TBR - Illusion.wav",
              "musics/Olly James x KEVU & Luca Testa - Not Around (Extended Mix).wav",
              "musics/Abduction (Extended Mix).wav",
              "musics/Ragunde & Nic Johnston - Force40 (Extended Mix).wav"]

# List of the parts to extract from each music
toExtract = [["Intro"],
             ["Pré-drop"],
             ["Drop"],
             ["Couplet","Pré-drop"],
             ["Drop","Outro"]]

# Load the music files & save the data
datas,samplerates = [],[]
for musicFile in musicFiles:
    y,sr = librosa.load(musicFile)
    datas.append(y)
    samplerates.append(sr)

# Create all melspectrograms
melspectrograms = []
for data,samplerate in zip(datas,samplerates):
    melspectrogram = librosa.feature.melspectrogram(y=data, sr=samplerate, power=1.0)
    melspectrograms.append(melspectrogram)

# Store the BPM, beats frames and beats timings of every music
bpms,beatsFrames,beatsTimings = [],[],[]
for data,samplerate in zip(datas,samplerates):
    tempo,beats = librosa.beat.beat_track(y=data, sr=samplerate)
    beatsTimings.append(librosa.frames_to_time(beats, sr=samplerate))
    bpms.append(tempo)
    beatsFrames.append(beats)

In [2]:
# Functions used all over the code

# Permet de faire la moyenne sur l'ensemble des données situées entre deux intervalles donnés
def beatIntervals(S,tab):
    # Initialisation à vide
    ret = []

    # Itération sur les indices des beats
    for i in range(len(tab)-1):

        # Indices de début et de fin
        firstIndex = tab[i]
        lastIndex = tab[i+1]

        # Extraction des valeurs
        indices = np.array(range(firstIndex,lastIndex))
        test = S[:,indices]

        # Calcul de la moyenne
        ret.append(np.mean(test,axis=1))
    
    # Conversion en NumPy-array et transposition
    ret = np.array(ret)
    ret = np.swapaxes(ret,1,0)    
    return ret

# Permet de calculer la matrice des distances entre les intervalles
def distanceMatrix(intervals):
    # Initialisation de la matrice carrée à 0
    mat = np.zeros((intervals.shape[1],intervals.shape[1]))
    for i in range(intervals.shape[1]):
        for j in range(i+1,intervals.shape[1]):

            # Calcul de la distance entre les intervalles i et j et remplissage de la matrice
            mat[i][j] = mat[j][i] = (sp.spatial.distance.euclidean(intervals[:,i],intervals[:,j]))
    return mat

# Permet de construire la matrice de connectivité pour les beats
def buildBeatsConnectivityMatrix(matrix):
    ret = np.zeros((matrix.shape[1],matrix.shape[1]))
    for i in range(matrix.shape[1]):
        if i-1 >= 0:
            ret[i][i-1] = 1
        if i+1 < matrix.shape[1]:
            ret[i][i+1] = 1
    return ret

# Permet de générer les scores de Silhouette Coefficient, Calinsky-Harabasz Index et Davies-Bouldin Index
def generateAllScores(matrix,range):
    silhouette_scores = np.empty((0))
    calinsky_harabasz_scores = np.empty((0))
    davies_bouldin_scores = np.empty((0))

    for i in range:
        clustering = AgglomerativeClustering(n_clusters=i,connectivity=buildBeatsConnectivityMatrix(matrix),metric='euclidean',linkage='ward',compute_full_tree=True)
        model = clustering.fit_predict(matrix)
        labels = clustering.labels_

        silhouette_score = metrics.silhouette_score(matrix, labels, metric='euclidean')
        calinski_harabasz_score = metrics.calinski_harabasz_score(matrix, labels)
        davies_bouldin_score = metrics.davies_bouldin_score(matrix, labels)

        silhouette_scores = np.append(silhouette_scores,silhouette_score)
        calinsky_harabasz_scores = np.append(calinsky_harabasz_scores,calinski_harabasz_score)
        davies_bouldin_scores = np.append(davies_bouldin_scores,davies_bouldin_score)

    return silhouette_scores,calinsky_harabasz_scores,davies_bouldin_scores

# Permet d'utiliser les scores générés précédemment pour établir un classement et choisir le meilleur nombre de clusters
def rankingBestNumberClusters(x,silhouette_scores,calinsky_harabasz_scores,davies_bouldin_scores):
    sil_Ranks = np.zeros(x.shape[0])
    cal_ha_Ranks = np.zeros(x.shape[0])
    dav_bou_Ranks = np.zeros(x.shape[0])

    alt_sil = np.sort(silhouette_scores)[::-1]
    alt_cal = np.sort(calinsky_harabasz_scores)[::-1]
    alt_dav = np.sort(davies_bouldin_scores)

    while alt_sil.shape[0] != 0:
        silMax = np.max(alt_sil)
        indice = np.where(silhouette_scores == silMax)[0][0]
        sil_Ranks[indice] = math.floor(x.shape[0] - alt_sil.shape[0] + 1)
        alt_sil = np.delete(alt_sil,0)

    while alt_cal.shape[0] != 0:
        calMax = np.max(alt_cal)
        indice = np.where(calinsky_harabasz_scores == calMax)[0][0]
        cal_ha_Ranks[indice] = math.floor(x.shape[0] - alt_cal.shape[0] + 1)
        alt_cal = np.delete(alt_cal,0)

    while alt_dav.shape[0] != 0:
        davMin = np.min(alt_dav)
        indice = np.where(davies_bouldin_scores == davMin)[0][0]
        dav_bou_Ranks[indice] = math.floor(x.shape[0] - alt_dav.shape[0] + 1)
        alt_dav = np.delete(alt_dav,0)

    total = np.zeros(x.shape[0])
    for i in range(x.shape[0]):
        total[i] = sil_Ranks[i] + cal_ha_Ranks[i] + dav_bou_Ranks[i]

    total_Ranks = np.zeros(x.shape[0])
    copy = total.copy()

    while copy.shape[0] != 0:
        totalMin = np.min(copy)
        indice = np.where(total == totalMin)[0]
        for elt in indice:
            total_Ranks[elt] = math.floor(x.shape[0] - copy.shape[0] + 1)
        for elt in indice:
            copy = copy[copy != totalMin]

    final = np.empty((0))

    for rank in range(1,x.shape[0]+1):
        for k in range(x.shape[0]-1,-1,-1):
            if total_Ranks[k] == rank:
                final = np.append(final,x[k])

    return final[0]

# Permet de déterminer les clusters à partir de la matrice de distances et du nombre de clusters idéal
def determineClusters(matrix,idealClusterNb):
    clustering = AgglomerativeClustering(n_clusters=int(idealClusterNb),connectivity=buildBeatsConnectivityMatrix(matrix),metric='euclidean',linkage='ward',compute_full_tree=True)
    model = clustering.fit_predict(matrix)

    labels = clustering.labels_

    # Minimum size of clusters
    min_cluster_size = 16

    # Calculate the size of each cluster
    cluster_sizes = np.bincount(model)

    # Order all the clusters by their minimal index
    clusters = np.argsort([np.min(np.where(model == i)[0]) for i in range(clustering.n_clusters_)])

    # Merge the two first clusters while they are smaller than the minimum size
    while cluster_sizes[clusters[0]] < min_cluster_size:
        model[model == clusters[0]] = clusters[1]
        cluster_sizes[clusters[1]] += cluster_sizes[clusters[0]]
        cluster_sizes[clusters[0]] = 0
        clusters = np.delete(clusters,0)

    # Find the clusters that are smaller than the minimum size
    small_clusters = np.where(cluster_sizes < min_cluster_size)[0]

    # Erase the labels of the small clusters which don't contain any beat
    for small_cluster in small_clusters:
        if cluster_sizes[small_cluster] == 0:
            small_clusters = np.delete(small_clusters,np.where(small_clusters == small_cluster))

    # Get the minimum index of each small cluster
    min_indices = [np.min(np.where(labels == i)[0]) for i in small_clusters]

    # Sort the small clusters in ascending order of their minimum index
    small_clusters = small_clusters[np.argsort(min_indices)]

    # For each small cluster
    for small_cluster in small_clusters:
        # Find the indices of the small cluster
        indices_small_cluster = np.where(model == small_cluster)[0]

        # Find the nearest larger cluster in terms of index proximity
        min_distance = np.inf
        nearest_larger_cluster = None
        for larger_cluster in np.where(cluster_sizes >= min_cluster_size)[0]:
            # Find the indices of the larger cluster
            indices_larger_cluster = np.where(model == larger_cluster)[0]

            if indices_larger_cluster[-1] < indices_small_cluster[0]:
                # Calculate the minimum absolute difference between the indices
                distance = np.min(np.abs(indices_small_cluster[:, None] - indices_larger_cluster))

                if distance < min_distance:
                    min_distance = distance
                    nearest_larger_cluster = larger_cluster
        # Merge the small cluster into the nearest larger cluster
        model[model == small_cluster] = nearest_larger_cluster

    cluster_sizes = np.bincount(model)

    clusters = []

    for i in range(clustering.n_clusters_+1):
        clusters.append(np.where(model == i))

    clusters = [x for x in clusters if len(x[0]) > 0]

    clusters.sort(reverse=False,key=lambda x : x[0][0])

    limits = np.zeros(len(clusters)*2)

    for i in range(len(clusters)):
        limits[2*i] = math.floor(clusters[i][0][0])
        limits[2*i+1] = math.floor(clusters[i][0][len(clusters[i][0])-1])

    limits = limits.reshape(-1,2)
    limits = np.int_(limits)

    return limits

# Permet de générer toutes les sous-parties d'une musique en fonction des clusters déterminés précédemment
def generatePartsWithClusteringBeats(tab,beatsTimings,nb):
    array = []

    donnees,samplerate = sf.read(musicFiles[nb])
    print("Durée de la musique :",donnees.shape[0]//(60*samplerate),"minutes",(donnees.shape[0]%(samplerate*60))//(samplerate),"secondes")

    for i in range(len(tab)):
        beginBeat = tab[i][0]
        endBeat = tab[i][1]

        beginTime = beatsTimings[beginBeat]
        endTime = beatsTimings[endBeat]

        if i == 0:
            beginData = 0
        else:
            beginData = math.floor(beginTime*samplerate)+1

        if i == len(tab)-1:
            endData = len(donnees)-1
        else:
            endTime = beatsTimings[endBeat+1]
            endData = math.floor(endTime*samplerate)

        print(beginData,endData)

        data = donnees[beginData:endData]
        array.append(data)
        filename = "clusters/song"+str(nb+1)
        if not os.path.exists(filename):
            os.makedirs(filename)
        filename = filename+"/part"+str(i+1)+".wav"
        sf.write(filename,data,samplerate,'PCM_16')

    return array
        

In [3]:
# Création des intervalles pour chaque musique
intervals = []
for melspectrogram,beatsFrame in zip(melspectrograms,beatsFrames):
    intervals.append(beatIntervals(melspectrogram,beatsFrame))

# Création des matrices de distances pour chaque musique
distanceMatrixes = []
for interval in intervals:
    distanceMatrixes.append(distanceMatrix(interval))

# Génération des scores pour chaque musique
scores = []
for distanceMatrix in distanceMatrixes:
    scores.append(generateAllScores(distanceMatrix,np.arange(12,26)))

# Établissement des nombres de clusters idéaux pour chaque musique
idealClusterNbs = []
for score in scores:
    idealClusterNbs.append(rankingBestNumberClusters(np.arange(12,26),score[0],score[1],score[2]))

# Création des clusters pour chaque musique
clusters = []
clusteredMusics = []

for distanceMatrix,idealClusterNb in zip(distanceMatrixes,idealClusterNbs):
    clusters.append(determineClusters(distanceMatrix,idealClusterNb))

if not os.path.exists("clusters"):
    os.makedirs("clusters")

for nb in range(len(musicFiles)):
    clusteredMusic = generatePartsWithClusteringBeats(clusters[nb],beatsTimings[nb],nb)
    clusteredMusics.append(clusteredMusic)

Durée de la musique : 3 minutes 57 secondes
0 704400
704401 1315178
1315179 2433079
2433080 4544052
4544053 5158173
5158174 6289449
6289450 7052921
7052922 7687105
7687106 8384818
8384819 9082531
9082532 9780244
9780245 10392137
10392138 11407499
Durée de la musique : 3 minutes 42 secondes
0 774617
774618 1477903
1477904 2192335
2192336 2901194
2901195 3520888
3520889 4253152
4253153 4850555
4850556 5581705
5581706 6269387
6269388 7664814
7664815 8379245
8379246 9103708
9103709 9791390
9791391 10659631
Durée de la musique : 3 minutes 2 secondes
0 620808
620809 1159140
1159141 2464287
2464288 3079523
3079524 3616740
3616741 4461574
4461575 5017739
5017740 6305053
6305054 6919174
6919175 7533296
7533297 8071627
8071628 8749836
Durée de la musique : 3 minutes 50 secondes
0 669849
669850 1253877
1253878 2005089
2005090 2672709
2672710 3665780
3665781 6011924
6011925 7473110
7473111 8015899
8015900 9268662
9268663 11085216
Durée de la musique : 3 minutes 39 secondes
0 663161
663162 1241617


In [4]:
# Permet de déterminer un étiquettage pour chaque sous-partie de musique
def determineTags(tab,spec,beat):
    arr = []
    
    for i in range(len(tab)):
        beginBeat = tab[i][0]
        endBeat = tab[i][1]

        beginPart = beat[beginBeat]
        endPart = beat[endBeat+1]

        if i == 0:
            beginPart = 0

        if i == len(tab)-1:
            endPart = spec.shape[1]-1

        part = spec[:,beginPart:endPart]

        arr.append(np.median(part,axis=1))

    arr = np.array(arr)

    arr2 = []

    for elt in range(arr.shape[0]):
        arr2.append(np.sum(arr[elt]))

    arr2 = np.array(arr2)

    tags = []
    for i in range(len(tab)):
        tags.append("")

    indMax = np.argmax(arr2)
    tags[indMax] = "Drop"

    tags[0] = "Intro"
    tags[len(tab)-1] = "Outro"

    for i in range(len(tab)):
        if arr2[i] > 0.9*arr2[indMax] and i>1:
            tags[i] = "Drop"
        elif tags[i-1] == "Intro" and tags[i] == "" and not i>1:
            tags[i] = "Intro"

    for i in range(len(tab)-1):
        if tags[i-1] == "Drop" and tags[i+1] == 'Outro':
            tags[i] = "Outro"

    for i in range(len(tab)):
        if tags[i] == "" and tags[i+1] == "Drop":
            tags[i] = "Pré-drop"
        elif tags[i] == "":
            tags[i] = "Couplet"

    for i in range(len(tab)):
        if tags[i-2] == "Drop" and tags[i-1] == "Couplet" and tags[i] == "Couplet" and 2/3*arr2[i-2] < arr2[i-1]:
            tags[i-1] = "Drop"

    return tags

In [5]:
# Détermination des étiquettes pour chaque sous-partie de musique
musicTags = []

for nb in range(len(musicFiles)):
    S = melspectrograms[nb]
    limits = clusters[nb]
    beats = beatsFrames[nb]

    musicTags.append(determineTags(limits,S,beats))

for elt in musicTags:
    print(elt)

['Intro', 'Intro', 'Couplet', 'Pré-drop', 'Drop', 'Couplet', 'Couplet', 'Couplet', 'Pré-drop', 'Drop', 'Drop', 'Outro', 'Outro']
['Intro', 'Intro', 'Couplet', 'Pré-drop', 'Drop', 'Couplet', 'Couplet', 'Couplet', 'Couplet', 'Pré-drop', 'Drop', 'Drop', 'Outro', 'Outro']
['Intro', 'Intro', 'Couplet', 'Pré-drop', 'Drop', 'Couplet', 'Couplet', 'Pré-drop', 'Drop', 'Drop', 'Outro', 'Outro']
['Intro', 'Intro', 'Pré-drop', 'Drop', 'Drop', 'Couplet', 'Couplet', 'Pré-drop', 'Drop', 'Outro']
['Intro', 'Intro', 'Pré-drop', 'Drop', 'Couplet', 'Pré-drop', 'Drop', 'Drop', 'Outro', 'Outro']


In [6]:
# Permet de déterminer la plus longue chaîne de nombres consécutifs dans une liste
def longestChain(tab):
    # Sort each subarray
    for sub in tab:
        sub.sort()

    # Create a dictionary to store the longest chain ending at each number
    dp = {}

    for i, sub in enumerate(tab):
        for num in sub:
            # If it's in the first subarray, set the longest chain ending at this number to [num]
            # If it's not in the first subarray, set the longest chain ending at this number to the longest chain of the previous number plus [num], if the previous number exists in the dictionary and the new chain is longer
            if num - 1 in dp and (num not in dp or len(dp[num - 1]) + 1 > len(dp.get(num, []))):
                dp[num] = dp[num - 1] + [num]
            elif num not in dp:
                dp[num] = [num]

    # Find the longest chain
    longest_chain = max(dp.values(), key=len)

    # If the longest chain is of size 1, return the maximum number from the original list enclosed in an array
    if len(longest_chain) == 1:
        return [max(sum(tab, []))]  # sum(tab, []) flattens the list

    # Return the longest chain
    return longest_chain

# Permet de générer le mixage final
chosenParts = []

for i in range(len(toExtract)):
    elements = toExtract[i]

    tags = musicTags[i]

    indices = []
    for elt in elements:
        sub = []
        for j in range(len(tags)):
            if tags[j] == elt:
                sub.append(j+1)
        indices.append(sub)

    chosenParts.append(longestChain(indices))

musicParts = []
for i in range(len(musicFiles)):
    
    test = []
    chosen = chosenParts[i]
    for elt in chosen:
        filename = "clusters/song"+str(i+1)+"/part"+str(elt)+".wav"
        data,samplerate = sf.read(filename)
        musicParts.append(data)

sf.write("result/final_mix.wav",np.concatenate(musicParts),samplerate,'PCM_16')

if os.path.exists("clusters"):
    shutil.rmtree("clusters")

print("Mixage terminé !")


Mixage terminé !
