Skip to content

Commit

Permalink
#1353 Resolves issue where polyphase channelizer would mis-tune chann…
Browse files Browse the repository at this point in the history
…els after a period of time resulting in idle decoding channels.
  • Loading branch information
Dennis Sheirer committed Dec 17, 2022
1 parent efe36b0 commit 62f9f6a
Show file tree
Hide file tree
Showing 10 changed files with 435 additions and 265 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,6 @@
import io.github.dsheirer.buffer.INativeBufferProvider;
import io.github.dsheirer.buffer.NativeBufferPoisonPill;
import io.github.dsheirer.controller.channel.event.ChannelStopProcessingRequest;
import io.github.dsheirer.dsp.filter.FilterFactory;
import io.github.dsheirer.dsp.filter.channelizer.output.IPolyphaseChannelOutputProcessor;
import io.github.dsheirer.dsp.filter.channelizer.output.OneChannelOutputProcessor;
import io.github.dsheirer.dsp.filter.channelizer.output.TwoChannelOutputProcessor;
import io.github.dsheirer.dsp.filter.design.FilterDesignException;
import io.github.dsheirer.eventbus.MyEventBus;
import io.github.dsheirer.sample.Broadcaster;
Expand All @@ -44,10 +40,8 @@
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.SortedSet;
import java.util.TreeSet;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CopyOnWriteArrayList;
import org.apache.commons.math3.util.FastMath;
import org.slf4j.Logger;
Expand Down Expand Up @@ -75,12 +69,12 @@ public class PolyphaseChannelManager implements ISourceEventProcessor
private static final double MINIMUM_CHANNEL_BANDWIDTH = 25000.0;
private static final double CHANNEL_OVERSAMPLING = 2.0;
private static final int POLYPHASE_CHANNELIZER_TAPS_PER_CHANNEL = 9;
private static final int POLYPHASE_SYNTHESIZER_TAPS_PER_CHANNEL = 9;

private Broadcaster<SourceEvent> mSourceEventBroadcaster = new Broadcaster<>();
private INativeBufferProvider mNativeBufferProvider;
private List<PolyphaseChannelSource> mChannelSources = new CopyOnWriteArrayList<>();
private ChannelCalculator mChannelCalculator;
private SynthesisFilterManager mFilterManager = new SynthesisFilterManager();
private ComplexPolyphaseChannelizerM2 mPolyphaseChannelizer;
private ChannelSourceEventListener mChannelSourceEventListener = new ChannelSourceEventListener();
private NativeBufferReceiver mNativeBufferReceiver = new NativeBufferReceiver();
Expand Down Expand Up @@ -181,63 +175,22 @@ public TunerChannelSource getChannel(TunerChannel tunerChannel)

if(mRunning)
{
List<Integer> polyphaseIndexes = mChannelCalculator.getChannelIndexes(tunerChannel);

IPolyphaseChannelOutputProcessor outputProcessor = getOutputProcessor(polyphaseIndexes);

if(outputProcessor != null)
try
{
long centerFrequency = mChannelCalculator.getCenterFrequencyForIndexes(polyphaseIndexes);

try
{
channelSource = new PolyphaseChannelSource(tunerChannel, outputProcessor, mChannelSourceEventListener,
mChannelCalculator.getChannelSampleRate(), centerFrequency);
channelSource = new PolyphaseChannelSource(tunerChannel, mChannelCalculator, mFilterManager,
mChannelSourceEventListener);

mChannelSources.add(channelSource);
}
catch(FilterDesignException fde)
{
mLog.debug("Couldn't design final output low pass filter for polyphase channel source");
}
mChannelSources.add(channelSource);
}
catch(IllegalArgumentException iae)
{
mLog.debug("Couldn't design final output low pass filter for polyphase channel source");
}
}

return channelSource;
}

/**
* Creates a processor to process the channelizer channel indexes into a composite output stream providing
* channelized complex sample buffers to a registered source listener.
* @param indexes to target by the output processor
* @return output processor compatible with the number of indexes to monitor
*/
private IPolyphaseChannelOutputProcessor getOutputProcessor(List<Integer> indexes)
{
switch(indexes.size())
{
case 1:
return new OneChannelOutputProcessor(mChannelCalculator.getChannelSampleRate(), indexes,
mChannelCalculator.getChannelCount());
case 2:
try
{
float[] filter = getOutputProcessorFilter(2);
return new TwoChannelOutputProcessor(mChannelCalculator.getChannelSampleRate(), indexes, filter,
mChannelCalculator.getChannelCount());
}
catch(FilterDesignException fde)
{
mLog.error("Error designing 2 channel synthesis filter for output processor");
}
default:
//TODO: create output processor for greater than 2 input channels
mLog.error("Request to create an output processor for unexpected channel index size:" + indexes.size());
mLog.info(mChannelCalculator.toString());
return null;
}
}

