In [None]:
from pynq import Overlay, allocate

# Create the PYNQ overlay object from the bitstream file. The absolute path
# below is specific to the original author's board layout.
overlay = Overlay('/home/xilinx/jupyter_notebooks/ws/pynq-caht/base3.bit') # Replace with your own path

In [2]:
# Handle to the overlay's `audio_direct_0` block; the cells below use it to
# record, save and query the sample rate.
audio = overlay.audio_direct_0

In [3]:
# Sanity check: print the base address of the audio block's MMIO region in hex.
print(hex(audio.mmio.base_addr))

0x40000000


In [4]:
# Record from the audio block. The elapsed time printed by this cell (~19.97)
# suggests the argument is a duration in seconds — confirm against the driver.
audio.record(20)

time: 19.970481872558594


In [5]:
# Save the recording to rec1.wav.
audio.save("rec1.wav")

In [6]:
# Display the sample rate the audio driver reports after the recording above.
audio.sample_rate

19528.82271388245

To convert a WAV file back into PDM format, use one of the two implementations below.
The second implementation uses Numba, which speeds up the delta-sigma modulation loop (the upsampling step is the same SciPy call in both versions).
However, you may need to connect your PYNQ board to the internet and install Numba before you can run that version on the board.

In [None]:
import time
import wave
from fractions import Fraction

import numpy as np
from scipy import signal
from scipy.io import wavfile

In [None]:
''' Numpy/Scipy Implementation '''

def delta_sigma_chunked(upsampled):
    """First-order delta-sigma modulation in pure Python (no Numba).

    Every sample is compared with a running error term. When the sample is
    greater than the error a 1 is emitted and ``1.0 - sample`` is added to the
    error; otherwise a 0 is emitted and the sample is subtracted from the
    error. A progress line is printed after each chunk of 1,000,000 samples.

    Args:
        upsampled: 1-D sequence (NumPy array or list) of numeric samples.
            ``pcm_to_pdm`` passes values normalised to roughly [0, 1].

    Returns:
        ``np.uint8`` array with one 0/1 entry per input sample.
    """
    total = len(upsampled)
    pdm = np.zeros(total, dtype=np.uint8)
    error = 0.0

    # Process in smaller chunks for progress feedback
    chunk_size = 1000000

    for start in range(0, total, chunk_size):
        end = min(start + chunk_size, total)

        # Iterating a NumPy array yields boxed NumPy scalars and writing
        # pdm[i] one element at a time goes through NumPy indexing; both are
        # slow in a Python loop. Convert the chunk to plain Python numbers
        # once and collect the bits in a bytearray instead. The arithmetic is
        # still IEEE double precision, so the bitstream is unchanged.
        samples = np.asarray(upsampled[start:end]).tolist()
        bits = bytearray(end - start)

        for offset, val in enumerate(samples):
            if val > error:
                bits[offset] = 1
                error = error + 1.0 - val
            else:
                error = error - val

        pdm[start:end] = np.frombuffer(bits, dtype=np.uint8)
        print(f"  {100*end//total}% done...")

    return pdm

def pcm_to_pdm(pcm_samples, pcm_rate, pdm_rate=3072000):
    """Convert PCM samples to a PDM bitstream.

    The samples are converted to float64 (int16 input is first divided by
    32768), min-max normalised to the range [0, 1], resampled from
    ``pcm_rate`` to ``pdm_rate`` and then modulated by
    ``delta_sigma_chunked``.

    Args:
        pcm_samples: 1-D NumPy array of PCM samples.
        pcm_rate: Sample rate of ``pcm_samples`` in Hz.
        pdm_rate: Target PDM bit rate in Hz.

    Returns:
        ``np.uint8`` array of 0/1 values, one per PDM bit.
    """
    pcm = pcm_samples.astype(np.float64)
    if pcm_samples.dtype == np.int16:
        pcm = pcm / 32768.0
    # The small epsilon avoids a division by zero for a constant signal.
    pcm = (pcm - pcm.min()) / (pcm.max() - pcm.min() + 1e-10)

    # Resample by a rational factor instead of floor(pdm_rate / pcm_rate).
    # Floor division is only exact when pcm_rate divides pdm_rate: 44100 Hz
    # would be upsampled 69x to 3,042,900 Hz rather than the requested
    # 3,072,000 Hz, and any pcm_rate above pdm_rate would give a factor of 0.
    # A denominator bound of 1000 keeps common audio rates exact (44100 Hz
    # gives 10240/147) while bounding the size of the resampling filter.
    ratio = Fraction(float(pdm_rate) / float(pcm_rate)).limit_denominator(1000)
    print(f"Upsampling by {ratio}x...")
    upsampled = signal.resample_poly(pcm, ratio.numerator, ratio.denominator)

    print(f"Delta-sigma on {len(upsampled):,} samples...")
    pdm = delta_sigma_chunked(upsampled)

    return pdm

