<a href="https://colab.research.google.com/github/MCOENER/BWE_Q_Tranform/blob/master/CQT_ABE.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [31]:
import pandas as pd
import numpy as np
import os
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import layers, Input
from tensorflow.keras.layers import Dense,InputLayer,Flatten
from tensorflow.keras.models import Sequential, Model
from  matplotlib import pyplot as plt
import matplotlib.image as mpimg

from pathlib import Path
import glob
from glob import glob
from IPython.display import Audio
!pip install pystoi
from pystoi import stoi

import random
from scipy.signal import butter,filtfilt

from keras.models import load_model
from sklearn import decomposition

!pip install essentia
from essentia.standard import (MonoLoader, NSGConstantQ, NSGIConstantQ)
from essentia import lin2db
from essentia import array
import soundfile as sf



In [32]:
from google.colab import drive
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [33]:
def load_dataset(target_folder, dur, fs):
    res = []
    for dir in os.listdir(target_folder):
        for file in os.listdir(os.path.join(target_folder, dir)):
            if file.endswith(".wav"):
                res.append(os.path.join(target_folder, dir, file))

    #random.shuffle(res)

    np_arr = []
    samples = dur * fs

    for file in res:
        loader = MonoLoader(filename=file, sampleRate=fs)
        audio = loader()

        # Pad or truncate the audio to the desired length
        if len(audio) < samples:
            audio = np.append(audio, np.zeros(samples - len(audio)))
        elif len(audio) > samples:
            audio = audio[:samples]

        np_arr.append(array(audio))

    return np_arr

In [34]:
def aud_to_cqt(np_arr):
    cqt_mat = []
    for i in np_arr:
        # Ensure the input is a one-dimensional numpy array with the correct dtype
        i = array(i)  # dtype is set by default to 'float32' inside array function
        params['inputSize'] = i.size

        # Perform the Constant-Q transform
        constantq, dcchannel, nfchannel = NSGConstantQ(**params)(i)

        # Append the magnitude of the Constant-Q transform to the result
        cqt_mat.append(np.abs(constantq))

    return cqt_mat

In [35]:
def butter_lowpass_filter(data, cutoff, fs, order):
    normal_cutoff = cutoff / (0.5 * fs)
    b, a = butter(order, normal_cutoff, btype='low', analog=False)
    y = filtfilt(b, a, data)
    return y

In [36]:
def high_to_low(np_arr,cutoff,fs,order):
  lowpass_nparr=[]
  for i in np_arr:
    arr=butter_lowpass_filter(i, cutoff, fs, order)
    lowpass_nparr.append(arr)
  return lowpass_nparr

In [37]:
def comp_stoi(clean,predicted,fs):
  if(len(clean)>=len(predicted)):
    return stoi(clean[:(len(predicted))],predicted,fs,extended=False)
  return stoi(clean,predicted[:(len(clean))],fs,extended=False)

In [38]:
def equalise(orig,proc):
  if(len(orig)>=len(proc)):
    return orig[:len(proc)],proc
  return orig,proc[:len(orig)]

In [39]:
def test_train_split(target_folder, ratio, dur, fs, cutoff, order, no_cqt, highaud, lowaud):
    highcqt = aud_to_cqt(highaud)
    lowcqt = aud_to_cqt(lowaud)

    x_train = []
    y_train = []
    x_test = []
    y_test = []
    train_size = int(ratio * len(lowaud))

    # Process low CQT for training data
    for i in lowcqt[:train_size]:
        for j in range(len(i[0])):
            arr = []
            for z in range(len(i)):
                arr.append(i[z][j])
            x_train.append(arr)

    train_len = len(x_train)
    x_train = array(x_train).reshape(train_len, no_cqt)

    # Process low CQT for testing data
    x_test = []
    for i in lowcqt[train_size:]:
        for j in range(len(i[0])):
            arr = []
            for z in range(len(i)):
                arr.append(i[z][j])
            x_test.append(arr)

    test_len = len(x_test)
    x_test = array(x_test).reshape(test_len, no_cqt)

    # Process high CQT for training data
    y_train = []
    for i in highcqt[:train_size]:
        for j in range(len(i[0])):
            arr = []
            for z in range(len(i)):
                arr.append(i[z][j])
            y_train.append(arr)

    train_len = len(y_train)
    y_train = array(y_train).reshape(train_len, no_cqt)

    # Process high CQT for testing data
    y_test = []
    for i in highcqt[train_size:]:
        for j in range(len(i[0])):
            arr = []
            for z in range(len(i)):
                arr.append(i[z][j])
            y_test.append(arr)

    test_len = len(y_test)
    y_test = array(y_test).reshape(test_len, no_cqt)

    return x_train, y_train, x_test, y_test

