In [1]:
import tensorflow as tf
from tensorflow.keras.layers import Conv1D,Conv1DTranspose,Concatenate,Input
import numpy as np
import IPython.display
import glob
from tqdm.notebook import tqdm
import librosa.display
import matplotlib.pyplot as plt

In [None]:
import os

# Restrict this process to CUDA device 0.
os.environ["CUDA_VISIBLE_DEVICES"] = "0"

gpus = tf.config.experimental.list_physical_devices('GPU')
# Only the first visible GPU (if any) is switched to on-demand memory growth.
for gpu in gpus[:1]:
    try:
        tf.config.experimental.set_memory_growth(gpu, True)
    except RuntimeError as err:
        # Report the problem instead of aborting the notebook.
        print(err)

In [3]:
# Shell escape: print the GPU status table reported by the NVIDIA driver.
!nvidia-smi

Wed Mar 15 09:04:29 2023       
+-----------------------------------------------------------------------------+
| NVIDIA-SMI 527.56       Driver Version: 527.56       CUDA Version: 12.0     |
|-------------------------------+----------------------+----------------------+
| GPU  Name            TCC/WDDM | Bus-Id        Disp.A | Volatile Uncorr. ECC |
| Fan  Temp  Perf  Pwr:Usage/Cap|         Memory-Usage | GPU-Util  Compute M. |
|                               |                      |               MIG M. |
|   0  NVIDIA GeForce ... WDDM  | 00000000:01:00.0 Off |                  N/A |
| N/A   46C    P0    12W /  50W |      0MiB /  4096MiB |      0%      Default |
|                               |                      |                  N/A |
+-------------------------------+----------------------+----------------------+
                                                                               
+-----------------------------------------------------------------------------+
| Proces

# Load the Data

In [6]:
# glob returns paths in arbitrary, filesystem-dependent order. Sort both listings
# so the i-th clean file lines up with the i-th noisy file.
# NOTE(review): assumes matching clean/noisy recordings share file names - confirm.
clean_sounds = sorted(glob.glob('./CleanData/*'))
noisy_sounds = sorted(glob.glob('./NoisyData/*'))

# Decode the first 2000 files of each set as mono audio and join them with a single
# concat; concatenating inside the loop re-copies the growing tensor on every
# iteration (quadratic in the total number of samples).
clean_sounds_list = tf.concat(
    [tf.audio.decode_wav(tf.io.read_file(path), desired_channels=1)[0]
     for path in tqdm(clean_sounds[:2000])],
    axis=0,
)

noisy_sounds_list = tf.concat(
    [tf.audio.decode_wav(tf.io.read_file(path), desired_channels=1)[0]
     for path in tqdm(noisy_sounds[:2000])],
    axis=0,
)

clean_sounds_list.shape,noisy_sounds_list.shape

  0%|          | 0/1999 [00:00<?, ?it/s]

KeyboardInterrupt: 

In [None]:
# Decode a standalone sample recording as single-channel audio and show its shape.
# The sample rate returned by decode_wav is discarded.
sample,_ = tf.audio.decode_wav(tf.io.read_file("./0021.wav"),desired_channels=1)
sample.shape

# NOTE(review): everything below is a commented-out, unused draft of a framing step.
# As written it would not run: `sample_train = [],[]` builds a tuple (no `.append`),
# and `clean_train` is stacked without being defined in this cell.
# batching_size = 12000

# sample_train = [],[]

# for i in tqdm(range(0,clean_sounds_list.shape[0]-batching_size,batching_size)):
#     sample_train.append(clean_sounds_list[i:i+batching_size])


# clean_train = tf.stack(clean_train)


# clean_train.shape

In [None]:
# Display the shapes of the concatenated clean and noisy recordings side by side.
clean_sounds_list.shape,noisy_sounds_list.shape

In [None]:
# Number of samples per training frame; the model input below uses the same length.
batching_size = 12000

clean_train, noisy_train = [], []

# Frame start offsets are derived from the clean recording's length only; the same
# offsets are used to slice the noisy recording.
frame_starts = range(0, clean_sounds_list.shape[0] - batching_size, batching_size)
for i in tqdm(frame_starts):
    frame_end = i + batching_size
    clean_train.append(clean_sounds_list[i:frame_end])
    noisy_train.append(noisy_sounds_list[i:frame_end])

# Turn the lists of equal-length frames into single tensors.
clean_train = tf.stack(clean_train)
noisy_train = tf.stack(noisy_train)

clean_train.shape, noisy_train.shape

In [None]:
# Display the shape of the standalone sample recording.
sample.shape

