In [None]:
# Reference: https://github.com/labrad/servers/blob/master/ADR/compressor_server.py

import serial

# Hash codes for the compressor's data-dictionary variables.
# Keys are the names accepted by CP2800.read_dictionary_value() and
# CP2800.write_dictionary_value(); each value is sent on the wire as two
# bytes, followed by an array index.
HASHCODES = {
    # misc
    'CODE_SUM': 0x2B0D, # Firmware checksum
    'MEM_LOSS': 0x801A, # TRUE if nonvolatile memory was lost
    'CPU_TEMP': 0x3574, # CPU temperature (0.1 C)
    'BATT_OK': 0xA37A, # TRUE if clock OK
    'BATT_LOW': 0x0B8B, # TRUE if clock battery low
    'COMP_MINUTES': 0x454C, # Elapsed compressor minutes
    'MOTOR_CURR_A': 0x638B, # Compressor motor current draw, in Amps

    # temperatures
    'TEMP_TNTH_DEG': 0x0D8F, # temperatures (0.1 C)
    'TEMP_TNTH_DEG_MINS': 0x6E58, # minimum temps seen (0.1 C)
    'TEMP_TNTH_DEG_MAXES': 0x8A1C, # maximum temps seen (0.1 C)
    'TEMP_ERR_ANY': 0x6E2D, # TRUE if any temperature sensor has failed

    # pressures
    'PRES_TNTH_PSI': 0xAA50, # low/high side pressures (0.1 PSIA)
    'PRES_TNTH_PSI_MINS': 0x5E0B, # minimum pressures seen (0.1 PSIA)
    'PRES_TNTH_PSI_MAXES': 0x7A62, # maximum pressures seen (0.1 PSIA)
    'PRES_ERR_ANY': 0xF82B, # TRUE if any pressure sensor has failed
    'H_ALP': 0xBB94, # average low-side pressure (0.1 PSIA)
    'H_AHP': 0x7E90, # average high-side pressure (0.1 PSIA)
    'H_ADP': 0x319C, # average delta pressure (0.1 PSIA)
    'H_DPAC': 0x66FA, # 1st deriv. of high side pressure, "bounce" (0.1 PSIA)

    # write-only trigger (per its description: resets the min/max markers)
    'CLR_TEMP_PRES_MMMARKERS': 0xD3DB, # reset pres/temp min/max markers

    # compressor control and status
    'EV_START_COMP_REM': 0xD501, # start compressor
    'EV_STOP_COMP_REM': 0xC598, # stop compressor
    'COMP_ON': 0x5F95, # TRUE if compressor is on
    'ERR_CODE_STATUS': 0x65A4, # non-zero value indicates an error code
    }

# SMDP functions (Sycon Multi-Drop Protocol)

# Framing and addressing bytes used by the helpers below.
# STX, CR and ESC are the three reserved byte values that must be escaped
# inside a packet body. ADDR and CMD_RSP follow the LabRAD compressor server
# this module is based on.
# NOTE(review): confirm ADDR matches the address configured on the compressor.
STX = 0x02  # start-of-packet marker
CR = 0x0D  # end-of-packet marker ('\r')
ESC = 0x07  # escape prefix for reserved bytes inside the packet body
ADDR = 0x10  # device address placed right after STX
CMD_RSP = 0x80  # command/response byte sent with every request

# Reserved byte -> code that follows ESC on the wire, and the inverse mapping.
_STUFF_CODES = {STX: 0x30, CR: 0x31, ESC: 0x32}
_UNSTUFF_CODES = {0x30: STX, 0x31: CR, 0x32: ESC}


def checksum(data):
    """Compute checksum for Sycon Multi Drop Protocol.

    Args:
        data: iterable of byte values (ints) to sum.

    Returns:
        Two-element list: the high and low nibble of ``sum(data) % 256``,
        each offset by 0x30.
    """
    ck = sum(data) % 256
    return [(ck >> 4) + 0x30, (ck & 0xF) + 0x30]


def stuff(data):
    """Escape the data to be sent to compressor.

    Every STX, CR or ESC byte in ``data`` is replaced by ESC followed by a
    one-byte code; all other bytes pass through unchanged.

    Args:
        data: iterable of byte values (ints).

    Returns:
        New list of byte values with reserved bytes escaped.
    """
    out = []
    for c in data:
        if c in _STUFF_CODES:
            out.extend([ESC, _STUFF_CODES[c]])
        else:
            out.append(c)
    return out


def unstuff(data):
    """Unescape data coming back from the compressor.

    Inverse of ``stuff``: each ESC byte is dropped and the byte after it is
    translated back to the reserved value it stands for.

    Args:
        data: iterable of byte values (ints).

    Returns:
        New list of byte values with escapes resolved. A trailing ESC with
        nothing after it is dropped.

    Raises:
        KeyError: if ESC is followed by a byte that is not a known code.
    """
    out = []
    escape = False
    for c in data:
        if escape:
            out.append(_UNSTUFF_CODES[c])
            escape = False
        elif c == ESC:
            escape = True
        else:
            out.append(c)
    return out


def pack(data):
    """Make a packet to send data to the compressor.

    Packet layout: STX, ADDR, CMD_RSP, escaped data, two checksum bytes, CR.
    The checksum is computed over ADDR, CMD_RSP and the *unescaped* data.

    Args:
        data: sequence of byte values (ints) forming the command body.

    Returns:
        List of byte values making up the complete packet.
    """
    data = list(data)
    header = [ADDR, CMD_RSP]
    return [STX] + header + stuff(data) + checksum(header + data) + [CR]


