# Si446x Device Direct Access Byte File (Dblk and Panic)

In [None]:
from __future__ import print_function
from builtins import *                  # python3 types
from time import sleep
from datetime import datetime
import struct as pystruct
from binascii import hexlify
import os.path

In [None]:
!pwd
%autosave 0
import sys
sys.path.append("../si446x/si446x")
%run '../si446x/si446x/notebooks/si446x_Device_Layer.ipynb'

In [None]:
import sys
sys.path.append("../tagnet/tagnet")
from tagmessages import TagMessage, TagPoll, TagGet, TagPut, TagDelete, TagHead
from tagnames import TagName
from tagtlv import TagTlv, TagTlvList, tlv_types

In [None]:
# NOTE: this rebinds the name `datetime` from the class (bound by
# `from datetime import datetime` in the first cell) to the module, which is
# why `datetime.datetime.now()` is used here. The defaults cell further down
# re-imports the class before `datetime.now()` is used again.
import datetime
print('Test Start Time: {}'.format(datetime.datetime.now()))
print('Si446x Radio Device Driver Version: {}'.format(si446x_device_version()))

In [None]:
sys.path.append("../tagfuse/tagfuse")
from Si446xFile import si446x_device_enable, file_get_bytes, file_put_bytes, file_update_attrs, dblk_put_note

## Start Up the Radio

In [None]:
radio = si446x_device_enable()

## File Data Verification

In [None]:
# default parameters
# NOTE(review): within this notebook only MAX_WAIT, MAX_RECV and RADIO_POWER
# are referenced (as arguments to si446x_device_receive_msg() and
# si446x_device_send_msg()); the remaining constants are presumably kept for
# interactive use -- confirm before removing.
MAX_WAIT            = 10
MAX_RECV            = 255
MAX_PAYLOAD         = 254
MAX_RETRIES         = 10
RADIO_POWER         = 100
SHORT_DELAY         = 0
from datetime import datetime

In [None]:
BLOCK_SIZE = 512
def verify_file(radio, check_fname, dev, unit, skip=0):
    """Compare a file read from the tag over the radio with a local copy.

    The remote file is fetched in BLOCK_SIZE chunks with file_get_bytes() and
    each chunk is compared byte-for-byte with the local file at the same
    offset (shifted by ``skip``). The first differing byte of each chunk is
    reported together with a hex dump of both chunks and a few bytes of
    context around the difference. Progress is printed on a single line and a
    summary (elapsed time, final offset) is printed at the end.

    Args:
        radio: radio handle passed through to file_get_bytes().
        check_fname: path of the local reference file.
        dev: device name passed through to file_get_bytes().
        unit: unit identifier passed through to file_get_bytes().
        skip: number of bytes at the start of the local file to skip before
            comparing (remote offset 0 is compared with local offset ``skip``).

    Returns:
        None. All results are printed.

    Note:
        When file_get_bytes() returns no data and no end-of-file indication,
        'timeout' is printed and the same offset is requested again; there is
        no retry limit.
    """
    with open(check_fname, 'rb') as fd:
        start  = datetime.now()
        eof    = False
        offset = 0
        while not eof:
            fd.seek(offset + skip)
            dbuf = bytearray(fd.read(BLOCK_SIZE))
            sbuf, eof = file_get_bytes(radio, dev, unit, BLOCK_SIZE, offset)
            if sbuf:
                for ix in range(len(sbuf)):
                    # A local file shorter than the remote data is a mismatch,
                    # not an IndexError.
                    if ix >= len(dbuf) or sbuf[ix] != dbuf[ix]:
                        # Clamp the context window: a negative slice start
                        # would wrap around to the end of the buffer.
                        lo = max(ix - 4, 0)
                        print(hexlify(sbuf))
                        print(hexlify(dbuf))
                        print("\r{} mismatch: offset:{}, {}/{}".format(datetime.now() - start,
                                                            offset+ix,
                                                            hexlify(sbuf[lo:ix+4]),
                                                            hexlify(dbuf[lo:ix+4])))
                        break
                offset += len(sbuf)
            elif not eof:
                print("\r{} {}".format(datetime.now() - start, 'timeout'), end="")
            sleep(0)
            print("\r{} {}".format(datetime.now() - start, offset), end="")
    print("\ntime: {}, offset: {}".format(datetime.now() - start, offset))

In [None]:
# NOTE(review): STOP is not defined anywhere in the visible notebook;
# presumably a deliberate NameError so that "Run All" halts here -- confirm.
STOP

## Verify Dblk File

In [None]:
verify_file(radio, '/home/pi/o/mm/dblk.data', 'dblk', '0')

## Verify Panic File

