In [1]:
from music21 import *
import pandas as pd


# Parse the violin part from the MIDI file
def parse_instrument(midi_stream, desired_instrument):
    """
    Get a desired instrument part from a MIDI file.
    @param:
        - midi_stream: music21.stream.Score object, the MIDI file to parse
        - desired_instrument: Type of the desired instrument part to return (e.g., instrument.Piano)
    @return:
        - The part of the desired instrument if found, otherwise None
    """
    for part in midi_stream.parts:
        for elem in part.recurse():
            if isinstance(elem, desired_instrument):
                return part
    return None


def make_instrument_df(instrument_part):
    """
    Create a DataFrame from a music21 stream of an instrument part.
    @param:
        - instrument_part: music21.stream.Part object, the instrument part to convert
    @return:
        - A pd.DataFrame with columns for pitch, MIDI number, frequency, duration, 
          and start time (aka 'offset', the # quarter notes from the beginning of piece)
          
    """
    instrument_flat = instrument_part.flatten()

    rows = []
    for element in instrument_flat.notes:
        if isinstance(element, note.Note):
            row = [
                element.pitch, 
                element.pitch.midi, 
                element.pitch.frequency,
                element.duration.quarterLength, 
                element.offset]
            # print(f'row: {row}')
        elif isinstance(element, chord.Chord):
            chord_pitches = ', '.join(str(p) for p in element.pitches)
            midi_pitches = ', '.join(str(p.midi) for p in element.pitches)
            frequencies = ', '.join(str(p.frequency) for p in element.pitches)
            row = [
                chord_pitches, 
                midi_pitches, 
                frequencies,
                element.duration.quarterLength, 
                element.offset]
            # print(f'row: {row}')
        rows.append(row)

    instrument_df = pd.DataFrame(rows, columns=['pitch', 'midi', 'frequency', 'duration', 'start'])
    return instrument_df


# Load the MIDI file using music21
music21_midi = converter.parse('../resources/midifiles/mozart_vc4_mvt1.mid')
violin_part = parse_instrument(music21_midi, instrument.Violin)
violin21_df = make_instrument_df(violin_part)

# Inspect the created df
violin21_df

Unnamed: 0,pitch,midi,frequency,duration,start
0,E6,88,1318.510228,0.25,168.0
1,D6,86,1174.659072,0.75,168.25
2,D6,86,1174.659072,0.75,169.0
3,D6,86,1174.659072,0.25,169.75
4,D6,86,1174.659072,1.0,170.0
...,...,...,...,...,...
2155,"E6, F#6","88, 90","1318.5102276514808, 1479.977690846539",0.25,1067.0
2156,"E6, F#6","88, 90","1318.5102276514808, 1479.977690846539",0.25,1067.25
2157,"D6, G5","86, 79","1174.659071669631, 783.990871963499",0.5,1067.5
2158,E6,88,1318.510228,0.25,1067.75


In [2]:
import mido
import rtmidi
from mido import Message
import fluidsynth

class MidiSynthesizer:
    def __init__(self, soundfont_path):
        self.synth = fluidsynth.Synth()
        # Start the synthesizer with the appropriate audio driver
        self.synth.start(driver="coreaudio")  # Change to 'alsa', 'dsound', etc., based on your OS
        self.sf_id = self.load_soundfont(soundfont_path)
    
    def load_soundfont(self, soundfont_path):
        sf_id = self.synth.sfload(soundfont_path)
        if sf_id == -1:
            raise ValueError("Soundfont failed to load")
        return sf_id

    def program_select(self, channel, bank, preset):
        """
        Select the instrument program for a specific channel.

        @param:
            channel (int): MIDI channel (0-15)
            bank (int): SoundFont bank number
            preset (int): Preset number (program number) within the bank
        """
        self.synth.program_select(channel, self.sf_id, bank, preset)

    def play_note(self, channel, midi_number, velocity):
        """
        Play a note on a specific channel with given parameters.

        @param:
            - channel (int): MIDI channel on which to play the note
            - midi_number (int): MIDI number of the note to play
            - velocity (int): Velocity of the note (controls volume and articulation)
        """
        self.synth.noteon(channel, midi_number, velocity)

    def stop_note(self, channel, midi_number):
        """
        Stop a note on a specific channel.

        @param:
            - channel (int): MIDI channel on which to stop the note
            - midi_number (int): MIDI number of the note to stop
        """
        self.synth.noteoff(channel, midi_number)

    def stop(self):
        """
        Stop the synthesizer and clean up resources.
        """
        self.synth.delete()

    def handle_midi_message(self, msg: Message):
        """Handle different types of MIDI messages."""
        if msg.type == 'note_on':
            self.play_note(msg.channel, msg.note, msg.velocity)
        elif msg.type == 'note_off':
            self.stop_note(msg.channel, msg.note)
        elif msg.type == 'program_change':
            self.program_select(msg.channel, 0, msg.program)

