In [97]:
import os
import librosa
import librosa.display
import numpy as np
import soundfile as sf
import matplotlib.pyplot as plt

In [98]:
def padd_audio(audio, max_duration=5, sample_rate=16000):
    """Bring a 1-D signal to exactly ``max_duration`` seconds.

    Shorter signals are zero-padded on both sides (the extra sample, if the
    padding is odd, goes to the right). Longer signals are centre-cropped
    instead of making ``np.pad`` fail on a negative pad width.

    Args:
        audio: 1-D array-like of samples.
        max_duration: Target duration in seconds.
        sample_rate: Samples per second used to convert the duration to a length.

    Returns:
        A NumPy array with ``int(max_duration * sample_rate)`` samples.
    """
    audio = np.asarray(audio)

    # int() keeps the pad widths integral even for a fractional max_duration
    max_length = int(max_duration * sample_rate)
    excess = len(audio) - max_length

    if excess > 0:
        # Too long: keep the middle part, mirroring the symmetric padding below
        start = excess // 2
        return audio[start:start + max_length]

    padding_needed = -excess
    pad_left = padding_needed // 2
    pad_right = padding_needed - pad_left

    return np.pad(audio, (pad_left, pad_right), 'constant')

In [99]:
def mix_audio(original_audio_path, noise_audio_path, sample_rate=16000, noise_gain=0.5):
    """Load a clip and a noise recording and return their mixture.

    The clip is loaded with ``librosa.load`` at ``sample_rate`` and brought to
    a fixed length by ``padd_audio``. The noise is loaded at the same rate,
    repeated until it is at least as long as the clip, trimmed to the clip's
    length, scaled by ``noise_gain`` and added. No clipping or re-normalisation
    is applied to the sum.

    Args:
        original_audio_path: Path of the clip to be corrupted.
        noise_audio_path: Path of the noise recording.
        sample_rate: Rate passed to ``librosa.load`` and ``padd_audio``.
        noise_gain: Factor applied to the noise before mixing (default 0.5,
            i.e. the noise is halved).

    Returns:
        The padded clip plus the scaled noise, as a 1-D array.

    Raises:
        ValueError: If the noise file contains no samples.
    """
    # Load and pad the original audio
    original_audio, _ = librosa.load(original_audio_path, sr=sample_rate)
    original_audio = padd_audio(original_audio, sample_rate=sample_rate)

    # Load the noise audio
    noise_audio, _ = librosa.load(noise_audio_path, sr=sample_rate)
    if len(noise_audio) == 0:
        # Without this guard the repeat count below divides by zero
        raise ValueError(f"Noise file contains no samples: {noise_audio_path}")

    # Repeat the noise until it covers the clip, then trim it to the clip length
    repeats = int(np.ceil(len(original_audio) / len(noise_audio)))
    noise_audio = np.tile(noise_audio, repeats)[:len(original_audio)]

    return original_audio + noise_audio * noise_gain

In [100]:
def get_clean_audio(original_audio_path, sample_rate=16000):
    """Load the audio file at ``original_audio_path`` and return it after ``padd_audio``.

    The file is read with ``librosa.load`` at ``sample_rate``; the same rate is
    forwarded to ``padd_audio``.
    """
    loaded = librosa.load(original_audio_path, sr=sample_rate)
    waveform = loaded[0]  # librosa.load returns (samples, rate); only the samples are used

    return padd_audio(waveform, sample_rate=sample_rate)

In [101]:
sr = 16000  # Sampling rate in Hz; passed as `samplerate` when the WAV files are written

