# Accuracy of Frequency Estimation
an experiment with Arduino Nano

## Import necessary modules

In [1]:
# handling exceptions
import sys
# communication via serial port
import serial
# interpretation & typecast of byte arrays
import struct
# timing
import time
# numpy for FFT
import numpy as np
# ipywidgets for interactive controls (buttons, etc)
import ipywidgets as widgets
# scipy for PDFs and interpolation
from scipy import interpolate, stats
# tool for peak detection
from detect_peaks import detect_peaks
# plotting library
from bokeh.plotting import figure, show
from bokeh.layouts import gridplot
from bokeh.models import ColumnDataSource, DataRange1d, Range1d, Plot,glyphs, HoverTool
from bokeh.io import output_notebook, push_notebook
from bokeh.palettes import Category20

In [2]:
# init bokeh plotting library for use in juypter
output_notebook()

In [3]:
source = ColumnDataSource(data=dict(x=[0], y=[0]))

## Define Class for Serial Communication with Arduino

In [4]:
class serial_reader:
    # ADC settings
    PRESCALER = 8
    V_REF = 5
    N_BIT = 10
    VAL_TO_VOLT = V_REF / 2**N_BIT
    # empirical frequency correction factor
    F_COR = 23.075
    # derived properties
    f_s = 16e6/PRESCALER/13/F_COR
    T_s = 1/f_s
    # settings
    samples_acq = 1024
    MAX_READS = 2*samples_acq
    
    def __init__(self):
        self.ser = serial.Serial(
                port='COM3',\
                #port='/dev/ttyUSB0',\
                baudrate=200000,\
                parity=serial.PARITY_NONE,\
                stopbits=serial.STOPBITS_ONE,\
                bytesize=serial.EIGHTBITS,\
                    timeout=0)
        
        
        self.ser.timeout = 0
        
        #data
        self.num_records = 0
        self.record = []
        
        print("connected to: " + self.ser.portstr)
        print("current timeout is {} [s]".format(self.ser.timeout))
    def flush_input_buffer(self):
        self.ser.reset_input_buffer()
    def take_record(self):
        #print('taking record') 
        samples_remaining = self.samples_acq
        # increase current record index
        self.idx_current_record = self.num_records
        # init variables to store data
        self.record.append({'values':[]})
        # try to read all required data
        try:
            # read number of requested samples
            for k in range(0,self.MAX_READS):
                byte = self.ser.read(1)
                #display('[pre-if] content: {}'.format(line)) 
                if byte == b'\xff':
                    # receive upper and lower bytes
                    byteHigh = self.ser.read(1)
                    byteLow = self.ser.read(1)
                    # unpack bytes 
                    data = struct.unpack('>H', byteHigh+byteLow)[0]
                    # compute voltage corresponding to ADC value
                    v_in = self.VAL_TO_VOLT * data
                    # append to record array
                    self.record[self.idx_current_record]['values'].append(v_in)
                    # decrease number of remaining samples
                    samples_remaining -=1
                    if samples_remaining == 0:
                        # stop record if enough samples have been collected
                        break
            # if the loop was finished on by samples_remaining == 0
            # append time to current record
            self.record[self.idx_current_record]['time'] = np.arange(0, len(self.record[self.idx_current_record]['values']))*self.T_s
            # increase number of records
            self.num_records +=1
            #print('done:')
        # if there is an exception during reception
        except:
            print('Receive exception', sys.exc_info()[0])
            # decrease current record index
            self.idx_current_record = self.num_records-1
            # remove record
            self.record.pop()
                
    def set_acq_samples(self, samples_acq):
        self.samples_acq = samples_acq
    def clear(self):
        self.num_records = 0
        self.record = []
    def close(self):
        self.ser.close()
        print('Closed connection')


In [5]:
sensor = serial_reader()

connected to: COM3
current timeout is 0 [s]


In [6]:
sensor.set_acq_samples(1024)
sensor.take_record()
sensor.num_records

1

## Create Record Control & Display GUI

create an Output ipywidget for displaying print or display from callback functions

In [7]:
outbox = widgets.Output()

Callback function to flush the serial interface input buffer

In [8]:
def on_click_flush_input_buffer(b):
    sensor.flush_input_buffer()

Callback function to take a single record

In [9]:
def on_click_record(b):
    sensor.take_record()
    label.value = 'Num records: {}'.format(sensor.num_records)
    update_plot(sensor.record[sensor.idx_current_record], sensor.idx_current_record)

Callback function to capture 128 records

In [10]:
@outbox.capture()
def on_click_record_128(b):
        for k in range(128):
            try:
                sensor.take_record()
                label.value = 'Num records: {}'.format(sensor.num_records)
                #time.sleep(0.1)
            except:
                    print('Exception')

Callback function to clear recorded data

