<a href="https://colab.research.google.com/github/anupampani123/Soundeffectgenerator/blob/main/Model_2.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
# Building a model to add the reverb effect.
# A variational autoencoder is used so that the relationship between classes isn't lost during decoding.

In [None]:
import numpy as np
import librosa as lr
import matplotlib.pyplot as plt
from glob import glob
import soundfile as sf
import os
import sys
import csv
import librosa.display
import librosa 
import random
import tensorflow 
import cv2

In [None]:
from sklearn.model_selection import train_test_split
import keras
from keras.losses import binary_crossentropy
from keras import backend as K

In [None]:
def fix_specgram_shape1(spec, shape=(513,128)):
    """Fix the time axis of a spectrogram to a user specified size.

    Args:
        spec: 2D spectrogram [freqs, time].
        shape: 2D output spectrogram shape [freqs, time].
    Returns:
        fixed_spec: 2D spectrogram whose time axis has length shape[1].
            When the input has fewer than shape[1] frames the result is a
            zero-padded array of exactly `shape`; otherwise the input is
            cropped along time only and its frequency axis is left as is.
    """
    if spec.shape[1] < shape[1]: # too short: zero-pad up to `shape`
        out = np.zeros(shape)
        # Copy at most shape[0] rows so an input with extra frequency bins
        # is cropped instead of raising a broadcast error.
        n_freqs = min(spec.shape[0], shape[0])
        out[:n_freqs, :spec.shape[1]] = spec[:n_freqs, :]
    else: # long enough: keep only the first shape[1] frames
        out = spec[:, :shape[1]]

    return out



def fix_specgram_shape2(spec, shape=(513,128)):
    """Fix the frequency axis of a spectrogram to a user specified size.

    Args:
        spec: 2D spectrogram [freqs, time].
        shape: 2D output spectrogram shape [freqs, time].
    Returns:
        fixed_spec: 2D spectrogram whose frequency axis has length shape[0].
            When the input has fewer than shape[0] rows the result is a
            zero-padded array of exactly `shape`; otherwise the input is
            cropped along frequency only and its time axis is left as is.
    """
    if spec.shape[0] < shape[0]: # too few bins: zero-pad up to `shape`
        out = np.zeros(shape)
        # Copy at most shape[1] frames so an input with extra time frames
        # is cropped instead of raising a broadcast error.
        n_frames = min(spec.shape[1], shape[1])
        out[:spec.shape[0], :n_frames] = spec[:, :n_frames]
    else: # enough bins: keep only the first shape[0] rows
        out = spec[:shape[0], :]

    # The result must be returned; without this the caller receives None.
    return out



def generate_z(encoder, spec):
    """
    Encode a single spectrogram into its latent representation.
    Args:
        encoder (obj): trained Keras encoder network.
        spec (ndarray): spectrogram of shape (freqs, time).
    Returns:
        z (ndarray): whatever encoder.predict returns for the spectrogram
            (documented by the original author as shape (1, 1, 1, 3)).
    """
    # the encoder's input layer dictates the (freqs, time) size we must match
    in_shape = encoder.input_shape
    target_shape = (in_shape[1], in_shape[2])

    # first fix the time axis, then the frequency axis
    fitted = fix_specgram_shape1(spec, target_shape)
    fitted = fix_specgram_shape2(fitted, target_shape)

    # add the batch axis in front and the channel axis at the end
    n_freqs, n_frames = fitted.shape[0], fitted.shape[1]
    batch = np.reshape(fitted, (1, n_freqs, n_frames, 1))

    return encoder.predict(batch)

def generate_specgram(decoder, z):
    """
    Decode a latent vector into a 2D spectrogram.
    Args:
        decoder (obj): trained Keras decoder network.
        z (ndarray): latent vector of shape (1, 1, 1, 3).
    Returns:
        spec (ndarray): spectrogram of shape (freqs, time).
    """
    decoded = decoder.predict(z)
    # The reshape below only succeeds when the prediction holds exactly one
    # spectrogram with one channel, i.e. axes 1 and 2 carry all the data.
    n_freqs, n_frames = decoded.shape[1], decoded.shape[2]
    return np.reshape(decoded, (n_freqs, n_frames))

