Skip to content

Commit

Permalink
uds: no need for threads if you always drain rx
Browse files Browse the repository at this point in the history
  • Loading branch information
gregjhogan committed Nov 13, 2019
1 parent 91b7c5b commit 68c39fb
Showing 1 changed file with 60 additions and 69 deletions.
129 changes: 60 additions & 69 deletions python/uds.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,8 @@
#!/usr/bin/env python3
import time
import struct
from typing import NamedTuple, List
from typing import Callable, NamedTuple, Tuple, List
from enum import IntEnum
from queue import Queue, Empty
from threading import Thread
from binascii import hexlify

class SERVICE_TYPE(IntEnum):
Expand Down Expand Up @@ -271,14 +269,50 @@ class InvalidSubFunctioneError(Exception):
0x93: 'voltage too low',
}

class CanClient():
def __init__(self, can_send: Callable[[Tuple[int, bytes, int]], None], can_recv: Callable[[], List[Tuple[int, int, bytes, int]]], tx_addr: int, rx_addrs: int, bus: int, debug: bool=False):
self.tx = can_send
self.rx = can_recv
self.tx_addr = tx_addr
self.rx_addrs = rx_addrs
self.bus = bus
self.debug = debug

def recv(self, drain=False) -> List[bytes]:
msg_array = []
while True:
msgs = self.rx()
if drain:
if self.debug: print("CAN-RX: drain - {}".format(len(msgs)))
else:
for rx_addr, rx_ts, rx_data, rx_bus in msgs or []:
if rx_bus == self.bus and rx_addr in self.rx_addrs and len(rx_data) > 0:
if self.debug: print("CAN-RX: {} - {}".format(hex(rx_addr), hexlify(rx_data)))
msg_array.append(rx_data)
# break when non-full buffer is processed
if len(msgs) < 254:
return msg_array

def send(self, msgs: List[bytes], delay: float=0) -> None:
first = True
for msg in msgs:
if not first and delay:
if self.debug: print(f"CAN-TX: delay - {delay}")
time.sleep(delay)
if self.debug: print("CAN-TX: {} - {}".format(hex(self.tx_addr), hexlify(msg)))
self.tx(self.tx_addr, msg, self.bus)
first = False

class IsoTpMessage():
def __init__(self, can_tx_queue: Queue, can_rx_queue: Queue, timeout: float, debug: bool=False):
self.can_tx_queue = can_tx_queue
self.can_rx_queue = can_rx_queue
def __init__(self, can_client: CanClient, timeout: float=1, debug: bool=False):
self._can_client = can_client
self.timeout = timeout
self.debug = debug

def send(self, dat: bytes) -> None:
# throw away any stale data
self._can_client.recv(drain=True)

self.tx_dat = dat
self.tx_len = len(dat)
self.tx_idx = 0
Expand All @@ -297,27 +331,27 @@ def _tx_first_frame(self) -> None:
# first frame (send first 6 bytes)
if self.debug: print("ISO-TP: TX - first frame")
msg = (struct.pack("!H", 0x1000 | self.tx_len) + self.tx_dat[:6]).ljust(8, b"\x00")
self.can_tx_queue.put(msg)
self._can_client.send([msg])

def recv(self) -> bytes:
self.rx_dat = b""
self.rx_len = 0
self.rx_idx = 0
self.rx_done = False

start_time = time.time()
try:
while True:
self._isotp_rx_next()
if self.tx_done and self.rx_done:
return self.rx_dat
except Empty:
raise MessageTimeoutError("timeout waiting for response")
for msg in self._can_client.recv():
self._isotp_rx_next(msg)
if self.tx_done and self.rx_done:
return self.rx_dat
if time.time() - start_time > self.timeout:
raise MessageTimeoutError("timeout waiting for response")
finally:
if self.debug: print(f"ISO-TP: RESPONSE - {hexlify(self.rx_dat)}")

def _isotp_rx_next(self) -> None:
rx_data = self.can_rx_queue.get(block=True, timeout=self.timeout)

def _isotp_rx_next(self, rx_data: bytes) -> None:
# single rx_frame
if rx_data[0] >> 4 == 0x0:
self.rx_len = rx_data[0] & 0xFF
Expand All @@ -337,9 +371,9 @@ def _isotp_rx_next(self) -> None:
if self.debug: print(f"ISO-TP: TX - flow control continue")
# send flow control message (send all bytes)
msg = b"\x30\x00\x00".ljust(8, b"\x00")
self.can_tx_queue.put(msg)
self._can_client.send([msg])
return

