In [10]:
import serial
import time
import re
import logging

In [11]:
# Route log records (DEBUG and above) to logtest.log, each prefixed with a timestamp.
# basicConfig() does nothing when the root logger already has handlers, so the
# level is also set explicitly on the root logger afterwards.
logging.basicConfig(
    filename='logtest.log',
    format='%(asctime)s %(message)s',
    level=logging.DEBUG,
)
logger = logging.getLogger()
logger.setLevel(logging.DEBUG)

In [12]:
class serial_device():
    """
    Parent class, handling basic communication with a device over a serial port,
    be it an Arnie robot, or a tool or anything else.

    Attributes set by the constructor:
        port_name       name of the serial port in use
        port            the open serial.Serial object (None until a port is opened)
        eol             end-of-line string appended to every outgoing message
        recent_message  last message stored by readBufferUntilMatch()/writeAndWait()
        welcome_message text the device sent right after the port was opened
        description     dict with keys "class", "message", "type", "mobile"
    """

    def __init__(self, port_name, welcome_message="", welcome_message_delay=1, baudrate=115200, timeout=0.1, eol="\r"):
        """
        Initializes a device to communicate through the serial port
        (which is now most likely a USB emulation of a serial port)

        Inputs:
            port_name
                The name of the port through which a robot or a tool is going to communicate.
            welcome_message
                Each device upon initialization is likely to send some first message, usually
                containing name of the device connected. This message is called "welcome message".
                This variable is meant to confirm that the actual device connected is what we
                think is connected. Currently it is only logged and stored in self.description.
            welcome_message_delay
                Time in seconds to wait for the device to send a welcome message.
                Default is 1 second.
                If time is too short, the welcome message may be read only partially.
            baudrate
                Speed of communication. For USB 115200 is good. Lower if communication becomes bad.
            timeout
                Read timeout (in seconds) passed to serial.Serial.
                0.1 seconds is default.
            eol
                String to pass at the end of a line when sending something to the device.
                Default is a carriage return.

        Raises:
            Whatever serial.Serial raises when the port cannot be opened. If reading the
            welcome message fails, the port is closed again before the error propagates.
        """
        self.port_name = port_name
        # Defined up front so close() can tell "never opened" apart from "failed to close".
        self.port = None
        logging.info("Port %s: Initialization of a device started.", self.port_name)
        logging.info("Port %s: Expected welcome message:  %s", self.port_name, welcome_message)
        self.eol = eol  # Keeps End Of Line that the device expects to receive
        self.recent_message = ""  # This variable contains the latest stuff received from the device
        self.description = {"class": serial_device, "message": welcome_message, "type": "", "mobile": False}

        self.openSerialPort(port_name, baudrate, timeout)

        try:
            # Reading welcome message from the device connected
            logging.info("Port %s: Welcome message received:", self.port_name)
            self.welcome_message = self.readAll(delay=welcome_message_delay)
            # Cleaning input buffer from some extra messages
            self.port.flushInput()
        except Exception:
            # Do not leave the port open (and locked) behind a half-built object.
            self.close()
            raise

    def openSerialPort(self, port_name="", baudrate=115200, timeout=0.1):
        """
        Opens the serial port, closing the currently held one first.

        Inputs:
            port_name
                Port to open. When empty, the port name already stored in the
                instance is reused.
            baudrate, timeout
                Passed straight to serial.Serial.
        """
        # Trying to use specifically provided port_name
        # Otherwise using whatever port name the instance may already have.
        if port_name != "":
            com_port = port_name
            self.port_name = port_name
        else:
            com_port = self.port_name
        # Make sure port is closed
        self.close()
        # Opening the port
        self.port = serial.Serial(com_port, baudrate, timeout=timeout)
        logging.info("Port %s: Opened.", self.port_name)
        logging.info("Port %s: baudrate=%s.", self.port_name, baudrate)
        logging.info("Port %s: timeout=%s.", self.port_name, timeout)

    def close(self):
        """
        Will try to close the port if it is open.

        Best effort: a failure while closing is logged (with traceback) and not
        re-raised. KeyboardInterrupt / SystemExit are no longer swallowed.
        """
        port = getattr(self, "port", None)
        if port is None:
            # Nothing was ever opened (e.g. first call from the constructor).
            logging.info("Port %s: Nothing to close, port is not open", self.port_name)
            return
        try:
            port.close()
            logging.info("Port %s: Closed successfully", self.port_name)
        except Exception:
            logging.exception("Port %s: Attempted to close, unsuccessfully", self.port_name)

    def findDeviceInList(self, names_list):
        """
        Compares provided names_list with the welcome message, returns
        list index for the right name.

            Inputs:
                names_list
                    List of welcome messages, or fractions of them, to be
                    compared to the actual welcome message received from the device.
                    Every entry is used as a regular expression (re.search), so
                    regex metacharacters in a name are interpreted, not matched literally.

            Returns:
                right_index
                    Index of the entry in names_list that was matched to
                    the actual welcome message obtained from the device.
                    If several entries match, the last matching one wins.
                    If nothing was matched, returns None.
        """
        right_index = None
        for name in names_list:
            if re.search(pattern=name, string=self.welcome_message):
                logging.info("Port %s: Successfully matched pattern", self.port_name)
                logging.info(name)
                logging.info("To the welcome message: ")
                logging.info(self.welcome_message)

                right_index = names_list.index(name)
        return right_index

    def write(self, expression, eol=None):
        """
        Sending an expression to a device. Expression is in str format.

        Leading and trailing whitespace of the expression is stripped, then an
        end of line is appended: the eol given here if it is non-empty,
        otherwise the one specified during initialization.
        The input buffer is cleared first, so whatever is read afterwards is the
        device's response to this command only.
        """
        # Cleaning input buffer (so the buffer will contain only response of a device
        # to the command sent within this function)
        self.port.flushInput()
        # Strip all EOL characters for consistency
        expression = expression.strip()
        if eol:
            eol_to_send = eol
        else:
            eol_to_send = self.eol
        # Add end of line
        expression = expression + eol_to_send
        # Encode to binary
        expr_enc = expression.encode()

        logging.info("Port %s: Sending message: ", self.port_name)
        logging.info(expression)
        # Writing to the device (robot or a tool)
        self.port.write(expr_enc)

    # TODO: Inline this function so that people don't circumvent logging by using this function.
    def _read(self, number_of_bytes=1):
        """
        Same functionality as Serial.read(), but returns str instead of bytes.

        Bytes that do not form valid UTF-8 on their own (line noise, or a
        multi-byte character cut by the read size) become U+FFFD instead of
        raising UnicodeDecodeError.
        """
        return self.port.read(number_of_bytes).decode("utf-8", errors="replace")

    def readAll(self, delay=0.1):
        """
        Waits `delay` seconds, then reads everything the device has sent so far.

        Returns the received text; an empty string if the device sent nothing
        before the port's read timeout expired.
        """
        # Give time for device to respond
        time.sleep(delay)
        # First read blocks for at most the port's read timeout.
        raw = self.port.read(1)
        # Continue reading until the input buffer is empty
        pending = self.port.inWaiting()
        while pending:
            raw += self.port.read(pending)
            pending = self.port.inWaiting()
        # Decode once over the whole buffer: decoding byte by byte breaks on
        # any multi-byte UTF-8 character.
        message = raw.decode("utf-8", errors="replace")

        logging.info("Port %s: Function readAll(): Received message: ", self.port_name)
        logging.info(message)
        return message

    def readBufferUntilMatch(self, pattern, timeout=None):
        """
        This function will monitor serial port buffer, until the "pattern" occurs.

        Inputs:
            - pattern - regular expression (re.search) expected to appear in the
              text returned from the serial port
            - timeout - maximum number of seconds to wait for the pattern.
              None (default) waits forever. The deadline is checked between
              readline() calls, so it is only effective when the port itself
              was opened with a finite read timeout.

        Returns:
            Everything which was read from the buffer before the pattern occurred, including the pattern.

        Raises:
            TimeoutError if `timeout` is given and the pattern did not show up in time.
            self.recent_message is left untouched in that case.
        """
        deadline = None if timeout is None else time.monotonic() + timeout
        raw = b""
        while True:
            raw += self.port.readline()
            # Re-decode the accumulated bytes so that a character split across
            # two reads is still decoded correctly.
            full_message = raw.decode("utf-8", errors="replace")
            if re.search(pattern=pattern, string=full_message):
                self.recent_message = full_message
                break
            if deadline is not None and time.monotonic() >= deadline:
                logging.error("Port %s: Function readBufferUntilMatch(): timed out after %s s waiting for pattern %r",
                              self.port_name, timeout, pattern)
                raise TimeoutError(
                    "Port %s: pattern %r not received within %s s; received so far: %r"
                    % (self.port_name, pattern, timeout, full_message)
                )
        logging.info("Port %s: Function readBufferUntilMatch(): Received message: ", self.port_name)
        logging.info(full_message)
        logging.info("It was successfully matched with pattern %s", pattern)
        return full_message

    # TODO: Rename this into "write", and rename "write"into "write_ignore_response".
    def writeAndWait(self, expression, eol=None, confirm_message='ok\n', timeout=None):
        """
        Function will write an expression to the device and wait for the proper response.

        Use this function to make the device perform a physical operation and
        make sure program continues after the operation is physically completed.

        Inputs:
            expression, eol
                Same as in write().
            confirm_message
                Regular expression the response has to contain before the function
                returns. Default is "ok" followed by a newline.
            timeout
                Maximum number of seconds to wait for confirm_message; None (default)
                waits forever. See readBufferUntilMatch().

        Returns the full response received, including the confirmation.
        Raises TimeoutError when `timeout` is given and expires.
        """
        self.write(expression, eol)
        self.recent_message = self.readBufferUntilMatch(pattern=confirm_message, timeout=timeout)
        return self.recent_message