In [102]:
# Randomly chosen noise for each audio
def combine_audio_with_noise(original_audio_dir, noise_audio_dir):
    """Pair every file in ``original_audio_dir`` with a randomly chosen noise file.

    Only regular files are considered, and both listings are sorted so that
    the pairing is reproducible once NumPy's global random state is seeded.

    Args:
        original_audio_dir: Directory containing the clips to corrupt.
        noise_audio_dir: Directory containing the noise recordings.

    Returns:
        A dict mapping the full path of each clip to the full path of the
        noise file drawn for it (``np.random.choice``, with replacement).

    Raises:
        ValueError: If there are clips to pair but no noise file to draw from.
    """
    def list_files(directory):
        # os.listdir also returns sub-directories (e.g. ".ipynb_checkpoints"),
        # which cannot be loaded as audio, and its order is arbitrary.
        return sorted(
            name for name in os.listdir(directory)
            if os.path.isfile(os.path.join(directory, name))
        )

    noise_audios = list_files(noise_audio_dir)
    original_audios = list_files(original_audio_dir)

    if original_audios and not noise_audios:
        raise ValueError(f"No noise files found in {noise_audio_dir!r}")

    combination_dict = {}
    for original_audio in original_audios:
        # str() turns the np.str_ returned by np.random.choice into a plain str
        noise_audio = str(np.random.choice(noise_audios))
        combination_dict[os.path.join(original_audio_dir, original_audio)] = os.path.join(noise_audio_dir, noise_audio)

    return combination_dict

In [103]:
def audio_to_stft(audio, n_fft=1199, hop_length_fft=304):
    """Transform a waveform into a dB-scaled STFT magnitude and its phase.

    Args:
        audio: Waveform passed to ``librosa.stft``.
        n_fft: FFT size forwarded to ``librosa.stft``.
        hop_length_fft: Hop length forwarded to ``librosa.stft``.

    Returns:
        ``(magnitude_db, phase)``: the magnitude converted with
        ``librosa.amplitude_to_db(..., ref=np.max)`` and the phase component
        returned by ``librosa.magphase``.
    """
    spectrum = librosa.stft(audio, n_fft=n_fft, hop_length=hop_length_fft)

    # Split the complex spectrum into its magnitude and phase factors
    magnitude, phase = librosa.magphase(spectrum)

    # dB values are expressed relative to this spectrogram's own maximum
    return librosa.amplitude_to_db(magnitude, ref=np.max), phase

In [104]:
def stft_to_audio(stftaudio_magnitude_db, stftaudio_phase, hop_length_fft=304, n_fft=1199):
    """Rebuild a peak-normalised waveform from a dB magnitude and a phase matrix.

    Args:
        stftaudio_magnitude_db: STFT magnitude in dB.
        stftaudio_phase: Phase matrix with the same shape as the magnitude.
        hop_length_fft: Hop length that was used for the forward STFT.
        n_fft: FFT size that was used for the forward STFT. It is only passed
            to ``librosa.istft`` when it is consistent with the number of rows
            of the spectrogram; otherwise librosa infers the size itself.

    Returns:
        The reconstructed signal divided by its absolute peak, or the
        unscaled (all-zero) signal when the peak is zero.
    """
    # Convert dB back to amplitude
    stftaudio_magnitude_rev = librosa.db_to_amplitude(stftaudio_magnitude_db, ref=1.0)

    # Reconstruct the STFT complex matrix
    audio_reverse_stft = stftaudio_magnitude_rev * stftaudio_phase

    # librosa.istft infers n_fft as 2 * (rows - 1), which is always even. With
    # the odd default used by audio_to_stft (1199) that inference is off by one,
    # so the true size is passed whenever it matches the row count.
    rows = audio_reverse_stft.shape[-2]
    frame_length = n_fft if n_fft is not None and rows == 1 + n_fft // 2 else None

    # Inverse STFT to get back the audio signal
    audio_reconstruct = librosa.istft(audio_reverse_stft, hop_length=hop_length_fft, n_fft=frame_length)

    # A silent reconstruction has a zero peak; dividing by it would yield NaNs
    peak = np.max(np.abs(audio_reconstruct))
    if peak == 0:
        return audio_reconstruct

    return audio_reconstruct / peak

