# Mic check

In [58]:
import sounddevice as sd


def get_default_input_device_info():
    """Print the sounddevice description of the default input device.

    Nothing is returned; the device information is only written to stdout.
    """
    # Element 0 of sd.default.device is the ID of the default *input* device
    print(sd.query_devices(sd.default.device[0]))

# Print the info of the default input device (the function prints the
# device dictionary itself and returns nothing)
get_default_input_device_info()

{'name': 'MacBook Air 마이크', 'index': 3, 'hostapi': 0, 'max_input_channels': 1, 'max_output_channels': 0, 'default_low_input_latency': 0.0336875, 'default_low_output_latency': 0.01, 'default_high_input_latency': 0.043020833333333335, 'default_high_output_latency': 0.1, 'default_samplerate': 48000.0}


In [59]:
import sounddevice as sd

def list_input_devices():
    """Print the position and name of every device that has input channels.

    The number shown is the device's position in the list returned by
    ``sd.query_devices()``.
    """
    for position, info in enumerate(sd.query_devices()):
        if info['max_input_channels'] <= 0:
            continue  # no input channels: not a recording device
        print(f"Device #{position} name: {info['name']}")

# List available input devices (including microphones); the printed device
# number is what `device_id` is set to when choosing a device to record from
list_input_devices()

Device #1 name: 갤럭시 S2 마이크
Device #2 name: USB PnP Sound Device
Device #3 name: MacBook Air 마이크


In [65]:
import numpy as np

# Recording settings: how long to capture and from which input device
duration = 3  # seconds
device_id = 1  # replace with the ID of the device you want to use

# Pre-allocate room for `duration` seconds of samples at 44100 samples per
# second; `buffer_index` is the position where the next block will be written
buffer_index = 0
buffer = np.zeros(44100 * duration)

# Define a callback function to process the audio input
def audio_callback(indata, frames, time, status):
    """Draw a live volume bar and append the incoming block to ``buffer``.

    This is used as the ``callback`` of an ``sd.InputStream``.

    Args:
        indata: Block of input samples; its first column (``indata[:, 0]``)
            is what gets stored.
        frames: Number of frames in ``indata``.
        time: Unused.
        status: Stream status flags; printed when set.

    Side effects:
        Writes into the module-level ``buffer`` starting at ``buffer_index``
        and advances ``buffer_index``.  Frames that no longer fit into the
        buffer are dropped.
    """
    global buffer_index
    if status:
        # Surface over-/underflow reports instead of silently ignoring them
        print(f'\n{status}')

    volume_norm = np.linalg.norm(indata) * 10
    print(f'\r{"|" * int(volume_norm)}', end='')  # print a simple "volume bar"

    # Store only as many frames as still fit.  Without this clipping the slice
    # assignment raises ValueError on the last partial block (and on every
    # block after the buffer is full), because the shapes no longer match.
    writable = min(frames, len(buffer) - buffer_index)
    if writable > 0:
        buffer[buffer_index:buffer_index + writable] = indata[:writable, 0]
        buffer_index += writable

# Open the stream at the device's own default sample rate.  Requesting a fixed
# 44100 Hz failed on this device with "PortAudioError: Error opening
# InputStream" -- presumably because the device does not support that rate
# (TODO confirm on the target hardware).
samplerate = int(sd.query_devices(device_id, 'input')['default_samplerate'])

# The buffer allocated earlier in this cell assumes 44100 Hz.  Re-size it for
# the rate actually used and rewind the write position, so that the whole
# `duration` fits and a re-run starts from an empty buffer.
buffer = np.zeros(duration * samplerate)
buffer_index = 0

# Create a stream object
stream = sd.InputStream(callback=audio_callback, device=device_id, channels=1, samplerate=samplerate)

# Start the stream
with stream:
    # Record for `duration` seconds (sd.sleep takes milliseconds)
    sd.sleep(duration * 1000)

# Play back only the part of the buffer that was actually filled
sd.play(buffer[:buffer_index], samplerate=samplerate)

||PaMacCore (AUHAL)|| Error on line 1322: err='-10851', msg=Audio Unit: Invalid Property Value


PortAudioError: Error opening InputStream: Internal PortAudio error [PaErrorCode -9986]

### Import models


In [None]:
import os
import torch
import torch.nn as nn
import torchvision.models as models
from torchvision.models import resnet18