In [3]:
midi_path = '../resources/midifiles/mozart_vc4_mvt1.mid'
soundfont_path = '../resources/soundfonts/MuseScore_General.sf3'

synth = MidiSynthesizer(soundfont_path)

# Function to play MIDI file
def play_midi_file(midi_file, synthesizer):
    midi_data = mido.MidiFile(midi_file)
    for msg in midi_data.play():
        if not msg.is_meta:
            print(dir(msg))
            synthesizer.handle_midi_message(msg)

# Example usage
# play_midi_file(midi_path, synth)

In [4]:
def create_msg_dict(midi_file):
    """
    Create dictionaries storing all messages and program controls for 
    the MIDI, assuming program is tied to the same channel throughout the piece
    @param:
        - midi_file: str, path to the MIDI file
    @return:
        - message_dict: dict, keys are elapsed time (in sec) and values are 
                        lists of messages all occuring at that time
            -> dict, {elapsed_time: [msg1, msg2, ...]}
        - program_dict: dict, with keys as channel number and values 
                        are a message of type 'program change'
            -> {channel: program_msg}
    """
    midi_data = mido.MidiFile(midi_file)
    message_dict = {}
    program_dict = {}  # Initialize program_dict
    elapsed_time = 0
    for msg in midi_data:
        if not msg.is_meta:
            elapsed_time += msg.time
            if elapsed_time not in message_dict:
                message_dict[elapsed_time] = []
            message_dict[elapsed_time].append(msg)

            # Check if the message is a 'program_change' message
            if msg.type == 'program_change':
                program_dict[msg.channel] = msg
    return message_dict, program_dict

midi_path = '../resources/midifiles/mozart_vc4_mvt1.mid'
message_dict, program_dict = create_msg_dict(midi_path)

In [10]:
message_dict

