In [1]:
import struct
from protocol import *

In [2]:
# https://github.com/Turb0Pasca1
# documentation from 24.05.2023
# https://www.mrc-systems.de/downloads/de/laser-strahlstabilisierung/BA-digital-communication-interface_ver8_2.pdf

In [10]:
def get_formatter_str(fields):
    # build struct format string
    # high byte first
    fmt = '>'  
    for field in fields:
        if field not in RETURN_VALUE_STRUCT_MAP:
            raise ValueError(f'Field {field} not in return_value_struct_map')
        fmt += RETURN_VALUE_STRUCT_MAP[field]
    return fmt

In [13]:
command = 'S1S'
fields = COMMAND_RESPONSE_MAP[command]
fmt = get_formatter_str(fields)
length = struct.calcsize(fmt)
struct.calcsize(fmt)


25

In [31]:
len(test_GER)

fields = self.command_response_map[command]
    
        # build struct format string
        fmt = '>'
        for field in fields:
            if field not in self.return_value_struct_map:
                raise ValueError(f'Field {field} not in return_value_struct_map')
            fmt += self.return_value_struct_map[field]

7

In [3]:
test_S1S       = b'\x00;\x01\x00\x00\x06\x00\x06\x00,\x00\x08\x00\x06\x00%\x13\xcb\x13\xc7\x13\xc7\x13\xc0;'
test_S1S2      =                  b"\x06\x00\x06\x00,\x00\x08\x00\x06\x00'\x13\xcc\x13\xc7\x13\xc7\x13\xc4;"
test_GID       = b'\x00;MRC DIG-AD-DA D0941BA1281 E2-Digital-V031-10256;'
test_GER       = b'\x00;CLS\xf9;'

In [38]:
class ProtocolDecoder:
    # communication protocol information
    command_parameter_struct_map = COMMAND_PARAMETER_STRUCT_MAP
    return_value_struct_map      = RETURN_VALUE_STRUCT_MAP
    ascii_keys                   = ASCII_KEYS
    command_response_map         = COMMAND_RESPONSE_MAP
    error_code_map               = ERROR_CODE_MAP
    error_description_map        = ERROR_DESCRIPTION_MAP

    def __init__(self, connection):
        self.connection = connection

    def send_command(self, cmd: bytes):
        self.connection.write(cmd)

    def receive(self, size: int) -> bytes:
        return self.connection.read(size)
        
    def decode_response(self, reply, command):
        '''
        decode_response()
        - decodes command response

        return dict of command return keys and values
        '''
        # check command validity
        if command not in self.command_response_map:
            raise ValueError(f'Unknown command {command}')
    
        fields = self.command_response_map[command]
    
        # build struct format string
        fmt = '>'
        for field in fields:
            if field not in self.return_value_struct_map:
                raise ValueError(f'Field {field} not in return_value_struct_map')
            fmt += self.return_value_struct_map[field]

        # check reply length with expected length
        if struct.calcsize(fmt) != len(reply):
            raise ValueError(f'Length of reply {len(reply)} byte does not match length of expected "{command}" length of {struct.calcsize(fmt)} byte.')
          
        # unpack
        unpacked = struct.unpack(fmt, reply)
        result = dict(zip(fields, unpacked))
    
        # handle special cases of keys
        for key, val in result.items():
            # StatusFlag: convert to bit dictionary
            if key == 'StatusFlag':
                bits = format(val, '08b')
                result['StatusFlag'] = dict(zip(self.command_response_map.get('StatusFlag', []), bits))
                continue
    
            # decode fields received as ascii
            if key in self.ascii_keys:
                if isinstance(val, (bytes, bytearray)):
                    result[key] = val.rstrip(b'\x00').decode('ascii')
                elif isinstance(val, int) and 0 <= val <= 127:
                    result[key] = chr(val)
                continue
    
            # map error code
            if key == 'e':
                code = val
                if isinstance(code, (bytes, bytearray)) and len(code) == 1:
                    code = code[0]
    
                error_name = self.error_code_map.get(code, f'UnknownError_0x{code:02X}')
                error_description = self.error_description_map.get(code, 'No description available.')
    
                result['e'] = {
                    'ErrorCode': f'0x{code:02X}',
                    'ErrorName': error_name,
                    'ErrorDescription': error_description
                }
                continue
    
        return result


In [40]:
decoder = ProtocolDecoder()
decoder.decode_response(test_S1S, 'S1S;')

25


{'fe': 0,
 'semi_fe': 59,
 'StatusFlag': {'EF': '0',
  'A2': '0',
  'A1': '0',
  'OnOff2': '0',
  'OnOff1': '0',
  'Adj2': '0',
  'Adj1': '0',
  'PF': '1'},
 'ResByte': 0,
 'DX1': 6,
 'DY1': 6,
 'DI1': 44,
 'DX2': 8,
 'DY2': 6,
 'DI2': 37,
 'RX1': 5067,
 'RY1': 5063,
 'RX2': 5063,
 'RY2': 5056,
 'semi_end': 59}

In [72]:
res = decode_response(test_GER, 'GER;', return_values_struct_map, command_response_map, ASCII_KEYS)

In [65]:
res['Device_id'].decode('ascii')

'MRC DIG-AD-DA D0941BA1281 E2-Digital-V031-10256'

In [73]:
print(res)

{'fe': 0, 'semi_fe': 59, 'CMD': 'CLS', 'e': b'\xf9', 'semi_end': 59}
