In [338]:
import os, sys, inspect
import time

lib_folder = os.path.join(os.path.split(inspect.getfile( inspect.currentframe() ))[0], 'CapableRobot_USBHub_Driver')
lib_load = os.path.realpath(os.path.abspath(lib_folder))

if lib_load not in sys.path:
    sys.path.insert(0, lib_load)

import capablerobot_usbhub 

SIM = False

# hub = capablerobot_usbhub.USBHub()
# hub.i2c.enable()

try:
    hub = capablerobot_usbhub.USBHub()
    hub.i2c.enable()
except ValueError:
    SIM = True

In [339]:
import time

import paho.mqtt.client as mqtt
import bqplot as bq
import numpy as np
import pandas as pd

from IPython.display import display

def live_plot(title, fields, x="", y="", ylimit=None, history=50):
    global drop_count
    drop_count=0
    
    x_sc = bq.LinearScale()
    y_sc = bq.LinearScale()
    
    if ylimit is not None:
        y_sc.min = ylimit[0]
        y_sc.max = ylimit[1]

    ax_x = bq.Axis(label=x, scale=x_sc, grid_lines='solid')
    ax_y = bq.Axis(label=y, scale=y_sc, orientation='vertical', grid_lines='solid')

    df = pd.DataFrame(columns=['time']+fields)
        
    pts = bq.Lines(x=df.index, y=[df[f].values for f in fields], display_legend=True, scales={'x': x_sc, 'y': y_sc})
    fig = bq.Figure(axes=[ax_x, ax_y], marks=[pts], labels=fields, legend_location='top-left', title=title)
    
    def cb(data):
        global drop_count
        
        if len(df) > history:
            try:
                df.drop([drop_count], inplace=True)
                drop_count += 1
            except KeyError:
                pass
        
        df.loc[len(df)+drop_count] = [time.time()] + data
        
        with pts.hold_sync():
            pts.x = df.index 
            pts.y = [df[f] for f in fields]
    
    return fig, cb

In [340]:
def print_currents():
    last = int(time.time() * 1000)
    
    while True:
        now = int(time.time() * 1000)
        print(str(now - last).rjust(3), " ".join([("%.2f" % v).rjust(7) for v in hub.power.measurements()]))
        last = now
        time.sleep(0.5)

In [341]:
import ipywidgets as widgets
import copy
import random

out = widgets.Output()
display(out)

## Since UI uses 'Button' instead of 'ToggleButton' objects,
## default & current state of the buttons must be tracked.
## Events fire on click, and don't have 'new' vs 'old' states
## like ToggleButton objects do.

power_states = [True, True, True, True] 
data_states = ["on", "on", "on", "on"]

power_buttons = []
data_buttons = []

if not SIM:
    ## Get state upon connection and ignore last data state (it's the upstream link)
    power_states = hub.power.state()
    data_states = hub.data_state()[0:4]

@out.capture()
def power_change(button):
    
    port = int(button.description.split(" ")[1])
    off = (power_states[port-1] == False)
    
    if off:
        power_states[port-1] = True
        color = 'lightgray'
    else:
        power_states[port-1] = False
        color = 'orange'
    
    power_buttons[port-1].style.button_color = color
        
    if not SIM:
        if off:
            hub.power.enable(ports=[port])
        else:  
            hub.power.disable(ports=[port])
              

            
@out.capture()
def data_change(button):

    port = int(button.description.split(" ")[1])
    off = (data_states[port-1] == "off")
    
    if SIM:
        if off:
            print("DATA ENABLE {}".format(port))
            data_states[port-1] = "on"
        else:
            print("DATA DISABLE {}".format(port))
            data_states[port-1] = "off"
    else:
        if off:      
            hub.data_enable(ports=[port])
            data_states[port-1] = "on"
        else:  
            hub.data_disable(ports=[port])
            data_states[port-1] = "off"

def setup_ui():

    for idx in [1,2,3,4]:
        label = "Port {}".format(idx)
        
        button = widgets.Button(description=label, disabled=False, value=True)
        button.on_click(power_change)
        
        ## Set the initial button color based on the port power state
        if power_states[idx-1]:
            button.style.button_color = 'lightgray'
        else:
            button.style.button_color = 'orange'
            
        power_buttons.append(button)
        
        button = widgets.Button(description=label, disabled=False, value=True)
        button.on_click(data_change)
        data_buttons.append(button)
    
    power_label = widgets.Label(value="Power", layout=widgets.Layout(width='20%'))
    power_row   = widgets.HBox([power_label]+power_buttons)
    
    data_label  = widgets.Label(value="Data", layout=widgets.Layout(width='20%'))
    data_row    = widgets.HBox([data_label]+data_buttons)
    
    display(widgets.VBox([data_row, power_row]))        
    
def graph_currents(do_stop):
    fields = ["1", "2", "3", "4"]
    lim = 1700
    if SIM:
        lim = 10
    fig, cb = live_plot("Port Current", fields, x="Time (sec)", y="Current (mA)", ylimit=[0,lim])

    display(fig)
    
    while True:
        try:
            if SIM:
                cb([random.random(),random.random(),random.random(),random.random()])
            else:
                cb(hub.power.measurements())
                
                if len(data_buttons) < 4:
                    continue
                    
                speeds = hub.speeds()

                for idx,state in enumerate(hub.data_state()[0:4]):
                        
                    speed = speeds[idx]
                    color = "lightgray"
                
                    if speed == "high":
                        color = "white"
                    elif speed == "full":
                        color = "green"
                    elif speed == "low": 
                        color = "blue"
                    elif state == "off":
                        color = "orange"

                    data_buttons[idx].style.button_color = color
                        
        except ValueError as e:
            pass

        time.sleep(0.2)
        
        if do_stop():
            break

Output()

In [342]:
# print_currents()

setup_ui()

stop_threads = False

def stop():
    global stop_threads
    stop_threads = True

## Graphing must be done in a thread, if done in the main loop
## it will block event observation of the UI buttons
import threading
thread = threading.Thread(target=graph_currents, args=(lambda: stop_threads, ))
thread.start()

VBox(children=(HBox(children=(Label(value='Data', layout=Layout(width='20%')), Button(description='Port 1', st…

Figure(axes=[Axis(label='Time (sec)', scale=LinearScale()), Axis(label='Current (mA)', orientation='vertical',…

In [343]:
# stop()