In [1]:
import time
import numpy as np
from scipy import signal
from pynq import Overlay, allocate
from rtlsdr import RtlSdr

ol = Overlay("./overlays/pynq_fm.bit")

In [2]:
# check IP contents in bitstream
print(ol.ip_dict.keys())

dma = ol.axi_dma_0

dict_keys(['axi_dma_0', 'processing_system7_0'])


In [3]:
len_in = 1024 * 1500
len_out = int(len_in / 40)

resize = 1 / 40

In [4]:
# make input buffer and output buffer for AXI DMA
input_buffer = allocate(shape=(len_in,), dtype=np.uint16)
output_buffer = allocate(shape=(len_out,), dtype=np.int16)

print(f"Input Buffer Address: {hex(input_buffer.device_address)}")
print(f"Output Buffer Address: {hex(output_buffer.device_address)}")

Input Buffer Address: 0x15900000
Output Buffer Address: 0x15860000


In [5]:
# connect and configure RTL-SDR
sdr = RtlSdr()

fc = 95900000
fs = 1920000
ppm = 60

sdr.sample_rate = fs
sdr.center_freq = fc
sdr.freq_correction = ppm
sdr.gain = "auto"

Found Rafael Micro R820T tuner
[R82XX] PLL not locked!


In [7]:
# increase usb memory buffer
!echo 0 > /sys/module/usbcore/parameters/usbfs_memory_mb

In [8]:
from IPython.display import Audio
import nest_asyncio
import asyncio

In [9]:
nest_asyncio.apply()

async def reader_task(raw_queue, total_iterations):
    """
    [Task 1] SDR에서 데이터를 읽어오는 역할 (Producer)
    """
    print("Task 1: Reader started")
    it = 0
    # SDR 스트리밍 시작
    async for data in sdr.stream(num_samples_or_bytes=len_in*2, format='bytes', loop=None):
        if it >= total_iterations:
            break
        
        # 데이터를 큐에 넣음 (큐가 꽉 차면 공간이 생길 때까지 여기서 대기함 - 배압 조절)
        await raw_queue.put(data)
        it += 1
    
    # 작업이 끝났음을 알리기 위해 None(Sentinel)을 보냄
    await raw_queue.put(None)
    print("Task 1: Reader finished")
    
    # 스트림 종료 처리
    await sdr.stop()
    sdr.close() # 필요 시 주석 해제 (보통 컨텍스트 매니저나 외부에서 관리)
    
async def processor_task(raw_queue, audio_queue):
    """
    [Task 2] 데이터를 가공하고 FPGA로 보내는 역할 (Processor)
    """
    print("Task 2: Processor started")
    flag = 0
    
    while True:
        # 1. Raw Queue에서 데이터 가져오기
        data = await raw_queue.get()
        
        # 종료 신호(None) 확인
        if data is None:
            await audio_queue.put(None) # 다음 단계로 종료 신호 전달
            raw_queue.task_done()
            break
            
        # --- CPU 연산 구간 (Data Conversion) ---
        raw_uint8 = np.frombuffer(data, dtype=np.uint8)
        raw_int8 = (raw_uint8.astype(np.int16) - 128).astype(np.int8)
        packed_samples = raw_int8.view(np.uint16)
        
        # FPGA 버퍼로 복사
        np.copyto(input_buffer, packed_samples)
        
        # --- FPGA DMA 전송 (Critical Section) ---
        # DMA는 하드웨어 자원이므로 순차적으로 접근해야 합니다.
        if flag == 1:
            # 이 wait()는 CPU를 블로킹할 수 있습니다. 
            # 만약 PYNQ가 비동기 wait를 지원하지 않는다면 여기서 잠시 멈춥니다.
            dma.sendchannel.wait()
            dma.recvchannel.wait()
        else:
            flag = 1
            
        dma.sendchannel.transfer(input_buffer)
        dma.recvchannel.transfer(output_buffer)
        
        # 결과 데이터 가공
        processed_audio = output_buffer.astype(np.float32) / 32768.0
        
        # 결과를 Audio Queue로 전달
        # 주의: numpy array는 참조이므로 복사본을 넘겨야 안전할 수 있습니다.
        await audio_queue.put(processed_audio.copy())
        
        raw_queue.task_done()
        
    print("Task 2: Processor finished")
    
async def player_task(audio_queue):
    """
    [Task 3] 오디오를 재생하는 역할 (Consumer)
    """
    print("Task 3: Player started")
    
    while True:
        # 2. Audio Queue에서 데이터 가져오기
        audio_data = await audio_queue.get()
        
        if audio_data is None:
            audio_queue.task_done()
            break
            
        # 오디오 재생 (I/O Bound 작업)
        display(Audio(audio_data, autoplay=True, rate=48000, normalize=False))
        
        audio_queue.task_done()
        
    print("Task 3: Player finished")
    
async def main_pipeline():
    # 큐 생성 (maxsize를 설정하여 메모리 폭주 방지 및 흐름 제어)
    # maxsize=2 정도면 더블 버퍼링과 유사한 효과를 냅니다.
    raw_queue = asyncio.Queue(maxsize=3)
    audio_queue = asyncio.Queue(maxsize=3)
    
    ITERATION_NUM = 50
    
    # 3개의 태스크를 동시에 스케줄링
    task1 = asyncio.create_task(reader_task(raw_queue, ITERATION_NUM))
    task2 = asyncio.create_task(processor_task(raw_queue, audio_queue))
    task3 = asyncio.create_task(player_task(audio_queue))
    
    # 모든 작업이 끝날 때까지 대기
    await asyncio.gather(task1, task2, task3)
    print("All tasks completed.")
    
try:
    asyncio.run(main_pipeline())
except Exception as e:
    print(f"An error occurred: {e}")

Task 1: Reader started
Task 2: Processor started
Task 3: Player started


Allocating 15 zero-copy buffers


Task 1: Reader finished
Task 2: Processor finished
Task 3: Player finished
All tasks completed.


In [10]:
# delete input buffer and output buffer
del input_buffer, output_buffer