# A project to enhance the apparent quality of a microphone in real time

## loading the useful libraries

In [None]:
#this notebook is able to run both locally and in google colab
#if running in google colab, some additional actions need to be performed
#The variable IN_COLAB tells the code whether to perform those actions
import sys
IN_COLAB = 'google.colab' in sys.modules
IN_COLAB

In [None]:
import numpy as np # linear algebra
import pandas as pd # data processing, CSV file I/O (e.g. pd.read_csv)
import os

In [None]:
import librosa
import torchaudio
from IPython.display import Audio #play back the signal (original waveform)
import  IPython




In [None]:
#install audio_preprocessing from github - this is necessary if running in Google Colab, otherwise not necessary
if (IN_COLAB):
   #!git clone https://github.com/RomanZhvanskiy/microphone_enhancer_gh.git
   !git -C "microphone_enhancer_gh" pull || git clone https://github.com/RomanZhvanskiy/microphone_enhancer_gh.git "microphone_enhancer_gh"


In [None]:
if (IN_COLAB):  # it is also necessary to change directory in Google Colab to load audio_preprocessing
  %cd /content/microphone_enhancer_gh/

In [None]:
if (IN_COLAB): # switch to the appropriate branch
  !git checkout better_models_and_gridsearch
  !git pull

In [None]:
%load_ext autoreload
%autoreload 2
from audio_preprocessing import preprocessing as pp


In [None]:
!pwd

In [None]:
if (IN_COLAB):  # the training data is loaded in the google drive for the purpose of being used in google colab
  from google.colab import drive
  drive.mount('/content/gdrive')




In [None]:
#!ls -la /content/gdrive/MyDrive/'Colab Notebooks'/data_audio/VCTK-Corpus/wav48


In [None]:
#os.listdir("/content/gdrive/MyDrive/Colab Notebooks/data_audio/VCTK-Corpus/wav48")

## initial exploration

### getting the data

In [None]:
#sample Audio and text exploration

In [None]:
#get_speech()
x, sr  = pp.get_speech(working_in_google_colab = IN_COLAB)
print('sampling rate', sr, 'hz')
print('number of samples', len(x))
print('duration', round(len(x)/sr, 2), 'seconds')

In [None]:
x.shape

### Convert waveform to MEL spectrogram

In [None]:
spectrogram = pp.waveform_2_spectrogram (x,sr)



In [None]:
spectrogram.shape

In [None]:
type(spectrogram)

In [None]:
spectrogram

### Degrade quality

In [None]:
# Step-by-step demonstration of the degradation chain on the single sample `x`.
# NOTE: despite its name, `degraded_x` holds a (degraded) spectrogram, not a waveform;
# the waveform is only rebuilt at the end as `reconstructed_degraded_x`.
spectrogram = pp.waveform_2_spectrogram (x,sr)
print (f"spectrogram.shape = {spectrogram.shape}")
# remove frequencies (our simulated bad microphone cannot capture low and high frequencies)
degraded_x = pp.mel_spectrogram_remove_frequency(
            spectrogram,
            sr,
            remove_above=3000.0,
            remove_below=100.0,
            debug=0)

print (f"degraded_x.shape = {degraded_x.shape}")

# remove quiet sounds (our simulated bad microphone cannot capture quiet sounds)
degraded_x = pp.mel_spectrogram_remove_quiet_sounds (
            degraded_x,
            sr,
            remove_below=0.5,
            debug=0)

# add noise (our simulated bad microphone also captures noise);
# the noise band uses the same 100.0 / 3000.0 limits as the frequency removal above
degraded_x = pp.mel_spectrogram_add_noise(degraded_x,
            sr,
            relative_noise_level=0.1,
            add_above=100.0,
            add_below=3000.0,
            debug=0)

# convert the degraded spectrogram back to a waveform so it can be played
reconstructed_degraded_x = pp.spectrogram_2_waveform (degraded_x, sr=sr)

print (f"reconstructed_degraded_x.shape = {reconstructed_degraded_x.shape}")


