In [1]:
import sys
sys.path.insert(0, '/tf/utils/')

In [2]:
import os
os.environ['TF_CPP_MIN_LOG_LEVEL'] = '3' 
import tensorflow as tf
import matplotlib.pyplot as plt

In [3]:
from keras.models import Model, load_model
from keras.layers import Dropout, Conv2DTranspose, ReLU, ZeroPadding2D, BatchNormalization, Input, Conv2D, Conv2DTranspose, Flatten, Dense, LeakyReLU, MaxPooling2D, UpSampling2D, Concatenate, concatenate
from keras.optimizers import Adam
import numpy as np
import tensorflow.keras.backend as K
import tensorflow as tf

from utils import calculate_snr, itakura_distortion, somar_sinais, add_white_gaussian_noise, performance
import librosa
from tqdm import tqdm

from sound import Sound

from IPython.display import Audio
import time
from IPython import display

In [4]:
# Size handed to the Sound loader as its third argument
# (presumably the clip length in samples — TODO confirm against sound.py).
base_shape_size = 8192
# Default STFT frame length: used as `n_fft` by the STFT helper functions.
# NOTE(review): per librosa, an n_fft of 255 should give 1 + 255 // 2 = 128
# frequency bins, matching the 128-row model input — confirm.
ws = 255
# Default STFT hop length in samples: the helpers pass it as `hop_length`,
# so despite the "overlap" naming it is the step between frames.
ol = 128

In [5]:
# Load the dataset once. The progress output below shows clean files being
# read first and noise files second; later cells read `train_X`, `val_X` and
# `noise_sounds` from this object.
sound_base = Sound('../../Dados/Base/', '../../Dados/ESC-50-master/audio/', base_shape_size)

Loading clean files: 100%|██████████| 5476/5476 [00:03<00:00, 1815.43it/s]
Loading noise files: 100%|██████████| 2000/2000 [00:10<00:00, 188.39it/s]


In [6]:
def calculate_stft_magnitude_and_phase(signal, sampling_rate=8000, window_size=ws, overlap=ol):
    """Compute the STFT of ``signal`` and split it into magnitude and phase angle.

    Args:
        signal: Audio samples handed to ``librosa.stft``.
        sampling_rate: Sample rate in Hz; only used to label the frequency
            and time axes.
        window_size: FFT frame length, passed as ``n_fft``.
        overlap: Passed as ``hop_length`` (the step between frames), despite
            the parameter name.

    Returns:
        ``(magnitude, phi, f, t)``: the STFT magnitude, the phase angle in
        radians, the frequency of every bin and the time of every frame.
    """
    spectrogram = librosa.stft(signal, n_fft=window_size, hop_length=overlap)

    # Axis labels for plotting / inspection.
    frame_indices = np.arange(spectrogram.shape[1])
    t = librosa.frames_to_time(frame_indices, sr=sampling_rate, hop_length=overlap)
    f = librosa.fft_frequencies(sr=sampling_rate, n_fft=window_size)

    # magphase returns the magnitude and a unit-modulus complex phasor;
    # the angle of that phasor is the phase in radians.
    magnitude, unit_phasor = librosa.magphase(spectrogram)
    phi = np.angle(unit_phasor)

    return magnitude, phi, f, t

def reconstruct_signal_from_stft(magnitude, phi, sampling_rate=8000, window_size=ws, overlap=ol):
    """Rebuild a time-domain signal from an STFT magnitude and phase angle.

    Args:
        magnitude: STFT magnitude, frequency bins on the second-to-last axis.
        phi: Phase angle in radians, same shape as ``magnitude``.
        sampling_rate: Unused; kept so the signature mirrors
            ``calculate_stft_magnitude_and_phase``.
        window_size: FFT frame length that produced the spectrogram.
        overlap: Hop length in samples that produced the spectrogram.

    Returns:
        The reconstructed signal returned by ``librosa.istft``.
    """
    complex_spec = magnitude * np.exp(1j * phi)

    # librosa infers n_fft as 2 * (bins - 1) when it is not given, which is
    # always even. The default frame length here is odd (255), so the inferred
    # value would be 254 and the inverse would not match the analysis.
    # Pass the true frame length whenever it is consistent with the number of
    # frequency bins; otherwise fall back to librosa's inference so callers
    # with spectrograms from a different n_fft keep the previous behaviour.
    n_bins = complex_spec.shape[-2]
    if window_size is not None and 1 + window_size // 2 == n_bins:
        n_fft = window_size
    else:
        n_fft = None

    signal = librosa.istft(complex_spec, n_fft=n_fft, hop_length=overlap)

    return signal

