In [None]:
# Example 18: FM radio - signal as function of time
# SPDX-FileCopyrightText: Copyright (C) 2023 Andreas Naber <annappo@web.de>
# SPDX-License-Identifier: GPL-3.0-only

%matplotlib widget

import asyncio
from rtlsdr import RtlSdr
import matplotlib.pyplot as plt
import numpy as np
import time
from collections import deque
import linecache
import sys

# -------------- global variables -------------------        

MEAS_RUNNING = True     # master run flag; once False, all tasks stop

# SDR configuration
BIAS_TEE = False        # switch to True when a GPS antenna is attached
SAMPLE_RATE = 2.048e6
FREQ_CORRECTION = -1    # ppm
CENTER_FREQ = 106.3e6   # FM band spans 87.5e6 - 108e6
GAIN = 6                # tuner gain in dB: 'auto' or 0..49.6

MSEC = 64               # interval between two plots in ms
NSMP = 2048*MSEC        # samples fetched per cycle
NT = 2*2048             # points shown in the time plot

NO_AVG = 50             # number of data points to average
BOX = np.full(NO_AVG, 1.0/NO_AVG)  # box kernel of the low-pass (convolution)

MEAS_TIME = 3000.0      # measurement duration in seconds

# '' selects live streaming from the SDR; anything else is opened as a file
DATA_FILE = ''

# ------- Exception handling  -------------------

EXC = []    # log of formatted error messages, one string per exception


def printException():
    """Append a one-line description of the active exception to EXC.

    Must be called while an exception is being handled (inside an
    ``except`` block).  The entry names the file, the line number and the
    source text of the first traceback entry, followed by the exception
    message.  When no exception is active the call does nothing.
    """
    exc_type, exc_obj, tb = sys.exc_info()
    if tb is None:
        # called outside of an except block: there is nothing to report
        return
    frame = tb.tb_frame
    lineno = tb.tb_lineno
    filename = frame.f_code.co_filename
    # refresh the cache so that an edited source file is shown correctly
    linecache.checkcache(filename)
    line = linecache.getline(filename, lineno, frame.f_globals)
    # Some exceptions (e.g. KeyboardInterrupt()) have an empty message;
    # fall back to the class name so the entry still says what happened.
    description = str(exc_obj) or exc_type.__name__
    EXC.append('EXCEPTION IN ({}, LINE {} "{}"): {}'.format(
        filename, lineno, line.strip(), description))
    

# -------- keyboard events ---------------

def onKeypress(event):
    """Key handler of the figure: 'q' or 'Q' ends the measurement.

    Flushes stdout on every key press.  For the quit keys it clears
    MEAS_RUNNING and stores the reason in STATUS_MSG; every other key is
    ignored.
    """
    global MEAS_RUNNING, STATUS_MSG
    sys.stdout.flush()
    if event.key not in ('q', 'Q'):
        return
    MEAS_RUNNING = False
    STATUS_MSG = 'q pressed'

# ------- data buffer -----------------

MAXBUFSIZE = 16                     # capacity of the chunk queue
BUFFER = deque(maxlen=MAXBUFSIZE)   # queue between streaming and plotting
NBUF = 0                            # number of chunks currently queued
BUFSKIP = 0                         # chunks dropped and not yet reported


def pushToBuffer(data):
    """Queue one chunk of data for the consumer.

    If the queue already holds MAXBUFSIZE chunks, all of them are dropped
    first and BUFSKIP is raised by MAXBUFSIZE so that the consumer can
    detect the loss; the new chunk then becomes the only queued item.
    """
    global NBUF, BUFSKIP

    if NBUF < MAXBUFSIZE:
        NBUF += 1
    else:
        # overflow: discard the backlog and remember how much was lost
        BUFFER.clear()
        BUFSKIP += MAXBUFSIZE
        NBUF = 1

    BUFFER.append(data)
    
    
def pullFromBuffer():
    """Take the oldest chunk out of the queue.

    Returns a tuple ``(data, skip)``.  ``skip`` is the number of chunks
    dropped since the previous successful pull; it is reset to zero once
    reported.  With an empty queue the result is ``([], 0)`` and the
    counters are left untouched.
    """
    global NBUF, BUFSKIP

    if not BUFFER:
        return [], 0

    oldest = BUFFER.popleft()
    NBUF -= 1
    dropped, BUFSKIP = BUFSKIP, 0
    return oldest, dropped

# ------- process & plot data ---------


def plot(y):
    """Show complex data y: real part on line1, imaginary part on line2.

    Only the y-data of the two existing lines is replaced; the redraw is
    requested via draw_idle() of the figure canvas.
    """
    in_phase = y.real
    line1.set_ydata(in_phase)
    quadrature = y.imag
    line2.set_ydata(quadrature)
    canvas = fig.canvas
    canvas.draw_idle()
    

# ------- process data ----------------


