In [None]:
import librosa
import numpy as np
import matplotlib.pyplot as plt
import soundfile as sf
import IPython.display as ipd
import pandas as pd
from collections import deque

In [None]:
# Mount Google Drive so the dataset under /content/drive/MyDrive is readable.
# This cell only works inside Google Colab (google.colab is not available elsewhere).
from google.colab import drive
# Unmount first so the mount below starts from a clean state; on a fresh runtime
# this is a no-op (the recorded output reports "Drive not mounted").
drive.flush_and_unmount()
# NOTE(review): force_remount=True presumably makes the explicit unmount above
# redundant - confirm against the Colab docs before removing either call.
drive.mount('/content/drive', force_remount=True)

Drive not mounted, so nothing to flush and unmount.
Mounted at /content/drive


In [None]:
# Audio framing configuration shared by the cells below.
SAMPLING_RATE = 16_000        # samples per second (presumably Hz - confirm against the preprocessing code)
DURATION = 10                 # clip length; to_time multiplies it by SAMPLING_RATE, so presumably seconds
N_FFT = 1024                  # FFT size
WIN_LENGTH = N_FFT            # analysis window spans the full FFT size (1024)
HOP_LENGTH = WIN_LENGTH // 2  # 512: consecutive windows overlap by half

In [None]:
# Load the precomputed spectrogram array from Drive (requires the Drive mount at /content/drive).
# The recorded run reports shape (717, 513, 313); presumably (clip, frequency bin, frame) - TODO confirm.
X = np.load('/content/drive/MyDrive/val_set/processed_data/combined_spectrogram.npy')

In [None]:
# Sanity check: show the dimensions of the loaded array.
print(X.shape)


(717, 513, 313)


In [None]:
def to_time(iteration, frame_size=WIN_LENGTH, rate=SAMPLING_RATE, hop_size=HOP_LENGTH, duration=DURATION):
  """Convert a frame index into a time offset within a clip.

  The clip is assumed to contain ``ceil(duration * rate / hop_size)`` frames,
  and ``iteration`` is mapped linearly onto ``[0, duration]``.

  Args:
    iteration: Frame index (0 maps to 0.0, the total frame count maps to ``duration``).
    frame_size: Unused; kept so existing callers that pass it keep working.
    rate: Sampling rate in samples per unit of ``duration``.
    hop_size: Number of samples between the starts of consecutive frames.
    duration: Clip length; the returned value is expressed in the same unit.

  Returns:
    The time offset as a float.
  """
  # Use the arguments rather than the module-level constants so that a caller
  # overriding `rate` or `duration` actually changes the result.
  total_iterations = int(np.ceil((duration * rate) / hop_size))
  return (float(iteration) / float(total_iterations)) * duration

In [None]:
def dilate(X, span=2):
  """Smooth a label sequence with a sliding majority vote.

  For every position ``i`` the labels at indices ``[i - span, i + span)``
  (clipped to the sequence bounds) are tallied, and the most frequent label
  replaces the one at ``i``.

  Args:
    X: Sequence of labels; each must be 'silence', 'speech' or 'music'
      (any other value raises KeyError).
    span: Half-width of the voting window.

  Returns:
    A new list of labels with the same length as ``X``.
  """
  # NOTE(review): the window is half-open, so it reaches `span` frames back but
  # only `span - 1` frames forward - confirm whether that asymmetry is intended.
  total = len(X)
  smoothed = []
  for center in range(total):
    first = max(0, center - span)
    stop = min(total, center + span)
    votes = {'silence': 0, 'speech': 0, 'music': 0}
    for pos in range(first, stop):
      votes[X[pos]] += 1
    # Equal counts are resolved in favour of the lexicographically largest
    # label ('speech' > 'silence' > 'music').
    winner = max(votes, key=lambda name: (votes[name], name))
    smoothed.append(winner)
  return smoothed

In [None]:
def Decode(predicted, frame_size=WIN_LENGTH, sr=SAMPLING_RATE, min_event_duration=0.5):
  """Turn per-frame class labels into a list of timed events.

  Steps:
    1. Smooth the frame labels with ``dilate``.
    2. Emit one ``(start, end, label)`` tuple per run of identical
       non-silence labels, with times computed by ``to_time``.
    3. Merge consecutive events of the same class when the gap between them
       is shorter than ``min_event_duration``.
    4. Drop events that do not last longer than ``min_event_duration``.

  Args:
    predicted: Sequence of frame labels ('silence', 'speech' or 'music').
    frame_size: Unused; kept for interface compatibility.
    sr: Unused; kept for interface compatibility.
    min_event_duration: Threshold used for both the merge gap (step 3) and
      the minimum event length (step 4), in the unit returned by ``to_time``.

  Returns:
    List of ``(start, end, label)`` tuples in chronological order. Empty when
    the input contains no non-silence event.
  """
  predicted = dilate(predicted)

  # Pass 1: convert runs of identical labels into (start, end, label) events.
  events = []
  prevclass = 'silence'
  curclass = 'silence'
  start = 0
  for i in range(len(predicted)):
    curclass = predicted[i]
    if curclass != prevclass:
      # The previous run ends here; silence runs are not reported.
      if prevclass != 'silence':
        events.append((start, to_time(i), prevclass))
      prevclass = curclass
      start = to_time(i)
  # Close a non-silence run that extends to the end of the sequence.
  if curclass != 'silence':
    events.append((start, to_time(len(predicted)), curclass))

  # Nothing but silence (or empty input): there is nothing to merge or filter.
  # Without this guard the merge pass below would index an empty list.
  if not events:
    return []

  # Pass 2: merge neighbouring events of the same class separated by a short gap.
  merged = []
  pending = events[0]
  for nxt in events[1:]:
    if nxt[2] == pending[2] and nxt[0] - pending[1] < min_event_duration:
      pending = (pending[0], nxt[1], pending[2])
    else:
      merged.append(pending)
      pending = nxt
  merged.append(pending)

  # Pass 3: keep only events strictly longer than the minimum duration.
  return [event for event in merged if event[1] - event[0] > min_event_duration]

In [None]:
# Synthetic frame-label fixture: 132 music, 100 silence, 43 speech and
# 38 silence frames (313 frames in total), written as (label, run length) pairs.
predictions = [
    label
    for label, run_length in (
        ('music', 132),
        ('silence', 100),
        ('speech', 43),
        ('silence', 38),
    )
    for _ in range(run_length)
]

# Independent copy of the fixture.
A = predictions.copy()

In [None]:
# Decode the synthetic frame labels into (start, end, label) events and show them.
# Decode rebinds its local `predicted` to the list returned by dilate, so
# `predictions` itself is not modified by this call.
ans = Decode(predictions)
print(ans)

[(0.0, 4.217252396166134, 'music'), (7.412140575079872, 8.817891373801917, 'speech')]
