# Initial Imports


In [None]:
# Make sure you are inside the demo folder (with all of its files present) and are running this in a Google Colab environment. If not, you may need to change some of the dependencies mentioned.
# !pip install -r demo-requirements.txt

In [None]:
from zipfile import ZipFile
import glob
import csv
import random
from subprocess import Popen, PIPE
from keras import regularizers
from os.path import dirname
import os
import soundfile as sf
!sudo apt-get install sox
import math
import numpy as np
import librosa
import shutil
import pickle
import re
import tensorflow as tf
!git clone https://github.com/DemisEom/SpecAugment.git
!pip install /content/SpecAugment/ --quiet
!pip install tensorflow-addons --quiet
!pip install sed_eval --quiet
import keras
from SpecAugment import spec_augment_tensorflow
import sed_eval
import dcase_util
from keras import regularizers
from keras.regularizers import l2

# Download preset model

In [None]:
# gdown is used to fetch the pre-trained weights file by its Google Drive id.
!pip install gdown
import gdown

# Local file name the weights are saved under (relative to the current working directory).
output1 = "best_model.h5"
gdown.download(id='1cnDPfL3udjHcT516qX9mcXigvE_WYha3', output=output1, quiet=False)



# Helper Functions

In [None]:
def convert_annotations_to_events(filename): #read_annotations
    """Read a tab-separated annotation file and return its rows as events.

    Each non-empty input row has its first field moved to the end, and the
    two fields that end up first are divided by 1000 and stored back as
    strings (presumably label / start-ms / end-ms in, start-s / end-s / label
    out -- confirm against the annotation format).

    Args:
        filename: path of the tab-separated annotation file ('|' is the
            quote character).

    Returns:
        A list of rows, each a list of strings.

    Raises:
        ValueError: if a field that must be numeric cannot be parsed.
        IndexError: if a non-empty row has fewer than two fields.
    """
    events = []
    # newline='' is what the csv module asks for so it can handle line
    # endings itself.
    with open(filename, 'r', newline='') as csvfile:
        for row in csv.reader(csvfile, delimiter='\t', quotechar='|'):
            # csv.reader yields [] for blank lines (e.g. a trailing newline);
            # skip them instead of failing on row[0].
            if not row:
                continue
            event = row[1:] + [row[0]]
            event[1] = str(float(event[1]) / 1000)
            event[0] = str(float(event[0]) / 1000)
            events.append(event)
    return events


In [None]:
def construct_examples(audio_path, win_len = 2.56, hop_len = 1.0, sr = 44100.0):
  """Cut an audio file into fixed-length, zero-padded, overlapping windows.

  Args:
    audio_path: path of the audio file (read with soundfile).
    win_len: window length in seconds.
    hop_len: hop between consecutive window starts in seconds.
    sr: nominal sampling rate used to convert win_len / hop_len to a number
      of samples.

  Returns:
    (audio_example, win_ranges) where audio_example is a list of 1-D arrays
    of int(sr * win_len) samples each, and win_ranges is a list of
    (start, end) tuples giving each window's position in the file, computed
    with the sampling rate reported by the file itself.
  """
  window_length = int(sr * win_len)
  hop_length = int(sr * hop_len)

  # Keep the file's own rate under a separate name so the `sr` argument is
  # not silently overwritten; the window size in samples still comes from the
  # nominal `sr`, the time stamps from the file's rate (as before).
  audio, file_sr = sf.read(audio_path)

  # soundfile returns (frames, channels) for multi-channel files; average the
  # channels so the 1-D padded buffer below can hold the signal.
  if audio.ndim > 1:
    audio = audio.mean(axis=1)

  # Zero-pad so the signal is exactly one window plus a whole number of hops.
  # Audio shorter than one window gives zero hops, i.e. a single padded window.
  n_samples = audio.shape[0]
  no_of_hops = max(0, math.ceil((n_samples - window_length) / hop_length))
  audio_padded = np.zeros((window_length + hop_length * no_of_hops, ))
  audio_padded[:n_samples] = audio

  # End index (exclusive) of every window in the padded signal.
  window_ends = range(window_length, audio_padded.shape[0] + 1, hop_length)
  audio_example = [audio_padded[end - window_length : end] for end in window_ends]
  win_ranges = [((end - window_length) / file_sr, end / file_sr) for end in window_ends]

  return audio_example, win_ranges