In [105]:
# Min-max normalization using global min and max values
def normalize(stft, global_min, global_max):
    """Map values from ``[global_min, global_max]`` to ``[0, 1]``.

    Args:
        stft: Array-like of values (a list of equally shaped arrays is
            accepted and converted to one array).
        global_min: Value that is mapped to 0.
        global_max: Value that is mapped to 1.

    Returns:
        A NumPy array ``(stft - global_min) / (global_max - global_min)``.

    Raises:
        ValueError: If ``global_max`` equals ``global_min``.
    """
    value_range = global_max - global_min
    if np.any(value_range == 0):
        raise ValueError("global_max must differ from global_min")

    # np.asarray makes plain lists work regardless of the type of the bounds
    return (np.asarray(stft) - global_min) / value_range

# Inverse of the min-max normalization, using the same global min and max values
def denormalize(normalized_stft, global_min, global_max):
    """Map values from ``[0, 1]`` back to ``[global_min, global_max]``.

    Args:
        normalized_stft: Array-like of normalised values.
        global_min: Value that 0 is mapped to.
        global_max: Value that 1 is mapped to.

    Returns:
        A NumPy array ``normalized_stft * (global_max - global_min) + global_min``.
    """
    # np.asarray prevents list * int from repeating the list instead of scaling it
    return np.asarray(normalized_stft) * (global_max - global_min) + global_min

# Preprocessing the data

In [106]:
# Pair every clip in ./Dataset with a randomly drawn clip from ./Noise
working_dir = os.getcwd()
audio_noise_pairs = combine_audio_with_noise(
    os.path.join(working_dir, 'Dataset'),
    os.path.join(working_dir, 'Noise'),
)

# One object slot per pair; each slot receives that clip's waveform
pair_count = len(audio_noise_pairs)
noisy_audios = np.zeros(pair_count, dtype=object)
clean_audios = np.zeros(pair_count, dtype=object)

for slot, clip_path in enumerate(audio_noise_pairs):
    noisy_audios[slot] = mix_audio(clip_path, audio_noise_pairs[clip_path], sample_rate=16000)
    clean_audios[slot] = get_clean_audio(clip_path, sample_rate=16000)

In [107]:
# Getting STFT data for the learning process.
# Each noisy clip is transformed once and the (magnitude_db, phase) pair is
# split afterwards, instead of running the STFT twice per clip.
noisy_stft_pairs = [audio_to_stft(audio) for audio in noisy_audios]
noisy_audios_stft = [magnitude_db for magnitude_db, _ in noisy_stft_pairs]
noisy_audios_stft_phase = [phase for _, phase in noisy_stft_pairs]

# Only the magnitude of the clean clips is kept
clean_audios_stft = [audio_to_stft(audio)[0] for audio in clean_audios]

In [108]:
# Split into training and test sets: the first 90% of the clips are used for
# training, the remainder for testing (the order is not shuffled here)
split_ratio = 0.9
split_index = int(len(noisy_audios_stft) * split_ratio)

X_train, X_test = noisy_audios_stft[:split_index], noisy_audios_stft[split_index:]
y_train, y_test = clean_audios_stft[:split_index], clean_audios_stft[split_index:]

# The phase of the noisy input is split the same way
phase_training, phase_test = (
    noisy_audios_stft_phase[:split_index],
    noisy_audios_stft_phase[split_index:],
)

In [109]:
# Value range of the training data, taken over the noisy inputs and the clean
# targets together
train_minima = (np.min(X_train), np.min(y_train))
train_maxima = (np.max(X_train), np.max(y_train))
global_min = min(train_minima)
global_max = max(train_maxima)

In [110]:
# Scale every split with the range that was measured on the training set
(X_train_normalized,
 X_test_normalized,
 y_train_normalized,
 y_test_normalized) = [
    normalize(split, global_min, global_max)
    for split in (X_train, X_test, y_train, y_test)
]