def audio_from_specgram(spec, rate, output):
    """
    Invert a magnitude spectrogram to audio and write it as a WAV file.
    Args:
        spec (ndarray): spectrogram of shape (freqs, time).
        rate (int): sample rate of input audio.
        output (str): path to output file; '.wav' is appended to it.
    """
    # ispecgram reads the magnitude from channel 0, so add a trailing axis
    n_freqs, n_frames = spec.shape[0], spec.shape[1]
    spec_3d = np.reshape(spec, (n_freqs, n_frames, 1))
    waveform = ispecgram(spec_3d, n_fft=1024, hop_length=256, mag_only=True, num_iters=1000)
    sf.write(output + '.wav', waveform, rate)


In [None]:
def inv_magphase(mag, phase_angle):
    """Combine magnitudes and phase angles into a complex spectrogram.

    Args:
        mag: Magnitude spectrogram.
        phase_angle: Phase angles in radians, broadcastable against `mag`.
    Returns:
        Complex array equal to mag * (cos(phase_angle) + 1j * sin(phase_angle)).
    """
    # No shape printing here: griffin_lim calls this once per iteration, so
    # debug output would flood stdout with thousands of lines per clip.
    phase = np.cos(phase_angle) + 1.j * np.sin(phase_angle)
    return mag * phase

def griffin_lim(mag, phase_angle, n_fft, hop, num_iters):
    """Iterative phase retrieval from a magnitude spectrogram (Griffin-Lim).

    Each iteration inverts the current complex estimate to audio, re-analyses
    that audio with an STFT, keeps only the resulting phase and recombines it
    with the fixed target magnitude.

    Args:
        mag: Magnitude spectrogram.
        phase_angle: Initial condition for phase.
        n_fft: Size of the FFT (also used as the window length).
        hop: Stride of FFT.
        num_iters: Griffin-Lim iterations to perform.
    Returns:
        audio: signal produced by the last librosa.istft call.
    """
    stft_kwargs = dict(n_fft=n_fft, win_length=n_fft, hop_length=hop, center=True)
    istft_kwargs = dict(win_length=n_fft, hop_length=hop, center=True)
    final_step = num_iters - 1

    estimate = inv_magphase(mag, phase_angle)
    for step in range(num_iters):
        audio = librosa.istft(estimate, **istft_kwargs)
        if step == final_step:
            # the last pass only needs the audio, not another phase estimate
            break
        _, unit_phase = librosa.magphase(librosa.stft(audio, **stft_kwargs))
        estimate = inv_magphase(mag, np.angle(unit_phase))
    return audio

def ispecgram(spec,
              n_fft=512,
              hop_length=None,
              mask=True,
              log_mag=True,
              re_im=False,
              dphase=True,
              mag_only=True,
              num_iters=1000):
    """Inverse Spectrogram using librosa.
    Args:
        spec: 3-D specgram array [freqs, time, (mag_db, dphase)].
        n_fft: Size of the FFT.
        hop_length: Stride of FFT. Defaults to n_fft/2.
        mask: Reverse the mask of the phase derivative by the magnitude.
        log_mag: Use the logamplitude.
        re_im: Input holds Real and Imag. instead of logMag and dPhase.
            Only consulted when mag_only is False.
        dphase: Use derivative of phase instead of phase.
        mag_only: Specgram contains no phase; it is estimated with
            Griffin-Lim starting from random phase.
        num_iters: Number of griffin-lim iterations for mag_only.
    Returns:
        audio: 1-D array of sound samples, scaled so that the absolute peak
            is 0.25 (about -12 dB). An all-zero signal is returned unscaled.
    """
    if not hop_length:
        hop_length = n_fft // 2

    ifft_config = dict(win_length=n_fft, hop_length=hop_length, center=True)

    if re_im and not mag_only:
        # The two channels already are the real and imaginary parts, so there
        # is no magnitude/phase to rebuild (doing so used to raise NameError).
        spec_real = spec[:, :, 0] + 1.j * spec[:, :, 1]
        audio = librosa.istft(spec_real, **ifft_config)
    else:
        if mag_only:
            mag = spec[:, :, 0]
            phase_angle = np.pi * np.random.rand(*mag.shape)
        else:
            mag, p = spec[:, :, 0], spec[:, :, 1]
            if mask and log_mag:
                # Out-of-place division: `p` is a view into the caller's
                # array and must not be modified.
                p = p / (mag + 1e-13 * np.random.randn(*mag.shape))
            if dphase:
                # Roll up phase
                phase_angle = np.cumsum(p * np.pi, axis=1)
            else:
                phase_angle = p * np.pi

        # Magnitudes
        if log_mag:
            mag = (mag - 1.0) * 120.0
            mag = 10**(mag / 20.0)

        if mag_only:
            audio = griffin_lim(
                mag, phase_angle, n_fft, hop_length, num_iters=num_iters)
        else:
            phase = np.cos(phase_angle) + 1.j * np.sin(phase_angle)
            audio = librosa.istft(mag * phase, **ifft_config)

    # Normalise by the absolute peak: the largest excursion may be negative,
    # and a silent signal would otherwise cause a division by zero.
    peak = np.max(np.abs(audio)) if audio.size else 0.0
    if peak > 0:
        audio = audio / peak
    return np.squeeze(audio * 0.25) # scale to -12dB peak

