# Lecture de dataset

In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf, col
import os,sys
import io
import librosa
from pyspark.sql.types import BinaryType, StructType, StructField, IntegerType, FloatType, StringType, BinaryType, FloatType
import numpy as np
import soundfile as sf
import glob

In [2]:
os.environ["PYSPARK_PYTHON"] = sys.executable
os.environ["PYSPARK_DRIVER_PYTHON"] = sys.executable

In [3]:
spark = (SparkSession.builder
         .master("local[*]")
         .config("spark.pyspark.python", sys.executable)
         .config("spark.pyspark.driver.python", sys.executable)
         .config("spark.python.worker.reuse", "true")
         .config("spark.python.worker.timeout", "300")
         .getOrCreate())

spark.sparkContext.setLogLevel("ERROR")



# Le shéma des métadonnées audio qu'on va récupérer 

# Version 01 : Traitement depuis le contenu binaire(Spark UDF) : Très lent voir inexécutable (sur ma machine)

In [12]:
audio_metadata_schema = StructType([
    StructField("audio_data", BinaryType(), True), #on parle sur les valeurs y de nos signaux 
    StructField("sample_rate", IntegerType(), True), #les fréquences d'échantillonages
    StructField("duration", FloatType(), True),  #les valeurs x les durées 
    StructField("num_samples", IntegerType(), True)  
])

In [None]:
def preprocess_audio(file_content):
    try:
        #On charge l'audio
        audio_bytes = io.BytesIO(file_content) #permet de transformer des bytes en faux fichiers en mémoire (objets mémoire virtuel RAM)
        y, sr = librosa.load(audio_bytes, sr=None, mono=False) #on récupère les valeurs y, on change pas la fr d'échantillonage et on force pas la transformation vers mono

        #Convertir en mono si stréo
        if y.ndim > 1:
            y = librosa.to_mono(y)

        #réchantillionage à 22050 hz => fr stadard
        target_sr = 22050
        if sr != target_sr:
            y = librosa.resample(y, orig_sr=sr, target_sr=target_sr)
            sr = target_sr

        # normalisation des apmlitudes y 
        y = librosa.util.normalize(y)

        # supression des silences de début et de fin (20 db plus faible que le son originale)
        y_trimmed, _ = librosa.effects.trim(y, top_db=20)


        # filtarge du bruit seuil = 0.01 => une fois après la normalisation,
        # les valeurs de y sont entre 0 et 1 dpnc max = 1 et 0.01 reprèsente 1% de max => trop faible pour etre un vrai son
        threshold = 0.01
        y_trimmed[np.abs(y_trimmed) < threshold] = 0

        # sauvegarder en bytes 
        # on transfrome y_trimmed qui est np.array en un wav encodé en RAM avec un header et des les données audios : en les codes en bytes 
        buffer = io.BytesIO()
        sf.write(buffer, y_trimmed, sr, format='WAV') #recréer un wav propres ave le signal audio : numpy array : y_trimmed et la fréquence sr dans le buffer mémoire
        audio_bytes_processed = buffer.getvalue() #récupère les bytes finaux 

        # Métadonnées
        duration = len(y_trimmed) / sr #le nbr d'échantillons total / le nbr d'échantillions par seconde = durée 
        num_samples = len(y_trimmed) #le nbr total des échantillions du signal après traitement (nbr d'element du tableau y-trimmed)
       

        return (audio_bytes_processed, sr, duration, num_samples)
    
    except Exception as e:
        print(f"Erreur de prétraitement: {str(e)}")
        return (None, None, None, None)

# Créeration de l'UDF
preprocess_udf = udf(preprocess_audio, audio_metadata_schema)

# Charger les fichiers WAV
df_audio = spark.read.format("binaryFile").load("C:/spark_project/Dataset_Sorted_by_class/air_conditioner/*.wav")
print("df_audio récupéré")

# Appliquer le prétraitement
df_preprocessed = df_audio.withColumn("preprocessed", preprocess_udf(col("content"))) \
    .select(
        col("path"),
        col("preprocessed.audio_data").alias("audio_preprocessed"),
        col("preprocessed.sample_rate").alias("sample_rate"),
        col("preprocessed.duration").alias("duration"),
        col("preprocessed.num_samples").alias("num_samples")
    
    .filter(col("audio_preprocessed").isNotNull()) # filtrage des fichiers qui ont échoué
    .cache()
    )


