Skip to content

Commit

Permalink
bug fixes
Browse files Browse the repository at this point in the history
  • Loading branch information
gregjhogan committed Oct 15, 2019
1 parent 8ee89a0 commit 33a5167
Showing 1 changed file with 44 additions and 38 deletions.
82 changes: 44 additions & 38 deletions python/uds.py
Expand Up @@ -193,26 +193,32 @@ def _uds_request(address, service_type, subfunction=None, data=None):
if data is not None:
req += data
tx_queue.put(req)
try:
resp = rx_queue.get(block=True, timeout=10)
except Empty:
raise MessageTimeoutError("timeout waiting for response")
resp_sid = resp[0] if len(resp) > 0 else None

# negative response
if resp_sid == 0x7F:
service_id = 0
try:
service_id = resp[1]
service_desc = SERVICE_TYPE(service_id).name
except:
service_desc = 'NON_STANDARD_SERVICE'
error_code = resp[2] if len(resp) > 2 else '0x??'

while True:
try:
error_desc = _negative_response_codes[error_code]
except:
error_desc = 'unknown error'
raise NegativeResponseError('{} - {}'.format(service_desc, error_desc), service_id, error_code)
resp = rx_queue.get(block=True, timeout=10)
except Empty:
raise MessageTimeoutError("timeout waiting for response")
resp_sid = resp[0] if len(resp) > 0 else None

# negative response
if resp_sid == 0x7F:
service_id = resp[1] if len(resp) > 1 else -1
try:
service_desc = SERVICE_TYPE(service_id).name
except Exception:
service_desc = 'NON_STANDARD_SERVICE'
error_code = resp[2] if len(resp) > 2 else -1
try:
error_desc = _negative_response_codes[error_code]
except Exception:
error_desc = 'unknown error'
# wait for another message if response pending
if error_code == 0x78:
time.sleep(0.1)
continue
raise NegativeResponseError('{} - {}'.format(service_desc, error_desc), service_id, error_code)
break

# positive response
if service_type+0x40 != resp_sid:
Expand Down Expand Up @@ -249,15 +255,15 @@ def ecu_reset(address, reset_type):
resp = _uds_request(address, SERVICE_TYPE.ECU_RESET, subfunction=reset_type)
power_down_time = None
if reset_type == RESET_TYPE.ENABLE_RAPID_POWER_SHUTDOWN:
power_down_time = ord(resp[0])
power_down_time = resp[0]
return power_down_time

class ACCESS_TYPE(IntEnum):
REQUEST_SEED = 1
SEND_KEY = 2

