# 🔇 Advanced Noise Cancellation & Safe Audio System

## Overview
This notebook demonstrates various noise cancellation techniques and implements a **Safe Audio Environment System** that keeps audio levels below **70 dB** for hearing protection.

### 🎯 Key Features:
- **Real-time noise reduction** using spectral subtraction
- **Adaptive volume control** to maintain safe dB levels
- **Environmental noise learning** for personalized filtering
- **Frequency analysis** and visualization
- **Hearing protection** with automatic limiting

### 📊 Safety Standard:
> **70 dB Threshold**: Sounds at or below 70 dB are generally considered safe for prolonged exposure without risk of hearing damage.

Let's explore different approaches to noise cancellation and audio safety!

In [None]:
# Install required packages for advanced noise cancellation
%pip install pyaudio numpy matplotlib scipy soundfile librosa pydub audioop-lts

import numpy as np
import matplotlib.pyplot as plt
import pyaudio
from scipy import signal
from scipy.fft import fft, ifft
import soundfile as sf
import librosa
from pydub import AudioSegment
import time
import threading
from collections import deque

print("✅ All packages installed and imported successfully!")
print("🔧 Setting up Safe Audio Environment System...")

In [None]:
# 📊 Audio Safety and dB Level Functions

def calculate_db_level(audio_data):
    """Calculate dB level of audio data"""
    if len(audio_data) == 0:
        return -np.inf
    
    # Calculate RMS (Root Mean Square)
    rms = np.sqrt(np.mean(audio_data ** 2))
    
    # Convert to dB relative to full scale
    if rms > 0:
        db_level = 20 * np.log10(rms)
        # Rough calibration to approximate real-world dB levels
        return db_level + 94  # Calibration offset
    return -np.inf

def is_safe_level(db_level, threshold=70.0):
    """Check if audio level is safe for prolonged exposure"""
    return db_level <= threshold

def apply_safety_limiting(audio_data, max_db=70.0):
    """Apply safety limiting to keep audio below specified dB level"""
    current_db = calculate_db_level(audio_data)
    
    if current_db > max_db:
        # Calculate reduction factor needed
        reduction_db = current_db - max_db
        reduction_factor = 10 ** (-reduction_db / 20.0)
        
        # Apply smooth reduction
        audio_data = audio_data * reduction_factor
        
        # Apply soft limiting to prevent clipping
        audio_data = np.tanh(audio_data * 0.8) * 0.8
        
    return audio_data, calculate_db_level(audio_data)

# Test with sample data
print("🧪 Testing safety functions...")
test_audio = np.random.normal(0, 0.3, 44100)  # 1 second of noise
original_db = calculate_db_level(test_audio)
safe_audio, safe_db = apply_safety_limiting(test_audio, max_db=70.0)

print(f"Original level: {original_db:.1f} dB")
print(f"After safety limiting: {safe_db:.1f} dB")
print(f"Safe for prolonged exposure: {is_safe_level(safe_db)}")

# Visualize the difference
plt.figure(figsize=(12, 6))

plt.subplot(2, 1, 1)
plt.plot(test_audio[:1000])
plt.title(f'Original Audio (Level: {original_db:.1f} dB)')
plt.ylabel('Amplitude')

plt.subplot(2, 1, 2)
plt.plot(safe_audio[:1000])
plt.title(f'Safety Limited Audio (Level: {safe_db:.1f} dB)')
plt.ylabel('Amplitude')
plt.xlabel('Sample')

plt.tight_layout()
plt.show()

In [3]:
%pip install sounddevice

import numpy as np
import sounddevice as sd

Defaulting to user installation because normal site-packages is not writeable
Collecting sounddevice
  Using cached sounddevice-0.5.2-py3-none-win_amd64.whl.metadata (1.6 kB)
Using cached sounddevice-0.5.2-py3-none-win_amd64.whl (363 kB)
Installing collected packages: sounddevice
Successfully installed sounddevice-0.5.2
Note: you may need to restart the kernel to use updated packages.



