Skip to content

Commit

Permalink
Merge pull request #1695 from DSheirer/1694-sdrplay-rsp-tuner-memory-…
Browse files Browse the repository at this point in the history
…leaks

#1694 SDRPlay RSP Memory Leak & Audio Playback Delays
  • Loading branch information
DSheirer committed Nov 5, 2023
2 parents bf27812 + d3dc1a8 commit e1bc9ea
Show file tree
Hide file tree
Showing 13 changed files with 90 additions and 109 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -68,8 +68,8 @@ public class ComplexPolyphaseChannelizerM2 extends AbstractComplexPolyphaseChann
*/
private static final int PROCESSED_CHANNEL_RESULTS_THRESHOLD = 1024;

//Sized to process 1/20th batch of 152 buffers a second, at 40 times per second
private IFFTProcessorDispatcher mIFFTProcessorDispatcher = new IFFTProcessorDispatcher(152 / 20, 25);
//Sized to process 40 times per second
private IFFTProcessorDispatcher mIFFTProcessorDispatcher = new IFFTProcessorDispatcher(25);
private FloatFFT_1D mFFT;
private float[] mInlineSamples;
private float[] mInlineFilter;
Expand Down Expand Up @@ -406,9 +406,9 @@ private void init(float[] coefficients)
*/
public class IFFTProcessorDispatcher extends Dispatcher<List<float[]>>
{
public IFFTProcessorDispatcher(int batchSize, long interval)
public IFFTProcessorDispatcher(long interval)
{
super("sdrtrunk polyphase ifft processor", batchSize, interval);
super("sdrtrunk polyphase ifft processor", interval);

//We create a listener interface to receive the batched channel results arrays from the scheduled thread pool
//dispatcher thread that is part of this continuous buffer processor. We perform an IFFT on each
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ public PolyphaseChannelManager(INativeBufferProvider nativeBufferProvider, long
}

mChannelCalculator = new ChannelCalculator(sampleRate, channelCount, frequency, CHANNEL_OVERSAMPLING);
mBufferDispatcher = new Dispatcher("sdrtrunk polyphase buffer processor", 50, 10);
mBufferDispatcher = new Dispatcher("sdrtrunk polyphase buffer processor", 10);
mBufferDispatcher.setListener(mNativeBufferReceiver);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,8 +50,7 @@ public ChannelOutputProcessor(int inputChannelCount, double sampleRate, Heartbea
mInputChannelCount = inputChannelCount;
//Process 1/10th of the sample rate per second at a rate of 20 times a second (200% of anticipated rate)
mHeartbeatManager = heartbeatManager;
mChannelResultsDispatcher = new Dispatcher("sdrtrunk polyphase channel", (int)(sampleRate / 10),
50, mHeartbeatManager);
mChannelResultsDispatcher = new Dispatcher("sdrtrunk polyphase channel",50, mHeartbeatManager);
mChannelResultsDispatcher.setListener(floats -> {
try
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ public class BinaryRecorder extends Module implements IByteBufferListener
private final static Logger mLog = LoggerFactory.getLogger(BinaryRecorder.class);
private static final int MAX_RECORDING_BYTE_SIZE = 524288; //500 kB

private Dispatcher<ByteBuffer> mBufferProcessor = new Dispatcher<>("sdrtrunk binary recorder", 200, 250);
private Dispatcher<ByteBuffer> mBufferProcessor = new Dispatcher<>("sdrtrunk binary recorder", 250);
private AtomicBoolean mRunning = new AtomicBoolean();
private Path mBaseRecordingPath;
private String mRecordingIdentifier;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ public class ComplexSamplesWaveRecorder extends Module implements IComplexSample
{
private final static Logger mLog = LoggerFactory.getLogger(ComplexSamplesWaveRecorder.class);

private Dispatcher<ComplexSamples> mBufferProcessor = new Dispatcher<>("sdrtrunk complex wave recorder", 100, 250);
private Dispatcher<ComplexSamples> mBufferProcessor = new Dispatcher<>("sdrtrunk complex wave recorder", 250);
private AtomicBoolean mRunning = new AtomicBoolean();
private BufferWaveWriter mWriter;
private String mFilePrefix;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ public class NativeBufferWaveRecorder extends Module implements Listener<INative
private static final Logger mLog = LoggerFactory.getLogger(ComplexSamplesWaveRecorder.class);
private static final long STATUS_UPDATE_BYTE_INTERVAL = 1_048_576;
private static final long MAX_RECORDING_SIZE = (long)Integer.MAX_VALUE * 2l;
private Dispatcher<INativeBuffer> mBufferProcessor = new Dispatcher<>("sdrtrunk native buffer wave recorder", 100, 250);
private Dispatcher<INativeBuffer> mBufferProcessor = new Dispatcher<>("sdrtrunk native buffer wave recorder", 250);

private AtomicBoolean mRunning = new AtomicBoolean();
private NativeBufferWaveWriter mWriter;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,8 +74,7 @@ public HalfBandTunerChannelSource(Listener<SourceEvent> producerSourceEventListe
mQDecimationFilter = DecimationFilterFactory.getRealDecimationFilter(decimation);

//Set dispatcher to process 1/10 of estimated sample arrival rate, 20 times per second (up to 200% per interval)
mBufferDispatcher = new Dispatcher("sdrtrunk heterodyne channel " + tunerChannel.getFrequency(),
(int)(sampleRate / 10), 50, getHeartbeatManager());
mBufferDispatcher = new Dispatcher("sdrtrunk heterodyne channel " + tunerChannel.getFrequency(), 50, getHeartbeatManager());
mBufferDispatcher.setListener(new NativeBufferProcessor());

//Setup the frequency mixer to the current source frequency
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ public PassThroughChannelSource(Listener<SourceEvent> listener, TunerController
super(listener, tunerChannel);
mTunerController = tunerController;
mBufferDispatcher = new Dispatcher<>("sdrtrunk pass-through channel " + tunerChannel.getFrequency(),
250, 50, getHeartbeatManager());
50, getHeartbeatManager());
mBufferDispatcher.setListener(new BufferProcessor());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ public class SDRplay
private static final Logger mLog = LoggerFactory.getLogger(SDRplay.class);

/**
* Foreign memory arenaallocation resource scope
* Foreign memory arena allocation resource scope
*/
private final Arena mArena = Arena.openShared();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,24 +37,24 @@ public class CallbackFunctions

/**
* Constructs a callback functions for single-tuner use.
* @param arena for native memory allocation
* @param longLifecycleArena for native memory allocation for long lifecycle objects.
* @param deviceEventListener for device events
* @param streamListener for streaming samples
* @param streamCallbackListener for streaming events
*/
public CallbackFunctions(Arena arena, IDeviceEventListener deviceEventListener,
public CallbackFunctions(Arena longLifecycleArena, IDeviceEventListener deviceEventListener,
IStreamListener streamListener, IStreamCallbackListener streamCallbackListener)
{
//Create the event callback function
mDeviceEventAdapter = new DeviceEventAdapter(arena, deviceEventListener);
MemorySegment eventFunction = sdrplay_api_EventCallback_t.allocate(mDeviceEventAdapter, arena.scope());
mDeviceEventAdapter = new DeviceEventAdapter(deviceEventListener);
MemorySegment eventFunction = sdrplay_api_EventCallback_t.allocate(mDeviceEventAdapter, longLifecycleArena.scope());

//Create the stream A callback function
mStreamACallbackAdapter = new StreamCallbackAdapter(arena, streamListener, streamCallbackListener);
MemorySegment streamAFunction = sdrplay_api_StreamCallback_t.allocate(mStreamACallbackAdapter, arena.scope());
mStreamACallbackAdapter = new StreamCallbackAdapter(streamListener, streamCallbackListener);
MemorySegment streamAFunction = sdrplay_api_StreamCallback_t.allocate(mStreamACallbackAdapter, longLifecycleArena.scope());

//Create the callback functions union and populate the callback functions
mCallbackFunctionsMemorySegment = sdrplay_api_CallbackFnsT.allocate(arena);
mCallbackFunctionsMemorySegment = sdrplay_api_CallbackFnsT.allocate(longLifecycleArena);
sdrplay_api_CallbackFnsT.EventCbFn$set(mCallbackFunctionsMemorySegment, eventFunction);
sdrplay_api_CallbackFnsT.StreamACbFn$set(mCallbackFunctionsMemorySegment, streamAFunction);
}
Expand All @@ -72,15 +72,15 @@ public CallbackFunctions(Arena arena, IDeviceEventListener deviceEventListener,
IStreamCallbackListener streamCallbackListener)
{
//Create the event callback function
mDeviceEventAdapter = new DeviceEventAdapter(arena, deviceEventListener);
mDeviceEventAdapter = new DeviceEventAdapter(deviceEventListener);
MemorySegment eventFunction = sdrplay_api_EventCallback_t.allocate(mDeviceEventAdapter, arena.scope());

//Create the stream A callback function
mStreamACallbackAdapter = new StreamCallbackAdapter(arena, streamAListener, streamCallbackListener);
mStreamACallbackAdapter = new StreamCallbackAdapter(streamAListener, streamCallbackListener);
MemorySegment streamAFunction = sdrplay_api_StreamCallback_t.allocate(mStreamACallbackAdapter, arena.scope());

//Create the stream B callback function
mStreamBCallbackAdapter = new StreamCallbackAdapter(arena, streamBListener, streamCallbackListener);
mStreamBCallbackAdapter = new StreamCallbackAdapter(streamBListener, streamCallbackListener);
MemorySegment streamBFunction = sdrplay_api_StreamCallback_t.allocate(mStreamBCallbackAdapter, arena.scope());

//Create the callback functions union and populate the callback functions
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,24 +36,14 @@
public class DeviceEventAdapter implements sdrplay_api_EventCallback_t
{
private static final Logger mLog = LoggerFactory.getLogger(DeviceEventAdapter.class);

private Arena mArena;
private IDeviceEventListener mDeviceEventListener;

/**
* Constructs an instance.
* @param arena to use in creating foreign memory segments to access foreign structures.
* @param listener to receive translated device events.
*/
public DeviceEventAdapter(Arena arena, IDeviceEventListener listener)
public DeviceEventAdapter(IDeviceEventListener listener)
{
if(arena == null)
{
throw new IllegalArgumentException("Resource scope must be non-null");
}

mArena = arena;

setListener(listener);
}

Expand All @@ -75,35 +65,44 @@ public void setListener(IDeviceEventListener listener)
public void apply(int eventTypeId, int tunerSelectId, MemorySegment eventParametersPointer,
MemorySegment callbackContext)
{
MemorySegment memorySegment = sdrplay_api_EventParamsT.ofAddress(eventParametersPointer, mArena.scope());
EventType eventType = EventType.fromValue(eventTypeId);
TunerSelect tunerSelect = TunerSelect.fromValue(tunerSelectId);

switch(eventType)
try(Arena arena = Arena.openConfined())
{
case GAIN_CHANGE -> {
mDeviceEventListener.processGainChange(tunerSelect,
EventParametersFactory.createGainCallbackParameters(memorySegment));
}
case POWER_OVERLOAD_CHANGE -> {
mDeviceEventListener.processPowerOverload(tunerSelect,
EventParametersFactory.createPowerOverloadCallbackParameters(memorySegment));
}
case DEVICE_REMOVED -> {
mDeviceEventListener.processDeviceRemoval(tunerSelect);
}
case RSP_DUO_MODE_CHANGE -> {
mDeviceEventListener.processRspDuoModeChange(tunerSelect,
EventParametersFactory.createRspDuoModeCallbackParameters(memorySegment));
}
case UNKNOWN -> {
mLog.warn("Unknown device event callback ignored. Please contact the library developer as this may " +
"indicate a change to the SDRPlay API change. Tuner:" + tunerSelect + " Event Type ID:" +
eventTypeId);
mDeviceEventListener.processEvent(eventType, tunerSelect);
}
default -> {
throw new IllegalStateException("DeviceEventAdapter must be updated handle EventType." + eventType);
MemorySegment memorySegment = sdrplay_api_EventParamsT.ofAddress(eventParametersPointer, arena.scope());
EventType eventType = EventType.fromValue(eventTypeId);
TunerSelect tunerSelect = TunerSelect.fromValue(tunerSelectId);

switch(eventType)
{
case GAIN_CHANGE ->
{
mDeviceEventListener.processGainChange(tunerSelect,
EventParametersFactory.createGainCallbackParameters(memorySegment));
}
case POWER_OVERLOAD_CHANGE ->
{
mDeviceEventListener.processPowerOverload(tunerSelect,
EventParametersFactory.createPowerOverloadCallbackParameters(memorySegment));
}
case DEVICE_REMOVED ->
{
mDeviceEventListener.processDeviceRemoval(tunerSelect);
}
case RSP_DUO_MODE_CHANGE ->
{
mDeviceEventListener.processRspDuoModeChange(tunerSelect,
EventParametersFactory.createRspDuoModeCallbackParameters(memorySegment));
}
case UNKNOWN ->
{
mLog.warn("Unknown device event callback ignored. Please contact the library developer as this may " +
"indicate a change to the SDRPlay API change. Tuner:" + tunerSelect + " Event Type ID:" +
eventTypeId);
mDeviceEventListener.processEvent(eventType, tunerSelect);
}
default ->
{
throw new IllegalStateException("DeviceEventAdapter must be updated handle EventType." + eventType);
}
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,24 +35,16 @@
public class StreamCallbackAdapter implements sdrplay_api_StreamCallback_t
{
private static final Logger mLog = LoggerFactory.getLogger(StreamCallbackAdapter.class);
private Arena mArena;
private IStreamListener mStreamListener;
private IStreamCallbackListener mStreamCallbackListener;

/**
* Constructs an instance of the callback implementation
* @param arena for defining new foreign memory segments
* @param streamListener to receive transferred I/Q samples and event details
* @param listener to receive callback parameters
*/
public StreamCallbackAdapter(Arena arena, IStreamListener streamListener, IStreamCallbackListener listener)
public StreamCallbackAdapter(IStreamListener streamListener, IStreamCallbackListener listener)
{
if(arena == null)
{
throw new IllegalArgumentException("Arena must be non-null");
}

mArena = arena;
mStreamCallbackListener = listener;
setListener(streamListener);
}
Expand Down Expand Up @@ -81,24 +73,27 @@ public void apply(MemorySegment iSamplesPointer, MemorySegment qSamplesPointer,
{
if(mStreamListener != null || mStreamCallbackListener != null)
{
//Translate the callback parameters pointer to a memory segment and re-construct the parameters as a Java object
StreamCallbackParameters parameters = new StreamCallbackParameters(sdrplay_api_StreamCbParamsT
.ofAddress(parametersPointer, mArena.scope()));

if(mStreamCallbackListener != null)
try(Arena arena = Arena.openConfined())
{
mStreamCallbackListener.process(mStreamListener.getTunerSelect(), parameters, reset);
}
//Translate the callback parameters pointer to a memory segment and re-construct the parameters as a Java object
StreamCallbackParameters parameters = new StreamCallbackParameters(sdrplay_api_StreamCbParamsT
.ofAddress(parametersPointer, arena.scope()));

if(mStreamListener != null)
{
//Allocate memory segments from I/Q pointers, transfer from native to JVM array, and send to listener
long arrayByteSize = ValueLayout.JAVA_SHORT.byteSize() * sampleCount;
MemorySegment iSamples = MemorySegment.ofAddress(iSamplesPointer.address(), arrayByteSize, mArena.scope());
MemorySegment qSamples = MemorySegment.ofAddress(qSamplesPointer.address(), arrayByteSize, mArena.scope());
short[] i = iSamples.toArray(ValueLayout.JAVA_SHORT);
short[] q = qSamples.toArray(ValueLayout.JAVA_SHORT);
mStreamListener.processStream(i, q, parameters, Flag.evaluate(reset));
if(mStreamCallbackListener != null)
{
mStreamCallbackListener.process(mStreamListener.getTunerSelect(), parameters, reset);
}

if(mStreamListener != null)
{
//Allocate memory segments from I/Q pointers, transfer from native to JVM array, and send to listener
long arrayByteSize = ValueLayout.JAVA_SHORT.byteSize() * sampleCount;
MemorySegment iSamples = MemorySegment.ofAddress(iSamplesPointer.address(), arrayByteSize, arena.scope());
MemorySegment qSamples = MemorySegment.ofAddress(qSamplesPointer.address(), arrayByteSize, arena.scope());
short[] i = iSamples.toArray(ValueLayout.JAVA_SHORT);
short[] q = qSamples.toArray(ValueLayout.JAVA_SHORT);
mStreamListener.processStream(i, q, parameters, Flag.evaluate(reset));
}
}
}
}
Expand Down

0 comments on commit e1bc9ea

Please sign in to comment.