In [1]:
from google.colab import drive
# Mount Google Drive at /content/drive so the dataset archives are readable.
# NOTE(review): `_mount` is underscore-prefixed (private by convention), while
# the message printed by this cell refers to the public `drive.mount(...)`.
# Confirm the private helper is intentional before relying on it.
drive._mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [2]:
import zipfile

# Locations of the paired archives on the mounted Drive.
x_zip_path = "/content/drive/MyDrive/Colab Notebooks/speech denoiser/x_train_noised_speech.zip"
y_zip_path = "/content/drive/MyDrive/Colab Notebooks/speech denoiser/y_train_clean_audio.zip"

# List the .npy members of each archive. Both lists are sorted so that the
# i-th X member lines up with the i-th Y member.
with zipfile.ZipFile(x_zip_path, 'r') as x_zip_ref, zipfile.ZipFile(y_zip_path, 'r') as y_zip_ref:
    x_file_list = sorted(member for member in x_zip_ref.namelist() if member.endswith(".npy"))
    y_file_list = sorted(member for member in y_zip_ref.namelist() if member.endswith(".npy"))

In [4]:
# Number of .npy members found in the Y archive.
len(y_file_list)

10679

In [1]:
def process_spectrogram(spectrogram, final_shape):
  """Force a 2-D spectrogram to exactly ``final_shape`` by trimming and zero-padding.

  Each axis is handled independently: entries beyond the target size are
  dropped from the end, and missing entries are filled with zeros at the end.
  Previously only the second axis was padded, so an input with fewer rows than
  ``final_shape[0]`` came back with the wrong shape.

  Args:
    spectrogram: 2-D numpy array (indexed as ``[row, column]``).
    final_shape: sequence whose first two items are the target number of rows
      and columns.

  Returns:
    An array of shape ``(final_shape[0], final_shape[1])``. When no padding is
    needed this is a slice (view) of the input rather than a copy.
  """
  target_rows, target_cols = final_shape[0], final_shape[1]

  # Trim first; slicing past the end of an axis is a no-op, so this is safe
  # for inputs that are already small enough.
  trimmed = spectrogram[:target_rows, :target_cols]

  missing_rows = target_rows - trimmed.shape[0]
  missing_cols = target_cols - trimmed.shape[1]
  if missing_rows == 0 and missing_cols == 0:
    return trimmed

  # Zero-pad at the end of whichever axes are still too short.
  pad_width = ((0, missing_rows), (0, missing_cols))
  return np.pad(trimmed, pad_width, mode='constant', constant_values=0)




In [22]:
import tensorflow as tf
from tensorflow.keras import layers, models
from tensorflow.keras.optimizers import Adam
from tensorflow.keras import backend as k
import matplotlib.pyplot as plt
import os
import pickle