In [7]:
class DataGenerator:
    """Endless source of (noisy, clean) STFT feature batches.

    Each sample pairs one clean clip with one noise clip, mixes them, adds
    white Gaussian noise and converts both the clean and the noisy signal to
    a two-channel feature map: channel 0 is the STFT magnitude, channel 1 the
    phase mapped from [-pi, pi] to [0, 1].
    """

    # Amplitude range every clean clip is rescaled into before noise is added.
    TARGET_MIN = -0.4
    TARGET_MAX = 0.4

    def __init__(self, sound_files, noise_files):
        """Store the arrays of clean clips and noise clips (indexed on axis 0)."""
        self.sound_files = sound_files
        self.noise_files = noise_files

    @staticmethod
    def _rescale(signal, new_min=-0.4, new_max=0.4):
        """Min-max rescale ``signal`` into ``[new_min, new_max]``.

        A constant signal has no dynamic range; it is mapped to the middle of
        the target range instead of dividing by zero and producing NaNs.
        """
        lowest = np.min(signal)
        span = np.max(signal) - lowest
        if span == 0:
            return np.zeros_like(signal) + (new_min + new_max) / 2.0
        return (signal - lowest) / span * (new_max - new_min) + new_min

    @staticmethod
    def _to_features(magnitude, phase):
        """Stack magnitude and phase (normalised to [0, 1]) on a new last axis."""
        return np.stack([magnitude, phase / (2 * np.pi) + 0.5], axis=-1)

    def generate_sample_completo(self, batch_size=32):
        """Yield ``(x_train, y_train)`` batches forever.

        Args:
            batch_size: Number of samples per batch. Clips are drawn without
                replacement, so it must not exceed the number of clean clips
                or of noise clips.

        Yields:
            ``x_train``: features of the noisy signals, and ``y_train``:
            features of the matching clean signals, both stacked on axis 0.
        """
        while True:
            # Pick a batch of clean clips
            sound_batch_choices = np.random.choice(self.sound_files.shape[0], size=batch_size, replace=False)
            sound_batch = self.sound_files[sound_batch_choices]

            # Pick a batch of noise clips
            noise_batch_choices = np.random.choice(self.noise_files.shape[0], size=batch_size, replace=False)
            noise_batch = self.noise_files[noise_batch_choices]

            x_train = []
            y_train = []

            for sound, noise in zip(sound_batch, noise_batch):
                sound_escalado = self._rescale(sound, self.TARGET_MIN, self.TARGET_MAX)

                # Scalar level drawn from [0, 20) for the clip/noise mix
                # (presumably an SNR in dB — confirm against utils.somar_sinais).
                # The original `size=(1,)[0]` was a misplaced bracket that
                # produced a 1-element array instead of a scalar.
                snr = np.random.randint(0, 20)
                noisy_sound = somar_sinais(sound_escalado, noise, snr)

                # Extra white Gaussian noise with a level drawn from [20, 30).
                noisy_sound = add_white_gaussian_noise(noisy_sound, np.random.randint(20, 30))
                noisy_sound = np.clip(noisy_sound, -1.0, 1.0)

                A, phi, _, _ = calculate_stft_magnitude_and_phase(sound_escalado)
                A_noisy, phi_noisy, _, _ = calculate_stft_magnitude_and_phase(noisy_sound)

                # Network input is the noisy clip, target is the clean clip.
                x_train.append(self._to_features(A_noisy, phi_noisy))
                y_train.append(self._to_features(A, phi))

            yield np.array(x_train), np.array(y_train)

In [8]:
# One batch source per split; both draw their noise from the same noise pool.
data_generator_train = DataGenerator(
    sound_files=sound_base.train_X,
    noise_files=sound_base.noise_sounds,
)
data_generator_val = DataGenerator(
    sound_files=sound_base.val_X,
    noise_files=sound_base.noise_sounds,
)

