In [None]:
# Set the TkAgg backend for matplotlib
%matplotlib tk

# Import required packages
import threading
from threading import Thread
import time
import datetime
import re
import logging
import os
import serial as pyserial
import sys
import glob
import re
import json

from IPython.display import display, Markdown, clear_output, Image, HTML
import ipywidgets as widgets
from ipywidgets import interact, interact_manual, Layout, Box, Button, Label, FloatText, Textarea, Dropdown, IntText

import matplotlib
import matplotlib.figure as figure
import matplotlib.animation as animation
from matplotlib import rc
import matplotlib.dates as mdates
from matplotlib.ticker import FormatStrFormatter
from matplotlib.backends.backend_tkagg import FigureCanvasTkAgg
import matplotlib.pyplot as plt
import numpy as np

import ad7124_default_config

In [None]:
# Short alias for the register-settings mapping shipped in ad7124_default_config;
# it is iterated by register name to build the configuration form.
ad7124_config = ad7124_default_config.ad7124_register_settings

In [None]:
# Layout shared by every "register name + value" row of the form
form_item_layout = Layout(
    display='flex',
    flex_flow='row',
    justify_content='space-between'
)

# One editable row per register whose "r/w" field marks it as writable
form_items = [
    Box([Label(value=register),
         IntText(value=settings["default_value"])],
        layout=form_item_layout)
    for register, settings in ad7124_config.items()
    if settings["r/w"] in ('w', 'rw')
]

# Stack the rows vertically inside a bordered container
form_layout = Layout(
    display='flex',
    flex_flow='column',
    border='solid 2px',
    align_items='stretch',
    width='50%'
)
form = Box(form_items, layout=form_layout)

# Bare expression last so the notebook renders the widget
form

In [None]:
# Debug aid: dump the repr of every generated form row, one per line
for form_row in form_items:
    print(form_row)

In [None]:
# Export the default register settings as an importable module, sample_config.py.
my_list = ad7124_default_config.ad7124_register_settings

# Keep the serialized text under its own name: binding it to `json` would replace
# the imported module with a string and break this cell on re-run.
settings_json = json.dumps(my_list)

# NOTE(review): JSON is only valid Python while the settings contain just
# strings/numbers/dicts/lists (true/false/null would not import) - confirm.
with open('sample_config.py', "w") as config_file:
    config_file.write("ad7124_register_settings = " + settings_json)