def load_model_interactive(model_dir="models", num_classes=14):
    """Ask the user to pick a ``.pth`` file from ``model_dir`` and load it.

    The file is loaded as a state dict into a ResNet-18 whose final ``fc``
    layer is replaced by a linear layer with ``num_classes`` outputs.  Any
    ``"module."`` in the state-dict keys is removed before loading
    (presumably a leftover of ``nn.DataParallel`` -- confirm against the
    training code).

    Args:
        model_dir: Directory scanned for files ending in ``.pth``; names
            starting with ``._`` are skipped.
        num_classes: Output size of the replaced ``fc`` layer.

    Returns:
        The model in eval mode, on CUDA when available and on the CPU
        otherwise.  ``None`` when there is nothing to choose from, the
        selection is invalid, or loading fails; a message is printed in each
        of these cases.
    """
    # Sorted so the numbering is stable; os.listdir gives no order guarantee
    model_files = sorted(
        f for f in os.listdir(model_dir)
        if f.endswith('.pth') and not f.startswith('._')
    )
    if not model_files:
        print(f"No .pth model files found in {model_dir}")
        return None

    # Display the models to the user
    for idx, model_name in enumerate(model_files, 1):
        print(f"{idx}. {model_name}")

    # Get user input.  The range check matters: without it "0" becomes index
    # -1 and silently selects the last file.
    answer = input("Enter the number corresponding to the model you wish to load: ")
    try:
        selected_idx = int(answer) - 1
    except ValueError:
        selected_idx = -1
    if not 0 <= selected_idx < len(model_files):
        print(f"Invalid selection: {answer!r}. Expected a number from 1 to {len(model_files)}.")
        return None
    model_path = os.path.join(model_dir, model_files[selected_idx])

    try:
        device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

        # Build the resnet18 structure without pretrained weights and size the
        # classification head from the layer it replaces
        model = resnet18()
        model.fc = nn.Linear(model.fc.in_features, num_classes)

        # Load the state dict.
        # NOTE: torch.load unpickles the file -- only open checkpoints you trust.
        state_dict = torch.load(model_path, map_location=device)
        new_state_dict = {k.replace("module.", ""): v for k, v in state_dict.items()}
        model.load_state_dict(new_state_dict)

        model = model.to(device).eval()
        print(f"Model loaded from {model_path}")
    except Exception as e:
        print(f"Failed to load the model. Error: {e}")
        # Never hand back a network whose weights were not loaded
        return None

    return model

# List the .pth files in the default "models" directory, ask which one to load
# and keep the returned model in `model` for the prediction code further down.
# The call waits on input(), so it needs an interactive session.
model = load_model_interactive()

1. Model2_001.pth
2. Model2_002.pth
3. Model2_003.pth
4. Model2_004.pth
Model loaded from models/Model2_002.pth


In [None]:
class MinMaxNormalize(nn.Module):
    """Rescale a tensor linearly with ``(x - min) / (max - min)``.

    Args:
        min_val: Fixed lower bound.  If either bound is ``None``, both bounds
            are taken from the tensor being normalized.
        max_val: Fixed upper bound (same rule as ``min_val``).
    """

    def __init__(self, min_val=None, max_val=None):
        super(MinMaxNormalize, self).__init__()
        self.min_val = min_val
        self.max_val = max_val

    def forward(self, tensor):
        """Return ``tensor`` rescaled by the configured or observed bounds.

        When both bounds coincide (for example a constant tensor) the
        denominator would be zero; 1 is used instead, so the result stays
        finite (all zeros for a constant tensor with observed bounds).
        """
        if self.min_val is None or self.max_val is None:
            min_val = torch.min(tensor)
            max_val = torch.max(tensor)
        else:
            min_val = self.min_val
            max_val = self.max_val

        value_range = max_val - min_val
        if value_range == 0:
            # Avoid 0/0 -> NaN, which would propagate through the whole model
            value_range = 1

        normalized_tensor = (tensor - min_val) / value_range
        return normalized_tensor

In [None]:
class MonoToColor(nn.Module):
    """Tile a tensor ``num_channels`` times along its leading dimension.

    A ``(1, H, W)`` input becomes ``(num_channels, H, W)``.
    """

    def __init__(self, num_channels=3):
        super().__init__()
        self.num_channels = num_channels

    def forward(self, tensor):
        """Return ``tensor`` repeated ``num_channels`` times along dim 0."""
        repeats = (self.num_channels, 1, 1)
        return tensor.repeat(*repeats)

