In [1]:
# https://stackoverflow.com/questions/59056786/python-librosa-with-microphone-input

import sys
sys.path.insert(0, '../basic-pitch-modified')
sys.path.insert(0, '../Prototype')

import numpy as np
import pyaudio
import time
import librosa
import threading
import tensorflow as tf

import os
import pandas as pd
import audb
import audiofile
import opensmile

from utils.midi import *

from basic_pitch_modified.inference import predict_pyaudio
from basic_pitch_modified import ICASSP_2022_MODEL_PATH

SAMPLE_RATE = 44100
INPUT_CHANNELS = 1

In [2]:
'''
This class is a template class for a thread that reads in audio from PyAudio.
'''

class AudioThread(threading.Thread):
    def __init__(self, name, starting_chunk_size, process_func, args_before, args_after):
        """
        Initializes an AudioThread.
        Parameters:
            name: the name of the thread
            starting_chunk_size: an integer representing the chunk size in samples
            process_func: the function to be called as a callback when new audio is received from PyAudio
            args_before: a tuple of arguments for process_func to be put before the sound array
            args_after: a tuple of arguments for process_func to be put after the sound array
        Returns: nothing
        """
        super(AudioThread, self).__init__()
        self.name = name    # General imports
        self.process_func = process_func
        self.args_before = args_before
        self.args_after = args_after
        
        self.p = None    # PyAudio vals
        self.stream = None
        self.FORMAT = pyaudio.paFloat32
        self.CHANNELS = INPUT_CHANNELS
        self.RATE = SAMPLE_RATE
        self.CHUNK = starting_chunk_size * 2

        self.max_time = 0    # Data storage and analytics
        self.data = None
        
    def set_args_before(a):
        """
        Changes the arguments before the sound array when process_func is called.
        Parameters: a: the arguments
        Returns: nothing
        """
        self.args_before = a
    
    def set_args_after(a):
        """
        Changes the arguments after the sound array when process_func is called.
        Parameters: a: the arguments
        Returns: nothing
        """
        self.args_after = a
    
    def run(self):
        """
        When the thread is started, this function is called which opens the PyAudio object
        and keeps the thread alive.
        Parameters: nothing
        Returns: nothing
        """
        self.p = pyaudio.PyAudio()
        self.stream = self.p.open(format=self.FORMAT,
                                  channels=self.CHANNELS,
                                  rate=self.RATE,
                                  input=True,
                                  output=False,
                                  stream_callback=self.callback,
                                  frames_per_buffer=self.CHUNK)
        while (self.is_alive()):
                time.sleep(1.0)
            
    def stop(self):
        """
        When the thread is stopped, this function is called which closes the PyAudio object
        Parameters: nothing
        Returns: nothing
        """
        self.stream.stop_stream()
        self.stream.close()
        self.p.terminate()

    def callback(self, in_data, frame_count, time_info, flag):
        """
        This function is called whenever PyAudio recieves new audio. It calls process_func to process the sound data
        and stores the result in the field "data".
        This function should never be called directly.
        Parameters: none user-exposed
        Returns: nothing of importance to the user
        """
        numpy_array = np.frombuffer(in_data, dtype=np.float32)
        start_time = time.process_time()
        self.data = self.process_func(*self.args_before, numpy_array, *self.args_after)
        end_time = time.process_time()
        elapsed_time = end_time - start_time
        if (elapsed_time > self.max_time):
            self.max_time = elapsed_time
        return None, pyaudio.paContinue

In [3]:
'''
This class is a thread class that computes OpenSmile features in real time.
'''

class SmileThread(AudioThread):
    db = audb.load('emodb',
        version='1.1.1',
        format='wav',
        mixdown=True,
        sampling_rate=16000,
        media='wav/03a01.*',  # load subset
        full_path=False,
        verbose=False,
    )
    
    """
    This function is called whenever the internal AudioThread gets new audio.
    It sends the signal to OpenSMILE to be processed by that library.
    This function should never be called directly.
    Parameters: the signal to be processed
    Returns: the feature set from OpenSMILE
    """
    def process(self, signal):
        return self.smile.process_signal(signal, self.RATE)
    
    """
    This function is called when a SmileThread is created. It sets OpenSMILE parameters as well as the
    starting chunk size of the internal AudioThread.
    Parameters:
        name: the name of the thread
        starting_chunk_size: the input chunk_size for the internal AudioThread
        F_SET: the OpenSMILE feature set to use
        F_LEVEL: the OpenSMILE feature level to use
    Returns: nothing
    """        
    def __init__(self, name, starting_chunk_size, 
                 F_SET = opensmile.FeatureSet.emobase, 
                 F_LEVEL = opensmile.FeatureLevel.Functionals):
        self.smile = smile = opensmile.Smile(
                                    feature_set = F_SET,
                                    feature_level = F_LEVEL,
                                )
        self.RATE = SAMPLE_RATE

        super().__init__(name, starting_chunk_size, self.process, (), ())

