Skip to content

Commit

Permalink
make test_Decimator_to_SSS_detector work with recorded test data
Browse files Browse the repository at this point in the history
  • Loading branch information
catkira committed Apr 28, 2023
1 parent 6222105 commit d4156ba
Show file tree
Hide file tree
Showing 3 changed files with 51 additions and 17 deletions.
4 changes: 3 additions & 1 deletion hdl/Decimator_to_SSS_detector.sv
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ module Decimator_to_SSS_detector
parameter TAP_FILE_2 = "",
parameter NFFT = 8,
parameter MULT_REUSE = 0,
parameter INITIAL_DETECTION_SHIFT = 4,

localparam FFT_LEN = 2 ** NFFT,
localparam CIC_RATE = FFT_LEN / 128,
Expand Down Expand Up @@ -101,7 +102,8 @@ PSS_detector #(
.TAP_FILE_1(TAP_FILE_1),
.TAP_FILE_2(TAP_FILE_2),
.ALGO(ALGO),
.MULT_REUSE(MULT_REUSE)
.MULT_REUSE(MULT_REUSE),
.INITIAL_DETECTION_SHIFT(INITIAL_DETECTION_SHIFT)
)
PSS_detector_i(
.clk_i(clk_i),
Expand Down
2 changes: 1 addition & 1 deletion tests/test_Decimator_Correlator_PeakDetector_FFT.py
Original file line number Diff line number Diff line change
Expand Up @@ -371,7 +371,7 @@ def test(IN_DW, OUT_DW, TAP_DW, ALGO, WINDOW_LEN, HALF_CP_ADVANCE, NFFT, USE_TAP
parameters['MULT_REUSE'] = MULT_REUSE
parameters['INITIAL_DETECTION_SHIFT'] = INITIAL_DETECTION_SHIFT
parameters_no_taps = parameters.copy()
folder = 'Decimator_to_FFT_' + '_'.join(('{}={}'.format(*i) for i in parameters_no_taps.items()))
folder = 'Decimator_to_FFT_' + '_'.join(('{}={}'.format(*i) for i in parameters_no_taps.items())) + '_' + FILE
sim_build= os.path.join('sim_build', folder)
os.environ['TEST_FILE'] = FILE

Expand Down
62 changes: 47 additions & 15 deletions tests/test_Decimator_to_SSS_detector.py
Original file line number Diff line number Diff line change
Expand Up @@ -72,16 +72,38 @@ def fft_dbs(self, fft_signal):
@cocotb.test()
async def simple_test(dut):
tb = TB(dut)
FILE = '../../tests/' + os.environ['TEST_FILE'] + '.sigmf-data'
CFO = int(os.getenv('CFO'))
handle = sigmf.sigmffile.fromfile('../../tests/30720KSPS_dl_signal.sigmf-data')
handle = sigmf.sigmffile.fromfile(FILE)
waveform = handle.read_samples()
fs = 30720000
fs = handle.get_global_field(sigmf.SigMFFile.SAMPLE_RATE_KEY)
NFFT = tb.NFFT
FFT_LEN = 2 ** NFFT

if os.environ['TEST_FILE'] == '30720KSPS_dl_signal':
expected_N_id_1 = 69
expected_N_id_2 = 2
MAX_TX = 2000 * FFT_LEN // 256
elif os.environ['TEST_FILE'] == '772850KHz_3840KSPS_low_gain':
expected_N_id_1 = 291
expected_N_id_2 = 0
MAX_TX = 5000 * FFT_LEN // 256
delta_f = -4e3
waveform = waveform * np.exp(-1j*(2*np.pi*delta_f/fs*np.arange(waveform.shape[0])))
else:
file_string = os.environ['TEST_FILE']
assert False, f'test file {file_string} is not supported'

print(f'CFO = {CFO} Hz')
waveform *= np.exp(np.arange(len(waveform)) * 1j * 2 * np.pi * CFO / fs)
waveform /= max(waveform.real.max(), waveform.imag.max())
dec_factor = 2048 // (2 ** tb.NFFT)
waveform = scipy.signal.decimate(waveform, dec_factor, ftype='fir') # decimate to 3.840 MSPS
waveform /= max(waveform.real.max(), waveform.imag.max())
dec_factor = int((2048 * fs // 30720000) // (2 ** tb.NFFT))
assert dec_factor != 0, f'NFFT = {tb.NFFT} and fs = {fs} is not possible!'
print(f'test_file = {FILE} with {len(waveform)} samples')
print(f'sample_rate = {fs}, decimation_factor = {dec_factor}')
if dec_factor > 1:
fs = fs // dec_factor
waveform = scipy.signal.decimate(waveform, dec_factor, ftype='fir') # decimate to 3.840 MSPS (if NFFT = 8)
waveform *= (2 ** (tb.IN_DW // 2 - 1) - 1)
waveform = waveform.real.astype(int) + 1j * waveform.imag.astype(int)

Expand All @@ -94,10 +116,7 @@ async def simple_test(dut):
received_PBCH = []
received_SSS = []

NFFT = tb.NFFT
FFT_LEN = 2 ** NFFT
MAX_CLK_CNT = 100000 * FFT_LEN // 256
MAX_TX = 2000 * FFT_LEN // 256
if NFFT == 8:
DETECTOR_LATENCY = 18
elif NFFT == 9:
Expand Down Expand Up @@ -150,19 +169,21 @@ async def simple_test(dut):
received_SSS.append(_twos_comp(dut.m_axis_out_tdata.value.integer & (2**(FFT_OUT_DW//2) - 1), FFT_OUT_DW//2)
+ 1j * _twos_comp((dut.m_axis_out_tdata.value.integer>>(FFT_OUT_DW//2)) & (2**(FFT_OUT_DW//2) - 1), FFT_OUT_DW//2))

assert clk_cnt < MAX_CLK_CNT, "timeout, did not receive enough data"
assert len(received_SSS) == SSS_LEN

print(f'detected N_id_1 = {detected_N_id_1}')
print(f'detected N_id = {detected_N_id}')
assert detected_N_id == 209
assert detected_N_id_1 == 69
expected_N_id = expected_N_id_1 * 3 + expected_N_id_2
assert detected_N_id == expected_N_id
assert detected_N_id_1 == expected_N_id_1

corr = np.zeros(335)
for i in range(335):
sss = py3gpp.nrSSS(i)
sss = py3gpp.nrSSS(i * 3 + expected_N_id_2)
corr[i] = np.abs(np.vdot(sss, received_SSS))
detected_NID1 = np.argmax(corr)
assert detected_NID1 == 209
assert detected_NID1 == expected_N_id_1


# bit growth inside PSS_correlator is a lot, be careful to not make OUT_DW too small !
Expand All @@ -176,7 +197,8 @@ async def simple_test(dut):
@pytest.mark.parametrize("USE_TAP_FILE", [1])
@pytest.mark.parametrize("NFFT", [8])
@pytest.mark.parametrize("MULT_REUSE", [0])
def test(IN_DW, OUT_DW, TAP_DW, ALGO, WINDOW_LEN, CFO, HALF_CP_ADVANCE, USE_TAP_FILE, NFFT, MULT_REUSE):
@pytest.mark.parametrize("INITIAL_DETECTION_SHIFT", [4])
def test(IN_DW, OUT_DW, TAP_DW, ALGO, WINDOW_LEN, CFO, HALF_CP_ADVANCE, USE_TAP_FILE, NFFT, MULT_REUSE, INITIAL_DETECTION_SHIFT, FILE = '30720KSPS_dl_signal'):
dut = 'Decimator_to_SSS_detector'
module = os.path.splitext(os.path.basename(__file__))[0]
toplevel = dut
Expand Down Expand Up @@ -237,11 +259,13 @@ def test(IN_DW, OUT_DW, TAP_DW, ALGO, WINDOW_LEN, CFO, HALF_CP_ADVANCE, USE_TAP_
parameters['USE_TAP_FILE'] = USE_TAP_FILE
parameters['NFFT'] = NFFT
parameters['MULT_REUSE'] = MULT_REUSE
parameters['INITIAL_DETECTION_SHIFT'] = INITIAL_DETECTION_SHIFT
os.environ['CFO'] = str(CFO)
parameters_dirname = parameters.copy()
parameters_dirname['CFO'] = CFO
folder = '_'.join(('{}={}'.format(*i) for i in parameters_dirname.items()))
folder = '_'.join(('{}={}'.format(*i) for i in parameters_dirname.items())) + '_' + FILE
sim_build='sim_build/' + folder
os.environ['TEST_FILE'] = FILE

FFT_LEN = 2 ** NFFT
CP_LEN = 18 * FFT_LEN // 256
Expand Down Expand Up @@ -283,7 +307,15 @@ def test(IN_DW, OUT_DW, TAP_DW, ALGO, WINDOW_LEN, CFO, HALF_CP_ADVANCE, USE_TAP_
waves=True
)

@pytest.mark.parametrize("FILE", ["772850KHz_3840KSPS_low_gain"])
def test_recording(FILE):
test(IN_DW = 32, OUT_DW = 32, TAP_DW = 32, ALGO = 0, WINDOW_LEN = 8, CFO=0, HALF_CP_ADVANCE = 1, NFFT = 8, USE_TAP_FILE = 1,
MULT_REUSE = 0, INITIAL_DETECTION_SHIFT = 3, FILE = FILE)

if __name__ == '__main__':
os.environ['PLOTS'] = '1'
os.environ['SIM'] = 'verilator'
test(IN_DW = 32, OUT_DW = 32, TAP_DW = 32, ALGO = 0, WINDOW_LEN = 8, CFO=0, HALF_CP_ADVANCE = 1, USE_TAP_FILE = 1, NFFT = 9, MULT_REUSE = 0)
test(IN_DW = 32, OUT_DW = 32, TAP_DW = 32, ALGO = 0, WINDOW_LEN = 8, CFO=0, HALF_CP_ADVANCE = 1, USE_TAP_FILE = 1, NFFT = 8,
MULT_REUSE = 0, INITIAL_DETECTION_SHIFT = 3, FILE = '772850KHz_3840KSPS_low_gain')
# test(IN_DW = 32, OUT_DW = 32, TAP_DW = 32, ALGO = 0, WINDOW_LEN = 8, CFO=0, HALF_CP_ADVANCE = 1, USE_TAP_FILE = 1, NFFT = 9,
# MULT_REUSE = 0, INITIAL_DETECTION_SHIFT = 4)

0 comments on commit d4156ba

Please sign in to comment.