-
Notifications
You must be signed in to change notification settings - Fork 240
/
PolyphaseChannelManager.java
525 lines (479 loc) · 21.8 KB
/
PolyphaseChannelManager.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
/*
* *****************************************************************************
* Copyright (C) 2014-2024 Dennis Sheirer
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>
* ****************************************************************************
*/
package io.github.dsheirer.dsp.filter.channelizer;
import io.github.dsheirer.buffer.INativeBuffer;
import io.github.dsheirer.buffer.INativeBufferProvider;
import io.github.dsheirer.controller.channel.event.ChannelStopProcessingRequest;
import io.github.dsheirer.dsp.filter.design.FilterDesignException;
import io.github.dsheirer.eventbus.MyEventBus;
import io.github.dsheirer.log.LoggingSuppressor;
import io.github.dsheirer.sample.Broadcaster;
import io.github.dsheirer.sample.Listener;
import io.github.dsheirer.sample.complex.InterleavedComplexSamples;
import io.github.dsheirer.source.ISourceEventProcessor;
import io.github.dsheirer.source.SourceEvent;
import io.github.dsheirer.source.SourceException;
import io.github.dsheirer.source.tuner.TunerController;
import io.github.dsheirer.source.tuner.channel.TunerChannel;
import io.github.dsheirer.source.tuner.channel.TunerChannelSource;
import io.github.dsheirer.util.Dispatcher;
import java.text.DecimalFormat;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.SortedSet;
import java.util.TreeSet;
import java.util.concurrent.CopyOnWriteArrayList;
import org.apache.commons.math3.util.FastMath;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Polyphase Channel Manager is a DDC channel manager and complex buffer queue/processor for a tuner. This class
* provides DDC polyphase channel sources and wraps a polyphase channelizer processing sample buffers produced by
* the tuner and distributing channelized sample buffers to each allocated DDC polyphase channel source. This
* class is responsible for monitoring the tuner for changes in center frequency and/or sample rate and updating
* active DDC polyphase channel sources accordingly. This class also monitors source event requests and
* notifications received from active DDC polyphase channel sources to adjust sample streams as required.
*
* Channel bandwidth and channel count are determined by the sample rate of the baseband buffer stream provider. This
* class is currently designed to provide channels each with a minimum usable bandwidth of 12.5 kHz and oversampled by
* 2.0 to a minimum of 25.0 kHz channel sample rate. If the baseband stream provider sample rate is not evenly
* divisible by 12.5 kHz channels for an even number of channels, the channel bandwidth will be increased.
*
* Note: add this channel manager as a source event listener to the complex buffer provider to ensure this manager
* adapts to changes in source frequency and sample rate.
*/
public class PolyphaseChannelManager implements ISourceEventProcessor
{
//Formats values that have been scaled to MHz with five decimal places for the state description
private static final DecimalFormat FREQUENCY_FORMAT = new DecimalFormat("0.00000");
//Used by getChannel() to report channel allocation failures
private static final LoggingSuppressor LOGGING_SUPPRESSOR = new LoggingSuppressor(LoggerFactory.getLogger(PolyphaseChannelManager.class));
private final static Logger mLog = LoggerFactory.getLogger(PolyphaseChannelManager.class);
//Divisor applied to the sample rate in the constructor to derive the channel count
private static final double MINIMUM_CHANNEL_BANDWIDTH = 25000.0;
//Oversampling factor handed to the channel calculator
private static final double CHANNEL_OVERSAMPLING = 2.0;
//Taps per channel argument used when constructing the polyphase channelizer
private static final int POLYPHASE_CHANNELIZER_TAPS_PER_CHANNEL = 9;
//Rebroadcasts channel count changes and selected channel source events to registered listeners
private Broadcaster<SourceEvent> mSourceEventBroadcaster = new Broadcaster<>();
//Source of native sample buffers; the buffer dispatcher is registered with it while channels are active
private INativeBufferProvider mNativeBufferProvider;
//Channel sources allocated by getChannel(); removed again in stopChannelSource()
private List<PolyphaseChannelSource> mChannelSources = new CopyOnWriteArrayList<>();
private ChannelCalculator mChannelCalculator;
private SynthesisFilterManager mFilterManager = new SynthesisFilterManager();
//Created lazily by checkChannelizerConfiguration(); null until the first channel source is started
private ComplexPolyphaseChannelizerM2 mPolyphaseChannelizer;
private ChannelSourceEventListener mChannelSourceEventListener = new ChannelSourceEventListener();
private NativeBufferReceiver mNativeBufferReceiver = new NativeBufferReceiver();
//Delivers native buffers to mNativeBufferReceiver; also serves as the lock object for start/stop of channel sources
private Dispatcher mBufferDispatcher;
//NOTE(review): within this file this map is only ever cleared (checkChannelizerConfiguration) - confirm it is still needed
private Map<Integer,float[]> mOutputProcessorFilters = new HashMap<>();
//Set to false by stopAllChannels(); getChannel() returns null once this is false.
//NOTE(review): not volatile - confirm whether stopAllChannels() and getChannel() can be invoked from different threads
private boolean mRunning = true;
/**
 * Constructs a polyphase channel manager that sources baseband sample buffers from the supplied provider.
 *
 * The channel count is the sample rate divided by the minimum channel bandwidth, truncated to an integer and
 * reduced by one when the result is odd, since the 2x oversampling channelizer requires an even channel count.
 *
 * @param nativeBufferProvider (ie tuner) that supports register/deregister for reusable baseband sample buffer
 * streams
 * @param frequency of the baseband complex buffer sample stream (ie center frequency)
 * @param sampleRate of the baseband complex buffer sample stream
 * @throws IllegalArgumentException if the native buffer provider is null
 */
public PolyphaseChannelManager(INativeBufferProvider nativeBufferProvider, long frequency, double sampleRate)
{
    if(nativeBufferProvider == null)
    {
        throw new IllegalArgumentException("Complex buffer provider argument cannot be null");
    }

    mNativeBufferProvider = nativeBufferProvider;

    //Truncate to a whole number of minimum-bandwidth channels and drop one channel when that number is odd
    int wholeChannels = (int)(sampleRate / MINIMUM_CHANNEL_BANDWIDTH);
    int evenChannelCount = ((wholeChannels & 1) == 0) ? wholeChannels : wholeChannels - 1;

    mChannelCalculator = new ChannelCalculator(sampleRate, evenChannelCount, frequency, CHANNEL_OVERSAMPLING);

    Dispatcher dispatcher = new Dispatcher("sdrtrunk polyphase buffer processor", 10);
    dispatcher.setListener(mNativeBufferReceiver);
    mBufferDispatcher = dispatcher;
}
/**
* Creates a polyphase channel manager for the tuner controller by delegating to the primary constructor, using the
* controller as the native buffer provider along with the controller's current frequency and sample rate.
*
* Note: a null controller fails on the getFrequency() call, before the primary constructor's null check is reached.
*
* @param tunerController for a tuner that provides a baseband complex buffer stream.
*/
public PolyphaseChannelManager(TunerController tunerController)
{
this(tunerController, tunerController.getFrequency(), tunerController.getSampleRate());
}
/**
 * Builds a multi-line, human readable summary of this manager: a header with the channel source count, the
 * channel calculator, and one line per channel source with its tuner and channel frequencies (scaled to MHz),
 * its polyphase indices and its hash code.
 *
 * @return description text
 */
public String getStateDescription()
{
    StringBuilder description = new StringBuilder("Polyphase Channel Manager Providing [")
        .append(mChannelSources.size())
        .append("] Channels")
        .append("\n\t")
        .append(mChannelCalculator);

    for(PolyphaseChannelSource source: mChannelSources)
    {
        List<Integer> polyphaseIndices = source.getOutputProcessorIndexes();
        double channelSampleRate = source.getSampleRate();
        long channelCenter = source.getIndexCenterFrequency();
        long mixerOffset = source.getFrequencyOffset();
        long requestedCenter = source.getFrequency();

        description.append("\n\tPolyphase | Tuner SR:")
            .append(FREQUENCY_FORMAT.format(source.getTunerSampleRate() / 1E6d))
            .append(" CF:")
            .append(FREQUENCY_FORMAT.format(source.getTunerCenterFrequency() / 1E6d))
            .append(" BW: ")
            .append(FREQUENCY_FORMAT.format(channelSampleRate / 1E6d))
            .append(" | Channel CF: ")
            .append(FREQUENCY_FORMAT.format(channelCenter / 1E6d))
            .append(" REQUESTED CF: ")
            .append(FREQUENCY_FORMAT.format(requestedCenter / 1E6d))
            .append(" MIXER:")
            .append(FREQUENCY_FORMAT.format(mixerOffset / 1E6d))
            .append(" | Polyphase Indices: ")
            .append(polyphaseIndices)
            .append(" HASH:")
            .append(Integer.toHexString(source.hashCode()).toUpperCase());
    }

    return description.toString();
}
/**
 * Marks this manager as no longer running, so that getChannel() stops allocating channel sources, and posts a
 * channel stop processing request to the global event bus for each currently allocated channel source.
 */
public void stopAllChannels()
{
    mRunning = false;

    //Work from a copy of the channel sources that are allocated at this moment
    new ArrayList<TunerChannelSource>(mChannelSources).forEach(source ->
        MyEventBus.getGlobalEventBus().post(new ChannelStopProcessingRequest(source)));
}
/**
 * Applies the error message to every provisioned tuner channel source. Intended for when the source complex
 * buffer provider has failed and can no longer provide channels, so that each tuner channel source can notify its
 * consumer of the error state.
 *
 * @param errorMessage to set on each channel source
 */
public void setErrorMessage(String errorMessage)
{
    mChannelSources.forEach(source -> source.setError(errorMessage));
}
/**
 * Current channel bandwidth/spacing as reported by the channel calculator.
 *
 * @return channel bandwidth
 */
public double getChannelBandwidth()
{
    final double bandwidth = mChannelCalculator.getChannelBandwidth();
    return bandwidth;
}
/**
 * Provides a Digital Drop Channel (DDC) source for the specified tuner channel.
 *
 * Returns null when this manager has been stopped via stopAllChannels(), or when the channel source constructor
 * rejects the request with an IllegalArgumentException (e.g. the channel can't be sourced with the current center
 * frequency and/or sample rate). A successfully created source is tracked by this manager before it is returned.
 *
 * @param tunerChannel specifying center frequency and bandwidth.
 * @return source or null.
 */
public TunerChannelSource getChannel(TunerChannel tunerChannel)
{
    if(!mRunning)
    {
        return null;
    }

    try
    {
        PolyphaseChannelSource created = new PolyphaseChannelSource(tunerChannel, mChannelCalculator,
            mFilterManager, mChannelSourceEventListener);
        mChannelSources.add(created);
        return created;
    }
    catch(IllegalArgumentException iae)
    {
        LOGGING_SUPPRESSOR.error(iae.getMessage(), 3, "Couldn't allocate channel. " + iae.getMessage());
        return null;
    }
}
/**
 * Starts/adds the channel source to receive channelized sample buffers, registering with the native buffer
 * provider to receive sample buffers when this is the first channel.
 *
 * If the polyphase channelizer could not be created (checkChannelizerConfiguration() logs and swallows creation
 * failures), the channel source is not started. Instead the failure is logged and set as an error on the channel
 * source, rather than failing with a NullPointerException.
 *
 * @param channelSource to start
 * @throws IllegalStateException propagated from checkChannelizerConfiguration() when the sample rate changed
 * while channels are still registered
 */
private void startChannelSource(PolyphaseChannelSource channelSource)
{
    boolean channelizerAvailable;

    synchronized(mBufferDispatcher)
    {
        //Note: the polyphase channel source has already been added to the mChannelSources in getChannel() method
        checkChannelizerConfiguration();

        channelizerAvailable = (mPolyphaseChannelizer != null);

        if(channelizerAvailable)
        {
            mPolyphaseChannelizer.addChannel(channelSource);
            mSourceEventBroadcaster.broadcast(SourceEvent.channelCountChange(getTunerChannelCount()));

            //If this is the first channel, register to start the sample buffers flowing
            if(mPolyphaseChannelizer.getRegisteredChannelCount() == 1)
            {
                mNativeBufferProvider.addBufferListener(mBufferDispatcher);
                mPolyphaseChannelizer.start();
                mBufferDispatcher.start();
            }
        }
    }

    //Report the failure outside of the lock so the channel source's error handling doesn't run while holding it
    if(!channelizerAvailable)
    {
        String message = "Polyphase channelizer is not available - unable to start channel source";
        mLog.error(message);
        channelSource.setError(message);
    }
}
/**
 * Stops/removes the channel source from receiving channelized sample buffers and deregisters from the native
 * buffer provider when this is the last channel being sourced. Afterward, a stop sample stream notification is
 * sent to the channel source.
 *
 * The channelizer is created lazily when the first channel source is started, so a stop request can arrive while
 * it is still null. In that case the channel source is simply removed from the tracked channel sources and the
 * channel count change is broadcast; there is no channelizer or buffer stream to shut down.
 *
 * @param channelSource to stop
 */
private void stopChannelSource(PolyphaseChannelSource channelSource)
{
    synchronized(mBufferDispatcher)
    {
        mChannelSources.remove(channelSource);

        if(mPolyphaseChannelizer != null)
        {
            mPolyphaseChannelizer.removeChannel(channelSource);
        }

        mSourceEventBroadcaster.broadcast(SourceEvent.channelCountChange(getTunerChannelCount()));

        //If this is the last/only channel, deregister to stop the sample buffers
        if(mPolyphaseChannelizer != null && mPolyphaseChannelizer.getRegisteredChannelCount() == 0)
        {
            mNativeBufferProvider.removeBufferListener(mBufferDispatcher);
            mBufferDispatcher.stop();
            mPolyphaseChannelizer.stop();
        }
    }

    try
    {
        //Broadcast a stop sample stream notification in case this was a forced-stop so consumers are aware
        channelSource.process(SourceEvent.stopSampleStreamNotification(channelSource));
    }
    catch(SourceException se)
    {
        //Best effort only - the channel source is already stopped, so record the failure and carry on
        mLog.debug("Error sending stop sample stream notification to polyphase channel source", se);
    }
}
/**
 * Handles source events received from the source (ie tuner controller).
 *
 * Frequency change notifications are handed to the native buffer receiver. Sample rate change notifications
 * update the channel calculator immediately with the new sample rate and the matching channel count. Lock/unlock,
 * frequency correction and recording file loaded notifications are ignored. Anything else is logged.
 *
 * @param sourceEvent to process
 * @throws SourceException declared by the interface; not thrown directly by this implementation
 */
@Override
public void process(SourceEvent sourceEvent) throws SourceException
{
    switch(sourceEvent.getEvent())
    {
        case NOTIFICATION_FREQUENCY_CHANGE -> mNativeBufferReceiver.receive(sourceEvent);
        case NOTIFICATION_SAMPLE_RATE_CHANGE ->
        {
            //Update channel calculator immediately so that channels can be allocated
            double updatedSampleRate = sourceEvent.getValue().doubleValue();
            int updatedChannelCount = ComplexPolyphaseChannelizerM2.getChannelCount(updatedSampleRate);
            mChannelCalculator.setRates(updatedSampleRate, updatedChannelCount);
        }
        case NOTIFICATION_FREQUENCY_AND_SAMPLE_RATE_LOCKED,
             NOTIFICATION_FREQUENCY_AND_SAMPLE_RATE_UNLOCKED,
             NOTIFICATION_FREQUENCY_CORRECTION_CHANGE,
             NOTIFICATION_RECORDING_FILE_LOADED ->
        {
            //no-op
        }
        default -> mLog.info("Unrecognized source event: " + sourceEvent);
    }
}
/**
 * Ensures a polyphase channelizer exists that matches the channel calculator's current sample rate, creating a
 * new one when there is none yet or when the existing one differs from that rate by more than 0.5.
 *
 * Failures while creating the channelizer are logged and not rethrown, so the previous channelizer (or null)
 * remains in place afterward.
 *
 * Note: invoked from startChannelSource() while holding the lock on the buffer dispatcher.
 *
 * @throws IllegalStateException if the channelizer has to be replaced while it still has registered channels
 */
private void checkChannelizerConfiguration()
{
    //Channel calculator is always in sync with the tuner's current sample rate
    final double tunerSampleRate = mChannelCalculator.getSampleRate();

    final boolean replacementRequired = mPolyphaseChannelizer == null ||
        FastMath.abs(mPolyphaseChannelizer.getSampleRate() - tunerSampleRate) > 0.5;

    if(!replacementRequired)
    {
        return;
    }

    if(mPolyphaseChannelizer != null)
    {
        int registeredChannels = mPolyphaseChannelizer.getRegisteredChannelCount();

        if(registeredChannels > 0)
        {
            throw new IllegalStateException("Polyphase Channelizer cannot be changed to a new sample rate while " +
                "channels are currently sourced. Ensure you remove all tuner channels before changing tuner " +
                "sample rate. Current channel count:" + registeredChannels);
        }
    }

    try
    {
        mPolyphaseChannelizer = new ComplexPolyphaseChannelizerM2(tunerSampleRate,
            POLYPHASE_CHANNELIZER_TAPS_PER_CHANNEL);
    }
    catch(IllegalArgumentException iae)
    {
        mLog.error("Could not create polyphase channelizer for sample rate [" + tunerSampleRate + "]", iae);
    }
    catch(FilterDesignException fde)
    {
        mLog.error("Could not create filter for polyphase channelizer for sample rate [" + tunerSampleRate + "]", fde);
    }

    //Clear any previous channel synthesis filters so they can be recreated for the new channel sample rate
    mOutputProcessorFilters.clear();
}
/**
 * Updates the output processor of each channel source for any changes in the tuner's center frequency or sample
 * rate, which would cause the output processors to change the polyphase channelizer results channel(s) that the
 * processor is consuming.
 *
 * A channel source that rejects the update with an IllegalArgumentException is stopped. Removing it from the
 * channel sources while iterating is safe because the list is a CopyOnWriteArrayList, whose iterators work on a
 * snapshot.
 */
private void updateOutputProcessors()
{
    for(PolyphaseChannelSource channelSource: mChannelSources)
    {
        try
        {
            channelSource.updateOutputProcessor(mChannelCalculator, mFilterManager);
        }
        catch(IllegalArgumentException iae)
        {
            //Include the exception so the reason the channel can no longer be sourced is not lost
            mLog.error("Error updating polyphase channel source output processor following tuner frequency or " +
                "sample rate change", iae);
            stopChannelSource(channelSource);
        }
    }
}
/**
 * Collects the tuner channel of every currently tracked channel source into a new sorted set, ordered by the
 * tuner channel's natural ordering (documented as frequency, low to high).
 *
 * @return sorted set of tuner channels, empty when no channels are being sourced
 */
public SortedSet<TunerChannel> getTunerChannels()
{
    SortedSet<TunerChannel> ordered = new TreeSet<>();
    mChannelSources.forEach(source -> ordered.add(source.getTunerChannel()));
    return ordered;
}
/**
 * Number of channel sources currently tracked by this manager.
 *
 * @return channel source count
 */
public int getTunerChannelCount()
{
    final int trackedSources = mChannelSources.size();
    return trackedSources;
}
/**
* Adds the listener to receive source events rebroadcast by this manager. Within this class those are the channel
* count change events and the measured frequency error sync locked notifications from channel sources.
*
* @param listener to register with the source event broadcaster
*/
public void addSourceEventListener(Listener<SourceEvent> listener)
{
mSourceEventBroadcaster.addListener(listener);
}
/**
* Removes the listener from receiving source events broadcast by this manager.
*
* @param listener to deregister from the source event broadcaster
*/
public void removeSourceEventListener(Listener<SourceEvent> listener)
{
mSourceEventBroadcaster.removeListener(listener);
}
/**
* Internal class for handling requests for start/stop sample stream from polyphase channel sources
*/
private class ChannelSourceEventListener implements Listener<SourceEvent>
{
@Override
public void receive(SourceEvent sourceEvent)
{
switch(sourceEvent.getEvent())
{
case REQUEST_START_SAMPLE_STREAM:
if(sourceEvent.hasSource() && sourceEvent.getSource() instanceof PolyphaseChannelSource)
{
startChannelSource((PolyphaseChannelSource)sourceEvent.getSource());
}
else
{
mLog.error("Request to start sample stream for unrecognized source: " +
(sourceEvent.hasSource() ? sourceEvent.getSource().getClass() : "null source"));
}
break;
case REQUEST_STOP_SAMPLE_STREAM:
if(sourceEvent.hasSource() && sourceEvent.getSource() instanceof PolyphaseChannelSource channelSource)
{
stopChannelSource(channelSource);
channelSource.dispose();
}
else
{
mLog.error("Request to stop sample stream for unrecognized source: " +
(sourceEvent.hasSource() ? sourceEvent.getSource().getClass() : "null source"));
}
break;
case NOTIFICATION_MEASURED_FREQUENCY_ERROR_SYNC_LOCKED:
//Rebroadcast so that the tuner source can process this event
mSourceEventBroadcaster.broadcast(sourceEvent);
break;
default:
mLog.error("Received unrecognized source event from polyphase channel source [" +
sourceEvent.getEvent() + "]");
break;
}
}
}
/**
* Processes the incoming buffer stream from the provider and transfers the buffers to the polyphase channelizer.
*
* This monitor incorporates a source event handler that queues a center frequency update so that it can be
* handled on the buffer processing thread, avoiding having to lock on the output processor thread. Since we
* anticipate that these two threads will contend for access to this update required flag, we use an update lock
* to protect access to the flag.
*/
public class NativeBufferReceiver implements Listener<INativeBuffer>
{
private boolean mOutputProcessorUpdateRequired = false;
/**
* Processes tuner center frequency change source events to flag when output processors need updating.
* @param event that affects configuration of the channelizer (frequency or sample rate change events)
*/
public void receive(SourceEvent event)
{
long frequency = event.getValue().longValue();
if(mChannelCalculator.getCenterFrequency() != frequency)
{
//Update the channel calculator frequency so that it's ready when the output processor update occurs
mChannelCalculator.setCenterFrequency(frequency);
mOutputProcessorUpdateRequired = true;
}
}
/**
* Process native buffer streams and update polyphase output channels when the parent tuner center
* frequency changes.
* @param nativeBuffer of sample to process.
*/
@Override
public void receive(INativeBuffer nativeBuffer)
{
if(mOutputProcessorUpdateRequired)
{
try
{
updateOutputProcessors();
}
catch(Exception e)
{
mLog.error("Error updating polyphase channel output processors");
}
mOutputProcessorUpdateRequired = false;
}
if(mPolyphaseChannelizer != null)
{
Iterator<InterleavedComplexSamples> iterator = nativeBuffer.iteratorInterleaved();
while(iterator.hasNext())
{
try
{
mPolyphaseChannelizer.receive(iterator.next());
}
catch(Throwable throwable)
{
mLog.error("Error", throwable);
}
}
}
}
}
}