In [None]:
%gui asyncio
import io
import queue
import asyncio
import functools
import threading
import ipywidgets as widgets
from ipywidgets import *
from IPython.display import display
import time
import re
import cv2
import serial
import sys
import glob
import numpy as np
import PIL
import pandas as pd
import plotly.graph_objects as go
from PIL import Image

In [None]:
class CameraStream:
    captureObject=None
    
    def __init__(self):
        self.captureObject = cv2.VideoCapture(0)
    
    def get_frame(self):
        ret, img = self.captureObject.read()
        rgb = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)
        return self.image_to_byte_array(PIL.Image.fromarray(rgb))
    #https://stackoverflow.com/questions/2659312/how-do-i-convert-a-numpy-array-to-and-display-an-image
    def image_to_byte_array(self, image:Image):
      imgByteArr = io.BytesIO()
      image.save(imgByteArr, format='PNG')
      imgByteArr = imgByteArr.getvalue()
      return imgByteArr

In [None]:
## CONSTANTS ##
CMDPROMPT='>>: '
LINETERM='\r\n'
currentposition=[0,0,0]
myport = None
stream = CameraStream()
tasq = queue.Queue() #big brain

In [None]:
#https://stackoverflow.com/questions/12090503/listing-available-com-ports-with-python
def get_ports():
    """ Lists serial port names

        :raises EnvironmentError:
            On unsupported or unknown platforms
        :returns:
            A list of the serial ports available on the system
    """
    if sys.platform.startswith('win'):
        ports = ['COM%s' % (i + 1) for i in range(256)]
    elif sys.platform.startswith('linux') or sys.platform.startswith('cygwin'):
        # this excludes your current terminal "/dev/tty"
        ports = glob.glob('/dev/tty[A-Za-z]*')
    elif sys.platform.startswith('darwin'):
        ports = glob.glob('/dev/tty.*')
    else:
        raise EnvironmentError('Unsupported platform')

    result = []
    for port in ports:
        try:
            s = serial.Serial(port)
            s.close()
            result.append(port)
        except (OSError, serial.SerialException):
            pass
    return result

In [None]:
def list_ports():
    print('Available serial ports: ')
    ports=get_ports()
    for port in ports:
        print(ports.index(port), end=':')
        print(port)
def try_connect(portindex):
    myport = None
    try:
        myport = serial.Serial(
            port=get_ports()[portindex],
            baudrate=250000,
            timeout=0,
            parity=serial.PARITY_NONE,
            stopbits=serial.STOPBITS_ONE,
            bytesize=serial.EIGHTBITS
        )
        print('Sucessfully connected to port '+myport.port)
        return myport
    except (OSError, serial.SerialException):
        print('ERROR: ', end='')
        if myport.is_open:
            print('Port already open')
        elif myport.port not in portlist:
            print('Port '+myport.port+' not found')
        else:
            print('Could not connect to port')
        pass

In [None]:
myport=try_connect(1)

In [None]:
def read_serial():
    return myport.readline().decode().rstrip()

def contains_ok(line):
    if line == 'ok':
        return True
    return False

def handle_input(sender):
    userinput=sender.value
    sender.value=''
    if userinput in macros:
        terminalout.append_display_data("MACRO: "+userinput)
        macros[userinput]()
    else:
        send_gcode(userinput, False)
def send_gcode(gcode='', wait=False):
    if wait:
        userin.disabled=True
        userin.placeholder='Waiting for OK'
    raw = gcode.upper()
    machineout.append_display_data('SEND: '+raw)
    sInput = (raw + LINETERM) #append terminator string
    bInput = sInput.encode() #convert string to bytes
    myport.write(bInput) #send the command
    #tasq.task_done()

def current_milli_time():
    return round(time.time() * 1000)

def is_m114_feedback(printeroutput):
    if(len(re.findall("(?:[^E][:])(\d{1,3}[.]\d{4})", printeroutput))==3):
        return True
    return False

def parse_coords(printeroutput):
    return list(map(float, re.findall("(?:[^E][:])(\d{1,3}[.]\d{4})",printeroutput)))

def get_human_coords(printeroutput):
    coordlist = parse_coords(printeroutput)
    return 'X:'+str(coordlist[0])+' Y:'+str(coordlist[1])+' Z:'+str(coordlist[2])

In [None]:
## MACRO DEFINITIONS ##
def show_help(b=None):
    """Shows this help"""
    terminalout.append_display_data('Available macros:')
    for macro in macros.items():
        terminalout.append_display_data(macro[0]+': '+macro[1].__doc__)

def probe_grid(b=None):
    """Returns array of probed coordinate"""
    
