In [1]:
import sys
sys.path.append("../")

In [2]:
import random 
import librosa
import numpy as np
import matplotlib.pyplot as plt
import libs.AudioDataSet

In [3]:
def jaccard_similarity(signature1, signature2):
    """
    Compute the Jaccard similarity between two MinHash signatures.

    Parameters:
    signature1 (numpy array): MinHash signature of shape (num_hashes,)
    signature2 (numpy array): MinHash signature of shape (num_hashes,)

    Returns:
    similarity (float): Jaccard similarity between the two signatures
    """
    intersection = np.sum(np.bitwise_and(signature1, signature2))
    union = np.sum(np.bitwise_or(signature1, signature2))
    return intersection / union

def minHASH(arrays, num_perm):
  permutations = []
  count = 0
  while count < num_perm:
    perm = random.sample(range(0, len(arrays[0])), len(arrays[0]))
      # print(skm.mutual_info_score(arrays[0], perm))
    permutations.append(perm)
    count += 1
    
  signatures = []
  for i in range(len(arrays)):
    signature = []
    for permutation in permutations:
      for j in permutation:
        if(arrays[i][j] == 1):
          signature.append(j)
          break
    signatures.append(signature)
  return signatures
      

In [4]:
class MinHash:
  def __init__(self, num_perm):
    self.permutations = self._permutations_generate(num_perm)
  
  def _permutations_generate(self, num_perm)->list:
    permutations = []
    count = 0
    while count < num_perm:
      perm = random.sample(range(0, 255), 100)
      permutations.append(perm)
      count += 1
    return permutations
      
  def hash(self,array)->list:
    signature = []
    for permutation in self.permutations:
      for j in permutation:
        if(array[j] == 1):
          signature.append(j)
          break
    return signature
  
  def jaccard_similarity(self,signature1, signature2):
    intersection = np.sum(np.bitwise_and(signature1, signature2))
    union = np.sum(np.bitwise_or(signature1, signature2))
    return intersection / union 

In [26]:
class LSH:
    
    def __init__(self, threshold:float,num_hash_functions:int = 100,bands:int = 25, rows:int = 4):
      assert threshold > 0 and threshold < 1
      assert bands*rows == num_hash_functions
      self._threshold = threshold
      self._num_hash_functions = num_hash_functions
      self._bands = bands
      self._rows = rows
      self._min_hash = MinHash(num_hash_functions)
      self._buckets = {}
    
    def clear(self):
      self._buckets.clear()

    def search(self, fingerprints,name,time, add_to_bucket = True):
        min_hashes = self._min_hash.hash(fingerprints)
        band_hashes = []
        compared_sketches = set()
        
        if(len(min_hashes) != self._num_hash_functions):
          return None
        
        for i in range(self._bands):
          band_hashes.append(self._compute_band_hash(min_hashes, i))
          if band_hashes[i] in self._buckets:
            for sketch_to_check in self._buckets[band_hashes[i]]:
              check_key = ''.join(str(x) for x in sketch_to_check)
              if check_key not in compared_sketches:
                if self._min_hash.jaccard_similarity(sketch_to_check[0], min_hashes) >= self._threshold:
                  return (sketch_to_check[1],sketch_to_check[2])
                compared_sketches.add(check_key)
        if add_to_bucket:
          for i in range(self._bands):
            if band_hashes[i] not in self._buckets:
              self._buckets[band_hashes[i]] = []
            self._buckets[band_hashes[i]].append((min_hashes, name, time))
          
    def _compute_band_hash(self, min_hashes: list, i: int) -> str:
        """Compute a hash for quick bucket match search."""
        band_hash_list = []
        for j in range(self._rows):
            # Adding the rows corresponding to ith band
            band_hash_list.append('%02d' % min_hashes[i * self._rows + j])

        # Adding the number i to distinguish between bands
        band_hash_list.append('%02d' % i)
        return ''.join(band_hash_list)
      

In [6]:
def get_32bin_mel(samples):
  return librosa.feature.melspectrogram(y=samples, sr=5512,n_fft=2048, hop_length=64, n_mels=32)

In [7]:
def get_energy_spec(spec):
  # Transpose 'spec' upfront and get the shifted versions for calculations
  specT = spec.T
  specT_shifted_vertically = np.roll(specT, shift=-1, axis=0)
  specT_shifted_horizontally = np.roll(specT, shift=-1, axis=1)

  # Calculate the differences for current and previous rows
  diff_current = specT - specT_shifted_horizontally
  diff_previous = specT_shifted_vertically - np.roll(specT_shifted_horizontally, shift=-1, axis=0)

  # Compute the energy_spec matrix using NumPy's greater function for element-wise comparison
  energy_spec = np.greater(diff_current - diff_previous, 0).astype(int)
  return energy_spec.T

In [8]:
dataSet = libs.AudioDataSet.AudioDataset("../data/compressed_index/")




In [9]:
audio_1 = dataSet[0]


In [10]:
audio_2 = dataSet[198]

In [11]:
audio_3 = dataSet[1]

In [12]:
from moviepy.audio.AudioClip import AudioArrayClip
audio2 = AudioArrayClip(audio_2.T, fps=5512*2)
audio2.write_audiofile('test_2.wav')

MoviePy - Writing audio in test_2.wav


                                                                      

MoviePy - Done.




In [13]:
energy_spec1 = get_energy_spec(get_32bin_mel(audio_1[0]))
energy_spec2 = get_energy_spec(get_32bin_mel(audio_2[0]))

In [14]:
fingerprint1 = []
for i in range(0,energy_spec1.shape[1],128):
  fingerprint1.append(energy_spec1[:,i:i+128].flatten())
fingerprint2 = []
for i in range(0,energy_spec2.shape[1],128):
  fingerprint2.append(energy_spec2[:,i:i+128].flatten())


In [49]:
lsh = LSH(0.65)

In [50]:
for i,f in enumerate(fingerprint1):
  seconds = i*1.48
  minutes, seconds = divmod(seconds, 60)
  time_str = '{:02d}:{:02d}'.format(int(minutes), int(seconds))
  lsh.search(f,"First video",time_str)

In [51]:
for i,f in enumerate(fingerprint2):
  seconds = i*1.48
  minutes, seconds = divmod(seconds, 60)
  time_str = '{:02d}:{:02d}'.format(int(minutes), int(seconds))
  s = lsh.search(f,"Second video",time_str,add_to_bucket=False)
  if(s is not None):
    print("This time: ", time_str)
    print(s)

This time:  00:00
('First video', '21:03')
This time:  00:04
('First video', '04:48')
This time:  00:05
('First video', '02:08')
This time:  00:08
('First video', '00:34')
This time:  00:11
('First video', '20:46')
This time:  00:13
('First video', '03:53')
This time:  00:14
('First video', '26:39')
This time:  00:16
('First video', '02:10')
This time:  00:17
('First video', '15:35')
This time:  00:20
('First video', '13:53')
This time:  00:22
('First video', '08:01')
This time:  00:23
('First video', '00:19')
This time:  00:25
('First video', '24:37')
This time:  00:28
('First video', '22:22')
This time:  00:31
('First video', '02:11')
This time:  00:34
('First video', '14:33')
This time:  00:37
('First video', '06:24')
This time:  00:39
('First video', '00:17')
This time:  00:41
('First video', '23:05')
This time:  00:44
('First video', '00:44')
This time:  00:45
('First video', '07:12')
This time:  00:47
('First video', '00:17')
This time:  00:48
('First video', '03:09')
This time: 