def security_access(address, access_type, security_key=None):
request_seed = access_type % 2 == 0
request_seed = access_type % 2 != 0
if request_seed and security_key is not None:
raise ValueError('security_key not allowed')
if not request_seed and security_key is None:
Expand Down Expand Up @@ -339,13 +345,13 @@ def response_on_event(address, response_event_type, store_event, window_time, ev

if response_event_type == REPORT_ACTIVATED_EVENTS:
return {
"num_of_activated_events": ord(resp[0]),
"num_of_activated_events": resp[0],
"data": resp[1:], # TODO: parse the reset of response
}

return {
"num_of_identified_events": ord(resp[0]),
"event_window_time": ord(resp[1]),
"num_of_identified_events": resp[0],
"event_window_time": resp[1],
"data": resp[2:], # TODO: parse the reset of response
}

Expand Down Expand Up @@ -423,7 +429,7 @@ def read_memory_by_address(address, memory_address, memory_size, memory_address_
raise ValueError('invalid memory_address_bytes: {}'.format(memory_address_bytes))
if memory_size_bytes < 1 or memory_size_bytes > 4:
raise ValueError('invalid memory_size_bytes: {}'.format(memory_size_bytes))
data = struct.pack('!BB', memory_size_bytes, memory_address_bytes)
data = chr(memory_size_bytes<<4 | memory_address_bytes)

if memory_address >= 1<<(memory_address_bytes*8):
raise ValueError('invalid memory_address: {}'.format(memory_address))
Expand Down Expand Up @@ -464,14 +470,14 @@ def dynamically_define_data_identifier(address, dynamic_definition_type, dynamic
raise ValueError('invalid memory_address_bytes: {}'.format(memory_address_bytes))
if memory_size_bytes < 1 or memory_size_bytes > 4:
raise ValueError('invalid memory_size_bytes: {}'.format(memory_size_bytes))
data = struct.pack('!BB', memory_size_bytes, memory_address_bytes)
data = chr(memory_size_bytes<<4 | memory_address_bytes)

data = struct.pack('!H', dynamic_data_identifier)
if dynamic_definition_type == DYNAMIC_DEFINITION_TYPE.DEFINE_BY_IDENTIFIER:
for s in source_definitions:
data += struct.pack('!H', s["data_identifier"]) + chr(s["position"]) + chr(s["memory_size"])
elif dynamic_definition_type == DYNAMIC_DEFINITION_TYPE.DEFINE_BY_MEMORY_ADDRESS:
data += struct.pack('!BB', memory_size_bytes, memory_address_bytes)
data += chr(memory_size_bytes<<4 | memory_address_bytes)
for s in source_definitions:
if s["memory_address"] >= 1<<(memory_address_bytes*8):
raise ValueError('invalid memory_address: {}'.format(s["memory_address"]))
Expand All @@ -497,7 +503,7 @@ def write_memory_by_address(address, memory_address, memory_size, data_record, m
raise ValueError('invalid memory_address_bytes: {}'.format(memory_address_bytes))
if memory_size_bytes < 1 or memory_size_bytes > 4:
raise ValueError('invalid memory_size_bytes: {}'.format(memory_size_bytes))
data = struct.pack('!BB', memory_size_bytes, memory_address_bytes)
data = chr(memory_size_bytes<<4 | memory_address_bytes)

if memory_address >= 1<<(memory_address_bytes*8):
raise ValueError('invalid memory_address: {}'.format(memory_address))
Expand Down Expand Up @@ -619,8 +625,8 @@ class ROUTINE_IDENTIFIER_TYPE(IntEnum):

def routine_control(address, routine_control_type, routine_identifier_type, routine_option_record=''):
data = struct.pack('!H', routine_identifier_type) + routine_option_record
_uds_request(address, SERVICE_TYPE.ROUTINE_CONTROL, subfunction=routine_control_type, data=data)
resp = resp_id = struct.unpack('!H', resp[0:2])[0] if len(resp) >= 2 else None
resp = _uds_request(address, SERVICE_TYPE.ROUTINE_CONTROL, subfunction=routine_control_type, data=data)
resp_id = struct.unpack('!H', resp[0:2])[0] if len(resp) >= 2 else None
if resp_id != routine_identifier_type:
raise ValueError('invalid response routine identifier: {}'.format(hex(resp_id)))
return resp[2:]
Expand All @@ -632,7 +638,7 @@ def request_download(address, memory_address, memory_size, memory_address_bytes=
raise ValueError('invalid memory_address_bytes: {}'.format(memory_address_bytes))
if memory_size_bytes < 1 or memory_size_bytes > 4:
raise ValueError('invalid memory_size_bytes: {}'.format(memory_size_bytes))
data += struct.pack('!BB', memory_size_bytes, memory_address_bytes)
data += chr(memory_size_bytes<<4 | memory_address_bytes)

if memory_address >= 1<<(memory_address_bytes*8):
raise ValueError('invalid memory_address: {}'.format(memory_address))
Expand All @@ -642,7 +648,7 @@ def request_download(address, memory_address, memory_size, memory_address_bytes=
data += struct.pack('!I', memory_size)[4-memory_size_bytes:]

resp = _uds_request(address, SERVICE_TYPE.REQUEST_DOWNLOAD, subfunction=None, data=data)
max_num_bytes_len = ord(resp[0]) >> 4 if len(resp) > 0 else None
max_num_bytes_len = resp[0] >> 4 if len(resp) > 0 else None
if max_num_bytes_len >= 1 and max_num_bytes_len <= 4:
max_num_bytes = struct.unpack('!I', ('\x00'*(4-max_num_bytes_len))+resp[1:max_num_bytes_len+1])[0]
else:
Expand All @@ -657,7 +663,7 @@ def request_upload(address, memory_address, memory_size, memory_address_bytes=4,
raise ValueError('invalid memory_address_bytes: {}'.format(memory_address_bytes))
if memory_size_bytes < 1 or memory_size_bytes > 4:
raise ValueError('invalid memory_size_bytes: {}'.format(memory_size_bytes))
data += struct.pack('!BB', memory_size_bytes, memory_address_bytes)
data += chr(memory_size_bytes<<4 | memory_address_bytes)

if memory_address >= 1<<(memory_address_bytes*8):
raise ValueError('invalid memory_address: {}'.format(memory_address))
Expand All @@ -667,7 +673,7 @@ def request_upload(address, memory_address, memory_size, memory_address_bytes=4,
data += struct.pack('!I', memory_size)[4-memory_size_bytes:]

resp = _uds_request(address, SERVICE_TYPE.REQUEST_UPLOAD, subfunction=None, data=data)
max_num_bytes_len = ord(resp[0]) >> 4 if len(resp) > 0 else None
max_num_bytes_len = resp[0] >> 4 if len(resp) > 0 else None
if max_num_bytes_len >= 1 and max_num_bytes_len <= 4:
max_num_bytes = struct.unpack('!I', ('\x00'*(4-max_num_bytes_len))+resp[1:max_num_bytes_len+1])[0]
else:
Expand All @@ -676,8 +682,9 @@ def request_upload(address, memory_address, memory_size, memory_address_bytes=4,
return max_num_bytes # max number of bytes per transfer data request

def transfer_data(address, block_sequence_count, data=''):
data = chr(block_sequence_count)+data
resp = _uds_request(address, SERVICE_TYPE.TRANSFER_DATA, subfunction=None, data=data)
resp_id = ord(resp[0]) if len(resp) > 0 else None
resp_id = resp[0] if len(resp) > 0 else None
if resp_id != block_sequence_count:
raise ValueError('invalid block_sequence_count: {}'.format(resp_id))
return resp[1:]
Expand All @@ -687,7 +694,6 @@ def request_transfer_exit(address):

if __name__ == "__main__":
from python import Panda
from string import printable
panda = Panda()
bus = 0
tx_addr = 0x18da30f1 # EPS
Expand All @@ -702,7 +708,7 @@ def request_transfer_exit(address):
tester_present(tx_addr)
print("extended diagnostic session ...")
diagnostic_session_control(tx_addr, SESSION_TYPE.EXTENDED_DIAGNOSTIC)
print("application software id ...")
print("read data by id: application software id ...")
app_id = read_data_by_identifier(tx_addr, DATA_IDENTIFIER_TYPE.APPLICATION_SOFTWARE_IDENTIFICATION)
print(app_id)
# for i in range(0xF100, 0xF2FF):
Expand All @@ -713,7 +719,7 @@ def request_transfer_exit(address):
# desc = " [" + DATA_IDENTIFIER_TYPE(i).name + "]"
# except ValueError:
# pass
# print("{}:{} {} {}".format(hex(i), desc, hexlify(dat), dat.decode(errors="ignore")))
# print("{}:{} {} {}".format(hex(i), desc, hexlify(dat), "")) #, dat.decode(errors="ignore")))
# except NegativeResponseError as e:
# if e.error_code != 0x31:
# print("{}: {}".format(hex(i), e))

0 comments on commit 33a5167

Please sign in to comment.