def probe_point(b=None):
    """Returns the probed coordinate"""
    tasq.put(functools.partial(send_gcode, gcode='M401')) #deploy probe
    tasq.put(functools.partial(send_gcode, gcode='G38.2 Z0')) #probe downwards
    tasq.put(functools.partial(send_gcode, gcode='M402')) #stow probe
    tasq.put(functools.partial(send_gcode, gcode='G00 Z40'))

def home(b=None):
    """Auto Home XYZ"""
    tasq.put(functools.partial(send_gcode, gcode='G28'))

def deploy_probe(b=None):
    """deploy_probe the probe"""
    tasq.put(functools.partial(send_gcode, gcode='M401'))

def stow_probe(b=None):
    """stow_probe the probe"""
    tasq.put(functools.partial(send_gcode, gcode='M402'))
    
def enable_fan(b=None):
    """Set case fan speed to MAX"""
    tasq.put(functools.partial(send_gcode, gcode='M106 P2 S255'))
    
def disable_fan(b=None):
    """Disable all fans"""
    tasq.put(functools.partial(send_gcode, gcode='M106 P2 S0'))
    
def get_pos(b=None):
    """Report the current tool position"""
    tasq.put(functools.partial(send_gcode, gcode='M114'))
    
def disable_motors(b=None):
    """Disables all the motors"""
    tasq.put(functools.partial(send_gcode, gcode='M18'))

def set_relative(b=None):
    """Set all axes to relative"""
    tasq.put(functools.partial(send_gcode, gcode='G91'))

def set_absolute(b=None):
    """Set all axes to asbolute"""
    tasq.put(functools.partial(send_gcode, gcode='G90'))
        
def do_jog(b=None, _pos=[0,0,0]):
    """Jogs machine"""
    tasq.put(functools.partial(send_gcode,
                               gcode='G91'))
    tasq.put(functools.partial(send_gcode,
                               gcode=str(
                                   'G00 X'+str(_pos[0]*jogslider.value)+
                                   ' Y'+str(_pos[1]*jogslider.value)+
                                   ' Z'+str(_pos[2]*jogslider.value))))
    tasq.put(functools.partial(send_gcode,
                               gcode='G90'))
    
## MACRO DICTIONARY
macros = {
    'help':show_help,
    'probe':probe_point,
    'grid':probe_grid,
    'home':home,
    'deploy':deploy_probe,
    'stow':stow_probe,
    'fon':enable_fan,
    'foff':disable_fan,
    'pos':get_pos,
    'moff':disable_motors,
    'rel':set_relative,
    'abs':set_absolute
}

In [None]:
## INPUT HOOK FUNCTIONS ##

## INPUT HOOK DICTIONARY##


In [None]:
globalpadding='0px 0px 0px 0px'
globalmargin='0px 0px 0px 0px'
machineout = Output(
    layout={
            'width':'50%',
            'height':'100%',
            'border': '1px solid white',
            'overflow':'scroll',
            'object-fit':'cover',
            'object-position':'center',
            'margin':globalmargin,
            'padding':globalpadding
           }
)
terminalout = Output(
    layout={
            'width':'100%',
            'height':'100%',
            'border': '1px solid white',
            'overflow':'scroll',
            'object-fit':'cover',
            'object-position':'center',
            'margin':globalmargin,
            'padding':globalpadding
           }
)
userin = Text(
    layout={
            'width':'100%',
            'height':'5%',
            'border': '1px solid white',
            'description':str(CMDPROMPT),
            'placeholder':'Type a command here',
            'object-fit':'cover',
            'object-position':'center',
            'margin':globalmargin,
            'padding':globalpadding
           },
    value='',
    placeholder='Type a command here',
    disabled=False
)
buttongrid = GridspecLayout(4, 3,
                            layout=Layout(
                                width='100%',
                                height='25%',
                                grid_gap='5%',
                                object_fit='cover',
                                object_position='center',
                                margin=globalmargin,
                                padding=globalpadding
                            )
                           )

