In [None]:
import os
import random
import os.path
from os import path
import pandas as pd
import shutil
from scipy.stats import norm
import pandas as pd
import numpy as np
from misc import *
from interleave_convolutional import *
from dataset_utils import *
from gan_models import *
import librosa
import librosa.display
import matplotlib.pyplot as plt
import IPython.display as ipd
from joblib import load
import imageio
from PIL import Image
from tensorflow.keras.models import load_model
from sklearn.preprocessing import StandardScaler
from sklearn.preprocessing import MinMaxScaler

In [None]:
# Mount Google Drive at /content/drive; every path used in the cells below lives under
# this mount point. force_remount=True is passed so the mount is redone on each run.
from google.colab import drive
drive.mount('/content/drive', force_remount=True)

Mounted at /content/drive


# Objective
Develop a reusable script that creates an augmented spectrogram dataset containing varying amounts of synthetic spectrogram data.

# Directory References

In [None]:
# Training-split spectrogram directories on the mounted Drive. Judging by the paths,
# AI_TRAIN_DIR holds synthetic ("ai") spectrograms and REAL_TRAIN_DIR holds real ones.
# REAL_TRAIN_DIR is the directory passed as `real_dir` to create_rand_aug_train_set.
AI_TRAIN_DIR = '/content/drive/MyDrive/Stuff I Coded/PCG_synthesis/data2/ai/spectrograms/train/'
REAL_TRAIN_DIR = '/content/drive/MyDrive/Stuff I Coded/PCG_synthesis/data2/real/spectrograms/train/'

In [None]:
# Root directory for augmented spectrogram sets.
# NOTE(review): create_rand_aug_train_set spells out this same path as its own string
# literal instead of reading this constant, so the two must be kept in sync by hand.
AUG_DIR = '/content/drive/MyDrive/Stuff I Coded/PCG_synthesis/data2/augmented/spectrograms/'

In [None]:
# Path of the spectrogram autoencoder directory under out/.
# NOTE(review): not referenced by any other visible cell - confirm it is still needed.
AUTOENCODER_DIR = '/content/drive/MyDrive/Stuff I Coded/PCG_synthesis/out/spectrogram_autoencoder'

In [None]:
# Murmur labels for the real recordings: the JSON table is indexed by 'pcg_id' and its
# 'murmur' column is pulled out as a plain dict {pcg_id: murmur}.
real_murmur_key = pd.read_json('/content/drive/MyDrive/Stuff I Coded/PCG_synthesis/model_development/real_murmur_key.json').set_index('pcg_id').to_dict().get('murmur')

In [None]:
# Names of all entries in the real training spectrogram directory.
real_train_set = os.listdir(REAL_TRAIN_DIR)

# Setup

In [None]:
# Weight files for the three per-class generators (one each for the "present", "absent"
# and "unknown" murmur classes) and the .npy file holding the normalization factors.
present_generator_path = "/content/drive/MyDrive/Stuff I Coded/PCG_synthesis/out/model_g_present_500.h5"
absent_generator_path = "/content/drive/MyDrive/Stuff I Coded/PCG_synthesis/out/model_g_absent_500.h5"
unknown_generator_path = "/content/drive/MyDrive/Stuff I Coded/PCG_synthesis/out/model_g_unknown_500.h5"
norm_path = "/content/drive/MyDrive/Stuff I Coded/PCG_synthesis/out/normalization.npy"

In [None]:
# Generator configuration; these module-level values are read by the model-building
# cell and by generate_pcgs.
NORM = norm_path     # file containing the normalization factors used
REPR = "dft"         # output type: dft, dct, or anything else for raw waveform
IL = True            # use ILConv to build model (always True for raw waveform)
BIAS = True          # generator output layer has a bias
LSIZE = 100          # number of latent dimensions
MSIZE = 64           # model size
R = 4000             # sample rate of output files
V = True             # output per-batch progress

In [None]:
# create and build model
norm_f = np.load(NORM)

# Pick the generator architecture from the output representation (REPR).
# Both branches must test REPR: IL is a boolean, so comparing IL with 'dct'
# is never true and a 'dct' configuration would silently fall through to the
# raw-waveform generator.
if REPR == 'dft':
    gen_func = dft_generator if IL else dft_generator_tr
elif REPR == 'dct':
    gen_func = dct_generator if IL else dct_generator_tr
else:
    gen_func = wave_generator


def _load_generator(weights_path):
    """Build a generator with the configured sizes and load its weights from `weights_path`."""
    model = gen_func(MSIZE, LSIZE, BIAS)
    model.load_weights(weights_path)
    return model