In [None]:
# Cut the standalone sample into consecutive frames of `batching_size` samples,
# using the same offset rule as the training data.
sample_train = []

sample_starts = range(0, sample.shape[0] - batching_size, batching_size)
for j in tqdm(sample_starts):
    frame_end = j + batching_size
    sample_train.append(sample[j:frame_end])

sample_train = tf.stack(sample_train)

sample_train.shape

# Create a tf.data.Dataset

In [None]:
def get_dataset(x_train,y_train):
    """Build a shuffled, batched tf.data.Dataset of (x, y) pairs.

    Args:
        x_train: model inputs, sliced along the first axis.
        y_train: targets, sliced along the first axis in step with `x_train`.

    Returns:
        A dataset that shuffles with a buffer of 100 elements and yields batches
        of 64 pairs; a trailing batch with fewer than 64 pairs is dropped.
    """
    pairs = tf.data.Dataset.from_tensor_slices((x_train, y_train))
    shuffled = pairs.shuffle(100)
    return shuffled.batch(64, drop_remainder=True)

In [None]:
# Split by frame index: the first 40000 frames train the model, the rest are held out.
# Noisy frames are the inputs and clean frames are the targets.
# NOTE(review): 40000 is hard-coded; if fewer than 40000 frames were produced, the
# held-out slices are empty and so is test_dataset.
train_dataset = get_dataset(noisy_train[:40000],clean_train[:40000])
test_dataset = get_dataset(noisy_train[40000:],clean_train[40000:])

In [None]:
# Display the dataset object's repr.
test_dataset

# Reviewing Sample Waveform

In [None]:
# Plot one clean frame and the matching noisy frame in separate figures.
# `librosa.display.waveplot` was removed in librosa 0.10; `waveshow` is its
# replacement and is what the inference section of this notebook already uses.
# The trailing channel axis is squeezed away to get a 1-D signal.
librosa.display.waveshow(np.squeeze(clean_train[5].numpy(),axis=-1))
plt.show()
librosa.display.waveshow(np.squeeze(noisy_train[5].numpy(),axis=-1))
plt.show()

# Creating the Model

In [None]:
# Input: one frame of `batching_size` samples with a single channel.
inp = Input(shape=(batching_size, 1))

# Encoder: five stride-2 convolutions, doubling the filter count at each step.
c1 = Conv1D(filters=2, kernel_size=32, strides=2, padding='same', activation='relu')(inp)
c2 = Conv1D(filters=4, kernel_size=32, strides=2, padding='same', activation='relu')(c1)
c3 = Conv1D(filters=8, kernel_size=32, strides=2, padding='same', activation='relu')(c2)
c4 = Conv1D(filters=16, kernel_size=32, strides=2, padding='same', activation='relu')(c3)
c5 = Conv1D(filters=32, kernel_size=32, strides=2, padding='same', activation='relu')(c4)

# Decoder: each transposed convolution receives the previous decoder output
# concatenated with the encoder output of the same length (skip connection).
dc1 = Conv1DTranspose(filters=32, kernel_size=32, strides=1, padding='same')(c5)
conc = Concatenate()([c5, dc1])
dc2 = Conv1DTranspose(filters=16, kernel_size=32, strides=2, padding='same')(conc)
conc = Concatenate()([c4, dc2])
dc3 = Conv1DTranspose(filters=8, kernel_size=32, strides=2, padding='same')(conc)
conc = Concatenate()([c3, dc3])
dc4 = Conv1DTranspose(filters=4, kernel_size=32, strides=2, padding='same')(conc)
conc = Concatenate()([c2, dc4])
dc5 = Conv1DTranspose(filters=2, kernel_size=32, strides=2, padding='same')(conc)
conc = Concatenate()([c1, dc5])
dc6 = Conv1DTranspose(filters=1, kernel_size=32, strides=2, padding='same')(conc)

# Final layer sees the raw input next to the decoded signal and emits one channel.
conc = Concatenate()([inp, dc6])
dc7 = Conv1DTranspose(filters=1, kernel_size=32, strides=1, padding='same', activation='linear')(conc)

model = tf.keras.models.Model(inputs=inp, outputs=dc7)
model.summary()

In [None]:
# Render the layer graph with tensor shapes shown and layer names hidden.
tf.keras.utils.plot_model(model,show_shapes=True,show_layer_names=False)

# Training

In [None]:
# Train for 10 epochs with Adam (learning rate 0.002) on mean absolute error
# between the model output and the clean target frame.
model.compile(optimizer=tf.keras.optimizers.Adam(0.002),loss=tf.keras.losses.MeanAbsoluteError())
history = model.fit(train_dataset,epochs=10)