In [None]:
# Maps each sound-event class name to its integer class index.
CLASS_ENCODING = {"car": 0, "aircraft": 1, "crowds":2, "footsteps":3, "clocks":4, "rainforest": 5}

# Inverse lookup (class index -> class name). run_inference indexes
# `rev_class_list` by class index, but the name was never defined; it is
# presumably meant to be the inverse of CLASS_ENCODING.
rev_class_list = sorted(CLASS_ENCODING, key=CLASS_ENCODING.get)

In [None]:
def get_log_melspectrogram(audio, sr = 44100, hop_length = 441, win_length = 1764, n_fft = 2048, n_mels = 128, fmin = 0, fmax = 22050):
    """Return the log-scaled Mel bands of an audio signal.

    Args:
        audio: 1-D array of audio samples.
        sr: sampling rate of `audio` in Hz.
        hop_length: hop between STFT frames, in samples.
        win_length: STFT window length, in samples.
        n_fft: FFT size.
        n_mels: number of Mel bands.
        fmin: lowest frequency of the Mel filter bank, in Hz.
        fmax: highest frequency of the Mel filter bank, in Hz; values above
            sr / 2 (or None) are replaced by sr / 2.

    Returns:
        The Mel power spectrogram converted to dB by librosa's power_to_db.
    """
    # fmin / fmax used to be accepted but never forwarded. With the defaults
    # (0 and sr / 2) forwarding them matches librosa's own defaults, so the
    # output for default calls is unchanged.
    nyquist = sr / 2
    if fmax is None or fmax > nyquist:
        fmax = nyquist

    audio_2 = librosa.util.normalize(audio)
    bands = librosa.feature.melspectrogram(
        y=audio_2, sr=sr, hop_length=hop_length, win_length=win_length,
        n_fft=n_fft, n_mels=n_mels, fmin=fmin, fmax=fmax)
    return librosa.core.power_to_db(bands)

# Network Definition

In [None]:
class YOHOBlock:
  """Depthwise-separable convolution block: 3x3 depthwise conv, then 1x1 pointwise conv.

  Each convolution is followed by batch normalisation (no scale term) and a
  ReLU. All layers are named "layer<index + 2>/..." and the resulting tensor
  is exposed as `self.output`.
  """

  def __init__(self, stride, num_filters, index, input):
      # stride applies to the depthwise conv; num_filters is the number of
      # pointwise output channels; input is the tensor the block is applied to.
      prefix = "layer" + str(index + 2)
      depthwise_name = prefix + "/depthwise_conv"
      pointwise_name = prefix + "/pointwise_conv"

      # Layers are listed in the order they are applied.
      stages = [
          tf.keras.layers.DepthwiseConv2D(
              kernel_size=[3,3], strides=stride, depth_multiplier=1,
              padding='same', use_bias=False, activation=None,
              name=depthwise_name),
          tf.keras.layers.BatchNormalization(
              center=True, scale=False, epsilon=1e-4,
              name=depthwise_name + "/bn"),
          tf.keras.layers.ReLU(name=depthwise_name + "/relu"),
          tf.keras.layers.Conv2D(
              filters=num_filters, kernel_size=[1, 1], strides=1,
              padding='same', use_bias=False, activation=None,
              name=pointwise_name,
              kernel_regularizer=l2(0.01), bias_regularizer=l2(0.01)),
          tf.keras.layers.BatchNormalization(
              center=True, scale=False, epsilon=1e-4,
              name=pointwise_name + "/bn"),
          tf.keras.layers.ReLU(name=pointwise_name + "/relu"),
      ]

      tensor = input
      for stage in stages:
          tensor = stage(tensor)
      self.output = tensor