{0: [Message('control_change', channel=0, control=0, value=0, time=0),
  Message('program_change', channel=0, program=40, time=0),
  Message('control_change', channel=0, control=10, value=67, time=0),
  Message('control_change', channel=0, control=91, value=66, time=0),
  Message('control_change', channel=0, control=93, value=32, time=0),
  Message('control_change', channel=0, control=121, value=0, time=0),
  Message('control_change', channel=1, control=0, value=0, time=0),
  Message('program_change', channel=1, program=48, time=0),
  Message('control_change', channel=1, control=10, value=45, time=0),
  Message('control_change', channel=1, control=91, value=66, time=0),
  Message('control_change', channel=1, control=93, value=2, time=0),
  Message('control_change', channel=1, control=121, value=0, time=0),
  Message('control_change', channel=2, control=0, value=0, time=0),
  Message('program_change', channel=2, program=48, time=0),
  Message('control_change', channel=2, control=10, val

In [13]:
import pandas as pd
from mido import Message

def make_pitchdf(message_dict):
    """
    Create a DataFrame from a dictionary of MIDI messages, calculating the duration of each note.
    @param:
        - message_dict: dict, keys are elapsed time (in sec) and values are 
                        lists of messages all occurring at that time
            -> dict, {elapsed_time: [msg1, msg2, ...]}
    @return:
        - pd.DataFrame with columns for start time, pitch, MIDI number, velocity, and duration
    """
    note_start_times = {}  # Dictionary to keep track of note start times
    rows = []  # List to store note details including calculated duration

    for elapsed_time, messages in message_dict.items():
        for msg in messages:
            # Check if the message is a note-related message before accessing the note attribute
            if msg.type in ['note_on', 'note_off']:
                key = (msg.channel, msg.note)  # Unique key for each note
                if msg.type == 'note_on' and msg.velocity > 0:
                    # Record the start time of the note
                    velocity = msg.velocity
                    note_start_times[key] = (elapsed_time, velocity)
                elif msg.type == 'note_off' or (msg.type == 'note_on' and msg.velocity == 0):
                    # Calculate duration and prepare the row for DataFrame
                    if key in note_start_times:
                        start_time, velocity = note_start_times[key]
                        duration = elapsed_time - start_time
                        row = [start_time, msg.channel, msg.note, velocity, duration]
                        rows.append(row)
                        del note_start_times[key]  # Remove the note from start times

    # Create DataFrame from the rows
    pitch_df = pd.DataFrame(rows, columns=['start', 'channel', 'pitch', 'velocity', 'duration'])
    return pitch_df

# Example usage
pitch_df = make_pitchdf(message_dict)

In [14]:
pitch_df

Unnamed: 0,start,channel,pitch,velocity,duration
0,2.067617,1,62,96,0.506465
1,2.067617,2,50,93,0.506465
2,2.067617,2,38,93,0.506465
3,2.584858,1,62,96,0.377155
4,2.584858,2,50,93,0.377155
...,...,...,...,...,...
6623,572.474249,2,50,93,0.167151
6624,572.474249,2,38,93,0.167151
6625,572.648668,1,62,96,0.690407
6626,572.648668,2,50,93,0.690407


In [4]:
import time

def play_from_msg_dict(message_dict, program_dict, synthesizer, start_from=0):

    for channel, program_msg in program_dict.items():
        synthesizer.program_select(program_msg.channel, 0, program_msg.program)
        print(f'Set program {program_msg.program} on channel {program_msg.channel}')
    
    # Assuming messages are sorted by time, no need to sort again
    times = list(message_dict.keys())
    
    # Find the index to start from based on the start_from time
    start_index = next((i for i, t in enumerate(times) if t >= start_from), None)
    if start_index is None:
        print("Start time is beyond the last message. Exiting.")
        return
    
    for i in range(start_index, len(times)):
        start_time = times[i]
        messages = message_dict[start_time]
        for msg in messages:
            synthesizer.handle_midi_message(msg)
        if i + 1 < len(times):
            next_start_time = times[i + 1]
            sleep_duration = next_start_time - start_time
            time.sleep(sleep_duration)

# Example usage
# play_from_msg_dict(message_dict, program_dict, synth, start_from=10)
# synth.stop()

In [None]:
slower_message_dict = {k * 2: v for k, v in message_dict.items()}
play_from_msg_dict(slower_message_dict, program_dict, synth, start_from=0)

In [None]:
def play_channels(message_dict, program_dict, synthesizer, start_from=0, channels=None):

    for channel, program_msg in program_dict.items():
        synthesizer.program_select(program_msg.channel, 0, program_msg.program)
        print(f'Set program {program_msg.program} on channel {program_msg.channel}')
    
    # Assuming messages are sorted by time, no need to sort again
    times = list(message_dict.keys())
    
    # Find the index to start from based on the start_from time
    start_index = next((i for i, t in enumerate(times) if t >= start_from), None)
    if start_index is None:
        print("Start time is beyond the last message. Exiting.")
        return
    
    for i in range(start_index, len(times)):
        start_time = times[i]
        messages = message_dict[start_time]
        for msg in messages:
            if msg.channel in channels:
                synthesizer.handle_midi_message(msg)
        if i + 1 < len(times):
            next_start_time = times[i + 1]
            sleep_duration = next_start_time - start_time
            time.sleep(sleep_duration)

# Example usage
play_channels(message_dict, program_dict, synth, start_from=30, channels=[1, 2])

In [6]:
synth.stop()

In [7]:
import sounddevice as sd
import numpy as np
import time

class AudioRecorder:
    def __init__(self, samplerate=44100, channels=1):
        """Audio recorder class for storing multiple audio recording chunks in a dict"""
        # Config variables
        self.samplerate = samplerate
        self.channels = channels

        # Recording dict to store audio chunks in
        # {tuple(start_time, end_time): np.array(audio_chunk)}
        self.recordings = {} 

        # Initialize recording variables
        self.start_time = None
        self.current_recording = None

    def start_recording(self):
        """Start appending audio chunks to current_recording list"""
        self.start_time = time.time()
        self.current_recording = []
        print("Recording started...")

    def stop_recording(self):
        """Append current_recording to recordings dict and reset variables"""
        if self.start_time is not None:
            end_time = time.time()
            recording_duration = end_time - self.start_time
            recording_key = (self.start_time, end_time)
            self.recordings[recording_key] = np.concatenate(self.current_recording, axis=0)
            self.start_time = None
            self.current_recording = None
            print(f"Recording stopped. Duration: {recording_duration:.2f} seconds")

    def record_callback(self, indata, frames, time, status):
        if self.current_recording is not None:
            self.current_recording.append(indata.copy())

    def start(self):
        """Initialize input stream (not necessarily recording yet)"""
        self.stream = sd.InputStream(samplerate=self.samplerate, channels=self.channels, callback=self.record_callback)
        self.stream.start()

    def stop(self):
        self.stream.stop()
        self.stream.close()

    def play_audio(self, start_time, end_time):
        for key in self.recordings:
            if key[0] <= start_time < key[1] or key[0] < end_time <= key[1] or (start_time <= key[0] and end_time >= key[1]):
                chunk = self.recordings[key]
                sd.play(chunk, samplerate=self.samplerate)
                sd.wait()

    def get_recordings(self):
        return self.recordings

# Usage example
recorder = AudioRecorder()
recorder.start()

# Start recording
recorder.start_recording()
time.sleep(5)  # Record for 5 seconds
recorder.stop_recording()

# Stop the input stream
recorder.stop()

# Play back the recorded audio
recordings = recorder.get_recordings()
for key in recordings:
    print(f"Playing back recording from {key[0]} to {key[1]}")
    recorder.play_audio(key[0], key[1])


Recording started...
Recording stopped. Duration: 5.01 seconds
Playing back recording from 1719357695.4525921 to 1719357700.4586828
