These classes form the backbone of Alicat operations through serial commmands

In [None]:
try:
    import serial
except ImportError:
    print("An error occurred while attempting to import the pyserial backend. Please check your installation and try again.")


In [None]:
class Serial_Connection(object):
    """The serial connection object from which other classes inherit port and baud settings as
    well as reading and writing capabilities. Preprocessing for GP firmware outputs is performed
    using the remove_characters function"""
    
    # A dictionary storing open serial ports for comparison with new connections being opened
    open_ports = {}
    
    def __init__(self, port='/dev/ttyUSB0', baud=19200):
        
        self.port, self.baud = port, baud
        self.devices = {}
        
        # Checks for the existence of an identical port to avoid multiple instances
        # creates a new one if none exists
        if port in Serial_Connection.open_ports:
            self.connection = Serial_Connection.open_ports[port]
        else:
            self.connection = serial.Serial(port, baud, timeout=2.0)
            Serial_Connection.open_ports[port] = self.connection
        
        self.open = True
        
    def _test_open(self):
        
        if not self.open:
            raise IOError(f"The connection to Alicats on port {self.port} not open")
    
    
    def flush(self):
        """Deletes all characters in the read/write buffer to avoid double messages or rewrites"""
        self._test_open()
        
        self.connection.flush()
        self.connection.flushInput()
        self.connection.flushOutput()

    def close(self):
        """Checks if the instance exists, clears the buffer, closes the connection, and removes
        itself from the dictionary of active ports"""
        if not self.open:
            return
        
        self.flush()
        
        self.connection.close()
        Serial_Connection.open_ports.pop(self.port, None)
        
        self.open = False
        
    def read(self):
        """Fast read method using byte arrays with a carriage return delimiter between lines.
        ~30x faster than a readline operation by reading individual characters and appending 
        them to the byte array. The byte array is then decoded into a string and returned"""
        self._test_open()
        
        line = bytearray()
        while True:
            c = self.connection.read(1)
            if c:
                line += c
                if line[-1] == ord('\r'):
                    break
            else:
                break
        
        return line.decode('ascii').strip()
    
    def write(self, ID, command):
        """Writes the input command for a device with the stated ID by encoding to ascii
        and sending through the serial connection write command. Reads out the return parsing 
        multiple lines until the buffer is clear."""
        command = str(ID) + str(command) + '\r'
        command = command.encode('ascii')
        self.connection.write(command)
        response = []
        response.append(self.read())
        while True:
            if response[-1] != '':
                response.append(self.read())
            else:
                return response[:-1]
            
    def remove_characters(self, array, *args):
        """Removes any set of string characters from an array in place. Used for
        clearing the '/x08' string from arrays returned from the read and write commands"""
        
        filteredarray = []
        
        if isinstance(array[0], list):
            for i in range(len(array)):
                for j in range(len(args)):
                    temp = list(filter((args[j]).__ne__, array[i]))
                filteredarray.append(temp)
        else:
            for i in range(len(args)):
                filteredarray.append(list(filter((args[i]).__ne__, array)))
        del array
        return filteredarray
    
    
    
    

In [None]:
class MassFlowMeter(Serial_Connection):
    """Base class which obtains relevant gas"""
    def __init__(self, ID :str ='A', port : str ='/dev/ttyUSB0', baud : int =19200):
        super().__init__(port, baud)
        self.ID, self.port, self.baud = ID, port, baud
        self.fetch_gas_list()
        self.fetch_firmware_version()
        self.data_format()
    
    def fetch_gas_list(self):
        self.gas_list = {}
        gases = [i.split() for i in self.write(self.ID, '??G*')]
        gases = self.remove_characters(gases, '\x08')
        for i in range(len(gases)):
            self.gas_list[gases[i][2]] = int(gases[i][1][1:])
        
    
    def fetch_firmware_version(self):
        manufacturer_data = [i.split() for i in self.write(self.ID, '??M*')]
        self.manufacturer_data = self.remove_characters(manufacturer_data,'\x08')
        self.firmware_version = self.manufacturer_data[-1][-1]
        
    def data_format(self):
        self.data = []
        outputs = [i.split() for i in self.write(self.ID,'??D*')]
        outputs = self.remove_characters(outputs,'\x08')
        typloc, namloc = 4, 3
        if self.firmware_version[:2] == 'GP':
            typloc, namloc = 3, 2
        for i in range(2,len(outputs)):
            if outputs[i][typloc] != 'string' or outputs[i][namloc] == 'Gas':
                self.data.append([outputs[i][1][1:], outputs[i][2], outputs[i][-1]])
        
        
        

In [None]:
AAA = MassFlowMeter(ID='A', port='COM5', baud=19200)

In [None]:
AAA.write('A','U')

In [None]:
AAA.outputs

In [None]:
AAA.outputs

In [None]:
AAA.data

In [None]:
data = []
for i in range(2,len(out)):
    if out[i][3] != 'string' or out[i][2] == 'Gas':
        data.append([out[i][1][1:], out[i][2]])
        
data

In [None]:
BBB=Serial_Connection(port='COM5", baud=19200')

In [None]:
AAA.gas_list

In [None]:
BBB.write('A','??G*')

In [None]:
list(AAA.gas_list)[int('06')]

In [None]:
AAA.firmware_version


In [None]:
AAA.gas_list['H2']

In [None]:
AAA.flush()
AAA.close()

In [None]:
BBB.close()