In [111]:
# Transform data to tensors: sample x dimensions(n x n) x channel(1)
def _with_channel_axis(batch):
    """Return ``batch`` with a trailing channel axis of size 1.

    The axis is only added when it is missing, so re-running this cell does
    not keep appending dimensions.
    """
    batch = np.asarray(batch)
    if batch.ndim == 4 and batch.shape[-1] == 1:
        return batch
    return batch[..., np.newaxis]


X_train_normalized = _with_channel_axis(X_train_normalized)
X_test_normalized = _with_channel_axis(X_test_normalized)
y_train_normalized = _with_channel_axis(y_train_normalized)
y_test_normalized = _with_channel_axis(y_test_normalized)

In [112]:
#np.savez('input_data.npz', array1=X_train_normalized, array2=X_test_normalized, array3=y_train_normalized, array4=y_test_normalized, array5=phase_training, array6=phase_test, array7=global_min, array8=global_max)

# Load the preprocessed data

In [113]:
#data = np.load('input_data.npz')

In [114]:
#X_train_normalized = data['array1']
#X_test_normalized = data['array2']
#y_train_normalized = data['array3']
#y_test_normalized = data['array4']
#phase_training = data['array5']
#phase_test = data['array6']
#global_min = data['array7']
#global_max = data['array8']

In [115]:
def generator(X, y, batch_size=16, epochs=20):
    """Yield shuffled ``(batch_X, batch_y)`` pairs for a fixed number of epochs.

    For each of the ``epochs`` passes the samples are reshuffled with
    ``np.random.shuffle`` and emitted in consecutive chunks of ``batch_size``;
    the last chunk of a pass may be smaller. ``X`` and ``y`` must support
    indexing with an integer index array.
    """
    assert len(X) == len(y), "The length of X and y must be the same"

    n_samples = len(X)
    for _ in range(epochs):
        # Fresh permutation for every pass over the data
        order = np.arange(n_samples)
        np.random.shuffle(order)
        X_epoch, y_epoch = X[order], y[order]

        for lo in range(0, n_samples, batch_size):
            # Slicing past the end is clamped, so the final batch is just shorter
            hi = lo + batch_size
            yield X_epoch[lo:hi], y_epoch[lo:hi]

In [116]:
import tensorflow as tf
from tensorflow.keras.layers import Conv2D, UpSampling2D, BatchNormalization, MaxPooling2D, Input, Dropout
from tensorflow.keras.models import Model

# Encoder
def encoder(inputs):
    """Build the encoder half of the autoencoder on top of ``inputs``.

    Three stages of Conv2D (3x3, ReLU, 'same' padding) -> BatchNormalization ->
    2x2 MaxPooling with 32, 64 and 128 filters, followed by a final 256-filter
    convolution whose output is returned.
    """
    features = inputs
    for n_filters in (32, 64, 128):
        features = Conv2D(n_filters, (3, 3), activation='relu', padding='same')(features)
        features = BatchNormalization()(features)
        features = MaxPooling2D((2, 2))(features)

    # Bottleneck convolution: no normalisation or pooling after it
    return Conv2D(256, (3, 3), activation='relu', padding='same')(features)

# Decoder
def decoder(encoded):
    """Build the decoder half of the autoencoder on top of ``encoded``.

    Three stages of 2x2 UpSampling -> Conv2D (3x3, ReLU, 'same' padding) ->
    BatchNormalization with 128, 64 and 32 filters, followed by a one-filter
    sigmoid convolution that produces the single-channel output.
    """
    features = encoded
    for n_filters in (128, 64, 32):
        features = UpSampling2D((2, 2))(features)
        features = Conv2D(n_filters, (3, 3), activation='relu', padding='same')(features)
        features = BatchNormalization()(features)

    # Single channel output
    return Conv2D(1, (3, 3), activation='sigmoid', padding='same')(features)

