# Data Cleaning
Cette étape est cruciale pour garantir la qualité et la fiabilité des données avant de les utiliser pour l'analyse et la modélisation.

In [None]:
import IPython.display as ipd
from scipy.signal import hilbert
import numpy as np
from scipy.stats import skew
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
import scipy

# change style
plt.style.use('ggplot')
import librosa
import librosa.display
from tqdm import tqdm
from tools import play_audio, load_audio_file, pad_signal

tqdm.pandas()
import os
from glob import glob
import random

from concurrent.futures import ThreadPoolExecutor, as_completed

In [None]:
from params import SOUNDS_DATASET_PATH, SAMPLE_RATE, CLASS_COLORS
from tools import play_audio, load_audio_file, pad_signal

## Load Dataset

In [None]:
dataset_csv_path = os.path.join(SOUNDS_DATASET_PATH, 'dataset_handcrafted_features_extracted.csv')
df_drums = pd.read_csv(dataset_csv_path)

df_drums = df_drums.head(200)
# drop 'Unnamed: 0' column
df_drums.drop(columns=['Unnamed: 0'], inplace=True)
# set index to file_path
df_drums.set_index('file_path', inplace=True)
print("Dataset shape: ", df_drums.shape)
df_drums

In [None]:
df_drums.info(verbose=2)

In [None]:
df_drums.describe().T

### Keep only float columns

In [None]:
# get columns names with float type use .info()
columns_float = [k for k, v in df_drums.dtypes.to_dict().items() if v == 'float64' or v == 'int64']
columns_float

## Delete Full of NaN rows

In [None]:
# get rows with all NaN values
df_drums[df_drums.isna().all(axis=1)]

In [None]:
# delete rows with all NaN values (focus on features)
df_drums.dropna(how='all', subset=columns_float, inplace=True)
df_drums

## Duplicates
Suppression des duplicats

### Duplicates rows

In [None]:
print(f"Nombre de lignes totalement dupliquées : {df_drums.duplicated().sum()}")

In [None]:
duplicated_focus_on_features = df_drums.duplicated(subset=columns_float)
print(
    f"Nombre de lignes dupliquées (focus on features) : {duplicated_focus_on_features.sum()} lignes (qu'on peut potentiellement supprimer)")

# print per class
df_drums[duplicated_focus_on_features].groupby("class").count()["file_name"].sort_values(ascending=False)

In [None]:
duplicated_focus_on_features = df_drums.duplicated(subset=columns_float, keep=False)
# Afficher les lignes dupliquées (toutes les copies)
duplicates_df = df_drums[duplicated_focus_on_features].sort_values(by=columns_float)
duplicates_df

In [None]:

# Regroupez les lignes en double en fonction de leurs valeurs de features
grouped_duplicates = duplicates_df.groupby(columns_float)

# Créez une liste contenant des listes de file_paths pour chaque groupe de duplicatas
duplicate_groups = []
for _, group in grouped_duplicates:
    duplicate_groups.append(list(group.index))

# Affichez les groupes de duplicatas
for i, group in enumerate(duplicate_groups):
    print(f"# Duplicate Group {i + 1}:")
    for file_path in group:
        print(f"  - {file_path}")
    print()

maintenant qu'on a les lignes dupliquées, on va les supprimer

#### Delete duplicates in dataset

In [None]:
duplicates_file_to_delete = []
file_to_delete_num_group_map = {}

# Parcourez chaque groupe de doublons
for num_group, group in enumerate(duplicate_groups, start=1):
    # Triez les fichiers audio du groupe par la taille de leur nom de fichier
    file_name_len = lambda file_path: len(os.path.basename(file_path))
    sorted_group = sorted(group, key=file_name_len)

    # Gardez le fichier audio avec le plus petit nom de fichier (le premier de la liste triée)
    to_keep = sorted_group[0]

    # Ajoutez les autres fichiers audio du groupe à la liste des fichiers à supprimer
    duplicates_file_to_delete.extend(sorted_group[1:])

    for file_path in sorted_group[1:]:
        file_to_delete_num_group_map[file_path] = num_group

print(len(set(duplicates_file_to_delete)))


In [None]:
# Copiez votre DataFrame pour ne pas modifier l'original
cleaned_df_drums = df_drums.copy()
# Supprimez les autres fichiers audio du groupe de doublons du DataFrame
cleaned_df_drums = cleaned_df_drums.drop(file_path for file_path in duplicates_file_to_delete)

# cleaned_df contient maintenant les données sans les doublons indésirables
print(
    f"Nombre de lignes dupliquées (focus on features) : {cleaned_df_drums.duplicated(subset=columns_float).sum()} lignes (qu'on peut potentiellement supprimer)")
