In [1]:
%load_ext autoreload
%autoreload 2

In [2]:
from src.stream.serialdevice import *
from src.saf import *
from threading import Thread
import itertools  
# Bokeh plotting tools
from bokeh.palettes import Dark2_5 as palette
from bokeh.models.sources import ColumnDataSource
from bokeh.plotting import figure
from bokeh.io import output_notebook, show, push_notebook
from bokeh.models import Panel, Tabs
from bokeh.layouts import column, gridplot

output_notebook()

In [3]:
esp = serialdevice()
if esp.set_serial(): esp.update_serial()
print (f'Device serial number: {esp.serialNumber}')

1 --- /dev/cu.Bluetooth-Incoming-Port
2 --- /dev/cu.usbserial-0174750B
3 --- /dev/cu.SLAB_USBtoUART


Multiple devices found, please select one:  3


Device serial number: 0174750B


In [4]:
# Self-explanatory
store_to_csv = True
raster = 0.05
buffer_length = 5
# Number of points to show
n_show = 500
# Set channels to monitor and calculate 
channels_to_monitor = {'y': {'1': {'clean_na': ['fill', 'inplace']},#,
                             '2': {'smooth': [3, 'same', 'inplace']},
                             #'8': {'clean_na': ['drop', 'other']},
                             '3': {'exponential_smoothing': [0.2, 'same']},
                             '4': {'exponential_smoothing': [0.1, 'same']},
                             '5': {'exponential_smoothing': [0.05, 'same']},
                             '6': {'exponential_smoothing': [0.03, 'same']},
                             '7': {'exponential_smoothing': [0.02, 'same']},
                             '8': {'exponential_smoothing': [0.01, 'same']},
                             '9': {'time_derivative': [1, 'same']},
                             '10': {'time_diff': [1, 'other']}
                            }}

if store_to_csv: path_to_store = join(getcwd(), 'csv_export.csv'); print (f'Saving stream to: {path_to_store}')

Saving stream to: /Users/macoscar/Documents/04_Projects/02_FabLab/01_SmartCitizen/01_Repositories/DataAnalysis/smartcitizen-iscape-data/notebooks/csv_export.csv


In [7]:
def process_data(data):
    data = data.apply(pd.to_numeric, errors='coerce')
    for channel in channels_to_monitor.keys():
        for process_number in channels_to_monitor[channel].keys():
            # Process and formula
            process = list(channels_to_monitor[channel][process_number])[0]
            formula = process + f"(data['{channel}'], channels_to_monitor['{channel}']['{process_number}']['{process}'][0])" 
            # Name for new channel depending on inplace or not
            if 'inplace' in channels_to_monitor[channel][process_number][process]: channel_new_name = channel
            else: channel_new_name = channel + '_' + process + '_' + str(channels_to_monitor[channel][process_number][process][0])
            # Calculate
            if data.empty: data[channel_new_name] = []
            else: data[channel_new_name] = eval(formula)
    return data

# Start the stream
esp.start_streaming(buffer_length = buffer_length, raster = raster)
# Create plot columnar data
plot_data = ColumnDataSource(data = process_data(esp.worker.example))
# Number of tabs
n_tabs = len(list(channels_to_monitor.keys()))
tabs = Tabs(tabs = [])
colors = itertools.cycle(palette)

for channel in channels_to_monitor.keys():
    gridplots = list()
    p = figure(background_fill_color="#fafafa", x_axis_type='datetime')
    gridplots.append([p])
    p.line(y = channel, x="index", source = plot_data, legend_label = channel)
    p.title.text = f'Streaming {channel}'
    p.yaxis.axis_label = f'{channel}'
    p.xaxis.axis_label = 'Timestamp'

    for process_number in channels_to_monitor[channel].keys():
        process = list(channels_to_monitor[channel][process_number])[0]
        # We have already plotted it if it was inplace
        if 'inplace' in channels_to_monitor[channel][process_number][process]: continue

        channel_name = channel + '_' + process + '_' + str(channels_to_monitor[channel][process_number][process][0])
        if 'same' in channels_to_monitor[channel][process_number][process]:
            p.line(y=channel_name, x="index", legend_label = channel_name, source = plot_data, color = next(colors))
        elif 'other' in channels_to_monitor[channel][process_number][process]:
            p = figure(background_fill_color="#fafafa", x_axis_type='datetime')
            p.line(y=channel_name, x="index", legend_label = channel_name, source = plot_data, color = next(colors))
            p.yaxis.axis_label = f'{channel_name}'
            p.xaxis.axis_label = 'Timestamp'
            gridplots.append([p])

    p.legend.location='top_left'
    p.legend.click_policy="hide"

    grid = gridplot(gridplots,  plot_width=1000, plot_height=500)
    tab = Panel(child=grid, title=channel)
    tabs.tabs.append(tab)

handle = show(tabs, notebook_handle=True)
stop_threads = False

def worker_call(id, device, stop):
    df_data = pd.DataFrame()
    
    while True:
        if not device.worker.output.empty():
            new_data = device.worker.output.get()
            if 'Time' in new_data.columns: new_data.rename(columns={'Time': 'index'}, inplace=True)
            new_data = new_data.set_index('index')

            if df_data.empty: df_data = new_data
            else: df_data = pd.concat([df_data, new_data], sort = False)
            
            # We process everything
            # processed_data = process_data(new_data)    
            # if df_data.empty: df_data = processed_data
            #else: df_data = pd.concat([df_data, processed_data], sort = False)
            
            # We only process what we show
            processed_data = process_data(df_data.tail(n_show))
            # Stream and processing
            plot_data.stream(processed_data, n_show)
            
            # Store to csv
            if store_to_csv: df_data.to_csv(path_to_store, sep = ",")
            
            # Update plot
            push_notebook(handle = handle)

            if stop(): print("Finished thread"); break

thread = Thread(target=worker_call, args=(id, esp, lambda: stop_threads))
thread.start()

Initialised serial worker for device on port /dev/cu.SLAB_USBtoUART. Buffering 5 samples


In [8]:
stop_threads = True
if esp.worker.is_alive():
    print ('Terminating device worker')
    esp.worker.terminate()
    esp.worker.join()

Terminating device worker
