-
Notifications
You must be signed in to change notification settings - Fork 28
/
ds18x20.py
169 lines (122 loc) · 5 KB
/
ds18x20.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
# -*- coding: utf-8 -*-
# (c) 2019 Richard Pobering <richard@hiveeyes.org>
# (c) 2019 Andreas Motl <andreas@hiveeyes.org>
# License: GNU General Public License, Version 3
import time
from binascii import hexlify
from terkin import logging
from terkin.sensor import AbstractSensor
from terkin.sensor.core import OneWireBus
from terkin.util import get_platform_info
# Platform information; DS18X20Sensor.start() compares its `vendor` attribute
# against the `MICROPYTHON` constants to select the matching hardware driver.
platform_info = get_platform_info()
# Module-level logger, named after this module.
log = logging.getLogger(__name__)
class DS18X20Sensor(AbstractSensor):
    """A generic DS18B20 sensor component.

    Reads all DS18X20 devices found on the assigned 1-Wire bus and returns
    their values as a dictionary of telemetry fields.

    Per-device settings are looked up by device address within the
    ``devices`` list of ``self.settings``. The keys evaluated by this class
    are ``address``, ``enabled``, ``offset`` and ``description``.
    """

    def acquire_bus(self, bus):
        """Assign the 1-Wire bus this sensor reads from.

        :param bus: Bus object. This class uses its ``name``, ``devices``
            and ``adapter`` attributes.
        """
        self.bus = bus

    def start(self):
        """Initialize the DS18X20 hardware driver for the current platform.

        :return: ``True`` when setup ran through, ``None`` when it failed.
            Failures are logged, not raised.
        :raises KeyError: When no bus has been assigned beforehand.
        """
        if self.bus is None:
            raise KeyError("Bus missing for DS18X20Sensor")

        # Initialize the DS18x20 hardware driver.
        try:
            if platform_info.vendor == platform_info.MICROPYTHON.Vanilla:
                # Only verifies that the driver module is importable.
                import ds18x20  # noqa: F401
                # TODO: No driver is assigned on vanilla MicroPython yet, as
                #       there is some difference between the uPy and Pycom
                #       drivers. Was: `ds18x20.DS18X20(self.ow)`.

            elif platform_info.vendor == platform_info.MICROPYTHON.Pycom:
                from onewire import DS18X20
                self.driver = DS18X20(self.bus.adapter)

            else:
                raise NotImplementedError('DS18X20 is '
                                          'not implemented on this platform')

            return True

        except Exception as ex:
            log.exc(ex, 'DS18X20 hardware driver failed')

        # Setup failed; the exception has been logged above.
        return None

    def read(self):
        """Acquire readings from all enabled DS18X20 devices on the bus.

        :return: Dictionary mapping telemetry field names to values, or
            ``self.SENSOR_NOT_INITIALIZED`` when bus or driver are missing.
        """
        if self.bus is None or self.driver is None:
            return self.SENSOR_NOT_INITIALIZED

        # TODO: Review device reading re. glitches and timing.
        log.info('Acquire readings from all DS18X20 sensors attached to bus "{}"'.format(self.bus.name))

        devices = self.start_reading()

        # Wait between starting the conversions and reading the results,
        # presumably to cover the sensor's conversion time (the DS18B20
        # datasheet specifies up to 750 ms at 12-bit resolution).
        # There is nothing to wait for when no conversion was started.
        if devices:
            time.sleep(1)

        data = self.read_devices(devices)

        if not data:
            log.warning('No data from any DS18X20 devices on bus "{}"'.format(self.bus.name))

        log.debug('Data from 1-Wire bus "{}" is "{}"'.format(self.bus.name, data))

        return data

    def start_reading(self):
        """Start the temperature conversion on all enabled devices.

        Devices whose settings carry ``enabled: false`` are skipped. A device
        on which starting the conversion fails is logged and skipped, so a
        single faulty device does not abort the whole acquisition cycle.

        :return: List of devices on which a conversion has been started.
        """
        log.info('Start conversion for DS18X20 devices on bus "{}"'.format(self.bus.name))

        effective_devices = []
        for device in self.bus.devices:
            address = OneWireBus.device_address_ascii(device)

            # Only an explicit `False` disables a device; a missing
            # `enabled` setting means the device is read.
            device_settings = self.get_device_settings(address)
            if device_settings.get('enabled') is False:
                log.info('Skipping DS18X20 device "{}"'.format(address))
                continue

            # Isolate failures per device, like `read_devices` does.
            try:
                self.driver.start_conversion(device)
            except Exception as ex:
                log.exc(ex, 'Starting conversion for DS18X20 device "{}" failed'.format(address))
                continue

            effective_devices.append(device)

        return effective_devices

    def read_devices(self, devices):
        """Read the conversion results from the given devices.

        Failures while reading or processing a single device are logged and
        do not affect the other devices.

        :param devices: Devices to read, as returned by ``start_reading``.
        :return: Dictionary mapping telemetry field names to values, with
            the configured per-device ``offset`` applied.
        """
        data = {}

        for device in devices:
            address = OneWireBus.device_address_ascii(device)
            log.info('Reading DS18X20 device "{}"'.format(address))

            try:
                value = self.driver.read_temp_async(device)
            except Exception as ex:
                log.exc(ex, "Reading DS18X20 device {} failed".format(address))
                continue

            # Evaluate device response.
            if value is None:
                log.warning('No response from DS18X20 device "{}"'.format(address))
                continue

            try:
                # Compute telemetry field name.
                fieldname = self.format_fieldname('temperature', address)

                # Apply value offset.
                offset = self.get_setting(address, 'offset')
                if offset is not None:
                    log.info('Adding offset {} to value {} from device "{}"'.format(offset, value, address))
                    value += offset

                # Add value to telemetry message.
                data[fieldname] = value

            except Exception as ex:
                log.exc(ex, 'Processing data from DS18X20 device "{}" failed'.format(address))

        return data

    def get_setting(self, address, name, default=None):
        """Return a single setting of the device with the given address.

        :param address: Device address.
        :param name: Name of the setting.
        :param default: Value returned when the setting is absent.
            (Default value = None)
        :return: The setting value, or ``default``.
        """
        settings = self.get_device_settings(address)
        return settings.get(name, default)

    def get_device_settings(self, address):
        """Return the configuration entry for the device with the given address.

        Entries without a string ``address`` are ignored rather than raising,
        so one malformed entry does not break lookups for all other devices.

        :param address: Device address; normalized through
            ``OneWireBus.device_address_ascii`` before comparison.
        :return: The matching settings dictionary, or an empty dictionary
            when no entry matches.
        """
        # Compute ASCII representation of device address.
        address = OneWireBus.device_address_ascii(address)

        # Get device-specific settings from configuration.
        # `or []` also covers a `devices` key explicitly set to `None`.
        for device_settings in self.settings.get('devices') or []:
            configured_address = device_settings.get('address')
            if not isinstance(configured_address, str):
                continue
            if configured_address.lower() == address:
                return device_settings

        return {}

    def get_device_description(self, address):
        """Return the configured description of a device.

        :param address: Device address.
        :return: The ``description`` setting, or ``None`` when absent.
        """
        device_settings = self.get_device_settings(address)
        return device_settings.get('description')