df_preprocessed.count()
df_preprocessed.show(1, False)

'''
# Sauvegarder les résultats
out_path = r"C:\spark_project\Dataset_Sorted_by_class\processed_audio"
df_preprocessed.write.mode("overwrite").parquet(out_path)
print("df_preprocessed")

print(f"Nombre de fichiers prétraités: {df_preprocessed.count()}")
df_preprocessed.show(5, truncate=False)
        
'''

# Version 02: Traitement direct depuis le chemin du fichier (plus rapide)

In [4]:
files = glob.glob(r"C:\spark_project\Dataset_Sorted_by_class\air_conditioner\*.wav")
rdd = spark.sparkContext.parallelize(files, numSlices=4) #On donne à spark une liste d'objets (chemains vers les fichiers) et il applique la fct python de praitrraitment à chacun en parallèle


In [5]:
def preprocess_audio(file_path):
    try:
        #Charger l'audio depuis le chemin (pas de BytesIO)
        y, sr = sf.read(file_path, dtype="float32", always_2d=False)

        #Convertir en mono si stréo
        if y.ndim > 1:
            y = librosa.to_mono(y)

        #réchantillionage à 22050 hz => fr stadard
        target_sr = 22050
        if sr != target_sr:
            y = librosa.resample(y, orig_sr=sr, target_sr=target_sr)
            sr = target_sr

        # normalisation des apmlitudes y 
        y = librosa.util.normalize(y)

        # supression des silences de début et de fin (20 db plus faible que le son originale)
        y_trimmed, _ = librosa.effects.trim(y, top_db=20)


        # filtarge du bruit seuil = 0.01 => une fois après la normalisation,
        # les valeurs de y sont entre 0 et 1 dpnc max = 1 et 0.01 reprèsente 1% de max => trop faible pour etre un vrai son
        threshold = 0.01
        y_trimmed[np.abs(y_trimmed) < threshold] = 0

        # sauvegarder en bytes 
        # on transfrome y_trimmed qui est np.array en un wav encodé en RAM avec un header et des les données audios : en les codes en bytes 
        buffer = io.BytesIO()
        sf.write(buffer, y_trimmed, sr, format='WAV') #recréer un wav propres ave le signal audio : numpy array : y_trimmed et la fréquence sr dans le buffer mémoire
        audio_bytes_processed = buffer.getvalue() #récupère les bytes finaux 

        # Métadonnées
        duration = float(len(y_trimmed) / sr) #le nbr d'échantillons total / le nbr d'échantillions par seconde = durée 
        num_samples = int(len(y_trimmed)) #le nbr total des échantillions du signal après traitement (nbr d'element du tableau y-trimmed)
       

        return (file_path, audio_bytes_processed, sr, duration, num_samples)
    
    except Exception as e:
        print(f"Erreur de prétraitement: {str(e)}")
        return (file_path, None, None, None, None)




In [7]:

schema = StructType([
    StructField("path", StringType(), True),
    StructField("audio_preprocessed", BinaryType(), True),
    StructField("sample_rate", IntegerType(), True),
    StructField("duration", FloatType(), True),
    StructField("num_samples", IntegerType(), True),
])

df_preprocessed = spark.createDataFrame(rdd.map(preprocess_audio), schema) \
    .filter("audio_preprocessed is not null") \
    .cache()

df_preprocessed.show(10, False)


+--------------------------------------------------------------------------+-------------------------------------------------------------------------------------------------------------------------------------------+-----------+------------+-----------+
|path                                                                      |audio_preprocessed                                                                                                                         |sample_rate|duration    |num_samples|
+--------------------------------------------------------------------------+-------------------------------------------------------------------------------------------------------------------------------------------+-----------+------------+-----------+
|C:\spark_project\Dataset_Sorted_by_class\air_conditioner\13230-0-0-1.wav  |[52 49 46 46 26 00 00 00 57 41 56 45 66 6D 74 20 10 00 00 00 01 00 01 00 22 56 00 00 44 AC 00 00 02 00 10 00 64 61 74 61 02 00 00 00 00 80]|22050      |4.5351473E

In [8]:
rdd.getNumPartitions()


4