In [1]:
import bokeh.plotting as bk
from bokeh.layouts import Row, WidgetBox, Column, gridplot
from bokeh.models import ColumnDataSource
from bokeh.models.widgets import Paragraph, TextInput, Select, RadioGroup, Panel, Tabs, DataTable, TableColumn, Button
from bokeh.application.handlers import FunctionHandler
from bokeh.application import Application
from bokeh.palettes import Category20

import numpy as np
import scipy.signal as signal

from IPython.core.display import display, HTML
display(HTML("<style>.container { width:100% !important; }</style>"))

bk.output_notebook()

In [33]:
class FilterDesignTool():
    
    iir_filters = {
        'Butterworth': 'butter',
        'Chebyshev I': 'cheby1',
        'Chebyshev II': 'cheby2',
        'Cauer/elliptic': 'ellip'
    }
    fir_windows = {
        'Boxcar': 'boxcar',
        'Triangle': 'triang',
        'Blackman': 'blackman',
        'Hamming': 'hamming',
        'Hanning': 'hann',
        'Bartlett': 'bartlett',
        'Flattop': 'flattop',
        'Parzen': 'parzen',
        'Bohman': 'bohman',
        'Blackman/harris': 'blackmanharris',
        'Nuttal': 'nuttall',
        'Bart/Hann': 'barthann',
        'Kaiser': 'kaiser'
    }
    
    source = ColumnDataSource({
        'ftype': [],
        'subtype': [],
        'fs': [],
        'nyquist': [],
        'wp': [],
        'ws': [],
        'gpass': [],
        'gstop': [],
        'b': [],
        'a': [],
        'order': [],
        'w': [],
        'h_freq': [],
        'h_phase': [],
        'x': [],
        'impulse': [],
        'step': []
    })
    
    plot_source = ColumnDataSource({
        'w': [],
        'h_freq': [],
        'h_phase': []
    })

    def __init__(self,
                 width=800,
                 height=600):
        
        bk.curdoc().clear()
                
        self.amp_response_fig = bk.figure(width=width, height=int(height/4))
        self.amp_response_fig.min_border_top = 0
        self.amp_response_fig.min_border_bottom = 0
        self.amp_response_fig.xaxis.visible = False
        self.amp_response_fig_lines = []
        self.phase_response_fig = bk.figure(width=width, height=int(height/4), x_range=self.amp_response_fig.x_range)
        self.phase_response_fig.min_border_top = 0
        self.phase_response_fig_lines = []
        
        self.impulse_response_fig = bk.figure(width=width, height=int(height/4))
        self.impulse_response_fig.min_border_top = 0
        self.impulse_response_fig.min_border_bottom = 0
        self.impulse_response_fig.xaxis.visible = False
        self.impulse_response_fig_lines = []
        self.step_response_fig = bk.figure(width=width, height=int(height/4), x_range=self.impulse_response_fig.x_range)
        self.step_response_fig.min_border_top = 0
        self.step_response_fig_lines = []
        
        self._init_controls()
               
        columns = [
            TableColumn(field=key, title=key) for key in self.source.data.keys() if key not in ['w',
                                                                                                'h_freq',
                                                                                                'h_phase',
                                                                                                'x',
                                                                                                'impulse',
                                                                                                'step']
        ]
        
        self.filter_table = DataTable(source=self.source, columns=columns, width=width)
        self.source.selected.on_change('indices', self._set_visible)
        
        filter_information_widgets = WidgetBox(Paragraph(text="Filter Information"))
        response_type_widgets = WidgetBox(self.compute_filter)
        design_method_widgets = WidgetBox(Paragraph(text="Design Method"),
                                          self.design_method, self.design_method_select)
        filter_order_widgets = WidgetBox(Paragraph(text="Filter Order"),
                                         self.specify_order,
                                         self.specify_order_input)
        options_widgets = WidgetBox(Paragraph(text="Options"),
                                    self.density_factor)
        frequency_specifications_widgets = WidgetBox(Paragraph(text="Frequency Specifications"),
                                                     self.freq_units, self.fs, self.fpass, self.fstop)
        magnitude_specifications_widgets = WidgetBox(Paragraph(text="Magnitude Specifications"),
                                                     self.mag_units, self.amp_pass, self.amp_stop)
        
        self.layout = Row(
            Column(gridplot([self.amp_response_fig],
                            [self.phase_response_fig],
                            [self.impulse_response_fig],
                            [self.step_response_fig]),
                   WidgetBox(Paragraph(text="Filter Information")),
                   WidgetBox(self.filter_table)),
            Column(filter_information_widgets,
                   response_type_widgets,
                   design_method_widgets,
                   filter_order_widgets),
            Column(options_widgets,
                   frequency_specifications_widgets,
                   magnitude_specifications_widgets)
        )
        
        
    def _init_controls(self):
        
        self.compute_filter = Button(label='COMPUTE FILTER')
        self.compute_filter.on_click(self._update_filter)
        
        self.design_method = RadioGroup(labels=["IIR", "FIR"], active=0)
        self.design_method_select = Select(options=["Butterworth"],
                                           value="Butterworth",
                                           title="Method:")
        
        self.specify_order = RadioGroup(labels=["Min Order", "Specify Order"], active=0)
        self.specify_order_input = TextInput(value='10')
        
        self.density_factor = TextInput(value="20", title="Density Factor")
        
        self.freq_units = Select(value='Normalised (0-1)', title='Units:', options=['Hz', 'Normalised (0-1)'])
        self.fs = TextInput(value='1.0', title='Fs:')
        self.fs.on_change('value', self.update)
        self.fpass = TextInput(value='0.1', title='Fpass:')
        self.fstop = TextInput(value='0.25', title='Fstop:')
        
        self.mag_units = Select(value='dB', title='Magnitude Units:', options=['dB'])
        self.amp_pass = TextInput(value='1.0', title='Apass:')
        self.amp_stop = TextInput(value='80.0', title='Astop:')
        
        controls = [self.design_method_select,
                    self.specify_order_input,
                    self.density_factor,
                    self.freq_units,
                    self.fs,
                    self.fpass,
                    self.fstop,
                    self.mag_units,
                    self.amp_pass,
                    self.amp_stop
                    ]
        radio_controls = [self.design_method,
                          self.specify_order]
        
        [n.on_change('value', self.update) for n in controls]
        [n.on_change('active', self.update) for n in radio_controls]
        self._update_parameters()
        
    def _update_parameters(self):
        active = self.design_method.active
        if self.design_method.labels[active] == 'IIR':
            self.design_method_select.options = [key for key in self.iir_filters.keys()]
            if self.design_method_select.value not in self.iir_filters.keys():
                self.design_method_select.value = 'Butterworth'
            self.specify_order_input.value = '10'
        elif self.design_method.labels[active] == 'FIR':
            self.design_method_select.options = [key for key in self.fir_windows.keys()]
            if self.design_method_select.value not in self.fir_windows.keys():
                self.design_method_select.value = 'Hanning'
            self.specify_order_input.value = '101'

    
    def _update_filter(self):
        
        update = {
            'ftype': self.design_method.labels[self.design_method.active],
            'subtype': self.design_method_select.value,
            'fs': float(self.fs.value),
            'nyquist': float(self.fs.value)/2,
            'wp': [float(value) for value in self.fpass.value.split(',')],
            'ws': [float(value) for value in self.fstop.value.split(',')],
            'gpass': float(self.amp_pass.value),
            'gstop': float(self.amp_stop.value),
            'order': int(self.specify_order_input.value) if self.specify_order.active == 1 else 'auto'
            }
        
        if len(update['wp']) == 1 and len(update['ws']) == 1:
            if update['wp'] > update['ws']:
                btype = 'highpass'
            elif update['wp'] < update['ws']:
                btype = 'lowpass'
            else:
                raise ValueError("Stop and Pass frequencies cannot be equal.")
        else:
            if update['wp'][0] > update['ws'][0] and update['wp'][1] < update['ws']:
                btype = 'bandpass'
            elif update['wp'][0] < update['ws'][0] and update['wp'][1] > update['ws']:
                btype = 'bandstop'
            else:
                raise ValueError("Stop frequencies should be inside or outside pass frequencies")
        
        if update['ftype'] == 'IIR':
            if self.specify_order.active == 0 or self.design_method_select.value == 'Bessel/Thomson':
                b, a = signal.iirdesign(wp=update['wp'],
                                        ws=update['ws'],
                                        gpass=update['gpass'],
                                        gstop=update['gstop'],
                                        ftype=self.iir_filters[update['subtype']])
                
            else:   
                b, a = signal.iirfilter(update['order'],
                                        Wn=update['wp'],
                                        rs=update['gstop'],
                                        rp=update['gpass'],
                                        ftype=self.iir_filters[update['subtype']],
                                        btype=btype)
        
        elif update['ftype'] == 'FIR':
            print(update)
            if btype == 'lowpass':
                b = signal.firwin(update['order'],
                              cutoff=update['ws'],
                              window=self.fir_windows[update['subtype']])
            elif btype == 'highpass':
                b = signal.firwin(update['order'], cutoff = 0.3, window = "hanning")
                b = -b
                b[update['order']/2] = b[update['order']/2] + 1
            elif btype == 'bandpass':
                f1 = signal.firwin(n, cutoff = 0.3, window = 'blackmanharris')
                f2 = - signal.firwin(n, cutoff = 0.5, window = 'blackmanharris')
                f2[update['order']/2] = f2[update['order']/2] + 1
                b = - (f1+f2)
                b[update['order']/2] = b[update['order']/2] + 1
                
            a = np.arange(1, 2)
                
        print(b, a)
                
        w, h = signal.freqz(b, a, worN=2048)
        impulse = np.repeat(0.0, 2048)
        impulse[0] = 1.0
        response = signal.lfilter(b, a, impulse)
        step = np.cumsum(response)
        x = np.linspace(0, 50, 2048)
        
        update['b'] = b.tolist()
        update['a'] = a.tolist()
        update['w'] = (w/max(w)).tolist()
        update['h_freq'] = (20*np.log10(abs(h))).tolist()
        update['h_phase'] = (np.unwrap(np.arctan(np.imag(h), np.real(h)))).tolist()
        update['x'] = x.tolist()
        update['impulse'] = response.tolist()
        update['step'] = step.tolist()
        
        new_source = self.source.data
        for key in new_source.keys():
            new_source[key].append(update[key])
            
        self.source.data = new_source
        
        line_color = Category20[20][(len(self.amp_response_fig_lines) + 1) % 8]
        
        self.amp_response_fig_lines.append(
            self.amp_response_fig.line(update['w'], update['h_freq'], visible=False, line_color=line_color)
        )
        self.phase_response_fig_lines.append(
            self.phase_response_fig.line(update['w'], update['h_phase'], visible=False, line_color=line_color)
        )
        self.impulse_response_fig_lines.append(
            self.impulse_response_fig.line(update['x'], update['impulse'], visible=False, line_color=line_color)
        )
        self.step_response_fig_lines.append(
            self.step_response_fig.line(update['x'], update['step'], visible=False, line_color=line_color)
        )
        
    def _set_visible(self, old, new, attr):
        for line_index, line in enumerate(self.amp_response_fig_lines):
            
            if line_index in self.source.selected['1d'].indices:
                self.amp_response_fig_lines[line_index].visible = True
                self.phase_response_fig_lines[line_index].visible = True
                self.impulse_response_fig_lines[line_index].visible = True
                self.step_response_fig_lines[line_index].visible = True
            else:
                self.amp_response_fig_lines[line_index].visible = False
                self.phase_response_fig_lines[line_index].visible = False
                self.impulse_response_fig_lines[line_index].visible = False
                self.step_response_fig_lines[line_index].visible = False
        
    def update(self, old, new, attr):
        self._update_parameters()
    
    def modify_doc(self, doc):
        doc.add_root(self.layout)
        
