In [None]:
!pip install PyWavelets

In [None]:
from google.colab import drive
import pandas as pd
import pywt
import os
import numpy as np
import librosa
import matplotlib.pyplot as plt
import soundfile as sf
import wave
from scipy.signal import butter, sosfiltfilt

In [None]:
# Notebook configuration: mount Google Drive and set the three folders used by the cells below.
drive.mount('/content/drive') # Change if needed
database_path = "" # Path of raw_database/: holds the complete recordings, read as "<Subject ID>.wav"
denoized_database_path = "" # Path of denoized_database/: the filtered 10-second segments are written here
databases_infos_path = "" # Directory of the .xlsx metadata files: database_info.xlsx is read from it and instances.xlsx is written to it

In [None]:
# Load the per-recording metadata table; later cells read its 'Subject ID' and 'BPM' columns.
database_info = pd.read_excel(os.path.join(databases_infos_path, "database_info.xlsx"))

In [None]:
# Show the loaded metadata table for a quick visual check.
print(database_info)

In [None]:
# Complete audio example: path of the full recording of the first subject in the
# metadata table, used by the plotting examples that follow.
example_file_path = os.path.join(database_path, database_info['Subject ID'][0] + ".wav")

In [None]:
# Spectrogram example
# Resample to 16 kHz while loading. Loading at librosa's default rate and then
# overwriting `sr` afterwards would mislabel the samples and distort the time
# axis, the frequency axis and the computed duration.
y, sr = librosa.load(example_file_path, sr=16000)
print(f'Audio wave size (y): {y.shape}')
print(f'Sample rate (sr): {sr}')

# Magnitude spectrogram in dB, relative to the loudest bin.
D = librosa.stft(y)
S_db = librosa.amplitude_to_db(abs(D), ref=np.max)

plt.figure(figsize=(14, 6))
librosa.display.specshow(S_db, sr=sr, x_axis='time', y_axis='log')
plt.colorbar(format='%+2.0f dB')
plt.title('Spectrogram')
plt.xlabel('Time')
plt.ylabel('Frequency')

# One tick every 10 seconds over the whole recording.
duration = librosa.get_duration(y=y, sr=sr)
ticks = range(0, int(duration)+1, 10)
plt.xticks(ticks, ticks)

plt.show()

In [None]:
# Waveform example
# Load at 16 kHz so the samples match the rate passed to waveshow, and so the
# notebook-level `sr` is not silently replaced by librosa's default rate.
audio_data, sr = librosa.load(example_file_path, sr=16000)

plt.figure(figsize=(10, 4))
# Plot a short excerpt (samples 5000 to 7999) so individual oscillations are visible.
librosa.display.waveshow(audio_data[5000:8000], sr=sr)
plt.title('Forma de Onda')
plt.xlabel('Tempo (s)')
plt.ylabel('Amplitude')
plt.show()

In [None]:
# 10-second spectrogram example
# Load the example recording resampled to 16 kHz.
y, sr = librosa.load(example_file_path, sr=16000)

# Keep only the first `duracao` seconds of the signal.
duracao = 10
y = y[:duracao * sr]

print(f'Audio wave size (y): {y.shape}')
print(f'Sample rate (sr): {sr}')

# Magnitude spectrogram in dB, relative to the loudest bin.
D = librosa.stft(y)
S_db = librosa.amplitude_to_db(abs(D), ref=np.max)

plt.figure(figsize=(10, 6))
librosa.display.specshow(S_db, sr=sr, x_axis='time', y_axis='log')
plt.colorbar(format='%+2.0f dB')
# NOTE(review): the BPM value in the title is hard-coded; confirm it matches the example file.
plt.title('10 seconds spectogram with BPM=132')
plt.xlabel('Time')
plt.ylabel('Frequency')
plt.show()