In [None]:
# Class for Serial connectivity
class Serial:
    """Serial-console link to the acquisition board plus its notebook controls.

    The constructor builds and displays the port dropdown and the
    Connect / Scan / Start Data Collection buttons, and prepares (but does not
    start) a daemon thread that reads the port one character at a time.
    Complete lines are inspected for login prompts, AD7124 errors and sample
    records; sample records are appended to the shared ``channels`` dict.

    :param channels: dict keyed by channel number (as a string). Each entry
        must provide the lists ``timestamps`` / ``voltages`` / ``values``,
        ``max_elements``, ``value_conversion_func`` and a voltage formatter
        stored under ``voltage_conversion_func`` or ``voltage_format_func``.
    """

    def __init__(self, channels):
        self.BAUD = 115200  # Serial Baud Rate
        self.serial = None
        self.serial_buffer = ""
        self.data_collection_running = False
        self.channels = channels
        self.serial_connected = False
        self.data_collection_in_progress = False

        # Serial port selection dropdown widget
        self.serial_ports = self.__serial_port_scan()

        if len(self.serial_ports) == 0:
            self.serial_ports.append("No Serial Cable Connected")

        self.selected_serial_port = self.serial_ports[0]

        self.serial_port_list = widgets.Dropdown(
                                options=self.serial_ports,
                                value=self.selected_serial_port,
                                description='Serial Port:')
        self.serial_port_list.observe(self.on_serial_port_change)

        # Serial port connect/disconnect button widget
        self.serial_connection_button = widgets.Button(description = 'Connect')
        self.serial_connection_button.on_click(self.on_serial_connection_button_clicked)

        # Serial port scan button widget
        self.serial_scan_button = widgets.Button(description = 'Scan')
        self.serial_scan_button.on_click(self.on_serial_scan_button_clicked)

        # Data collection start/stop button widget (enabled once the board answers)
        self.data_collect_button = widgets.Button(description = 'Start Data Collection')
        self.data_collect_button.on_click(self.on_data_collect_button_clicked)
        self.data_collect_button.disabled = True

        # Display the Serial widgets
        display(widgets.HBox([self.serial_port_list, self.serial_connection_button, self.serial_scan_button]))
        display(widgets.HBox([self.data_collect_button]))

        # Setup threading to continually read data from the serial port.
        # The thread is started on the first successful connect_serial().
        self.thread = threading.Thread(target=self.read_serial)
        self.thread.daemon = True

    def on_serial_connection_button_clicked(self, arg):
        """Toggle the connection: disconnect if connected, otherwise connect."""
        if self.serial_connected:
            self.disconnect_serial()
        else:
            self.connect_serial(self.selected_serial_port, self.BAUD)

    def on_serial_scan_button_clicked(self, arg):
        """Rescan the system for serial ports and refresh the dropdown."""
        self.serial_ports = self.__serial_port_scan()

        if len(self.serial_ports) == 0:
            self.serial_ports.append("No Serial Cable Connected")

        self.serial_port_list.options = self.serial_ports

    def on_data_collect_button_clicked(self, arg):
        """Start or stop data collection and relabel the button accordingly."""
        if not self.data_collection_in_progress:
            self.data_collection_start()
            self.data_collection_in_progress = True
            self.data_collect_button.description = "Stop Data Collection"
        else:
            self.data_collection_stop()
            self.data_collection_in_progress = False
            self.data_collect_button.description = "Start Data Collection"

    # Function for opening a serial connection on the specified port
    def connect_serial(self, port, baud):
        """ Open the serial connection on the specified port and log in.

        All three connection widgets are disabled here; the connection button
        is re-enabled by the reader thread once the board's response is seen.

        :param port: Serial port to connect to.
        :param baud: Baud rate of the serial connection.
        :returns:
            0 if the serial connection is successfully opened.
            -1 if the connection fails to open.
        """
        try:
            self.serial = pyserial.Serial(port, baud, timeout=0, writeTimeout=0)  # ensure non-blocking
            self.serial_connected = True
            self.serial_port_list.disabled = True
            self.serial_connection_button.disabled = True
            self.serial_scan_button.disabled = True

            # Start a thread to handle receiving serial data
            if not self.thread.is_alive():
                self.thread.start()

            # Kill any running processes
            self.__serial_ctrl_c()
            time.sleep(0.25)
            self.__login()
            return 0
        except Exception as e:
            logging.exception(e)
            print("Cant Open Specified Port")
            return -1

    def disconnect_serial(self):
        """ Close the serial connection and return the widgets to their idle state.

        :returns:
            0 if the serial connection is successfully closed.
            -1 if the connection fails to close.
        """

        try:
            # Flag first so the reader thread stops touching the port
            self.serial_connected = False
            self.serial.close()
            # Collection cannot survive a disconnect; reset both flags so the
            # next click on the collect button starts (rather than stops) it.
            self.data_collection_running = False
            self.data_collection_in_progress = False
            self.serial_port_list.disabled = False
            self.serial_scan_button.disabled = False
            self.data_collect_button.disabled = True
            self.serial_connection_button.description = "Connect"
            return 0

        except Exception as e:
            logging.exception(e)
            return -1

    def write_serial(self, string):
        """Send ``string`` followed by a newline, UTF-8 encoded."""
        string = string + "\n"
        self.serial.write(string.encode('utf-8'))

    def write_file_to_meerkat(self, file):
        """Recreate the local file ``file`` under /root/python/ on the device.

        The remote copy is removed first, then rebuilt with one
        ``echo <line> >> <path>`` shell command per local line.

        :param file: Path of the local file; also used as the remote file name.
        """
        import shlex  # local import: only this method needs shell quoting

        remote_path = shlex.quote("/root/python/" + file)
        self.serial.write(str("rm " + remote_path + "\n").encode('utf-8'))
        with open(file, "r") as f:
            for line in f:
                # Remove excess newline character since we will insert one line at a time
                line = line.rstrip('\n')
                # Single-quote the line so quotes, $, backticks and backslashes
                # in the source reach the remote file unchanged.
                self.serial.write(str("echo " + shlex.quote(line) + " >> " + remote_path + "\n").encode('utf-8'))

    def read_serial(self):
        """Reader-thread body: assemble lines from the port and dispatch them.

        Never returns; intended to run in its own daemon thread.
        """
        # Infinite loop is okay since this is running in its own thread.
        while True:
            # Only try to read the serial port if connected
            if not self.serial_connected:
                time.sleep(0.05)  # idle politely instead of spinning a core
                continue

            try:
                # latin-1 maps every byte to one character; unlike
                # unicode_escape it does not raise on a lone backslash.
                c = self.serial.read().decode('latin-1')  # attempt to read a character from Serial

                # was anything read?
                if len(c) == 0:
                    time.sleep(0.001)  # non-blocking read returned nothing yet
                    continue

                # check if character is a delimiter
                if c == '\r':
                    continue  # don't want returns. chuck it

                if c != '\n':
                    self.serial_buffer += c  # add to the buffer
                    continue

                self.serial_buffer += "\n"  # add the newline to the buffer

                # Parse the received serial data since we've received a full line
                if "login" in self.serial_buffer:
                    # Only want to enable buttons if we can confirm there is a good connection to the board
                    self.__serial_connection_established()
                    self.__login()
                elif "-sh: root: not found" in self.serial_buffer:
                    # Already logged in: the shell rejected "root" as a command
                    self.__serial_connection_established()
                elif "AD7124 error" in self.serial_buffer:
                    self.__serial_ctrl_c()
                elif "channel" in self.serial_buffer \
                        and "timestamp" in self.serial_buffer \
                        and "voltage" in self.serial_buffer \
                        and "code" in self.serial_buffer:
                    self.update_plot_data(self.serial_buffer)

                self.serial_buffer = ""  # empty the buffer

            except Exception as e:
                logging.exception(e)
                time.sleep(0.05)  # avoid flooding the log if the port keeps failing

    def update_plot_data(self, data_string):
        """ Parse the data_string into key values and store the data into arrays

        The line must contain exactly four numbers, in the order channel,
        timestamp, voltage, code. The device timestamp is ignored; the host's
        UTC time is recorded instead. Any failure is logged and the sample is
        dropped without touching the lists.

        :param data_string: String to parse into relevant data
        :return: Nothing
        """
        try:
            p = re.compile(r'[-+]?\d*\.\d+|\d+')
            channel, _device_timestamp, voltage, code = p.findall(data_string)
            cfg = self.channels[channel]

            # The channel setup stores the formatter as "voltage_format_func";
            # accept either spelling so a naming mismatch cannot drop samples.
            voltage_func = cfg.get("voltage_conversion_func") or cfg.get("voltage_format_func")
            if voltage_func is None:
                raise KeyError("channel %s has no voltage conversion function" % channel)

            # Convert everything before appending: a failing conversion must
            # not leave the three lists with different lengths.
            plot_voltage = voltage_func(float(voltage))
            value = cfg["value_conversion_func"](float(voltage), float(code))
            stamp = datetime.datetime.now(datetime.timezone.utc).strftime('%M:%S.%f')[:-4]

            cfg["voltages"].append(plot_voltage)
            cfg["values"].append(value)
            cfg["timestamps"].append(stamp)

            # Limit lists to the max elements value
            limit = cfg["max_elements"]
            for key in ("timestamps", "voltages", "values"):
                cfg[key] = cfg[key][-limit:]

        except Exception as e:
            logging.exception(e)

    def on_serial_port_change(self, change):
        """Track the dropdown's selected value in ``selected_serial_port``."""
        if change['type'] == 'change' and change['name'] == 'value':
            self.selected_serial_port = change['new']

    def __serial_port_scan(self):
        """ Lists serial port names

        :raises EnvironmentError:
            On unsupported or unknown platforms
        :returns:
            A list of the serial ports available on the system
        """
        if sys.platform.startswith('win'):
            ports = ['COM%s' % (i + 1) for i in range(256)]
        elif sys.platform.startswith('linux') or sys.platform.startswith('cygwin'):
            # this excludes your current terminal "/dev/tty"
            ports = glob.glob('/dev/tty[A-Za-z]*')
        elif sys.platform.startswith('darwin'):
            ports = glob.glob('/dev/tty.*')
        else:
            raise EnvironmentError('Unsupported platform')

        result = []
        for port in ports:
            try:
                # Probe with a local handle so the live connection stored in
                # self.serial is never overwritten by a scan.
                probe = pyserial.Serial(port)
                probe.close()
                result.append(port)
            except (OSError, pyserial.SerialException):
                pass  # port missing or busy: not selectable

        return result

    def __serial_connection_established(self):
        """Enable the controls once the board has answered on the console."""
        self.serial_connection_button.disabled = False
        self.serial_connection_button.description = "Disconnect"
        self.data_collect_button.description = "Start Data Collection"
        self.data_collect_button.disabled = False

    def __serial_ctrl_c(self):
        """ Send a ctrl+c command to the terminal

        :return: Nothing
        """
        if self.serial_connected:
            self.serial.write("\x03\n".encode('utf-8'))

    def __login(self):
        """ Login to the device

        :return: Nothing
        """
        if self.serial_connected:
            self.serial.write("root\n".encode('utf-8'))

    def data_collection_start(self):
        """ Start the data collection script on the device.

        :return: Nothing
        """
        self.data_collection_running = True
        self.serial.write("python /root/python/safari.py\n".encode('utf-8'))

    def data_collection_stop(self):
        """ Stop the data collection script, or any running script, on the device

        :return: Nothing
        """
        self.data_collection_running = False
        self.__serial_ctrl_c()

    # Function for toggling data collection
    def data_collection_start_stop(self):
        """Toggle data collection based on ``data_collection_running``.

        Keeps the collect button's label and ``data_collection_in_progress``
        in step with the new state. Prints an error if the port is not open.
        """
        if not self.serial_connected:
            print("Serial Port Error", "Serial Port Not Open.")
            return

        if not self.data_collection_running:
            self.data_collection_start()
            self.data_collection_in_progress = True
            self.data_collect_button.description = "Stop Data Collection"
        else:
            self.data_collection_stop()
            self.data_collection_in_progress = False
            self.data_collect_button.description = "Start Data Collection"

