<a href="https://colab.research.google.com/github/Ayrsz/SignalAndSistemyProject/blob/main/FeatureExtract.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
#Scientific computation
import numpy as np
import jax
import jax.numpy as jnp

#Plot and view
from matplotlib import pyplot as plt
import IPython as ipy
from IPython import display
from IPython.display import Audio

#Data manipulation
import os
import gc

#Audio manipulation
import librosa
import soundfile as sf


In [2]:
from zipfile import ZipFile
try:
    from google.colab import drive
    drive.mount("/content/drive", force_remount= True)
except:
    print("Código rodando localmente")

Mounted at /content/drive


In [3]:
#Unzip in a specific folder
root_path = "/content/drive/MyDrive/Dataset/DatasetAudios/"
if os.path.exists(root_path+"GTZAN"):
    print("Arquivo já unzipado, sem necessidade de ações.")
else:
    path_zip = root_path + "GTZAN.zip"
    with ZipFile(path_zip , 'r') as zip_archive:
        zip_archive.extractall(root_path + "GTZAN")

root_path = "/content/drive/MyDrive/Dataset/DatasetAudios/" + "GTZAN"

Arquivo já unzipado, sem necessidade de ações.


In [4]:
def dir_genres(path_all_genres : str) -> list[str]:
    return [path_all_genres + '/' + pasta for pasta in os.listdir(path_all_genres)]

#Return all files.wav separated per genres
def dir_files_wav(path_genre : str) -> list[str]:
  return [path_genre + '/' + arquivo for arquivo in os.listdir(path_genre)]


##Generate Escpectogram

In [79]:
def is_dir_empty(path : str) -> bool:
    with os.scandir(path) as iterator:
        for entry in iterator:
            if entry.name == '.ipynb_checkpoints':
                continue
            return False  # Se encontrar qualquer outro arquivo/pasta, não está vazio
        return True  # Se só havia .ipynb_checkpoints (ou nada), está vazio

def create_batch(files : list[str], size : int, genre : str, shape = 661794):
    batch_size = min(size, len(files))
    batch = jnp.zeros((batch_size, shape))  # Inicializa batch zerado

    nums = []
    new_files = []  # Lista para armazenar arquivos que foram processados corretamente

    for i in range(batch_size):
        try:

            audio, sample_rate = sf.read(files[i])  # Mantém o SR original

            if len(audio) < shape:
                # Preenche com zeros se o áudio for menor que o tamanho esperado
                audio = jnp.pad(audio, (0, shape - len(audio)))
            elif len(audio) > shape:
                # Se for maior, corta
                audio = audio[:shape]

            batch = batch.at[i].set(audio)

            num = files[i].split("/")[-1].split(".")[-2]

            nums.append(num)
            new_files.append(files[i])  # Adiciona à lista de arquivos processados

        except Exception as e:
            print(f"Erro ao processar {files[i]}: {e}")

    # Remove os arquivos processados da lista original
    for file in new_files:
        files.remove(file)
    batch = jax.device_put(batch)
    return batch, nums, sample_rate, files


def spectro_feat(audio : jnp.ndarray, sample_rate : int) -> jnp.ndarray:

    audio = jax.device_put(audio)
    FFT_SIZE = 512
    HOP_SIZE = 256

    hamming = jnp.hamming(FFT_SIZE)
    num_frames = (len(audio) - FFT_SIZE) // HOP_SIZE + 1

    def compute_fft(i):
        start = i * HOP_SIZE
        signal = jax.lax.dynamic_slice(audio, (start,), (FFT_SIZE,))
        signal = signal * hamming
        return jnp.fft.rfft(signal, n = FFT_SIZE)


    sfft = jax.vmap(compute_fft)(jnp.arange(num_frames))

    ssft = jnp.abs(sfft)**2 #Spectro
    dB_format =  20 * jnp.log10( ssft / 1e-10) #Spectro
    return dB_format.T

def spectro_feat_batch(batch : jnp.ndarray, sample_rate : int) -> jnp.ndarray:
    return jax.vmap(lambda audio: spectro_feat(audio, sample_rate))(batch)