cleaned_df_drums

sauvegarder dans un fichier csv les données dupliquées (pour les supprimer manuellement)

In [None]:
# save duplicates in txt file
with open(os.path.join(SOUNDS_DATASET_PATH, "__duplicates.txt"), "w") as f:
    for file_path in duplicates_file_to_delete:
        f.write(f"{file_path} {file_to_delete_num_group_map[file_path]}" + os.linesep)

### Duplicates file_name

In [None]:
print(
    f"Nombre de lignes dupliquées sur la colonne 'file_name' : {cleaned_df_drums.duplicated(subset=['file_name', 'file_extension']).sum()} lignes (qu'on peut potentiellement supprimer)")

cleaned_df_drums[cleaned_df_drums.duplicated(subset=['file_name', 'file_extension'], keep=False)].sort_values(
    by=['file_name', 'file_extension'])

### Duplicates (too similar) audio content

Pour détecter les fichiers audio dupliqués,  on va comparer les caractéristiques audio de chaque fichier audio. Si les caractéristiques audio sont identiques, alors les fichiers audio sont dupliqués.

In [None]:
# euclidean distance with np.linalg.norm
def euclidean_distance(vector1: np.ndarray, vector2: np.ndarray):
    """
    Compute euclidean distance between two vectors
    """
    return np.linalg.norm(vector1 - vector2)


# cosine similarity with np.dot
def cosine_similarity(vector1: np.ndarray, vector2: np.ndarray):
    """
    Compute cosine similarity between two vectors
    """
    return np.dot(vector1, vector2) / (np.linalg.norm(vector1) * np.linalg.norm(vector2))


def similarity(vector1: np.ndarray, vector2: np.ndarray, metric: str = "euclidean"):
    """
    Compute similarity between two vectors using the specified metric. Use numpy functions.
    """
    if metric == "cosine":
        sim = cosine_similarity(vector1, vector2)
        return sim
        # Normalize cosine similarity to [0, 1]
        #return (sim + 1) / 2
    elif metric == "euclidean":
        dist = euclidean_distance(vector1, vector2)
        # Normalize euclidean distance to [0, 1] by dividing by the maximum possible distance
        max_dist = np.sqrt(len(vector1))
        return 1 - (dist / max_dist)
    else:
        raise ValueError(f"Unknown metric: {metric}")


def compute_similarities(df_X: pd.DataFrame, metric: str = "euclidean"):
    """
    Compute similarities between audio files
    """
    similarities = {}

    for file_i, row_i in tqdm(df_X.iterrows(), total=len(df_X)):
        for file_j, row_j in df_X.iterrows():
            if file_i == file_j:
                continue
            if (file_i, file_j) in similarities or (file_j, file_i) in similarities:
                continue
            vectori = row_i.to_numpy()
            vectorj = row_j.to_numpy()
            similarities[(file_i, file_j)] = similarity(vectori, vectorj, metric=metric)

    return similarities


def compute_df_similarities(df_X: pd.DataFrame, metric: str = "euclidean"):
    """
    Compute similarities between audio files
    """
    similarities = compute_similarities(df_X, metric=metric)

    # crate a dataframe with similarities dict
    df_similarities = pd.DataFrame.from_dict(similarities, orient='index', columns=['similarity']).sort_values(
        by=['similarity'], ascending=False)
    # from index (tuple), create 2 column file_i and file_j
    df_similarities.reset_index(inplace=True)
    df_similarities["file_i"] = df_similarities["index"].apply(lambda x: x[0])
    df_similarities["file_j"] = df_similarities["index"].apply(lambda x: x[1])
    df_similarities.drop(columns=["index"], inplace=True)

    return df_similarities


df_similarities = compute_df_similarities(cleaned_df_drums[columns_float], metric="cosine")
df_similarities

#### Similary > 0.999

In [None]:
df_similarities_0_999 = df_similarities.query("similarity > 0.999")
df_similarities_0_999

In [None]:
# play some audio files with similarity > 0.999
for i, row in df_similarities_0_999.sample(10).iterrows():
    print(f"#### similarity: {row['similarity']}")
    play_audio(row["file_i"])
    play_audio(row["file_j"])

#### Delete similar files

In [None]:
# pandas display full table
#pd.set_option('display.max_rows', None)
table_loser_0_999 = pd.Series(
    df_similarities_0_999["file_i"].to_list() + df_similarities_0_999["file_j"].to_list()).value_counts()
table_loser_0_999