class UNET:
    """
    U-Net: a deep convolutional encoder-decoder architecture with skip
    connections.

    The network is built eagerly in the constructor and exposed as
    ``self.model`` (a Keras ``Model`` named ``'Unet'``).

    Args:
        input_shape: shape of one input sample, e.g. ``(256, 512, 1)``.
        conv_filters: filter count of each encoder level, from the shallowest
            to the deepest, e.g. ``(64, 128, 256, 512)``. The decoder mirrors
            this sequence and the bottleneck uses twice the last value.
        conv_kernels: kernel size shared by every convolution except the
            final 1x1 output layer, e.g. ``(3, 3)``.
    """
    def __init__(self,
                 input_shape,
                 conv_filters,
                 conv_kernels,):
        self.input_shape = input_shape # e.g. (256, 512, 1)
        self.conv_filters = conv_filters # e.g. (64, 128, 256, 512)
        self.conv_kernels = conv_kernels # e.g. (3, 3)


        self.model = None

        self._model_input = None

        self._build()

    def summary(self):
        """Print the Keras summary of the underlying model."""
        self.model.summary()

    def compile(self, learning_rate=0.001):
        """Compile the model with Adam, binary cross-entropy and the IoU metric.

        Args:
            learning_rate: learning rate handed to the Adam optimizer.
        """
        optimizer = Adam(learning_rate=learning_rate)
        binary_loss = tf.keras.losses.BinaryCrossentropy()
        self.model.compile(optimizer=optimizer,
                           loss=binary_loss,
                           metrics=[self.iou,])

    def train(self, x_train, y_train, batch_size, num_epochs, callbacks):
        """Fit on in-memory arrays, holding out 20% of them for validation.

        Returns:
            The history object returned by ``Model.fit``.
        """
        history=self.model.fit(x_train,
                       y_train,
                       batch_size=batch_size,
                       epochs=num_epochs,
                       callbacks=callbacks,
                       validation_split=0.2,
                       )
        return history

    def train_generator(self, train_generator, valid_generator, batch_size, num_epochs, callbacks,
                        steps_per_epoch=None, validation_steps=None):
        """Fit on generators that yield ``(inputs, targets)`` batches.

        Args:
            train_generator: generator of training batches.
            valid_generator: generator of validation batches.
            batch_size: kept for backward compatibility only. It is NOT
                forwarded to ``fit`` because a generator already produces
                batches of its own size.
            num_epochs: number of epochs to run.
            callbacks: Keras callbacks forwarded to ``fit``.
            steps_per_epoch: number of training batches that make up one
                epoch. Must be set for generators that never stop, otherwise
                the first epoch never ends.
            validation_steps: number of validation batches drawn per epoch.
                Must be set when ``valid_generator`` never stops.

        Returns:
            The history object returned by ``Model.fit``.
        """
        history=self.model.fit(
                      train_generator,
                      epochs=num_epochs,
                      callbacks=callbacks,
                      validation_data = valid_generator,
                      steps_per_epoch=steps_per_epoch,
                      validation_steps=validation_steps,
                       )
        return history

    def save(self, save_folder):
        """Write ``parameters.pkl`` and ``weights.h5`` into ``save_folder``.

        The folder is created if it does not exist yet.
        """
        self._create_folder_if_it_doesnt_exist(save_folder)
        self._save_parameters(save_folder)
        self._save_weights(save_folder)

    def load_weights(self, weights_path):
        """Load model weights from ``weights_path``."""
        self.model.load_weights(weights_path)

    # For predictions
    def reconstruct(self, spectrograms):
        """Run the model on ``spectrograms`` and return its predictions."""
        predictions = self.model.predict(spectrograms)
        return predictions

    # Plots the training and validation curves for the loss and the IoU score
    def plot_loss(self, history):
        """Plot the training and validation loss stored in ``history``."""
        plt.plot(history.history['loss'],label="loss")
        plt.plot(history.history['val_loss'],label="validation loss")
        plt.legend()
        plt.show()

    def plot_iou(self, history):
        """Plot the training and validation IoU stored in ``history``."""
        plt.plot(history.history['iou'],label="iou")
        plt.plot(history.history['val_iou'],label="validation iou")
        plt.legend()
        plt.show()

    # Calculates the IoU score metric
    def iou(self, y_true, y_pred, smooth = 1):
        """Smoothed intersection-over-union of two tensors.

        Both tensors are flattened; the intersection is the sum of their
        element-wise product and the union is ``sum(y_true) + sum(y_pred)``
        minus that intersection. ``smooth`` is added to numerator and
        denominator so the ratio stays defined when both sums are zero.
        """
        y_true = k.flatten(y_true)
        y_pred = k.flatten(y_pred)
        intersection = k.sum(y_true*y_pred)
        union = k.sum(y_true)+k.sum(y_pred)-intersection
        iou_score = (intersection+smooth)/(union+smooth)
        return iou_score

    def _create_folder_if_it_doesnt_exist(self, folder):
        """Create ``folder`` (including parents) unless it already exists."""
        if not os.path.exists(folder):
            os.makedirs(folder)

    def _save_parameters(self, save_folder):
        """Pickle ``[input_shape, conv_filters, conv_kernels]`` to ``parameters.pkl``."""
        parameters = [
            self.input_shape,
            self.conv_filters,
            self.conv_kernels,
        ]
        save_path = os.path.join(save_folder, "parameters.pkl")
        with open(save_path, "wb") as f:
            pickle.dump(parameters, f)

    def _save_weights(self, save_folder):
        """Save the model weights to ``weights.h5`` inside ``save_folder``."""
        save_path = os.path.join(save_folder, "weights.h5")
        self.model.save_weights(save_path)

    def _build(self):
        """Build the network and store it in ``self.model``."""
        self._build_unet()

    # Define the U-Net architecture
    def _build_unet(self):
        """Assemble encoder, bottleneck and decoder from the constructor arguments.

        One downsample block is created per entry of ``conv_filters``; the
        bottleneck uses twice the last entry; the decoder walks the entries
        in reverse order, each block receiving the matching encoder output as
        its skip connection. With ``conv_filters=(64, 128, 256, 512)`` and
        ``conv_kernels=(3, 3)`` this yields the same layers, in the same
        order, as the previous hard-coded definition.

        Raises:
            ValueError: if ``conv_filters`` is empty.
        """
        if not self.conv_filters:
            raise ValueError("conv_filters must contain at least one filter count")

        inputs = tf.keras.Input(self.input_shape)

        # Encoder: remember each level's skip tensor for the decoder.
        skip_connections = []
        current = inputs
        for filters in self.conv_filters:
            skip, current = self._downsample_block(current, filters, self.conv_kernels)
            skip_connections.append(skip)

        # Bottleneck: two convolutions at twice the deepest encoder width.
        bottleneck_filters = self.conv_filters[-1] * 2
        current = layers.Conv2D(bottleneck_filters, self.conv_kernels, activation='relu', padding='same')(current)
        current = layers.Conv2D(bottleneck_filters, self.conv_kernels, activation='relu', padding='same')(current)

        # Decoder: mirror the encoder, deepest level first.
        for filters, skip in zip(reversed(self.conv_filters), reversed(skip_connections)):
            current = self._upsample_block(current, skip, filters, self.conv_kernels)

        outputs = layers.Conv2D(1, 1, activation='sigmoid')(current)

        self.model = models.Model(inputs=inputs, outputs=outputs, name = 'Unet')

    # Downsampling block (encoder)
    def _downsample_block(self, input_layer, filters, kernel_size, padding='same', activation='relu'):
        """Conv -> Dropout(0.1) -> Conv, then BatchNorm -> ReLU -> 2x2 max-pool.

        Returns:
            ``(skip, pooled)``: ``skip`` is the output of the second
            convolution (taken before batch normalisation) and ``pooled`` is
            the max-pooled tensor that feeds the next level.
        """
        conv1 = layers.Conv2D(filters, kernel_size, activation=activation, padding=padding)(input_layer)
        conv1 = layers.Dropout(0.1)(conv1)
        conv1 = layers.Conv2D(filters, kernel_size, activation=activation, padding=padding)(conv1)
        b1 = layers.BatchNormalization()(conv1)
        r1 = layers.ReLU()(b1)
        pool = layers.MaxPooling2D(pool_size=(2, 2))(r1)
        print("Downsample block shape is: " ,conv1.shape)
        return conv1, pool

    # Upsampling block
    def _upsample_block(self, input_layer, skip_connection, filters, kernel_size, padding='same', activation='relu'):
        """2x upsample -> transposed conv -> concat with skip -> two convs.

        ``kernel_size`` is applied to the transposed convolution and to both
        following convolutions (the latter previously used a literal 3).

        Returns:
            The output tensor of the second convolution.
        """
        up = layers.UpSampling2D(size=(2, 2))(input_layer)
        up = layers.Conv2DTranspose(filters, kernel_size, activation=activation, padding=padding)(up)
        # Join along the last (channel) axis of the 4-D tensors.
        merge = layers.concatenate([up, skip_connection], axis=3)
        conv = layers.Conv2D(filters, kernel_size, activation=activation, padding=padding)(merge)
        conv = layers.Conv2D(filters, kernel_size, activation=activation, padding=padding)(conv)
        print("upsample block shape is: " ,conv.shape)
        return conv