In [9]:
def downsample(filters, size, apply_batchnorm=True):
    """Build an encoder block that halves the spatial resolution.

    The block is a stride-2 ``Conv2D`` (no bias, 'same' padding), optionally
    followed by batch normalisation, then a ``LeakyReLU``.

    Args:
        filters: Number of output channels of the convolution.
        size: Kernel size of the convolution.
        apply_batchnorm: Whether to insert ``BatchNormalization`` after the
            convolution.

    Returns:
        A ``tf.keras.Sequential`` containing the layers above.
    """
    weight_init = tf.random_normal_initializer(0., 0.02)

    layers = [
        Conv2D(filters, size, strides=2, padding='same', kernel_initializer=weight_init, use_bias=False),
    ]
    if apply_batchnorm:
        layers.append(BatchNormalization())
    layers.append(LeakyReLU())

    return tf.keras.Sequential(layers)

In [10]:
def upsample(filters, size, apply_dropout=False):
    """Build a decoder block that doubles the spatial resolution.

    The block is a stride-2 ``Conv2DTranspose`` (no bias, 'same' padding),
    batch normalisation, an optional 50% dropout, then a ``ReLU``.

    Args:
        filters: Number of output channels of the transposed convolution.
        size: Kernel size of the transposed convolution.
        apply_dropout: Whether to insert ``Dropout(0.5)`` before the ReLU.

    Returns:
        A ``tf.keras.Sequential`` containing the layers above.
    """
    weight_init = tf.random_normal_initializer(0., 0.02)

    layers = [
        Conv2DTranspose(filters, size, strides=2, padding='same', kernel_initializer=weight_init, use_bias=False),
        BatchNormalization(),
    ]
    if apply_dropout:
        layers.append(Dropout(0.5))
    layers.append(ReLU())

    return tf.keras.Sequential(layers)

In [11]:
# Builds the generator graph
def Generator(inputs):
    """Apply a U-Net-style encoder/decoder to ``inputs`` and return the output tensor.

    Five stride-2 encoder blocks are followed by four stride-2 decoder blocks
    with skip connections and a final stride-2 transposed convolution, so the
    output has the same spatial size as the input and 2 channels.

    Shape comments below are for the (128, 64, 2) input used in this notebook.

    Args:
        inputs: Keras input tensor of shape (batch, height, width, channels).

    Returns:
        Output tensor with 2 channels and a linear activation.
    """
    encoder_filters = (64, 128, 256, 512, 512)
    decoder_filters = (512, 256, 128, 64)

    # Only the first encoder block skips batch normalisation.
    # Encoder outputs: (64, 32, 64) -> (32, 16, 128) -> (16, 8, 256)
    #                  -> (8, 4, 512) -> (4, 2, 512)
    down_stack = [
        downsample(n_filters, 4, apply_batchnorm=(position > 0))
        for position, n_filters in enumerate(encoder_filters)
    ]

    # Decoder outputs before concatenation: (8, 4, 512) -> (16, 8, 256)
    #                                       -> (32, 16, 128) -> (64, 32, 64)
    up_stack = [upsample(n_filters, 4) for n_filters in decoder_filters]

    initializer = tf.random_normal_initializer(0., 0.02)
    last = Conv2DTranspose(2, 4,
                           strides=2,
                           padding='same',
                           kernel_initializer=initializer,
                           activation='linear')  # (batch_size, 128, 64, 2)

    # Encoder pass, remembering every intermediate output
    encoder_outputs = []
    x = inputs
    for down in down_stack:
        x = down(x)
        encoder_outputs.append(x)

    # Decoder pass: the bottleneck itself is not a skip, so start from the
    # second-to-last encoder output and walk back towards the input.
    for up, skip in zip(up_stack, encoder_outputs[-2::-1]):
        x = Concatenate()([up(x), skip])

    return last(x)

# Builds the discriminator graph
def Discriminator(inputs, targets):
    """Apply a PatchGAN-style discriminator to a pair of feature maps.

    The two tensors are concatenated on the channel axis, reduced by three
    stride-2 blocks and two zero-padded 4x4 convolutions. The last layer has
    no activation, so the output is a map of raw scores (logits), one per
    patch.

    Shape comments below are for two (128, 64, 2) inputs, as used in this
    notebook.

    Args:
        inputs: First tensor of the pair.
        targets: Second tensor of the pair, same spatial size as ``inputs``.

    Returns:
        Single-channel score map tensor.
    """
    initializer = tf.random_normal_initializer(0., 0.02)

    x = concatenate([inputs, targets])  # (batch_size, 128, 64, 4)

    # (64, 32, 64) -> (32, 16, 128) -> (16, 8, 256)
    for n_filters, use_batchnorm in ((64, False), (128, True), (256, True)):
        x = downsample(n_filters, 4, use_batchnorm)(x)

    x = ZeroPadding2D()(x)  # (batch_size, 18, 10, 256)
    x = Conv2D(512, 4, strides=1, kernel_initializer=initializer, use_bias=False)(x)  # (batch_size, 15, 7, 512)
    x = BatchNormalization()(x)
    x = LeakyReLU()(x)

    x = ZeroPadding2D()(x)  # (batch_size, 17, 9, 512)

    return Conv2D(1, 4, strides=1, kernel_initializer=initializer)(x)  # (batch_size, 14, 6, 1)