# Input shape: height x width x 1 channel.
# NOTE(review): presumably 600 frequency bins x 264 frames of the padded clips -
# confirm against the shape returned by audio_to_stft.
input_shape = (600, 264, 1)
inputs = Input(shape=input_shape)

# Build the autoencoder
encoded = encoder(inputs)
decoded = decoder(encoded)

autoencoder = Model(inputs, decoded)
autoencoder.compile(optimizer='adam', loss='mse')

# Summary of the model
autoencoder.summary()

batch_size = 16
epochs = 20

# The generator emits ceil(n / batch_size) batches per pass (the last one may
# be smaller), so the same count is used per Keras epoch. Floor division would
# give 0 steps for fewer than batch_size samples and would let the Keras epochs
# drift away from the generator's own passes.
steps_per_epoch = int(np.ceil(len(X_train_normalized) / batch_size))

# Fit the model; batch size and epoch count are passed on explicitly so the
# generator cannot silently disagree with the values used here
autoencoder.fit(generator(X_train_normalized, y_train_normalized,
                          batch_size=batch_size, epochs=epochs),
                steps_per_epoch=steps_per_epoch,
                epochs=epochs)


Model: "model_10"
_________________________________________________________________
 Layer (type)                Output Shape              Param #   
 input_11 (InputLayer)       [(None, 600, 264, 1)]     0         
                                                                 
 conv2d_68 (Conv2D)          (None, 600, 264, 32)      320       
                                                                 
 batch_normalization_44 (Bat  (None, 600, 264, 32)     128       
 chNormalization)                                                
                                                                 
 max_pooling2d_24 (MaxPoolin  (None, 300, 132, 32)     0         
 g2D)                                                            
                                                                 
 conv2d_69 (Conv2D)          (None, 300, 132, 64)      18496     
                                                                 
 batch_normalization_45 (Bat  (None, 300, 132, 64)     256

<keras.callbacks.History at 0x1afb10adb10>

In [117]:
def get_audio_predictions(output_prediction, phase, global_min=None, global_max=None):
    """Turn one network output back into a waveform.

    Args:
        output_prediction: Normalised spectrogram predicted by the model;
            size-1 axes are removed with ``squeeze()`` before further use.
        phase: Phase matrix used for the inverse STFT.
        global_min: Lower bound used for denormalisation. ``None`` (default)
            means the notebook-level ``global_min`` at call time.
        global_max: Upper bound used for denormalisation. ``None`` (default)
            means the notebook-level ``global_max`` at call time.

    Returns:
        The waveform returned by ``stft_to_audio``.
    """
    # Resolve the defaults when the function is called, not when it is defined:
    # a default of `global_min=global_min` would freeze the value that existed
    # when this cell ran and go stale if the range is recomputed later.
    if global_min is None:
        global_min = globals()['global_min']
    if global_max is None:
        global_max = globals()['global_max']

    prediction = denormalize(output_prediction.squeeze(), global_min, global_max)
    audio_prediction = stft_to_audio(prediction, phase, hop_length_fft=304)
    return audio_prediction

In [118]:
# Run the trained autoencoder on the normalised test inputs
predictions = autoencoder.predict(X_test_normalized)



In [119]:
# Reconstruct the waveform of one test sample (index 2) from its prediction and
# the phase of the matching noisy input; needs at least three test samples
audio_pred = get_audio_predictions(predictions[2], phase_test[2])

In [120]:
# Write the reconstructed prediction to a WAV file at the sampling rate `sr`
sf.write('audio_pred.wav', audio_pred, samplerate=sr)

In [121]:
# Rebuild the noisy input of the same test sample (index 2) so it can be
# compared with the prediction, then write it to a WAV file
audio_noisy = stft_to_audio(X_test[2], phase_test[2], hop_length_fft=304)
sf.write('audio_noisy.wav', audio_noisy, samplerate=sr)

In [122]:
# Save the trained autoencoder to 'autoencoder_model.h5' in the working directory
autoencoder.save('autoencoder_model.h5')