In [None]:
import numpy as np
from scipy import signal
from scipy.fft import fft, ifft
import sounddevice as sd
import queue
import threading
from IPython.display import clear_output
import time

In [None]:
class BeatDetector:
    """Estimate the tempo (BPM) of a short audio window.

    Pipeline used by :meth:`analyze_tempo`:

    1. split the signal into frequency bands (:meth:`filter_bank`),
    2. rectify and smooth each band into an envelope (:meth:`smooth_envelope`),
    3. keep only the rising edges of each envelope (:meth:`diff_rectify`),
    4. convolve every band with a 3-pulse comb for each candidate BPM and pick
       the BPM with the largest summed output energy.

    Attributes are plain values set in ``__init__`` and may be adjusted on the
    instance before calling :meth:`analyze_tempo`.
    """

    def __init__(self, sample_rate=48000):
        """Configure the detector.

        Args:
            sample_rate: Sampling rate of the audio in Hz.
        """
        self.sample_rate = sample_rate
        self.window_size = int(2.2 * sample_rate)  # 2.2 seconds window
        # Inclusive range of candidate tempi scanned by analyze_tempo.
        self.min_bpm = 60
        self.max_bpm = 200

        # (low_hz, high_hz) edges of each analysis band; the last band runs up
        # to the Nyquist frequency.
        self.frequency_bands = [
            (0, 200),
            (200, 400),
            (400, 800),
            (800, 1600),
            (1600, 3200),
            (3200, sample_rate // 2)
        ]

        # 0.4 s Hann window used to smooth the rectified band signals.
        self.hanning_size = int(0.4 * sample_rate)
        # The top-level alias signal.hann was removed in SciPy 1.13; the
        # window functions live in scipy.signal.windows.
        self.hanning_window = signal.windows.hann(self.hanning_size)

    def filter_bank(self, audio_data):
        """Split a real signal into one time-domain signal per frequency band.

        Each band is produced by zeroing every FFT bin outside
        ``[low_bin, high_bin)`` and transforming back. The real FFT is used so
        that the negative-frequency half is implied by conjugate symmetry and
        never has to be mirrored by hand.

        Args:
            audio_data: 1-D array of real samples.

        Returns:
            A list with one float array per entry of ``self.frequency_bands``,
            each of the same length as ``audio_data``.
        """
        n_samples = len(audio_data)
        if n_samples == 0:
            # Nothing to transform; keep the "one array per band" contract.
            return [np.zeros(0) for _ in self.frequency_bands]

        spectrum = np.fft.rfft(audio_data)
        freq_step = self.sample_rate / n_samples
        bands = []

        for low, high in self.frequency_bands:
            low_bin = int(low / freq_step)
            high_bin = int(high / freq_step)

            band_spectrum = np.zeros_like(spectrum)
            # Slices that reach past the Nyquist bin are clipped by NumPy, so
            # bands above sample_rate / 2 simply come out as all zeros.
            band_spectrum[low_bin:high_bin] = spectrum[low_bin:high_bin]

            bands.append(np.fft.irfft(band_spectrum, n=n_samples))

        return bands

    def smooth_envelope(self, signal_data):
        """Return the peak-normalised, Hann-smoothed envelope of a band.

        Args:
            signal_data: 1-D array holding one band signal.

        Returns:
            Array of the same length as ``signal_data``. For a non-silent
            input the maximum is 1; for a silent input (no positive peak) an
            all-zero array is returned instead of dividing by zero.
        """
        rectified = np.abs(signal_data)
        smoothed = signal.convolve(rectified, self.hanning_window, mode='same')

        peak = np.max(smoothed) if smoothed.size else 0.0
        # `not peak > 0` also catches NaN, which would otherwise propagate.
        if not peak > 0:
            return np.zeros_like(smoothed)
        return smoothed / peak

    def diff_rectify(self, signal_data):
        """Return the half-wave rectified first difference of ``signal_data``.

        Only increases survive; decreases become 0. The result is one sample
        shorter than the input.
        """
        return np.maximum(0, np.diff(signal_data))

    def create_comb_filter(self, bpm, length):
        """Build a comb of up to three unit pulses spaced one beat apart.

        Args:
            bpm: Candidate tempo in beats per minute; must be positive.
            length: Number of samples in the returned array.

        Returns:
            Float array of ``length`` zeros with ones at ``0``, ``period`` and
            ``2 * period`` (pulses at or beyond ``length`` are dropped), where
            ``period`` is the beat spacing in samples.

        Raises:
            ValueError: If ``bpm`` is not positive.
        """
        if bpm <= 0:
            raise ValueError("bpm must be positive")

        period = int(60.0 * self.sample_rate / bpm)
        comb = np.zeros(length)

        pulse_positions = np.arange(3) * period
        comb[pulse_positions[pulse_positions < length]] = 1

        return comb

    def analyze_tempo(self, audio_data):
        """Estimate the tempo of ``audio_data``.

        The input is forced to exactly ``self.window_size`` samples: longer
        input keeps its FIRST ``window_size`` samples, shorter input is
        zero-padded at the end.

        Args:
            audio_data: 1-D array of real audio samples.

        Returns:
            ``(detected_bpm, confidence)``. ``detected_bpm`` is the candidate
            in ``[min_bpm, max_bpm]`` with the highest comb-filter energy.
            ``confidence`` is ``(peak - mean) / peak`` over all candidate
            energies, or 0 when the peak energy is not positive.
        """
        if len(audio_data) > self.window_size:
            audio_data = audio_data[:self.window_size]
        elif len(audio_data) < self.window_size:
            audio_data = np.pad(audio_data, (0, self.window_size - len(audio_data)))

        # Onset-strength signal per band: envelope -> rising edges only.
        processed_bands = [
            self.diff_rectify(self.smooth_envelope(band))
            for band in self.filter_bank(audio_data)
        ]
        comb_length = len(processed_bands[0])

        tempo_energies = []
        for bpm in range(self.min_bpm, self.max_bpm + 1):
            comb = self.create_comb_filter(bpm, comb_length)

            total_energy = 0
            for band in processed_bands:
                # mode='same' keeps the central len(band) samples of the full
                # convolution.
                convolved = signal.convolve(band, comb, mode='same')
                total_energy += np.sum(convolved ** 2)

            tempo_energies.append(total_energy)

        tempo_energies = np.array(tempo_energies)
        detected_bpm = self.min_bpm + np.argmax(tempo_energies)

        peak_energy = np.max(tempo_energies)
        mean_energy = np.mean(tempo_energies)
        confidence = (peak_energy - mean_energy) / peak_energy if peak_energy > 0 else 0

        return detected_bpm, confidence

In [None]:
class AudioInputStream:
    """Capture microphone audio and print a running BPM estimate.

    ``start()`` opens a single-channel ``sounddevice`` input stream whose
    callback pushes audio chunks onto ``audio_queue``; a worker thread running
    ``process_audio`` drains the queue, keeps the most recent ``window_size``
    samples and hands them to a ``BeatDetector``.
    """

    def __init__(self, sample_rate=48000, window_size=None):
        """Set up the queue, stop flag and detector.

        Args:
            sample_rate: Sampling rate in Hz requested from the input device
                and passed to the detector.
            window_size: Number of samples to collect before analysing. A
                falsy value (``None`` or 0) selects 2.2 seconds of audio.
        """
        self.sample_rate = sample_rate
        self.window_size = window_size if window_size else int(2.2 * sample_rate)
        self.audio_queue = queue.Queue()
        self.stop_flag = False
        self.detector = BeatDetector(sample_rate=sample_rate)
        # time.time() of the last printed estimate; throttles the display.
        self.last_print_time = 0

    def audio_callback(self, indata, frames, time, status):
        """Stream callback: enqueue the first channel of the incoming chunk.

        The parameter list follows the callback signature that
        ``sd.InputStream`` invokes; ``frames`` and ``time`` are unused here.
        """
        if status:
            print(status)
        # Enqueue a copy, not a view: `indata` is owned by the audio backend,
        # and the sounddevice examples copy it before handing it to another
        # thread because it is not guaranteed to stay valid after the callback.
        self.audio_queue.put(indata[:, 0].copy())

    def process_audio(self):
        """Worker loop: accumulate audio and print a BPM estimate.

        Runs until ``stop_flag`` becomes true. The queue is polled with a
        one-second timeout so the flag is re-checked even when no audio is
        arriving. Estimates are printed at most once every 0.5 seconds.
        """
        buffer = np.array([])

        while not self.stop_flag:
            try:
                new_data = self.audio_queue.get(timeout=1)
            except queue.Empty:
                continue

            buffer = np.append(buffer, new_data)
            if len(buffer) < self.window_size:
                continue

            # Trim BEFORE analysing: analyze_tempo keeps the first samples of
            # an over-long input, so trimming afterwards would analyse the
            # oldest audio instead of the newest.
            buffer = buffer[-self.window_size:]

            current_time = time.time()
            # Only update display every 500ms
            if current_time - self.last_print_time >= 0.5:
                bpm, confidence = self.detector.analyze_tempo(buffer)
                clear_output(wait=True)
                print(f"BPM: {bpm:.0f} | Confidence: {confidence:.2f}")
                self.last_print_time = current_time

    def start(self):
        """Open the input stream and block until interrupted.

        Returns after a ``KeyboardInterrupt`` (kernel interrupt / Ctrl+C) or
        when the worker thread is no longer running. The worker is always
        signalled to stop and joined on the way out, including when opening
        the stream or the wait loop raises.
        """
        self.stop_flag = False

        stream = sd.InputStream(
            channels=1,
            samplerate=self.sample_rate,
            callback=self.audio_callback
        )

        process_thread = threading.Thread(target=self.process_audio)

        try:
            with stream:
                process_thread.start()
                print("Listening for audio input... Press Ctrl+C to stop")

                try:
                    # Leave the loop if the worker died so we do not keep
                    # "listening" with nothing consuming the queue.
                    while process_thread.is_alive():
                        time.sleep(0.1)
                except KeyboardInterrupt:
                    pass
        finally:
            self.stop()
            # The thread may never have been started if opening the stream
            # failed; joining an unstarted thread raises RuntimeError.
            if process_thread.is_alive():
                process_thread.join()

    def stop(self):
        """Ask the worker loop in ``process_audio`` to exit."""
        self.stop_flag = True

In [None]:
# Live BPM detection from the default input device.
# start() blocks this cell until the kernel is interrupted (Ctrl+C).
audio_stream = AudioInputStream(sample_rate=48000, window_size=None)
audio_stream.start()

BPM: 77 | Confidence: 0.30


: 