In [None]:
# Check if audio is normalized
def verify_normalization(subject_id):
  """Report whether a subject's recording stays within the [-1, 1] amplitude range.

  Args:
    subject_id: Recording identifier; the file read is
      ``<database_path>/<subject_id>.wav``, resampled to 16 kHz.

  Returns:
    True when the largest absolute sample value is at most 1.0, else False.
  """
  # NOTE(review): librosa.load presumably already returns floating-point samples
  # scaled into [-1, 1], in which case this check always passes - confirm.
  wav_path = os.path.join(database_path, subject_id + ".wav")
  samples, _ = librosa.load(wav_path, sr=16000)

  peak = np.max(np.abs(samples))
  return bool(peak <= 1.0)

# Collect the IDs of every recording that fails the normalization check.
lista_nao_normalizados = [
    row['Subject ID']
    for _, row in database_info.iterrows()
    if not verify_normalization(row['Subject ID'])
]

print(lista_nao_normalizados)


### Filters

In [None]:
def normalize_audio(data):
    """Scale a signal so that its largest absolute sample equals 1.

    Args:
        data: Array-like of audio samples.

    Returns:
        A NumPy array with the samples divided by the peak absolute value,
        i.e. within [-1, 1]. An empty or all-zero (silent) signal is returned
        unscaled, because it has no peak to normalize by.
    """
    samples = np.asarray(data)

    # Nothing to measure on an empty signal; np.max would raise on it.
    if samples.size == 0:
        return samples / 1

    max_amplitude = np.max(np.abs(samples))

    # A silent signal would turn into NaN through 0 / 0; dividing by 1 keeps the
    # zeros and the same dtype promotion as the regular division below.
    if max_amplitude == 0:
        return samples / 1

    return samples / max_amplitude  # Normalize between [-1, 1]

In [None]:
def rescale_audio(data, mult=100):
    """Multiply every sample by a constant gain.

    Args:
        data: Audio samples (array or scalar).
        mult: Gain factor; with the default of 100, input within [-1, 1]
            ends up within [-100, 100].

    Returns:
        The samples multiplied by ``mult``.
    """
    scaled = data * mult
    return scaled

In [None]:
def butter_bandpass_filter(data, lowcut=20, highcut=120, order=6, fs=16000):
    """Apply a zero-phase Butterworth band-pass filter to a signal.

    Args:
        data: Signal samples.
        lowcut: Lower cutoff frequency, in the same unit as ``fs``.
        highcut: Upper cutoff frequency, in the same unit as ``fs``.
        order: Order passed to ``scipy.signal.butter``.
        fs: Sampling rate of ``data``.

    Returns:
        The filtered signal produced by ``sosfiltfilt``.

    Raises:
        ValueError: If the normalized cutoffs do not satisfy 0 < low < high < 1,
            if ``data`` contains NaN/Inf, or if the filtered output does.
    """
    # Cutoffs are expressed as a fraction of the Nyquist frequency (half the
    # sampling rate), which is the normalization `butter` expects without `fs`.
    half_rate = fs * 0.5
    low_norm, high_norm = lowcut / half_rate, highcut / half_rate

    band_is_valid = 0 < low_norm < high_norm < 1
    if not band_is_valid:
      raise ValueError("Invalid cutoff frequencies. Ensure that 0 < low < high < 1.")

    if not np.all(np.isfinite(data)):
        raise ValueError("The input contains NaN or Inf values.")

    # Second-order sections with forward-backward filtering.
    sections = butter(order, [low_norm, high_norm], btype='band', output='sos')
    filtered = sosfiltfilt(sections, data)

    if not np.all(np.isfinite(filtered)):
        raise ValueError("The filter generated non-finite values (NaN or Inf) in the signal.")

    return filtered

In [None]:
def estimate_sigma(detail_coeffs):
    """Estimate the noise level from wavelet detail coefficients.

    Args:
        detail_coeffs: Array of detail coefficients.

    Returns:
        The median of the absolute coefficients divided by 0.6745.
    """
    median_abs = np.median(np.abs(detail_coeffs))
    return median_abs / 0.6745