In [13]:
# Open the device on COM4 with the default baudrate/timeout; the constructor also reads its welcome message.
s = serial_device('COM4')

In [14]:
# Send "G28 Z" and block until the response matches the default confirm_message ('ok\n').
s.writeAndWait("G28 Z")

'echo:busy: processing\nX:0.00 Y:0.00 Z:0.00 E:0.00 Count X:0 Y:0 Z:0\nok\n'

In [15]:
# Display the welcome message captured by the constructor.
s.welcome_message

'start\necho:Marlin 1.1.8\n\necho: Last Updated: 2017-12-25 12:00 | Author: (none, default config)\necho:Compiled: Dec 22 2019\necho: Free Memory: 5437  PlannerBufferBytes: 1232\necho:Hardcoded Default Settings Loaded\necho:  G21    ; Units in mm\n\necho:Filament settings: Disabled\necho:  M200 D3.00\necho:  M200 D0\necho:Steps per unit:\necho:  M92 X20.00 Y20.00 Z25.00 E500.00\necho:Maximum feedrates (units/s):\necho:  M203 X2000.00 Y2000.00 Z4000.00 E25.00\necho:Maximum Acceleration (units/s2):\necho:  M201 X10000 Y10000 Z1000 E10000\necho:Acceleration (units/s2): P<print_accel> R<retract_accel> T<travel_accel>\necho:  M204 P500.00 R10000.00 T500.00\necho:Advanced: S<min_feedrate> T<min_travel_feedrate> B<min_segment_time_us> X<max_xy_jerk> Z<max_z_jerk> E<max_e_jerk>\necho:  M205 S0.00 T0.00 B20000 X2.00 Y2.00 Z2.00 E5.00\necho:Home offset:\necho:  M206 X0.00 Y0.00 Z0.00\necho:PID settings:\necho:  M301 P22.20 I1.08 D114.00\necho:Z-Probe Offset (mm):\necho:  M851 Z0.00\n'