In [None]:
class Data:
    """Live matplotlib view with one subplot per channel.

    Each animation frame clears every axis and redraws the channel's
    ``voltages`` against its ``timestamps`` (both read from the shared
    ``channels`` dict), adds a Max/Mean/Min/Last box beside the plot, and
    restores the hover annotation state recorded by :meth:`hover`.

    :param channels: dict keyed by channel id; each entry must provide
        ``plot_title``, ``y_label``, ``axes_color``, ``timestamps``,
        ``voltages``, ``line``, ``annotation_visible``, ``annotation_text``
        and ``annot_ax_xy``. The ``axis``, ``annotation`` and ``line``
        entries are filled in by this class.
    """

    def __init__(self, channels):
        self.channels = channels
        self.update_interval = 1  # Time (ms) between polling/animation updates

        self.nplots = len(channels)
        self.plot_title_fontsize = 12
        self.plot_label_fontsize = 12
        self.tick_count = 5
        self.props = dict(boxstyle='round', facecolor='wheat', alpha=0.5)

        # squeeze=False always yields a 2-D array of axes, so a single channel
        # needs no special case.
        self.fig, axs = plt.subplots(self.nplots, 1, figsize=(8, 8), squeeze=False)
        for channel, axis in zip(self.channels, axs[:, 0]):
            cfg = self.channels[channel]
            cfg["axis"] = axis
            self._style_axis(cfg)
            cfg["annotation"] = self._new_annotation(axis)

        self.fig.subplots_adjust(hspace=0.9, right=0.85)

        # https://bit.ly/2AWOqiu
        self.fig.canvas.mpl_connect("motion_notify_event", self.hover)

        # Keep a reference on self: the animation stops if it is garbage collected
        self.ani = animation.FuncAnimation(self.fig,
                                           self.animate,
                                           interval=self.update_interval)

    def _style_axis(self, cfg):
        """Apply title, y label, tick colour/format and tick count to a channel's axis."""
        axis = cfg["axis"]
        axis.set_title(cfg["plot_title"], fontsize=self.plot_title_fontsize)
        axis.set_ylabel(cfg["y_label"], color=cfg["axes_color"])
        axis.tick_params(axis='y', labelcolor=cfg["axes_color"])
        axis.yaxis.set_major_formatter(FormatStrFormatter('%.4f'))
        axis.xaxis.set_major_locator(plt.MaxNLocator(self.tick_count))

    @staticmethod
    def _new_annotation(axis):
        """Create the empty hover annotation (boxed text with an arrow) on ``axis``."""
        return axis.annotate("", xy=(0, 0), xytext=(-20, 20),
                             textcoords="offset points",
                             bbox=dict(boxstyle="round", fc="w"),
                             arrowprops=dict(arrowstyle="->"))

    # https://bit.ly/2AWOqiu
    # Function for updating the hovering annotations
    def update_annot(self, axis, ind):
        """Record the hovered point's position and text for the channel drawn on ``axis``.

        :param axis: Axes the pointer is over.
        :param ind: Hit details as returned by ``Line2D.contains``; the first
            index in ``ind["ind"]`` selects the data point.
        """
        for channel in self.channels:
            cfg = self.channels[channel]
            if axis == cfg["axis"]:
                x, y = cfg["line"].get_data()
                point = ind["ind"][0]
                cfg["annot_ax_xy"] = (x[point], y[point])
                cfg["annotation_text"] = "{}".format(y[point])

    # Callback for the hover event
    def hover(self, event):
        """Mouse-move callback: flag whether each channel's annotation should show.

        Only state is updated here; the annotation itself is redrawn by the
        next :meth:`animate` frame.
        """
        for channel in self.channels:
            cfg = self.channels[channel]
            try:
                if event.inaxes != cfg["axis"]:
                    continue
                if cfg["line"] is None:
                    continue  # nothing plotted yet on this axis

                cont, ind = cfg["line"].contains(event)
                if cont:
                    self.update_annot(cfg["axis"], ind)
                    cfg["annotation_visible"] = True
                elif cfg["annotation"].get_visible():
                    cfg["annotation_visible"] = False

            except IndexError:
                print("Uh-Oh")

    def animate(self, i):
        """Redraw every channel for animation frame ``i`` (the index is unused)."""
        for channel in self.channels:
            cfg = self.channels[channel]
            axis = cfg["axis"]
            axis.clear()
            # clear() wipes titles, labels and tick settings; re-apply them
            self._style_axis(cfg)

            # Snapshot the lists: another thread may append to them while we
            # draw. Trim both to the common length so plot() never sees
            # mismatched dimensions (pairing may be off by one sample for
            # that single frame).
            timestamps = list(cfg["timestamps"])
            voltages = list(cfg["voltages"])
            count = min(len(timestamps), len(voltages))
            if count == 0:
                continue
            timestamps = timestamps[-count:]
            voltages = voltages[-count:]

            try:
                axis.plot(timestamps, voltages, color=cfg["axes_color"])
                cfg["line"] = axis.get_lines()[0]

                # clear() removed the old annotation; rebuild it from saved state
                annotation = self._new_annotation(axis)
                cfg["annotation"] = annotation
                annotation.set_visible(cfg["annotation_visible"])
                if cfg["annot_ax_xy"] is not None:
                    annotation.xy = cfg["annot_ax_xy"]
                annotation.set_text(cfg["annotation_text"])
                annotation.get_bbox_patch().set_alpha(0.4)

                ax_annotation = '\n'.join((
                    r'Max=%.4f' % (max(voltages),),
                    r'Mean=%.4f' % (sum(voltages) / len(voltages),),
                    r'Min=%.4f' % (min(voltages),),
                    r'Last=%.4f' % (voltages[-1],)))

                # Statistics box just right of the axes (axes coordinates)
                axis.text(1.01, 0.7, ax_annotation,
                          transform=axis.transAxes,
                          fontsize=10, verticalalignment='top', bbox=self.props)

            except Exception as e:
                logging.exception(e)
                print("Dimension Error 1")
                print(f'Timestamps Len: {len(timestamps)}')
                print(f'Voltages Len: {len(voltages)}')

