Skip to content

Commit

Permalink
fix typing errors
Browse files Browse the repository at this point in the history
  • Loading branch information
pd0wm committed May 28, 2020
1 parent 9bece64 commit a618e64
Show file tree
Hide file tree
Showing 2 changed files with 26 additions and 19 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -15,3 +15,4 @@ examples/output.csv
.DS_Store
.vscode
nosetests.xml
.mypy_cache/
44 changes: 25 additions & 19 deletions python/uds.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
import time
import struct
from collections import deque
from typing import Callable, NamedTuple, Tuple, List
from typing import Callable, NamedTuple, Tuple, List, Deque, Generator, Optional, cast
from enum import IntEnum

class SERVICE_TYPE(IntEnum):
Expand Down Expand Up @@ -271,12 +271,12 @@ class InvalidSubFunctioneError(Exception):


class CanClient():
def __init__(self, can_send: Callable[[Tuple[int, bytes, int]], None], can_recv: Callable[[], List[Tuple[int, int, bytes, int]]], tx_addr: int, rx_addr: int, bus: int, sub_addr: int=None, debug: bool=False):
def __init__(self, can_send: Callable[[int, bytes, int], None], can_recv: Callable[[], List[Tuple[int, int, bytes, int]]], tx_addr: int, rx_addr: int, bus: int, sub_addr: int=None, debug: bool=False):
self.tx = can_send
self.rx = can_recv
self.tx_addr = tx_addr
self.rx_addr = rx_addr
self.rx_buff = deque()
self.rx_buff = deque() # type: Deque[bytes]
self.sub_addr = sub_addr
self.bus = bus
self.debug = debug
Expand Down Expand Up @@ -320,7 +320,7 @@ def _recv_buffer(self, drain: bool=False) -> None:
if len(msgs) < 254:
return

def recv(self, drain: bool=False) -> List[bytes]:
def recv(self, drain: bool=False) -> Generator[bytes, None, None]:
# buffer rx messages in case two response messages are received at once
# (e.g. response pending and success/failure response)
self._recv_buffer(drain)
Expand Down Expand Up @@ -383,7 +383,7 @@ def _tx_first_frame(self) -> None:
msg = (struct.pack("!H", 0x1000 | self.tx_len) + self.tx_dat[:self.max_len - 2]).ljust(self.max_len - 2, b"\x00")
self._can_client.send([msg])