In [16]:
# Candidate name patterns; findDeviceInList() applies each one to the welcome message with re.search.
list_of_candiates = ['Pipettor', 'Gripper', 'Marlin']

In [17]:
# Index of the candidate that matched the welcome message (None if no candidate matched).
s.findDeviceInList(list_of_candiates)

2

In [18]:
# Close the serial port held by this instance.
s.close()

In [23]:
# Scratch check: turn the values of a dict into a list (comprehension form).
[x for x in {'Arnie': 'Marlin', 'Pipettor_1000': 'Servo'}.values()]

['Marlin', 'Servo']

In [24]:
# Scratch check: same dict values turned into a list with list().
list({'Arnie': 'Marlin', 'Pipettor_1000': 'Servo'}.values())

['Marlin', 'Servo']

# Testing library

In [1]:
import low_level_comm as llc

In [2]:
# Presumably the same class, now imported from low_level_comm (TODO confirm); waits 2 s for the welcome message.
s = llc.serial_device('COM3', welcome_message_delay=2)

In [3]:
# Display the welcome message received from the device on COM3.
s.welcome_message

"Arnie's stationary touch probe\r\nRev. 1.0, 11/20/2019\r\n"

In [4]:
# NOTE(review): this call was interrupted by hand (KeyboardInterrupt below); presumably this
# device never sends a line matching 'ok\n', so the wait for confirmation did not end.
s.writeAndWait("G28 Z")

KeyboardInterrupt: 

In [5]:
# Look up which of these name patterns occurs in the welcome message of the device on COM3.
s.findDeviceInList(['Pipettor', 'Gripper', 'Marlin', 'probe'])

3

In [6]:
# Close the serial port held by this instance.
s.close()

In [2]:
# Presumably enumerates the serial ports available on this machine (helper not shown here; TODO confirm).
ports_list = llc.listSerialPorts()

In [3]:
# Display the ports returned by listSerialPorts().
ports_list

['COM3', 'COM4']

In [4]:
# Presumably matches each port's welcome message against the given {device name: pattern} dict
# (helper not shown here; TODO confirm). The result below maps device name -> port.
llc.matchPortsWithDevices(ports_list, {'Arnie': 'Marlin', 'Pipettor_1000': 'Servo', 'Stationary_probe': 'probe'})

{'Stationary_probe': 'COM3', 'Arnie': 'COM4'}