# TemplateMaker003

This is a simple spectral masking (cross-correlation) based approach. We extract and generate a template, and then cross-correlate with peak detection. We export the template and the parameters used to a .h (header file) for export on the CARACAL ARM M4F board.

We use this to generate and test out some new templates that will work better on the nepal data (hopefully)


In [1]:
ML_SR = 8000 # Target sampling rate
SPECD_FFT_LEN =  512 # Real FFT length (in the M4F - we use double of this on the PC as we don't do single-sided)
ML_BIN_AGG = 8 # Number of bins
ML_FLO = 800 # Low freq
ML_FHI = 950 # High freq
ML_FFT_STRIDE = 256 # Stride
FILEPREFIX = "templateMaker003_001" # what to save the output files as
THRESHOLDED = False # Threshold the template or not

In [2]:
import librosa
import pylab
import numpy as np

In [3]:
filename = "C:\\CloudData\\2024\\Nepal\\ML001\\20h00.wav"

startT = 493 # Time in seconds to extract a useful clip from
endT = 539
gtT = [1.0,12.4,23.0,28.7,34.6,40.4,44.2] # Ground Truth call times

aud,sr = librosa.load(filename,sr=ML_SR)
print(f"File Samples:{np.shape(aud)}, Rate:{sr}")
aud = aud[startT*sr:endT*sr]
print(f"Clipped Samples:{np.shape(aud)}, Rate:{sr}")


  aud,sr = librosa.load(filename,sr=ML_SR)
	Deprecated as of librosa version 0.10.0.
	It will be removed in librosa version 1.0.
  y, sr_native = __audioread_load(path, offset, duration, dtype)


FileNotFoundError: [Errno 2] No such file or directory: 'C:\\CloudData\\2024\\Nepal\\ML001\\20h00.wav'

# Step 1:

Convert the wav file to FFT. We then extract out our "feature map" which is just the spectral magnitude bins. We do a simple boxcar aggregate, but we could use a triangular weighting quite easily as well.

In [None]:
def chunkToBins(chunk,fLo,fHi,numbins,sr):
    """convert a chunk (window) to slope.
    Provide the low and high frequencies in Hz for a spectral windowing
    numbins are the number of output bins between flo and fhi
    Provide the sample rate in Hz"""
    CMPLX_FFT_LEN = len(chunk)*2
    
    fS = np.fft.fft(chunk,n=CMPLX_FFT_LEN) # fft - note we double it for the cmplx fft
    fRes = sr/(CMPLX_FFT_LEN)   # frequency per cmplx bin
    #print(fRes)
    binLo = int(fLo/sr*CMPLX_FFT_LEN)
    binHi = int(fHi/sr*CMPLX_FFT_LEN)
    specSize = int((binHi-binLo)/numbins)
    binTotals = np.zeros(numbins)
    for k in range(numbins):
        dbSum = 0
        for j in range(specSize):
            idx = binLo + (k * numbins) + j
            dbVal = np.log10(np.abs(fS[idx]))
            dbSum += dbVal
        binTotals[k] = dbSum
    return binTotals
q = chunkToBins(aud[:SPECD_FFT_LEN],ML_FLO,ML_FHI,ML_BIN_AGG,ML_SR)
print(q)




# Step 2: Make a template

Here we just use an arbitrary call to build a template. We could do much better with average calls etc. We scale the template so it is zero-mean.

In [None]:
tList = []
idxStart = 718
idxEnd = 730
for idx in range(int(idxStart*ML_FFT_STRIDE),int(idxEnd*ML_FFT_STRIDE),int(ML_FFT_STRIDE)):
    clip = aud[idx:idx+SPECD_FFT_LEN]
    q = chunkToBins(clip,ML_FLO,ML_FHI,ML_BIN_AGG,ML_SR)
    tList.append(q)
print(len(tList))
tList = np.array(tList)

print(np.min(tList),np.max(tList))
# Thresholding
if THRESHOLDED:
    tList = (tList >0)*tList
# Scale the dB mag spec to +1/-1
tList = (tList-np.min(tList))
tList = (tList-np.max(tList)/2)
#tList = tList/np.max(tList)
#tList = tList *2.0 -1.0
print(np.min(tList),np.max(tList))

pylab.imshow(tList.T,aspect=10)
pylab.show()

# Step 3: Slide the window and correlate

We manually compute the correlation, so it is directly the same as our high-tech C code.

In [None]:
import scipy
qList = []
for idx in range(0,len(aud),int(ML_FFT_STRIDE)):
    clip = aud[idx:idx+SPECD_FFT_LEN]
    q = chunkToBins(clip,ML_FLO,ML_FHI,ML_BIN_AGG,ML_SR)
    
    qList.append(q)
qList = np.array(qList)
# Now we cross correlate
print(np.shape(qList),np.shape(tList))
xcorr = []
for offset in range(len(qList)-np.shape(tList)[0]):
    xcTotal = 0
    for tIdx in range(np.shape(tList)[0]):
        for bIdx in range(ML_BIN_AGG):
            xcTotal += qList[offset+tIdx][bIdx]*tList[tIdx][bIdx]
    xcorr.append(xcTotal)
xcorr = np.array(xcorr)


print(np.shape(xcorr))


pylab.subplot(311)
pylab.imshow(qList.T,aspect=50)
pylab.xlim(0,len(qList))
pylab.subplot(312)
pylab.specgram(aud,Fs=8000,NFFT=1024,noverlap=800)
pylab.ylim(800,1400)
pylab.scatter(gtT,np.ones(len(gtT))*1000,c='red',marker='*',s=80)
pylab.subplot(313)
pylab.plot((xcorr))
pylab.xlim(0,len(xcorr))
pylab.show()

# Step 4: Peak Detection