[notice] A new release of pip is available: 25.0.1 -> 25.1.1
[notice] To update, run: C:\Users\Dinesh\AppData\Local\Microsoft\WindowsApps\PythonSoftwareFoundation.Python.3.13_qbz5n2kfra8p0\python.exe -m pip install --upgrade pip


In [1]:
%pip install soundfile

import sounddevice as sd
import soundfile as sf
import numpy as np

# Step 1: Load your song
filename = 'song.mp3'  # Replace with your WAV/FLAC/OGG file
data, samplerate = sf.read(filename)
# If stereo, convert to mono for demonstration (optional)
if data.ndim > 1:
    data = np.mean(data, axis=1)

# Step 2: Invert the waveform
inverse_data = 0 * data

# Step 3: Play the inverted sound
print("Playing inverse sound...")
sd.play(data, samplerate)
sd.play(inverse_data, samplerate)
sd.wait()
print("Done.")



[notice] A new release of pip is available: 25.0.1 -> 25.1.1
[notice] To update, run: C:\Users\Dinesh\AppData\Local\Microsoft\WindowsApps\PythonSoftwareFoundation.Python.3.13_qbz5n2kfra8p0\python.exe -m pip install --upgrade pip


Defaulting to user installation because normal site-packages is not writeable
Note: you may need to restart the kernel to use updated packages.
Playing inverse sound...
Done.


In [1]:
# 🎧 Enhanced Audio Device Selection with GUI Controls

import tkinter as tk
from tkinter import ttk, messagebox
import matplotlib.pyplot as plt
from matplotlib.backends.backend_tkagg import FigureCanvasTkAgg, NavigationToolbar2Tk
import matplotlib.animation as animation
from matplotlib.widgets import Button
import pyaudio
import numpy as np
import threading
import queue

