In [1]:
import pyaudio

audio = pyaudio.PyAudio()

# List all available devices
print("Available audio devices:")
for i in range(audio.get_device_count()):
    device_info = audio.get_device_info_by_index(i)
    is_input = device_info['maxInputChannels'] > 0
    is_output = device_info['maxOutputChannels'] > 0
    device_type = []
    if "MAYA" not in device_info['name']:
        continue
    if is_input:
        device_type.append("Input: " + str(device_info['maxInputChannels']))
    if is_output:
        device_type.append("Output: " + str(device_info['maxOutputChannels']))
    device_type_str = ", ".join(device_type)
    print(f"Index {i}: {device_info['name']} - {device_type_str}")


Available audio devices:
Index 3: MAYA44USB Ch12 (ESI Audio Devic - Input: 4
Index 4: MAYA44USB Ch34 (ESI Audio Devic - Input: 4
Index 5: MAYA44USB Multichannel 4 (ESI A - Input: 4
Index 10: MAYA44USB Ch34 (ESI Audio Devic - Output: 4
Index 12: MAYA44USB Ch12 (ESI Audio Devic - Output: 4
Index 16: MAYA44USB Ch12 (ESI Audio Device (WDM) - MAYA44USB) - Input: 4
Index 17: MAYA44USB Ch34 (ESI Audio Device (WDM) - MAYA44USB) - Input: 4
Index 18: MAYA44USB Multichannel 4 (ESI Audio Device (WDM) - MAYA44USB) - Input: 4
Index 22: Speakers (ESI Audio Device (WDM) - MAYA44USB) - Output: 4
Index 23: MAYA44USB Ch34 (ESI Audio Device (WDM) - MAYA44USB) - Output: 4
Index 25: MAYA44USB Ch12 (ESI Audio Device (WDM) - MAYA44USB) - Output: 4
Index 27: Speakers (ESI Audio Device (WDM) - MAYA44USB) - Output: 4
Index 28: MAYA44USB Ch34 (ESI Audio Device (WDM) - MAYA44USB) - Output: 2
Index 31: MAYA44USB Ch12 (ESI Audio Device (WDM) - MAYA44USB) - Output: 2
Index 33: MAYA44USB Ch12 (ESI Audio Device (WDM) -

In [8]:
import pyaudio
import wave
import os
import numpy as np

# Recording parameters
RECORD_SECONDS = 5
FRAMES_PER_BUFFER = 1024
FORMAT = pyaudio.paFloat32
TOTAL_CHANNELS = 4  # Total channels in the device
TARGET_CHANNEL = 0  # Index of the single channel to extract (0-based)
SR = 48000

# Initialize PyAudio
audio = pyaudio.PyAudio()

# Identify input device
input_device_index = None
for i in range(audio.get_device_count()):
    device_info = audio.get_device_info_by_index(i)
    if "MAYA44USB Multichannel 4" in device_info['name'] and device_info['maxInputChannels'] == 4:
        input_device_index = i
        break

if input_device_index is None:
    raise ValueError("Multichannel input device not found")

print(f"Selected input device: {audio.get_device_info_by_index(input_device_index)['name']} (Index: {input_device_index})")

# Configure the input stream
stream = audio.open(
    format=FORMAT,
    channels=TOTAL_CHANNELS,  # Record all channels
    rate=SR,
    input=True,
    input_device_index=input_device_index,
    frames_per_buffer=FRAMES_PER_BUFFER
)

# Record audio
print("Recording...")
recorded_frames = []
for _ in range(0, int(SR / FRAMES_PER_BUFFER * RECORD_SECONDS)):
    data = stream.read(FRAMES_PER_BUFFER, exception_on_overflow=False)
    # Convert raw bytes to numpy array and extract the target channel
    audio_data = np.frombuffer(data, dtype=np.float32)
    single_channel_data = audio_data[TARGET_CHANNEL::TOTAL_CHANNELS]  # Extract only the target channel
    recorded_frames.append(single_channel_data)

# Stop and close stream
stream.stop_stream()
stream.close()
audio.terminate()

# Combine all frames for the single channel
recorded_audio = np.concatenate(recorded_frames)

# Save the single-channel audio to a WAV file
BASE_DIR = "."
recorded_file = os.path.join(BASE_DIR, "single_channel_audio.wav")
wf = wave.open(recorded_file, 'wb')
wf.setnchannels(1)  # Single channel
wf.setsampwidth(audio.get_sample_size(FORMAT))
wf.setframerate(SR)
wf.writeframes(recorded_audio.tobytes())
wf.close()

print(f"Single channel audio saved to {recorded_file}")


Selected input device: MAYA44USB Multichannel 4 (ESI A (Index: 5)
Recording...
Single channel audio saved to .\single_channel_audio.wav


In [4]:
# Play the recorded audio
print("Playing back recorded audio...")
wf = wave.open(recorded_file, 'rb')

# Open a playback stream
playback_stream = audio.open(
    format=audio.get_format_from_width(wf.getsampwidth()),
    channels=wf.getnchannels(),
    rate=wf.getframerate(),
    output=True
)

# Read and play back audio frames
data = wf.readframes(FRAMES_PER_BUFFER)
while data:
    playback_stream.write(data)
    data = wf.readframes(FRAMES_PER_BUFFER)

playback_stream.stop_stream()
playback_stream.close()
print("Playback finished.")


Playing back recorded audio...
Playback finished.


In [11]:
import time
import numpy as np
import pyaudio

# Identify input and output devices
audio = pyaudio.PyAudio()
input_device_index = None
output_device_index = None

for i in range(audio.get_device_count()):
    device_info = audio.get_device_info_by_index(i)
    if (not input_device_index and "MAYA44USB" in device_info['name'] and "Ch12" in device_info['name'] and
            device_info['maxInputChannels'] == 4):
        input_device_index = i
    if (not output_device_index and "MAYA44USB" in device_info['name'] and "Ch12" in device_info['name'] and
            device_info['maxOutputChannels'] == 4):
        output_device_index = i

if input_device_index is None or output_device_index is None:
    raise ValueError("Suitable input or output device not found")

# Parameters
SAMPLE_RATE = 48000  # Hz
DURATION = 2.0  # seconds
CHUNK_SIZE = 1024
FREQUENCY = 440  # Hz

# Generate a sine wave sound
t = np.linspace(0, DURATION, int(SAMPLE_RATE * DURATION), endpoint=False)
sound = (0.5 * np.sin(2 * np.pi * FREQUENCY * t)).astype(np.float32)

# Initialize PyAudio streams with specific devices
stream_out = audio.open(format=pyaudio.paFloat32,
                        channels=1,
                        rate=SAMPLE_RATE,
                        output=True,
                        output_device_index=output_device_index)
stream_in = audio.open(format=pyaudio.paFloat32,
                       channels=1,
                       rate=SAMPLE_RATE,
                       input=True,
                       frames_per_buffer=CHUNK_SIZE,
                       input_device_index=input_device_index)

# Measure delay
def measure_delay():
    global sound

    # Play sound and record simultaneously
    print("Starting measurement...")
    stream_in.start_stream()
    stream_out.start_stream()

    start_time = time.time()  # Timestamp for playback
    stream_out.write(sound.tobytes())
    recorded_frames = []

    for _ in range(0, int(SAMPLE_RATE / CHUNK_SIZE * DURATION)):
        data = stream_in.read(CHUNK_SIZE, exception_on_overflow=False)
        recorded_frames.append(np.frombuffer(data, dtype=np.float32))

    end_time = time.time()  # Timestamp for recording completion
    print(f"Sound playback time: {end_time - start_time:.6f} seconds")

    # Combine recorded frames into a single array
    recorded_audio = np.hstack(recorded_frames)

    # Cross-correlation to find delay
    correlation = np.correlate(recorded_audio, sound, mode='full')
    delay_samples = np.argmax(correlation) - len(sound) + 1
    delay_seconds = delay_samples / SAMPLE_RATE

    print(f"Measured delay: {delay_seconds:.6f} seconds")

# Run the measurement
measure_delay()

# Close streams
stream_in.stop_stream()
stream_out.stop_stream()
stream_in.close()
stream_out.close()
audio.terminate()


Starting measurement...
Sound playback time: 3.886354 seconds
Measured delay: 0.004542 seconds


In [None]:
import numpy as np
import matplotlib.pyplot as plt
from matplotlib.animation import FuncAnimation
import threading
import time

# Add this to your setup
def setup_live_plot():
    global fig, ax, line, live_data

    # Initialize live plot
    fig, ax = plt.subplots()
    live_data = np.zeros(len(sound))  # Placeholder for the live stream data
    line, = ax.plot(live_data, label="Recorded Signal")

    ax.set_ylim(-0.005, 0.005)  # Adjust according to expected signal amplitude
    ax.set_xlim(0, len(sound))  # Set x-axis to the signal length
    ax.legend()
    plt.subplots_adjust(bottom=0.2)

    # Add a listener for pressing Enter to execute the sweep
    fig.canvas.mpl_connect('key_press_event', on_key_enter)

    ani = FuncAnimation(fig, update_live_plot, interval=50)  # Update plot every 50 ms
    plt.show()

def update_live_plot(frame):
    global line, live_data
    line.set_ydata(live_data)  # Update plot with the latest live data
    return line,

def on_key_enter(event):
    if event.key == "enter":
        threading.Thread(target=play_and_record_live).start()

def play_and_record_live():
    global Ains, live_data

    def playback():
        stream.write(sound.tobytes())  # Play the sweep sound

    def recording():
        global Ains, live_data
        frames = []
        num_frames = int(len(sound) / CHUNK)
        for _ in range(num_frames):
            data = stream.read(CHUNK, exception_on_overflow=False)
            frame = np.frombuffer(data, dtype=np.float32)
            frames.append(frame)

            # Update live_data for real-time display
            live_data = np.hstack(frames)[-len(sound):]  # Keep only the most recent data

        Ains = np.hstack(frames)

        # Ensure Ains matches sound length exactly
        if len(Ains) > len(sound):
            Ains = Ains[:len(sound)]
        elif len(Ains) < len(sound):
            Ains = np.pad(Ains, (0, len(sound) - len(Ains)), 'constant')

    play_thread = threading.Thread(target=playback)
    record_thread = threading.Thread(target=recording)

    play_thread.start()
    record_thread.start()

    play_thread.join()
    record_thread.join()

# Add this line to main()
setup_live_plot()