def recv(self) -> bytes:
def recv(self) -> Optional[bytes]:
start_time = time.time()
try:
while True:
Expand Down Expand Up @@ -505,6 +505,10 @@ def _uds_request(self, service_type: SERVICE_TYPE, subfunction: int=None, data:
isotp_msg.send(req)
while True:
resp = isotp_msg.recv()

if resp is None:
continue

resp_sid = resp[0] if len(resp) > 0 else None

# negative response
Expand All @@ -518,7 +522,7 @@ def _uds_request(self, service_type: SERVICE_TYPE, subfunction: int=None, data:
try:
error_desc = _negative_response_codes[error_code]
except BaseException:
error_desc = resp[3:]
error_desc = resp[3:].hex()
# wait for another message if response pending
if error_code == 0x78:
if self.debug: print("UDS-RX: response pending")
Expand All @@ -534,7 +538,7 @@ def _uds_request(self, service_type: SERVICE_TYPE, subfunction: int=None, data:
resp_sfn = resp[1] if len(resp) > 1 else None
if subfunction != resp_sfn:
resp_sfn_hex = hex(resp_sfn) if resp_sfn is not None else None
raise InvalidSubFunctioneError('invalid response subfunction: {}'.format(hex(resp_sfn_hex)))
raise InvalidSubFunctioneError(f'invalid response subfunction: {resp_sfn_hex:x}')

# return data (exclude service id and sub-function id)
return resp[(1 if subfunction is None else 2):]
Expand Down Expand Up @@ -595,7 +599,7 @@ def control_dtc_setting(self, dtc_setting_type: DTC_SETTING_TYPE):

def response_on_event(self, response_event_type: RESPONSE_EVENT_TYPE, store_event: bool, window_time: int, event_type_record: int, service_response_record: int):
if store_event:
response_event_type |= 0x20
response_event_type |= 0x20 # type: ignore
# TODO: split record parameters into arrays
data = bytes([window_time, event_type_record, service_response_record])
resp = self._uds_request(SERVICE_TYPE.RESPONSE_ON_EVENT, subfunction=response_event_type, data=data)
Expand All @@ -613,9 +617,11 @@ def response_on_event(self, response_event_type: RESPONSE_EVENT_TYPE, store_even
}

def link_control(self, link_control_type: LINK_CONTROL_TYPE, baud_rate_type: BAUD_RATE_TYPE=None):
data : Optional[bytes]

if link_control_type == LINK_CONTROL_TYPE.VERIFY_BAUDRATE_TRANSITION_WITH_FIXED_BAUDRATE:
# baud_rate_type = BAUD_RATE_TYPE
data = bytes([baud_rate_type])
data = bytes([cast(int, baud_rate_type)])
elif link_control_type == LINK_CONTROL_TYPE.VERIFY_BAUDRATE_TRANSITION_WITH_SPECIFIC_BAUDRATE:
# baud_rate_type = custom value (3 bytes big-endian)
data = struct.pack('!I', baud_rate_type)[1:]
Expand Down Expand Up @@ -671,16 +677,16 @@ def dynamically_define_data_identifier(self, dynamic_definition_type: DYNAMIC_DE
data = struct.pack('!H', dynamic_data_identifier)
if dynamic_definition_type == DYNAMIC_DEFINITION_TYPE.DEFINE_BY_IDENTIFIER:
for s in source_definitions:
data += struct.pack('!H', s["data_identifier"]) + bytes([s["position"], s["memory_size"]])
data += struct.pack('!H', s.data_identifier) + bytes([s.position, s.memory_size])
elif dynamic_definition_type == DYNAMIC_DEFINITION_TYPE.DEFINE_BY_MEMORY_ADDRESS:
data += bytes([memory_size_bytes<<4 | memory_address_bytes])
for s in source_definitions:
if s["memory_address"] >= 1<<(memory_address_bytes*8):
raise ValueError('invalid memory_address: {}'.format(s["memory_address"]))
data += struct.pack('!I', s["memory_address"])[4-memory_address_bytes:]
if s["memory_size"] >= 1<<(memory_size_bytes*8):
raise ValueError('invalid memory_size: {}'.format(s["memory_size"]))
data += struct.pack('!I', s["memory_size"])[4-memory_size_bytes:]
if s.memory_address >= 1<<(memory_address_bytes*8):
raise ValueError('invalid memory_address: {}'.format(s.memory_address))
data += struct.pack('!I', s.memory_address)[4-memory_address_bytes:]
if s.memory_size >= 1<<(memory_size_bytes*8):
raise ValueError('invalid memory_size: {}'.format(s.memory_size))
data += struct.pack('!I', s.memory_size)[4-memory_size_bytes:]
elif dynamic_definition_type == DYNAMIC_DEFINITION_TYPE.CLEAR_DYNAMICALLY_DEFINED_DATA_IDENTIFIER:
pass
else:
Expand Down Expand Up @@ -736,7 +742,7 @@ def read_dtc_information(self, dtc_report_type: DTC_REPORT_TYPE, dtc_status_mask
if dtc_report_type == DTC_REPORT_TYPE.DTC_SNAPSHOT_IDENTIFICATION or \
dtc_report_type == DTC_REPORT_TYPE.DTC_SNAPSHOT_RECORD_BY_DTC_NUMBER or \
dtc_report_type == DTC_REPORT_TYPE.DTC_SNAPSHOT_RECORD_BY_RECORD_NUMBER:
data += ord(dtc_snapshot_record_num)
data += bytes([dtc_snapshot_record_num])
# dtc_extended_record_num
if dtc_report_type == DTC_REPORT_TYPE.DTC_EXTENDED_DATA_RECORD_BY_DTC_NUMBER or \
dtc_report_type == DTC_REPORT_TYPE.MIRROR_MEMORY_DTC_EXTENDED_DATA_RECORD_BY_DTC_NUMBER:
Expand Down Expand Up @@ -784,7 +790,7 @@ def request_download(self, memory_address: int, memory_size: int, memory_address
data += struct.pack('!I', memory_size)[4-memory_size_bytes:]

resp = self._uds_request(SERVICE_TYPE.REQUEST_DOWNLOAD, subfunction=None, data=data)
max_num_bytes_len = resp[0] >> 4 if len(resp) > 0 else None
max_num_bytes_len = resp[0] >> 4 if len(resp) > 0 else 0
if max_num_bytes_len >= 1 and max_num_bytes_len <= 4:
max_num_bytes = struct.unpack('!I', (b"\x00"*(4-max_num_bytes_len))+resp[1:max_num_bytes_len+1])[0]
else:
Expand All @@ -809,7 +815,7 @@ def request_upload(self, memory_address: int, memory_size: int, memory_address_b
data += struct.pack('!I', memory_size)[4-memory_size_bytes:]

resp = self._uds_request(SERVICE_TYPE.REQUEST_UPLOAD, subfunction=None, data=data)
max_num_bytes_len = resp[0] >> 4 if len(resp) > 0 else None
max_num_bytes_len = resp[0] >> 4 if len(resp) > 0 else 0
if max_num_bytes_len >= 1 and max_num_bytes_len <= 4:
max_num_bytes = struct.unpack('!I', (b"\x00"*(4-max_num_bytes_len))+resp[1:max_num_bytes_len+1])[0]
else:
Expand Down

0 comments on commit a618e64

Please sign in to comment.