Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

change sdr module to allow capturing data in blocks #27

Merged
merged 16 commits into from
Feb 18, 2022
Merged
141 changes: 116 additions & 25 deletions ugradio_code/src/sdr.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,113 @@
'''This module uses the pyrtlsdr package (built on librtlsdr) to interface
to SDR dongles based on the RTL2832/R820T2 chipset.'''
"""This module uses the pyrtlsdr package (built on librtlsdr) to interface
to SDR dongles based on the RTL2832/R820T2 chipset."""

from __future__ import print_function
from rtlsdr import RtlSdr
import numpy as np
import logging
import functools
import asyncio
import signal

SAMPLE_RATE_TOLERANCE = 0.1 # Hz
BUFFER_SIZE = 4096

async def _streaming(sdr, nblocks, nsamples):
'''Asynchronously read nblocks of data from the sdr.'''
data = np.empty((nblocks, nsamples), dtype="complex64")
count = 0
async for samples in sdr.stream(num_samples_or_bytes=nsamples):
data[count] = samples
count += 1
if count >= nblocks:
break
try:
await sdr.stop()
except(AssertionError):
logging.warn(f'Only returning {count} blocks.')
return data[:count].copy()
return data

def handle_exception(loop, context, sdr):
'''Handle any exceptions that happen while in the asyncio loop.'''
msg = context.get("exception", context["message"])
logging.error(f"Caught exception: {msg}")
asyncio.create_task(shutdown(loop, sdr))

async def shutdown(loop, sdr, signal=None):
'''If an interrupt happens, shut down gracefully.'''
if signal:
logging.info(f"Received exit signal {signal.name}...")
tasks = [t for t in asyncio.all_tasks() if t is not
asyncio.current_task()]
await sdr.stop()
await asyncio.gather(*tasks, return_exceptions=True)
loop.stop()

def capture_data(
direct=True,
center_freq=1420e6,
nsamples=2048,
nblocks=1,
sample_rate=2.2e6,
gain=0.,
):
"""
Use the SDR dongle to capture voltage samples from the input. Note that
the analog system on these devices only lets through signals from 0.5 to
24 MHz.

There are two modes (corresponding to the value of direct):
direct = True: the direct sampling is enabled (no mixing), center_freq does
not matter and gain probably does not matter. Data returned is real.
direct = False: use the standard I/Q sampling, center_freq is the LO of the
mixer. Returns complex data.

Arguments:
direct (bool): which mode to use. Default: True.
center_freq (float): the center frequency in Hz of the downconverter
(LO of mixer). Ignored if direct == True. Default: 1420e6.
nsamples (int): number of samples to acquire. Default: 2048.
nblocks (int): number of blocks of samples to acquire. Default: 1.
sample_rate (float): sample rate in Hz. Default: 2.2e6.
gain (float): gain in dB to apply. Probably unnecessary when
direct == True. Default: 0.

Returns:
numpy.ndarray of type float64 (direct == True) or complex64
(direct == False). Shape is (nblocks, nsamples) when nblocks > 1 or
(nsamples,) when nblocks == 1.
"""
sdr = RtlSdr()
# Make a new event loop and set it as the default
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
try:
if direct:
sdr.set_direct_sampling('q')
sdr.set_center_freq(0) # turn off the LO
else:
sdr.set_direct_sampling(0)
sdr.set_center_freq(center_freq)
sdr.set_gain(gain)
sdr.set_sample_rate(sample_rate)
_ = sdr.read_samples(BUFFER_SIZE) # clear the buffer
# Add signal handlers
for s in (signal.SIGHUP, signal.SIGTERM, signal.SIGINT):
loop.add_signal_handler(
s, lambda: asyncio.create_task(shutdown(loop,sdr,signal=s)))
# splice sdr handle into handle_exception arguments
h = functools.partial(handle_exception, sdr=sdr)
loop.set_exception_handler(h)
data = loop.run_until_complete(_streaming(sdr, nblocks, nsamples))
finally:
sdr.close()
loop.close()

if direct:
return data.real
else:
return data

def capture_data_direct(nsamples=2048, sample_rate=2.2e6, gain=1.):
'''
Use the SDR dongle as an ADC to directly capture voltage samples from the
Expand All @@ -21,18 +121,13 @@ def capture_data_direct(nsamples=2048, sample_rate=2.2e6, gain=1.):
Returns:
numpy array (dtype float64) with dimensions (nsamples,)
'''
sdr = RtlSdr()
sdr.set_direct_sampling('q') # read from RF directly
sdr.set_center_freq(0) # essentially turn off the LO
sdr.set_sample_rate(sample_rate)
#assert abs(sample_rate - sdr.get_sample_rate()) < SAMPLE_RATE_TOLERANCE
sdr.set_gain(gain) # adjust input gain XXX does this matter?
#assert gain == sdr.get_gain()
_ = sdr.read_samples(BUFFER_SIZE) # clear the buffer
data = sdr.read_samples(nsamples)
data = data.real # only real values have meaning
data = capture_data(
direct=True,
nsamples=nsamples,
sample_rate=sample_rate,
gain=gain
)
return data


def capture_data_mixer(center_freq, nsamples=2048, sample_rate=2.2e6, gain=1.):
'''
Expand All @@ -49,15 +144,11 @@ def capture_data_mixer(center_freq, nsamples=2048, sample_rate=2.2e6, gain=1.):
Returns:
numpy array (dtype float64) with dimensions (nsamples,)
'''
sdr = RtlSdr()
sdr.set_direct_sampling(0) # standard I/Q sampling mode
sdr.set_center_freq(center_freq)
sdr.set_sample_rate(sample_rate)
#assert abs(sample_rate - sdr.get_sample_rate()) < SAMPLE_RATE_TOLERANCE
sdr.set_gain(gain)
#assert gain == sdr.get_gain()
_ = sdr.read_samples(BUFFER_SIZE) # clear the buffer
data = sdr.read_samples(nsamples)
#data = data.real # only real values have meaning
data = capture_data(
direct=False,
center_freq=center_freq,
nsamples=nsamples,
sample_rate=sample_rate,
gain=gain
)
return data