In [12]:
def generate_images(model, test_input, tar):
    """Plot the noisy input, the clean target and the model output side by side.

    Only the first sample of each batch is shown, and only channel 0 (the
    magnitude channel), on a logarithmic scale.

    Args:
        model: Callable model; it is invoked as ``model(test_input, training=True)``.
        test_input: Batch of noisy feature maps.
        tar: Batch of clean target feature maps.
    """
    # NOTE(review): training=True runs the model in training mode for this
    # preview, as the original code did — confirm that is intended.
    prediction = model(test_input, training=True)
    plt.figure(figsize=(22, 7))

    display_list = [test_input[0], tar[0], prediction[0]]
    title = ['Log Power Spectrum - Som ruidoso', 'Log Power Spectrum - Som original', 'Log Power Spectrum - Som filtrado']

    # Magnitudes can be exactly zero and the generator's linear output can be
    # negative; log10 of those is -inf/NaN, which breaks the image and the
    # colour scale. Floor the values first (1e-10 -> -100 on the plotted scale).
    floor = 1e-10

    for i in range(3):
        plt.subplot(1, 3, i+1)
        plt.title(title[i])
        magnitude = np.maximum(np.asarray(display_list[i])[..., 0], floor)
        plt.imshow(10 * np.log10(magnitude), aspect='auto', cmap='inferno')
        plt.colorbar(format='%+2.0f dB')
        plt.axis('off')

    plt.show()

In [13]:
# Build and compile the discriminator
# Feature-map shape shared by every model input: (frequency bins, frames, channels)
input_shape = (128, 64, 2)

disc_inputs = Input(shape=input_shape)
target_inputs = Input(shape=input_shape)

disc_out = Discriminator(disc_inputs, target_inputs)
discriminator = Model([disc_inputs, target_inputs], disc_out, name='discriminator')
# The discriminator's last layer has no activation, so it outputs raw scores
# (logits). The string loss 'binary_crossentropy' expects probabilities in
# [0, 1]; from_logits=True applies the sigmoid inside the loss instead.
discriminator.compile(
    loss=tf.keras.losses.BinaryCrossentropy(from_logits=True),
    optimizer=Adam(learning_rate=2e-4, beta_1=0.5),
)

In [15]:
# Build and compile the generator
gen_inputs = Input(shape=input_shape)
gen_output = Generator(gen_inputs)

generator = Model(inputs=gen_inputs, outputs=gen_output, name='generator')
generator.compile(
    optimizer=Adam(beta_1=0.5, learning_rate=2e-4),
    loss='binary_crossentropy',
)

In [18]:
# Combined model: generator followed by the discriminator, used to train the
# generator through the discriminator's verdict.
gan_input = Input(shape=input_shape)
gan_target_inputs = Input(shape=input_shape)

gen_outputs = generator(gan_input)
# The discriminator receives the generated feature map in its FIRST slot and
# `gan_target_inputs` in its second.
# NOTE(review): any training code must feed the discriminator its pairs in
# this same slot order and must pass two arrays to `gan` — verify against the
# training cell.
gan_out = discriminator([gen_outputs, gan_target_inputs])
gan = Model([gan_input, gan_target_inputs], gan_out, name='gan')
# The discriminator outputs raw scores (its last layer has no activation), so
# the loss must be computed from logits rather than from probabilities.
gan.compile(
    loss=tf.keras.losses.BinaryCrossentropy(from_logits=True),
    optimizer=Adam(learning_rate=2e-4, beta_1=0.5),
)

In [19]:
# Stand-alone Adam optimizers, one per network.
# NOTE(review): no visible cell uses these (the models are compiled with their
# own Adam instances) — presumably meant for a custom training step; confirm.
generator_optimizer = tf.keras.optimizers.Adam(learning_rate=2e-4, beta_1=0.5)
discriminator_optimizer = tf.keras.optimizers.Adam(learning_rate=2e-4, beta_1=0.5)

