<a href="https://colab.research.google.com/github/alinamuliak/MiniShazam/blob/main/MiniShazam_colab.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
!pip install essentia
!pip install mir_eval

In [None]:
import essentia
import essentia.standard as es
import essentia.streaming as ess
import matplotlib.pyplot as plt
import numpy as np
from IPython.display import Audio
from mir_eval.sonify import pitch_contour
import IPython
%matplotlib inline
from pylab import plot, show, figure, imshow

In [None]:
# audio_file = "carry_me_away.wav"

# Audio(audio_file)

In [None]:
def detect_bpm(audio_filename: str) -> float:
  """
  Estimate the tempo (BPM) of an audio file.

  The file is decoded to mono at 44100 Hz with Essentia's MonoLoader and the
  samples are handed to PercivalBpmEstimator, whose result is returned.
  """
  mono_samples = es.MonoLoader(filename=audio_filename, sampleRate=44100)()
  tempo_estimator = es.PercivalBpmEstimator()
  return tempo_estimator(mono_samples)
  

def extract_pitches(audio_filename: str, sampling_rate: int = 44100) -> tuple:
  """
  Extract the predominant-melody pitch track of an audio file.

  The file is loaded with Essentia's EqloudLoader and analysed with
  PredominantPitchMelodia (frame size 2048, hop size 128).

  Args:
    audio_filename: path of the audio file to analyse.
    sampling_rate: sample rate in Hz used for loading, for the pitch
      extractor and for the time axis. Defaults to 44100, the value that
      used to be hard-coded.

  Returns:
    A tuple (pitch_times, pitch_values, pitch_confidence). pitch_times holds
    one evenly spaced timestamp per pitch value, running from 0 to the
    duration of the loaded audio.
  """
  audio = es.EqloudLoader(filename=audio_filename, sampleRate=sampling_rate)()
  pitch_extractor = es.PredominantPitchMelodia(frameSize=2048, hopSize=128, sampleRate=sampling_rate)
  pitch_values, pitch_confidence = pitch_extractor(audio)

  # One timestamp per pitch frame, spread evenly over the audio duration.
  pitch_times = np.linspace(0.0, len(audio) / sampling_rate, len(pitch_values))
  return pitch_times, pitch_values, pitch_confidence


def plot_pitch_curve(pitch_attributes) -> None:
  """
  Plot a pitch track and its confidence on two stacked axes sharing the x axis.

  pitch_attributes is unpacked as (pitch_times, pitch_values,
  pitch_confidence); both curves are drawn against pitch_times and the
  figure is shown.
  """
  times, values, confidence = pitch_attributes
  _, (pitch_axis, confidence_axis) = plt.subplots(2, sharex=True)

  panels = (
      (pitch_axis, values, 'estimated pitch [Hz]'),
      (confidence_axis, confidence, 'pitch confidence'),
  )
  for axis, series, title in panels:
    axis.plot(times, series)
    axis.set_title(title)

  plt.show()


def compose_determined_pitch(pitch_times, pitch_values):
  """
  Synthesize the detected pitch track, save it as 'test.mp3' and display it.

  The pitch contour is rendered with mir_eval's pitch_contour at 44100 Hz,
  muxed into a two-channel signal (first channel silent, second channel the
  melody), written to 'test.mp3' and shown as an inline audio player.

  Args:
    pitch_times: timestamps of the pitch values.
    pitch_values: pitch values passed to pitch_contour as frequencies.
  """
  synthesized_melody = pitch_contour(pitch_times, pitch_values, 44100).astype(np.float32)

  # Build the silent channel as a float32 array in one step instead of a
  # Python list with one int per audio sample.
  silent_channel = np.zeros_like(synthesized_melody)

  stereo_signal = es.StereoMuxer()(silent_channel, synthesized_melody)
  es.AudioWriter(filename='test.mp3', format='mp3')(stereo_signal)
  display(Audio('test.mp3'))


### Onset detection and (Pitch, Rhythm) pair creation