def unpack(response):
    """Pull binary data out of an SMDP response packet.

    Args:
        response: the raw packet as ``bytes``, ``str`` or a sequence of ints,
            with or without the trailing CR.

    Returns:
        List of byte values: the unescaped packet body, without the 3 byte
        header (STX, ADDR, CMD_RSP) and the 2 byte checksum. The checksum
        and the response code (low nibble of the third byte) are not
        verified here.

    Raises:
        ValueError: if the packet is too short to hold header and checksum
            (e.g. an empty read after a serial timeout).
    """
    if isinstance(response, str):
        response = [ord(c) for c in response]
    else:
        # bytes/bytearray iterate as ints on Python 3; lists are copied as-is
        response = list(response)
    if response and response[-1] == CR:
        response = response[:-1]
    if len(response) < 5:
        raise ValueError(
            "SMDP response too short: %d byte(s)" % len(response))
    # drop 3 byte header (STX, ADDR, CMD_RSP) and 2 byte checksum
    return unstuff(response[3:-2])


class CP2800(object):
    """CP2800 compressor instrument driver.

    Talks to the compressor over a serial port using packets built by the
    module-level ``pack``/``unpack`` helpers. Variables are addressed by
    the names defined in ``HASHCODES``.

    The driver can be used as a context manager; the serial port is closed
    when the ``with`` block exits.
    """

    def __init__(self, port, baudrate=9600, timeout=None):
        """Open the serial port the compressor is attached to.

        Args:
            port: serial port name handed to ``serial.Serial`` (e.g. 'COM1').
            baudrate: serial baud rate; the default equals pyserial's own.
                NOTE(review): confirm the rate the compressor expects.
            timeout: read timeout in seconds; ``None`` (pyserial's default)
                blocks until a full reply arrives.
        """
        self.instr = serial.Serial(port, baudrate=baudrate, timeout=timeout)

    def __enter__(self):
        # Hand back the driver itself so read/write methods are available
        # on the name bound by ``with ... as``.
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        self.instr.close()

    def packet_for_read(self, hashcode, index=0):
        """Make a packet to read a variable.

        Args:
            hashcode: 16-bit hash code of the variable (see ``HASHCODES``).
            index: array index of the element to read.

        Returns:
            The packet as a list of byte values.
        """
        return pack([0x63] + self.toBytes(hashcode, count=2) + [index])

    def packet_for_write(self, hashcode, value, index=0):
        """Make a packet to write a variable.

        Args:
            hashcode: 16-bit hash code of the variable (see ``HASHCODES``).
            value: integer to write; encoded as 4 big-endian bytes.
            index: array index of the element to write.

        Returns:
            The packet as a list of byte values.
        """
        body = [0x61] + self.toBytes(hashcode, count=2) + [index]
        return pack(body + self.toBytes(value))

    def getValue(self, resp):
        """Get an integer value from a response packet.

        The value is taken from the last four bytes of the unpacked body.
        """
        data = unpack(resp)
        return self.fromBytes(data[-4:])

    def toBytes(self, n, count=4):
        """Turn an int into a list of ``count`` bytes, most significant first."""
        return [(n >> (8 * i)) & 0xFF for i in reversed(range(count))]

    def fromBytes(self, b, count=4):
        """Turn a list of bytes (most significant first) into an int.

        The first byte of ``b`` is weighted as byte ``count - 1``, so ``b``
        is expected to hold exactly ``count`` bytes.
        """
        return sum(d << (8 * i) for d, i in zip(b, reversed(range(count))))

    # Use key and array index to read or write data dictionary value
    # from/to compressor
    def read_dictionary_value(self, key, index=0):
        """Read data dictionary value from compressor.

        Args:
            key: key in HASHCODES dictionary.
            index: index of array, if there is an array.

        Returns:
            The value as an integer.

        Raises:
            KeyError: if ``key`` is not in HASHCODES.
            IOError: if no complete reply arrived before the read timeout.
        """
        packet_to_send = self.packet_for_read(HASHCODES[key], index)
        self.instr.write(bytes(bytearray(packet_to_send)))
        # The terminator must be bytes: on Python 3 a text '\r' never
        # compares equal to the received data.
        packet_returned = self.instr.read_until(b'\r')
        if not packet_returned.endswith(b'\r'):
            raise IOError(
                "incomplete reply from compressor while reading %r" % (key,))
        return self.getValue(packet_returned)

    def write_dictionary_value(self, key, value, index=0):
        """Write value to compressor.

        Args:
            key: key in HASHCODES dictionary.
            value: integer value to write.
            index: index of array, if there is an array.

        Raises:
            KeyError: if ``key`` is not in HASHCODES.
        """
        packet_to_send = self.packet_for_write(HASHCODES[key], value, index)
        # NOTE(review): any reply the compressor sends to a write is not read
        # here; confirm whether it must be drained before the next read.
        self.instr.write(bytes(bytearray(packet_to_send)))
    
    

In [None]:
# test code: read element 3 of the temperature array and show the raw value
with CP2800('COM1') as cp:
    print(cp.read_dictionary_value('TEMP_TNTH_DEG', index=3))
    