In [11]:
def on_click_clear(b):
    sensor.clear()
    label.value = 'Num records: {}'.format(sensor.num_records)
    make_clear_plot()

Function to update plot

In [12]:
def update_plot(record, number):
        #source.stream(new_data)
        y_data = record['values']
        x_data = record['time']
        new_source = ColumnDataSource(dict(x=x_data, y=y_data))
        new_glyph = glyphs.Line(x='x', y='y', line_color=Category20[20][np.mod(number,20)])
        fig.add_glyph(new_source, new_glyph)
        new_glyph = glyphs.Circle(x='x', y='y', line_color=Category20[20][np.mod(number,20)])
        fig.add_glyph(new_source, new_glyph)
        push_notebook(handle=t)

Function to create a empty plot

In [13]:
def make_clear_plot():
    renderers = fig.select(dict(type=GlyphRenderer))
    for r in renderers:
        r.remove
        push_notebook(handle=t)

Create the GUI components using ipywidgets

In [14]:
# button for flushing input buffer
btn_flush = widgets.Button(
    description='Flush Input Buffer',
    disabled=False,
    button_style='', # 'success', 'info', 'warning', 'danger' or ''
    tooltip='Click me',
    icon='check'
)
btn_flush.on_click(on_click_flush_input_buffer)            
            
# button for taking a single record
btn_record = widgets.Button(
    description='Start Record',
    disabled=False,
    button_style='', # 'success', 'info', 'warning', 'danger' or ''
    tooltip='Click me',
    icon='check'
)
btn_record.on_click(on_click_record)

# button for taking 128 records
btn_record_128 = widgets.Button(
    description='Get 128 Records',
    disabled=False,
    button_style='', # 'success', 'info', 'warning', 'danger' or ''
    tooltip='Click me',
    icon='check'
)
btn_record_128.on_click(on_click_record_128)

# button for clearing record buffer
btn_clear = widgets.Button(
    description='Clear',
    disabled=False,
    button_style='', # 'success', 'info', 'warning', 'danger' or ''
    tooltip='Click me',
    icon='check'
)
btn_clear.on_click(on_click_clear)

# label for displaying number of available records
label = widgets.Label(
    value='Num records: {}'.format(sensor.num_records)
)

# layout of all widgets
hbox = widgets.HBox([btn_flush, btn_record, btn_record_128, btn_clear, label])
vbox = widgets.VBox([hbox, outbox])

Create a bokeh figure

In [15]:
fig = figure(y_range=[-0.5,5.5])
fig.width = 800
fig.height = 200
fig.xaxis.axis_label = 'time [s]'
fig.yaxis.axis_label = 'ADC voltage [V]'

## Show the GUI

In [19]:
outbox.clear_output()
display(vbox)
t = show(fig, notebook_handle=True)