In [None]:
# Spectrogram dimensions handed to load_specgrams as (freqs, time), plus the
# channel count and latent size chosen for this notebook.
img_size1, img_size2 = 513, 128
num_channels = 1
latent_space_dim = 3

In [None]:
# Walk the dataset directory and keep every file whose full path contains ".txt".
path = "/content/drive/MyDrive/Suga/spect_text_new" #change to spect text
filelist = []
for root, dirs, files in os.walk(path):
    for file in files:
        name = os.path.join(root, file)
        if ".txt" in name:
            filelist.append(name)

In [None]:
#Get the spectrogram data

def load_specgrams(filelist, spec_shape, train_split=0.80, n_samples=None):
    """
    Utility function to load spectrogram data.
    Args:
        filelist (list of str): Paths of text files, each holding one 2D
            spectrogram readable by np.loadtxt.
        spec_shape (tuple) : Shape of spectrograms to be loaded (freqs, time)
        train_split (float, optional): Fraction of the data to return as training samples.
        n_samples (int, optional): Number of examples to load, taken from the
            front of filelist. Defaults to, and is capped at, len(filelist).
    Returns:
        x_train (ndarray): Training set (samples, freqs, time, 1).
        x_test (ndarray): Testing set (samples, freqs, time, 1).
    Raises:
        ValueError: If no spectrogram would be loaded.
    """
    if n_samples is None: # set number of samples to full dataset
        n_samples = len(filelist)
    # Cap at what is actually available; otherwise the split index below
    # would be computed from a count larger than the loaded data.
    n_samples = min(n_samples, len(filelist))
    if n_samples <= 0:
        raise ValueError("load_specgrams: no spectrogram files to load")

    # Only the time axis is padded/cropped here; the reshape below still
    # requires every file to have spec_shape[0] frequency rows.
    x = np.stack(
        [fix_specgram_shape1(np.loadtxt(sample), spec_shape)
         for sample in filelist[:n_samples]],
        axis=0)

    train_idx = int(np.floor(n_samples * train_split))
    x_train = x[:train_idx,:,:]
    x_train = np.reshape(x_train, (x_train.shape[0], spec_shape[0], x_train.shape[2], 1))
    x_test = x[train_idx:,:,:]
    x_test = np.reshape(x_test, (x_test.shape[0], spec_shape[0], x_test.shape[2], 1))

    print("x_train: {}".format(x_train.shape))
    print("x_test:  {}".format(x_test.shape))

    return x_train, x_test

In [None]:
# #get images from in numpy format 

# path ="/content/drive/MyDrive/Suga/spectograms"
# #we shall store all the file names in this list
# filelist = []
# img_data_array=[]
# i=0;

# for root, dirs, files in os.walk(path):
# 	for file in files:
#         #append the file name to the list
# 		filelist.append(os.path.join(root,file))

# #print all the file names
# for name in filelist:
#     image= cv2.imread(name, cv2.COLOR_BGR2RGB)
#     image=np.array(image)
#     image = image.astype('float32')
#     image /= 255 
#     img_data_array.append(image)


# print("done adding images")

In [None]:
# Load every spectrogram listed in filelist and split it into training and test sets.
x_train,x_test=load_specgrams(filelist,(img_size1,img_size2))

x_train: (440, 513, 128, 1)
x_test:  (111, 513, 128, 1)


In [None]:
# Sanity check: print the shape of the first two training examples.
for file in x_train[0:2]:
  print(file.shape)

(513, 128, 1)
(513, 128, 1)


In [None]:
from keras import layers