# Testing Samples

In [None]:
# Play one noisy frame as-is; the playback rate (16000) is supplied here by hand.
from IPython.display import Audio
Audio(np.squeeze(noisy_train[22].numpy()),rate=16000)

In [None]:
# Play the model's output for the same frame. Frames in `noisy_train` already carry
# the trailing channel axis the model expects, so only a leading batch axis is added;
# the extra expand_dims(-1) used before produced a rank-4 input for a rank-3 model.
Audio(tf.squeeze(model.predict(tf.expand_dims(noisy_train[22],0))),rate=16000)

In [None]:
# Report the compiled loss on the held-out frames.
model.evaluate(test_dataset)

In [None]:
# Save the trained Keras model to the working directory.
model.save('NoiseSuppressionModel.h5')

# Inference
## Audio of any length is handled by splitting it into fixed-size frames, letting the final frame overlap the previous one, and dropping the overlapping part when the output is stitched back together

In [None]:
def get_audio(path):
    """Read the WAV file at `path` and return its samples as a one-channel tensor.

    The sample rate reported by the decoder is discarded; callers supply the
    playback rate themselves.
    """
    wav_bytes = tf.io.read_file(path)
    decoded = tf.audio.decode_wav(wav_bytes, desired_channels=1)
    # decode_wav returns (audio, sample_rate); only the audio is kept.
    return decoded[0]

In [None]:
def inference_preprocess(path):
    """Split a WAV file into model-sized frames for inference.

    The audio is cut into consecutive, non-overlapping frames of `batching_size`
    samples. One extra frame holding the last `batching_size` samples is always
    appended; it overlaps the preceding frame unless the remaining tail is exactly
    one frame long.

    Audio shorter than one frame is left-padded with zeros to `batching_size`
    samples, so the real samples sit at the end of the single frame and the
    "last `diff` samples" rule used by the callers still selects exactly them.
    Previously such input left the loop variable unbound and produced an
    undersized frame.

    Args:
        path: path of the WAV file to read.

    Returns:
        (frames, diff): `frames` is the stacked frames tensor; `diff` is the number
        of samples at the end of the final frame not covered by earlier frames.

    Raises:
        ValueError: if the file contains no samples.
    """
    audio = get_audio(path)
    audio_len = audio.shape[0]
    if audio_len == 0:
        raise ValueError(f"no audio samples decoded from {path!r}")

    if audio_len < batching_size:
        # Pad at the front so the original samples are the trailing `audio_len`.
        audio = tf.pad(audio, [[batching_size - audio_len, 0], [0, 0]])

    starts = range(0, audio_len - batching_size, batching_size)
    batches = [audio[start:start + batching_size] for start in starts]
    batches.append(audio[-batching_size:])

    # Samples already covered by the regular frames: len(starts) * batching_size.
    # This is the same value the loop variable used to provide, but it is also
    # defined when there are no regular frames.
    diff = audio_len - len(starts) * batching_size
    return tf.stack(batches), diff

In [None]:
def predict(path):
    """Denoise the WAV file at `path` with the Keras model.

    Every frame from `inference_preprocess` is run through `model`. All frames but
    the last are flattened end to end, and only the trailing `diff` samples of the
    last frame are appended.

    Returns:
        A tensor of shape (num_samples, 1) with the stitched model output.
    """
    frames, tail_len = inference_preprocess(path)
    denoised = model.predict(frames)

    n_frames, frame_len = denoised.shape[0], denoised.shape[1]
    body = tf.reshape(denoised[:-1], ((n_frames - 1) * frame_len, 1))
    tail = denoised[-1][-tail_len:]
    return tf.concat((body, tail), axis=0)

In [None]:
# Play an unprocessed noisy recording (channel axis removed) for reference.
Audio(np.squeeze(get_audio(noisy_sounds[4]).numpy(),-1),rate=16000)

In [None]:
# Play the unprocessed standalone sample.
# NOTE(review): the rate is hard-coded to 8000 here versus 16000 elsewhere, while
# get_audio discards the file's own sample rate - confirm 8000 is correct.
Audio(np.squeeze(get_audio("./0003.wav").numpy(),-1),rate=8000)

In [None]:
# Play the Keras model's denoised output for the same noisy recording.
Audio(tf.squeeze(predict(noisy_sounds[4])),rate=16000)


In [None]:
# Play the Keras model's denoised output for the standalone sample.
Audio(tf.squeeze(predict("./0003.wav")),rate=8000)

In [None]:
%%timeit
# Time the full Keras inference path (file read, framing, prediction, stitching).
tf.squeeze(predict(noisy_sounds[3]))