Once the onsets are detected, we can form $\langle\text{Pitch}, \text{Rhythm}\rangle$ pairs to be used in matching later. Each $\text{Rhythm}$ component is $\log(\text{IOI})$, the logarithm of the inter-onset interval (the time between two consecutive onsets), while the corresponding $\text{Pitch}$ component is the average of the pitch values detected within that interval.

In [None]:
from math import log2

def get_note_number(pitch: float) -> int:
  """
  Convert a frequency in Hz to the number of the nearest piano key.

  Uses n = 12 * log2(pitch / 440) + 49, so 440 Hz maps to key 49. Keys are
  numbered 1..88.

  Args:
    pitch: frequency in Hz.

  Returns:
    The nearest key number in 1..88, or 0 when there is no usable pitch:
    zero, negative or NaN input, or a frequency more than half a semitone
    outside the keyboard range.
  """
  # `not pitch > 0` also rejects NaN; log2 would raise on values <= 0.
  if not pitch > 0:
    return 0

  n = 12 * log2(pitch / 440) + 49

  # Accept everything that rounds to a key on the keyboard, so keys 1 and 88
  # get the same +/- half-semitone bin as every other key. Infinite values
  # fail this test too.
  if not 0.5 <= n < 88.5:
    return 0

  # round() uses banker's rounding, so exactly 0.5 would yield 0; pin it to 1.
  return max(1, round(n))

def pitches_per_interval(audio, pitch_values: list, onsets: list, sampling_rate=44100) -> list:
  """
  Map every onset time to the index of the pitch value at that time.

  Pitch values are treated as evenly spread over the audio duration, so an
  onset at time t maps to index int(t * len(pitch_values) / duration).

  Args:
    audio: the audio samples; only their count is used.
    pitch_values: the extracted pitch values.
    onsets: onset times in seconds.
    sampling_rate: sample rate of `audio` in Hz.

  Returns:
    A list with one index per onset, followed by len(pitch_values) as the
    closing boundary. Consecutive entries delimit the pitch values that
    belong to one inter-onset interval.

  Raises:
    ValueError: if `audio` is empty, so no duration can be computed.
  """
  total_pitches = len(pitch_values)

  # Keep the duration as a float: truncating it to whole seconds mis-scales
  # every index and divides by zero for clips shorter than one second.
  duration = len(audio) / sampling_rate
  if duration <= 0:
    raise ValueError("audio must contain at least one sample")

  boundaries = [int(onset * total_pitches / duration) for onset in onsets]
  boundaries.append(total_pitches)
  return boundaries


def average_per_interval(pitch_values: list, pitches_per_interval: list, onsets: list) -> list:
  """
  Average the pitch values between consecutive onsets and convert each
  average to a note number with get_note_number.

  Args:
    pitch_values: the extracted pitch values.
    pitches_per_interval: boundary indices into pitch_values; entries i and
      i + 1 delimit interval i.
    onsets: the detected onsets; only their count is used.

  Returns:
    A list of len(onsets) - 1 note numbers (empty for fewer than two onsets).
  """
  # NOTE(review): the loop covers len(onsets) - 1 intervals, so the span from
  # the last onset to the final boundary is never averaged - confirm whether
  # the last note is meant to be dropped.
  avg_per_interval = []
  for i in range(len(onsets) - 1):
    start, end = pitches_per_interval[i], pitches_per_interval[i + 1]
    segment = pitch_values[start:end]

    # Divide by the number of values actually sliced. An empty segment (two
    # onsets on the same pitch frame) has no pitch; 0 stands for "no note"
    # instead of raising ZeroDivisionError.
    avg_pitch = sum(segment) / len(segment) if len(segment) else 0
    avg_per_interval.append(get_note_number(avg_pitch))

  return avg_per_interval