VBox(children=(HBox(children=(Button(description='Flush Input Buffer', icon='check', style=ButtonStyle(), tool…

## Process Recordings

### Compute FFT & Detect Peaks

In [38]:
frequency_data = []
N_FFT = 8196

# window function used
window = np.hamming(sensor.samples_acq)
coherent_gain = np.sum(window)/sensor.samples_acq

# peak detection parameters
PD_MAX_V_UNCOR = 1 #[V]
PD_MIN_V_UNCOR = 0.05 #[V]


#compute FFT + peak detection for all recorded signals
for k in range(sensor.num_records):
    if len(sensor.record[k]['values']) == 0:
        continue
    data_windowed = window*np.array(sensor.record[k]['values'])
    current_fft = np.fft.fft(data_windowed, N_FFT) / sensor.samples_acq
    current_freq_vec = np.fft.fftfreq(N_FFT, sensor.T_s)
    peaks_idx = detect_peaks(np.abs(current_fft), mph=PD_MIN_V_UNCOR, mpd=PD_MAX_V_UNCOR, show=False)
    peaks_freq = current_freq_vec[peaks_idx]
    peaks_vals = current_fft[peaks_idx]
    frequency_data.append({'freqs':current_freq_vec,
                           'fft':current_fft,
                           'peaks_idx': peaks_idx,
                           'peaks_freq': peaks_freq,
                            'peaks_val' : peaks_vals})
    #display(peaks_freq)
    

### Show FFT

In [39]:
IDX_PLOT_FFT = 3

outbox2 = widgets.Output()

slider = widgets.IntSlider(
    value=0,
    min=0,
    max=len(frequency_data)-1,
    step=1,
    description='Dataset:',
    disabled=False,
    continuous_update=True,
    orientation='horizontal',
    readout=True,
    readout_format='d'
)

@outbox2.capture()
def slider_value_change(change):
    IDX_PLOT_FFT = change['new']
    new_source = dict(freq=np.fft.fftshift(frequency_data[IDX_PLOT_FFT]['freqs']),
                               val_V=np.fft.fftshift(np.abs(frequency_data[IDX_PLOT_FFT]['fft'])),
                               val_dBV = np.fft.fftshift(20*np.log10(np.abs(frequency_data[IDX_PLOT_FFT]['fft']))),
                               # voltage corrected by coherent gain
                               val_V_cor =np.fft.fftshift(np.abs(frequency_data[IDX_PLOT_FFT]['fft'])/coherent_gain))
    source.data = new_source
    push_notebook(handle=hnd_fft_plot)

slider.observe(slider_value_change, 'value')




fig_FFT = figure()
source = ColumnDataSource(dict(freq=np.fft.fftshift(frequency_data[IDX_PLOT_FFT]['freqs']),
                               val_V=np.fft.fftshift(np.abs(frequency_data[IDX_PLOT_FFT]['fft'])),
                               val_dBV = np.fft.fftshift(20*np.log10(np.abs(frequency_data[IDX_PLOT_FFT]['fft']))),
                               # voltage corrected by coherent gain
                               val_V_cor =np.fft.fftshift(np.abs(frequency_data[IDX_PLOT_FFT]['fft'])/coherent_gain)))

hover = HoverTool(tooltips=[
    ("index", "$index"),
    ("frequecy", "@freq [Hz]"),
    ("amplitude (uncorrected)", "@val_V [V]"),
    ("amplitude (corrected)", "@val_V_cor [V]"),
])


fig_FFT.line(x='freq',y='val_dBV', source=source)
fig_FFT.circle(x=frequency_data[IDX_PLOT_FFT]['peaks_freq'],
               y=20*np.log10(np.abs(frequency_data[IDX_PLOT_FFT]['peaks_val'])),
              color='red')

fig_FFT.xaxis.axis_label = 'Frequency [Hz]'
fig_FFT.yaxis.axis_label = 'Signal FFT [dBV]'
fig_FFT.y_range = Range1d(-100, 5)
fig_FFT.tools.append(hover)
fig_FFT.width = 1024
fig_FFT.height = 400

display(widgets.VBox([slider, outbox2]))
hnd_fft_plot = show(fig_FFT, notebook_handle=True)

VBox(children=(IntSlider(value=0, description='Dataset:', max=97), Output()))

## Compute Amplitude & Frequency of Detected Peaks
Using quadratic peak interpolation

In [30]:
EVAL_FREQ = 250 #[Hz]
A_TOL = 2 #[Hz]
NUM_POINTS_INTERPOLATION = 101

peak_data = []
for k in range(len(frequency_data)):
    peak_pos = frequency_data[k]['peaks_idx']
    peak_freq = frequency_data[k]['peaks_freq']
    peak_mask = np.isclose(peak_freq, EVAL_FREQ, atol=A_TOL)
    if  np.any(peak_mask):
        peak_idx = peak_pos[peak_mask]
        #display(peak_idx)
        # ensure that only one peak remains for analysis
        # if there is more than one peak
        if peak_idx.size > 1:
            # take the one which is closest to the analysis frequency
            peak_idx = np.min(np.abs(peak_freq[peak_idx]-EVAL_FREQ))
            
        # generate a set of neighbors
        neighbors_idx = peak_idx+np.array([-1,0,1])
        
        # interpolate the fft
        x = frequency_data[k]['freqs'][neighbors_idx]
        y = np.abs(frequency_data[k]['fft'][neighbors_idx])
        interpolation_fun = interpolate.interp1d(x,y,kind='quadratic')
        
        # get maximum from interpolation
        x = np.linspace( np.min(x),np.max(x), NUM_POINTS_INTERPOLATION)
        y = interpolation_fun(x)
        interpolated_peak_val = np.amax(y)
        interpolated_peak_freq = x[np.argmax(y)]
        
        # update frequency data dict
        # with the interpolation function
        peak_data.append(dict(peak_interpolation_fun = interpolation_fun,
                                     peak_interpolation_bins = neighbors_idx,
                                     interpolated_peak_freq = interpolated_peak_freq,
                                     interpolated_peak_val = interpolated_peak_val,
                                     idx_from_frequency_data = k))
        #display(neighbors_idx)

## Plot Interpolated Peaks

In [33]:
IDX_PLOT_PEAK = 1
NUM_POINTS = 101
idx_in_frequency_data = peak_data[IDX_PLOT_PEAK]['idx_from_frequency_data']

fig_interpolation = figure()

bin_start_interpolation = np.min(peak_data[IDX_PLOT_PEAK]['peak_interpolation_bins'])
bin_end_interpolation = np.max(peak_data[IDX_PLOT_PEAK]['peak_interpolation_bins'])

x = np.linspace( frequency_data[idx_in_frequency_data]['freqs'][bin_start_interpolation],
             frequency_data[idx_in_frequency_data]['freqs'][bin_end_interpolation],
             NUM_POINTS)
y = peak_data[IDX_PLOT_PEAK]['peak_interpolation_fun'](x)

fig_interpolation.line(x=x, y=y)

fig_interpolation.scatter(x=frequency_data[idx_in_frequency_data]['freqs'][peak_data[IDX_PLOT_PEAK]['peak_interpolation_bins']],
                          y=np.abs(frequency_data[idx_in_frequency_data]['fft'][peak_data[IDX_PLOT_PEAK]['peak_interpolation_bins']]))

fig_interpolation.scatter(x=peak_data[IDX_PLOT_PEAK]['interpolated_peak_freq'],
                         y=peak_data[IDX_PLOT_PEAK]['interpolated_peak_val'],
                         color='red')

hover_interpolation = HoverTool(tooltips=[
    ("index", "$index"),
    ("frequency", "$x [Hz]"),
    ("y", "$y"),
])

fig_interpolation.tools.append(hover_interpolation)
fig_interpolation.xaxis.axis_label = 'Frequency [Hz]'
fig_interpolation.yaxis.axis_label = 'FFT [V]'
fig_interpolation.width=1024
fig_interpolation.height = 400

show(fig_interpolation)

IndexError: list index out of range

## Statistical Analysis of all Peaks

Collect all peaks into lists

In [None]:
peak_freqs = np.array([peak_data[k]['interpolated_peak_freq'] for k in range(len(peak_data))])
peak_vals = np.array([peak_data[k]['interpolated_peak_val'] for k in range(len(peak_data))])

Estimate mean $\mu$ and variance $\sigma$ for detected frequencies

In [None]:
mean_freq = np.mean(peak_freqs)
var_freq = np.var(peak_freqs)

print('Frequency estimation: mean = {}, variance = {}'.format(mean_freq, var_freq))

Generate normal distribution for estimated parameters

In [None]:
pdf_freq_est = stats.norm(loc=mean_freq, scale=var_freq)

Estimate mean $\mu$ and variance $\sigma$ for detected amplitudes

In [None]:
mean_ampl = np.mean(peak_vals)
var_ampl = np.var(peak_vals)
print('Amplitude estimation: mean = {}, variance = {}'.format(mean_ampl, var_ampl))

Generate Rayleigh distribution for estimated parameters

In [None]:
pdf_amplitude_est = stats.rayleigh(scale=mean_ampl/np.sqrt(np.pi/2))

Compute Histograms

In [None]:
hist_freq, edges_freq = np.histogram(peak_freqs, 20, density=True)
hist_vals, edges_vals = np.histogram(peak_vals, 20, density=True)

### Plot Histogram and PDFs

In [None]:
FIG_HEIGHT = 300
FIG_WIDTH = 500


fig_hist_freqs = figure( title='Frequency Estimation')
fig_hist_freqs.quad(top=hist_freq, bottom=0, left=edges_freq[:-1], right=edges_freq[1:],
        fill_color="#036564", line_color="#033649")

x_pdf_freqs = np.linspace(np.min(edges_freq),np.max(edges_freq),100)
y_pdf_freqs = pdf_freq_est.pdf(x=x_pdf_freqs)
fig_hist_freqs.line(x=x_pdf_freqs, y=y_pdf_freqs)


fig_hist_vals = figure(title='Amplitude Estimation')
fig_hist_vals.quad(top=hist_vals, bottom=0, left=edges_vals[:-1], right=edges_vals[1:],
        fill_color="#036564", line_color="#033649")

x_pdf_ampl = np.linspace(np.min(edges_vals),np.max(edges_vals),100)
y_pdf_ampl = pdf_amplitude_est.pdf(x=x_pdf_ampl)
fig_hist_vals.line(x=x_pdf_ampl, y=y_pdf_ampl)

pdf_amplitude_est
# figure sizes
fig_hist_freqs.height = FIG_HEIGHT
fig_hist_vals.height = FIG_HEIGHT
fig_hist_freqs.width = FIG_WIDTH
fig_hist_vals.width = FIG_WIDTH

# axes labels
fig_hist_freqs.xaxis.axis_label = 'Fequency [Hz]'
fig_hist_freqs.yaxis.axis_label = 'Density Histogramm'

fig_hist_vals.xaxis.axis_label = 'Amplitude [V]'
fig_hist_vals.yaxis.axis_label = 'Density Histogramm'

grid_plot = gridplot([fig_hist_freqs, fig_hist_vals], ncols=2, nrows=1)

show(grid_plot)

# Utils

In [None]:
sensor.close()

In [None]:
2**8

In [None]:
2**16

In [None]:
struct.pack('<H',256)

In [None]:
255*255