In [None]:
play_audio("G:\Shared drives\PFE - ING3 Mlamali\DrumClassifier - Sounds Dataset\Conga\Conga (119).wav")
play_audio("G:\Shared drives\PFE - ING3 Mlamali\DrumClassifier - Sounds Dataset\Conga\Conga (118).wav")

In [None]:
similar_file_to_delete = []  # list of file to delete
for i, row in df_similarities_0_999.iterrows():
    if row["file_i"] in similar_file_to_delete or row["file_j"] in similar_file_to_delete:
        continue
    if table_loser_0_999[row["file_j"]] > table_loser_0_999[row["file_i"]]:
        #print(f"{os.path.basename(row['file_j'])} lose vs. {os.path.basename(row['file_i'])} (because {table_loser_0_999[row['file_j']]} > {table_loser_0_999[row['file_i']]})")
        similar_file_to_delete.append(row["file_j"])
    else:
        #print(f"{os.path.basename(row['file_i'])} lose vs. {os.path.basename(row['file_j'])} (because {table_loser_0_999[row['file_i']]} > {table_loser_0_999[row['file_j']]})")
        similar_file_to_delete.append(row["file_i"])

len(similar_file_to_delete)

In [None]:
# delete similar files from cleaned_df_drums
cleaned_df_drums = cleaned_df_drums.drop(similar_file_to_delete)
cleaned_df_drums

on affiche maintenant les fichiers qui ont été supprimés, pour aller les supprimer manuellement

In [None]:
similar_file_to_delete

## Missing values Processing
Traitement des valeurs manquantes

In [None]:
cleaned_df_drums.isnull().any()

In [None]:
cleaned_df_drums.isnull().sum()

In [None]:
cleaned_df_drums.describe().T

In [None]:
cleaned_df_drums.loc

## Outliers

Un outlier (ou valeur aberrante) est une observation qui se situe à une distance anormalement grande des autres observations dans un ensemble de données. Les outliers peuvent être causés par des erreurs de mesure, des erreurs d'enregistrement, ou par des variations naturelles dans les données. Ils peuvent avoir un impact significatif sur l'analyse et la modélisation des données, en introduisant des biais et en réduisant la performance des modèles prédictifs.

Dans le contexte de notre projet, les outliers peuvent correspondre à des sons de batterie ayant des caractéristiques très différentes des autres sons, qui pourraient rendre difficile la classification ou l'analyse ultérieure.

In [None]:
cleaned_df_drums.describe().T

In [None]:
cleaned_df_drums["duration"].boxplot()

In [None]:
from collections import Counter


def get_outliers_iqr_per_class(df, column, class_column, multiplier=1.5):
    outliers_indices = []

    # Divisez le dataframe en sous-groupes en fonction des classes.
    for class_value in df[class_column].unique():
        class_df = df[df[class_column] == class_value]

        # Appliquez la méthode IQR pour chaque sous-groupe.
        Q1 = class_df[column].quantile(0.25)
        Q3 = class_df[column].quantile(0.75)
        IQR = Q3 - Q1
        lower_bound = Q1 - multiplier * IQR
        upper_bound = Q3 + multiplier * IQR
        outliers_class_df = class_df[(class_df[column] < lower_bound) | (class_df[column] > upper_bound)]

        outliers_indices.extend(outliers_class_df.index.tolist())

    return outliers_indices


# Remplacez 'class_column' par le nom de la colonne contenant les classes dans votre dataframe.
class_column = 'class'

# Parcourez toutes les colonnes de cleaned_df_drums pour lesquelles vous souhaitez détecter les outliers.
outliers_counter = Counter()
for col in tqdm(columns_float):
    outliers_indices = get_outliers_iqr_per_class(cleaned_df_drums, col, class_column)
    outliers_counter.update(outliers_indices)

limit_outliers_count = 15
# Trouvez les index des lignes qui ont été détectées au moins 3 fois comme outliers.
outliers_to_remove = [index for index, count in outliers_counter.items() if count >= limit_outliers_count]
print(f"Nombre d'outliers à supprimer: {len(outliers_to_remove)}")

for file_outlier in outliers_to_remove[:10]:
    play_audio(file_outlier)

In [None]:
# Supprimez les outliers du dataframe.
cleaned_df_drums = cleaned_df_drums.drop(outliers_to_remove)
print(f"Nombre de lignes restantes: {len(cleaned_df_drums)}")
cleaned_df_drums

## Save cleaned dataframe

In [None]:
cleaned_df_drums.to_csv(os.path.join(SOUNDS_DATASET_PATH, "dataset_cleaned_and_features.csv"), index=False)