buttongrid[0, 0] = fanbutton = Button(
    disabled=False,
    button_style='', # 'success', 'info', 'warning', 'danger' or ''
    tooltip='Disable Fan',
    icon='fan',
    layout=Layout(width='90%', height='90%')
)
buttongrid[0, 2] = motorbutton = Button(
    disabled=False,
    button_style='', # 'success', 'info', 'warning', 'danger' or ''
    tooltip='Disable Motors',
    icon='power-off',
    layout=Layout(width='90%', height='90%')
)
buttongrid[1, 0] = leftbutton = Button(
    disabled=False,
    button_style='', # 'success', 'info', 'warning', 'danger' or ''
    tooltip='Jog X-',
    icon='arrow-left',
    layout=Layout(width='90%', height='90%')
)
buttongrid[1, 2] = rightbutton = Button(
    disabled=False,
    button_style='', # 'success', 'info', 'warning', 'danger' or ''
    tooltip='Jog X+',
    icon='arrow-right',
    layout=Layout(width='90%', height='90%')
)
buttongrid[0, 1] = upbutton = Button(
    disabled=False,
    button_style='', # 'success', 'info', 'warning', 'danger' or ''
    tooltip='Jog Y+',
    icon='arrow-up',
    layout=Layout(width='90%', height='90%')
)
buttongrid[2, 1] = downbutton = Button(
    disabled=False,
    button_style='', # 'success', 'info', 'warning', 'danger' or ''
    tooltip='Jog Y-',
    icon='arrow-down',
    layout=Layout(width='90%', height='90%')
)
buttongrid[1, 1] = homebutton = Button(
    disabled=False,
    button_style='', # 'success', 'info', 'warning', 'danger' or ''
    tooltip='Home',
    icon='home',
    layout=Layout(width='90%', height='90%')
)
buttongrid[3, :2] = jogslider = widgets.FloatLogSlider(
    value=10,
    base=10,
    min=-2, # max exponent of base
    max=2, # min exponent of base
    step=1, # exponent step
    orientation='horizontal',
    continuous_update=True,
    description='Jog Step',
    readout_format='.2f'

)

##---------------- Buton Events ----------------##
userin.on_submit(handle_input)
leftbutton.on_click(functools.partial(do_jog, _pos=[-1.0,0.0,0.0]))
rightbutton.on_click(functools.partial(do_jog, _pos=[1.0,0.0,0.0]))
upbutton.on_click(functools.partial(do_jog, _pos=[0.0,1.0,0.0]))
downbutton.on_click(functools.partial(do_jog, _pos=[0.0,-1.0,0.0]))
homebutton.on_click(home)
fanbutton.on_click(disable_fan)
motorbutton.on_click(disable_motors)

webcam = widgets.Image(value=stream.get_frame(), format='PNG', 
                       layout=Layout(
                           width='100%',
                           height='100%',
                           object_fit='cover',
                           object_position='center',
                           margin='-5px 0px 0px 0px',
                           padding=globalpadding
                       )
                      )

## Surface plot matrix
z_data = pd.read_csv('https://raw.githubusercontent.com/plotly/datasets/master/api_docs/mt_bruno_elevation.csv')

surfaceplot = go.FigureWidget(
    data=[go.Surface(z=z_data.values)],
    layout=go.Layout(
        title='Mt Bruno Elevation', autosize=True,
        margin=dict(l=0, r=0, b=0, t=0)
    )
)

tabgroup = widgets.Tab(children=[terminalout, webcam, surfaceplot],
                       layout=Layout(
                           width='100%',
                           height='75%',
                           padding=globalpadding,
                           margin=globalmargin,
                           object_fit='cover',
                           object_position='center'
                       )
                      )
tabgroup.set_title(0, 'Terminal')
tabgroup.set_title(1, 'Webcam')
tabgroup.set_title(2, 'Surface')

rightside = VBox([tabgroup, buttongrid],
                 layout=Layout(
                     width='50%',
                     align_items='center',
                     padding=globalpadding,
                     margin=globalmargin,
                     object_fit='cover',
                     object_position='center'
                 )
                )

termgroup = HBox([machineout, rightside],
                 layout=Layout(
                     width='100%',
                     height='95%',
                     padding=globalpadding,
                     margin=globalmargin,
                     object_fit='cover',
                     object_position='center'
                 )
                )

controlgroup = VBox([termgroup, userin],
                    layout=Layout(
                        width='100%',
                        height='600px',
                        padding=globalpadding,
                        margin=globalmargin,
                       object_fit='cover',
                       object_position='center'
                    )
                   )

In [None]:
def _update():
    got_ok=True
    while True:
        #printer output echo
        serialdata = read_serial()
        if serialdata != '':
            machineout.append_display_data('RECV: '+serialdata)
            got_ok=contains_ok(serialdata)
            if is_m114_feedback(serialdata):
                terminalout.append_display_data('POSITION: '+get_human_coords(serialdata))
        #queue processing
        if not tasq.empty():
            task = tasq.get()
            terminalout.append_display_data(str(task))
            task()
        time.sleep(0.01)

def _getstream(frameRate):
    while True:
        webcam.value = stream.get_frame()
        time.sleep(1.0/frameRate)


In [None]:
myport.reset_input_buffer()
myport.reset_output_buffer()
machineout.clear_output()
terminalout.clear_output()

t_serialupdate = threading.Thread(target=_update, args=())
t_webcam = threading.Thread(target=_getstream, args=(15,))
t_serialupdate.start()
t_webcam.start()

display(controlgroup)

print('Enter your command in the box above. Press <ENTER> to send.')
print('Type "help" to see available shorcuts')


In [None]:
#killthreads==True

##### 