# Aport class for I2C communication

This notebook contains my Aport class and a description of how to use it. The class lets you read and write data through an Aport, and gives you enough control to integrate those reads and writes into new and existing Python scripts.

The class has three dependencies, imported below: serial, serial.tools.list_ports, and time. The two serial modules come from the pyserial package; time is part of the standard library. Be sure to import them before using the class.

In [1]:
import serial
import serial.tools.list_ports
import time

The **Aport** class itself is contained entirely in the following cell. Run it once to initialize, then proceed to the examples below.

In [2]:
class Aport:
    def __init__(self, port=None):
        self.port = port
        self.register = None
        self.configure()
    
    def configure(self):
        """Configures flow control for Aport without opening."""
        self.ser = serial.Serial(
            baudrate = 115200,
            bytesize = serial.EIGHTBITS,
            parity = serial.PARITY_NONE,
            stopbits = serial.STOPBITS_ONE,
            timeout = 0.05,      #read timeout in seconds
            xonxoff = False,     #disable software flow control
            rtscts = True,       #enable hardware (RTS/CTS) flow control
            dsrdtr = True)       #enable hardware (DSR/DTR) flow control
            #writeTimeout = 0    #write timeout (currently unused)
        return
    
    def flush_input(self):
        """Flushes the Aport input buffer."""
        self.ser.flushInput()
        return
    
    def flush_output(self):
        """Flushes the Aport output buffer."""
        self.ser.flushOutput()
        return
    
    def flush(self):
        """Flushes both the Aport input buffer and output buffer."""
        self.flush_input()
        self.flush_output()
    
    def is_open(self):
        """Returns 'True' if the Aport is open or 'False' if the Aport is closed."""
        return self.ser.isOpen()
    
    def open(self):
        """Opens connection to Aport. Raises an exception if connection fails."""
        if self.is_open():
            try: self.flush() #flush input and output buffer
            except Exception as error: raise Exception('Failed to open port') from error
        elif self.port:
            self.ser.port = self.port
            try: self.ser.open()
            except Exception as error: raise Exception('Failed to open port') from error
        else:
            raise Exception('Port not set')
        return
            
    def close(self):
        """Closes connection to Aport."""
        if self.is_open():
            self.ser.close()
        else: pass
        return
    
    def read(self, address, numBytes=1):
        """Reads 'numBytes' bytes (default 1, capped at 255) beginning at 'address', an integer such as 0xFD. Returns a list of hex strings."""
        if not self.is_open():
            raise Exception('Aport not open')
        
        if numBytes > 255: numBytes = 255
            
        address = '/T~' + '{0:0>2}'.format(hex(address)[2:]).upper() + '\r'
        address = address.encode('latin 1')
        
        self.ser.write(address)
        time.sleep(0.05)
        response = self.ser.read(100).decode('latin_1')
        if '/MTC' not in response: raise Exception("Failed to set read address")
        self.flush()
        
        message = '/R' + '{0:0>3}'.format(numBytes) + '\r'
        message = message.encode('latin 1')
        
        self.ser.write(message)
        time.sleep(numBytes/255)
        response = self.ser.read(1000).decode('latin_1')
        
        if '/MRC' not in response: raise Exception("Failed to read memory")
        
        response = response[response.find('~')+1:response.rfind('~')+3].split('~')
        return response
    
    def write(self, address, data):
        """Writes one byte, or a list of bytes, contained in 'data' beginning at 'address', an integer such as 0xFD."""
        if not self.is_open():
            raise Exception('Aport not open')
        self.flush() #initial flush
        address = '/T~' + '{0:0>2}'.format(hex(address)[2:]).upper()
        
        if isinstance(data,list):
            data = ''.join(['~' + '{0:0>2}'.format(hex(i)[2:]).upper() for i in data]) #formatting and adding separator, then concatenating
        else:
            data = '~' + '{0:0>2}'.format(hex(data)[2:]).upper() #formatting and adding separator
        
        message = address + data
        message = message.encode('latin 1')
        
        self.ser.write(message)
        time.sleep(0.05) #TODO: make this more dynamic
        response = self.ser.read(1000).decode('latin_1')
        
        if '/MTC' not in response or '*' not in response: raise Exception("Failed to write to memory")
        
        return
  
    def set_register(self, register): #example register = 0xA2
        """Sets device register address to the value passed in 'register', an integer such as 0xA2."""
        if not self.is_open():
            raise Exception('Aport not open')
        if self.register == register:
            return #already set, nothing to send
        message = '/D' + '{0:0>2}'.format(hex(register)[2:]).upper() + '\r'
        message = message.encode('latin 1')

        self.ser.write(message)

        response = self.ser.read(100).decode('latin_1')
        if '*' not in response: raise Exception('Failed to set register')

        self.register = register #cache the value only after the Aport acknowledges it
        return
    
    def detect_port(self):
        """Finds Aport and sets port variable to 'COMX' where X is the COM port number. 'COMX' is then returned."""
        self.port = None
        for device in serial.tools.list_ports.comports():
            description = device.description
            if len(description) < 12: pass #ignores descriptions too short to be Aport
            elif description[0:5] == 'Aport': self.port = description[description.find('(')+1:description.rfind(')')]
        return self.port
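
The command strings that **read()** and **write()** send can be reproduced without any hardware attached, which is handy when debugging. The sketch below mirrors the formatting used in the class above; the helper names (format_byte, build_read_commands, build_write_message) are illustrative only and are not part of the **Aport** class.

```python
def format_byte(value):
    """Formats an integer in the range 0-255 as two uppercase hex digits."""
    if not 0 <= value <= 255:
        raise ValueError('value must fit in one byte')
    return '{:02X}'.format(value)

def build_read_commands(address, num_bytes=1):
    """Returns the two messages read() sends: set address, then request bytes."""
    num_bytes = min(num_bytes, 255)  # same cap as read()
    set_address = '/T~' + format_byte(address) + '\r'
    read_request = '/R' + '{:03d}'.format(num_bytes) + '\r'
    return set_address.encode('latin_1'), read_request.encode('latin_1')

def build_write_message(address, data):
    """Returns the single message write() sends for one byte or a list of bytes."""
    if not isinstance(data, list):
        data = [data]
    payload = ''.join('~' + format_byte(byte) for byte in data)
    return ('/T~' + format_byte(address) + payload).encode('latin_1')

print(build_read_commands(0xFD, 3))                   # (b'/T~FD\r', b'/R003\r')
print(build_write_message(0xFD, [0xAB, 0xCD, 0xEF]))  # b'/T~FD~AB~CD~EF'
```

Unlike the class, format_byte rejects values outside 0-255 instead of formatting them, which is a design choice of this sketch rather than behavior of the **Aport** class.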

# Initialization and connection
You're now ready to use the **Aport** class. The first step is creating an Aport object and giving it a name.

In [3]:
aport1 = Aport()

Each **Aport** object has an instance attribute called _port_ that stores the COM port the Aport currently occupies. Read it directly from the object.

In [4]:
port = aport1.port
print('Current port:', port)

Current port: None


By default, the **Aport** object initializes its port to _None_. Use the **detect_port()** method to auto-detect which COM port the Aport is currently occupying. The method returns the detected COM port, or _None_ if none is found, and stores this value in the _port_ variable.

In [5]:
detected_port = aport1.detect_port()
port = aport1.port
print('Detected port:',detected_port)
print('Current port:',port)

Detected port: COM14
Current port: COM14
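
To see how **detect_port()** derives the port name, the parsing step can be run on its own. This is a minimal sketch: extract_port is a hypothetical helper that repeats the string handling from the class, and the description strings are made-up examples, not captured from real hardware.

```python
def extract_port(description):
    """Returns the text between the first '(' and the last ')' of an Aport
    description, or None if the description does not look like an Aport."""
    if len(description) < 12 or not description.startswith('Aport'):
        return None
    return description[description.find('(') + 1:description.rfind(')')]

# Made-up description strings for illustration only.
print(extract_port('Aport Adapter (COM14)'))     # COM14
print(extract_port('USB Serial Device (COM3)'))  # None
```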


You can also manually pass the port number into the **Aport** object when creating it to bypass port detection.

In [6]:
aport2 = Aport('COM7') #initializing aport2 with COM7
port = aport2.port
print('Current port:', port)

Current port: COM7


Once the **Aport** object has had its port number set, use the **open()** method to open the serial connection to the Aport. If the port number has not been set, the method throws an exception: "Port not set." If the port has been set and connection fails, the method throws an exception: "Failed to open port."

You can also use the **is_open()** method to check if the Aport is open.

In [7]:
aport1.open()
print('Aport 1 is open:',aport1.is_open())

Aport 1 is open: True


# Reading and writing

Now that the connection is open, set the initial register using the **set_register()** method. Pass in the address as an integer; a hex literal such as 0xB0 is the most readable form. The method stores the value in the _register_ attribute, which prints in decimal (0xB0 prints as 176). If the Aport has not been opened or the register fails to write, the method throws an exception.

In [8]:
aport1.set_register(0xB0) #setting register to 0xB0
register = aport1.register
print('Current register:',register)

Current register: 176
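
The register prints as 176 because Python stores it as a plain integer. A short sketch of how to display it in hex again, using only built-in formatting:

```python
register = 0xB0  # same value as stored in aport1.register above

print(register)                     # 176
print(hex(register))                # 0xb0
print('0x{:02X}'.format(register))  # 0xB0
```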


To read data through the Aport, use the **read()** method. This method takes two arguments: the start address for the read, and the number of bytes to read. The number of bytes is capped at 255: a higher value is reduced to 255. The method defaults to reading one byte if no value is given. It returns the bytes as a list of two-character hex strings, even if only one byte was requested.

In [9]:
readOne = aport1.read(0xFD) #reading one byte
readMultiple = aport1.read(0xFD,3) #reading three bytes
print('One byte read from 0xFD:',readOne)
print('Three bytes read from 0xFD:',readMultiple)

One byte read from 0xFD: ['FF']
Three bytes read from 0xFD: ['FF', 'FF', 'FF']
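
Because **read()** returns hex strings, you will usually want to convert them before doing arithmetic. The sketch below uses the values printed above; combining them into a single number assumes the most significant byte comes first, which depends on your device and is not something the class defines.

```python
read_values = ['FF', 'FF', 'FF']  # list as returned by read() above

as_ints = [int(value, 16) for value in read_values]  # one integer per byte
as_bytes = bytes(as_ints)                            # raw bytes object

# Assumption: big-endian byte order. Check your device's register map.
combined = int.from_bytes(as_bytes, byteorder='big')

print(as_ints)   # [255, 255, 255]
print(combined)  # 16777215
```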


To write data through the Aport, use the **write()** method. This method takes two arguments: the start address for the write, and either a single byte or a list of bytes to write. Note that you may have to write the password bytes before you are granted permission to write to some addresses.

In [10]:
#writing single bytes
aport1.write(0xFD,0x01) #write 0x01 to address 0xFD
aport1.write(0xFE,0x23) #write 0x23 to address 0xFE
aport1.write(0xFF,0x45) #write 0x45 to address 0xFF

readValues = aport1.read(0xFD,3)
print('Individual write:',readValues)

#writing multiple bytes
aport1.write(0xFD,[0xAB,0xCD,0xEF]) #write 0xAB to address 0xFD, 0xCD to 0xFE, and 0xEF to 0xFF

readValues = aport1.read(0xFD,3)
print('Multiple write:',readValues)

Individual write: ['01', '23', '45']
Multiple write: ['AB', 'CD', 'EF']
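
Reading back after a write, as done above, is a simple way to confirm the write took effect. The comparison can be automated with a small helper; matches_readback is a hypothetical name, and the sketch assumes the integers passed to **write()** are compared against the hex strings returned by **read()**.

```python
def matches_readback(written, read_back):
    """Returns True if the integers in 'written' equal the hex strings in 'read_back'."""
    expected = ['{:02X}'.format(value) for value in written]
    return expected == [value.upper() for value in read_back]

# Values taken from the write example above.
print(matches_readback([0xAB, 0xCD, 0xEF], ['AB', 'CD', 'EF']))  # True
print(matches_readback([0x01, 0x23, 0x45], ['AB', 'CD', 'EF']))  # False
```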


# Closing the Aport

If you plan on reconnecting to an Aport without power cycling it, remember to close the Aport when you are done using it. Simply call the **close()** method to do so. It is often handy to incorporate this command at the end of a script.

In [11]:
aport1.close()
print('Aport 1 is open:',aport1.is_open())

Aport 1 is open: False
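
If a script can fail partway through, one way to guarantee that **close()** still runs is to wrap the connection in a context manager. This is a sketch only: opened is a hypothetical helper, not part of the **Aport** class, and FakeAport is a stand-in so the example runs without hardware. With a real device you would pass an **Aport** object instead.

```python
from contextlib import contextmanager

@contextmanager
def opened(device):
    """Opens 'device', yields it, and always closes it afterwards."""
    device.open()
    try:
        yield device
    finally:
        device.close()

class FakeAport:
    """Stand-in with the same open/close/is_open methods as Aport."""
    def __init__(self):
        self._open = False
    def open(self):
        self._open = True
    def close(self):
        self._open = False
    def is_open(self):
        return self._open

fake = FakeAport()
try:
    with opened(fake) as device:
        raise RuntimeError('simulated failure mid-script')
except RuntimeError:
    pass

print('Still open after error:', fake.is_open())  # Still open after error: False
```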