In [23]:
import os
import numpy as np
import zipfile

def data_generator(x_zip_path, y_zip_path, final_shape, batch_size=32):
    """Endlessly yield ``(x_batch, y_batch)`` pairs read from two zip archives.

    The ``.npy`` members of each archive are sorted by name and paired by
    position. Every array is passed through ``process_spectrogram`` with
    ``final_shape``, the results are stacked, and a trailing axis of length 1
    is appended. After the last member the generator starts over from the
    first one; the final batch of each pass may hold fewer than
    ``batch_size`` samples.

    Both archives are opened once and stay open for the generator's lifetime
    (they are closed when the generator is closed or garbage-collected),
    instead of being re-opened for every sample.

    Args:
        x_zip_path: path of the archive holding the input arrays.
        y_zip_path: path of the archive holding the target arrays.
        final_shape: shape handed to ``process_spectrogram`` for every array.
        batch_size: maximum number of samples per yielded batch.

    Yields:
        Tuple ``(batch_x_data, batch_y_data)`` of arrays reshaped to
        ``(samples, dim1, dim2, 1)``.

    Raises:
        ValueError: if ``batch_size`` is not positive, or if the archives
            contain no ``.npy`` members (either case would otherwise loop
            forever without yielding).
        AssertionError: if the two archives hold different numbers of
            ``.npy`` members.
    """
    if batch_size <= 0:
        raise ValueError("batch_size must be a positive integer")

    with zipfile.ZipFile(x_zip_path, 'r') as x_zip_ref, zipfile.ZipFile(y_zip_path, 'r') as y_zip_ref:
        # Sort both listings so X and Y members are paired by position.
        x_file_list = sorted(f for f in x_zip_ref.namelist() if f.endswith(".npy"))
        y_file_list = sorted(f for f in y_zip_ref.namelist() if f.endswith(".npy"))

        # Check if the number of files in both folders match
        assert len(x_file_list) == len(y_file_list), "Mismatch in the number of files in X and Y folders"

        if not x_file_list:
            raise ValueError("No .npy files found in the provided archives")

        while True:
            for batch_start in range(0, len(x_file_list), batch_size):
                batch_x_files = x_file_list[batch_start:batch_start + batch_size]
                batch_y_files = y_file_list[batch_start:batch_start + batch_size]
                batch_x_data = []
                batch_y_data = []

                for x_filename, y_filename in zip(batch_x_files, batch_y_files):
                    with x_zip_ref.open(x_filename) as x_file:
                        x_spectrogram = np.load(x_file)

                    with y_zip_ref.open(y_filename) as y_file:
                        y_spectrogram = np.load(y_file)

                    batch_x_data.append(process_spectrogram(x_spectrogram, final_shape))
                    batch_y_data.append(process_spectrogram(y_spectrogram, final_shape))

                batch_x_data = np.array(batch_x_data)
                batch_y_data = np.array(batch_y_data)

                # Append a trailing axis of length 1 to each stacked batch.
                batch_x_data = batch_x_data.reshape(batch_x_data.shape[0], batch_x_data.shape[1], batch_x_data.shape[2], 1 )
                batch_y_data = batch_y_data.reshape(batch_y_data.shape[0], batch_y_data.shape[1], batch_y_data.shape[2], 1 )
                yield batch_x_data, batch_y_data