In [None]:
from torchvision import transforms
import torchaudio

SAMPLE_RATE = 22050  # samples per second

# Preprocessing pipeline; it has to be the same as the one used during training
transformation = transforms.Compose([
    # n_mels: more mel bands give more detail but a more complex input --
    # about 128 is used for speech, about 40 for sound effects
    torchaudio.transforms.MelSpectrogram(n_mels=40, sample_rate=SAMPLE_RATE),
    torchaudio.transforms.AmplitudeToDB(top_db=80, stype='power'),
    MinMaxNormalize(),
    MonoToColor(),
])

In [None]:
import torch.nn.functional as F

def continuous_sound_prediction(model, device, transformation, sample_rate, device_id,
                                num_iterations=101, duration=2.0):
    """Record from a microphone in a loop and print per-label probabilities.

    Every round records ``duration`` seconds of single-channel audio through
    sounddevice, keeps exactly ``sample_rate`` samples of it (the start of the
    recording, zero-padded on the right if it is shorter), applies
    ``transformation``, runs ``model`` on the result and prints the softmax
    probability of each label.  The loop ends after ``num_iterations`` rounds
    or at the first exception, which is printed rather than raised.

    Args:
        model: Network to evaluate; it is switched to eval mode and receives
            the transformed recording with a batch dimension added in front.
        device: Device the transformed recording is moved to before the
            forward pass.  The model itself is not moved.
        transformation: Callable applied to the ``(1, sample_rate)`` waveform.
        sample_rate: Recording rate; also the number of samples kept.
        device_id: Input device passed to ``sd.rec``.
        num_iterations: Number of record/predict rounds.
        duration: Seconds recorded per round.
    """
    labels = [
        "children", "nothing2", "drilling", "engine", "siren", 
        "gunshot", "aircon", "jackhammer", "carhorn", "glass", 
        "nock", "street_music", "dog_bark", "nothing1"
    ]

    for count in range(num_iterations):
        try:
            # Recording
            recording = sd.rec(int(duration * sample_rate), samplerate=sample_rate, channels=1, device=device_id)
            sd.wait()

            # Preprocessing: (frames, channels) -> (channels, frames), mono
            recording = torch.from_numpy(recording).float().transpose(0, 1)
            if recording.shape[0] > 1:
                recording = torch.mean(recording, dim=0, keepdim=True)

            # Keep exactly `sample_rate` samples (one second).  This used to
            # happen implicitly through a negative padding amount; the crop and
            # the pad are now spelled out.
            recording = recording[:, :sample_rate]
            missing = sample_rate - recording.shape[1]
            if missing > 0:
                recording = nn.functional.pad(recording, (0, missing))

            # Transformation
            recording = transformation(recording)

            # Prediction
            model.eval()
            with torch.no_grad():
                recording = recording.to(device)
                outputs = model(recording.unsqueeze(0))
                probabilities = F.softmax(outputs, dim=1)

            # Print results
            probs = [f"{label} {prob:.2%}" for label, prob in zip(labels, probabilities[0])]
            print(f"{count} / {' / '.join(probs)}")

        except Exception as e:
            print(f"Error during prediction: {e}")
            break

    print("Finished continuous sound prediction.")
# Run the live prediction loop with the inputs kept on the CPU.  `model`,
# `transformation`, `SAMPLE_RATE` and `device_id` come from earlier cells.
device = 'cpu'
continuous_sound_prediction(model, device, transformation, SAMPLE_RATE, device_id)

0 / children 4.36% / nothing2 1.86% / drilling 2.88% / engine 8.71% / siren 8.70% / gunshot 2.26% / aircon 1.87% / jackhammer 44.62% / carhorn 4.14% / glass 5.80% / nock 2.53% / street_music 1.28% / dog_bark 1.98% / nothing1 9.00%
1 / children 21.78% / nothing2 0.18% / drilling 2.06% / engine 1.87% / siren 12.46% / gunshot 14.08% / aircon 0.38% / jackhammer 10.73% / carhorn 8.33% / glass 0.69% / nock 0.30% / street_music 7.43% / dog_bark 0.78% / nothing1 18.95%
2 / children 22.46% / nothing2 0.09% / drilling 1.61% / engine 1.43% / siren 11.98% / gunshot 13.36% / aircon 0.24% / jackhammer 12.06% / carhorn 7.46% / glass 0.57% / nock 0.18% / street_music 6.19% / dog_bark 0.54% / nothing1 21.83%
3 / children 20.44% / nothing2 0.13% / drilling 1.58% / engine 1.62% / siren 13.86% / gunshot 12.11% / aircon 0.34% / jackhammer 13.44% / carhorn 8.80% / glass 0.54% / nock 0.20% / street_music 5.98% / dog_bark 0.47% / nothing1 20.48%
4 / children 1.51% / nothing2 1.25% / drilling 1.24% / engine 5.

