-
Notifications
You must be signed in to change notification settings - Fork 3
/
elm327.py
executable file
·201 lines (159 loc) · 6.13 KB
/
elm327.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
#!/usr/bin/env python3
import os
import serial
import struct
import sys
import time
import binascii
class ELM327(object):
def __init__(self, port = '/dev/ttyUSB0', baudrate = 500000, timeout_serial = 0.1, timeout_resp = 0.02, debug = False):
self.debug = debug
self.ser = serial.Serial(port, baudrate = baudrate, timeout = timeout_serial)
self.last_header = None
self.last_resp_address = None
self.last_command_str = None
self.timeout_resp = timeout_resp
self.reset()
def reset(self):
"""Resets the device and sets parameters ready for querying."""
if self.debug:
print("resetting ...")
self.reset_buffers()
self.send("ATZ") # reset
time.sleep(1)
self.reset_buffers()
if self.debug:
print("setting parameters ...")
self.send_and_wait_for_ok("ATE0") # echo off
self.send_and_wait_for_ok("ATS0") # spaces off
self.send_and_wait_for_ok("ATL0") # line feeds off
self.send_and_wait_for_ok("ATH0") # headers on
self.send_and_wait_for_ok("ATST%02d" % int(self.timeout_resp / 0.004)) # resp timeout
time.sleep(0.5)
self.reset_buffers()
def query(self, header, resp_address, command, resp_structure):
"""
Issues a CAN query expecting a response.
header: message header, usually related to the module you expect to hear from
resp_address: the module you actually want to hear from, usually header + 8
command: the command, often 22xxxx for extended PIDs
resp_structure: a tuple of ints indicating the structure you expect back.
For example (4,) means you expect a single line message with 4 bytes.
(6,6) means you expect two lines each with 6 bytes.
"""
if self.debug:
print("query: ", header, resp_address, command, resp_structure)
if header != self.last_header:
self.send_and_wait_for_ok("ATSH%06x" % header)
self.last_header = header
if resp_address != self.last_resp_address:
self.send_and_wait_for_ok("ATCRA%08x" % resp_address)
self.last_resp_address = resp_address
resp_num_frames = len(resp_structure)
if command <= 0xFFFF:
command_str = "%04x%d" % (command, resp_num_frames)
else:
command_str = "%06x%d" % (command, resp_num_frames)
self.ser.reset_input_buffer()
self.send(command_str)
resp = self.receive_message()
if resp == None:
return
if tuple(map(len, resp)) != resp_structure:
if self.debug:
print("response did not match expected structure", resp)
return
return resp
def receive(self):
"""Receives a single line over serial, agnostic to the content of the line."""
if self.debug:
print("receive")
line = self.ser.read_until(b'\r')
return(line)
def receive_message(self):
"""
Receives a CAN bus message, including multi-line messages, in which case
it repeatedly calls receive() until it gets the correct number of bytes.
A single line message looks like this:
DEADBEEF
A multi line message looks like this (refer to ELM327 documentation):
010
0:DEADBEEF
1:5ADDBEEF
where 010 specifies the total number of bytes expected to be received (in hex).
"""
resp = []
if self.debug:
print("receive_message")
line = self.receive().replace(b'>',b'').replace(b'\r',b'')
if b'NO DATA' in line:
return
if b'CAN ERROR' in line:
return
i = 0
if len(line) == 3:
# multi line message
total_num_bytes = int.from_bytes(binascii.unhexlify(b"0" + line), byteorder = "big")
received_num_bytes = 0
if self.debug:
print("begin multi-line response for bytes: ", total_num_bytes)
while received_num_bytes < total_num_bytes and i < 10:
line = self.receive().replace(b'>', b'').replace(b'\r',b'')
try:
line_bytes = binascii.unhexlify(line[2:])
received_num_bytes += len(line_bytes)
resp.append(line_bytes)
except binascii.Error:
if self.debug:
print("error: received:", line)
return
# give up if 10 consecutive receives fail to give the correct total number of bytes
i += 1
else:
# single-line message
try:
line_bytes = binascii.unhexlify(line)
resp.append(line_bytes)
except binascii.Error:
if self.debug:
print("error: received:", line)
return
if self.debug:
print("received:", resp)
return resp
def send(self, cmd):
"""
Sends a line.
"""
if self.debug:
print("send", cmd)
bytes_out = (cmd + "\r").encode()
self.ser.write(bytes_out)
def send_and_wait_for_ok(self, cmd):
"""
Sends a command and blocks until it receives OK, or
gives up after 10 tries. Used for AT commands.
"""
if self.debug:
print("send_and_wait_for_ok", cmd)
for i in range(10):
if self.debug:
print("send attempt", i)
self.reset_buffers()
self.send(cmd)
resp = self.receive()
if b"OK" in resp:
if self.debug:
print("received OK, exiting", resp)
return
else:
if self.debug:
print("did not receive OK, received ", resp)
if self.debug:
print("giving up")
def reset_buffers(self):
"""Clears serial input and output buffers."""
if self.debug:
print("reset_buffers")
self.ser.reset_input_buffer()
self.ser.reset_output_buffer()