In [None]:
!pip install crcmod

Collecting crcmod
  Downloading crcmod-1.7.tar.gz (89 kB)
[?25l     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m0.0/89.7 kB[0m [31m?[0m eta [36m-:--:--[0m[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m89.7/89.7 kB[0m [31m2.6 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: crcmod
  Building wheel for crcmod (setup.py) ... [?25l[?25hdone
  Created wheel for crcmod: filename=crcmod-1.7-cp310-cp310-linux_x86_64.whl size=31407 sha256=692f7aa9b0c19bcf2a2da15ecb951a9e3f72abc8593ee94f741c6d5a08b58aaa
  Stored in directory: /root/.cache/pip/wheels/85/4c/07/72215c529bd59d67e3dac29711d7aba1b692f543c808ba9e86
Successfully built crcmod
Installing collected packages: crcmod
Successfully installed crcmod-1.7


In [None]:
import os
import yaml
import datetime
import pytz
import crcmod.predefined
import pandas as pd
import numpy as np

In [None]:
def read_bit_file(path, file_name):
  '''
  Utility function to read Honeywell bitstreams.

  path is the directory that holds the capture (with or without a trailing
  separator) and file_name is the file inside it.

  Each line is decoded and stripped of surrounding whitespace; only lines
  that are exactly 64 characters long are kept.

  Returns a tuple (lines, name) where lines is the list of kept strings and
  name is file_name with its extension removed.
  '''
  # os.path.join inserts the separator only when path lacks one.
  full_path = os.path.join(path, file_name)
  with open(full_path, "rb") as file:
    decoded_lines = (line.decode().strip() for line in file)
    temp_array = [bits for bits in decoded_lines if len(bits) == 64]
  # splitext drops whatever extension is present instead of assuming that it
  # is exactly four characters long (dot included).
  return temp_array, os.path.splitext(file_name)[0]

def byte_extraction(bitstring):
  '''
  Splits a bit string into eight consecutive 8-character chunks.

  Returns a list of 8 strings covering characters 0-63 of the input. Chunks
  that fall past the end of the input come back shorter or empty.
  '''
  return [bitstring[start:start + 8] for start in range(0, 64, 8)]

def byte_decode(bitstring, preamble='1111111111111110'):
  '''
  Accepts a bit string containing 8 bytes from GNU Radio
  Verifies preamble and CRC, and returns device ID and status bytes

  Packet layout (8 bits each): bytes 0-1 preamble, bytes 2-4 device ID,
  byte 5 status, bytes 6-7 CRC.

  Returns a (device_id, status) tuple of bit strings when the packet is
  valid. Returns None when the input is shorter than 64 bits, the preamble
  does not match, or the CRC check fails, so callers must test the result
  before unpacking it.
  '''
  # A truncated packet cannot be valid. Rejecting it here also keeps empty
  # slices away from crc_check, where int('', 2) would raise ValueError.
  if len(bitstring) < 64:
    return None

  byte_array = [bitstring[i:i + 8] for i in range(0, 64, 8)]

  # Cheap string comparison first: no CRC work for obvious non-packets.
  if byte_array[0] + byte_array[1] != preamble:
    return None

  device_id = ''.join(byte_array[2:5])
  status = byte_array[5]
  if not crc_check(device_id, status, byte_array[6], byte_array[7]):
    return None
  return device_id, status

def create_event_timestamp(tz='US/Eastern'):
  '''
  Builds a timestamp string for the current moment.

  tz is a pytz timezone name given as a string.
  Returns the current time in that timezone as 'YYYY-MM-DD HH:MM:SS'.
  '''
  zone = pytz.timezone(tz)
  return datetime.datetime.now(zone).strftime("%Y-%m-%d %H:%M:%S")

def crc_check(_device_id, _status, _crc_byte_0, _crc_byte_1,
              crc_alg='crc-16-buypass'):
  '''
  Verifies the CRC bytes received with a packet.

  The CRC is computed with crcmod over the device ID bytes followed by the
  status byte, then compared with the two received CRC bytes: _crc_byte_0
  must equal the high byte of the result and _crc_byte_1 the low byte.

  All four fields are strings of '0'/'1' characters. _device_id is consumed
  in 8-character chunks; only the first 8 characters of the other fields are
  used. crc_alg is the algorithm name handed to crcmod.predefined.mkCrcFun.

  Returns True when both CRC bytes match, False otherwise.
  Raises ValueError (from int) when _status or a CRC field is empty, or when
  any field contains characters other than binary digits.
  '''
  crc_func = crcmod.predefined.mkCrcFun(crc_alg)

  # Payload covered by the CRC: device ID bytes, then the status byte.
  payload = [int(_device_id[i : i + 8], 2) for i in range(0, len(_device_id), 8)]
  payload.append(int(_status[:8], 2))

  crc_check_0 = int(_crc_byte_0[:8], 2)
  crc_check_1 = int(_crc_byte_1[:8], 2)

  crc = crc_func(bytearray(payload))
  # TODO log failed CRC checks
  return (crc & 0xFF) == crc_check_1 and ((crc & 0xFF00) >> 8) == crc_check_0

In [None]:
path = '/content/drive/MyDrive/honeywell_test/bitstreams/'
# os.listdir returns entries in arbitrary order and includes subdirectories.
# Sort so row positions in the combined DataFrame are reproducible, and keep
# only regular files because read_bit_file open()s every entry it is given.
files = sorted(f for f in os.listdir(path)
               if os.path.isfile(os.path.join(path, f)))

In [None]:
# Read every capture the same way. Starting from empty lists means an empty
# directory yields an empty dataset instead of an IndexError on files[0].
bit_array, name_array = [], []

for _file in files:
  temp_bit_array, temp_name = read_bit_file(path, _file)
  bit_array += temp_bit_array
  # Repeat the file label once per packet so both lists stay the same length.
  name_array += [temp_name]*len(temp_bit_array)

In [None]:
# One row per 64-character bit string, tagged with the file it was read from.
df = pd.DataFrame({'bit_stream': bit_array, 'file_name': name_array})

In [None]:
# Peek at an 8-character window of the first bit string.
# NOTE(review): 54 is not a multiple of 8, so this window straddles the
# byte_6/byte_7 boundary used by byte_extraction - confirm the offset is intended.
l_idx, r_idx = 54, 54 + 8
df['bit_stream'].iloc[0][l_idx:r_idx]

'10001000'

In [None]:
byte_cols = [f'byte_{i}' for i in range(8)]

# One list of eight 8-character strings per row of df.
extracted_byte_array = [byte_extraction(item) for item in df.bit_stream]

# Building the frame straight from the list (rather than through np.vstack)
# also works when df has no rows; sharing df's index keeps the join row-aligned.
byte_df = pd.DataFrame(extracted_byte_array, columns=byte_cols, index=df.index)

# Drop byte columns left behind by an earlier run of this cell so it can be
# re-executed; joining frames with overlapping column names raises ValueError.
df = df.drop(columns=byte_cols, errors='ignore').join(byte_df)

# Named fields assembled from the raw bytes.
df['preamble'] = df.byte_0+df.byte_1
df['device_id'] = df.byte_2 + df.byte_3 + df.byte_4
df['status'] = df.byte_5
df['crc'] = df.byte_6 + df.byte_7

In [None]:
# Per-file subsets; .copy() gives independent frames rather than views of df.
garage_df = df.loc[df['file_name'] == 'garage_open_close'].copy()
back_df = df.loc[df['file_name'] == 'back_door_open_close'].copy()

In [None]:
# Rows from the five open/close captures, in their original order.
open_close_df = df[df.file_name.isin([
    'back_door_open_close',
    'garage_open_close',
    'back_window_left_open_close',
    'back_window_right_open_close',
    'front_door_open_close',
])].copy()

In [None]:
# How often each status byte appears within each capture file.
open_close_df.groupby('file_name')['status'].value_counts()

file_name                     status  
back_door_open_close          10100000    35
                              10000000    24
back_window_left_open_close   00100000    36
                              00000000    25
back_window_right_open_close  10000000    35
                              00000000    25
                              10001000     1
front_door_open_close         10100000    35
                              10000000    26
garage_open_close             10100000    34
                              10000000    24
Name: status, dtype: int64

In [None]:
# Status byte frequencies across every file that was loaded.
df['status'].value_counts()

10100000    115
10000000    109
00000000     50
00100000     36
11100000     10
10001000      1
Name: status, dtype: int64

In [None]:
# Status byte frequencies for the left back-window capture only.
left_window_rows = open_close_df['file_name'] == 'back_window_left_open_close'
open_close_df.loc[left_window_rows, 'status'].value_counts()

00100000    36
00000000    25
Name: status, dtype: int64

Example: reading a row of `df` and producing a message

In [None]:
yaml_path = '/content/drive/MyDrive/honeywell_test/device_details.yaml'
# Lookup table used below to turn device_id and status bit strings into text.
with open(yaml_path) as stream:
    byte_details = yaml.safe_load(stream)

In [None]:
# Pick one row at random and translate its fields through byte_details.
df_idx = np.random.randint(df.shape[0])
row = df.iloc[df_idx]
# NOTE(review): these lookups raise KeyError if byte_details has no entry for
# the row's device_id or status - confirm the YAML covers every observed value.
device_message = f'Device: {byte_details[row.device_id]}'
status_message = f'Status: {byte_details[row.status]}'

print(device_message, '\n' + status_message)

Device: garage door 
Status: open


In [None]:
gnu_bits = df.bit_stream[300]

if len(gnu_bits)==64:
  # byte_decode returns None when the preamble or CRC check fails, so the
  # result has to be checked before it is unpacked into two names.
  decoded = byte_decode(gnu_bits)
  if decoded is None:
    print('Packet rejected: preamble or CRC check failed')
  else:
    device_id, status = decoded

    device_message = f'Device: {byte_details[device_id]}'
    status_message = f'Status: {byte_details[status]}'

    print(create_event_timestamp())
    print(device_message, '\n'+status_message)

2023-11-24 16:44:53
Device: back window 
Status: right or left side closed