In [None]:
# Function for formatting channel 0 voltage for plotting
def convert_channel_0_voltage(voltage):
    """Return ``voltage`` coerced to float and rounded to 4 decimal places."""
    as_float = float(voltage)
    return round(as_float, 4)

# Function for formatting channel 4 voltage for plotting
def convert_channel_4_voltage(voltage):
    """Return ``voltage`` (as float) multiplied by 1000 and rounded to 4 decimal places."""
    scaled = float(voltage) * 1000
    return round(scaled, 4)

# Function for formatting channel 6 voltage for plotting
def convert_channel_6_voltage(voltage):
    """Return ``voltage`` coerced to float and rounded to 4 decimal places."""
    as_float = float(voltage)
    return round(as_float, 4)

# Function for converting channel 0 voltage to an accelerometer value
def convert_accelerometer(voltage, code):
    """Convert a voltage to an accelerometer reading, rounded to 3 decimals.

    The reading is the offset of ``voltage`` from half of a 3.3 V supply,
    divided by 0.640 (presumably the sensor sensitivity in V/g - TODO confirm).

    :param voltage: Measured voltage as a number.
    :param code: Accepted for a uniform converter signature; not used.
    """
    supply_voltage = 3.3
    offset_from_midpoint = voltage - supply_voltage / 2
    return round(offset_from_midpoint * 1 / 0.640, 3)

