-
Notifications
You must be signed in to change notification settings - Fork 0
/
obdii.py
106 lines (72 loc) · 2.55 KB
/
obdii.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
class UnexpectedResponse(ValueError):
    """Base class for errors about a response that does not fit its request.

    Derives from ``ValueError``, so handlers written for ``ValueError`` also
    catch this error and its subclasses.
    """
class UnexpectedModeResponse(UnexpectedResponse):
    """Raised when the response's first byte is not the request mode + 0x40."""
class UnexpectedPIDResponse(UnexpectedResponse):
    """Raised when the response's second byte is not the requested PID."""
class UnexpectedDataValue(UnexpectedResponse):
    """Raised when a response carries an unexpected number of data bytes."""
class Obdii(object):
    """Query OBD-II values through an adapter object.

    The adapter must provide ``send_obdii_command(command)``.  It is called
    with a ``[mode, pid]`` list and must return the response as an indexable
    sequence of integer byte values: the mode echo (request mode + 0x40),
    the PID echo, and then the data bytes.
    """

    # Template for the supported-PID bitmap cache: mode -> {control PID ->
    # data bytes of that control PID's response}.  Each instance receives
    # its own copy in __init__ so cached bitmaps are never shared.
    supported_pids = {0x01: {},
                      0x05: {}}

    # PIDs whose response is a bitmap describing the following 0x20 PIDs.
    _CONTROL_PIDS = (0x00, 0x20, 0x40, 0x60, 0x80, 0xA0, 0xC0)
    # Number of PIDs described by one control-PID bitmap.
    _RANGE_SIZE = 0x20
    # Highest PID covered by the last control PID (0xC0 + 0x20).
    _MAX_PID = 0xE0

    def __init__(self, adapter):
        """Store *adapter* and give this instance a private bitmap cache."""
        self.adapter = adapter
        # Copy the class-level template: writing through the shared class
        # dict would leak one instance's bitmaps into every other instance.
        self.supported_pids = {
            mode: dict(bitmaps)
            for mode, bitmaps in type(self).supported_pids.items()
        }

    def _get_response(self, command):
        """Send ``[mode, pid]`` *command* and return the validated data bytes."""
        response = self.adapter.send_obdii_command(command)
        return self._parse_response_data(command, response)

    def _get_sized_response(self, command, length):
        """Return the data bytes for *command*, requiring exactly *length*.

        Raises:
            UnexpectedDataValue: if the number of data bytes differs.
        """
        data = self._get_response(command)
        if len(data) != length:
            raise UnexpectedDataValue(
                "expected %d data byte(s), got %d" % (length, len(data)))
        return data

    def get_current_ect(self):
        """Return the mode 0x01 / PID 0x05 value as ``A - 40``.

        Per the standard OBD-II scaling this is the engine coolant
        temperature in degrees Celsius.
        """
        data = self._get_sized_response([0x01, 0x05], 1)
        return data[0] - 40

    def get_current_engine_rpm(self):
        """Return the mode 0x01 / PID 0x0C value as ``(256 * A + B) / 4``.

        The ``/`` operator is kept as in the original code: it is true
        division under Python 3 (quarter-rpm resolution) and floor division
        under Python 2 without ``from __future__ import division``.
        """
        data = self._get_sized_response([0x01, 0x0C], 2)
        return ((data[0] << 8) + data[1]) / 4

    def get_vehicle_speed(self):
        """Return the single data byte of mode 0x01 / PID 0x0D unscaled.

        Per the standard OBD-II definition this is the speed in km/h.
        """
        data = self._get_sized_response([0x01, 0x0D], 1)
        return data[0]

    def get_throttle_position(self):
        """Return mode 0x01 / PID 0x11 scaled to a 0.0-100.0 percentage."""
        data = self._get_sized_response([0x01, 0x11], 1)
        return (data[0] * 100.0) / 255

    def _parse_response_data(self, command, response):
        """Check the mode/PID echo of *response* and return its data bytes.

        Args:
            command: the ``[mode, pid]`` request that was sent.
            response: sequence of byte values returned by the adapter.

        Returns:
            Everything after the two echo bytes (``response[2:]``).

        Raises:
            UnexpectedResponse: if the response has fewer than two bytes.
            UnexpectedModeResponse: if byte 0 is not ``mode + 0x40``.
            UnexpectedPIDResponse: if byte 1 is not the requested PID.
        """
        if response is None or len(response) < 2:
            raise UnexpectedResponse(
                "response too short: %r" % (response,))
        if response[0] != command[0] + 0x40:
            raise UnexpectedModeResponse(
                "unexpected mode byte: %r" % (response[0],))
        if response[1] != command[1]:
            raise UnexpectedPIDResponse(
                "unexpected PID byte: %r" % (response[1],))
        return response[2:]

    def _read_supported_pids(self):
        """Fetch and cache the mode 0x01 supported-PID bitmaps.

        Control PID 0x00 is always requested.  Each later control PID is
        requested only if the preceding bitmap marks it as supported; the
        scan stops at the first control PID that is not.
        """
        mode = 0x01
        for control_pid in self._CONTROL_PIDS:
            if control_pid != 0x00 and not self.is_pid_supported(mode,
                                                                 control_pid):
                break
            bitmaps = self.supported_pids.setdefault(mode, {})
            bitmaps[control_pid] = self._get_response([mode, control_pid])

    def is_pid_supported(self, mode, pid):
        """Return True if the cached bitmaps mark *pid* as supported.

        Only bitmaps already stored in ``self.supported_pids`` (as filled in
        by ``_read_supported_pids``) are consulted; no request is sent.
        PID 0 is always reported as supported.  A PID whose bitmap has not
        been cached, or whose bitmap is too short to contain its bit, is
        reported as unsupported.

        Raises:
            ValueError: if *pid* is outside ``0..0xE0``.
        """
        if pid < 0 or pid > self._MAX_PID:
            raise ValueError("PID out of range: %r" % (pid,))
        if pid == 0:
            return True

        # Control PID whose bitmap covers this PID (0x00 covers 0x01..0x20).
        # Floor division keeps the values usable as indices on Python 3.
        control_pid = ((pid - 1) // self._RANGE_SIZE) * self._RANGE_SIZE
        offset = pid - control_pid - 1  # 0..31 within the bitmap

        bitmap = self.supported_pids.get(mode, {}).get(control_pid)
        byte_index = offset // 8
        if bitmap is None or byte_index >= len(bitmap):
            return False

        # The bitmap is MSB-first: the top bit of the first data byte is
        # control_pid + 1 and the low bit of the fourth is control_pid + 0x20.
        mask = 0x80 >> (offset % 8)
        return bool(bitmap[byte_index] & mask)

    def get_supported_pids(self, mode):
        """Return every PID in ``0..0xE0`` that ``is_pid_supported`` accepts."""
        return [pid for pid in range(self._MAX_PID + 1)
                if self.is_pid_supported(mode, pid)]