In [None]:
def build_spectral_ae(input_shape=(513, 128, 1), latent_dim=3, n_filters=(32, 64, 128, 256, 512), lr=0.01):
    """Build a convolutional autoencoder for spectrograms.

    The encoder is a stack of strided Conv2D -> LeakyReLU -> BatchNormalization
    stages followed by a 1x1 convolution down to `latent_dim` channels. The
    decoder mirrors it with Conv2DTranspose stages and takes a latent input of
    shape (1, 1, latent_dim), so the stride schedule assumes an input size that
    the encoder collapses to 1x1 (as the default 513x128 does).

    Args:
        input_shape (tuple): Shape of one input spectrogram (freqs, time, channels).
        latent_dim (int): Number of channels of the latent representation.
        n_filters (sequence of int): Filter counts; only the first four
            entries are used.
        lr (float): Learning rate of the Adam optimizer.
    Returns:
        encoder: Keras model mapping a spectrogram to its latent vector.
        decoder: Keras model mapping a latent vector to a spectrogram.
        autoencoder: decoder(encoder(x)), compiled with mean squared error.
    """
    # Imported locally so the function does not rely on an import that
    # happens to be executed in a later notebook cell.
    from keras.models import Model

    f1, f2, f3, f4 = n_filters[0], n_filters[1], n_filters[2], n_filters[3]

    # (filters, kernel size, strides) of each encoder stage; padding is 'same'.
    encoder_stages = [
        (f1, (5, 5), (2, 2)),
        (f1, (4, 4), (2, 2)),
        (f1, (4, 4), (2, 2)),
        (f2, (4, 4), (2, 2)),
        (f2, (4, 4), (2, 2)),
        (f2, (4, 4), (2, 2)),
        (f3, (4, 4), (2, 2)),
        (f3, (4, 4), (2, 2)),
        (f3, (4, 4), (2, 1)),
        (f4, (1, 1), (2, 1)),
    ]
    # (filters, kernel size, padding, strides) of each decoder stage.
    decoder_stages = [
        (f4, (1, 1), 'same', (1, 1)),
        (f3, (2, 2), 'same', (2, 2)),
        (f3, (2, 2), 'same', (2, 2)),
        (f3, (2, 2), 'same', (2, 2)),
        (f2, (2, 2), 'same', (2, 2)),
        (f2, (2, 2), 'same', (2, 2)),
        (f2, (2, 2), 'same', (2, 2)),
        (f1, (2, 2), 'same', (2, 2)),
        (f1, (2, 2), 'same', (2, 1)),
        # 'valid' with a 3-tall kernel yields an odd height (2 * h + 1)
        (f1, (3, 1), 'valid', (2, 1)),
    ]

    input_spect = layers.Input(input_shape)
    x = input_spect
    for filters, kernel, strides in encoder_stages:
        x = layers.Conv2D(filters, kernel, padding='same', strides=strides)(x)
        x = layers.LeakyReLU(alpha=0.1)(x)
        x = layers.BatchNormalization()(x)
    # bottleneck: project to latent_dim channels, no activation
    x = layers.Conv2D(latent_dim, (1,1), padding='same', strides=(1,1))(x)
    z = layers.BatchNormalization()(x)

    input_z = layers.Input(shape=(1, 1, latent_dim))
    x = input_z
    for filters, kernel, padding, strides in decoder_stages:
        x = layers.Conv2DTranspose(filters, kernel, padding=padding, strides=strides)(x)
        x = layers.LeakyReLU(alpha=0.1)(x)
        x = layers.BatchNormalization()(x)
    output_spect = layers.Conv2DTranspose(1, (1,1), padding='same', strides=(1,1))(x)

    encoder = Model(input_spect, z)
    encoder.summary()

    decoder = Model(input_z, output_spect)
    decoder.summary()

    outputs = decoder(encoder(input_spect))
    autoencoder = Model(input_spect, outputs)
    autoencoder.compile(optimizer=tensorflow.keras.optimizers.Adam(learning_rate=lr), loss='mean_squared_error')
    autoencoder.summary()

    return encoder, decoder, autoencoder

In [None]:
# class VAE(tensorflow.keras.Model):
#     def __init__(self, encoder, decoder, **kwargs):
#         super(VAE, self).__init__(**kwargs)
#         self.encoder = encoder
#         self.decoder = decoder
#         self.total_loss_tracker = tensorflow.keras.metrics.Mean(name="total_loss")
#         self.reconstruction_loss_tracker = tensorflow.keras.metrics.Mean(
#             name="reconstruction_loss"
#         )
#         self.kl_loss_tracker = tensorflow.keras.metrics.Mean(name="kl_loss")