In [None]:
# skip=512: remote offset 0 is compared against byte 512 of the local file,
# i.e. the first 512 bytes of panic.data are not part of the comparison.
verify_file(radio, '/home/pi/o/mm/panic.data', 'panic', '2', skip=512)

## Put Note

In [None]:
dblk_put_note(radio, 'hello')

## Update File Status

In [None]:
file_update_attrs(radio, ['panic', 'byte', 3], {})

In [None]:
file_update_attrs(radio, ['dblk', 'byte', '0'], {})

In [None]:
file_update_attrs(radio, ['dblk', 'note'], {})

## Extra

In [None]:
# NOTE(review): STOP is not defined anywhere in the visible notebook;
# presumably a deliberate NameError so that "Run All" does not execute the
# scratch cells that follow -- confirm.
STOP

In [None]:
buf, eof = file_get_bytes(radio, 'dblk', 0, 512, 512*4)

In [None]:
print(len(buf), eof)

In [None]:
print(hexlify(buf))

In [None]:
BLOCK_SIZE = 512

# Inline (non-function) variant of verify_file() for the dblk file. It runs at
# cell scope so that ix, offset, dbuf and sbuf remain available for inspection
# in the following cell after a mismatch.
with open('/home/pi/o/mm/dblk.data', 'rb') as fd:
    start  = datetime.now()
    eof    = False
    offset = 0
    while not eof:
        fd.seek(offset)
        dbuf = bytearray(fd.read(BLOCK_SIZE))
        # dblk_get_bytes() is not imported or defined in this notebook; use
        # file_get_bytes() with the same dev/unit as the verify_file() call
        # for the dblk file.
        sbuf, eof = file_get_bytes(radio, 'dblk', '0', BLOCK_SIZE, offset)
        if sbuf:
            for ix in range(len(sbuf)):
                # A local file shorter than the remote data is a mismatch,
                # not an IndexError.
                if ix >= len(dbuf) or sbuf[ix] != dbuf[ix]:
                    # Clamp the context window: a negative slice start would
                    # wrap around to the end of the buffer.
                    lo = max(ix - 4, 0)
                    print("\r{} mismatch: offset:{}, {}/{}".format(datetime.now() - start,
                                                        offset+ix,
                                                        hexlify(sbuf[lo:ix+4]),
                                                        hexlify(dbuf[lo:ix+4])))
                    break
            offset += len(sbuf)
        elif not eof:
            print("\r{} {}".format(datetime.now() - start, 'timeout'), end="")
        sleep(0)
        print("\r{} {}".format(datetime.now() - start, offset), end="")
print("\ntime: {}, offset: {}".format(datetime.now() - start, offset))

In [None]:
print(ix, offset)
print(hexlify(dbuf))
print(hexlify(sbuf))

In [None]:
buf, eof = file_get_bytes(radio, 'panic', 1, 2, 76799)

In [None]:
print(len(buf), eof)

In [None]:
print(hexlify(buf))

## Get Chip Status

In [None]:
print(radio.get_chip_status())

## Get Image Directory

In [None]:
#<node_id>   "tag"  "sd"  0  "img"
# Build a GET request for <node_id>/tag/sd/0/img, send it, and print the
# (state, entry) pairs found in the first eight payload items of the reply.
image_manager_name = TagName(
    TagTlv(tlv_types.NODE_ID, -1),
    'tag',
    'sd',
    TagTlv(0),
    TagTlv('img'),
)
dir_info = TagGet(image_manager_name)
dir_msg = dir_info.build()
si446x_device_send_msg(radio, dir_msg, RADIO_POWER)
rsp_buf, rssi, status = si446x_device_receive_msg(radio, MAX_RECV, MAX_WAIT)
if not rsp_buf:
    print('timeout')
else:
    rsp_obj = TagMessage(rsp_buf)
    # Payload items are read in pairs: [x] is printed last, [x+1] supplies
    # the value printed as "state".
    for x in (0, 2, 4, 6):
        print("state: {}, {}".format(rsp_obj.payload[x+1].value(), rsp_obj.payload[x]))

