In [2]:
import sys
sys.path.append("../src")
import importlib
from IPython.display import Audio
import sounddevice as sd
import wave
import time
from scipy.signal import butter, filtfilt
import numpy as np
import matplotlib.pyplot as plt


In [2]:
import logic.utils.io_access as io
import logic.audio_systems.device_helpers as dh
import logic.audio_systems.audio_transformers as at
import logic.audio_systems.speech_audio_stream_observable as saso

In [3]:
# Force-reload the project audio modules so that edits to their source files
# are picked up without restarting the kernel.
# NOTE(review): `io` is imported next to these modules but is not reloaded
# here - confirm that is intentional.
# NOTE(review): saso is reloaded last; if it imports dh/at this order lets it
# see their fresh versions - TODO confirm against the module sources.
# The final reload() call is the cell's last expression, so the reloaded
# module object is echoed as the cell output.
importlib.reload(dh)
importlib.reload(at)
importlib.reload(saso)


<module 'logic.audio_systems.speech_audio_stream_observable' from '../src/logic/audio_systems/speech_audio_stream_observable.py'>

In [4]:
# Ask the project's device helper for the audio device index.
# NOTE(review): presumably this resolves the seeed voicecard - confirm in
# logic.audio_systems.device_helpers. The value is not referenced again in the
# cells visible here.
device_index = dh.find_seed_device_index()
# Bare expression: echo the resolved index as the cell output.
device_index

2

In [3]:
# Print every sounddevice entry whose name starts with the voicecard prefix so
# its index and channel capabilities can be inspected in the cell output.
devices = sd.query_devices()
target_description = "seeed-2mic-voicecard"
matching_devices = [
    device for device in devices
    if device['name'].startswith(target_description)
]
for device in matching_devices:
    print(device)
if not matching_devices:
    # Say so explicitly: an empty output is otherwise indistinguishable from a
    # cell that was never run.
    print(f"No audio device name starts with {target_description!r}")

{'name': 'seeed-2mic-voicecard: bcm2835-i2s-wm8960-hifi wm8960-hifi-0 (hw:2,0)', 'index': 2, 'hostapi': 0, 'max_input_channels': 0, 'max_output_channels': 2, 'default_low_input_latency': -1.0, 'default_low_output_latency': 0.005804988662131519, 'default_high_input_latency': -1.0, 'default_high_output_latency': 0.034829931972789115, 'default_samplerate': 44100.0}


In [9]:
# Recording configuration shared by the cells below.
# NOTE(review): input_channels_count is not referenced in the cells visible
# here - confirm where it is consumed.
input_channels_count = 2
# Stored on AudioDataObserver as `channels`.
output_channels_count = 1
# Used for the observer's frame budget and as the playback rate
# (presumably in Hz - TODO confirm).
sample_rate = 44100
# How long the recording cell sleeps while audio is captured, in seconds.
record_seconds = 5
# Location of the test audio file, resolved through the project's io helper.
testfile_path = io.get_path('data', 'test_audio_pipeline.wav')

In [10]:
# Record audio internally for notebook
class AudioDataObserver:
    """Observer that accumulates the raw audio chunks it is handed.

    An instance is registered on an audio stream observable, which is expected
    to call `on_received` for every chunk it produces. The chunks are
    concatenated into a single bytearray that can be read back with
    `get_binary_audio_data`.

    Construction reads the notebook-level `testfile_path`, `sample_rate` and
    `output_channels_count`, so the configuration cell must have run first.

    Attributes:
        filename: Copy of the notebook's `testfile_path`.
        rate: Copy of the notebook's `sample_rate`.
        channels: Copy of the notebook's `output_channels_count`.
        duration: The duration passed to the constructor.
        binary_audio_data: Bytearray holding every chunk received so far.
        frames: `int(sample_rate * duration)`.
        frame_count: Counter initialised to 0; it is reset by
            `clear_audio_data` but never incremented inside this class.
    """

    def __init__(self, duration):
        """Create an empty observer.

        Args:
            duration: Recording length; multiplied by `sample_rate` to
                compute `frames`.
        """
        self.filename = testfile_path
        self.rate = sample_rate
        self.channels = output_channels_count
        self.duration = duration
        self.binary_audio_data = bytearray()
        self.frames = int(sample_rate * duration)
        self.frame_count = 0

    def on_received(self, audio_data):
        """Append one chunk of audio bytes to the internal buffer.

        Args:
            audio_data: Any iterable of byte values accepted by
                `bytearray.extend` (e.g. `bytes` or `bytearray`).
        """
        self.binary_audio_data.extend(audio_data)

    def get_binary_audio_data(self):
        """Return the accumulated buffer itself (not a copy).

        Per the original author's note this is binary int16 data.
        """
        return self.binary_audio_data

    def clear_audio_data(self):
        """Discard everything recorded so far and reset the frame counter.

        A fresh bytearray is assigned, so a buffer previously obtained from
        `get_binary_audio_data` is left untouched.
        """
        # `self` was missing from the original signature, which made this
        # method impossible to call on an instance.
        self.binary_audio_data = bytearray()
        self.frame_count = 0

In [13]:
# Record `record_seconds` seconds of audio into an in-memory observer.
audio_stream_observable = saso.SpeechAudioStreamObservable()
audio_stream_observer = AudioDataObserver(record_seconds)
audio_stream_observable.add_observer(audio_stream_observer)

# NOTE(review): nothing in this cell starts the stream explicitly; presumably
# SpeechAudioStreamObservable begins capturing when it is constructed -
# TODO confirm in speech_audio_stream_observable.
try:
    # Derive the message from record_seconds so it cannot drift from the
    # actual wait (it previously claimed 3 seconds while sleeping for 5).
    print(f"Recording audio for {record_seconds} seconds...")
    time.sleep(record_seconds)
finally:
    # Stop the stream even if the wait is interrupted (e.g. kernel interrupt).
    print("Done recording.")
    audio_stream_observable.stop()

# Get recording binary
binary_recording_data = audio_stream_observer.get_binary_audio_data()

Recording audio for 3 seconds...
Done recording.


In [None]:
# Convert the recorded bytes to a sample array via the project's transformer.
# NOTE(review): assumes at.bytes_to_int16 returns a NumPy int16 array (a later
# cell reads `.dtype` and uses np.min/np.max on it) - confirm in
# logic.audio_systems.audio_transformers.
np_input = at.bytes_to_int16(binary_recording_data)

# Play the audio: render an inline player for the samples at `sample_rate`.
display(Audio(np_input, rate=sample_rate))


In [None]:
# Visualize recording
# Two stacked panels, one per channel title.
# NOTE(review): both panels draw the same `np_input` array, so the "Left" and
# "Right" plots are identical - confirm whether per-channel data was intended.
fig, axs = plt.subplots(2, 1, figsize=(10, 6))

for panel, channel_title in zip(axs, ('Left Channel', 'Right Channel')):
    panel.plot(np_input)
    panel.set_title(channel_title)
    panel.set_xlabel('Sample number')
    panel.set_ylabel('Amplitude')

plt.tight_layout()
plt.show()

In [None]:
# Report the value range and dtype of the recorded samples.
# NOTE(review): the labels say "after processing", but in the cells visible
# here np_input comes straight from at.bytes_to_int16 with no further
# processing - confirm the wording is intended.
print("Min after processing:", np.min(np_input))
print("Max after processing:", np.max(np_input))
print("Input data type:", np_input.dtype)

