# **MAAS Audio Filtering**

This notebook details the process of filtering wav audio files for usage in the MAAS project. The first four steps are an implementation of the process described by Salamon and Gómez [2]. The rest is inspired by the research done by Wang [3].

In [None]:
# Imports
import numpy as np
from scipy import signal as sig
from scipy.io import wavfile as wav
import matplotlib.pyplot as plt

In [None]:
# Define Mauricio's Audio Analysis System function
def MAAS_filter(data, sr = 44100, filename = None):
  ## **Step 1.** Sinusoid Extraction
  # 1.1. Equal Loudness Filtering
  # Filter approximation by [1]

  yule_b = np.array([0.05418656406430, -0.02911007808948, -0.00848709379851, -0.00851165645469,
                     -0.00834990904936, 0.02245293253339, -0.02596338512915, 0.01624864962975,
                     -0.00240879051584, 0.00674613682247, -0.00187763777362])
  yule_a = np.array([1.0, -3.47845948550071, 6.36317777566148, -8.54751527471874, 9.47693607801280,
                     -8.81498681370155, 6.85401540936998, -4.39470996079559, 2.19611684890774,
                     -0.75104302451432, 0.13149317958808])
  butter_b = np.array([0.98500175787242, -1.97000351574484, 0.98500175787242])
  butter_a = np.array([1.0, -1.96977855582618, 0.97022847566350])

  num = np.convolve(yule_b, butter_b)
  den = np.convolve(yule_a, butter_a)

  eq_data = sig.lfilter(num, den, data)
  # 1.2. Spectral Transform
  # Apply Short-Time Fourier Transform

  # Constants as calculated by [2]
  # Values specific to 44.1 kHz, should be scalable to other sample rates
  M = 2048
  N = 8192
  H = 128

  win = sig.windows.hann(M, False)
  SFT = sig.ShortTimeFFT(win, hop=H, fs=sr, mfft=N, scale_to='magnitude')
  Sx = SFT.stft(eq_data)
  # Spectogram only for illustration purposes
  fig1, ax1 = plt.subplots(figsize=(9., 6.))
  ax1.set(ylim=(0, 5000))

  im1 = ax1.imshow(abs(Sx), origin='lower', aspect='auto',
                   extent=SFT.extent(sr*np.size(data)), cmap='viridis')
  fig1.colorbar(im1)
  fig1.tight_layout()

  plt.show()
  # 1.3. Frequency/Amplitude Correction
  # Identify local maxima of each time frame

  peaks = sig.argrelmax(abs(Sx), axis=0, order=2)
  pcp = max(abs(Sx[peaks[0], peaks[1]])) / 20

  peak_ids = np.zeros(0, dtype=int)
  for i in range(peaks[0].size):
    if abs(Sx[peaks[0][i], peaks[1][i]]) > pcp:
      peak_ids = np.append(peak_ids, i)

  peaks_f = peaks[0][peak_ids]
  peaks_t = peaks[1][peak_ids]
  # Graph only for illustration purposes
  fig1, ax1 = plt.subplots(figsize=(9., 6.))
  ax1.set(ylim=(0, 5000))

  extents = SFT.extent(sr*data.size)
  # x and y scale actually important. do not delete
  x_scale = data.size/Sx.shape[1]
  y_scale = sr/N

  im1 = ax1.imshow(abs(Sx), origin='lower', aspect='auto',
                   extent=extents, cmap='viridis')
  ax1.plot(peaks_t * x_scale, peaks_f * y_scale, 'y.')

  fig1.colorbar(im1)
  fig1.tight_layout()

  plt.show()
  # Correct frequency and amplitude based on peaks' phase
  # Done by computing instantaneous frequency (IF) and magnitude (Ai)
  # Further clarification about this implementation by [4] and [5]

  IF = np.zeros(peaks_f.size)
  Ai = np.zeros(peaks_f.size)
  koff = np.zeros(peaks_f.size)

  for i in range(IF.size):
    k_offset = np.angle(Sx[peaks_f[i], peaks_t[i]]) - (2*np.pi*H*peaks_f[i]/N)
    if peaks_t[i] > 0: k_offset -= np.angle(Sx[peaks_f[i], peaks_t[i]-1])
    k_offset = k_offset%(2*np.pi)
    if k_offset > np.pi: k_offset -= 2*np.pi
    Wh = np.sinc(k_offset) / (2*(1 - k_offset**2))
    koff[i] = k_offset
    k_offset *= N/(2*np.pi*H)

    IF[i] = (peaks_f[i] + k_offset) * y_scale
    Ai[i] = abs(Sx[peaks_f[i], peaks_t[i]]) / (2*Wh)
  # Graph only for illustration purposes
  fig1, ax1 = plt.subplots(figsize=(9., 6.))
  ax1.set(ylim=(0, 5000))

  im1 = ax1.imshow(abs(Sx), origin='lower', aspect='auto',
                   extent=extents, cmap='viridis')
  ax1.plot(peaks_t * x_scale, peaks_f * y_scale, 'r.')
  ax1.plot(peaks_t * x_scale, IF, 'y.')

  fig1.colorbar(im1)
  fig1.tight_layout()

  plt.show()
  ## **Step 2.** Salience Function Computation
  # Calculate number of peaks and highest peak per frame

  peaks_per_f = np.zeros(Sx.shape[1], dtype=int)
  max_peak = np.zeros(peaks_per_f.size, dtype=int)

  for i in range(peaks_t.size):
    peaks_per_f[peaks_t[i]] += 1
    if IF[i] > max_peak[peaks_t[i]]:
      max_peak[peaks_t[i]] = i
  # Useful function definitions

  # Bin: Computes (discrete) bin number of given frequency
  def Bin(fi):
    return np.floor(120*np.log2(fi/55) + 1)

  # Threshold: Whether or not a given peak is loud enough
  # compared to highest peak in its frame
  def Threshold(a, t, g):
    if np.log10(abs(Ai[max_peak[t]] / a)) < g:
      return 1
    else: return 0

  # Weight: Assigns (cos^2) weight to bin if the given peak is a
  # multiple of the bin's center frequency (harmonic)
  def Weight(a, b, h, fi):
    d = abs(Bin(fi/h) - b)/10
    if d <= 1:
      return (a**(h-1)) * np.cos(np.pi*d/2)**2
    else: return 0
  # Constant values given by [2]
  alpha = 0.8
  beta = 1
  gamma = 2
  Nh = 20

  # Useful peak indexing per frame
  peaks_by_t = np.zeros((peaks_t.size, 2), dtype=int)
  for i in range(peaks_t.size):
    peaks_by_t[i] = [peaks_t[i], i]
  peaks_by_t = peaks_by_t[np.argsort(peaks_by_t[:,0])]
  # Computation of every bin's salience per frame

  Sb = np.zeros((600, Sx.shape[1]))

  index = 0
  for l in range(Sb.shape[1]):
    for b in range(Sb.shape[0]):
      for n in range(peaks_per_f[l]):
        idx = index + n
        w = 0
        ea = Threshold(Ai[peaks_by_t[idx,1]], l, gamma)
        if ea == 0: continue
        ab = abs(Ai[peaks_by_t[idx,1]])**beta
        for h in range(Nh):
          w += Weight(alpha, b+1, h+1, IF[peaks_by_t[idx,1]])
        Sb[b, l] += w * ab
    index += peaks_per_f[l]
  # Figure only for illustration purposes
  fig1, ax1 = plt.subplots(figsize=(9., 6.))

  new_extents = (extents[0], extents[1], extents[2], 600.0)
  im1 = ax1.imshow(Sb, origin='lower', aspect='auto',
                   extent=new_extents, cmap='viridis')

  fig1.colorbar(im1)
  fig1.tight_layout()

  plt.show()
  # Identify local maxima of each time frame

  Speaks = sig.argrelmax(Sb, axis=0, order=5)
  Speaks_arr = np.array([Speaks[0], Speaks[1]]).T
  Speaks_arr = Speaks_arr[np.argsort(Speaks_arr[:,1])]
  pcp2 = max(abs(Sb[Speaks_arr[:,0], Speaks_arr[:,1]])) / 10

  Speak_ids = np.zeros(0, dtype=int)
  for i in range(Speaks_arr.shape[0]):
    if abs(Sb[Speaks_arr[i,0], Speaks_arr[i,1]]) > pcp2:
      Speak_ids = np.append(Speak_ids, i)

  Speaks_b = Speaks_arr[Speak_ids, 0]
  Speaks_t = Speaks_arr[Speak_ids, 1]
  # Graph only for illustration purposes
  fig1, ax1 = plt.subplots(figsize=(9., 6.))

  im1 = ax1.imshow(Sb, origin='lower', aspect='auto',
                   extent=new_extents, cmap='viridis')
  ax1.plot(Speaks_t * x_scale, Speaks_b, 'y.')

  fig1.colorbar(im1)
  fig1.tight_layout()

  plt.show()
  ## **Step 3.** Peak Streaming
  # Calculate highest salience peaks per frame
  # and filter low salience peaks

  peak_Sb = Sb[Speaks_b, Speaks_t]
  max_Speak = np.zeros(Sb.shape[1], dtype=int)
  S_minus = np.zeros(0, dtype=int)

  for i in range(peak_Sb.size):
    if peak_Sb[i] > peak_Sb[max_Speak[Speaks_t[i]]]:
      max_Speak[Speaks_t[i]] = i

  for i in range(peak_Sb.size):
    max_id = max_Speak[Speaks_t[i]]
    if peak_Sb[i] < (peak_Sb[max_id] * 0.9):
      S_minus = np.append(S_minus, i)
  # Filter remaining peaks based on general frame salience

  boolarr = np.ones(peak_Sb.size, dtype=bool)
  for i in S_minus: boolarr[i] = False

  N = peak_Sb.size - S_minus.size
  s_mean = np.average(peak_Sb[boolarr])
  s_dev = np.std(peak_Sb[boolarr])
  min_S = s_mean - 0.9*s_dev

  for a in range(peak_Sb.size):
    if boolarr[a] == 1 and peak_Sb[a] < min_S:
      S_minus = np.append(S_minus, a)
      boolarr[a] = 0
  # Graph only for illustration purposes
  fig1, ax1 = plt.subplots(figsize=(9., 6.))

  im1 = ax1.imshow(Sb, origin='lower', aspect='auto',
                   extent=new_extents, cmap='viridis')
  ax1.plot(Speaks_t[boolarr] * x_scale, Speaks_b[boolarr], 'y.')

  fig1.colorbar(im1)
  fig1.tight_layout()

  plt.show()
  # Actual contour creation
  # Travel through the remaining ordered peaks and group them based on
  # time and pitch continuity
  # Might use some filtered peaks to maintain the continuities

  S_plus = np.array(np.where(boolarr == 1)).flatten()
  contours = []
  first_f = min(Speaks_t)
  last_f = max(Speaks_t)

  while S_plus.size:
    wpeak_Sb = np.copy(peak_Sb[S_plus])
    it = np.argmax(wpeak_Sb)
    it2 = 0
    for i in range(S_minus.size):
      if Speaks_t[S_minus[i]] == Speaks_t[it]:
        it2 = i
        break

    c_contour = np.zeros(1, dtype=int)
    c_delete = np.zeros(1, dtype=int)
    c_delete2 = np.zeros(1, dtype=int)
    c_contour[0] = S_plus[it]
    c_delete[0] = it

    # Forward
    last_p = S_plus[it]
    last = it
    os = 1
    last2 = it2
    os2 = 1
    gap = 1

    while gap < 35 and (Speaks_t[last_p] + gap) <= last_f:
      if (it+os) < S_plus.size and Speaks_t[S_plus[it+os]] == Speaks_t[last_p]:
        os += 1
      elif (it+os) < S_plus.size and Speaks_t[S_plus[it+os]] == Speaks_t[last_p] + gap:
        if abs(Speaks_b[S_plus[it+os]] - Speaks_b[last_p]) < 9:
          last = it + os
          last_p = S_plus[last]
          c_contour = np.append(c_contour, last_p)
          c_delete = np.append(c_delete, last)
          gap = 1
        os += 1
      else:
        if (it2+os2) < S_minus.size and Speaks_t[S_minus[it2+os2]] == Speaks_t[last_p]:
          os2 += 1
        elif (it2+os2) < S_minus.size and Speaks_t[S_minus[it2+os2]] == Speaks_t[last_p] + gap:
          if abs(Speaks_b[S_minus[it2+os2]] - Speaks_b[last_p]) < 9:
            last2 = it2 + os2
            last_p = S_minus[last2]
            c_contour = np.append(c_contour, last_p)
            c_delete2 = np.append(c_delete2, last2)
            gap = 1
          os2 += 1
        else: gap += 1

    # Backward
    c_contour = np.flip(c_contour)
    last_p = S_plus[it]
    last = it
    os = 1
    last2 = it2
    os2 = 1
    gap = 1

    while gap < 35 and (Speaks_t[last_p] - gap) >= first_f:
      if (it-os) >= 0 and Speaks_t[S_plus[it-os]] == Speaks_t[last_p]:
        os += 1
      elif (it-os) >= 0 and Speaks_t[S_plus[it-os]] == Speaks_t[last_p] - gap:
        if abs(Speaks_b[S_plus[it-os]] - Speaks_b[last_p]) < 9:
          last = it - os
          last_p = S_plus[last]
          c_contour = np.append(c_contour, last_p)
          c_delete = np.append(c_delete, last)
          gap = 1
        os += 1
      else:
        if (it2-os2) >= 0 and Speaks_t[S_minus[it2-os2]] == Speaks_t[last_p]:
          os2 += 1
        elif (it2-os2) >= 0 and Speaks_t[S_minus[it2-os2]] == Speaks_t[last_p] - gap:
          if abs(Speaks_b[S_minus[it2-os2]] - Speaks_b[last_p]) < 9:
            last2 = it2 - os2
            last_p = S_minus[last2]
            c_contour = np.append(c_contour, last_p)
            c_delete2 = np.append(c_delete2, last2)
            gap = 1
          os2 += 1
        else: gap += 1

    c_contour = np.flip(c_contour)
    contours += [c_contour]

    S_plus = np.delete(S_plus, c_delete)
    S_minus = np.delete(S_minus, c_delete2)
  # Graph only for illustration purposes
  fig1, ax1 = plt.subplots(figsize=(9., 6.))

  #im1 = ax1.imshow(Sb, origin='lower', aspect='auto',
                   #extent=new_extents, cmap='viridis')
  for arr in contours:
    ax1.plot(Speaks_t[arr] * x_scale, Speaks_b[arr], '.')

  fig1.colorbar(im1)
  fig1.tight_layout()

  plt.show()
  ## **Step 4.** Melody Selection
  # Calculate certain attributes for each contour

  p_mean = np.array([np.mean(Speaks_b[arr]) for arr in contours])
  p_std = np.array([np.std(Speaks_b[arr]) for arr in contours])
  s_mean = np.array([np.mean(peak_Sb[arr]) for arr in contours])
  s_total = np.array([np.sum(peak_Sb[arr]) for arr in contours])
  s_std = np.array([np.std(peak_Sb[arr]) for arr in contours])
  # 4.1. Voicing Detection
  # Give contours with considerable pitch deviation "immunity"
  # for the rest of this step

  vib = np.zeros(0, dtype=int)
  for c in range(s_std.size):
    if p_std[c] >= 4: vib = np.append(vib, c)

  # Filter contours with low salience

  s_meanmean = np.mean(s_mean)
  s_meanstd = np.std(s_mean)
  v = 0.4
  del2 = np.zeros(0, dtype=int)

  for c in range(s_mean.size):
    if c not in vib and s_mean[c] < (s_meanmean - v*s_meanstd):
      del2 = np.append(del2, c)
  del2 = np.flip(del2)
  for d in range(del2.size):
    del contours[del2[d]]
  # Graph only for illustration purposes
  fig1, ax1 = plt.subplots(figsize=(9., 6.))

  for arr in contours:
    ax1.plot(Speaks_t[arr] * x_scale, Speaks_b[arr], '.')

  fig1.colorbar(im1)
  fig1.tight_layout()

  plt.show()
  # 4.2. Octave Errors and Pitch Outliers
  # Function definitions

  # Derive melody pitch mean from given contours
  def CreatePt(cont, pm, sm):
    Pt = np.zeros(Sb.shape[1])
    Pt_d = np.zeros(Sb.shape[1])

    for i in range(len(cont)):
      for j in range(cont[i].size):
        k = Speaks_t[cont[i][j]]
        Pt[k] += Speaks_b[cont[i][j]]*peak_Sb[cont[i][j]]
        Pt_d[k] += peak_Sb[cont[i][j]]
    for i in range(Pt.size):
      if Pt_d[i] != 0: Pt[i] /= Pt_d[i]

    Pt_ = np.zeros(Pt.size)
    for i in range(Pt_.size):
      n = 0
      for m in range(1, 860):
        if (i - m) < 0: break
        if Pt[i-m] == 0: continue
        Pt_[i] += Pt[i-m]
        n += 1
      for m in range(1, 860):
        if (i + m) >= Pt.size: break
        if Pt[i+m] == 0: continue
        Pt_[i] += Pt[i+m]
        n += 1
      if n != 0: Pt_[i] /= n

    return Pt_

  # Find contour overlap sections
  def Overlaps(cont):
    overlaps = []
    timer = []
    for _ in range(Sb.shape[1]): timer += [np.zeros(0, dtype=int)]

    for i in range(len(cont)):
      for j in range(Speaks_t[cont[i][0]], Speaks_t[cont[i][-1]]+1):
        for a in range(timer[j].size):
          i2 = timer[j][a]
          done = 0
          for k in range(len(overlaps)):
            if overlaps[k][0] == i and overlaps[k][1] == i2:
              done = 1
              break
          if done == 1: continue

          last = min([Speaks_t[cont[i][-1]], Speaks_t[cont[i2][-1]]])
          overlaps += [np.array([i, i2, j, last])]

          new_x = len(overlaps) - 1
          for k in range(len(overlaps)-1):
            if (overlaps[k][0] == i or overlaps[k][1] == i or
                overlaps[k][0] == i2 or overlaps[k][1] == i2):
              if ((overlaps[k][3] - overlaps[k][2]) <
                  (overlaps[new_x][3] - overlaps[new_x][2])) and k > new_x:
                overlaps[k], overlaps[new_x] = overlaps[new_x], overlaps[k]
                new_x = k
        timer[j] = np.append(timer[j], i)

    return overlaps

  # Detect pairs of octave duplicates and pick closest to Pt
  def PickOctave(over, Pt):
    cont0 = contours[over[0]]
    cont1 = contours[over[1]]
    dif = np.zeros(0)
    pt0 = np.zeros(0)
    pt1 = np.zeros(0)
    i0 = 0
    i1 = 0

    while Speaks_t[cont0[i0]] != over[2]:
      if (i0+1) >= (cont0.size): break
      else: i0 += 1
    while Speaks_t[cont1[i1]] != over[2]:
      if (i1+1) >= (cont1.size): break
      else: i1 += 1
    for t in range(over[2], over[3]+1):
      while Speaks_t[cont0[i0]] < t:
        if (i0+1) >= (cont0.size): break
        else: i0 += 1
      if Speaks_t[cont0[i0]] > t: continue
      while Speaks_t[cont1[i1]] < t:
        if (i1+1) >= (cont1.size): break
        else: i1 += 1
      if Speaks_t[cont1[i1]] > t: continue

      b0 = Speaks_b[cont0[i0]]
      b1 = Speaks_b[cont1[i1]]
      dif = np.append(dif, abs(b0 - b1))
      pt0 = np.append(pt0, abs(Pt[t] - b0))
      pt1 = np.append(pt1, abs(Pt[t] - b1))

    if np.mean(dif) >= 115 and np.mean(dif) <= 125:
      if np.mean(pt0) == np.mean(pt1): return 0
      elif np.mean(pt0) < np.mean(pt1): return 1
      else: return 2
    else: return 0
  # Filter out octave duplicates and pitch outliers
  # Repeat process three times with updated melody pitch means

  Pt = CreatePt(contours, p_mean, s_mean)
  overlaps = Overlaps(contours)

  for _ in range(3):
    rest = range(len(contours))

    rest2 = []
    for over in overlaps:
      if over[0] in rest2 or over[1] in rest2: continue
      result = PickOctave(over, Pt)
      if result == 1: octave = over[1]
      elif result == 2: octave = over[0]
      if result != 0:
        rest2 += [octave]
    rest = [r for r in rest if r not in rest2]
    crest = []
    for c in range(len(rest)): crest += [contours[rest[c]]]
    Pt = CreatePt(crest, p_mean[rest], s_mean[rest])

    rest2 = []
    for i in range(len(rest)):
      cont = crest[i]
      ptdif = np.zeros(0)
      for j in range(cont.size):
        t = Speaks_t[cont[j]]
        ptdif = np.append(ptdif, abs(Speaks_b[cont[j]] - Pt[t]))
      if np.mean(ptdif) > 120:
        rest2 += [rest[i]]
    rest = [r for r in rest if r not in rest2]
    crest = []
    for c in range(len(rest)): crest += [contours[rest[c]]]
    Pt = CreatePt(crest, p_mean[rest], s_mean[rest])
  # Graph only for illustration purposes
  fig1, ax1 = plt.subplots(figsize=(9., 6.))

  for arr in crest:
    ax1.plot(Speaks_t[arr], Speaks_b[arr], 'k.')
  ax1.plot(np.arange(Pt.size), Pt, 'r--')

  fig1.colorbar(im1)
  fig1.tight_layout()

  plt.show()
  # 4.3. Final Melody Selection

  contours = crest
  s_total2 = s_total[rest]
  not_final = []

  overlaps = Overlaps(contours)
  for i in range(len(overlaps)):
    over = overlaps[i]
    if s_total2[over[0]] < s_total2[over[1]]: not_final += [over[0]]
    else: not_final += [over[1]]

  final = range(len(contours))
  final = [f for f in final if f not in not_final]
  final_contours = []
  for f in range(len(final)): final_contours += [contours[final[f]]]
  # Graph only for illustration purposes
  fig1, ax1 = plt.subplots(figsize=(9., 6.))

  im1 = ax1.imshow(Sb, origin='lower', aspect='auto',
                   extent=new_extents, cmap='viridis')
  for arr in final_contours:
    ax1.plot(Speaks_t[arr] * x_scale, Speaks_b[arr], 'y.')

  fig1.colorbar(im1)
  fig1.tight_layout()

  plt.show()
  for i in range(len(final_contours)):
    for j in range(i+1, len(final_contours)):
      if final_contours[i][0] > final_contours[j][0]:
        final_contours[i], final_contours[j] = final_contours[j], final_contours[i]

  final_t = np.zeros(0, dtype=int)
  final_b = np.zeros(0, dtype=int)
  for f in final_contours:
    final_t = np.append(final_t, Speaks_t[f])
    final_b = np.append(final_b, Speaks_b[f])

  if (filename is not None):
    with open(filename, "w") as txt:
      txt.write("Time,Pitch\n")
      for i in range(final_t.size):
        txt.write(str(final_t[i]) + "," + str(final_b[i]) + "\n")
  return [final_t, final_b]

## References

[1] https://replaygain.hydrogenaud.io/equal_loudness.html \\
[2] https://www.justinsalamon.com/uploads/4/3/9/4/4394963/salamongomezmelodytaslp2012.pdf \\
[3] https://www.ee.columbia.edu/~dpwe/papers/Wang03-shazam.pdf \\
[4] https://dafx.de/paper-archive/2006/papers/p_247.pdf \\
[5] https://www.db-thueringen.de/servlets/MCRFileNodeServlet/dbt_derivate_00038847/ilm1-2017000136.pdf