In [1]:
from typing import Iterator

from openpilot.tools.lib.logreader import LogReader

IDENTIFIERS = [
  "0b9d9ff7c65286db/00000004--52d36c6566",
  "0b9d9ff7c65286db/00000005--db16a09ea1",
]


def collect_can_frames(identifier: str, bus: int) -> Iterator[tuple[float, int, bytes]]:
  """Yield (t, address, dat) for every CAN message seen on `bus` in a log.

  Args:
    identifier: value handed to LogReader to select the log to read.
    bus: only messages whose `src` equals this value are yielded.

  Yields:
    Tuples of (seconds relative to the reference event, can.address, can.dat).

  The reference time is the logMonoTime of the first event that is not
  'initData'. Normally that is a non-'can' event; if a 'can' event comes first,
  its own logMonoTime becomes the reference instead of subtracting from an
  unset value (which raised TypeError).
  """
  t0 = None
  for evt in LogReader(identifier):
    which = evt.which()
    if which == 'initData':
      # initData never serves as the time reference
      continue
    if t0 is None:
      t0 = evt.logMonoTime
    if which != 'can':
      continue

    # every message inside one event shares the event timestamp
    t = (evt.logMonoTime - t0) / 1e9
    for can in evt.can:
      if can.src == bus:
        yield t, can.address, can.dat


# NOTE(review): LANE_KEEP_ASSIST is not referenced by any of the visible cells.
LANE_KEEP_ASSIST = 0x3F2
# bus (can.src) whose messages are collected
BUS = 2

# one flat list holding the frames of every log in IDENTIFIERS, in order
frames = []
for identifier in IDENTIFIERS:
  frames += collect_can_frames(identifier, BUS)

len(frames)



1444855

In [2]:
import pandas as pd

# keep a single row per distinct (address, payload) pair
df = pd.DataFrame(frames, columns=['t', 'addr', 'dat'])
df = df.drop_duplicates(subset=['addr', 'dat'])
df.head()

Unnamed: 0,t,addr,dat
0,-0.473895,973,"b'\x00\x06\x00\x00\x00""`\x8b'"
1,-0.473895,1037,b'\x00\x00\x00\x00\x00\x00\x00\x05'
2,-0.473895,520,b'\x1aP2\x00@222'
3,-0.473895,1416,b'\x00\xff\x00\x000U\x00\x06'
4,-0.473895,114,b'\x02\x00\x00\x00\x00'


In [3]:
# CAN address -> index of the payload byte that carries the checksum nibble
checksum_byte_index = {
  0x2ED: 1,
  0x2F5: 1,
  0x305: 4,
  0x38D: 5,
  0x3CD: 7,
  0x3F2: 1,
  0x5CD: 1,
}
# addresses whose checksum sits in the low nibble; every other address uses the high nibble
checksum_low_nibble = {0x38D}

def get_checksum(dat: bytes, addr: int) -> int:
  """Extract the 4-bit checksum stored in the payload `dat` of message `addr`.

  Raises:
    ValueError: if `addr` has no entry in `checksum_byte_index`.
  """
  if addr not in checksum_byte_index:
    raise ValueError(f"Unhandled address 0x{addr:0X}")

  checksum_byte = dat[checksum_byte_index[addr]]
  if addr in checksum_low_nibble:
    return checksum_byte & 0xF
  return checksum_byte >> 4

def strip_checksum(dat: bytes, addr: int) -> bytearray:
  """Return a mutable copy of `dat` with the checksum nibble of `addr` zeroed.

  Raises:
    ValueError: if `addr` has no entry in `checksum_byte_index`.
  """
  if addr not in checksum_byte_index:
    raise ValueError(f"Unhandled address 0x{addr:0X}")

  stripped = bytearray(dat)
  # the mask keeps the nibble that is NOT the checksum
  keep_mask = 0xF0 if addr in checksum_low_nibble else 0x0F
  stripped[checksum_byte_index[addr]] &= keep_mask
  return stripped

def calculate_checksum(dat: bytes, init: int = 0) -> int:
  """Compute the 4-bit checksum of `dat`, starting the nibble sum at `init`.

  The checksum nibble inside `dat` is assumed to be zeroed out already.
  The result is (8 - init - sum of all nibbles) truncated to 4 bits.
  """
  nibble_total = init + sum((byte & 0xF) + (byte >> 4) for byte in dat)
  return (8 - nibble_total) & 0xF

In [4]:
def predict_init(data: pd.Series, addr: int) -> int:
  """Brute-force the 4-bit init value that reproduces every observed checksum.

  Args:
    data: payloads captured for `addr`.
    addr: CAN address; must be a key of `checksum_byte_index`.

  Returns:
    The smallest init in 0x0..0xF for which calculate_checksum() equals the
    checksum stored in every payload of `data`.

  Raises:
    ValueError: if `data` is empty (any init would trivially "match"), or if
      no init value matches all payloads.
  """
  if len(data) == 0:
    raise ValueError(f"No data for address 0x{addr:0X}")

  # Neither the stripped payload nor the stored checksum depends on init,
  # so compute both once instead of once per candidate.
  samples = [(strip_checksum(dat, addr), get_checksum(dat, addr)) for dat in data]

  for init in range(0x10):
    if all(calculate_checksum(stripped, init) == actual for stripped, actual in samples):
      return init
  raise ValueError(f"Could not find init for address 0x{addr:0X}")


# addresses left out of the init prediction
# NOTE(review): the reason for skipping them is not recorded in this notebook
skip = (0x305, 0x38D)

# emit the result as a pasteable dict literal
print("checksum_init = {")
for addr in checksum_byte_index:
  if addr not in skip:
    init = predict_init(df[df.addr == addr].dat, addr)
    print(f"  0x{addr:0X}: 0x{init:0X},")
print("}")

checksum_init = {
  0x2ED: 0x6,
  0x2F5: 0xD,
  0x3CD: 0x5,
  0x3F2: 0xD,
  0x5CD: 0x7,
}


In [5]:
# CAN address -> checksum init value; these entries match the dict literal
# printed by the init prediction cell.
checksum_init = {
  0x2ED: 0x6,
  0x2F5: 0xD,
  0x3CD: 0x5,
  0x3F2: 0xD,
  0x5CD: 0x7,
}

In [6]:
def validate_init(data: pd.Series, addr: int, init: int) -> float:
  """Return the fraction of payloads in `data` whose stored checksum equals the one computed with `init`."""
  def expected(dat):
    return calculate_checksum(strip_checksum(dat, addr), init)

  def stored(dat):
    return get_checksum(dat, addr)

  calculated_checksum = data.apply(expected)
  actual_checksum = data.apply(stored)
  matches = calculated_checksum == actual_checksum
  return matches.sum() / len(actual_checksum)

# report, per address, the share of payloads whose checksum the init value reproduces
for addr, init in checksum_init.items():
  data = df.loc[df.addr == addr, 'dat']
  correct_pct = validate_init(data, addr, init)
  print(f"0x{addr:0X}: {correct_pct:.2%}")

0x2ED: 100.00%
0x2F5: 100.00%
0x3CD: 100.00%
0x3F2: 100.00%
0x5CD: 100.00%