def wavelet_filter(data, wavelet='coif4', level=7):
    """Denoise a signal by soft-thresholding its wavelet detail coefficients.

    Args:
        data: Signal samples.
        wavelet: Wavelet name passed to ``pywt.wavedec`` / ``pywt.waverec``.
        level: Decomposition level passed to ``pywt.wavedec``.

    Returns:
        The reconstructed signal, cut to at most ``len(data)`` samples.
    """
    n_samples = len(data)
    coeffs = pywt.wavedec(data, wavelet, level=level)

    # Noise level from the last entry of the decomposition, then the universal
    # threshold sigma * sqrt(2 * ln(N)).
    sigma = estimate_sigma(coeffs[-1])
    threshold = sigma * np.sqrt(2 * np.log(n_samples))

    # The first entry is kept untouched; every other coefficient array is
    # soft-thresholded.
    shrunk_coeffs = [coeffs[0]] + [
        pywt.threshold(detail, threshold, mode='soft') for detail in coeffs[1:]
    ]

    reconstructed = pywt.waverec(shrunk_coeffs, wavelet)

    # Slice to the input length (this only truncates; it never pads).
    return reconstructed[:n_samples]


In [None]:
def apply_tranformations(file_path, transformations):
    """Load an audio file at 16 kHz and run it through a chain of transforms.

    Args:
        file_path: Path of the audio file to load.
        transformations: Iterable of callables; each receives the current
            signal and returns the next one. They are applied in order.

    Returns:
        The signal after the last transformation (the loaded signal itself
        when ``transformations`` is empty).
    """
    signal, _ = librosa.load(file_path, sr=16000)

    for step in transformations:
        signal = step(signal)

    return signal

Assemble 10-second instances and save them to the output directory.

In [None]:
# Optional cleanup: uncomment this cell to delete the files previously created in the output folder.

# def apagar_conteudo_diretorio(diretorio):
#     # Check whether the directory exists
#     if os.path.exists(diretorio):
#         # List every file and subdirectory in the directory
#         for filename in os.listdir(diretorio):
#             file_path = os.path.join(diretorio, filename)
#             try:
#                 # If it is a file or a symbolic link, delete it
#                 if os.path.isfile(file_path) or os.path.islink(file_path):
#                     os.unlink(file_path)
#                 # If it is a directory, delete its contents recursively
#                 elif os.path.isdir(file_path):
#                     apagar_conteudo_diretorio(file_path)
#                     os.rmdir(file_path)  # Remove the now-empty directory
#             except Exception as e:
#                 print(f"Erro ao apagar {file_path}. Detalhes: {e}")
#     else:
#         print(f"O diretório {diretorio} não existe.")

# apagar_conteudo_diretorio(denoized_database_path)

In [None]:
def separate_audio(subject_id, bpm_completo):
  """Filter one recording and export its labelled 10-second segments.

  The recording ``<database_path>/<subject_id>.wav`` is normalized, band-pass
  filtered and wavelet-denoised, then cut into consecutive 10-second windows.
  ``bpm_completo`` holds one label per window, separated by ``'-'``. A window
  labelled ``"[]"`` is skipped (no file is written) but still consumes its
  10 seconds of audio.

  Each kept window is written to
  ``<denoized_database_path>/<subject_id>_<k>_<bpm>.wav``, where ``k`` counts
  the kept windows from 0. A window that runs past the end of the recording is
  written shorter than 10 seconds.

  Args:
    subject_id: Identifier of the recording (file name without ".wav").
    bpm_completo: String of '-'-separated labels, one per 10-second window.

  Returns:
    A DataFrame with one row per written segment and the columns ``ID``
    (``<subject_id>_<k>``) and ``BPM`` (the label string).
  """
  # Fixed locally instead of reading the notebook-global `sr`: that global is
  # reassigned by the plotting cells, so segment length and the written sample
  # rate would otherwise depend on which cell ran last. 16000 matches the rate
  # apply_tranformations loads the audio at.
  sample_rate = 16000
  segment_len = 10 * sample_rate

  path_audio_completo = os.path.join(database_path, subject_id + ".wav")
  audio = apply_tranformations(path_audio_completo, [normalize_audio, butter_bandpass_filter, wavelet_filter])

  ids = []
  bpms = []

  contador_segmentos = 0
  for window_index, bpm in enumerate(bpm_completo.split('-')):
    if bpm == "[]":
      # Unlabelled window: skip it, its audio span is still accounted for by window_index.
      continue

    inicio = window_index * segment_len
    segmento = audio[inicio:inicio + segment_len]
    path_saida = os.path.join(denoized_database_path, f'{subject_id}_{contador_segmentos}_{bpm}.wav')
    sf.write(path_saida, segmento, sample_rate)
    ids.append(f'{subject_id}_{contador_segmentos}')
    bpms.append(bpm)
    contador_segmentos += 1

  return pd.DataFrame({'ID': ids, 'BPM': bpms})