In [40]:
def pred(test_aud, lowaud, highaud, model_path):
    model = load_model(model_path)
    lowaud_test = array(lowaud[test_aud])

    # Compute the CQT for the test audio
    constantq, dcchannel, nfchannel = NSGConstantQ(**params)(lowaud_test)
    lowcqt = np.abs(constantq)

    # Prepare input for the model
    lowcqt_input = np.transpose(lowcqt)
    lowcqt_input = np.expand_dims(lowcqt_input, axis=-1)

    # Predict the high-resolution CQT using the model
    highcqt_pred = model.predict(lowcqt_input)
    highcqt_pred = np.squeeze(highcqt_pred).T

    # Convert the predicted high CQT to complex numbers
    highcqt_complex = np.zeros(constantq.shape, dtype=np.complex64)
    highcqt_complex.real = highcqt_pred
    highcqt_complex.imag = np.zeros_like(highcqt_pred)

    # Reconstruct the audio using the inverse CQT transform
    final_aud = NSGIConstantQ(**params)(highcqt_complex, dcchannel, nfchannel)

    return final_aud

In [41]:
target_folder = '/content/drive/My Drive/EdinburghTest/'

# Parameters
fs = 16000
cutoff = 4100
order = 6
dur = 9
no_cqt = 333 #48 * int(np.log2(6000 / 65.41))
ratio = 0.7
num_classes = no_cqt

params = {
    'minFrequency': 65.41,
    'maxFrequency': 8000,
    'binsPerOctave': 48,
    'minimumWindow': 128,
    'inputSize': fs * dur
}

# Load the dataset
highaud = load_dataset(target_folder, dur, fs)
lowaud = high_to_low(highaud, cutoff, fs, order)
x_train, y_train, x_test, y_test = test_train_split(target_folder, ratio, dur, fs, cutoff, order, no_cqt, highaud, lowaud)

print(x_train)

[[0.129191   0.1131879  0.09722101 ... 0.00205036 0.00365242 0.00386346]
 [0.13408773 0.11502953 0.09575175 ... 0.00112726 0.00172848 0.00258247]
 [0.14014347 0.11928427 0.0982558  ... 0.00498366 0.00242752 0.00355169]
 ...
 [0.0934957  0.09209301 0.09031762 ... 0.00043742 0.00077492 0.00052291]
 [0.11183834 0.10842282 0.10436462 ... 0.00062576 0.0009234  0.00183604]
 [0.12706785 0.12098037 0.11405047 ... 0.02680659 0.02750965 0.01991402]]


In [43]:
model = tf.keras.Sequential(
    [
        tf.keras.Input(shape=(no_cqt, 1)),
        layers.Flatten(),
      #  layers.Dense(900, activation="relu"),
      #  layers.Dense(800, activation="relu"),
      #  layers.Dense(600, activation="relu"),
      #  layers.Dense(400, activation="relu"),
        layers.Dense(300, activation= "relu"),
        layers.Dense(250, activation="relu"),
       # layers.Dense(200, activation="relu"),
       # layers.Dense(250, activation="relu"),
        layers.Dense(300, activation="relu"),
      #  layers.Dense(400, activation="relu"),
      #  layers.Dense(600, activation="relu"),
      #  layers.Dense(800, activation="relu"),
      #  layers.Dense(900, activation="relu"),
        layers.Dense(num_classes, activation="linear"),
    ]
)

model.summary()

Model: "sequential_2"
_________________________________________________________________
 Layer (type)                Output Shape              Param #   
 flatten_2 (Flatten)         (None, 333)               0         
                                                                 
 dense_12 (Dense)            (None, 300)               100200    
                                                                 
 dense_13 (Dense)            (None, 250)               75250     
                                                                 
 dense_14 (Dense)            (None, 300)               75300     
                                                                 
 dense_15 (Dense)            (None, 333)               100233    
                                                                 
Total params: 350983 (1.34 MB)
Trainable params: 350983 (1.34 MB)
Non-trainable params: 0 (0.00 Byte)
_________________________________________________________________


