Skip to content

Commit

Permalink
SERVICE_TYPE enum
Browse files Browse the repository at this point in the history
  • Loading branch information
gregjhogan committed Oct 15, 2019
1 parent 98e73b5 commit 95be481
Showing 1 changed file with 52 additions and 52 deletions.
104 changes: 52 additions & 52 deletions python/uds.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,32 +4,32 @@
from enum import Enum
import threading

# class Services(Enum):
# DiagnosticSessionControl = 0x10
# ECUReset = 0x11
# SecurityAccess = 0x27
# CommunicationControl = 0x28
# TesterPresent = 0x3E
# AccessTimingParameter = 0x83
# SecuredDataTransmission = 0x84
# ControlDTCSetting = 0x85
# ResponseOnEvent = 0x86
# LinkControl = 0x87
# ReadDataByIdentifier = 0x22
# ReadMemoryByAddress = 0x23
# ReadScalingDataByIdentifier = 0x24
# ReadDataByPeriodicIdentifier = 0x2A
# DynamicallyDefineDataIdentifier = 0x2C
# WriteDataByIdentifier = 0x2E
# WriteMemoryByAddress = 0x3D
# ClearDiagnosticInformation = 0x14
# ReadDTCInformation = 0x19
# InputOutputControlByIdentifier = 0x2F
# RoutineControl = 0x31
# RequestDownload = 0x34
# RequestUpload = 0x35
# TransferData = 0x36
# RequestTransferExit = 0x37
class SERVICE_TYPE(Enum):
DIAGNOSTIC_SESSION_CONTROL = 0x10
ECU_RESET = 0x11
SECURITY_ACCESS = 0x27
COMMUNICATION_CONTROL = 0x28
TESTER_PRESENT = 0x3E
ACCESS_TIMING_PARAMETER = 0x83
SECURED_DATA_TRANSMISSION = 0x84
CONTROL_DTC_SETTING = 0x85
RESPONSE_ON_EVENT = 0x86
LINK_CONTROL = 0x87
READ_DATA_BY_IDENTIFIER = 0x22
READ_MEMORY_BY_ADDRESS = 0x23
READ_SCALING_DATA_BY_IDENTIFIER = 0x24
READ_DATA_BY_PERIODIC_IDENTIFIER = 0x2A
DYNAMICALLY_DEFINE_DATA_IDENTIFIER = 0x2C
WRITE_DATA_BY_IDENTIFIER = 0x2E
WRITE_MEMORY_BY_ADDRESS = 0x3D
CLEAR_DIAGNOSTIC_INFORMATION = 0x14
READ_DTC_INFORMATION = 0x19
INPUT_OUTPUT_CONTROL_BY_IDENTIFIER = 0x2F
ROUTINE_CONTROL = 0x31
REQUEST_DOWNLOAD = 0x34
REQUEST_UPLOAD = 0x35
TRANSFER_DATA = 0x36
REQUEST_TRANSFER_EXIT = 0x37