class AudioDeviceSelector:
    def __init__(self):
        self.audio = pyaudio.PyAudio()
        self.selected_input_device = None
        self.selected_output_device = None
        self.audio_processor = None
        
    def get_audio_devices(self):
        """Get all available audio input and output devices"""
        input_devices = []
        output_devices = []
        
        for i in range(self.audio.get_device_count()):
            try:
                device_info = self.audio.get_device_info_by_index(i)
                
                # Input devices (microphones)
                if device_info['maxInputChannels'] > 0:
                    device_type = "🎤 MICROPHONE"
                    if any(keyword in device_info['name'].lower() for keyword in 
                          ['desktop', 'realtek', 'amd audio', 'pc', 'array']):
                        device_type = "🖥️ DESKTOP MICROPHONE"
                    elif any(keyword in device_info['name'].lower() for keyword in 
                           ['headset', 'headphone', 'bluetooth']):
                        device_type = "🎧 HEADPHONE/HEADSET MIC"
                        
                    input_devices.append({
                        'index': i,
                        'name': device_info['name'],
                        'type': device_type,
                        'channels': device_info['maxInputChannels'],
                        'rate': device_info['defaultSampleRate']
                    })
                
                # Output devices (speakers/headphones)
                if device_info['maxOutputChannels'] > 0:
                    device_type = "🔊 SPEAKER"
                    if any(keyword in device_info['name'].lower() for keyword in 
                          ['headset', 'headphone', 'bluetooth']):
                        device_type = "🎧 HEADPHONES"
                    elif any(keyword in device_info['name'].lower() for keyword in 
                           ['speaker', 'realtek', 'amd audio']):
                        device_type = "🖥️ PC SPEAKERS"
                        
                    output_devices.append({
                        'index': i,
                        'name': device_info['name'],
                        'type': device_type,
                        'channels': device_info['maxOutputChannels'],
                        'rate': device_info['defaultSampleRate']
                    })
                    
            except Exception as e:
                continue
                
        return input_devices, output_devices
    
    def create_device_selection_gui(self):
        """Create GUI for device selection with integrated audio visualization"""
        self.root = tk.Tk()
        self.root.title("🎧 Audio Device & Real-time Visualizer")
        self.root.geometry("1200x800")
        
        # Get devices
        input_devices, output_devices = self.get_audio_devices()
        
        # Main frame
        main_frame = ttk.Frame(self.root)
        main_frame.pack(fill=tk.BOTH, expand=True, padx=10, pady=10)
        
        # Device selection frame
        device_frame = ttk.LabelFrame(main_frame, text="🎤🔊 Audio Device Selection", padding=10)
        device_frame.pack(fill=tk.X, pady=(0, 10))
        
        # Input device selection
        ttk.Label(device_frame, text="🎤 Select Microphone:", font=("Arial", 11, "bold")).grid(row=0, column=0, sticky="w", pady=5)
        self.input_var = tk.StringVar()
        input_combo = ttk.Combobox(device_frame, textvariable=self.input_var, width=60, state="readonly")
        input_combo.grid(row=0, column=1, padx=(10, 0), pady=5, sticky="ew")
        
        input_options = []
        for device in input_devices:
            option = f"[{device['index']}] {device['name']} - {device['type']}"
            input_options.append(option)
        input_combo['values'] = input_options
        
        # Pre-select PC microphone if available
        for i, device in enumerate(input_devices):
            if "🖥️ DESKTOP MICROPHONE" in device['type']:
                input_combo.current(i)
                break
        
        # Output device selection
        ttk.Label(device_frame, text="🔊 Select Output:", font=("Arial", 11, "bold")).grid(row=1, column=0, sticky="w", pady=5)
        self.output_var = tk.StringVar()
        output_combo = ttk.Combobox(device_frame, textvariable=self.output_var, width=60, state="readonly")
        output_combo.grid(row=1, column=1, padx=(10, 0), pady=5, sticky="ew")
        
        output_options = []
        for device in output_devices:
            option = f"[{device['index']}] {device['name']} - {device['type']}"
            output_options.append(option)
        output_combo['values'] = output_options
        
        # Pre-select headphones if available
        for i, device in enumerate(output_devices):
            if "🎧 HEADPHONES" in device['type']:
                output_combo.current(i)
                break
        
        device_frame.columnconfigure(1, weight=1)
        
        # Control buttons
        button_frame = ttk.Frame(device_frame)
        button_frame.grid(row=2, column=0, columnspan=2, pady=10)
        
        start_btn = ttk.Button(button_frame, text="🎵 Start Audio Processing", 
                              command=lambda: self.start_audio_processing(input_devices, output_devices))
        start_btn.pack(side=tk.LEFT, padx=5)
        
        stop_btn = ttk.Button(button_frame, text="⏹️ Stop", command=self.stop_audio_processing)
        stop_btn.pack(side=tk.LEFT, padx=5)
        
        # Matplotlib figure frame
        plot_frame = ttk.LabelFrame(main_frame, text="📊 Real-time Audio Visualization", padding=5)
        plot_frame.pack(fill=tk.BOTH, expand=True)
        
        # Create matplotlib figure
        self.fig, ((self.ax1, self.ax2), (self.ax3, self.ax4)) = plt.subplots(2, 2, figsize=(12, 8))
        self.fig.suptitle('Real-time Audio Processing & Visualization', fontsize=14)
        
        # Configure subplots
        self.ax1.set_title('🎤 Input Waveform')
        self.ax1.set_ylim(-1, 1)
        self.ax1.grid(True, alpha=0.3)
        
        self.ax2.set_title('🔊 Output Waveform')
        self.ax2.set_ylim(-1, 1)
        self.ax2.grid(True, alpha=0.3)
        
        self.ax3.set_title('📈 Frequency Spectrum')
        self.ax3.set_xlabel('Frequency (Hz)')
        self.ax3.set_ylabel('Magnitude')
        self.ax3.grid(True, alpha=0.3)
        
        self.ax4.set_title('🛡️ Safety Monitor')
        self.ax4.set_xlabel('Time')
        self.ax4.set_ylabel('dB Level')
        self.ax4.axhline(y=70, color='red', linestyle='--', label='70 dB Safety Limit')
        self.ax4.legend()
        self.ax4.grid(True, alpha=0.3)
        
        # Embed matplotlib in tkinter
        self.canvas = FigureCanvasTkAgg(self.fig, plot_frame)
        self.canvas.get_tk_widget().pack(fill=tk.BOTH, expand=True)
        
        # Initialize plot lines
        self.line_input, = self.ax1.plot([], [], 'b-', linewidth=1)
        self.line_output, = self.ax2.plot([], [], 'g-', linewidth=1)
        self.line_freq, = self.ax3.plot([], [], 'r-', linewidth=1)
        self.line_db, = self.ax4.plot([], [], 'orange', linewidth=2)
        
        # Audio processing variables
        self.audio_queue = queue.Queue()
        self.db_history = []
        self.time_history = []
        self.processing = False
        
        # Status
        self.status_label = ttk.Label(main_frame, text="🔴 Ready - Select devices and click Start", 
                                     font=("Arial", 10))
        self.status_label.pack(pady=5)
        
        return self.root, input_devices, output_devices
    
    def start_audio_processing(self, input_devices, output_devices):
        """Start real-time audio processing with selected devices"""
        # Get selected devices
        input_selection = self.input_var.get()
        output_selection = self.output_var.get()
        
        if not input_selection or not output_selection:
            messagebox.showerror("Error", "Please select both input and output devices!")
            return
        
        # Extract device indices
        input_index = int(input_selection.split(']')[0].split('[')[1])
        output_index = int(output_selection.split(']')[0].split('[')[1])
        
        # Find selected device info
        selected_input = next(d for d in input_devices if d['index'] == input_index)
        selected_output = next(d for d in output_devices if d['index'] == output_index)
        
        self.selected_input_device = selected_input
        self.selected_output_device = selected_output
        
        print(f"🎤 Input: {selected_input['name']}")
        print(f"🔊 Output: {selected_output['name']}")
        
        # Start audio processing thread
        self.processing = True
        self.audio_thread = threading.Thread(target=self.audio_processing_loop)
        self.audio_thread.daemon = True
        self.audio_thread.start()
        
        # Start visualization
        self.animation = animation.FuncAnimation(self.fig, self.update_plots, 
                                               interval=50, blit=False)
        self.canvas.draw()
        
        self.status_label.config(text="🟢 Processing - Audio active with safety monitoring")
    
    def audio_processing_loop(self):
        """Main audio processing loop with safety features"""
        CHUNK = 1024
        FORMAT = pyaudio.paInt16
        CHANNELS = 1
        RATE = 44100
        
        try:
            # Input stream (microphone)
            input_stream = self.audio.open(
                format=FORMAT,
                channels=CHANNELS,
                rate=RATE,
                input=True,
                input_device_index=self.selected_input_device['index'],
                frames_per_buffer=CHUNK
            )
            
            # Output stream (headphones/speakers)
            output_stream = self.audio.open(
                format=FORMAT,
                channels=CHANNELS,
                rate=RATE,
                output=True,
                output_device_index=self.selected_output_device['index'],
                frames_per_buffer=CHUNK
            )
            
            print("🎵 Audio streams started successfully!")
            
            while self.processing:
                # Read from microphone
                input_data = input_stream.read(CHUNK, exception_on_overflow=False)
                audio_input = np.frombuffer(input_data, dtype=np.int16) / 32768.0
                
                # Apply safety processing
                processed_audio = self.apply_safety_processing(audio_input)
                
                # Convert back to output format
                output_data = (processed_audio * 32767).astype(np.int16).tobytes()
                
                # Send to output device
                output_stream.write(output_data)
                
                # Queue data for visualization
                if not self.audio_queue.full():
                    self.audio_queue.put({
                        'input': audio_input,
                        'output': processed_audio,
                        'timestamp': len(self.time_history)
                    })
                
        except Exception as e:
            print(f"❌ Audio processing error: {e}")
            messagebox.showerror("Audio Error", f"Failed to start audio processing:\n{e}")
        finally:
            try:
                input_stream.stop_stream()
                input_stream.close()
                output_stream.stop_stream()
                output_stream.close()
            except:
                pass
    
    def apply_safety_processing(self, audio_data):
        """Apply safety processing to keep audio below 70 dB"""
        # Calculate current dB level
        rms = np.sqrt(np.mean(audio_data ** 2))
        if rms > 0:
            db_level = 20 * np.log10(rms) + 94  # Calibration offset
        else:
            db_level = -np.inf
        
        # Apply safety limiting
        max_db = 70.0
        if db_level > max_db:
            reduction_db = db_level - max_db
            reduction_factor = 10 ** (-reduction_db / 20.0)
            processed_audio = audio_data * reduction_factor
        else:
            processed_audio = audio_data.copy()
        
        # Store for monitoring
        self.db_history.append(db_level if db_level > -np.inf else 0)
        self.time_history.append(len(self.time_history))
        
        # Keep only recent history
        if len(self.db_history) > 500:
            self.db_history.pop(0)
            self.time_history.pop(0)
        
        return processed_audio
    
    def update_plots(self, frame):
        """Update visualization plots"""
        if not self.audio_queue.empty():
            try:
                data = self.audio_queue.get_nowait()
                
                # Update waveforms
                x_axis = np.arange(len(data['input']))
                
                self.line_input.set_data(x_axis, data['input'])
                self.ax1.set_xlim(0, len(data['input']))
                self.ax1.set_ylim(-1, 1)
                
                self.line_output.set_data(x_axis, data['output'])
                self.ax2.set_xlim(0, len(data['output']))
                self.ax2.set_ylim(-1, 1)
                
                # Update frequency spectrum
                windowed = data['input'] * np.hanning(len(data['input']))
                fft_data = np.fft.fft(windowed)
                magnitude = np.abs(fft_data[:len(fft_data)//2])
                freqs = np.fft.fftfreq(len(fft_data), 1/44100)[:len(fft_data)//2]
                
                self.line_freq.set_data(freqs, magnitude)
                self.ax3.set_xlim(0, 8000)  # Show up to 8kHz
                if magnitude.max() > 0:
                    self.ax3.set_ylim(0, magnitude.max() * 1.1)
                
            except queue.Empty:
                pass
        
        # Update dB monitoring
        if self.db_history:
            self.line_db.set_data(self.time_history, self.db_history)
            self.ax4.set_xlim(max(0, len(self.time_history) - 200), len(self.time_history))
            self.ax4.set_ylim(0, max(80, max(self.db_history[-50:]) + 10))
        
        return self.line_input, self.line_output, self.line_freq, self.line_db
    
    def stop_audio_processing(self):
        """Stop audio processing"""
        self.processing = False
        if hasattr(self, 'animation'):
            self.animation.event_source.stop()
        self.status_label.config(text="🔴 Stopped")
        print("⏹️ Audio processing stopped")
    
    def run(self):
        """Run the application"""
        root, input_devices, output_devices = self.create_device_selection_gui()
        
        def on_closing():
            self.stop_audio_processing()
            self.audio.terminate()
            root.destroy()
        
        root.protocol("WM_DELETE_WINDOW", on_closing)
        root.mainloop()

# Create and run the audio device selector
print("🎧 Initializing Enhanced Audio Device Selector...")
print("Features:")
print("  🎤 PC Microphone input")
print("  🔊 Headphone output") 
print("  📊 Real-time visualization")
print("  🛡️ 70 dB safety monitoring")
print("  🎚️ GUI device selection")

selector = AudioDeviceSelector()
selector.run()

🎧 Initializing Enhanced Audio Device Selector...
Features:
  🎤 PC Microphone input
  🔊 Headphone output
  📊 Real-time visualization
  🛡️ 70 dB safety monitoring
  🎚️ GUI device selection


  func(*args)
  func(*args)
  func(*args)
  func(*args)


🎤 Input: Microphone Array (AMD Audio Dev
🔊 Output: Primary Sound Driver


  self.animation = animation.FuncAnimation(self.fig, self.update_plots,
  self.canvas.draw()
  self.canvas.draw()
  self.canvas.draw()
  self.canvas.draw()


🎵 Audio streams started successfully!
⏹️ Audio processing stopped


: 