In [44]:
batch_size = 36
epochs = 50

# Choosing an appropriate loss function
# CosineSimilarity can be used, but MeanSquaredError might be more appropriate for regression tasks
loss_func = tf.keras.losses.CosineSimilarity(axis=1)
#loss_func = tf.keras.losses.MeanSquaredError()

# Using the Adam optimizer
adam = tf.keras.optimizers.Adam()

# Compiling the model with the chosen loss function and optimizer
model.compile(loss=loss_func, optimizer=adam, metrics=["accuracy"])

# Fit the model
model.fit(x_train, y_train, batch_size=batch_size, epochs=epochs)

Epoch 1/50
Epoch 2/50
Epoch 3/50
Epoch 4/50
Epoch 5/50
Epoch 6/50
Epoch 7/50
Epoch 8/50
Epoch 9/50
Epoch 10/50
Epoch 11/50
Epoch 12/50
Epoch 13/50
Epoch 14/50
Epoch 15/50
Epoch 16/50
Epoch 17/50
Epoch 18/50
Epoch 19/50
Epoch 20/50
Epoch 21/50
Epoch 22/50
Epoch 23/50
Epoch 24/50
Epoch 25/50
Epoch 26/50
Epoch 27/50
Epoch 28/50
Epoch 29/50
Epoch 30/50
Epoch 31/50
Epoch 32/50
Epoch 33/50
Epoch 34/50
Epoch 35/50
Epoch 36/50
Epoch 37/50
Epoch 38/50
Epoch 39/50
Epoch 40/50
Epoch 41/50
Epoch 42/50
Epoch 43/50
Epoch 44/50
Epoch 45/50
Epoch 46/50
Epoch 47/50
Epoch 48/50
Epoch 49/50
Epoch 50/50


<keras.src.callbacks.History at 0x7d4f73a9fd60>

In [45]:
score = model.evaluate(x_test, y_test, verbose=0)
print("Test loss:", score[0])

# If you used additional metrics such as MAE or RMSE, you can print them as well
for i, metric_name in enumerate(model.metrics_names[1:], start=1):
    print(f"Test {metric_name}:", score[i])

Test loss: -0.9754695296287537
Test accuracy: 0.6438784599304199


In [46]:
model.save('curr_model_mse.h5')
model_path = 'curr_model_mse.h5'

In [47]:
test_aud=0

In [48]:
final_aud=pred(test_aud,lowaud,highaud,model_path)



In [49]:
Audio(final_aud,rate=fs)

In [50]:
Audio(lowaud[test_aud],rate=fs)

In [51]:
Audio(highaud[test_aud], rate=fs)

In [52]:
print(comp_stoi(highaud[test_aud],final_aud,fs))

0.7225866164008184


In [53]:
import os
import matplotlib.pyplot as plt
from scipy.signal import spectrogram

# Create a directory to save the audio files and spectrograms if it doesn't exist
save_dir = "/content/drive/My Drive/EdinburghTest/outputs"
os.makedirs(save_dir, exist_ok=True)

# Define a function to save audio files and spectrograms
def save_audio_and_spectrogram(audio, rate, file_name_prefix):
    # Save the audio file
    sf.write(os.path.join(save_dir, f"{file_name_prefix}.wav"), audio, rate)

    # Generate and save the spectrogram
    f, t, Sxx = spectrogram(audio, rate)
    plt.pcolormesh(t, f, 10 * np.log10(Sxx), shading='gouraud')
    plt.ylabel('Frequency [Hz]')
    plt.xlabel('Time [sec]')
    plt.title(f'Spectrogram of {file_name_prefix}')
    plt.colorbar(label='Intensity [dB]')
    plt.savefig(os.path.join(save_dir, f"{file_name_prefix}_spectrogram.png"))
    plt.close()

# Save high audio, low audio, and final audio along with their spectrograms
for i in range(len(lowaud)):
    save_audio_and_spectrogram(lowaud[i], fs, f"low_audio_{i}")
    save_audio_and_spectrogram(highaud[i], fs, f"high_audio_{i}")

    # Generate final audio for each test case
    final_aud = pred(i, lowaud, highaud, model_path)
    save_audio_and_spectrogram(final_aud, fs, f"final_audio_{i}")

print("Audio files and spectrograms saved successfully.")

  plt.pcolormesh(t, f, 10 * np.log10(Sxx), shading='gouraud')


Audio files and spectrograms saved successfully.