_negative_response_codes = {
'\x00': 'positive response',
Expand Down Expand Up @@ -83,7 +83,7 @@ class InvalidSubFunctioneError(Exception):
pass

# generic uds request
def _request(address, service, subfunction, data=None):
def _request(address, service_type, subfunction, data=None):
# TODO: send request
# TODO: wait for response

Expand Down Expand Up @@ -115,7 +115,7 @@ class SESSION_TYPE(Enum):
SAFETY_SYSTEM_DIAGNOSTIC = 4

def diagnostic_session_control(address, session_type):
_request(address, service=0x10, subfunction=session_type)
_request(address, service=SERVICE_TYPE.DIAGNOSTIC_SESSION_CONTROL, subfunction=session_type)

class RESET_TYPE(Enum):
HARD = 1
Expand All @@ -125,7 +125,7 @@ class RESET_TYPE(Enum):
DISABLE_RAPID_POWER_SHUTDOWN = 5

def ecu_reset(address, reset_type):
resp = _request(address, service=0x11, subfunction=reset_type)
resp = _request(address, SERVICE_TYPE.ECU_RESET, subfunction=reset_type)
power_down_time = None
if reset_type == RESET_TYPE.ENABLE_RAPID_POWER_SHUTDOWN
power_down_time = ord(resp[0])
Expand All @@ -141,7 +141,7 @@ def security_access(address, access_type, security_key=None):
raise ValueError('security_key not allowed')
if not request_seed and security_key is None:
raise ValueError('security_key is missing')
resp = _request(address, service=0x27, subfunction=access_type, data=security_key)
resp = _request(address, service=SERVICE_TYPE.SECURITY_ACCESS, subfunction=access_type, data=security_key)
if request_seed:
security_seed = resp
return security_seed
Expand All @@ -159,10 +159,10 @@ class MESSAGE_TYPE(Enum):

def communication_control(address, control_type, message_type):
data = chr(message_type)
_request(address, service=0x28, subfunction=control_type, data=data)
_request(address, service=SERVICE_TYPE.COMMUNICATION_CONTROL, subfunction=control_type, data=data)

def tester_present(address):
_request(address, service=0x3E, subfunction=0x00)
_request(address, service=SERVICE_TYPE.TESTER_PRESENT, subfunction=0x00)

class TIMING_PARAMETER_TYPE(Enum):
READ_EXTENDED_SET = 1
Expand All @@ -180,15 +180,15 @@ def access_timing_parameter(address, timing_parameter_type, parameter_values):
raise ValueError('parameter_values not allowed')
if write_custom_values and parameter_values is None:
raise ValueError('parameter_values is missing')
resp = _request(address, service=0x83, subfunction=timing_parameter_type, data=parameter_values)
resp = _request(address, service=SERVICE_TYPE.ACCESS_TIMING_PARAMETER, subfunction=timing_parameter_type, data=parameter_values)
if read_values:
# TODO: parse response into values?
parameter_values = resp
return parameter_values

def secured_data_transmission(address, data):
# TODO: split data into multiple input parameters?
resp = _request(address, service=0x84, subfunction=None, data=data)
resp = _request(address, service=SERVICE_TYPE.SECURED_DATA_TRANSMISSION, subfunction=None, data=data)
# TODO: parse response into multiple output values?
return resp

Expand All @@ -197,7 +197,7 @@ class DTC_SETTING_TYPE(Enum):
OFF = 2

def control_dtc_setting(address, dtc_setting_type):
_request(address, service=0x85, subfunction=dtc_setting_type)
_request(address, service=SERVICE_TYPE.CONTROL_DTC_SETTING, subfunction=dtc_setting_type)

class RESPONSE_EVENT_TYPE(Enum):
STOP_RESPONSE_ON_EVENT = 0
Expand All @@ -214,7 +214,7 @@ def response_on_event(address, response_event_type, store_event, window_time, ev
response_event_type |= 0x20
# TODO: split record parameters into arrays
data = char(window_time) + event_type_record + service_response_record
resp = _request(address, service=0x86, subfunction=response_event_type, data=data)
resp = _request(address, service=SERVICE_TYPE.RESPONSE_ON_EVENT, subfunction=response_event_type, data=data)

if response_event_type == REPORT_ACTIVATED_EVENTS:
return {
Expand Down Expand Up @@ -253,7 +253,7 @@ def link_control(address, link_control_type, baud_rate_type=None):
data = struct.pack('!I', baud_rate_type)[1:]
else:
data = None
_request(address, service=0x87, subfunction=link_control_type, data=data)
_request(address, service=SERVICE_TYPE.LINK_CONTROL, subfunction=link_control_type, data=data)

class DATA_IDENTIFIER_TYPE(Enum):
BOOT_SOFTWARE_IDENTIFICATION = 0XF180
Expand Down Expand Up @@ -291,7 +291,7 @@ class DATA_IDENTIFIER_TYPE(Enum):
def read_data_by_identifier(address, data_identifier_type):
# TODO: support list of identifiers
data = struct.pack('!H', data_identifier_type)
resp = _request(address, service=0x22, subfunction=None, data=data)
resp = _request(address, service=SERVICE_TYPE.READ_DATA_BY_IDENTIFIER, subfunction=None, data=data)
resp_id = struct.unpack('!H', resp[0:2])[0] if len(resp) >= 2 else None
if resp_id != data_identifier_type:
raise ValueError('invalid response data identifier: {}'.format(hex(resp_id)))
Expand All @@ -311,12 +311,12 @@ def read_memory_by_address(address, memory_address, memory_size, memory_address_
raise ValueError('invalid memory_size: {}'.format(memory_size))
data += struct.pack('!I', memory_size)[4-memory_size_bytes:]

resp = _request(address, service=0x23, subfunction=None, data=data)
resp = _request(address, service=SERVICE_TYPE.READ_MEMORY_BY_ADDRESS, subfunction=None, data=data)
return resp

def read_scaling_data_by_identifier(address, data_identifier_type):
data = struct.pack('!H', data_identifier_type)
resp = _request(address, service=0x24, subfunction=None, data=data)
resp = _request(address, service=SERVICE_TYPE.READ_SCALING_DATA_BY_IDENTIFIER, subfunction=None, data=data)
resp_id = struct.unpack('!H', resp[0:2])[0] if len(resp) >= 2 else None
if resp_id != data_identifier_type:
raise ValueError('invalid response data identifier: {}'.format(hex(resp_id)))
Expand All @@ -331,7 +331,7 @@ class TRANSMISSION_MODE_TYPE(Enum):
def read_data_by_periodic_identifier(address, transmission_mode_type, periodic_data_identifier):
# TODO: support list of identifiers
data = chr(transmission_mode_type) + chr(periodic_data_identifier)
_request(address, service=0x2A, subfunction=None, data=data)
_request(address, service=SERVICE_TYPE.READ_DATA_BY_PERIODIC_IDENTIFIER, subfunction=None, data=data)

class DYNAMIC_DEFINITION_TYPE(Enum):
DEFINE_BY_IDENTIFIER = 1
Expand Down Expand Up @@ -362,11 +362,11 @@ def dynamically_define_data_identifier(address, dynamic_definition_type, dynamic
pass
else:
raise ValueError('invalid dynamic identifier type: {}'.format(hex(dynamic_definition_type)))
_request(address, service=0x2C, subfunction=dynamic_definition_type, data=data)
_request(address, service=SERVICE_TYPE.DYNAMICALLY_DEFINE_DATA_IDENTIFIER, subfunction=dynamic_definition_type, data=data)

def write_data_by_identifier(address, data_identifier_type, data_record):
data = struct.pack('!H', data_identifier_type) + data_record
resp = _request(address, service=0x2E, subfunction=None, data=data)
resp = _request(address, service=SERVICE_TYPE.WRITE_DATA_BY_IDENTIFIER, subfunction=None, data=data)
resp_id = struct.unpack('!H', resp[0:2])[0] if len(resp) >= 2 else None
if resp_id != data_identifier_type:
raise ValueError('invalid response data identifier: {}'.format(hex(resp_id)))
Expand All @@ -386,15 +386,15 @@ def write_memory_by_address(address, memory_address, memory_size, data_record, m
data += struct.pack('!I', memory_size)[4-memory_size_bytes:]

data += data_record
_request(address, service=0x3D, subfunction=0x00, data=data)
_request(address, service=SERVICE_TYPE.WRITE_MEMORY_BY_ADDRESS, subfunction=0x00, data=data)

class DTC_GROUP_TYPE(Enum):
EMISSIONS = 0x000000
ALL = 0xFFFFFF

def clear_diagnostic_information(address, dtc_group_type):
data = struct.pack('!I', dtc_group_type)[1:] # 3 bytes
_request(address, service=0x14, subfunction=None, data=data)
_request(address, service=SERVICE_TYPE.CLEAR_DIAGNOSTIC_INFORMATION, subfunction=None, data=data)

class DTC_REPORT_TYPE(Enum):
NUMBER_OF_DTC_BY_STATUS_MASK = 0x01
Expand Down Expand Up @@ -467,7 +467,7 @@ def read_dtc_information(address, dtc_report_type, dtc_status_mask_type=DTC_STAT
dtc_report_type == DTC_REPORT_TYPE.DTC_BY_SEVERITY_MASK_RECORD:
data += chr(dtc_severity_mask_type) + chr(dtc_status_mask_type)

resp = _request(address, service=0x19, subfunction=dtc_report_type, data=data)
resp = _request(address, service=SERVICE_TYPE.READ_DTC_INFORMATION, subfunction=dtc_report_type, data=data)

# TODO: parse response
return resp
Expand All @@ -480,7 +480,7 @@ class CONTROL_OPTION_TYPE(Enum):

def input_output_control_by_identifier(address, data_identifier_type, control_option_record, control_enable_mask_record=''):
data = struct.pack('!H', data_identifier_type) + control_option_record + control_enable_mask_record
resp = _request(address, service=0x2F, subfunction=None, data=data)
resp = _request(address, service=SERVICE_TYPE.INPUT_OUTPUT_CONTROL_BY_IDENTIFIER, subfunction=None, data=data)
resp_id = struct.unpack('!H', resp[0:2])[0] if len(resp) >= 2 else None
if resp_id != data_identifier_type:
raise ValueError('invalid response data identifier: {}'.format(hex(resp_id)))
Expand All @@ -498,7 +498,7 @@ class ROUTINE_IDENTIFIER_TYPE(Enum):

def routine_control(address, routine_control_type, routine_identifier_type, routine_option_record=''):
data = struct.pack('!H', routine_identifier_type) + routine_option_record
_request(address, service=0x31, subfunction=routine_control_type, data=data)
_request(address, service=SERVICE_TYPE.ROUTINE_CONTROL, subfunction=routine_control_type, data=data)
resp = resp_id = struct.unpack('!H', resp[0:2])[0] if len(resp) >= 2 else None
if resp_id != routine_identifier_type:
raise ValueError('invalid response routine identifier: {}'.format(hex(resp_id)))
Expand All @@ -520,7 +520,7 @@ def request_download(address, memory_address, memory_size, memory_address_bytes=
raise ValueError('invalid memory_size: {}'.format(memory_size))
data += struct.pack('!I', memory_size)[4-memory_size_bytes:]

resp = _request(address, service=0x34, subfunction=None, data=data)
resp = _request(address, service=SERVICE_TYPE.REQUEST_DOWNLOAD, subfunction=None, data=data)
max_num_bytes_len = ord(resp[0]) >> 4 if len(resp) > 0 else None
if max_num_bytes_len >= 1 and max_num_bytes_len <= 4:
max_num_bytes = struct.unpack('!I', ('\x00'*(4-max_num_bytes_len))+resp[1:max_num_bytes_len+1])[0]
Expand All @@ -545,7 +545,7 @@ def request_upload(address, memory_address, memory_size, memory_address_bytes=4,
raise ValueError('invalid memory_size: {}'.format(memory_size))
data += struct.pack('!I', memory_size)[4-memory_size_bytes:]

resp = _request(address, service=0x35, subfunction=None, data=data)
resp = _request(address, service=SERVICE_TYPE.REQUEST_UPLOAD, subfunction=None, data=data)
max_num_bytes_len = ord(resp[0]) >> 4 if len(resp) > 0 else None
if max_num_bytes_len >= 1 and max_num_bytes_len <= 4:
max_num_bytes = struct.unpack('!I', ('\x00'*(4-max_num_bytes_len))+resp[1:max_num_bytes_len+1])[0]
Expand All @@ -555,14 +555,14 @@ def request_upload(address, memory_address, memory_size, memory_address_bytes=4,
return max_num_bytes # max number of bytes per transfer data request

def transfer_data(address, block_sequence_count, data=''):
resp = _request(address, service=0x36, subfunction=None, data=data)
resp = _request(address, service=SERVICE_TYPE.TRANSFER_DATA, subfunction=None, data=data)
resp_id = ord(resp[0]) if len(resp) > 0 else None
if resp_id != block_sequence_count:
raise ValueError('invalid block_sequence_count: {}'.format(resp_id))
return resp[1:]

def request_transfer_exit(address)
_request(address, service=0x37, subfunction=None)
_request(address, service=SERVICE_TYPE.REQUEST_TRANSFER_EXIT, subfunction=None)

if __name__ == "__main__":
from . import uds
Expand Down

0 comments on commit 95be481

Please sign in to comment.