In [None]:
def degrade_quaity(spectrogram, sr, upper_limit=3000.0, lower_limit=100.0, insensitive_level = 0.5,relative_noise_level=0.1, debug=0):
    """Simulate a bad microphone by chaining the three `pp` degradation helpers.

    The spectrogram is passed, in order, through
    `pp.mel_spectrogram_remove_frequency`, `pp.mel_spectrogram_remove_quiet_sounds`
    and `pp.mel_spectrogram_add_noise`.

    Args:
        spectrogram: MEL spectrogram to degrade (as produced by `pp.waveform_2_spectrogram`).
        sr: sampling rate, forwarded unchanged to every helper.
        upper_limit: forwarded as `remove_above` to the frequency removal and as
            `add_below` to the noise step.
        lower_limit: forwarded as `remove_below` to the frequency removal and as
            `add_above` to the noise step.
        insensitive_level: forwarded as `remove_below` to the quiet-sound removal.
        relative_noise_level: forwarded to the noise step.
        debug: debug flag forwarded to every helper.

    Returns:
        The value returned by the final `pp.mel_spectrogram_add_noise` call.
    """
    # stage 1: the simulated microphone cannot capture very low / very high frequencies
    band_limited = pp.mel_spectrogram_remove_frequency(
        spectrogram,
        sr,
        remove_above=upper_limit,
        remove_below=lower_limit,
        debug=debug,
    )

    # stage 2: the simulated microphone cannot capture quiet sounds
    without_quiet = pp.mel_spectrogram_remove_quiet_sounds(
        band_limited,
        sr,
        remove_below=insensitive_level,
        debug=debug,
    )

    # stage 3: the simulated microphone also captures noise (between the two limits)
    return pp.mel_spectrogram_add_noise(
        without_quiet,
        sr,
        relative_noise_level=relative_noise_level,
        add_above=lower_limit,
        add_below=upper_limit,
        debug=debug,
    )

### Convert MEL spectrogram to waveform

In [None]:
reconstructed_x = pp.spectrogram_2_waveform (spectrogram, sr=sr)

### compare quality

In [None]:
print ("original audio")
IPython.display.display(IPython.display.Audio(data=x,  rate=sr))

print ("audio which has been converted to spectrogram and back")
IPython.display.display(IPython.display.Audio(data=reconstructed_x,  rate=sr))

print ("audio which has been converted to spectrogram, degraded and converted back")
IPython.display.display(IPython.display.Audio(data=reconstructed_degraded_x,  rate=sr))


### plot spectrograms

In [None]:
pp.plot_mel_spectrogram(spectrogram,sr)

In [None]:
pp.plot_mel_spectrogram(degraded_x,sr)

In [None]:
#https://datasciencedojo.com/blog/python-libraries-for-generative-ai/#
#https://huggingface.co/docs/diffusers/tutorials/basic_training
#library for distortions
#https://github.com/iver56/audiomentations?tab=readme-ov-file

## training a simple model

### data preparation

In [None]:
large_data, sr = pp.get_all_speech_as_one_mel(num_spectrograms=100, num_speaker =0, debug = 1,working_in_google_colab = IN_COLAB)

In [None]:
train_sg, test_sg = pp.split_spectrogram_in_train_and_test(large_data,0.2, debug=1)

In [None]:
#degrade quality of both train and test
degraded_train_sg =degrade_quaity(train_sg, sr )
degraded_test_sg =degrade_quaity(test_sg, sr )




In [None]:

reconstructed_test = pp.spectrogram_2_waveform (test_sg, sr=sr)
reconstructed_degraded_test = pp.spectrogram_2_waveform (degraded_test_sg, sr=sr)

In [None]:
reconstructed_train = pp.spectrogram_2_waveform (train_sg, sr=sr)
reconstructed_degraded_train = pp.spectrogram_2_waveform (degraded_train_sg, sr=sr)

In [None]:
#the above can take a long time on large datasets, so I'll save the results to file
# Each spectrogram is written exactly once.
# The target folder is on the mounted Google Drive, so it only exists when running in Colab;
# on a local run the save is skipped instead of failing on a missing directory.
if IN_COLAB:
    np.savetxt(fname="/content/gdrive/MyDrive/Colab Notebooks/data_audio/degraded_train_sg.sg", X=degraded_train_sg)
    np.savetxt(fname="/content/gdrive/MyDrive/Colab Notebooks/data_audio/degraded_test_sg.sg", X=degraded_test_sg)