# creating generators (one per murmur class, all sharing the same architecture)
present_generator = _load_generator(present_generator_path)
absent_generator = _load_generator(absent_generator_path)
unknown_generator = _load_generator(unknown_generator_path)

In [None]:
# lookup table for different generators
generators = {
    "present": present_generator,
    "absent": absent_generator,
    "unknown": unknown_generator
}

In [None]:
def generate_pcgs(N, B, model_type, O, seed=None):
  """Generate N synthetic recordings with one of the loaded generators.

  Latent vectors are sampled uniformly between -1 and 1, run through the
  generator in batches of at most B, divided by the normalization factors
  (`norm_f`), converted back from the configured representation (`REPR`) and
  handed to `write_wav_dataset` together with the running file index.

  Args:
    N: total number of recordings to generate.
    B: maximum number of recordings per batch; must be >= 1 whenever N > 0.
    model_type: key into `generators` ("present", "absent" or "unknown").
    O: output location, forwarded unchanged to `write_wav_dataset`.
    seed: optional seed for the latent sampler. The default (None) leaves the
      sampler unseeded, exactly as before this parameter existed.

  Raises:
    ValueError: if N > 0 and B < 1.
    KeyError: if `model_type` is not a key of `generators`.
  """
  # Checked first: with B < 1 the batch size computed below is never positive,
  # so wav_i would never reach N and the loop would not terminate.
  if N > 0 and B < 1:
    raise ValueError('B (batch size) must be at least 1, got {!r}'.format(B))

  generator = generators[model_type]
  z_rs = np.random.RandomState(seed)
  wav_i = 0
  while wav_i < N:
    # The last batch may be smaller than B.
    batch_size = min(B, N - wav_i)
    if V:
      print('generating files {:d}-{:d} (out of {:d})...'.format(wav_i, wav_i + batch_size - 1, N))
    z_in = z_rs.uniform(low=-1, high=1, size=(batch_size, LSIZE)).astype(np.float32)
    G_z = generator.predict(z_in, batch_size=batch_size) / norm_f
    if REPR == 'dft':
      G_z = dft_transform_backward(G_z)
    elif REPR == 'dct':
      G_z = dct_transform_backward(G_z)
    else:
      G_z = np.squeeze(G_z)
    # fname_init carries the index of the first file of this batch.
    write_wav_dataset(G_z, O, fname_init=wav_i)
    wav_i += batch_size
  if V:
    print('done')

In [None]:
def label_from_filename(filename):
  """Return the second '_'-separated field of `filename` with its first character upper-cased.

  Only the first character is changed; the rest of the field is kept as-is.
  Raises IndexError when there is no second field or when it is empty.
  """
  raw_label = filename.split('_', 2)[1]
  first_char, remainder = raw_label[0], raw_label[1:]
  return first_char.upper() + remainder

In [None]:
def png_to_array(spec_path):
  """Load the image at `spec_path` and return its pixel data as a NumPy array.

  The image is opened in a `with` block so the underlying file handle is
  released as soon as the pixels have been copied into the array, instead of
  relying on garbage collection.
  """
  with Image.open(spec_path) as img:
    # np.array copies the pixel data, so the result stays valid after close.
    img_array = np.array(img)
  return img_array

In [None]:
def wav_to_spectrogram(wav_path, out_path, plot=False, end_limit=90317, n_mels=128):
  """Turn a wav file into a log-mel spectrogram and either plot it or save it as a PNG.

  Args:
    wav_path: wav file to read (loaded with librosa's default settings; no
      explicit sample rate is passed).
    out_path: destination of the PNG; only used when `plot` is False.
    plot: if True, show the dB spectrogram with matplotlib instead of saving.
    end_limit: only the first `end_limit` samples of the signal are used.
    n_mels: number of mel bands.
  """
  samples, sample_rate = librosa.load(wav_path)

  # mel power spectrogram of the truncated signal, expressed in dB relative to its peak
  mel_power = librosa.feature.melspectrogram(y=samples[:end_limit], sr=sample_rate, n_mels=n_mels)
  mel_db = librosa.power_to_db(mel_power, ref=np.max)

  # rescale to 0-255, flip vertically, and cast to uint8 (imageio expects integer pixels)
  db_bounds = (mel_db.min(), mel_db.max())
  pixels = np.flipud(np.interp(mel_db, db_bounds, (0, 255))).astype(np.uint8)

  if not plot:
    # save to png
    imageio.imwrite(out_path, pixels)
    return

  # plotting uses the unscaled dB values, not the 8-bit image
  librosa.display.specshow(mel_db, sr=sample_rate, x_axis='time', y_axis='mel')
  plt.colorbar(format='%+2.0f dB')
  plt.title('Mel log spectrogram')
  plt.tight_layout()
  plt.show()