def save_pdm(pdm_bits, filepath, pdm_rate=3072000):
    """Pack a PDM bitstream into 16-bit words and write it as a mono WAV file.

    The stream is zero-padded up to a multiple of 16 bits. Inside each word
    the earliest bit lands in the least-significant position. The WAV frame
    rate is ``pdm_rate // 16``, i.e. one frame per 16 PDM bits.

    Args:
        pdm_bits: 1-D NumPy array of 0/1 values.
        filepath: Destination path of the WAV file.
        pdm_rate: PDM bit rate in Hz.
    """
    # Number of zero bits needed to complete the last 16-bit word.
    shortfall = -len(pdm_bits) % 16
    if shortfall:
        filler = np.zeros(shortfall, dtype=np.uint8)
        pdm_bits = np.concatenate([pdm_bits, filler])

    bit_rows = pdm_bits.reshape(-1, 16)
    words = np.zeros(len(bit_rows), dtype=np.uint16)
    # Walk the 16 bit positions column by column to keep temporaries small.
    for position, column in enumerate(bit_rows.T):
        words |= column.astype(np.uint16) << position

    with wave.open(filepath, 'wb') as out:
        out.setnchannels(1)
        out.setsampwidth(2)
        out.setframerate(pdm_rate // 16)
        out.writeframes(words.tobytes())

    print(f"Saved: {filepath}")

In [None]:
### Example use:
# Replace with your wav file name
rate, pcm = wavfile.read("recording.wav")
if pcm.ndim > 1:
    # Down-mix multi-channel audio by averaging the channels, keeping the dtype.
    pcm = pcm.mean(axis=1).astype(pcm.dtype)

# Time only the conversion; saving happens after the report.
start = time.time()
pdm = pcm_to_pdm(pcm, rate)
print(f"Took {time.time()-start:.1f}s")
save_pdm(pdm, "recording.pdm")

In [None]:
from numba import jit

In [None]:
''' Numba Implementation '''

@jit(nopython=True)
def delta_sigma_numba(upsampled):
    """First-order delta-sigma modulation compiled with Numba.

    A sample greater than the running error emits a 1 and adds
    ``1.0 - sample`` to the error; any other sample emits a 0 and is
    subtracted from the error. Returns a ``np.uint8`` array of 0/1 values
    with the same length as ``upsampled``.
    """
    n_samples = len(upsampled)
    bits = np.zeros(n_samples, dtype=np.uint8)
    acc = 0.0
    for idx in range(n_samples):
        sample = upsampled[idx]
        if sample > acc:
            bits[idx] = 1
            acc = acc + 1.0 - sample
        else:
            acc = acc - sample
    return bits

def pcm_to_pdm(pcm_samples, pcm_rate, pdm_rate=3072000):
    """Convert PCM audio to PDM format for PYNQ playback.

    The samples are converted to float64 (int16 input is first divided by
    32768), min-max normalised to the range [0, 1], resampled from
    ``pcm_rate`` to ``pdm_rate`` and then modulated by ``delta_sigma_numba``.

    Args:
        pcm_samples: 1-D NumPy array of PCM samples.
        pcm_rate: Sample rate of ``pcm_samples`` in Hz.
        pdm_rate: Target PDM bit rate in Hz.

    Returns:
        ``np.uint8`` array of 0/1 values, one per PDM bit.
    """
    pcm = pcm_samples.astype(np.float64)
    if pcm_samples.dtype == np.int16:
        pcm = pcm / 32768.0
    # The small epsilon avoids a division by zero for a constant signal.
    pcm = (pcm - pcm.min()) / (pcm.max() - pcm.min() + 1e-10)

    # Resample by a rational factor instead of floor(pdm_rate / pcm_rate).
    # Floor division is only exact when pcm_rate divides pdm_rate: 44100 Hz
    # would be upsampled 69x to 3,042,900 Hz rather than the requested
    # 3,072,000 Hz, and any pcm_rate above pdm_rate would give a factor of 0.
    # A denominator bound of 1000 keeps common audio rates exact (44100 Hz
    # gives 10240/147) while bounding the size of the resampling filter.
    ratio = Fraction(float(pdm_rate) / float(pcm_rate)).limit_denominator(1000)
    print(f"Upsampling by {ratio}x...")
    upsampled = signal.resample_poly(pcm, ratio.numerator, ratio.denominator)

    print(f"Delta-sigma modulation on {len(upsampled):,} samples...")
    pdm = delta_sigma_numba(upsampled)

    return pdm

def save_pdm(pdm_bits, filepath, pdm_rate=3072000):
    """Write a PDM bitstream as a mono 16-bit WAV file (PYNQ .pdm format).

    Bits are grouped into 16-bit words, first bit in the least-significant
    position, after zero-padding the stream to a multiple of 16. The WAV
    frame rate is set to ``pdm_rate // 16``.

    Args:
        pdm_bits: 1-D NumPy array of 0/1 values.
        filepath: Destination path of the WAV file.
        pdm_rate: PDM bit rate in Hz.
    """
    # Zero bits required to fill up the final word.
    missing = -len(pdm_bits) % 16
    if missing:
        tail = np.zeros(missing, dtype=np.uint8)
        pdm_bits = np.concatenate([pdm_bits, tail])

    grouped = pdm_bits.reshape(-1, 16)
    samples = np.zeros(len(grouped), dtype=np.uint16)
    # OR each bit column into place, one bit position at a time.
    for shift, bit_column in enumerate(grouped.T):
        samples |= bit_column.astype(np.uint16) << shift

    with wave.open(filepath, 'wb') as wav_out:
        wav_out.setnchannels(1)
        wav_out.setsampwidth(2)
        wav_out.setframerate(pdm_rate // 16)
        wav_out.writeframes(samples.tobytes())

    print(f"Saved: {filepath}")

In [None]:
### Example use:
# Replace with your wav file name
rate, pcm = wavfile.read("recording.wav")
if pcm.ndim > 1:
    # Down-mix multi-channel audio by averaging the channels, keeping the dtype.
    pcm = pcm.mean(axis=1).astype(pcm.dtype)

print(f"PCM: {len(pcm):,} samples at {rate}Hz")

# Time only the conversion; saving happens after the report.
start = time.time()
pdm = pcm_to_pdm(pcm, rate)
elapsed = time.time() - start

print(f"Conversion took {elapsed:.2f} seconds")
save_pdm(pdm, "recording.pdm")