In [4]:
'''
This class is a thread class that computes Basic Pitch notes in real time.
'''

class BasicPitchThread(AudioThread):
    basic_pitch_model = tf.saved_model.load(str(ICASSP_2022_MODEL_PATH))
    
    """
    This function is called whenever the internal AudioThread gets new audio.
    It sends the signal to Basic Pitch to be processed by that library.
    This function should never be called directly.
    Parameters: the signal to be processed
    Returns: the MIDI output from Basic Pitch
    """
    def process(self, signal):
        model_output, midi_data, note_events = predict_pyaudio(
            signal,
            BasicPitchThread.basic_pitch_model
        )
        return midi_data
    
    """
    This function is called when a BasicPitchThread is created. It sets the starting chunk size of the internal AudioThread.
    Parameters:
        name: the name of the thread
        starting_chunk_size: the input chunk_size for the internal AudioThread
    Returns: nothing
    """ 
    def __init__(self, name, starting_chunk_size):
        super().__init__(name, starting_chunk_size, self.process, (), ())

In [5]:
'''
This class is a thread class that predicts the genre of input notes in real time.
'''

class GenrePredictorThread(threading.Thread):
    genre_model = tf.keras.models.load_model('../Prototype/utils/model.h5')
    
    """
    This function is called when a GenrePredictorThread is created. It sets the starting chunk size 
    of the internal AudioThread as well as the BasicPitchThread to grab MIDI data from.
    Parameters:
        name: the name of the thread
        starting_chunk_size: the input chunk_size for the internal AudioThread
        BP_Thread: a reference to the BasicPitchThread to use
    Returns: nothing
    """ 
    def __init__(self, name, BP_Thread):
        super(GenrePredictorThread, self).__init__()
        self.BP_Thread = BP_Thread
        self.data = None
    
    """
    When the thread is started, this function is called which repeatedly grabs the most recent
    MIDI data from the BasicPitchThread, predicts its genre, and stores it in the data field.
    Parameters: nothing
    Returns: nothing
    """
    def run(self):
        while type(BP_Thread.data) == NoneType:
            time.sleep(0.2)
        while(self.is_active()):
            midi_data = BP_Thread.data
            if len(thread.data.instruments) != 0: 
                midi_features = get_features(midi_obj)
                subgenre_num = model.predict(midi_features)
                data = get_subgenre(np.argmax(subgenre_num))

In [9]:
BP_Thread = BasicPitchThread(name = "bp", starting_chunk_size = 8192)
BP_Thread.start()
time.sleep(1)
while True:
    if len(BP_Thread.data.instruments) != 0:
        print(BP_Thread.data.instruments[0].notes)
    time.sleep(0.3)

[Note(start=0.011610, end=0.185760, pitch=31, velocity=45), Note(start=0.092880, end=0.650159, pitch=38, velocity=56)]
[Note(start=0.011610, end=0.162540, pitch=38, velocity=85), Note(start=0.174150, end=0.476009, pitch=38, velocity=78), Note(start=0.476009, end=0.684989, pitch=38, velocity=66)]
[Note(start=0.034830, end=0.661769, pitch=38, velocity=79)]
[Note(start=0.034830, end=0.661769, pitch=38, velocity=79)]
[Note(start=0.046440, end=0.673379, pitch=38, velocity=65)]
[Note(start=0.023220, end=0.510839, pitch=38, velocity=76), Note(start=0.510839, end=0.719819, pitch=38, velocity=67)]
[Note(start=0.011610, end=0.708209, pitch=38, velocity=73)]
[Note(start=0.081270, end=0.243810, pitch=38, velocity=82), Note(start=0.243810, end=0.719819, pitch=38, velocity=73)]
[Note(start=0.034830, end=0.719819, pitch=38, velocity=74)]
[Note(start=0.023220, end=0.243810, pitch=38, velocity=66), Note(start=0.243810, end=0.394739, pitch=38, velocity=73), Note(start=0.394739, end=0.719819, pitch=38, v

KeyboardInterrupt: 