forked from glucometers-tech/glucometerutils
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
td4277: new driver for rebranded TaiDoc TD-4277 glucometers.
This driver supports the GlucoRx Nexus, Menarini GlucoMen Nexus, and Aktivmed GlucoCheck XL.
- Loading branch information
Showing
3 changed files
with
291 additions
and
17 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,234 @@ | ||
# -*- coding: utf-8 -*- | ||
# | ||
# SPDX-License-Identifier: MIT | ||
"""Driver for TaiDoc TD-4277 devices. | ||
Supported features: | ||
- get readings, including pre-/post-meal notes; | ||
- get and set date and time; | ||
- get serial number (partial); | ||
- memory reset (caution!) | ||
Expected device path: 0001:001c:00 (libusb), /dev/hidraw1 (Linux). | ||
""" | ||
|
||
import binascii | ||
import datetime | ||
import enum | ||
import functools | ||
import logging | ||
import operator | ||
|
||
import construct | ||
|
||
from glucometerutils import common | ||
from glucometerutils import exceptions | ||
from glucometerutils.support import serial | ||
|
||
class Direction(enum.Enum): | ||
In = 0xa5 | ||
Out = 0xa3 | ||
|
||
|
||
def byte_checksum(data): | ||
return functools.reduce(operator.add, data) & 0xFF | ||
|
||
|
||
_PACKET = construct.Struct( | ||
'data' / construct.RawCopy( | ||
construct.Struct( | ||
construct.Const(b'\x51'), | ||
'command' / construct.Byte, | ||
'message' / construct.Bytes(4), | ||
'direction' / construct.Mapping( | ||
construct.Byte, | ||
{e: e.value for e in Direction}), | ||
), | ||
), | ||
'checksum' / construct.Checksum( | ||
construct.Byte, byte_checksum, construct.this.data.data), | ||
) | ||
|
||
_EMPTY_MESSAGE = 0 | ||
|
||
_CONNECT_REQUEST = 0x22 | ||
_VALID_CONNECT_RESPONSE = {0x22, 0x24, 0x54} | ||
|
||
_GET_DATETIME = 0x23 | ||
_SET_DATETIME = 0x33 | ||
|
||
_GET_MODEL = 0x24 | ||
|
||
_GET_READING_COUNT = 0x2b | ||
_GET_READING_DATETIME = 0x25 | ||
_GET_READING_VALUE = 0x26 | ||
|
||
_CLEAR_MEMORY = 0x52 | ||
|
||
_MODEL_STRUCT = construct.Struct( | ||
construct.Const(b'\x77\x42'), | ||
construct.Byte, | ||
construct.Byte, | ||
) | ||
|
||
_DATETIME_STRUCT = construct.Struct( | ||
'day' / construct.Int16ul, | ||
'minute' / construct.Byte, | ||
'hour' / construct.Byte, | ||
) | ||
|
||
_DAY_BITSTRUCT = construct.BitStruct( | ||
'year' / construct.BitsInteger(7), | ||
'month' / construct.BitsInteger(4), | ||
'day' / construct.BitsInteger(5), | ||
) | ||
|
||
_READING_COUNT_STRUCT = construct.Struct( | ||
'count' / construct.Int16ul, | ||
construct.Int16ul, | ||
) | ||
|
||
_READING_SELECTION_STRUCT = construct.Struct( | ||
'record_id' / construct.Int16ul, | ||
construct.Const(b'\x00\x00'), | ||
) | ||
|
||
_MEAL_FLAG = { | ||
common.Meal.NONE: 0x00, | ||
common.Meal.BEFORE: 0x40, | ||
common.Meal.AFTER: 0x80, | ||
} | ||
|
||
_READING_VALUE_STRUCT = construct.Struct( | ||
'value' / construct.Int16ul, | ||
construct.Const(b'\x06'), | ||
'meal'/ construct.Mapping( | ||
construct.Byte, _MEAL_FLAG), | ||
) | ||
|
||
def _make_packet(command, message, direction=Direction.Out): | ||
return _PACKET.build( | ||
{'data': { | ||
'value': { | ||
'command': command, | ||
'message': message, | ||
'direction': direction, | ||
}, | ||
}}) | ||
|
||
|
||
def _parse_datetime(message): | ||
date = _DATETIME_STRUCT.parse(message) | ||
# We can't parse the day properly with a single pass of Construct | ||
# unfortunately. | ||
day = _DAY_BITSTRUCT.parse(construct.Int16ub.build(date.day)) | ||
return datetime.datetime( | ||
2000+day.year, day.month, day.day, date.hour, date.minute) | ||
|
||
|
||
def _select_record(record_id): | ||
return _READING_SELECTION_STRUCT.build({'record_id': record_id}) | ||
|
||
|
||
class Device(serial.SerialDevice): | ||
BAUDRATE = 19200 | ||
TIMEOUT = 0.5 | ||
|
||
def __init__(self, device): | ||
super(Device, self).__init__('cp2110://' + device) | ||
self.buffered_reader_ = construct.Rebuffered( | ||
_PACKET, tailcutoff=1024) | ||
|
||
def _send_command( | ||
self, command, message=_EMPTY_MESSAGE, validate_response=True): | ||
pkt = _make_packet(command, message) | ||
logging.debug('sending packet: %s', binascii.hexlify(pkt)) | ||
|
||
self.serial_.write(pkt) | ||
self.serial_.flush() | ||
response = self.buffered_reader_.parse_stream(self.serial_) | ||
logging.debug('received packet: %r', response) | ||
|
||
if validate_response and response.data.value.command != command: | ||
raise InvalidResponse(response) | ||
|
||
return response.data.value.command, response.data.value.message | ||
|
||
def connect(self): | ||
response_command, message = self._send_command( | ||
_CONNECT_REQUEST, validate_response=False) | ||
if response_command not in _VALID_CONNECT_RESPONSE: | ||
raise exceptions.ConnectionFailed( | ||
'Invalid response received: %2x %r' % ( | ||
response_command, message)) | ||
|
||
_, model_message = self._send_command(_GET_MODEL) | ||
try: | ||
_MODEL_STRUCT.parse(model_message) | ||
except construct.ConstructError: | ||
raise exceptions.ConnectionFailed( | ||
'Invalid model identified: %r' % model_message) | ||
|
||
def disconnect(self): | ||
pass | ||
|
||
def get_meter_info(self): | ||
return common.MeterInfo('TaiDoc TD-4277 glucometer') | ||
|
||
def get_version(self): # pylint: disable=no-self-use | ||
raise NotImplementedError | ||
|
||
def get_serial_number(self): # pylint: disable=no-self-use | ||
raise NotImplementedError | ||
|
||
def get_datetime(self): | ||
_, message = self._send_command(_GET_DATETIME) | ||
|
||
return _parse_datetime(message) | ||
|
||
def set_datetime(self, date=datetime.datetime.now()): | ||
assert date.year >= 2000 | ||
|
||
day_struct = _DAY_BITSTRUCT.build({ | ||
'year': date.year - 2000, | ||
'month': date.month, | ||
'day': date.day, | ||
}) | ||
|
||
day_word = construct.Int16ub.parse(day_struct) | ||
|
||
date_message = _DATETIME_STRUCT.build({ | ||
'day': day_word, | ||
'minute': date.minute, | ||
'hour': date.hour}) | ||
|
||
_, message = self._send_command(_SET_DATETIME, message=date_message) | ||
|
||
return _parse_datetime(message) | ||
|
||
def _get_reading_count(self): | ||
_, message = self._send_command(_GET_READING_COUNT) | ||
|
||
return _READING_COUNT_STRUCT.parse(message).count | ||
|
||
def _get_reading(self, record_id): | ||
_, reading_date_message = self._send_command( | ||
_GET_READING_DATETIME, | ||
_select_record(record_id)) | ||
reading_date = _parse_datetime(reading_date_message) | ||
|
||
_, reading_value_message = self._send_command( | ||
_GET_READING_VALUE, | ||
_select_record(record_id)) | ||
reading_value = _READING_VALUE_STRUCT.parse(reading_value_message) | ||
|
||
return common.GlucoseReading( | ||
reading_date, reading_value.value, meal=reading_value.meal) | ||
|
||
def get_readings(self): | ||
record_count = self._get_reading_count() | ||
for record_id in range(record_count): | ||
yield self._get_reading(record_id) | ||
|
||
def zero_log(self): | ||
self._send_command(_CLEAR_MEMORY) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,31 @@ | ||
# -*- coding: utf-8 -*- | ||
# | ||
# SPDX-License-Identifier: MIT | ||
"""Tests for the TD-4277 driver.""" | ||
|
||
# pylint: disable=protected-access,missing-docstring | ||
|
||
import datetime | ||
|
||
from absl.testing import parameterized | ||
|
||
from glucometerutils.drivers import td4277 | ||
from glucometerutils.support import lifescan | ||
from glucometerutils import exceptions | ||
|
||
|
||
class TestTD4277Nexus(parameterized.TestCase): | ||
|
||
@parameterized.parameters( | ||
(b'\x21\x24\x0e\x15', datetime.datetime(2018, 1, 1, 21, 14)), | ||
(b'\x21\x26\x0e\x15', datetime.datetime(2019, 1, 1, 21, 14)), | ||
(b'\x04\x27\x25\x0d', datetime.datetime(2019, 8, 4, 13, 37)), | ||
) | ||
def test_parse_datetime(self, message, date): | ||
self.assertEqual(td4277._parse_datetime(message), | ||
date) | ||
|
||
def test_making_message(self): | ||
self.assertEqual( | ||
td4277._make_packet(0x22, 0), | ||
b'\x51\x22\x00\x00\x00\x00\xa3\x16') |