# Augmented Dataset Creation

In [None]:
def create_rand_aug_train_set(real_dir, num_aug_samples, aug_dir_name, murmur_type, batch_size=100):
  """Build an augmented training set: newly generated synthetic spectrograms plus all real ones.

  Steps:
    1. Create the output directory <augmented spectrogram root>/<aug_dir_name>/ if missing.
    2. Generate `num_aug_samples` wavs with the `murmur_type` generator, convert each
       one to '<murmur_type>-<wav name>.png' and delete the wav.
    3. Copy every file in `real_dir` into the same directory.
    4. Write '<aug_dir_name>_key.csv' (columns 'pcg_id' and 'murmur') next to the
       spectrogram root.

  Only the spectrograms produced by THIS call are recorded as synthetic. Files left
  in the output directory by an earlier run are not touched and are not re-labelled;
  previously, every pre-existing file (including real spectrograms copied by an
  earlier run) was written to the key as a synthetic `murmur_type` sample.

  Args:
    real_dir: directory of real spectrograms; each file name minus its extension
      must be a key of `real_murmur_key`.
    num_aug_samples: number of synthetic recordings to generate.
    aug_dir_name: name of the output directory and prefix of the key csv.
    murmur_type: generator key ("present", "absent" or "unknown"); its capitalized
      form is used as the label of the synthetic samples.
    batch_size: generation batch size forwarded to generate_pcgs (default 100,
      the value that used to be hard-coded).

  Raises:
    KeyError: if `murmur_type` has no generator or a real file has no entry in
      `real_murmur_key`.
  """
  filenames = []
  labels = []

  # create a new train dir called aug_dir_name (parents included, fine if it exists)
  aug_dir = '/content/drive/MyDrive/Stuff I Coded/PCG_synthesis/data2/augmented/spectrograms/'+aug_dir_name+'/'
  os.makedirs(aug_dir, exist_ok=True)

  # generate num_aug_samples number of wavs
  generate_pcgs(num_aug_samples, batch_size, murmur_type, aug_dir)

  print("finalizing AI specs in augmented directory")
  synthetic_label = murmur_type.capitalize()
  # wav -> spec for ai specs we just generated; os.listdir returns a snapshot, so
  # adding pngs and removing wavs while looping is safe
  for wav_filename in os.listdir(aug_dir):
    if not wav_filename.endswith('.wav'):
      continue
    wav_path = aug_dir + wav_filename
    wav_number = wav_filename.split('.')[0]
    spec_id = murmur_type + '-' + wav_number
    wav_to_spectrogram(wav_path, aug_dir + spec_id + '.png')
    # remove the wav, we just want the png
    os.remove(wav_path)
    # record the synthetic sample right here, so that nothing else sitting in
    # aug_dir can end up in the key with a synthetic label
    filenames.append(spec_id)
    labels.append(synthetic_label)

  print("transfering real specs to augmented directory")
  # copy every real spectrogram into aug_dir and look up its murmur label
  for real_filename in os.listdir(real_dir):
    shutil.copyfile(os.path.join(real_dir, real_filename), os.path.join(aug_dir, real_filename))
    pcg_id = os.path.splitext(real_filename)[0]
    filenames.append(pcg_id)
    labels.append(real_murmur_key[pcg_id])

  print("finalizing key creation")
  # create the key csv
  aug_murmur_key = pd.DataFrame({'pcg_id': filenames, 'murmur': labels})
  aug_murmur_key.to_csv('/content/drive/MyDrive/Stuff I Coded/PCG_synthesis/data2/augmented/'+aug_dir_name+'_key.csv')
  print("key csv created. all done!")

In [None]:
# Build the "rand_aug_1041" set: 1041 synthetic "present" spectrograms plus every
# file in REAL_TRAIN_DIR.
create_rand_aug_train_set(REAL_TRAIN_DIR, 1041, "rand_aug_1041", "present")

generating files 0-99 (out of 1041)...
generating files 100-199 (out of 1041)...
generating files 200-299 (out of 1041)...
generating files 300-399 (out of 1041)...
generating files 400-499 (out of 1041)...
generating files 500-599 (out of 1041)...
generating files 600-699 (out of 1041)...
generating files 700-799 (out of 1041)...
generating files 800-899 (out of 1041)...
generating files 900-999 (out of 1041)...
generating files 1000-1040 (out of 1041)...
done
finalizing AI specs in augmented directory
transfering real specs to augmented directory
finalizing key creation
key csv created. all done!