# Build the network: EfficientNetB0 backbone -> three YOHOBlocks -> per-step sigmoid head.
# NOTE(review): the input is declared as (257, 40), but create_mel_spectrograms
# in this file produces arrays of shape (N, 257, 128) -- confirm which mel-band
# count the pre-trained weights expect.
input_layer = tf.keras.Input(shape=(257, 40), name="mel_input")
# Add a trailing channel axis so the 2-D spectrogram can feed the image backbone.
X = tf.keras.layers.Reshape((257, 40, 1))(input_layer)
# Backbone without the classification head and without ImageNet weights.
base_model = tf.keras.applications.EfficientNetB0(
    include_top=False,
    weights=None,
    input_tensor=X,
    pooling=None,
)
X = base_model.output
X = YOHOBlock(stride=1, num_filters=512, index=1, input=X).output
X = YOHOBlock(stride=1, num_filters=256, index=2, input=X).output
X = YOHOBlock(stride=1, num_filters=128, index=3, input=X).output
# X is 4-D here; keep its second axis and flatten the last two axes into one feature axis.
_, _, sx, sy = X.shape
X = tf.keras.layers.Reshape((-1, int(sx * sy)))(X)
# 18 sigmoid outputs per step: run_inference reads them as 6 classes x 3 values.
pred = tf.keras.layers.Conv1D(18,kernel_size=1, activation="sigmoid")(X)
model = tf.keras.Model(inputs=input_layer, outputs=pred)

# Load the preset model weights

In [None]:
# The download cell saves the weights as "best_model.h5" (underscore) in the
# current working directory; load that same file. The previous path,
# '/content/best-model.h5' (hyphen), did not match the downloaded file name.
model.load_weights('best_model.h5')


# Inference


In [None]:
def create_mel_spectrograms(audio_file):
  """Split an audio file into windows and compute one log-Mel spectrogram per window.

  Args:
    audio_file: path of the audio file to analyse.

  Returns:
    (mss_in, win_ranges): mss_in is an array of shape
    (number of windows, 257, 128) holding the transposed log-Mel spectrogram
    of each window; win_ranges is the list of (start, end) tuples returned by
    construct_examples for the same windows.
  """
  win_length = 2.56
  hop_size = 1.96

  windows, win_ranges = construct_examples(audio_file, win_len=win_length, hop_len=hop_size)

  # NOTE(review): 128 is the default n_mels of get_log_melspectrogram, while
  # the network in this file declares a (257, 40) input -- confirm which one
  # matches the trained weights.
  mss_in = np.zeros((len(windows), 257, 128))

  for i, window in enumerate(windows):
    # The spectrogram is transposed before it is stored in the batch.
    mss_in[i, :, :] = get_log_melspectrogram(window).T

  return mss_in, win_ranges


