In [1]:
# Install required packages
# !pip install sounddevice numpy scipy matplotlib ipywidgets

import sounddevice as sd
import numpy as np
import scipy.io.wavfile as wav
import time
import os
import datetime
import threading
import ipywidgets as widgets
from IPython.display import display
import matplotlib.pyplot as plt
from scipy import signal

class SleepMonitor:
    """Record microphone audio in segments and flag sounds above background.

    Workflow: ``start_recording`` calibrates a background RMS level, opens a
    ``sounddevice.InputStream`` whose callback buffers audio blocks in
    ``self.frames``, and starts a daemon thread that periodically writes the
    buffered audio to a WAV file together with a text report and a
    waveform/spectrogram image. ``stop_recording`` stops the stream and
    flushes whatever is still buffered.

    Args:
        output_dir: Directory under which dated session folders are created.
        channels: Number of input channels to record.
        rate: Sample rate in Hz.
        chunk_seconds: Stored on the instance; not used by the methods below.
        calibration_time: Seconds of audio recorded to measure background
            noise.
    """

    # A block counts as a "sound event" when its RMS exceeds the calibrated
    # background level multiplied by this factor.
    EVENT_THRESHOLD_FACTOR = 1.5
    # Length of one saved segment, in seconds (30 minutes).
    SEGMENT_SECONDS = 30 * 60
    # How often the save thread wakes up to check the segment timer.
    SAVE_POLL_SECONDS = 10

    def __init__(self, output_dir="sleep_recordings",
                 channels=1,
                 rate=16000,
                 chunk_seconds=1,
                 calibration_time=5):
        self.output_dir = output_dir
        self.channels = channels
        self.rate = rate
        self.chunk_seconds = chunk_seconds
        self.calibration_time = calibration_time

        self.background_noise_level = None
        self.recording = False
        self.frames = []
        self.start_time = None
        self.start_time_sec = None
        self.stream = None
        self.save_thread = None

        # The audio callback and the save thread both touch ``self.frames``;
        # the lock keeps a block from being lost while a segment is cut.
        self._frames_lock = threading.Lock()
        # Set to wake the save thread and make it exit promptly.
        self._stop_event = threading.Event()

        os.makedirs(output_dir, exist_ok=True)

        # Always define these so later calls (set_device, the UI) can rely
        # on them even when the device query fails.
        self.devices = []
        self.device = None
        try:
            self.devices = sd.query_devices()
            print("Available audio devices:")
            for i, device in enumerate(self.devices):
                print(f"{i}: {device['name']} (inputs: {device['max_input_channels']})")

            default_index = sd.default.device[0]
            self.device = self._pick_input_device(self.devices, default_index)
            if self.device is not None:
                if self.device == default_index:
                    label = "Default input device selected"
                else:
                    label = "Selected input device"
                print(f"{label}: {self.devices[self.device]['name']}")
        except Exception as e:
            # Best-effort boundary: the monitor stays constructible without
            # working audio hardware; the failure is reported, not raised.
            print(f"Error querying devices: {e}")
            self.device = None

    @staticmethod
    def _pick_input_device(devices, default_index):
        """Return the index of the input device to use, or None.

        Prefers ``default_index`` when it is a valid index of a device with
        input channels; otherwise returns the first device that has input
        channels.
        """
        if (isinstance(default_index, int)
                and 0 <= default_index < len(devices)
                and devices[default_index]['max_input_channels'] > 0):
            return default_index
        return next(
            (i for i, device in enumerate(devices)
             if device['max_input_channels'] > 0),
            None,
        )

    @staticmethod
    def _rms(block):
        """Return the root-mean-square of ``block`` as a Python float."""
        # Promote to float64 so squaring integer samples cannot overflow.
        samples = np.asarray(block, dtype=np.float64)
        return float(np.sqrt(np.mean(np.square(samples))))

    def _is_sound_event(self, rms):
        """Return True when ``rms`` is significantly above background.

        Always False while no (or a zero) background level is calibrated.
        """
        if not self.background_noise_level:
            return False
        return rms > self.background_noise_level * self.EVENT_THRESHOLD_FACTOR

    def set_device(self, device_id):
        """Set the recording device to use"""
        if 0 <= device_id < len(self.devices):
            if self.devices[device_id]['max_input_channels'] > 0:
                self.device = device_id
                print(f"Selected device: {self.devices[device_id]['name']}")
            else:
                print(f"Device {device_id} has no input channels")
        else:
            print("Invalid device ID")

    def calibrate_noise(self):
        """Calibrate for background noise by listening for a few seconds.

        Returns:
            True when a background level was measured and stored in
            ``self.background_noise_level``; False otherwise.
        """
        print("Calibrating for background noise... please ensure only regular background noise is present")

        try:
            recording = sd.rec(
                int(self.rate * self.calibration_time),
                samplerate=self.rate,
                channels=self.channels,
                device=self.device,
                blocking=True
            )
        except Exception as e:
            print(f"Calibration error: {e}")
            return False

        if len(recording) == 0:
            print("No audio recorded during calibration")
            return False

        rms = self._rms(recording)
        self.background_noise_level = rms
        print(f"Background noise level: {rms:.6f}")
        return True

    def audio_callback(self, indata, frames, time, status):
        """Buffer one audio block and report it if it is unusually loud.

        The signature is dictated by ``sounddevice``. ``time`` is the
        stream's timing structure and shadows the ``time`` module inside
        this method, so it is deliberately left unused.
        """
        if status:
            print(status)
        with self._frames_lock:
            self.frames.append(indata.copy())

        if len(indata) == 0:
            return

        rms = self._rms(indata)
        if self._is_sound_event(rms):
            timestamp = datetime.datetime.now().strftime("%H:%M:%S")
            print(f"Sound event detected at {timestamp} (level: {rms:.6f})")

    def start_recording(self):
        """Calibrate, open the input stream and start the save thread."""
        if self.recording:
            print("Already recording!")
            return

        if self.device is None:
            print("No recording device selected!")
            return

        # NOTE(review): there is no built-in default threshold; when
        # calibration fails, detection uses the level from an earlier
        # calibration if there is one, and is otherwise disabled.
        if not self.calibrate_noise():
            print("Calibration failed, using default threshold")

        print("Starting sleep monitoring...")
        self.recording = True
        with self._frames_lock:
            self.frames = []
        # Fresh event per session so a save thread left over from a previous
        # session (already signalled) can never be revived by this one.
        self._stop_event = threading.Event()

        stream = None
        try:
            self.start_time = datetime.datetime.now()
            self.start_time_sec = time.time()

            stream = sd.InputStream(
                samplerate=self.rate,
                channels=self.channels,
                device=self.device,
                callback=self.audio_callback
            )
            stream.start()
            self.stream = stream

            self.save_thread = threading.Thread(
                target=self._save_recordings, daemon=True
            )
            self.save_thread.start()

            print("Sleep monitoring started. Use the Stop button to end recording.")

        except Exception as e:
            self.recording = False
            self._stop_event.set()
            print(f"Error starting recording: {e}")
            if stream is not None:
                # Do not leave a half-opened stream holding the device.
                try:
                    stream.close()
                except Exception as close_error:
                    print(f"Error closing stream: {close_error}")

    def stop_recording(self):
        """Stop the stream, stop the save thread and flush buffered audio."""
        if not self.recording:
            print("Not currently recording!")
            return

        self.recording = False
        # Wake the save thread immediately instead of waiting out its sleep.
        self._stop_event.set()

        stream = getattr(self, 'stream', None)
        if stream is not None:
            try:
                stream.stop()
                stream.close()
            except Exception as e:
                print(f"Error stopping recording: {e}")

        save_thread = getattr(self, 'save_thread', None)
        if save_thread is not None:
            save_thread.join(timeout=5)

        # Runs even if stopping the stream failed, so captured audio is kept.
        try:
            self._final_save()
        except Exception as e:
            print(f"Error stopping recording: {e}")
            return

        print("Sleep monitoring stopped.")

    def _save_recordings(self):
        """Thread function to periodically save audio to files"""
        # Bind this session's event locally; ``self._stop_event`` is replaced
        # when a new session starts.
        stop_event = self._stop_event
        segment_start = time.monotonic()

        while self.recording and not stop_event.is_set():
            elapsed = time.monotonic() - segment_start
            if elapsed >= self.SEGMENT_SECONDS and self.frames:
                self._save_current_segment()
                segment_start = time.monotonic()

            # Sleeps without hogging CPU, but returns at once when stopped.
            stop_event.wait(self.SAVE_POLL_SECONDS)

    def _save_current_segment(self):
        """Write the buffered audio to a WAV file and analyze it.

        The buffer is swapped out under the lock, so blocks arriving while
        the file is being written go into the next segment instead of being
        discarded. If writing fails the blocks are put back.
        """
        with self._frames_lock:
            pending, self.frames = self.frames, []
        if not pending:
            return

        now = datetime.datetime.now()
        session_dir = os.path.join(self.output_dir, now.strftime("%Y-%m-%d"))
        timestamp = now.strftime("%H-%M-%S")
        filename = os.path.join(session_dir, f"sleep_{timestamp}.wav")

        try:
            os.makedirs(session_dir, exist_ok=True)
            audio_data = np.concatenate(pending, axis=0)
            wav.write(filename, self.rate, audio_data)
        except Exception as e:
            with self._frames_lock:
                self.frames = pending + self.frames
            print(f"Error saving audio: {e}")
            return

        print(f"Saved recording to {filename}")
        self._analyze_segment(filename, audio_data)

    def _final_save(self):
        """Save any remaining frames when stopping recording"""
        if self.frames:
            self._save_current_segment()

    def _analyze_segment(self, filename, audio_data):
        """Write a text report and a plot next to ``filename``.

        The report is written first and independently of the plot, so a
        plotting failure does not lose it. Errors are printed, not raised.
        """
        # splitext only strips the extension; str.replace would also rewrite
        # a ".wav" that appears elsewhere in the path.
        base, _ = os.path.splitext(filename)
        analysis_filename = base + '_analysis.txt'

        try:
            sound_events = self._detect_sound_events(audio_data)
            self._write_report(analysis_filename, filename, audio_data, sound_events)
            print(f"Analysis saved to {analysis_filename}")
        except Exception as e:
            print(f"Error analyzing audio: {e}")
            return

        try:
            self._save_analysis_plot(base + '_analysis.png', audio_data)
        except Exception as e:
            print(f"Error analyzing audio: {e}")

    def _detect_sound_events(self, audio_data):
        """Return ``(seconds_from_start, rms)`` for each loud 1-second chunk."""
        chunk_size = int(self.rate)  # 1-second chunks
        events = []
        for start in range(0, len(audio_data), chunk_size):
            rms = self._rms(audio_data[start:start + chunk_size])
            if self._is_sound_event(rms):
                events.append((start / self.rate, rms))
        return events

    def _write_report(self, analysis_filename, filename, audio_data, sound_events):
        """Write the plain-text summary of one segment."""
        with open(analysis_filename, 'w') as report:
            report.write(f"Sleep Sound Analysis for {filename}\n")
            report.write(f"Recorded at: {datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n\n")

            duration = len(audio_data) / self.rate
            report.write(f"Total recording duration: {duration:.2f} seconds\n")

            if self.background_noise_level:
                report.write(f"Background noise level: {self.background_noise_level:.6f}\n")

            if sound_events:
                report.write(f"Detected {len(sound_events)} sound events\n\n")
                report.write("Sound events (timestamp in seconds):\n")
                for i, (time_point, level) in enumerate(sound_events):
                    report.write(f"  Event {i+1}: at {time_point:.2f}s (level: {level:.6f})\n")
            else:
                report.write("No significant sound events detected.\n")

    def _save_analysis_plot(self, image_filename, audio_data):
        """Save a waveform + spectrogram figure for one segment."""
        # Average the channels for the spectrogram; flattening a
        # multi-channel array would interleave samples from different
        # channels into one signal.
        mono = audio_data.mean(axis=1) if audio_data.ndim > 1 else audio_data

        fig, (ax1, ax2) = plt.subplots(2, 1, figsize=(10, 8))
        try:
            ax1.plot(np.arange(len(audio_data)) / self.rate, audio_data)
            ax1.set_title('Waveform')
            ax1.set_xlabel('Time (s)')
            ax1.set_ylabel('Amplitude')

            freqs, times, power = signal.spectrogram(mono, self.rate)
            # The tiny offset keeps log10 finite for bins that are exactly 0.
            power_db = 10 * np.log10(power + np.finfo(power.dtype).tiny)
            ax2.pcolormesh(times, freqs, power_db, shading='gouraud')
            ax2.set_title('Spectrogram')
            ax2.set_ylabel('Frequency (Hz)')
            ax2.set_xlabel('Time (s)')

            fig.tight_layout()
            fig.savefig(image_filename)
        finally:
            # Close this specific figure even when plotting fails.
            plt.close(fig)