This is loosely based on https://stackoverflow.com/questions/22583391/peak-signal-detection-in-realtime-timeseries-data/54507140#54507140

But made simpler to work neatly on the MCU.

In [None]:
WIN_LENGTH = 15
WIN_HOP = 1
WIN_THRESHOLD = 8.0
WIN_ALPHA = 0.00025
INIT_WINDEV = 10.0
WIN_ALPHA_MEAN = 0.1

dets = []
scaled_vals = []
for k in range(WIN_LENGTH):
        dets.append(0)
        scaled_vals.append(0)

winDev=INIT_WINDEV
alpha = WIN_ALPHA
alpha_mean = WIN_ALPHA_MEAN
winMean = np.mean(xcorr[:WIN_LENGTH]) # initialize
for idx in range(0,len(xcorr)-WIN_LENGTH,WIN_HOP):
    # This is our circular window
    extract = np.array(xcorr[idx:idx+WIN_LENGTH])
    winDev = (alpha*np.std(extract)) + (1-alpha)*winDev
    winMean = (alpha_mean*np.mean(extract))+(1-alpha_mean)*winMean
    det = 0
    if (extract[-1] -winMean)/winDev > WIN_THRESHOLD:
        det = 1
    scaled_vals.append((extract[-1] -winMean)/winDev)
    dets.append(det)
pylab.subplot(311)
pylab.plot(xcorr)
pylab.subplot(312)
pylab.plot(scaled_vals)
pylab.subplot(313)
pylab.plot(dets)

pylab.show()

# Step 5: Export Template to C

And now we can dump all these parameters into a header file so that we can easily adjust things on the PC, and they will automagically update the caracal!

In [None]:
import datetime

CMPLX_FFT_LEN = SPECD_FFT_LEN*2
binLo = int(ML_FLO/ML_SR*CMPLX_FFT_LEN)
binHi = int(ML_FHI/ML_SR*CMPLX_FFT_LEN)
specSize = int((binHi-binLo)/ML_BIN_AGG)
DATECODE = datetime.datetime.now()

print("#ifndef SPEC_DETECT_TEMPLATE_H")
print("#define SPEC_DETECT_TEMPLATE_H")
print("// AUTOGENERATED PARAMETERS FROM CARACALRT>EvoRT_004>TEMPLATEMAKER003.IPYNB")
print(f"// TIMESTAMP:{DATECODE}")
print("")
print("// Sampling Rate:")
print(f"const uint32_t SPECD_SR = {ML_SR};")
print("// FFT Size (REAL), single sided - equivalent to 1/2 length of a complex FFT:")
print(f"const uint32_t SPECD_FFT_LEN = {SPECD_FFT_LEN};")
print("// FFT Stride:")
print(f"const uint32_t SPECD_FFT_STRIDE = {ML_FFT_STRIDE};")
print("// Number of bins in template/cross-correlation:")
print(f"const uint32_t SPECD_XC_NUM_BINS = {ML_BIN_AGG};")
print("// Bin index for low frequency (NB: Double this to get the unpacked index into the FFT array)")
print(f"const uint32_t SPECD_FFT_BIN_LO = {binLo};")
print("// Bin index for high frequency (NB: Double this to get the unpacked index into the FFT array)")
print(f"const uint32_t SPECD_FFT_BIN_HI = {binHi};")
print("// How many FFT bins to aggregate into one XC bin")
print(f"const uint32_t SPECD_SPEC_SIZE = {specSize};")
print("// How many FFTs to stack for cross correlation")
print(f"const uint32_t SPECD_NUM_FFT_STRIDES = {np.shape(tList)[0]};")
print("// Threshold for peak detection (number of standard deviations)")
print(f"const float32_t SPECD_XC_THRESHOLD = {WIN_THRESHOLD};")
print("// Number of samples in sliding window for peak detection")
print(f"const uint32_t SPECD_XC_BUF_LEN = {WIN_LENGTH};")
print("// Smoothing rate in the window")
print(f"const float32_t SPECD_XC_WIN_ALPHA = {WIN_ALPHA};")
print("// Initial Window SD value")
print(f"const float32_t SPECD_XC_WIN_SDINIT = {INIT_WINDEV};")
print("// Smoothing factor for mean")
print(f"const float32_t SPECD_XC_WIN_ALPHA_MEAN = {WIN_ALPHA_MEAN};")

print("")
print("// XC template")
print(f"const float32_t SPECD_template [{np.shape(tList)[0]}] [{np.shape(tList)[1]}] = {{")
for k in range(np.shape(tList)[0]):
    print("\t{")
    for j in range(np.shape(tList)[1]):
        if (j<np.shape(tList)[1]-1):
            print("\t",tList[k][j],",")
        else:
            print("\t",tList[k][j])
    if (k<np.shape(tList)[0]-1):
        print("\t},")
    else:
        print("\t}")
print("};")
print("#endif")




# Step 6: Export template to pickle

In [None]:

import pickle
templateObj = {}
templateObj["DateCode"]=DATECODE
templateObj['tList']=tList
templateObj['ML_SR']=ML_SR
templateObj['SPECD_FFT_LEN']=SPECD_FFT_LEN
templateObj['ML_BIN_AGG']=ML_BIN_AGG
templateObj['ML_FLO']=ML_FLO
templateObj['ML_FHI']=ML_FHI
templateObj['ML_FFT_STRIDE']=ML_FFT_STRIDE
templateObj['WIN_LENGTH']=WIN_LENGTH
templateObj["WIN_ALPHA"]=WIN_ALPHA
templateObj["INIT_WINDEV"]=INIT_WINDEV
templateObj["WIN_ALPHA_MEAN"]=WIN_ALPHA_MEAN
with open(FILEPREFIX+".pkl",'wb') as f:
    pickle.dump(templateObj,f)