In [None]:
import pyaudio
import wave
import torch
import torchaudio
import torch.nn as nn
import torch.nn.functional as F
from torchvision import transforms

# Modified record_audio function
def record_audio(input_device_index, sample_rate=44100, channels=1, duration=2):
    """Record ``duration`` seconds of 16-bit audio from a PyAudio input device.

    Args:
        input_device_index: PyAudio index of the device to record from.
        sample_rate: Sampling rate requested from the device.
        channels: Number of channels requested from the device.
        duration: Length of the recording in seconds.

    Returns:
        A 1-D ``np.int16`` array with the raw samples, ``int(sample_rate *
        duration)`` frames long.

    The stream and the PyAudio instance are released even if reading fails,
    so a failed recording does not leave the device open.
    """
    chunk_size = 1024
    total_frames = int(sample_rate * duration)

    p = pyaudio.PyAudio()
    try:
        stream = p.open(format=pyaudio.paInt16,
                        channels=channels,
                        rate=sample_rate,
                        input=True,
                        input_device_index=input_device_index,
                        frames_per_buffer=chunk_size)
        try:
            frames = []
            remaining = total_frames
            # Read in chunks, with a shorter final read, so the recording has
            # the requested length instead of being cut to whole chunks
            while remaining > 0:
                wanted = min(chunk_size, remaining)
                frames.append(stream.read(wanted))
                remaining -= wanted
        finally:
            stream.stop_stream()
            stream.close()
    finally:
        p.terminate()

    # Convert frames to numpy array
    byte_string = b''.join(frames)
    audio_array = np.frombuffer(byte_string, dtype=np.int16)
    return audio_array


# Additional transformation methods
def _right_pad_if_necessary(signal, target_sample_rate):
    """Zero-pad ``signal`` on the right of its last dimension if it is short.

    Despite its name, ``target_sample_rate`` is compared against the length
    of dimension 1 of ``signal``, i.e. it acts as a target number of samples.
    A signal that is already at least that long is returned unchanged.
    """
    shortfall = target_sample_rate - signal.shape[1]
    if shortfall <= 0:
        return signal
    return nn.functional.pad(signal, (0, shortfall))

def _resample_if_necessary(signal, sr, target_sample_rate):
    """Resample ``signal`` from ``sr`` to ``target_sample_rate``.

    The signal is returned unchanged when both rates are already equal.
    """
    if sr == target_sample_rate:
        return signal
    return torchaudio.transforms.Resample(sr, target_sample_rate)(signal)

def _mix_down_if_necessary(signal):
    """Average a multi-channel signal over dimension 0 into a single channel.

    The channel dimension is kept (size 1).  A signal whose first dimension
    is not larger than 1 is returned unchanged.
    """
    has_several_channels = signal.shape[0] > 1
    return torch.mean(signal, dim=0, keepdim=True) if has_several_channels else signal


# MonoToColor and MinMaxNormalize classes
class MonoToColor(nn.Module):
    """Turn a single-channel tensor into ``num_channels`` identical channels.

    The input is repeated along its leading dimension, so ``(1, H, W)``
    becomes ``(num_channels, H, W)``.
    """

    def __init__(self, num_channels=3):
        super().__init__()
        self.num_channels = num_channels

    def forward(self, tensor):
        """Return ``tensor`` repeated ``num_channels`` times along dim 0."""
        channel_count = self.num_channels
        return tensor.repeat((channel_count, 1, 1))