In [None]:
# Export the segments of every recording and stack the per-recording tables
# into a single table of instances.
lista_df_segmentos = [
    separate_audio(row['Subject ID'], row['BPM'])
    for _, row in database_info.iterrows()
]

df = pd.concat(lista_df_segmentos, ignore_index=True)

In [None]:
# Print the full table of exported segments (all rows, untruncated).
print(df.to_string())

In [None]:
def get_wav_info(file_name):
    """Read the header fields of a WAV file.

    Args:
        file_name: Path of the WAV file.

    Returns:
        A tuple ``(num_channels, sample_width, frame_rate, num_frames,
        duration)``, where ``duration`` is ``num_frames / frame_rate``.
    """
    with wave.open(file_name, 'rb') as wav_file:
        header = wav_file.getparams()
        seconds = header.nframes / header.framerate
        return header.nchannels, header.sampwidth, header.framerate, header.nframes, seconds

In [None]:
# Read the WAV header of every exported segment and attach the fields to the
# instances table, one column per field.
num_channels_list = []
sample_width_list = []
frame_rate_list = []
num_frames_list = []
duration_list = []

# Same order as the tuple returned by get_wav_info.
info_lists = (num_channels_list, sample_width_list, frame_rate_list, num_frames_list, duration_list)
info_columns = ('num_channels', 'sample_width', 'frame_rate', 'num_frames', 'duration')

for _, row in df.iterrows():
    wav_path = os.path.join(denoized_database_path, f"{row['ID'] + '_' + row['BPM']}.wav")
    for target, value in zip(info_lists, get_wav_info(wav_path)):
        target.append(value)

for column_name, values in zip(info_columns, info_lists):
    df[column_name] = values

In [None]:
# Number of exported segments.
print(len(df))

In [None]:
# Print the full instances table again, now including the WAV header columns.
print(df.to_string())

In [None]:
# Distribution of each WAV header field across the exported segments.
for info_column in ("num_channels", "sample_width", "frame_rate", "num_frames", "duration"):
    print(df[info_column].value_counts())

In [None]:
# Delete segments shorter than 10 seconds
instancias_com_pouca_duracao = df[df["duration"] < 10]

# Remove the audio files first, then drop all of their rows in a single call.
for _, row in instancias_com_pouca_duracao.iterrows():
  os.remove(os.path.join(denoized_database_path, f"{row['ID'] + '_' + row['BPM']}.wav"))

df = df.drop(instancias_com_pouca_duracao.index)


In [None]:
# Same distributions after the short segments were removed.
for info_column in ("num_channels", "sample_width", "frame_rate", "num_frames", "duration"):
    print(df[info_column].value_counts())

In [None]:
# Save the final instances table next to the other metadata spreadsheets.
df.to_excel(os.path.join(databases_infos_path, "instances.xlsx"))