#     @property
#     def metrics(self):
#         return [
#             self.total_loss_tracker,
#             self.reconstruction_loss_tracker,
#             self.kl_loss_tracker,
#         ]

#     def train_step(self, data):
#         with tensorflow.GradientTape() as tape:

#             z_mean, z_log_var, z = self.encoder(data)
#             reconstruction = self.decoder(z)
#             reconstruction_loss = tensorflow.reduce_mean(
#                 tensorflow.reduce_sum(
#                     tensorflow.keras.losses.binary_crossentropy(data, reconstruction), axis=(1, 2)
#                 )
#             )
#             kl_loss = -0.5 * (1 + z_log_var - tensorflow.square(z_mean) - tensorflow.exp(z_log_var))
#             kl_loss = tensorflow.reduce_mean(tensorflow.reduce_sum(kl_loss, axis=1))
#             total_loss = reconstruction_loss + kl_loss
#         grads = tape.gradient(total_loss, self.trainable_weights)
#         self.optimizer.apply_gradients(zip(grads, self.trainable_weights))
#         self.total_loss_tracker.update_state(total_loss)
#         self.reconstruction_loss_tracker.update_state(reconstruction_loss)
#         self.kl_loss_tracker.update_state(kl_loss)
#         return {
            
#             "loss": self.total_loss_tracker.result(),
#             "reconstruction_loss": self.reconstruction_loss_tracker.result(),
#             "kl_loss": self.kl_loss_tracker.result(),
#         }

#         # return reconstruction;


#     def call(self, inputs):
#         z_mean, z_log_var, z = self.encoder(inputs)
#         reconstructed = self.decoder(z)
#         # Add KL divergence regularization loss.
#         kl_loss = -0.5 * tensorflow.reduce_mean(
#             z_log_var - tensorflow.square(z_mean) - tensorflow.exp(z_log_var) + 1
#         )
#         self.add_loss(kl_loss)
#         return reconstructed
    

In [None]:
from keras.models import Model
# Build the encoder, decoder and combined autoencoder with the default hyper-parameters.
e, d, ae = build_spectral_ae()