def plt_spectogram(batch : jnp.ndarray, sample_rate : int, y_axis_type= "linear"):
  escala_Y = spectro_feat_batch(batch, sample_rate)
  escala_Y = np.array(escala_Y)
  #fig = plt.figure(figsize=(10,10))
  #plt.subplot(3,3,1)
  for (i,audio) in enumerate(escala_Y):
    plt.subplot(3,3,i+1)
    librosa.display.specshow(audio, sr = sample_rate, x_axis = "time", y_axis = y_axis_type, )
    plt.set_cmap("magma")
    plt.colorbar()

def write_spectrogram(audios : jnp.ndarray, sample_rate: int, paths_write : list[str]):
  escala_Y = spectro_feat_batch(audios, sample_rate)

  del audios
  gc.collect()

  escala_Y = np.array(escala_Y)

  assert len(escala_Y) == len(paths_write)

  for (audio, path_write) in zip(escala_Y, paths_write):
    plt.figure(figsize=(6,6))
    librosa.display.specshow(audio, sr = sample_rate)
    plt.set_cmap("magma")
    plt.savefig(path_write, dpi = 100, bbox_inches = "tight", pad_inches = 0)


    del audio
    gc.collect()
    plt.close("all")


  del escala_Y
  del paths_write
  gc.collect()

def write_spectrogram_from_genre(genre_path : str):
    files = dir_files_wav(genre_path)
    total_files_start = len(files)
    paths = np.array(genre_path.split("/"))
    genre = paths[-1]
    size_path = len(paths)
    genre_images_path = '/'.join(paths[0:size_path-2]) + "/images/" + genre
    batch_size = 5

    print(f"Writing on: {genre_images_path}")

    if is_dir_empty(genre_images_path):
        while(len(files) != 0):
            try:
                audios, nums, sample_rate, files = create_batch(files, 2, genre)

                paths_write = [genre_images_path + "/" + genre + "." + num + ".png" for num in nums]
                write_spectrogram(audios, sample_rate, paths_write)#, path_write)

                if(len(files) % 25 == 0):
                    print(f"Carregando imagens, {genre} : {((total_files_start - len(files))/total_files_start)*100:.2f}%")

            except Exception as e:
                print(f"Erro {e}")
                print(f"Erro em {files[0]}")
    else:
        print(f"Diretorio com imagens, porfavor esvazie: {genre}")

#Writting new images spectogram
def write_all_spectrograms(path_genres : str):
  genres_path = dir_genres(path_genres)

  for genre_path in genres_path:
    write_spectrogram_from_genre(genre_path)


write_all_spectrograms(root_path + "/Data/genres_original")

Writing on: /content/drive/MyDrive/Dataset/DatasetAudios/GTZAN/Data/images/jazz
Diretorio com imagens, porfavor esvazie: jazz
Writing on: /content/drive/MyDrive/Dataset/DatasetAudios/GTZAN/Data/images/blues
Diretorio com imagens, porfavor esvazie: blues
Writing on: /content/drive/MyDrive/Dataset/DatasetAudios/GTZAN/Data/images/metal
Diretorio com imagens, porfavor esvazie: metal
Writing on: /content/drive/MyDrive/Dataset/DatasetAudios/GTZAN/Data/images/classical
Diretorio com imagens, porfavor esvazie: classical
Writing on: /content/drive/MyDrive/Dataset/DatasetAudios/GTZAN/Data/images/country
Diretorio com imagens, porfavor esvazie: country
Writing on: /content/drive/MyDrive/Dataset/DatasetAudios/GTZAN/Data/images/pop
Diretorio com imagens, porfavor esvazie: pop
Writing on: /content/drive/MyDrive/Dataset/DatasetAudios/GTZAN/Data/images/disco
Diretorio com imagens, porfavor esvazie: disco
Writing on: /content/drive/MyDrive/Dataset/DatasetAudios/GTZAN/Data/images/reggae
Diretorio com im

#Features on time domain

In [119]:
audio_example, sr = librosa.load("/content/drive/MyDrive/Dataset/DatasetAudios/GTZAN/Data/genres_original/metal/metal.00041.wav")

In [245]:
def round_float_decimal(number):
    if not isinstance(number, np.ndarray):
        return float(f"{number:.4f}")
    else:
        return np.array([float(f"{d:.4f}") for d in number])