# Function for converting channel 4 code to a temperature value
def convert_temperature(voltage, code):
    """Convert a raw ADC code to a temperature, rounded to 2 decimals.

    The code is first turned into a resistance,
    ``code * 5110 / (2**24 * 16)``, then mapped linearly with
    ``(resistance - 100) / 0.385``.
    NOTE(review): the constants look like a 5.11 kOhm reference, gain 16 and
    a PT100 RTD - confirm against the hardware.

    :param voltage: Accepted for a uniform converter signature; not used.
    :param code: Raw conversion code as a number.
    """
    full_scale = (2**24) * 16
    rtd_resistance = (code * 5110) / full_scale
    return round((rtd_resistance - 100) / 0.385, 2)

# Function for converting channel 6 voltage to a pressure value
def convert_pressure(voltage, code):
    """Convert a voltage to a pressure reading, rounded to 3 decimals.

    Computes ``190.0 * voltage * (132000 / 100000) / 3.3 - 38``.
    NOTE(review): 132000/100000 looks like a resistor-divider ratio and 3.3
    the supply voltage - confirm against the schematic.

    :param voltage: Measured voltage as a number.
    :param code: Accepted for a uniform converter signature; not used.
    """
    divider_ratio = 132000 / 100000
    supply_voltage = 3.3
    scaled = (190.0 * voltage * divider_ratio) / supply_voltage
    return round(scaled - 38, 3)