def log_ioi(onsets: list, duration) -> list:
  """
  Calculate the rounded natural logarithm of every inter-onset interval (IOI).

  The IOI of a note is the time from its onset to the next onset. The last
  note has no following onset, so its IOI is the time from the last onset to
  the end of the audio.

  Args:
    onsets: onset times in seconds, in increasing order.
    duration: total duration of the audio in seconds.

  Returns:
    One rounded log(IOI) value per onset; an empty list when there are no
    onsets. Every interval is expected to be positive.
  """
  # len() rather than truthiness: onsets may be a NumPy array.
  if len(onsets) == 0:
    return []

  intervals = [later - earlier for earlier, later in zip(onsets[:-1], onsets[1:])]
  # Measure the last note from the LAST onset and put it through the same
  # log-and-round step as all the other intervals.
  intervals.append(duration - onsets[-1])

  return [round(np.log(interval)) for interval in intervals]


def create_pitch_rhythm_pairs(avg_pitch_values: list, log_ioi_values: list) -> list:
  """
  Pair every average pitch with the rhythm value at the same position.

  Returns a list of (pitch, rhythm) tuples that is as long as the shorter
  of the two inputs.
  """
  paired = zip(avg_pitch_values, log_ioi_values)
  return list(paired)


def find_relative_pitch(avg_pitch_values: list) -> list:
  """
  Turn a sequence of note numbers into a sequence of pitch changes.

  Each change is the next note minus the current one. Changes equal to 0
  and changes of 22 or more in absolute value are left out of the result.
  """
  steps = (
      following - current
      for current, following in zip(avg_pitch_values, avg_pitch_values[1:])
  )
  return [step for step in steps if not (step == 0 or abs(step) >= 22)]

In [None]:
def detect_complex_onsets(audio) -> list:
  """
  Detect the onsets of an audio signal with Essentia's 'complex' onset
  detection function and return the result of es.Onsets.
  """
  # Processing chain applied to every frame.
  window = es.Windowing(type = 'hann')
  spectrum = es.FFT()  # complex-valued FFT
  to_polar = es.CartesianToPolar()  # complex values -> (magnitude, phase)
  detection = es.OnsetDetection(method='complex')
  features = essentia.Pool()

  # Evaluate the detection function on 1024-sample frames with a hop of 512.
  for frame in es.FrameGenerator(audio, frameSize = 1024, hopSize = 512):
    magnitude, phase = to_polar(spectrum(window(frame)))
    features.add('features.complex', detection(magnitude, phase))

  # es.Onsets is given a matrix with a single detection function and a
  # single weight of 1 for it.
  detection_matrix = essentia.array([ features['features.complex'] ])
  return es.Onsets()(detection_matrix, [ 1 ])

### Usage

In [None]:
# Run the full feature-extraction pipeline on a single song.
sampling_rate = 44100
# Load the song and extract its pitch track and onsets.
# NOTE(review): audio_file is only assigned in a commented-out line of an
# earlier cell, so it has to be defined before this cell can run.
audio = es.MonoLoader(filename=audio_file, sampleRate=sampling_rate)()
pitch_times, pitch_values, pitch_confidence = extract_pitches(audio_file)
onsets = detect_complex_onsets(audio)

# Onset boundaries -> note number per interval -> rhythm -> relative pitches.
num_of_pitches = pitches_per_interval(audio, pitch_values, onsets, sampling_rate)
avg = average_per_interval(pitch_values, num_of_pitches, onsets)
ioi = log_ioi(onsets, len(audio)/sampling_rate)
carry_me_away = find_relative_pitch(avg)

RuntimeError: ignored

## Matching

In [None]:
def distance(song: list, input: list) -> int:
  """
  Score how well two sequences line up.

  Computes the full cross-correlation of `song` and `input` and returns its
  largest value.
  """
  cross_correlation = np.correlate(song, input, mode='full')
  return cross_correlation.max()