Model: "model"
_________________________________________________________________
Layer (type)                 Output Shape              Param #   
input_1 (InputLayer)         [(None, 513, 128, 1)]     0         
_________________________________________________________________
conv2d (Conv2D)              (None, 257, 64, 32)       832       
_________________________________________________________________
leaky_re_lu (LeakyReLU)      (None, 257, 64, 32)       0         
_________________________________________________________________
batch_normalization (BatchNo (None, 257, 64, 32)       128       
_________________________________________________________________
conv2d_1 (Conv2D)            (None, 129, 32, 32)       16416     
_________________________________________________________________
leaky_re_lu_1 (LeakyReLU)    (None, 129, 32, 32)       0         
_________________________________________________________________
batch_normalization_1 (Batch (None, 129, 32, 32)       128   

In [None]:
# Train the autoencoder to reconstruct its own input (targets equal inputs);
# the held-out spectrograms are used as validation data.
history = ae.fit(x=x_train, y=x_train,
                shuffle=True,
                epochs=20,
                batch_size=16,
                validation_data=(x_test, x_test))


Epoch 1/20
Epoch 2/20
Epoch 3/20
Epoch 4/20
Epoch 5/20
Epoch 6/20
Epoch 7/20
Epoch 8/20
Epoch 9/20
Epoch 10/20
Epoch 11/20
Epoch 12/20
Epoch 13/20
Epoch 14/20
Epoch 15/20
Epoch 16/20
Epoch 17/20
Epoch 18/20
Epoch 19/20
Epoch 20/20


In [None]:
# Persist the encoder, decoder and full autoencoder as separate HDF5 files.
e.save("/content/drive/MyDrive/Suga/encoder_m2.h5")
d.save("/content/drive/MyDrive/Suga/decoder_m2.h5")
ae.save("/content/drive/MyDrive/Suga/autoencoder_m2.h5")



In [None]:
# Reload the saved encoder and decoder for inference only (compile=False skips
# restoring the optimizer/loss configuration).
encoder = tensorflow.keras.models.load_model("/content/drive/MyDrive/Suga/encoder_m2.h5", compile=False)
decoder = tensorflow.keras.models.load_model("/content/drive/MyDrive/Suga/decoder_m2.h5", compile=False)

In [None]:
def generate_z(encoder, spec):
    """
    Map one spectrogram to the latent vector predicted by the encoder.
    Args:
        encoder (obj): trained Keras encoder network.
        spec (ndarray): spectrogram of shape (freqs, time).
    Returns:
        z (ndarray): output of encoder.predict for the spectrogram
            (documented by the original author as shape (1, 1, 1, 3)).
    """
    # size the spectrogram to what the encoder's input layer expects
    expected = encoder.input_shape
    freq_time_shape = (expected[1], expected[2])
    resized = fix_specgram_shape2(
        fix_specgram_shape1(spec, freq_time_shape), freq_time_shape)

    # wrap as a batch of one single-channel image
    as_batch = np.reshape(resized, (1, resized.shape[0], resized.shape[1], 1))

    latent = encoder.predict(as_batch)
    return latent

def generate_specgram(decoder, z):
    """
    Run the decoder on a latent vector and return the 2D spectrogram.
    Args:
        decoder (obj): trained Keras decoder network.
        z (ndarray): latent vector of shape (1, 1, 1, 3).
    Returns:
        spec (ndarray): spectrogram of shape (freqs, time).
    """
    prediction = decoder.predict(z)
    # keep axes 1 and 2 only; this works because the prediction is expected
    # to contain a single example with a single channel
    freq_time = (prediction.shape[1], prediction.shape[2])
    return np.reshape(prediction, freq_time)

def audio_from_specgram(spec, rate, output):
    """
    Reconstruct audio and save it to file.
    Args:
        spec (ndarray): magnitude spectrogram of shape (freqs, time).
        rate (int): sample rate of input audio.
        output (str): path to output file; '.wav' is appended to it.
    """
    # An STFT of size n_fft has n_fft // 2 + 1 frequency bins, so recover the
    # FFT size from the spectrogram height. A fixed 512 does not match
    # 513-bin spectrograms and makes Griffin-Lim fail with a shape mismatch
    # when it re-analyses the audio.
    n_fft = 2 * (spec.shape[0] - 1)
    spec = np.reshape(spec, (spec.shape[0], spec.shape[1], 1)) # add channel axis read by ispecgram
    audio = ispecgram(spec, n_fft=n_fft, hop_length=256, mag_only=True, num_iters=1000)
    sf.write(output + '.wav', audio, rate)

In [None]:
def plot_from_specgram(spec, rate, output):
    """
    Render a spectrogram with librosa and save the figure as a PNG.
    Args:
        spec (ndarray): spectrogram of shape (freqs, time).
        rate (int): sample rate of input audio.
        output (str): path to output file; '.png' is appended to it.
    """
    image_path = output + '.png'
    plt.figure()
    # NOTE(review): the sample rate is doubled before plotting; the reason is
    # not evident from this file - confirm it is intended.
    librosa.display.specshow(spec, sr=rate*2, x_axis='time', y_axis='log')
    plt.colorbar(format='%+2.0f dB')
    plt.tight_layout()
    plt.savefig(image_path)
    plt.close()

In [None]:
import itertools

# Sweep a regular grid over the latent space and render one audio clip per
# grid point. The latent size is read from the decoder itself: a hand-built
# 2-D vector does not match a decoder whose latent input has 3 channels.
latent_dim = decoder.input_shape[-1]
output_dir = 'pre_compute_demo2'
# make sure the target directory exists before any file is written into it
os.makedirs(output_dir, exist_ok=True)

grid = np.linspace(-2, 2, num=10)
for idx, coords in enumerate(itertools.product(grid, repeat=latent_dim)):
    print("{:04d} | z = [ {}]".format(
        idx, " ".join("{:+0.3f}".format(value) for value in coords)))
    z = np.reshape(np.array(coords), (1, 1, 1, latent_dim))
    filename = "_".join(["({:+0.3f})".format(dim) for dim in coords])
    filename = "{:04d}_{}".format(idx, filename)
    filepath = os.path.join(output_dir, filename)
    spec = generate_specgram(decoder, z)
    audio_from_specgram(spec, 16000, filepath)

0000 | z = [ -2.000 -2.000]


ValueError: ignored