/**
* Starts/adds the channel source to receive channelized sample buffers, registering with the tuner to receive
* sample buffers when this is the first channel.
Expand All @@ -249,7 +202,6 @@ private void startChannelSource(PolyphaseChannelSource channelSource)
synchronized(mBufferDispatcher)
{
//Note: the polyphase channel source has already been added to the mChannelSources in getChannel() method

checkChannelizerConfiguration();

mPolyphaseChannelizer.addChannel(channelSource);
Expand Down Expand Up @@ -311,10 +263,6 @@ public void process(SourceEvent sourceEvent) throws SourceException
switch(sourceEvent.getEvent())
{
case NOTIFICATION_FREQUENCY_CHANGE:
//Update channel calculator immediately so that channels can be allocated
mChannelCalculator.setCenterFrequency(sourceEvent.getValue().longValue());

//Defer channelizer configuration changes to be handled on the buffer processor thread
mNativeBufferReceiver.receive(sourceEvent);
break;
case NOTIFICATION_SAMPLE_RATE_CHANGE:
Expand Down Expand Up @@ -381,80 +329,22 @@ private void checkChannelizerConfiguration()
* Updates each of the output processors for any changes in the tuner's center frequency or sample rate, which
* would cause the output processors to change the polyphase channelizer results channel(s) that the processor is
* consuming
*
* @param sourceEvent (optional-can be null) to broadcast to each output processor following the update
*/
private void updateOutputProcessors(SourceEvent sourceEvent)
private void updateOutputProcessors()
{
for(PolyphaseChannelSource channelSource: mChannelSources)
{
updateOutputProcessor(channelSource);

//Send the non-null source event to each channel source
if(sourceEvent != null)
{
try
{
channelSource.process(sourceEvent);
}
catch(SourceException se)
{
mLog.error("Error while notifying polyphase channel source of a source event", se);
}
}
}
}

/**
* Updates the polyphase channel source's output processor due to a change in the center frequency or sample
* rate for the source providing sample buffers to the polyphase channelizer, or whenever the DDC channel's
* center tuned frequency changes.
*
* @param channelSource that requires an update to its output processor
*/
private void updateOutputProcessor(PolyphaseChannelSource channelSource)
{
try
{
//If a change in sample rate or center frequency makes this channel no longer viable, then the channel
//calculator will throw an IllegalArgException ... handled below
List<Integer> indexes = mChannelCalculator.getChannelIndexes(channelSource.getTunerChannel());

long centerFrequency = mChannelCalculator.getCenterFrequencyForIndexes(indexes);

//If the indexes size is the same then update the current processor, otherwise create a new one
IPolyphaseChannelOutputProcessor outputProcessor = channelSource.getPolyphaseChannelOutputProcessor();

if(outputProcessor != null && outputProcessor.getInputChannelCount() == indexes.size())
try
{
channelSource.getPolyphaseChannelOutputProcessor().setPolyphaseChannelIndices(indexes);
channelSource.setFrequency(centerFrequency);

if(indexes.size() > 1)
{
try
{
float[] filter = getOutputProcessorFilter(indexes.size());
channelSource.getPolyphaseChannelOutputProcessor().setSynthesisFilter(filter);
}
catch(FilterDesignException fde)
{
mLog.error("Error creating an updated synthesis filter for the channel output processor");
}
}
channelSource.updateOutputProcessor(mChannelCalculator, mFilterManager);
}
else
catch(IllegalArgumentException iae)
{
channelSource.setPolyphaseChannelOutputProcessor(getOutputProcessor(indexes), centerFrequency);
mLog.error("Error updating polyphase channel source output processor following tuner frequency or " +
"sample rate change");
stopChannelSource(channelSource);
}
}
catch(IllegalArgumentException iae)
{
mLog.error("Error updating polyphase channel source - can't determine output channel indexes for " +
"updated tuner center frequency and sample rate. Stopping channel source", iae);

stopChannelSource(channelSource);
}
}

/**
Expand Down Expand Up @@ -497,29 +387,6 @@ public void removeSourceEventListener(Listener<SourceEvent> listener)
mSourceEventBroadcaster.removeListener(listener);
}

/**
* Generates (or reuses) an output processor filter for the specified number of channels. Each
* filter is created only once and stored in a map for reuse. This map is cleared anytime that the
* input sample rate changes, so that the filters can be recreated with the new channel sample rate.
* @param channels count
* @return filter
* @throws FilterDesignException if the filter cannot be designed to specification (-6 dB band edge)
*/
private float[] getOutputProcessorFilter(int channels) throws FilterDesignException
{
float[] taps = mOutputProcessorFilters.get(channels);

if(taps == null)
{
taps = FilterFactory.getSincM2Synthesizer(mChannelCalculator.getChannelSampleRate(),
mChannelCalculator.getChannelBandwidth(), channels, POLYPHASE_SYNTHESIZER_TAPS_PER_CHANNEL);

mOutputProcessorFilters.put(channels, taps);
}

return taps;
}

/**
* Internal class for handling requests for start/stop sample stream from polyphase channel sources
*/
Expand Down Expand Up @@ -584,36 +451,32 @@ public void receive(SourceEvent sourceEvent)
*/
public class NativeBufferReceiver implements Listener<INativeBuffer>
{
private Queue<SourceEvent> mQueuedSourceEvents = new ConcurrentLinkedQueue<>();
private boolean mOutputProcessorUpdateRequired = false;

/**
* Queues the source event for deferred execution on the buffer processing thread.
* @param event that affects configuration of the channelizer (frequency or sample rate change events)
*/
public void receive(SourceEvent event)
{
mQueuedSourceEvents.offer(event);
long frequency = event.getValue().longValue();

if(mChannelCalculator.getCenterFrequency() != frequency)
{
mChannelCalculator.setCenterFrequency(frequency);
mOutputProcessorUpdateRequired = true;
}
}

@Override
public void receive(INativeBuffer nativeBuffer)
{
try
{
//Process any queued source events before processing the buffers
SourceEvent queuedSourceEvent = mQueuedSourceEvents.poll();

while(queuedSourceEvent != null)
if(mOutputProcessorUpdateRequired)
{
switch(queuedSourceEvent.getEvent())
{
case NOTIFICATION_FREQUENCY_CHANGE:
//Don't send the tuner's frequency change event down to the channels - it would cause chaos
updateOutputProcessors(null);
break;
}

queuedSourceEvent = mQueuedSourceEvents.poll();
updateOutputProcessors();
mOutputProcessorUpdateRequired = false;
}

if(mPolyphaseChannelizer != null)
Expand Down

0 comments on commit 62f9f6a

Please sign in to comment.