In [None]:
# Batch size
batch_size = 64

# Number of epochs and batches per epoch
num_epochs = 2
num_batches_per_epoch = len(sound_base.train_X) // batch_size

# Both losses must exist before the first balance check below; equal starting
# values make the first batch take the balanced (1 step each) branch.
gan_loss = 1.
discriminator_loss = 1.
# Loss ratio above which the weaker network gets extra training steps.
diff_param = 10.
# After this many balanced batches the ratio threshold is halved.
SAME_ITS = 4

# The discriminator outputs one score per patch, so labels need the full
# (batch, rows, cols, 1) shape, not just the per-sample patch shape.
patch_shape = tuple(discriminator.output_shape[1:])
real_labels = np.ones((batch_size,) + patch_shape)
fake_labels = np.zeros((batch_size,) + patch_shape)

# Create the batch iterator once instead of building a new Python generator
# for every batch.
# NOTE(review): batches must have the spatial shape the models were built for;
# confirm the STFT frame count of the clips matches the model input width.
train_batches = data_generator_train.generate_sample_completo(batch_size)

for epoch in range(num_epochs):

    factor_dis = 1
    factor_gen = 1
    dis_loss_count = 0
    gen_loss_count = 0
    same_count = 0

    for batch in tqdm(range(num_batches_per_epoch)):
        # Discriminator phase.
        # NOTE(review): these flags are flipped after the models were compiled;
        # whether that changes which weights are updated depends on the Keras
        # version — confirm the generator step does not update the discriminator.
        discriminator.trainable = True
        generator.trainable = False

        # Balance the two networks: whichever has the much larger loss gets
        # exponentially more steps (capped at 128) until the ratio recovers.
        if discriminator_loss == 0. or (gan_loss / discriminator_loss) > diff_param:
            gen_loss_count += 1
            dis_loss_count = 0
            factor_dis = 1
            factor_gen = min(2 ** gen_loss_count, 128)

        elif gan_loss == 0. or (discriminator_loss / gan_loss) > diff_param:
            dis_loss_count += 1
            gen_loss_count = 0
            factor_dis = min(2 ** dis_loss_count, 128)
            factor_gen = 1

        else:
            dis_loss_count = 0
            gen_loss_count = 0
            factor_dis = 1
            factor_gen = 1

            same_count += 1

            if same_count % SAME_ITS == 0:
                diff_param /= 2.
                same_count = 0


        print(f'Training Discriminator: {factor_dis} times\nTraining Generator: {factor_gen} times')

        for i in range(factor_dis):
            noisy_stft_batch, real_stft_batch = next(train_batches)

            # The combined `gan` model feeds the discriminator as
            # [candidate, second input], so the discriminator is trained with
            # the same slot order: candidate first, noisy conditioning second.
            discriminator_loss_real = discriminator.train_on_batch([real_stft_batch, noisy_stft_batch], real_labels)

            # Generate fake samples and train the discriminator on them
            generated_stft = generator.predict(noisy_stft_batch, verbose=0)
            discriminator_loss_fake = discriminator.train_on_batch([generated_stft, noisy_stft_batch], fake_labels)

            discriminator_loss = 0.5 * np.add(discriminator_loss_real, discriminator_loss_fake)
            print(f'Discriminator: {i + 1}/{factor_dis} - Loss fake: {discriminator_loss_fake:.5g} Loss real: {discriminator_loss_real:.5g}')

        # Generator phase: freeze the discriminator
        discriminator.trainable = False
        generator.trainable = True

        for i in range(factor_gen):
            noisy_stft_batch, _ = next(train_batches)
            # `gan` has two inputs: the generator input and the discriminator's
            # second slot (the noisy conditioning). Labels are "real" because
            # the generator is rewarded when the discriminator is fooled.
            gan_loss = gan.train_on_batch([noisy_stft_batch, noisy_stft_batch], real_labels)
            print(f'Generator: {i + 1}/{factor_gen} - Generator loss: {gan_loss}')

        print('\n\n')

    # Report the last losses of the epoch
    print(f'Epoch {epoch + 1}/{num_epochs}, Discriminator Loss: {discriminator_loss}, GAN Loss: {gan_loss}')