In [None]:
# Draw the noisy input and the denoised output before a single plt.show().
librosa.display.waveshow(np.squeeze(get_audio(noisy_sounds[4]).numpy(),-1))
librosa.display.waveshow(np.squeeze(predict(noisy_sounds[4])))
plt.show()

# Quantization and TFLite Conversion

In [None]:
# Despite its name, `lite_model` is the TFLiteConverter, not a converted model.
lite_model = tf.lite.TFLiteConverter.from_keras_model(model)
# Request the converter's default optimization set.
lite_model.optimizations = [tf.lite.Optimize.DEFAULT]
# convert() yields the serialized model, which the next cell writes to disk in binary mode.
tflite_model_quant = lite_model.convert()

In [None]:
# Write the converted model to the working directory.
with open('TFLiteModel.tflite','wb') as f:
    f.write(tflite_model_quant)

# TFLite Inference

In [None]:
# Load the model from the path the conversion section writes it to. The previous
# './contents/' prefix pointed at a location nothing in this notebook creates, so
# a fresh Restart & Run All could not find the file.
interpreter = tf.lite.Interpreter(model_path='TFLiteModel.tflite')
interpreter.allocate_tensors()

In [None]:
def predict_tflite(path):
    """Denoise the WAV file at `path` with the TFLite interpreter.

    Frames from `inference_preprocess` are fed to the interpreter one at a time.
    All frames but the last are flattened end to end, and only the trailing
    `diff` samples of the last frame are appended.

    Returns:
        A 1-D tensor holding the stitched model output.
    """
    test_audio, diff = inference_preprocess(path)
    input_index = interpreter.get_input_details()[0]["index"]
    output_index = interpreter.get_output_details()[0]["index"]

    preds = []
    for frame in test_audio:
        # The interpreter is invoked with a batch of exactly one frame.
        interpreter.set_tensor(input_index, tf.expand_dims(frame, 0))
        interpreter.invoke()
        preds.append(interpreter.get_tensor(output_index))

    # Each output has a leading batch axis of 1. Join along it and drop only the
    # trailing channel axis. An axis-less squeeze would also remove the frame
    # axis when there is a single frame and break the indexing below.
    predictions = tf.squeeze(tf.concat(preds, axis=0), axis=-1)

    body_len = (predictions.shape[0] - 1) * predictions.shape[1]
    final_op = tf.reshape(predictions[:-1], (body_len,))
    final_op = tf.concat((final_op, predictions[-1][-diff:]), axis=0)
    return final_op

In [None]:
# Original noisy audio, played unprocessed for comparison with the TFLite output.
Audio(np.squeeze(get_audio(noisy_sounds[4]).numpy(),-1),rate=16000)

In [None]:
### Load the sample file directly
# NOTE(review): the commented-out lines below are leftover shape checks.
# get_audio(noisy_sounds[4]).numpy().shape
# sample.shape
# noisy_sounds[4].shape
# noisy_sounds[4]
Audio(np.squeeze(get_audio("./0003.wav").numpy(),-1),rate=8000)

In [None]:
# Clean audio: the file at the same list position as the noisy recording above.
Audio(np.squeeze(get_audio(clean_sounds[4]).numpy(),-1),rate=16000)

In [None]:
# Output audio: the noisy recording after denoising with the TFLite model.
Audio(predict_tflite(noisy_sounds[4]),rate=16000)

In [None]:
# Output audio for the standalone sample, denoised with the TFLite model.
Audio(predict_tflite("./0003.wav"),rate=8000)

In [None]:
%%timeit
# Time the full TFLite inference path, for comparison with the Keras timing.
predict_tflite(noisy_sounds[3])

In [None]:
# Draw the noisy input and the TFLite output one after the other.
# `librosa.display.waveplot` was removed in librosa 0.10; `waveshow` is its
# replacement and is what the Keras inference section of this notebook already uses.
librosa.display.waveshow(np.squeeze(get_audio(noisy_sounds[4]).numpy(),-1))
librosa.display.waveshow(predict_tflite(noisy_sounds[4]).numpy())

In [None]:
# NOTE(review): leftover scratch cell. The expression is a 2-tuple of (decoded
# samples as a NumPy array, -1), so the cell output is the whole sample array.
(get_audio(clean_sounds[4]).numpy(),-1)
# Audio(np.squeeze(get_audio(clean_sounds[4]).numpy(),-1),rate=16000)

In [None]:
# NOTE(review): leftover scratch cell. The expression is a 2-tuple of (sample as a
# NumPy array, -1), so the cell output is the whole sample array.
(sample.numpy(),-1)