In [None]:
# name layout: <node_id> "tag" "sys" <which>
def get_version(which):
    """Send a GET for <node_id>/tag/sys/<which> and print the reply.

    Args:
        which: final element of the tag name (wrapped in a TagTlv).

    Prints the reply's error code and ``which``; when the reply carries a
    payload, payload[1].value() (as "state") and payload[0] are printed too.
    Nothing is printed when no reply is received within the wait passed to
    si446x_device_receive_msg().
    """
    sys_name = TagName(TagTlv(tlv_types.NODE_ID, -1),
                       'tag',
                       'sys',
                       TagTlv(which))
    sys_obj = TagGet(sys_name)
    get_msg = sys_obj.build()
    si446x_device_send_msg(radio, get_msg, RADIO_POWER)
    rsp_buf, rssi, status = si446x_device_receive_msg(radio, MAX_RECV, 5)
    if not rsp_buf:
        return
    rsp_obj = TagMessage(rsp_buf)
    if rsp_obj.payload:
        print("{}: {:^10} state: {}, {}".format(rsp_obj.header.options.param.error_code, which, rsp_obj.payload[1].value(), rsp_obj.payload[0]))
    else:
        # A reply without payload (as set_version() already allows for) would
        # otherwise fail on the payload[...] lookups above.
        print("{}: {:^10}".format(rsp_obj.header.options.param.error_code, which))

In [None]:
# Query and print the tag/sys entry for each of these five names in turn.
get_version('active')
get_version('backup')
get_version('golden')
get_version('nib')
get_version('running')

In [None]:
# name layout: <node_id> "tag" "sys" <which> <version>
def set_version(which, version):
    """Send a PUT for <node_id>/tag/sys/<which>/<version> and print the reply.

    Args:
        which: tag name element placed after 'sys' (wrapped in a TagTlv).
        version: value wrapped in a TagTlv of type tlv_types.VERSION.

    Prints the reply's error code, followed by payload[1].value() (as
    "state") and payload[0] when the reply has a payload. Prints nothing if
    no reply is received.
    """
    put_name = TagName(
        TagTlv(tlv_types.NODE_ID, -1),
        'tag',
        'sys',
        TagTlv(which),
        TagTlv(tlv_types.VERSION, version),
    )
    put_msg = TagPut(put_name).build()
    si446x_device_send_msg(radio, put_msg, RADIO_POWER)
    reply, _rssi, _status = si446x_device_receive_msg(radio, MAX_RECV, 5)
    if not reply:
        return
    parsed = TagMessage(reply)
    if parsed.payload:
        print("{}: state: {}, {}".format(parsed.header.options.param.error_code, parsed.payload[1].value(), parsed.payload[0]))
    else:
        print("{}".format(parsed.header.options.param.error_code))

In [None]:
set_version('active', (118, 16, 0))

In [None]:
set_version('backup', (32, 16, 0))

In [None]:
set_version('running', (125, 1, 0))

## Interactive Group Properties

In [None]:
from si446xdef import *


def si446x_device_group_fetch_and_decode(group):
    """Fetch one radio property group and print it raw and decoded.

    Args:
        group: key into radio_config_group_ids.decoding (the interact()
            widget supplies values from radio_config_group_ids.encoding).

    The group's structure is looked up in radio_config_groups, its sizeof()
    bytes are read with si446x_device_get_property() starting at index 0, and
    the result is printed twice: via insert_space() and via the matching
    radio_display_structs formatter.

    Returns:
        None.
    """
    gname = radio_config_group_ids.decoding[group]
    g_s = radio_config_groups[radio_config_group_ids.build(gname)]
    p = si446x_device_get_property(radio, gname, 0, g_s.sizeof())
    print(g_s, insert_space(p))
    print(radio_display_structs[g_s](g_s, p))
    return None


def si446x_device_command_fetch_and_decode(cmd):
    """Issue one radio status command and print its response raw and decoded.

    Args:
        cmd: key into radio_status_cmd_ids.decoding (the interact() widget
            supplies values from radio_status_cmd_ids.encoding).

    Prints 'no function', 'no command' or 'no response' when the respective
    step yields nothing; otherwise prints the response via insert_space() and
    via the matching radio_display_structs formatter.

    Returns:
        None.
    """
    cname = radio_status_cmd_ids.decoding[cmd]
    # NOTE(review): the name is decoded with radio_status_cmd_ids but the
    # table key is built with radio_config_cmd_ids -- confirm this is intended.
    cfunc, cstr = radio_status_commands[radio_config_cmd_ids.build(cname)]
    if not cfunc:
        print('no function')
        return None

    request = cfunc(cname)
    if not request:
        print('no command')
        return None

    radio.spi.command(request, cstr)
    rsp = radio.spi.response(cstr.sizeof(), cstr.name)
    if not rsp:
        print('no response')
        return None

    print(cstr, insert_space(rsp))
    print(radio_display_structs[cstr](cstr, rsp))
    return None

In [None]:
interact(si446x_device_group_fetch_and_decode, group=radio_config_group_ids.encoding)

## Interactive Command Status Responses

In [None]:
interact(si446x_device_command_fetch_and_decode, cmd=radio_status_cmd_ids.encoding)

In [None]:
from datetime import datetime
datetime.now()