# Create a dictionary containing all of the enabled channels
channels = {"0": {}, "4": {}, "6": {}}

# Default fields for each channel
for channel in channels:
    channels[channel]["max_elements"] = 300
    channels[channel]["timestamps"] = []
    channels[channel]["voltages"] = []
    channels[channel]["values"] = []
    channels[channel]["axis"] = None
    channels[channel]["annotation"] = None
    channels[channel]["annotation_visible"] = False
    channels[channel]["annotation_text"] = ""
    channels[channel]["line"] = None
    channels[channel]["annot_ax_xy"] = None
    # 'tab:black' is not part of matplotlib's Tableau palette; use plain black
    channels[channel]["axes_color"] = 'black'
    channels[channel]["plot_title"] = ""
    channels[channel]["y_label"] = ""
    channels[channel]["voltage_format_func"] = None
    channels[channel]["voltage_conversion_func"] = None
    channels[channel]["value_conversion_func"] = None

# Per-channel settings: (axes colour, y label, voltage formatter, value converter)
channel_setup = {
    "0": ('tab:blue', 'Voltage (V)', convert_channel_0_voltage, convert_accelerometer),
    # Channel 4's formatter multiplies by 1000, so the trace is in millivolts
    # (assuming the device reports volts - TODO confirm).
    "4": ('tab:red', 'Voltage (mV)', convert_channel_4_voltage, convert_temperature),
    "6": ('tab:green', 'Voltage (V)', convert_channel_6_voltage, convert_pressure),
}

for channel, (axes_color, y_label, voltage_func, value_func) in channel_setup.items():
    channels[channel]["axes_color"] = axes_color
    channels[channel]["plot_title"] = "Channel " + channel
    channels[channel]["y_label"] = y_label
    # Serial.update_plot_data looks the formatter up as "voltage_conversion_func";
    # "voltage_format_func" is kept as an alias of the same function.
    channels[channel]["voltage_format_func"] = voltage_func
    channels[channel]["voltage_conversion_func"] = voltage_func
    channels[channel]["value_conversion_func"] = value_func

# Initialize the classes with the dictionary containing the channel information.
# Both share the same dict: Serial fills the data lists, Data plots them.
serial = Serial(channels)
data = Data(channels)

In [None]:
# Recreate the local ad7124_default_config.py under /root/python/ on the board.
# The serial connection must already be open (use the Connect button first).
serial.write_file_to_meerkat("ad7124_default_config.py")

In [None]:
# Recreate the local ad7124.py under /root/python/ on the board.
# The serial connection must already be open (use the Connect button first).
serial.write_file_to_meerkat("ad7124.py")

In [None]:
# Recreate the local safari.py (the script launched by "Start Data Collection")
# under /root/python/ on the board. The serial connection must already be open.
serial.write_file_to_meerkat("safari.py")