def get_statistics_feature(audio : np.typing.NDArray, name) -> dict:
    mean = round_float_decimal(np.mean(audio))
    std = round_float_decimal(np.std(audio))
    max = round_float_decimal(np.max(audio))
    min = round_float_decimal(np.min(audio))
    quartis = round_float_decimal(np.quantile(audio, [0.25, 0.5, 0.75]))
    return {name + "_mean": mean, name + "_std":std,  name + "_max": max, name + "_min": min, name + "_25p":quartis[0], name + "_50p": quartis[1],  name+ "_75p": quartis[2]}

## Low level features

### Energy

In [176]:
def get_rms(audio, FRAME_SIZE = 1024):
    rms = librosa.feature.rms(y = audio, frame_length= FRAME_SIZE)
    feats = get_statistics_feature(rms, "rms")
    return feats
rms = get_rms(audio_example)
print(rms)


{'rms_mean': np.float32(0.096787415), 'rms_std': np.float32(0.022715215), 'rms_max': np.float32(0.16872784), 'rms_min': np.float32(0.03834604), 'rms_25p': np.float64(0.07967611402273178), 'rms_50p': np.float64(0.09463489800691605), 'rms_75p': np.float64(0.1081550270318985)}


### Zero-Cross-Rate

In [211]:
def get_zero_cross_rate(audio, FRAME_SIZE = 1024):
    zero_cross = librosa.feature.zero_crossing_rate(audio, frame_length= FRAME_SIZE)
    feats = get_statistics_feature(zero_cross, "zero_cross")
    return feats


rate = get_zero_cross_rate(audio_example)


### Amplitude Envelope

In [178]:
def get_amplitude_envelope(audio, sample_rate, frame_size = 1024):
    AEm = []
    number_of_frames = len(audio) // frame_size
    for i in range(number_of_frames):
        start = i * frame_size
        stop = start + frame_size
        max_amp = np.max(audio[start:stop])
        AEm.append(max_amp)
    AEm = np.array(AEm)

    feats = get_statistics_feature(AEm, "amplitude_envelope")
    return feats




audio, sr = librosa.load("/content/drive/MyDrive/Datasets/DatasetAudios/GTZAN/Data/genres_original/blues/blues.00001.wav")
rms_info = get_amplitude_envelope(audio_example, sr)
print(rms_info)

{'amplitude_envelope_mean': np.float32(0.29201722), 'amplitude_envelope_std': np.float32(0.091424696), 'amplitude_envelope_max': np.float32(0.6850891), 'amplitude_envelope_min': np.float32(0.11013794), 'amplitude_envelope_25p': np.float64(0.228851318359375), 'amplitude_envelope_50p': np.float64(0.274688720703125), 'amplitude_envelope_75p': np.float64(0.3383331298828125)}


## Medium level Features

### Kurtosis

In [247]:
#Analise do formato da curva da distribuição dos valores do sinal
# Kurtosis -> O quão intenso é o pico em comparação ao resto
# K = E([SINAL - MEDIA]^4)/DESVIO_PADRÃO ^4
# K < 3 -> Mais suave que uma distribuição gaussiana
# K = 3 -> Parecido com uma distribuição gaussiana
# K > 3 -> Mais "pontudo" que a distribuicação gaussiana
def get_kurtosis(audio):
    audio_normalizado = (audio - np.mean(audio))/np.max(audio)

    mean = np.mean(audio_normalizado)
    std = np.std(audio_normalizado)
    dif_audio_media = audio - mean

    valor_esperado = np.mean(dif_audio_media**4)

    kurtosis = np.float64(valor_esperado/std**4)
    return round_float_decimal(kurtosis)

print(get_kurtosis(audio_example))

0.8641


## High level features

### Tempo

In [126]:
def get_tempo(audio, sample_rate):
    tempo, beats = librosa.beat.beat_track(y = audio, sr = sample_rate)
    tempo = tempo.item() #É um vetor de apenas um unico valor
    return np.int32(tempo)


print(get_tempo(audio_example, sr))

117


# Features on frequency domain

### Low level feature

In [213]:
#Gera um "centro de massa"
#Média das frequencias, ponderadas pela amplitude daquela sample para cada frame (usa sft)
def get_spectral_centroid(audio, sample_rate):
    spectral_centroid = librosa.feature.spectral_centroid(y = audio, sr = sample_rate)
    feats = get_statistics_feature(spectral_centroid, "spectral_centroid")
    return feats