class MinMaxNormalize(nn.Module):
    """Rescale a tensor linearly with ``(x - min) / (max - min)``.

    Args:
        min_val: Fixed lower bound.  If either bound is ``None``, both bounds
            are taken from the tensor being normalized.
        max_val: Fixed upper bound (same rule as ``min_val``).
    """

    def __init__(self, min_val=None, max_val=None):
        super(MinMaxNormalize, self).__init__()
        self.min_val = min_val
        self.max_val = max_val

    def forward(self, tensor):
        """Return ``tensor`` rescaled by the configured or observed bounds.

        A zero range (for example a constant tensor, as produced by a silent
        recording) would divide by zero and yield NaN; in that case the
        denominator falls back to 1 so the output stays finite.
        """
        if self.min_val is None or self.max_val is None:
            min_val = torch.min(tensor)
            max_val = torch.max(tensor)
        else:
            min_val = self.min_val
            max_val = self.max_val

        span = max_val - min_val
        if span == 0:
            span = 1  # guard against 0/0

        normalized_tensor = (tensor - min_val) / span
        return normalized_tensor


# Preprocessing pipeline; it has to be the same as the one used during training
SAMPLE_RATE = 22050  # samples per second
transformation = transforms.Compose([
    torchaudio.transforms.MelSpectrogram(n_mels=40, sample_rate=SAMPLE_RATE),
    torchaudio.transforms.AmplitudeToDB(top_db=80, stype='power'),
    MinMaxNormalize(),
    MonoToColor(),
])


# Modify continuous_sound_prediction function to use record_audio
def continuous_sound_prediction(model, device, transformation, sample_rate, device_id,
                                num_iterations=101):
    """Record with ``record_audio`` in a loop and print per-label probabilities.

    Every round records from ``device_id`` at ``sample_rate``, converts the
    16-bit samples to floats in [-1, 1], right-pads short recordings to
    ``sample_rate`` samples, applies ``transformation``, runs ``model`` and
    prints the softmax probability of each label.  The loop ends after
    ``num_iterations`` rounds or at the first exception, which is printed
    rather than raised.

    Args:
        model: Network to evaluate; it is switched to eval mode and receives
            the transformed recording with a batch dimension added in front.
        device: Device the transformed recording is moved to before the
            forward pass.  The model itself is not moved.
        transformation: Callable applied to the ``(1, n_samples)`` waveform.
        sample_rate: Recording rate; also the minimum number of samples.
        device_id: Input device index passed to ``record_audio``.
        num_iterations: Number of record/predict rounds.
    """
    labels = [
        "children", "nothing2", "drilling", "engine", "siren", 
        "gunshot", "aircon", "jackhammer", "carhorn", "glass", 
        "nock", "street_music", "dog_bark", "nothing1"
    ]

    for count in range(num_iterations):
        try:
            # Recording
            audio_data = record_audio(device_id, sample_rate=sample_rate)

            # Preprocessing.  record_audio returns int16 PCM; scale it to
            # floats in [-1, 1] so the input has the same range as the float
            # recordings used by the sounddevice-based variant of this loop.
            audio_tensor = torch.from_numpy(audio_data).float().unsqueeze(0) / 32768.0
            # Pad against the rate that was actually used for recording
            # (the `sample_rate` argument), not the module-level SAMPLE_RATE.
            audio_tensor = _right_pad_if_necessary(audio_tensor, sample_rate)
            audio_tensor = _mix_down_if_necessary(audio_tensor)
            # NOTE(review): the whole recording is passed on, whereas the
            # sounddevice-based variant keeps only `sample_rate` samples --
            # TODO confirm which clip length matches training.

            # Transformation
            transformed_audio = transformation(audio_tensor)

            # Prediction
            model.eval()
            with torch.no_grad():
                transformed_audio = transformed_audio.to(device)
                outputs = model(transformed_audio.unsqueeze(0))
                probabilities = F.softmax(outputs, dim=1)

            # Print results
            probs = [f"{label} {prob:.2%}" for label, prob in zip(labels, probabilities[0])]
            print(f"{count} / {' / '.join(probs)}")

        except Exception as e:
            print(f"Error during prediction: {e}")
            break

    print("Finished continuous sound prediction.")

# Run the PyAudio-based prediction loop with the inputs kept on the CPU.
# `model` and `device_id` are not defined in this cell; they come from
# earlier cells of the notebook.
device = 'cpu'
continuous_sound_prediction(model, device, transformation, SAMPLE_RATE, device_id)

||PaMacCore (AUHAL)|| Error on line 2523: err='-50', msg=Unknown Error


KeyboardInterrupt: 