# Import Libraries

In [None]:
import matplotlib.pyplot as plt
import numpy as np
import librosa
import librosa.display
import IPython.display as ipd
import keras
from keras import layers
from keras.models import Sequential
from keras.layers import LSTM
from keras.layers import Dense
from keras.layers import RepeatVector
from keras.layers import TimeDistributed
from keras.callbacks import LearningRateScheduler
from keras.callbacks import ReduceLROnPlateau
from sklearn.preprocessing import StandardScaler
import os
from glob import glob

In [None]:
!pip install pesq
from pesq import pesq

Collecting pesq
  Downloading pesq-0.0.3.tar.gz (35 kB)
Building wheels for collected packages: pesq
  Building wheel for pesq (setup.py) ... [?25l[?25hdone
  Created wheel for pesq: filename=pesq-0.0.3-cp37-cp37m-linux_x86_64.whl size=209961 sha256=c727464a15d65d5558ecccc19007bcccdccdfbe99ebe8a79c5a04915b2d04169
  Stored in directory: /root/.cache/pip/wheels/4f/67/5b/aa7cf31fe0c7199e35c604bb7bc91c629a13726bf221fedba0
Successfully built pesq
Installing collected packages: pesq
Successfully installed pesq-0.0.3


# Load Audio Files

In [None]:
# Recursively collect every .flac file below the training directory.
audio_files_train = [
    flac_path
    for dirpath, _dirnames, _filenames in os.walk("Librispeech/dev-clean")
    for flac_path in glob(os.path.join(dirpath, '*.flac'))
]

print("Total .flac files: " + str(len(audio_files_train)))

audio_inputs_train = []
unreadable_train = []  # (path, error message) for every file that failed to load

for f in audio_files_train:
    try:
        # Resample to 16 kHz and read at most the first 8 seconds.
        audio, sr = librosa.load(f, sr=16000, duration=8)
    except Exception as exc:
        # Stay best-effort (one bad file must not abort the run), but record
        # the failure instead of hiding it. Catching Exception rather than
        # using a bare `except` lets KeyboardInterrupt stop the cell.
        unreadable_train.append((f, str(exc)))
        continue
    # Keep only clips that are a full 8 s long (16000 Hz * 8 s = 128000 samples),
    # so every clip yields a spectrogram of the same size.
    if len(audio) == 128000:
        audio_inputs_train.append(audio)

if unreadable_train:
    print("Files skipped because they could not be loaded: " + str(len(unreadable_train)))

print("Total used files: " + str(len(audio_inputs_train)))

Total .flac files: 3317


In [None]:
# Recursively collect every .flac file below the test directory.
audio_files_test = [
    flac_path
    for dirpath, _dirnames, _filenames in os.walk("Librispeech/test-clean")
    for flac_path in glob(os.path.join(dirpath, '*.flac'))
]

print("Total .flac files: " + str(len(audio_files_test)))

audio_inputs_test = []
unreadable_test = []  # (path, error message) for every file that failed to load

for f in audio_files_test:
    try:
        # Resample to 16 kHz and read at most the first 8 seconds.
        audio, sr = librosa.load(f, sr=16000, duration=8)
    except Exception as exc:
        # Stay best-effort (one bad file must not abort the run), but record
        # the failure instead of hiding it. Catching Exception rather than
        # using a bare `except` lets KeyboardInterrupt stop the cell.
        unreadable_test.append((f, str(exc)))
        continue
    # Keep only clips that are a full 8 s long (16000 Hz * 8 s = 128000 samples),
    # so every clip yields a spectrogram of the same size.
    if len(audio) == 128000:
        audio_inputs_test.append(audio)

if unreadable_test:
    print("Files skipped because they could not be loaded: " + str(len(unreadable_test)))

print("Total used files: " + str(len(audio_inputs_test)))

Total .flac files: 2620
Total used files: 859


# Constants

In [None]:
# STFT analysis parameters shared by the training and test spectrograms:
#   n_fft      - FFT window size in samples
#   hop_length - number of samples between successive STFT frames
n_fft, hop_length = 512, 128

# Training Data

In [None]:
# Magnitude spectrogram of every training clip. Each STFT is cropped to its
# first 256 frequency rows and first 1000 frames before taking the magnitude,
# and the results are stacked into one (clips, 256, 1000) array.
x_train = np.stack([
    np.abs(
        librosa.stft(clip, n_fft=n_fft, hop_length=hop_length, window='hann')[:256, :1000]
    )
    for clip in audio_inputs_train
])

x_train.shape

(1644, 256, 1000)

# Test Data

In [None]:
# Build the test spectrograms. The magnitude is cropped to 256 x 1000 exactly
# like the training data; the phase is kept for the full, uncropped STFT.
x_test = []
x_test_phase = []

for clip in audio_inputs_test:
    spectrum = librosa.stft(clip, n_fft=n_fft, hop_length=hop_length, window='hann')
    x_test.append(np.abs(spectrum[:256, :1000]))
    x_test_phase.append(np.angle(spectrum))

x_test_phase = np.stack(x_test_phase)
x_test = np.stack(x_test)

x_test.shape

(859, 256, 1000)

# Scaling Data

In [None]:
#Train Data
scaler_train = StandardScaler()

# Standardize each spectrogram on its own: fit_transform refits the scaler on
# every call, so each spectrogram is normalized with its own per-column mean
# and standard deviation.
X_train_scaled = np.stack([scaler_train.fit_transform(spectrogram) for spectrogram in x_train])

# Append a trailing channel axis of size 1, as expected by the Conv2D layers.
X_train_scaled = X_train_scaled[..., np.newaxis]

In [None]:
#Test Data
scaler_test = StandardScaler()

# Standardize each spectrogram on its own: fit_transform refits the scaler on
# every call, so each spectrogram is normalized with its own per-column mean
# and standard deviation. After this line `scaler_test` only retains the
# statistics of the LAST test spectrogram.
X_test_scaled = np.stack([scaler_test.fit_transform(spectrogram) for spectrogram in x_test])

# Append a trailing channel axis of size 1, as expected by the Conv2D layers.
X_test_scaled = X_test_scaled[..., np.newaxis]

# Convolutional Network

In [None]:
#Callbacks

# Stop training once the validation loss has not improved for 5 consecutive
# epochs. restore_best_weights=True rolls the model back to the epoch with the
# lowest validation loss; without it the model keeps the weights of the final
# epoch, i.e. the ones obtained after `patience` epochs without improvement.
earlystop = keras.callbacks.EarlyStopping(
    monitor="val_loss",
    patience=5,
    mode="min",
    restore_best_weights=True,
)

# List of callback objects handed to `fit`.
callbacks = [earlystop]

In [None]:
#Convolutional Autoencoder
#
# Three Conv2D + MaxPooling2D stages halve both spatial dimensions three times
# (an 8x reduction per axis); the decoder mirrors them with Conv2D +
# UpSampling2D to return to the input size.

# Named `input_layer` so the builtin `input` is not shadowed.
input_layer = keras.Input(shape=X_train_scaled.shape[1:])

#Encoder
x = layers.Conv2D(16, (3, 3), activation='relu', padding='same')(input_layer)
x = layers.MaxPooling2D((2, 2), padding='same')(x)
x = layers.Conv2D(8, (3, 3), activation='relu', padding='same')(x)
x = layers.MaxPooling2D((2, 2), padding='same')(x)
x = layers.Conv2D(8, (3, 3), activation='relu', padding='same')(x)
encoded = layers.MaxPooling2D((2, 2), padding='same')(x)

#Decoder
x = layers.Conv2D(8, (3, 3), activation='relu', padding='same')(encoded)
x = layers.UpSampling2D((2, 2))(x)
x = layers.Conv2D(8, (3, 3), activation='relu', padding='same')(x)
x = layers.UpSampling2D((2, 2))(x)
x = layers.Conv2D(16, (3, 3), activation='relu', padding='same')(x)
x = layers.UpSampling2D((2, 2))(x)
# The reconstruction targets are StandardScaler outputs, which contain negative
# values and values above 1. A sigmoid output is confined to (0, 1) and can
# never match them, so the output layer is linear.
decoded = layers.Conv2D(1, (3, 3), activation='linear', padding='same')(x)

conv_autoencoder = keras.Model(input_layer, decoded)
conv_autoencoder.compile(optimizer='adam', loss='mse')
conv_autoencoder.summary()

Model: "model"
_________________________________________________________________
Layer (type)                 Output Shape              Param #   
input_1 (InputLayer)         [(None, 256, 1000, 1)]    0         
_________________________________________________________________
conv2d (Conv2D)              (None, 256, 1000, 16)     160       
_________________________________________________________________
max_pooling2d (MaxPooling2D) (None, 128, 500, 16)      0         
_________________________________________________________________
conv2d_1 (Conv2D)            (None, 128, 500, 8)       1160      
_________________________________________________________________
max_pooling2d_1 (MaxPooling2 (None, 64, 250, 8)        0         
_________________________________________________________________
conv2d_2 (Conv2D)            (None, 64, 250, 8)        584       
_________________________________________________________________
max_pooling2d_2 (MaxPooling2 (None, 32, 125, 8)        0     

# Train the Model

In [None]:
#Training
#
# The autoencoder is trained to reproduce its input, so the scaled training
# spectrograms are used as both input and target. 15% of the samples are held
# out for validation, which provides the `val_loss` monitored by early stopping.

history = conv_autoencoder.fit(
    X_train_scaled,
    X_train_scaled,
    epochs=500,
    batch_size=32,
    validation_split=0.15,
    # `callbacks` is already a list of callback objects; pass it as-is instead
    # of wrapping it in a second list.
    callbacks=callbacks,
)

Epoch 1/500
Epoch 2/500
Epoch 3/500
Epoch 4/500
Epoch 5/500
Epoch 6/500
Epoch 7/500
Epoch 8/500
Epoch 9/500
Epoch 10/500
Epoch 11/500
Epoch 12/500
Epoch 13/500
Epoch 14/500
Epoch 15/500
Epoch 16/500
Epoch 17/500
Epoch 18/500
Epoch 19/500
Epoch 20/500
Epoch 21/500
Epoch 22/500
Epoch 23/500
Epoch 24/500
Epoch 25/500
Epoch 26/500
Epoch 27/500
Epoch 28/500
Epoch 29/500
Epoch 30/500
Epoch 31/500
Epoch 32/500
Epoch 33/500
Epoch 34/500
Epoch 35/500
Epoch 36/500
Epoch 37/500
Epoch 38/500
Epoch 39/500
Epoch 40/500
Epoch 41/500
Epoch 42/500
Epoch 43/500
Epoch 44/500
Epoch 45/500
Epoch 46/500
Epoch 47/500
Epoch 48/500
Epoch 49/500
Epoch 50/500
Epoch 51/500
Epoch 52/500
Epoch 53/500
Epoch 54/500
Epoch 55/500
Epoch 56/500
Epoch 57/500
Epoch 58/500
Epoch 59/500
Epoch 60/500
Epoch 61/500
Epoch 62/500
Epoch 63/500
Epoch 64/500
Epoch 65/500
Epoch 66/500
Epoch 67/500
Epoch 68/500
Epoch 69/500
Epoch 70/500
Epoch 71/500
Epoch 72/500
Epoch 73/500
Epoch 74/500
Epoch 75/500
Epoch 76/500
Epoch 77/500
Epoch 78

In [None]:
#Plot Loss and Validation Loss

# One curve per epoch-wise metric recorded by `fit`; the figure is titled and
# labelled so it can be read on its own.
fig, ax = plt.subplots()
ax.plot(history.history["loss"], label="Training Loss")
ax.plot(history.history["val_loss"], label="Validation Loss")
ax.set(title="Autoencoder reconstruction loss", xlabel="Epoch", ylabel="Loss (MSE)")
ax.legend()
plt.show()

In [None]:
# Save the trained autoencoder to "h5_model.h5" in the current working
# directory so it can be reloaded without retraining.
conv_autoencoder.save("h5_model.h5")

In [None]:
# Load the trained autoencoder back from "h5_model.h5", replacing the
# in-memory `conv_autoencoder`. This lets the cells below run from the saved
# file without repeating the training cell.
conv_autoencoder = keras.models.load_model("h5_model.h5")

# Compress and Decompress the Input

In [None]:
# Run the scaled test spectrograms through the trained autoencoder, then drop
# the trailing single-channel axis so each reconstruction is a 2-D array again.
x_test_pred = conv_autoencoder.predict(X_test_scaled)[..., 0]

# Calculate the Mean Absolute Error for Reconstruction

In [None]:
# Mean absolute error between the autoencoder output and the scaled TEST input
# it was asked to reproduce (the channel axis of the input is dropped so both
# arrays have the same shape).
test_mae_loss = np.mean(np.abs(x_test_pred - X_test_scaled[..., 0]))

# Backward-compatible alias: the value was historically stored under this name
# even though it is computed on the test set, not the training set.
train_mae_loss = test_mae_loss

print("Mean reconstruction error: ", test_mae_loss)

Mean reconstruction error:  0.42215854


# Unscale Data

In [None]:
X_test_unscaled = []

# Every test spectrogram was standardized with statistics fitted on that
# spectrogram alone, so it has to be un-standardized with those same
# statistics. `scaler_test` was refit on each spectrogram in turn and therefore
# only remembers the last one; using it here would rescale every prediction
# with the wrong mean and standard deviation. Refit a scaler on the matching
# original magnitude spectrogram instead.
for predicted, original in zip(x_test_pred, x_test):
    sample_scaler = StandardScaler().fit(original)
    X_test_unscaled.append(sample_scaler.inverse_transform(predicted))

X_test_unscaled = np.stack(X_test_unscaled)

# Pad to original size

In [None]:
# The magnitude spectrograms were cropped with [0:256, 0:1000], which removed
# the LAST frequency row and the LAST time frame. Restore the original size by
# appending one zero row and one zero column at the END of those axes; padding
# at the start would shift every frequency bin and every frame by one position.
# The first axis (one entry per clip) is left untouched.
X_test_unscaled_pad = np.pad(X_test_unscaled, ((0, 0), (0, 1), (0, 1)), 'constant')

# Reconstruct Audio

In [None]:
 #Calculate the inverse STFT using the Griffin-Lim method that uses only the absolute value of STFT

reconstructed_test_griffinlim = []

for elem in X_test_unscaled_pad:
    # Griffin-Lim expects a non-negative magnitude spectrogram; clamp any
    # negative value the network/unscaling may have produced to zero.
    magnitude = np.maximum(elem, 0)
    reconstructed_test_griffinlim.append(
        librosa.griffinlim(
            magnitude,
            # Use the same hop and window as the analysis STFT rather than
            # relying on the defaults happening to match.
            hop_length=hop_length,
            window='hann',
            # The phase estimate starts from random values; fix the seed so the
            # reconstructed audio (and the PESQ scores) are reproducible.
            random_state=0,
        )
    )


# PESQ Metric Calculation

In [None]:
# Wide-band ('wb') PESQ score at 16 kHz for every test clip, comparing the
# original audio (reference) with its Griffin-Lim reconstruction (degraded).
pesq_test_griffin = np.stack([
    pesq(16000, reference, degraded, 'wb')
    for reference, degraded in zip(audio_inputs_test, reconstructed_test_griffinlim)
])

In [None]:
# Average PESQ score over all test clips.
np.mean(pesq_test_griffin)

1.0537065089928535