# Paths of the paired archives on the mounted Drive that the generators read:
# x_* is named "noised_speech", y_* is named "clean_audio".
x_zip_path = "/content/drive/MyDrive/Colab Notebooks/speech denoiser/x_train_noised_speech.zip"
y_zip_path = "/content/drive/MyDrive/Colab Notebooks/speech denoiser/y_train_clean_audio.zip"


In [24]:
# Build the U-Net; constructing it prints the tensor shape of every
# downsample and upsample block.
model = UNET(
        input_shape=(256, 512, 1),
        conv_filters=(64, 128, 256, 512),
        conv_kernels=(3, 3),
    )


Downsample block shape is:  (None, 256, 512, 64)
Downsample block shape is:  (None, 128, 256, 128)
Downsample block shape is:  (None, 64, 128, 256)
Downsample block shape is:  (None, 32, 64, 512)
upsample block shape is:  (None, 32, 64, 512)
upsample block shape is:  (None, 64, 128, 256)
upsample block shape is:  (None, 128, 256, 128)
upsample block shape is:  (None, 256, 512, 64)


In [25]:
# Compile with the defaults of UNET.compile (no learning rate is passed).
model.compile()

In [26]:
# NOTE(review): `callback` is not defined anywhere in this notebook — presumably
# a local module that must be importable from the Colab runtime; verify.
from callback import callbackList
callbacks = callbackList()

In [27]:
# Target (rows, columns) of every spectrogram; matches the first two
# dimensions of the model's input_shape.
shape = (256, 512)
batch_size = 8
# NOTE(review): both generators are created from the same pair of archives, so
# the validation batches are drawn from the training data — confirm whether a
# separate validation set was intended.
train_generator = data_generator(x_zip_path=x_zip_path, y_zip_path=y_zip_path, final_shape=shape, batch_size=batch_size)
valid_generator = data_generator(x_zip_path=x_zip_path, y_zip_path=y_zip_path, final_shape=shape, batch_size=batch_size)

In [28]:
# NOTE(review): data_generator loops forever, so Keras cannot infer the epoch
# length unless steps_per_epoch / validation_steps reach model.fit. The
# recorded run shows "140/Unknown" steps before it was interrupted.
history = model.train_generator(train_generator, valid_generator,batch_size= batch_size, num_epochs = 2, callbacks = callbacks)

Epoch 1/2
    140/Unknown - 308s 2s/step - loss: 0.3359 - iou: 0.2351

KeyboardInterrupt: ignored

In [31]:
# Shell escape: print the GPU status reported by nvidia-smi.
! nvidia-smi

Fri Nov 17 16:32:38 2023       
+-----------------------------------------------------------------------------+
| NVIDIA-SMI 525.105.17   Driver Version: 525.105.17   CUDA Version: 12.0     |
|-------------------------------+----------------------+----------------------+
| GPU  Name        Persistence-M| Bus-Id        Disp.A | Volatile Uncorr. ECC |
| Fan  Temp  Perf  Pwr:Usage/Cap|         Memory-Usage | GPU-Util  Compute M. |
|                               |                      |               MIG M. |
|   0  Tesla T4            Off  | 00000000:00:04.0 Off |                    0 |
| N/A   75C    P0    31W /  70W |  14571MiB / 15360MiB |      0%      Default |
|                               |                      |                  N/A |
+-------------------------------+----------------------+----------------------+
                                                                               
+-----------------------------------------------------------------------------+
| Proces