else:
    print("not running in Google Colab - skipping the save to Google Drive")

In [None]:

# Play and plot the clean test data ...
print ("audio reconstructed_test")
IPython.display.display(IPython.display.Audio(reconstructed_test,  rate=sr))
pp.plot_mel_spectrogram(test_sg,sr, figsize=(2,2))

# ... and the degraded test data. The plot must show the degraded *test* spectrogram
# so that the figure corresponds to the audio played right above it.
print ("audio degraded_test_sg")
IPython.display.display(IPython.display.Audio(reconstructed_degraded_test,  rate=sr))
pp.plot_mel_spectrogram(degraded_test_sg,sr, figsize=(2,2))


### model - simple autoencoder

Let us start with the simplest possible model: restoring one column of the MEL spectrogram (256 entries) at a time.
This discards the information contained in the previous time snapshots, but should be simple to train.
Accordingly, instead of Conv2d, we will have Conv1d.

In [None]:
from tensorflow.keras import models
from tensorflow.keras import layers
from tensorflow.keras.layers import Dense
#from tensorflow.keras.layers import Conv2D, Conv1D, MaxPooling2D, Flatten, Dense
#from tensorflow.keras.layers import MaxPooling1D



In [None]:

def build_encoder(input_dim=256, latent_dim=100):
    '''Build the encoder half of the dense autoencoder.

    Args:
        input_dim: length of one input vector (default 256).
        latent_dim: number of units of the encoding layer, i.e. the size of
            the encoder output (default 100).

    Returns:
        A Sequential model with a single tanh-activated Dense layer that maps
        `input_dim` inputs to `latent_dim` outputs.
    '''
    encoder = models.Sequential()
    encoder.add(layers.Dense(latent_dim, input_dim=input_dim, activation='tanh'))

    return encoder


In [None]:
encoder = build_encoder()
encoder.summary()

In [None]:

def build_decoder(latent_dim=100, output_dim=256):
    '''Build the decoder half of the dense autoencoder.

    Args:
        latent_dim: length of the encoded vector the decoder receives (default 100).
        output_dim: length of the restored vector (default 256).

    Returns:
        A Sequential model with a single relu-activated Dense layer that maps
        `latent_dim` inputs to `output_dim` outputs.
    '''
    decoder = models.Sequential()

    decoder.add(layers.Dense(output_dim, input_dim=latent_dim, activation='relu'))

    return decoder


In [None]:
decoder = build_decoder()
decoder.summary()

In [None]:
from tensorflow.keras.optimizers import Adam

autoencoder = models.Sequential([encoder, decoder])
#autoencoder.compile(loss="mse", optimizer=Adam(learning_rate=0.1))
optimizer = Adam()

autoencoder.compile(loss='mse',
                  optimizer=optimizer,
                  metrics=['mse'])


In [None]:
autoencoder.summary()

In [None]:
train_sg_t =  np.transpose(train_sg)

In [None]:
degraded_train_sg_t =  np.transpose(degraded_train_sg)

### train the model

In [None]:
import tensorflow as tf
def reinitialize(model):
    """Reset the weights of `model` in place by re-running each layer's initializers.

    Nested models (layers that are themselves `tf.keras.Model` instances) are
    handled recursively. For every other layer the kernel, bias and recurrent
    kernel are re-drawn from the layer's own initializer when both the
    initializer and the weight exist.

    Args:
        model: a Keras model exposing `.layers`.

    Returns:
        None. The model's variables are modified in place.
    """
    # (initializer attribute, weight attribute) pairs that are reset when present
    weight_specs = (
        ("kernel_initializer", "kernel"),
        ("bias_initializer", "bias"),
        ("recurrent_initializer", "recurrent_kernel"),
    )
    for l in model.layers:
        if isinstance(l, tf.keras.Model):
            reinitialize(l)
            continue
        for initializer_name, weight_name in weight_specs:
            initializer = getattr(l, initializer_name, None)
            weight = getattr(l, weight_name, None)
            # A layer can carry an initializer without owning the weight
            # (e.g. Dense(use_bias=False) has bias_initializer but bias is None).
            if initializer is None or weight is None:
                continue
            weight.assign(initializer(tf.shape(weight)))