# Build the monitor; its constructor lists the audio devices it found.
monitor = SleepMonitor()

# Offer a dropdown of input-capable devices, when there are any.
device_dropdown = None
if getattr(monitor, 'devices', None):
    device_options = [
        (f"{i}: {d['name']}", i)
        for i, d in enumerate(monitor.devices)
        if d['max_input_channels'] > 0
    ]

    if device_options:
        def on_device_change(change):
            """Forward the newly chosen dropdown value to the monitor."""
            monitor.set_device(change['new'])

        device_dropdown = widgets.Dropdown(
            options=device_options,
            # Pre-select the monitor's device, else the first listed input.
            value=device_options[0][1] if monitor.device is None else monitor.device,
            description='Input device:',
            disabled=False,
        )
        device_dropdown.observe(on_device_change, names='value')

# Output area that captures whatever the button handlers print.
status_output = widgets.Output()


def on_start_button_clicked(b):
    """Start monitoring, routing printed messages into the output area."""
    with status_output:
        monitor.start_recording()


def on_stop_button_clicked(b):
    """Stop monitoring, routing printed messages into the output area."""
    with status_output:
        monitor.stop_recording()


# Start/stop controls, each wired to its handler.
start_button = widgets.Button(description="Start Recording")
start_button.on_click(on_start_button_clicked)
stop_button = widgets.Button(description="Stop Recording")
stop_button.on_click(on_stop_button_clicked)

# Render the UI: optional device picker, the button row, then the output area.
print("Sleep Monitor - Select your input device if needed, then use buttons to control recording")
if device_dropdown:
    display(device_dropdown)
display(widgets.HBox([start_button, stop_button]))
display(status_output)

Available audio devices:
0: MacBook Pro Microphone (inputs: 1)
1: MacBook Pro Speakers (inputs: 0)
2: Microsoft Teams Audio (inputs: 1)
Selected input device: MacBook Pro Microphone
Sleep Monitor - Select your input device if needed, then use buttons to control recording


Dropdown(description='Input device:', options=(('0: MacBook Pro Microphone', 0), ('2: Microsoft Teams Audio', …

HBox(children=(Button(description='Start Recording', style=ButtonStyle()), Button(description='Stop Recording'…

Output()