def calculate_min_distance(song_pitches: list, user_pitches: list) -> int:
  """
  Slide a window of len(user_pitches) over the song in steps of 2 and return
  the highest score that distance() gives for any window against the query.

  Despite the function's name, the result is a maximum, and it is never
  below 0. 0 is also returned when the song is shorter than the query.
  """
  window = len(user_pitches)
  best_score = 0
  for end in range(window, len(song_pitches) + 1, 2):
    candidate = song_pitches[end - window:end]
    score = distance(candidate, user_pitches)
    best_score = max(score, best_score)
  return best_score

In [None]:
def match_song(database: list, user_input: list) -> str:
  """
  Find the database song that best matches the user's query.

  Database entries are tuples in this exact order:
  (<song name>, <song tempo>, <list of the song's relative pitches>).
  The user input is [<query tempo>, <query relative pitches>].

  Songs whose tempo is less than half or more than twice the query tempo
  are skipped. Among the rest, the song with the highest score from
  calculate_min_distance wins; the score must be greater than 0.
  Progress is printed for every song.

  Returns the name of the matched song, or None when nothing matched.
  """
  query_bpm, query_pitches = user_input
  best_name = None
  best_score = 0

  for name, bpm, pitches in database:
    print(name)
    if query_bpm < bpm / 2 or query_bpm > 2 * bpm:
      print("tempo miss")
      continue

    score = calculate_min_distance(pitches, query_pitches)
    print(f" dist: {score}")
    if score > best_score:
      best_score, best_name = score, name

  if best_name is None:
    # Deliberately cheeky message for the user.
    print("couldn't find a match:( try learning how to sing;)")
  return best_name

## Database creation and editing

In [None]:
def create_database(audio_filenames: list, sampling_rate = 44100) -> list:
  """
  Create a database from all songs in the input list.

  Each database entry is a tuple with the following information:
  - song name (the file name it was loaded from)
  - song tempo
  - list of relative pitches

  Args:
    audio_filenames: paths of the audio files to analyse.
    sampling_rate: sample rate in Hz used to load the audio for onset
      detection and to map onsets onto the pitch track. extract_pitches and
      detect_bpm are called with the file name only.

  Returns:
    A list of (song_name, tempo, relative_pitches) tuples, one per file.
  """
  database = []
  for song_name in audio_filenames:
    audio = es.MonoLoader(filename=song_name, sampleRate=sampling_rate)()
    # Only the pitch values are needed; times and confidence are discarded.
    _, pitch_values, _ = extract_pitches(song_name)
    onsets = detect_complex_onsets(audio)

    # The rhythm (log IOI) is not stored in the database, so it is not
    # computed here.
    boundaries = pitches_per_interval(audio, pitch_values, onsets, sampling_rate)
    note_numbers = average_per_interval(pitch_values, boundaries, onsets)
    song_pitches = find_relative_pitch(note_numbers)

    database.append((song_name, detect_bpm(song_name), song_pitches))
  return database


def add_songs(database: list, songs_to_add: list) -> None:
  """
  Analyse the given songs with create_database and append the resulting
  entries to `database` in place.
  """
  database.extend(create_database(songs_to_add))

# TODO: not sure yet how real-time input should be handled here; needs testing.
def process_user_input():
  """Placeholder for processing user input; currently just returns None."""
  return None

## Some examples

In [None]:
# Build a two-song database to match against.
database = create_database(['carry_me_away.wav', "You're Gonna Live Forever In Me.mp3"])


# Query file that the matcher should recognise.
user_file = 'carry_me_away_detected_pitch.mp3'

# Same feature extraction as for database songs.
# NOTE(review): sampling_rate must already be defined by an earlier cell.
audio = es.MonoLoader(filename=user_file, sampleRate=sampling_rate)()
pitch_times, pitch_values, pitch_confidence = extract_pitches(user_file)
onsets = detect_complex_onsets(audio)

num_of_pitches = pitches_per_interval(audio, pitch_values, onsets, sampling_rate)
avg = average_per_interval(pitch_values, num_of_pitches, onsets)
ioi = log_ioi(onsets, len(audio)/sampling_rate)

# Query in the format match_song expects: [tempo, relative pitches].
user_pitches = find_relative_pitch(avg)
user_input = [detect_bpm(user_file), user_pitches]