In [None]:
reinitialize(autoencoder)

In [None]:


# Train the autoencoder to reproduce the clean spectrogram columns (input == target).
# `workers` / `use_multiprocessing` are not passed: they only apply to
# generator / keras.utils.Sequence inputs and are not accepted by newer Keras versions.
history = autoencoder.fit(train_sg_t, train_sg_t ,
                           validation_split = 0.2,
                           epochs=100,
                           batch_size=32,
                           verbose=1)

In [None]:
hist_df = pd.DataFrame(history.history)

# kept so that the optional (commented-out) metric plot below can still index into it
headers = list(hist_df.columns.values)

# Select the loss curves by name instead of by column position, so the plot does not
# silently show something else if the metrics passed to compile() change.
loss_columns = ["loss", "val_loss"]
plot = hist_df[loss_columns].plot(title=", ".join(loss_columns), logy=True)
#plot = hist_df[[headers[1], headers[3]]].plot(title=f"{headers[1]}, {headers[3]}", logy=False)

### save the model

In [None]:
models.save_model(autoencoder, 'autoencoder_001a')


### how does the model sound?

In [None]:
restored_test_sg_t = autoencoder.predict(np.transpose(degraded_test_sg))

In [None]:
reconstructed_restored_test = pp.spectrogram_2_waveform (np.transpose(restored_test_sg_t), sr=sr)

In [None]:
print ("reconstructed_test")
IPython.display.display(IPython.display.Audio(data=reconstructed_test,  rate=sr))
pp.plot_mel_spectrogram(test_sg,sr, figsize=(2,2))


print ("reconstructed_restored_degraded_test")
IPython.display.display(IPython.display.Audio(data=reconstructed_restored_test,  rate=sr))
pp.plot_mel_spectrogram(np.transpose(restored_test_sg_t),sr, figsize=(2,2))

# even simpler (10, 256) model training


In [None]:
### simpler model

def build_the_simplest_model_possible():
    """Build the smallest dense autoencoder used as a baseline.

    Returns:
        An uncompiled Sequential model: a relu Dense layer taking 256 inputs down
        to 10 units, followed by a relu Dense layer expanding back to 256 units.
    """
    simplest_model = models.Sequential()
    simplest_model.add(layers.Dense(10, input_dim=256, activation='relu'))
    # Only the first layer of a Sequential model declares an input size; this
    # layer's input is the 10-unit output of the layer above.
    simplest_model.add(layers.Dense(256, activation='relu'))
    return simplest_model
simplest_model = build_the_simplest_model_possible()
optimizer = Adam()

simplest_model.compile(loss='mse',
                  optimizer=optimizer,
                  metrics=['mse'])


In [None]:
# Cache to RAM to speed up the training. This requires arrays to be converted to Datasets
from tensorflow import data
#from tensorflow.data.Dataset import cache

AUTOTUNE = data.experimental.AUTOTUNE

# clean -> clean pairs (input and target are the same transposed spectrogram),
# batched, cached in RAM and prefetched
train_dataset = (
    data.Dataset
    .from_tensor_slices((np.transpose(train_sg), np.transpose(train_sg)))
    .batch(10000)
    .cache()
    .prefetch(buffer_size=AUTOTUNE)
)
validation_dataset = (
    data.Dataset
    .from_tensor_slices((np.transpose(test_sg), np.transpose(test_sg)))
    .batch(3000)
    .cache()
    .prefetch(buffer_size=AUTOTUNE)
)

#val_ds = val_ds.cache().prefetch(buffer_size=AUTOTUNE)

In [None]:
# sanity check of the train / test spectrogram sizes before training
print (f"train_sg.shape={train_sg.shape}")
print (f"test_sg.shape={test_sg.shape}")