bk.curdoc().clear()
p = FilterDesignTool(width=780)
handler = FunctionHandler(p.modify_doc)
app = Application(handler)

bk.show(app)

ERROR:bokeh.server.protocol_handler:error handling message Message 'PATCH-DOC' (revision 1): TypeError("unsupported operand type(s) for -: 'str' and 'int'")


{'ftype': 'FIR', 'subtype': 'Bartlett', 'fs': 1.0, 'nyquist': 0.5, 'wp': [0.1], 'ws': [0.25], 'gpass': 1.0, 'gstop': 80.0, 'order': 'auto'}
[9.50490445e-10 1.04553949e-08 5.22769745e-08 1.56830923e-07
 3.13661847e-07 4.39126585e-07 4.39126585e-07 3.13661847e-07
 1.56830923e-07 5.22769745e-08 1.04553949e-08 9.50490445e-10] [ 1.00000000e+00 -8.65576868e+00  3.42610015e+01 -8.18239359e+01
  1.30963428e+02 -1.47456072e+02  1.19143821e+02 -6.90666726e+01
  2.81437693e+01 -7.67599972e+00  1.26092261e+00 -9.44919631e-02]
{'ftype': 'FIR', 'subtype': 'Hanning', 'fs': 1.0, 'nyquist': 0.5, 'wp': [0.1], 'ws': [0.25], 'gpass': 1.0, 'gstop': 80.0, 'order': 101}
[ 0.00000000e+00  4.53204480e-06 -3.84226595e-20 -4.24122715e-05
 -1.08698379e-04 -1.22401229e-04  1.24459737e-18  2.49085519e-04
  4.68722033e-04  4.27297421e-04 -9.30602034e-19 -6.62211072e-04
 -1.13515295e-03 -9.59480621e-04  1.76672250e-18  1.32543641e-03
  2.17279905e-03  1.76736279e-03 -2.79800267e-18 -2.29389324e-03
 -3.66576419e-03 -2

  output = mkl_fft.rfft_numpy(a, n=n, axis=axis)
  out = out_full[ind]


{'ftype': 'FIR', 'subtype': 'Hanning', 'fs': 1.0, 'nyquist': 0.5, 'wp': [0.1], 'ws': [0.25], 'gpass': 1.0, 'gstop': 80.0, 'order': 101}
[ 0.00000000e+00  4.53204480e-06 -3.84226595e-20 -4.24122715e-05
 -1.08698379e-04 -1.22401229e-04  1.24459737e-18  2.49085519e-04
  4.68722033e-04  4.27297421e-04 -9.30602034e-19 -6.62211072e-04
 -1.13515295e-03 -9.59480621e-04  1.76672250e-18  1.32543641e-03
  2.17279905e-03  1.76736279e-03 -2.79800267e-18 -2.29389324e-03
 -3.66576419e-03 -2.91557792e-03  3.95964342e-18  3.64571132e-03
  5.73695890e-03  4.50156377e-03 -5.17865459e-18 -5.50626012e-03
 -8.58985717e-03 -6.69173471e-03  6.37844123e-18  8.10355252e-03
  1.26066069e-02  9.80913037e-03 -7.48361633e-18 -1.19125276e-02
 -1.86145065e-02 -1.45828773e-02  8.42473767e-18  1.81138142e-02
  2.87912858e-02  2.30621105e-02 -9.14267113e-18 -3.06239302e-02
 -5.11887086e-02 -4.39140266e-02  9.59230633e-18  7.43616039e-02
  1.58526824e-01  2.24856118e-01  2.49999011e-01  2.24856118e-01
  1.58526824e-01  7

In [21]:
x = [1]

In [22]:
x.to_list()

AttributeError: 'list' object has no attribute 'to_list'

In [32]:
np.arange(1, 2).tolist()

[1]