match_song(database, user_input)

## Testing

In [None]:
# Create the song database used for the matching test below.
song_names = ["Stefania.mp3", "Полуничне небо.mp3", "Танець на стінах.mp3"]
db = create_database(song_names)

In [None]:
# Process the user input: extract pitch and onsets from the query recording.
user_file = "пролог.ogg"
sampling_rate = 44100
audio = es.MonoLoader(filename=user_file, sampleRate=sampling_rate)()
pitch_times, pitch_values, pitch_confidence = extract_pitches(user_file)
onsets = detect_complex_onsets(audio)
# Synthesize the detected pitch track so it can be checked by ear.
compose_determined_pitch(pitch_times, pitch_values)

num_of_pitches = pitches_per_interval(audio, pitch_values, onsets, sampling_rate)
avg = average_per_interval(pitch_values, num_of_pitches, onsets)
ioi = log_ioi(onsets, len(audio)/sampling_rate)

# Query in the format match_song expects: [tempo, relative pitches].
user_pitches = find_relative_pitch(avg)
user_input = [detect_bpm(user_file), user_pitches]
# print(user_pitches)

In [None]:
# Match the query against the test database; match_song prints the score of
# every song and returns the name of the best match.
match_song(db, user_input)

Stefania.mp3
 dist: 1790
Полуничне небо.mp3
 dist: 1834
Пролог.mp3
 dist: 1716
Танець на стінах.mp3
 dist: 1857


'Танець на стінах.mp3'

### Real-time recording and playing audios.

To make our system convenient to use, we are planning to implement live recording so that the user can hum, sing, or play a melody directly into their computer.

*Small comment: this is worth looking into. If playing a sound from Python is not difficult, then before recording we could do something like
sound – pause – sound – pause – sound – pause – recording, i.e. give the user 3 seconds before the recording starts.*

In [None]:
!sudo apt-get install libportaudio2
!pip install sounddevice

Reading package lists... Done
Building dependency tree       
Reading state information... Done
libportaudio2 is already the newest version (19.6.0-1).
The following package was automatically installed and is no longer required:
  libnvidia-common-460
Use 'sudo apt autoremove' to remove it.
0 upgraded, 0 newly installed, 0 to remove and 42 not upgraded.
Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/


In [None]:
import sounddevice as sd
from scipy.io.wavfile import read, write
import numpy as np
from time import sleep

In [None]:
# Play a sample audio file.

# read() returns the sample rate and the sample data of the WAV file.
fs, array = read("00014.wav")
# print(array, fs)

# Start playback and wait until it has finished.
sd.play(array, fs)
sd.wait()

FileNotFoundError: ignored

In [None]:
# Live recording: capture 15 seconds of two-channel audio at 8000 Hz.
# NOTE(review): the saved output of this cell shows a PortAudioError -
# presumably no input device was available in that runtime; verify.

fs=8000
duration = 15 # seconds
print("Recording...")
# Request duration * fs frames, then wait for the recording to complete.
recording = sd.rec(int(duration * fs), samplerate=fs, channels=2)
sd.wait()
print("Recording finished.")

Recording...


PortAudioError: ignored

In [None]:
# Play back the recording and save it as a 16-bit WAV file.

print("Playing...")
sd.play(recording, fs)
sd.wait()

# Normalise to the full 16-bit range. A completely silent recording has a
# peak of 0, so skip the division and write silence instead of NaNs.
peak = np.max(np.abs(recording))
if peak > 0:
  scaled_recording = np.int16(recording / peak * 32767)
else:
  scaled_recording = np.zeros(recording.shape, dtype=np.int16)

# Save with the same rate that was used for recording and playback.
write("output.wav", fs, scaled_recording)

Of course, the search can only succeed if the required melody exists in the database. This is a limitation of most commercial query-by-humming (QBH) systems developed so far. For example, SoundHound relies on users' contributions, and it is impossible to find a song if no one has added it to the database manually.