In [None]:
from tensorflow.keras import callbacks
# take the callbacks from tensorflow.keras, the same package the models are built with
from tensorflow.keras.callbacks import EarlyStopping
from tensorflow.keras.callbacks import BackupAndRestore


# stop once val_loss has not improved for 10 epochs and roll back to the best weights
es = EarlyStopping(
    monitor="val_loss",
    patience=10,
    restore_best_weights=True,
    verbose=0
)

# checkpoint every epoch so that an interrupted run can be resumed
br = BackupAndRestore(
    backup_dir="training_backup",
    save_freq="epoch",
    delete_checkpoint=True
)
reinitialize(simplest_model)
# train_dataset / validation_dataset are already batched, so no batch_size is given
# (it is ignored for Dataset input). workers / use_multiprocessing only apply to
# generator / keras.utils.Sequence inputs and are therefore not passed either.
history = simplest_model.fit( x=train_dataset,
                           validation_data=validation_dataset,
                           epochs=1000,
                           verbose=1,
                           callbacks=[es,br])

In [None]:
hist_df = pd.DataFrame(history.history)
# pick the loss curves by name rather than by column position
loss_columns = ["loss", "val_loss"]
plot = hist_df[loss_columns].plot(title=", ".join(loss_columns), logy=True)

In [None]:
models.save_model(simplest_model, 'autoencoder_baseline_10-256-trained-on-good')

restored_test_sg_t = simplest_model.predict(np.transpose(degraded_test_sg))
# rebuild the waveform from THIS model's prediction, so that the audio played below
# matches the spectrogram plotted below
reconstructed_restored_test = pp.spectrogram_2_waveform (np.transpose(restored_test_sg_t), sr=sr)

print ("reconstructed_test")
IPython.display.display(IPython.display.Audio(data=reconstructed_test,  rate=sr))
pp.plot_mel_spectrogram(test_sg,sr, figsize=(2,2))


print ("reconstructed_restored_degraded_test")
IPython.display.display(IPython.display.Audio(data=reconstructed_restored_test,  rate=sr))
pp.plot_mel_spectrogram(np.transpose(restored_test_sg_t),sr, figsize=(2,2))

# how about training the simpler model on degraded rather than perfect audio?

In [None]:
# Cache to RAM to speed up the training. This requires arrays to be converted to Datasets
from tensorflow import data
#from tensorflow.data.Dataset import cache

AUTOTUNE = data.experimental.AUTOTUNE

# degraded -> clean pairs, batched, cached in RAM and prefetched
train_dataset = (
    data.Dataset
    .from_tensor_slices((np.transpose(degraded_train_sg), np.transpose(train_sg)))
    .batch(10000)
    .cache()
    .prefetch(buffer_size=AUTOTUNE)
)
validation_dataset = (
    data.Dataset
    .from_tensor_slices((np.transpose(degraded_test_sg), np.transpose(test_sg)))
    .batch(3000)
    .cache()
    .prefetch(buffer_size=AUTOTUNE)
)

#val_ds = val_ds.cache().prefetch(buffer_size=AUTOTUNE)

In [None]:
# stop once val_loss has not improved for 10 epochs and roll back to the best weights
es = EarlyStopping(
    monitor="val_loss",
    patience=10,
    restore_best_weights=True,
    verbose=0
)

# checkpoint every epoch so that an interrupted run can be resumed
br = BackupAndRestore(
    backup_dir="training_backup",
    save_freq="epoch",
    delete_checkpoint=True
)

# Start from fresh weights: simplest_model still holds the weights learnt on the clean
# audio, and this experiment is meant to train on degraded audio instead of on clean audio.
reinitialize(simplest_model)
# The datasets are already batched, so no batch_size is given (it is ignored for Dataset
# input); workers / use_multiprocessing only apply to generator / Sequence inputs.
history = simplest_model.fit( x=train_dataset,
                           validation_data=validation_dataset,
                           epochs=1000,
                           verbose=1,
                           callbacks=[es,br])

In [None]:
hist_df = pd.DataFrame(history.history)
# pick the loss curves by name rather than by column position
loss_columns = ["loss", "val_loss"]
plot = hist_df[loss_columns].plot(title=", ".join(loss_columns), logy=True)