centroid = get_spectral_centroid(audio_example, sr)
print(centroid)

{'spectral_centroid_mean': np.float64(2269.1618713103626), 'spectral_centroid_std': np.float64(368.46775170144997), 'spectral_centroid_max': np.float64(3496.809546746783), 'spectral_centroid_min': np.float64(1306.5515169480977), 'spectral_centroid_25p': np.float64(2005.6191945289615), 'spectral_centroid_50p': np.float64(2233.779149412821), 'spectral_centroid_75p': np.float64(2532.513023398053)}


### High level feature

In [241]:
def get_mel_coef(audio, sample_rate):
    mfcc = librosa.feature.mfcc(y = audio, sr = sample_rate, n_mfcc = 13)
    coefs_estatistics = {}
    for (i,coefs) in enumerate(mfcc):
        coefs_estatistics["mfcc#" + f"{i:02d}_mean"] = round_float_decimal(np.mean(coefs))
        coefs_estatistics["mfcc#" + f"{i:02d}_std"] = round_float_decimal(np.std(coefs))


    return coefs_estatistics

mfcc = get_mel_coef(audio_example, sr)
print(mfcc)

{'mfcc#00_mean': -100.423, 'mfcc#00_std': 36.409, 'mfcc#01_mean': 104.693, 'mfcc#01_std': 19.852, 'mfcc#02_mean': -57.254, 'mfcc#02_std': 18.144, 'mfcc#03_mean': 56.577, 'mfcc#03_std': 10.628, 'mfcc#04_mean': -5.556, 'mfcc#04_std': 9.673, 'mfcc#05_mean': 22.833, 'mfcc#05_std': 10.063, 'mfcc#06_mean': -6.172, 'mfcc#06_std': 10.503, 'mfcc#07_mean': 23.097, 'mfcc#07_std': 8.875, 'mfcc#08_mean': -19.072, 'mfcc#08_std': 7.08, 'mfcc#09_mean': 17.795, 'mfcc#09_std': 7.216, 'mfcc#10_mean': -13.783, 'mfcc#10_std': 6.934, 'mfcc#11_mean': 9.963, 'mfcc#11_std': 7.073, 'mfcc#12_mean': -14.859, 'mfcc#12_std': 6.991}


# Create CSV

In [249]:
import csv



def get_features_dict(audio, sample_rate, name, dir_image):
    features_dict = {"name": name + ".wav", "dir_image": dir_image}

    rms = (get_rms(audio, sample_rate))
    zero_cross = (get_zero_cross_rate(audio))
    amplitude_env = (get_amplitude_envelope(audio, sample_rate))
    spectral_centroid = (get_spectral_centroid(audio, sample_rate))
    mfcc_coef = (get_mel_coef(audio, sample_rate))

    features_dict = features_dict | rms | zero_cross | amplitude_env | spectral_centroid | mfcc_coef

    kurtosis = get_kurtosis(audio)
    tempo = get_tempo(audio, sample_rate)
    label = name.split(".")[0]

    features_dict.update({"tempo":tempo, "kurtosis":kurtosis, "label":label})

    return features_dict



def write_csv_genre(genre_path, path_write):
    files = dir_files_wav(genre_path)
    features_vector = []
    for file in files:
        path = file.split("/")
        name = path[-1].replace(".wav", "")
        num = name.split(".")[1]

        genre = name.split(".")[0]

        idx_data = [i for i in range(len(path)) if path[i] == "Data" ][0]
        dir_image = "/".join(path[:idx_data]) + "/images/" + genre + "/" + name + ".png"

        audio, sample_rate = librosa.load(file)
        features_dict = get_features_dict(audio, sample_rate, name, dir_image)
        features_vector.append(features_dict)


    with open(path_write + "/feats.csv", 'w') as f:
        writer = csv.DictWriter(f, fieldnames = features_vector[0].keys())
        writer.writeheader()
        writer.writerows(features_vector)

    del features_vector
    del files
    gc.collect()


def write_csv(root_dir, path_write):
    genres_dir = dir_genres(root_dir)

    for genre in genres_dir:
        print("Acessing: ", genre)
        write_csv_genre(genre, path_write)





write_csv(root_path + "/Data/genres_original",  root_path + "/Data")