async def processData():
    """Consumer task: filter queued chunks and plot them until stopped.

    While MEAS_RUNNING is set, every queued chunk is pulled from the
    buffer, smoothed with the kernel BOX and its first NT points are
    plotted.  If the buffer reports dropped chunks, the chunk is still
    plotted and then a ValueError('buffer overload') ends the task.
    Every exception is logged with printException(); on exit
    MEAS_RUNNING is cleared so that the streaming task stops as well.
    """
    global MEAS_RUNNING
    try:
        # Only the first NT filtered points are displayed.  In 'same' mode
        # each output point depends only on inputs less than len(BOX)
        # positions away, so filtering this many leading samples yields
        # the same plotted values as filtering the whole chunk, at a
        # fraction of the cost.
        n_needed = NT + len(BOX)
        while MEAS_RUNNING:
            if NBUF >= 1:
                y, skip = pullFromBuffer()
                y = np.convolve(y[:n_needed], BOX, 'same')
                plot(y[:NT])
                if skip > 0:
                    raise ValueError('buffer overload')
            # hand control back to the event loop so streaming can proceed
            await asyncio.sleep(0.00001)
    except BaseException:
        # deliberate catch-all (as in streamData): whatever ends the task
        # is recorded in EXC instead of being lost inside the task object
        printException()
    finally:
        MEAS_RUNNING = False
        

# ----------- Streaming ----------------        

async def streamData():
    """Producer task for file playback: feed DATA_FILE into the buffer.

    Whenever the buffer is empty, up to NSMP 16-bit words (native byte
    order) are read from the file.  The high byte of each word becomes
    the real part and the low byte the imaginary part; both are scaled by
    1/127.5 and shifted by -1.  The task ends at end of file, after
    MEAS_TIME seconds, or when MEAS_RUNNING is cleared elsewhere.  Every
    exception is logged with printException().  On exit MEAS_RUNNING is
    cleared and the task waits for task2 to finish.
    """
    global MEAS_RUNNING
    deadline = time.time() + MEAS_TIME
    try:
        with open(DATA_FILE, 'rb') as src:
            while MEAS_RUNNING:
                if NBUF < 1:
                    words = np.fromfile(src, dtype=np.uint16, count=NSMP)
                    hi = words >> 8
                    lo = words & 0xFF
                    iq = np.asarray(hi + 1j*lo, dtype=np.complex64)
                    samples = iq/127.5 - (1+1j)
                    if len(samples) == 0:
                        # nothing left to read: end of file
                        MEAS_RUNNING = False
                    else:
                        pushToBuffer(samples)
                if time.time() > deadline:
                    MEAS_RUNNING = False
                # yield to the event loop so the consumer task can run
                await asyncio.sleep(0)
    except BaseException:
        printException()
    finally:
        MEAS_RUNNING = False
        await task2
        

async def streamLive():
    """Producer task for live reception: stream SDR samples into the buffer.

    Opens the RTL-SDR, applies the configuration constants (BIAS_TEE,
    SAMPLE_RATE, FREQ_CORRECTION, CENTER_FREQ, GAIN) and pushes chunks of
    NSMP samples to the buffer.  Streaming ends after MEAS_TIME seconds
    or when MEAS_RUNNING is cleared elsewhere; the device is then stopped
    and closed.  Every exception is logged with printException() and
    MEAS_RUNNING is cleared on exit.
    """
    global MEAS_RUNNING

    try:
        # Opening and configuring the device is part of the guarded
        # section: if no device is attached or a setting is rejected, the
        # error is logged and MEAS_RUNNING is cleared, so the plotting
        # task does not keep polling for data that will never arrive.
        sdr = RtlSdr()
        sdr.set_bias_tee(BIAS_TEE)
        sdr.sample_rate = SAMPLE_RATE
        sdr.freq_correction = FREQ_CORRECTION
        sdr.center_freq = CENTER_FREQ
        sdr.gain = GAIN

        start_time = time.time()
        end_time = start_time + MEAS_TIME
        async for samples in sdr.stream(num_samples_or_bytes=NSMP,
                                        format='samples'):
            pushToBuffer(samples)
            if time.time() > end_time:
                MEAS_RUNNING = False
            if not MEAS_RUNNING:
                await sdr.stop()
                sdr.close()
                # leave explicitly instead of asking a closed device for
                # another chunk
                break

    except BaseException:
        # deliberate catch-all (as in streamData): whatever ends the task
        # is recorded in EXC instead of being lost inside the task object
        printException()
    finally:
        MEAS_RUNNING = False


# --------------- prepare plot ------------------        
    
# initial values for time axis
# time axis in ms and an all-zero trace as initial data
xt = 1000.0*np.linspace(0.0, NT/SAMPLE_RATE, NT, endpoint=False)
yt = np.zeros(NT)

fig, ax = plt.subplots(figsize=(6, 4))
fig.canvas.header_visible = False
fig.canvas.mpl_connect('key_press_event', onKeypress)

# one line each for the I and Q amplitude as function of time
line1, line2 = (ax.plot(xt, yt, lw=.5, label=name)[0] for name in ('I', 'Q'))
ax.set(ylim=(-0.5, 0.5),
       xlabel='Time (ms)',
       ylabel='Amplitude (a.u.)')
ax.legend(loc='upper right')
plt.tight_layout()

print('Click on graph and press q to exit!')

plt.show()

# ------ Main ---------------------------

# start async tasks
loop = asyncio.get_event_loop()
# consumer first: the file producer awaits task2 when it finishes
task2 = loop.create_task(processData())
# producer: live SDR when no data file is configured, file playback otherwise
task1 = loop.create_task(streamLive() if DATA_FILE == '' else streamData())
    
