Skip to content

Commit

Permalink
#1506 Experiment #3. Updates dispatcher to use single-thread thread p…
Browse files Browse the repository at this point in the history
…ool with batch processing of samples.
  • Loading branch information
Dennis Sheirer committed Mar 26, 2023
1 parent 70338db commit 66e6306
Show file tree
Hide file tree
Showing 12 changed files with 224 additions and 300 deletions.
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
/*
* *****************************************************************************
* Copyright (C) 2014-2022 Dennis Sheirer
* Copyright (C) 2014-2023 Dennis Sheirer
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
Expand All @@ -24,7 +24,6 @@
import io.github.dsheirer.util.Dispatcher;
import java.text.DecimalFormat;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import org.apache.commons.math3.util.FastMath;
import org.jtransforms.fft.FloatFFT_1D;
Expand Down Expand Up @@ -69,8 +68,8 @@ public class ComplexPolyphaseChannelizerM2 extends AbstractComplexPolyphaseChann
*/
private static final int PROCESSED_CHANNEL_RESULTS_THRESHOLD = 1024;

//Sized at 152 buffers a second where max = 5 seconds
private IFFTProcessorDispatcher mIFFTProcessorDispatcher = new IFFTProcessorDispatcher(5 * 152);
//Sized to process 1/20th batch of 152 buffers a second, at 40 times per second
private IFFTProcessorDispatcher mIFFTProcessorDispatcher = new IFFTProcessorDispatcher(152 / 20, 25);
private FloatFFT_1D mFFT;
private float[] mInlineSamples;
private float[] mInlineFilter;
Expand Down Expand Up @@ -407,25 +406,32 @@ private void init(float[] coefficients)
*/
public class IFFTProcessorDispatcher extends Dispatcher<List<float[]>>
{
public IFFTProcessorDispatcher(int maximumSize)
public IFFTProcessorDispatcher(int batchSize, long interval)
{
super(maximumSize, "sdrtrunk polyphase ifft processor", Collections.emptyList());
super("sdrtrunk polyphase ifft processor", batchSize, interval);

//We create a listener interface to receive the batched channel results arrays from the scheduled thread pool
//dispatcher thread that is part of this continuous buffer processor. We perform an IFFT on each
//channel results array contained in each results buffer and then dispatch the buffer
//so that it can be distributed to each channel listener.
setListener(list -> {
List<float[]> processedChannelResults = new ArrayList<>();
try
{
List<float[]> processedChannelResults = new ArrayList<>();

for(float[] channelResults: list)
{
//Rotate each of the channels to the correct phase using the IFFT
mFFT.complexInverse(channelResults, true);
processedChannelResults.add(channelResults);
}

for(float[] channelResults: list)
dispatch(processedChannelResults);
}
catch(Throwable t)
{
//Rotate each of the channels to the correct phase using the IFFT
mFFT.complexInverse(channelResults, true);
processedChannelResults.add(channelResults);
mLog.error("Error during IFFT and dispatch of processed channel results", t);
}

dispatch(processedChannelResults);
});
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@

import io.github.dsheirer.buffer.INativeBuffer;
import io.github.dsheirer.buffer.INativeBufferProvider;
import io.github.dsheirer.buffer.NativeBufferPoisonPill;
import io.github.dsheirer.controller.channel.event.ChannelStopProcessingRequest;
import io.github.dsheirer.dsp.filter.design.FilterDesignException;
import io.github.dsheirer.eventbus.MyEventBus;
Expand Down Expand Up @@ -110,8 +109,7 @@ public PolyphaseChannelManager(INativeBufferProvider nativeBufferProvider, long
}

mChannelCalculator = new ChannelCalculator(sampleRate, channelCount, frequency, CHANNEL_OVERSAMPLING);
mBufferDispatcher = new Dispatcher(500, "sdrtrunk polyphase buffer processor",
new NativeBufferPoisonPill());
mBufferDispatcher = new Dispatcher("sdrtrunk polyphase buffer processor", 50, 10);
mBufferDispatcher.setListener(mNativeBufferReceiver);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@
import io.github.dsheirer.sample.Listener;
import io.github.dsheirer.sample.complex.ComplexSamples;
import io.github.dsheirer.source.SourceEvent;
import io.github.dsheirer.source.tuner.channel.StreamProcessorWithHeartbeat;
import io.github.dsheirer.source.tuner.channel.TunerChannel;
import io.github.dsheirer.source.tuner.channel.TunerChannelSource;
import java.util.ArrayList;
Expand All @@ -42,7 +41,7 @@ public class PolyphaseChannelSource extends TunerChannelSource implements Listen
{
private Logger mLog = LoggerFactory.getLogger(PolyphaseChannelSource.class);
private IPolyphaseChannelOutputProcessor mPolyphaseChannelOutputProcessor;
private StreamProcessorWithHeartbeat<ComplexSamples> mStreamHeartbeatProcessor;
private Listener<ComplexSamples> mSamplesListener;
private double mChannelSampleRate;
private long mIndexCenterFrequency;
private List<Integer> mOutputProcessorIndexes = new ArrayList<>();
Expand All @@ -64,7 +63,6 @@ public PolyphaseChannelSource(TunerChannel tunerChannel, ChannelCalculator chann
{
super(producerSourceEventListener, tunerChannel);
mChannelSampleRate = channelCalculator.getChannelSampleRate();
mStreamHeartbeatProcessor = new StreamProcessorWithHeartbeat<>(getHeartbeatManager(), HEARTBEAT_INTERVAL_MS);
doUpdateOutputProcessor(channelCalculator, filterManager);
}

Expand Down Expand Up @@ -108,8 +106,6 @@ public void start()
{
super.start();

mStreamHeartbeatProcessor.start();

if(mPolyphaseChannelOutputProcessor != null)
{
mPolyphaseChannelOutputProcessor.start();
Expand All @@ -125,8 +121,6 @@ public void stop()
{
mPolyphaseChannelOutputProcessor.stop();
}

mStreamHeartbeatProcessor.stop();
}

/**
Expand All @@ -135,14 +129,24 @@ public void stop()
@Override
public void setListener(final Listener<ComplexSamples> listener)
{
mStreamHeartbeatProcessor.setListener(listener);
mSamplesListener = listener;
mPolyphaseChannelOutputProcessor.setListener(this);
}

@Override
public void receive(ComplexSamples complexSamples)
{
mStreamHeartbeatProcessor.receive(complexSamples);
if(mSamplesListener != null)
{
try
{
mSamplesListener.receive(complexSamples);
}
catch(Throwable t)
{
mLog.error("Error dispatching complex samples to listener [" + mSamplesListener.getClass() + "]", t);
}
}
}

/**
Expand Down Expand Up @@ -209,6 +213,7 @@ public void doUpdateOutputProcessor(ChannelCalculator channelCalculator, Synthes
if(mPolyphaseChannelOutputProcessor != null)
{
mPolyphaseChannelOutputProcessor.setListener(null);
mPolyphaseChannelOutputProcessor.setHeartbeatManager(null);
mPolyphaseChannelOutputProcessor.stop();
}

Expand Down Expand Up @@ -249,6 +254,14 @@ public void doUpdateOutputProcessor(ChannelCalculator channelCalculator, Synthes
}
}

/**
* Register this channel's heartbeat manager to receive heartbeat commands on the output processor's dispatch interval
*/
if(mPolyphaseChannelOutputProcessor != null)
{
mPolyphaseChannelOutputProcessor.setHeartbeatManager(getHeartbeatManager());
}

//Unlikely, but if we had an error designing a synthesis filter, throw an exception
if(errorMessage != null)
{
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
/*
* *****************************************************************************
* Copyright (C) 2014-2022 Dennis Sheirer
* Copyright (C) 2014-2023 Dennis Sheirer
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
Expand All @@ -20,8 +20,8 @@

import io.github.dsheirer.sample.Listener;
import io.github.dsheirer.sample.complex.ComplexSamples;
import io.github.dsheirer.source.heartbeat.HeartbeatManager;
import io.github.dsheirer.util.Dispatcher;
import java.util.Collections;
import java.util.List;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -46,8 +46,31 @@ public abstract class ChannelOutputProcessor implements IPolyphaseChannelOutputP
public ChannelOutputProcessor(int inputChannelCount, double sampleRate)
{
mInputChannelCount = inputChannelCount;
mChannelResultsDispatcher = new Dispatcher<>((int)sampleRate, "sdrtrunk polyphase channel", Collections.emptyList());
mChannelResultsDispatcher.setListener(floats -> process(floats));
//Process 1/10th of the sample rate per second at a rate of 20 times a second (200% of anticipated rate)
mChannelResultsDispatcher = new Dispatcher("sdrtrunk polyphase channel", (int)(sampleRate / 10), 50);
mChannelResultsDispatcher.setListener(floats -> {
try
{
process(floats);
}
catch(Throwable t)
{
mLog.error("Error processing channel results", t);
}
});
}

/**
* Sets the heartbeat manager to receive commands from the dispatcher thread on the dispatching interval.
* @param heartbeatManager to be commanded.
*/
@Override
public void setHeartbeatManager(HeartbeatManager heartbeatManager)
{
if(mChannelResultsDispatcher != null)
{
mChannelResultsDispatcher.setHeartbeatManager(heartbeatManager);
}
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import io.github.dsheirer.sample.Listener;
import io.github.dsheirer.sample.complex.ComplexSamples;
import io.github.dsheirer.source.heartbeat.HeartbeatManager;
import java.util.List;

public interface IPolyphaseChannelOutputProcessor
Expand Down Expand Up @@ -50,6 +51,12 @@ public interface IPolyphaseChannelOutputProcessor
*/
void setListener(Listener<ComplexSamples> listener);

/**
* Sets a heartbeat manager to receive queues on the dispatching interval.
* @param heartbeatManager to be commanded.
*/
void setHeartbeatManager(HeartbeatManager heartbeatManager);

/**
* Sets the desired frequency offset from center. The samples will be mixed with an oscillator set to this offset
* frequency to produce an output where the desired signal is centered in the passband.
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
/*
* *****************************************************************************
* Copyright (C) 2014-2022 Dennis Sheirer
* Copyright (C) 2014-2023 Dennis Sheirer
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
Expand All @@ -25,9 +25,6 @@
import io.github.dsheirer.util.Dispatcher;
import io.github.dsheirer.util.StringUtils;
import io.github.dsheirer.util.TimeStamp;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.WritableByteChannel;
Expand All @@ -36,6 +33,8 @@
import java.nio.file.StandardOpenOption;
import java.util.EnumSet;
import java.util.concurrent.atomic.AtomicBoolean;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* Binary recorder module for recording demodulated bit/byte streams
Expand All @@ -50,8 +49,7 @@ public class BinaryRecorder extends Module implements IByteBufferListener
private final static Logger mLog = LoggerFactory.getLogger(BinaryRecorder.class);
private static final int MAX_RECORDING_BYTE_SIZE = 524288; //500 kB

private Dispatcher<ByteBuffer> mBufferProcessor = new Dispatcher<>(500,
"sdrtrunk binary recorder", ByteBuffer.allocate(0));
private Dispatcher<ByteBuffer> mBufferProcessor = new Dispatcher<>("sdrtrunk binary recorder", 200, 250);
private AtomicBoolean mRunning = new AtomicBoolean();
private Path mBaseRecordingPath;
private String mRecordingIdentifier;
Expand Down Expand Up @@ -96,7 +94,7 @@ public void stop()
{
if(mBufferProcessor != null)
{
mBufferProcessor.flushAndStop();
mBufferProcessor.stop();
mBufferProcessor.setListener(null);

try
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
/*
* *****************************************************************************
* Copyright (C) 2014-2022 Dennis Sheirer
* Copyright (C) 2014-2023 Dennis Sheirer
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
Expand Down Expand Up @@ -45,9 +45,7 @@ public class ComplexSamplesWaveRecorder extends Module implements IComplexSample
{
private final static Logger mLog = LoggerFactory.getLogger(ComplexSamplesWaveRecorder.class);

private Dispatcher<ComplexSamples> mBufferProcessor = new Dispatcher<>(500,
"sdrtrunk complex wave recorder", new ComplexSamples(new float[0], new float[0], System.currentTimeMillis()));

private Dispatcher<ComplexSamples> mBufferProcessor = new Dispatcher<>("sdrtrunk complex wave recorder", 100, 250);
private AtomicBoolean mRunning = new AtomicBoolean();
private BufferWaveWriter mWriter;
private String mFilePrefix;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
/*
* *****************************************************************************
* Copyright (C) 2014-2022 Dennis Sheirer
* Copyright (C) 2014-2023 Dennis Sheirer
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
Expand All @@ -19,7 +19,6 @@
package io.github.dsheirer.record.wave;

import io.github.dsheirer.buffer.INativeBuffer;
import io.github.dsheirer.buffer.NativeBufferPoisonPill;
import io.github.dsheirer.module.Module;
import io.github.dsheirer.sample.ConversionUtils;
import io.github.dsheirer.sample.Listener;
Expand All @@ -29,16 +28,16 @@
import io.github.dsheirer.util.Dispatcher;
import io.github.dsheirer.util.ThreadPool;
import io.github.dsheirer.util.TimeStamp;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.sound.sampled.AudioFormat;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Iterator;
import java.util.concurrent.atomic.AtomicBoolean;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.sound.sampled.AudioFormat;

/**
* WAVE audio recorder module for recording complex (I&Q) samples to a wave file
Expand All @@ -48,8 +47,7 @@ public class NativeBufferWaveRecorder extends Module implements Listener<INative
private static final Logger mLog = LoggerFactory.getLogger(ComplexSamplesWaveRecorder.class);
private static final long STATUS_UPDATE_BYTE_INTERVAL = 1_048_576;
private static final long MAX_RECORDING_SIZE = (long)Integer.MAX_VALUE * 2l;
private Dispatcher<INativeBuffer> mBufferProcessor = new Dispatcher<>(500,
"sdrtrunk native buffer wave recorder", new NativeBufferPoisonPill());
private Dispatcher<INativeBuffer> mBufferProcessor = new Dispatcher<>("sdrtrunk native buffer wave recorder", 100, 250);

private AtomicBoolean mRunning = new AtomicBoolean();
private NativeBufferWaveWriter mWriter;
Expand Down

0 comments on commit 66e6306

Please sign in to comment.