# consecutive rx frame
if rx_data[0] >> 4 == 0x2:
assert self.rx_done == False, "isotp - rx: consecutive frame with no active frame"
Expand All @@ -362,19 +396,19 @@ def _isotp_rx_next(self) -> None:
delay_ts = rx_data[2] & 0x7F
# scale is 1 milliseconds if first bit == 0, 100 micro seconds if first bit == 1
delay_div = 1000. if rx_data[2] & 0x80 == 0 else 10000.
delay_sec = delay_ts / delay_div
# first frame = 6 bytes, each consecutive frame = 7 bytes
start = 6 + self.tx_idx * 7
count = rx_data[1]
end = start + count * 7 if count > 0 else self.tx_len
tx_msgs = []
for i in range(start, end, 7):
if delay_ts > 0 and i > start:
delay_s = delay_ts / delay_div
if self.debug: print(f"ISO-TP: TX - delay - seconds={delay_s}")
time.sleep(delay_s)
self.tx_idx += 1
# consecutive tx frames
# consecutive tx messages
msg = (bytes([0x20 | (self.tx_idx & 0xF)]) + self.tx_dat[i:i+7]).ljust(8, b"\x00")
self.can_tx_queue.put(msg)
tx_msgs.append(msg)
# send consecutive tx messages
self._can_client.send(tx_msgs, delay=delay_sec)
if end >= self.tx_len:
self.tx_done = True
if self.debug: print(f"ISO-TP: TX - consecutive frame - idx={self.tx_idx} done={self.tx_done}")
Expand All @@ -383,8 +417,7 @@ def _isotp_rx_next(self) -> None:
if self.debug: print("ISO-TP: TX - flow control wait")

class UdsClient():
def __init__(self, panda, tx_addr: int, rx_addr: int=None, bus: int=0, timeout: float=10, debug: bool=False):
self.panda = panda
def __init__(self, panda, tx_addr: int, rx_addr: int=None, bus: int=0, timeout: float=1, debug: bool=False):
self.bus = bus
self.tx_addr = tx_addr
if rx_addr is None:
Expand All @@ -396,62 +429,20 @@ def __init__(self, panda, tx_addr: int, rx_addr: int=None, bus: int=0, timeout:
self.rx_addr = (tx_addr & 0xFFFF0000) + (tx_addr<<8 & 0xFF00) + (tx_addr>>8 & 0xFF)
else:
raise ValueError("invalid tx_addr: {}".format(tx_addr))

self.can_tx_queue = Queue()
self.can_rx_queue = Queue()
self.timeout = timeout
self.debug = debug

self.can_thread = Thread(target=self._can_thread, args=(self.debug,))
self.can_thread.daemon = True
self.can_thread.start()

def _can_thread(self, debug: bool=False):
try:
while True:
# send
tx_cnt = 0
while tx_cnt < 256 and not self.can_tx_queue.empty():
try:
msg = self.can_tx_queue.get(block=False)
tx_cnt += 1
if debug: print("CAN-TX: {} - {}".format(hex(self.tx_addr), hexlify(msg)))
self.panda.can_send(self.tx_addr, msg, self.bus)
except Empty:
pass

# receive
rx_cnt = 0
while rx_cnt < 4096:
msgs = self.panda.can_recv()
if not msgs:
break
rx_cnt += len(msgs)
for rx_addr, rx_ts, rx_data, rx_bus in msgs:
if rx_bus != self.bus or rx_addr != self.rx_addr or len(rx_data) == 0:
continue
if debug: print("CAN-RX: {} - {}".format(hex(self.rx_addr), hexlify(rx_data)))
self.can_rx_queue.put(rx_data)
finally:
self.panda.close()
self._can_client = CanClient(panda.can_send, panda.can_recv, self.tx_addr, [self.rx_addr], self.bus, debug=self.debug)

# generic uds request
def _uds_request(self, service_type: SERVICE_TYPE, subfunction: int=None, data: bytes=None) -> bytes:
# throw away any stale data
while not self.can_rx_queue.empty():
try:
self.can_rx_queue.get(block=False)
except Empty:
pass

req = bytes([service_type])
if subfunction is not None:
req += bytes([subfunction])
if data is not None:
req += data

# send request, wait for response
isotp_msg = IsoTpMessage(self.can_tx_queue, self.can_rx_queue, self.timeout, self.debug)
isotp_msg = IsoTpMessage(self._can_client, self.timeout, self.debug)
isotp_msg.send(req)
while True:
resp = isotp_msg.recv()
Expand Down

0 comments on commit 68c39fb

Please sign in to comment.