In [None]:
models.save_model(simplest_model, 'autoencoder_baseline_10-256-trained-on_degraded')

restored_test_sg_t = simplest_model.predict(np.transpose(degraded_test_sg))
# rebuild the waveform from THIS model's prediction, so that the audio played below
# matches the spectrogram plotted below
reconstructed_restored_test = pp.spectrogram_2_waveform (np.transpose(restored_test_sg_t), sr=sr)

print ("reconstructed_test")
IPython.display.display(IPython.display.Audio(data=reconstructed_test,  rate=sr))
pp.plot_mel_spectrogram(test_sg,sr, figsize=(2,2))


print ("reconstructed_restored_degraded_test")
IPython.display.display(IPython.display.Audio(data=reconstructed_restored_test,  rate=sr))
pp.plot_mel_spectrogram(np.transpose(restored_test_sg_t),sr, figsize=(2,2))

# try convolutional autoencoder


In [None]:
def build_convolutional_autoencoder():
    """Build a 1-D convolutional autoencoder for one 256-entry spectrogram column.

    The input vector of length 256 is reshaped to (256, 1). Three Conv1D + MaxPool1D
    stages (16, 32, 64 filters, pool size 4) form the encoder; three Conv1DTranspose
    layers with stride 4 (32, 16, 1 filters) form the decoder.

    Returns:
        An uncompiled Sequential model. Its output keeps a trailing channel axis of
        size 1, i.e. it is 3-D per batch.
    """
    conv_ac = models.Sequential()
    # the input shape is declared once, on the first layer
    conv_ac.add(layers.Reshape([256,1], input_shape=[256]))

    # encoder: each pooling stage divides the length by 4
    conv_ac.add(layers.Conv1D(16, kernel_size=6, padding="same", activation='selu'))
    conv_ac.add(layers.MaxPool1D(pool_size=4))
    conv_ac.add(layers.Conv1D(32, kernel_size=6, padding="same", activation='selu'))
    conv_ac.add(layers.MaxPool1D(pool_size=4))
    conv_ac.add(layers.Conv1D(64, kernel_size=6, padding="same", activation='selu'))
    conv_ac.add(layers.MaxPool1D(pool_size=4))

    # decoder: each transposed convolution multiplies the length by its stride (4)
    conv_ac.add(layers.Conv1DTranspose(32, kernel_size=6, strides=4, padding="same", activation='selu'))
    conv_ac.add(layers.Conv1DTranspose(16, kernel_size=6, strides=4, padding="same", activation='selu'))
    conv_ac.add(layers.Conv1DTranspose(1, kernel_size=6, strides=4, padding="same", activation='relu'))


    return conv_ac
conv_ac = build_convolutional_autoencoder()
optimizer = Adam()

conv_ac.compile(loss='mse',
                  optimizer=optimizer,
                  metrics=['mse'])

In [None]:
conv_ac.summary()

In [None]:
# Cache to RAM to speed up the training. This requires arrays to be converted to Datasets
from tensorflow import data
#from tensorflow.data.Dataset import cache

AUTOTUNE = data.experimental.AUTOTUNE

# degraded -> clean pairs, batched, cached in RAM and prefetched
train_dataset = (
    data.Dataset
    .from_tensor_slices((np.transpose(degraded_train_sg), np.transpose(train_sg)))
    .batch(10000)
    .cache()
    .prefetch(buffer_size=AUTOTUNE)
)
validation_dataset = (
    data.Dataset
    .from_tensor_slices((np.transpose(degraded_test_sg), np.transpose(test_sg)))
    .batch(3000)
    .cache()
    .prefetch(buffer_size=AUTOTUNE)
)

#val_ds = val_ds.cache().prefetch(buffer_size=AUTOTUNE)

In [None]:
# stop once val_loss has not improved for 10 epochs and roll back to the best weights
es = EarlyStopping(
    monitor="val_loss",
    patience=10,
    restore_best_weights=True,
    verbose=0
)