def run_inference(model, win_ranges, mss_in, no_of_div = 9, hop_size = 1.96, discard = 0.3, win_length = 2.56, max_event_silence = 0.3, sampling_rate = 44100, class_names = None):
  """Run the model on a batch of windows and decode its output into events.

  For every window the model output is read as rows of
  (presence, relative start, relative length) triples, one triple per class.
  A triple with presence >= 0.5 becomes an event; events of the same class
  that overlap or are separated by at most `max_event_silence` are merged.

  Args:
    model: object with a `predict(mss_in)` method returning an array indexed
      as [window, step, value].
    win_ranges: per-window (start, end) tuples; only the start is used, as
      the time offset of the window.
    mss_in: model input batch, passed to `model.predict` unchanged.
    no_of_div: number of steps each window is divided into.
    hop_size: unused; kept for interface compatibility.
    discard: unused; kept for interface compatibility.
    win_length: window length, in the same unit as win_ranges.
    max_event_silence: largest gap between two same-class events that are
      still merged into one.
    sampling_rate: unused; kept for interface compatibility.
    class_names: sequence mapping class index to label; defaults to the
      module-level `rev_class_list`.

  Returns:
    A list of [start, end, label] lists, times rounded to 3 decimals and
    sorted by start time.
  """
  if class_names is None:
    class_names = rev_class_list

  preds = model.predict(mss_in)
  win_width = win_length / no_of_div

  # Group events per class as they are decoded. A dict keeps first-seen
  # order, so the result no longer depends on set iteration order.
  class_wise_events = {}
  for win_idx, window_pred in enumerate(preds):
    window_start = win_ranges[win_idx][0]
    for step_idx, step_pred in enumerate(window_pred):
      # Each class owns three consecutive values of the step's output.
      for class_idx in range(len(step_pred) // 3):
        base = class_idx * 3
        if step_pred[base] >= 0.5:
          start = win_width * step_idx + win_width * step_pred[base + 1] + window_start
          end = step_pred[base + 2] * win_width + start
          label = class_names[class_idx]
          class_wise_events.setdefault(label, []).append([start, end, label])

  all_events = []
  for class_events in class_wise_events.values():
    # Windows may overlap, so events are not necessarily produced in time
    # order. Sort by start first; otherwise an earlier-starting event can be
    # merged into a later one and the merged start time comes out too late.
    class_events.sort(key=lambda ev: ev[0])

    merged = [class_events[0]]
    for ev in class_events[1:]:
      last = merged[-1]
      if last[1] >= ev[0] or ev[0] - last[1] <= max_event_silence:
        last[1] = max(ev[1], last[1])
      else:
        merged.append(ev)

    all_events += merged

  for ev in all_events:
    ev[0] = round(ev[0], 3)
    ev[1] = round(ev[1], 3)

  all_events.sort(key=lambda ev: ev[0])

  return all_events


In [None]:
# Ensure that the audio file passed to infer_events below exists at the given path (or change the path in the call).
def infer_events(model, audio_file):
  """Detect sound events in an audio file, then print and save them.

  A single-channel copy of the file is created with sox, converted to
  log-Mel spectrogram windows and passed through the model. The decoded
  events are printed and written to "test-inference.txt" as one
  "start,end,label" line per event.

  Args:
    model: model passed on to run_inference.
    audio_file: path of the audio file to analyse.

  Raises:
    RuntimeError: if sox exits with a non-zero status.
  """
  # Always give the mono copy a name different from the input. The old
  # replace("test", "test-mono") left the name unchanged for inputs without
  # "test" in their path, so sox was given the same file as input and output.
  root, ext = os.path.splitext(audio_file)
  temp_file = root + "-mono" + ext

  # Argument list without a shell: paths containing spaces or shell
  # metacharacters are passed to sox verbatim.
  p = Popen(["sox", audio_file, temp_file, "channels", "1"], stdin=PIPE, stdout=PIPE, stderr=PIPE)
  output, err = p.communicate()
  if p.returncode != 0:
    # Report sox's own message rather than failing later on a missing file.
    raise RuntimeError("sox failed to convert {} to mono: {}".format(
        audio_file, err.decode(errors="replace").strip()))

  # make the audio into the melspectrograms
  mss_in, win_ranges = create_mel_spectrograms(temp_file)

  # run inference to generate the set of events
  events = run_inference(model, win_ranges, mss_in)
  output_file = "test-inference.txt"

  print('outputted segmentation events')
  print(events)
  with open(output_file, 'w') as fp:
    fp.write('\n'.join('{},{},{}'.format(round(x[0], 5), round(x[1], 5), x[2]) for x in events))

# Run the pipeline on sample-mixed-audio.wav (path relative to the current
# working directory); events are printed and written to test-inference.txt.
infer_events(model, 'sample-mixed-audio.wav')