Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP
Browse files

WIP: PIDs

  • Loading branch information...
commit 3c31da76f658a16ca120d65dd06b3ce9408b0561 1 parent 8f19567
@djs authored
Showing with 71 additions and 12 deletions.
  1. +2 −1  elm.py
  2. +50 −8 obdii.py
  3. +1 −1  test_elm.py
  4. +18 −2 test_obdii.py
View
3  elm.py
@@ -69,7 +69,8 @@ def send_control_command(self, command):
return self.send(self.AT_PREFIX + command)
def send_obdii_command(self, command):
- text = self.send(command).strip()
+ cmd = ['%.2X' % x for x in command]
+ text = self.send(''.join(cmd)).strip()
if text.find('\n') != -1:
raise NotImplementedError, "multiline response not yet supported"
View
58 obdii.py
@@ -12,12 +12,20 @@ class UnexpectedDataValue(UnexpectedResponse):
pass
class Obdii(object):
+ supported_pids = { 0x01 : {},
+ 0x05 : {}
+ }
def __init__(self, adapter):
self.adapter = adapter
+ def _get_response(self, command):
+ response = self.adapter.send_obdii_command(command)
+ data = self._parse_response_data(command, response)
+
+ return data
+
def get_current_ect(self):
- response = self.adapter.send_obdii_command('0105')
- data = self._parse_response_data('0105', response)
+ data = self._get_response([0x01, 0x05])
if len(data) != 1:
raise UnexpectedDataValue
@@ -25,8 +33,7 @@ def get_current_ect(self):
return data[0] - 40
def get_current_engine_rpm(self):
- response = self.adapter.send_obdii_command('010C')
- data = self._parse_response_data('010C', response)
+ data = self._get_response([0x01, 0x0C])
if len(data) != 2:
raise UnexpectedDataValue
@@ -34,15 +41,50 @@ def get_current_engine_rpm(self):
return ((data[0] << 8) + (data[1])) / 4
def _parse_response_data(self, command, response):
- command = command.strip()
- cmd = [int(command[i:i + 2], 16) for i in range(0, len(command), 2)]
+ #command = command.strip()
+ #cmd = [int(command[i:i + 2], 16) for i in range(0, len(command), 2)]
- if response[0] != cmd[0] + 0x40:
+ if response[0] != command[0] + 0x40:
raise UnexpectedModeResponse
- if response[1] != cmd[1]:
+ if response[1] != command[1]:
raise UnexpectedPIDResponse
return response[2:]
+ def _read_supported_pids(self):
+ control_pids = [0x00, 0x20, 0x40, 0x60, 0x80, 0xA0, 0xC0]
+
+ mode = 0x01
+ for cp in control_pids:
+ if cp == 0x00 or self.is_pid_supported(mode, cp):
+ self.supported_pids[mode][cp] = self._get_response([mode, cp])
+ else:
+ break
+
+ def is_pid_supported(self, mode, pid):
+ if pid > 0xE0:
+ raise ValueError
+
+ if pid == 0:
+ return True
+
+ cp = ((pid - 1) / 0x20) * 0x20
+ co = pid - cp - 1
+ cs = 3 - (co / 8)
+
+ if self.supported_pids[mode][cp][cs] & (1 << ((co % 8) - 1)):
+ return True
+ else:
+ return False
+
+ def get_supported_pids(self, mode):
+ supported = []
+
+ for pid in range(0, 0xe0):
+ if self.is_pid_supported(mode, pid):
+ supported.append(pid)
+
+ return supported
+
View
2  test_elm.py
@@ -83,7 +83,7 @@ def test_get_device_info(self):
def test_send_obdii_command(self):
expected_response = [0x41, 0x0c, 0x1a, 0xf8]
- data = self.elm.send_obdii_command('010C')
+ data = self.elm.send_obdii_command([0x01, 0x0c])
assert data == expected_response
View
20 test_obdii.py
@@ -18,19 +18,35 @@ def setup_method(self, method):
def test_get_current_ect(self):
(self.mock.should_receive('send_obdii_command')
- .with_args('0105')
+ .with_args([0x01, 0x05])
.and_return([0x41, 0x05, 0x7b]))
temperature = self.obd.get_current_ect()
assert temperature == 0x7b - 40
def test_get_current_engine_rpm(self):
(self.mock.should_receive('send_obdii_command')
- .with_args('010C')
+ .with_args([0x01, 0x0c])
.and_return([0x41, 0x0c, 0x1a, 0xf8]))
rpm = self.obd.get_current_engine_rpm()
assert rpm == 1726
+ def test_is_pid_supported(self):
+ self.obd.supported_pids[0x01][0] = [0x28, 0x00, 0x01, 0x00]
+
+ assert self.obd.is_pid_supported(0x01, 0x1e)
+ assert not self.obd.is_pid_supported(0x01, 0x1f)
+ assert self.obd.is_pid_supported(0x01, 0x1c)
+ assert self.obd.is_pid_supported(0x01, 0x09)
+ assert not self.obd.is_pid_supported(0x01, 0x0a)
+
+ def test_read_supported_pids(self):
+ (self.mock.should_receive('send_obdii_command')
+ .with_args([0x01, 0x00])
+ .and_return([0x41, 0x00, 0x98, 0x18, 0x00, 0x13]))
+
+ self.obd._read_supported_pids()
+
Please sign in to comment.
Something went wrong with that request. Please try again.