# checkpoint every epoch so that an interrupted run can be resumed
br = BackupAndRestore(
    backup_dir="training_backup",
    save_freq="epoch",
    delete_checkpoint=True
)

# The datasets are already batched, so no batch_size is given (it is ignored for Dataset
# input); workers / use_multiprocessing only apply to generator / Sequence inputs.
history = conv_ac.fit( x=train_dataset,
                           validation_data=validation_dataset,
                           epochs=100,
                           verbose=1,
                           callbacks=[es,br])

In [None]:
hist_df = pd.DataFrame(history.history)
# pick the loss curves by name rather than by column position
loss_columns = ["loss", "val_loss"]
plot = hist_df[loss_columns].plot(title=", ".join(loss_columns), logy=True)

In [None]:
restored_test_sg_t = conv_ac.predict(np.transpose(degraded_test_sg))


In [None]:
restored_test_sg_t.shape

In [None]:
reconstructed_test.shape

In [None]:
models.save_model(conv_ac, 'conv_16-32-64-32-16-1')

# drop the trailing channel axis of the convolutional output
restored_test_sg_t = conv_ac.predict(np.transpose(degraded_test_sg))[:, :, 0]
# Audio() needs a waveform, not a spectrogram: convert the restored spectrogram back first
reconstructed_restored_test = pp.spectrogram_2_waveform (np.transpose(restored_test_sg_t), sr=sr)

print ("reconstructed_test")
IPython.display.display(IPython.display.Audio(data=reconstructed_test,  rate=sr))
pp.plot_mel_spectrogram(test_sg,sr, figsize=(2,2))


print ("reconstructed_restored_degraded_test")
IPython.display.display(IPython.display.Audio(data=reconstructed_restored_test,  rate=sr))
pp.plot_mel_spectrogram(np.transpose(restored_test_sg_t),sr, figsize=(2,2))

# Simple autoencoder (100, 256 dense) model training


In [None]:
reinitialize(autoencoder)

In [None]:
# Long training run of the dense autoencoder on the cached datasets.
# The datasets are already batched, so no batch_size is given (it is ignored for Dataset
# input); workers / use_multiprocessing only apply to generator / Sequence inputs.
history = autoencoder.fit( x=train_dataset,
                           validation_data=validation_dataset,
                           epochs=10000,
                           verbose=1)

In [None]:
hist_df = pd.DataFrame(history.history)
# pick the loss curves by name rather than by column position
loss_columns = ["loss", "val_loss"]
plot = hist_df[loss_columns].plot(title=", ".join(loss_columns), logy=True)

In [None]:
models.save_model(autoencoder, 'autoencoder_baseline_10000')

In [None]:
restored_test_sg_t = autoencoder.predict(np.transpose(degraded_test_sg))

In [None]:
# rebuild the waveform from the current prediction, so that the audio played below
# matches the spectrogram plotted below
reconstructed_restored_test = pp.spectrogram_2_waveform (np.transpose(restored_test_sg_t), sr=sr)

print ("reconstructed_test")
IPython.display.display(IPython.display.Audio(data=reconstructed_test,  rate=sr))
pp.plot_mel_spectrogram(test_sg,sr, figsize=(2,2))


print ("reconstructed_restored_degraded_test")
IPython.display.display(IPython.display.Audio(data=reconstructed_restored_test,  rate=sr))
pp.plot_mel_spectrogram(np.transpose(restored_test_sg_t),sr, figsize=(2,2))

In [None]:
from tensorflow.keras import callbacks
from keras.callbacks import EarlyStopping
from keras.callbacks import BackupAndRestore



## same  model with reg

## same  model with dropout

## same  model with dropout & reg

In [None]:
# NOTE(review): `autoencoder_do_reg` is not defined in any cell visible in this notebook;
# this looks like it would raise NameError on a fresh run - confirm where the model is built.
models.save_model(autoencoder_do_reg, 'autoencoder_002')

In [None]:
# NOTE(review): `autoencoder_do_reg_selu` is not defined in any cell visible in this notebook;
# this looks like it would raise NameError on a fresh run - confirm where the model is built.
models.save_model(autoencoder_do_reg_selu, 'autoencoder_003')