-
Notifications
You must be signed in to change notification settings - Fork 0
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
A first set of features for the scc1 is added. This incldues: - A basic driver for the SCC1 sensor cable that is based on the sensirion_shdc_driver package - A i2c transceiver that can be used to communicate with public python drivers - A driver based on the sf06 api of the SCC1 sensor cable. - Basic tests to work with attached hardware
- Loading branch information
Showing
11 changed files
with
530 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1 @@ | ||
# -*- coding: utf-8 -*- |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,175 @@ | ||
# -*- coding: utf-8 -*- | ||
|
||
import math | ||
import struct | ||
import time | ||
from typing import List, Tuple, Optional, Any, cast | ||
|
||
from sensirion_uart_scc1.drivers.slf_common import SlfMeasurementCommand, SlfMode, SLF_PRODUCT_LIQUI_MAP, SlfProduct | ||
from sensirion_uart_scc1.scc1_shdlc_device import Scc1ShdlcDevice | ||
|
||
|
||
class Scc1Sf06: | ||
""" | ||
Scc1 SF06 Sensor Driver | ||
The Scc1 provides features to support the sf06 liquid flow sensors. This driver accesses the sensor through the | ||
API specified by scc1. | ||
""" | ||
SENSOR_TYPE = 3 # SF06 | ||
START_MEASUREMENT_DELAY_S = 0.015 | ||
|
||
def __init__(self, device: Scc1ShdlcDevice, liquid_mode: SlfMode = SlfMode.LIQUI_1) -> None: | ||
self._scc1 = device | ||
self._liquid_config = SLF_PRODUCT_LIQUI_MAP[SlfProduct.SLF3x] | ||
self._serial_number, self._product_id = self._get_serial_number_and_product_id() | ||
self._is_measuring = False | ||
self._sampling_interval_ms = 100 # Default 10Hz | ||
self._liquid_mode = liquid_mode | ||
self._measurement_command = SlfMeasurementCommand.from_mode(self._liquid_mode) | ||
|
||
@property | ||
def serial_number(self) -> int: | ||
""" | ||
:return: The serial number as integer | ||
""" | ||
return self._serial_number | ||
|
||
@property | ||
def product_id(self) -> int: | ||
""" | ||
:return: The product identifier as integer | ||
""" | ||
return self._product_id | ||
|
||
@property | ||
def liquid_mode(self) -> SlfMode: | ||
""" | ||
Liquid measurement mode | ||
""" | ||
return self._liquid_mode | ||
|
||
@liquid_mode.setter | ||
def liquid_mode(self, mode: SlfMode) -> None: | ||
""" | ||
Set liquid measurement mode | ||
:param mode: One of the liquid measurement modes | ||
""" | ||
if not isinstance(mode, SlfMode): | ||
raise ValueError("Invalid liquid mode: {}".format(mode)) | ||
assert not self._is_measuring, "Set liquid mode not allowed while measurement is running" | ||
self._liquid_mode = mode | ||
self._measurement_command = SlfMeasurementCommand.from_mode(self._liquid_mode) | ||
|
||
@property | ||
def liquid_mode_name(self) -> str: | ||
""" | ||
:return: Name of current liquid measurement mode | ||
""" | ||
return self.get_liquid_mode_name(self._liquid_mode) | ||
|
||
def get_liquid_mode_name(self, mode: SlfMode) -> str: | ||
""" | ||
:param mode: A liquid mode | ||
:return: Get name of a specific liquid measurement mode | ||
""" | ||
return self._liquid_config.liqui_mode_name(mode) | ||
|
||
@property | ||
def sampling_interval_ms(self) -> int: | ||
""" | ||
Sampling interval for synchroneous measurement | ||
:return: Current internal sampling interval | ||
""" | ||
return self._sampling_interval_ms | ||
|
||
@sampling_interval_ms.setter | ||
def sampling_interval_ms(self, interval_ms: int): | ||
""" | ||
Set sampling interval for continuous measurement | ||
This will not be applied while measurement is running | ||
:param interval_ms: The requested measurement interval in milli-seconds | ||
""" | ||
self._sampling_interval_ms = interval_ms | ||
|
||
def get_serial_number(self) -> int: | ||
""" | ||
:return: The sensor serial number | ||
""" | ||
return self._serial_number | ||
|
||
def get_flow_unit_and_scale(self, command: int) -> Optional[Tuple[int, int]]: | ||
""" | ||
Get the scale factor, unit and sensor sanity check result of the sensor for the given argument. | ||
(only available on some SF06 sensor products) | ||
:param command: The 16-bit command to read flow unit and scale factor for | ||
:return: A tuple with (scale_factor, flow_unit), None if command is not supported | ||
""" | ||
args = list(struct.pack('>h', command)) | ||
data = self._scc1.transceive(0x53, args, 0.01) | ||
if len(data) != 6: | ||
return None | ||
scale, unit, _ = struct.unpack('>HHH', data) | ||
return scale, unit | ||
|
||
def get_last_measurement(self) -> Optional[Tuple[int, int, int]]: | ||
""" | ||
Read current measurement and starts internal continuous measurement with configured interval, if not | ||
already started. | ||
""" | ||
data = self._scc1.transceive(0x35, [0x03], 0.01) | ||
if not data: | ||
# Measurement not ready | ||
return None | ||
return cast(Tuple[int, int, int], struct.unpack('>hhH', data)) | ||
|
||
def start_continuous_measurement(self, interval_ms=0) -> None: | ||
""" | ||
Start a continuous measurement with a give interval. | ||
:param interval_ms: measurement interval in milliseconds | ||
""" | ||
if self._is_measuring: | ||
return | ||
data = bytearray() | ||
data.extend(bytearray(struct.pack('>H', int(interval_ms)))) | ||
data.extend(bytearray(struct.pack('>H', int(self._measurement_command)))) | ||
self._scc1.transceive(0x33, data, 0.01) | ||
time.sleep(self.START_MEASUREMENT_DELAY_S) | ||
self._is_measuring = True | ||
|
||
def stop_continuous_measurement(self) -> None: | ||
"""Stop continuous measurement""" | ||
if not self._is_measuring: | ||
return | ||
self._scc1.transceive(0x34, [], 0.01) | ||
self._is_measuring = False | ||
|
||
def read_extended_buffer(self) -> Tuple[int, int, List[Tuple[Any, ...]]]: | ||
""" | ||
Read out measurement buffer | ||
:return: A tuple with (bytes_remaining, bytes_los, data) | ||
""" | ||
data = self._scc1.transceive(0x36, [0x03], 0.01) | ||
bytes_lost = int(struct.unpack('>I', data[:4])[0]) | ||
bytes_remaining = int(struct.unpack('>H', data[4:6])[0]) | ||
num_signals = struct.unpack('>H', data[6:8])[0] | ||
num_packets = int(len(data[8:]) / 2 / num_signals) | ||
buffer = list(struct.unpack(">" + "hhh" * num_packets, data[8:])) | ||
m = math.modf(len(buffer) / num_signals) | ||
if m[0]: | ||
raise IOError("Received unexpected amount of data") | ||
num_samples = int(m[1]) | ||
# Output buffer a list of tuples with (flow, temp, flags). | ||
out = [tuple(buffer[i * num_signals:i * num_signals + num_signals]) | ||
for i in range(num_samples)] | ||
return bytes_remaining, bytes_lost, out | ||
|
||
def _get_serial_number_and_product_id(self) -> Tuple[int, int]: | ||
""" | ||
:return: The sensor serial number and product id as tuple | ||
""" | ||
data = self._scc1.transceive(0x50, [], 0.01) | ||
data = data.rstrip(b'\x00').decode('utf-8') | ||
product_id = int(data[:8], 16) | ||
serial_number = int(data[8:], 16) | ||
return serial_number, product_id |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,145 @@ | ||
# -*- coding: utf-8 -*- | ||
|
||
""" | ||
This file contains channels, gas modes and product specifications used by multiple SLF products | ||
""" | ||
from collections import OrderedDict | ||
from enum import Enum | ||
|
||
|
||
class SlfMode(Enum): | ||
LIQUI_0 = 'Liquid 0' | ||
LIQUI_1 = 'Liquid 1' | ||
LIQUI_2 = 'Liquid 2' | ||
LIQUI_3 = 'Liquid 3' | ||
LIQUI_4 = 'Liquid 4' | ||
LIQUI_5 = 'Liquid 5' | ||
LIQUI_6 = 'Liquid 6' | ||
LIQUI_7 = 'Liquid 7' | ||
LIQUI_8 = 'Liquid 8' | ||
|
||
|
||
class SlfMeasurementCommand(object): | ||
MEASUREMENT_COMMANDS = { | ||
SlfMode.LIQUI_0: 0x3603, | ||
SlfMode.LIQUI_1: 0x3608, | ||
SlfMode.LIQUI_2: 0x3615, | ||
SlfMode.LIQUI_3: 0x361E, | ||
SlfMode.LIQUI_4: 0x3624, | ||
SlfMode.LIQUI_5: 0x362F, | ||
SlfMode.LIQUI_6: 0x3632, | ||
SlfMode.LIQUI_7: 0x3639, | ||
SlfMode.LIQUI_8: 0x3646, | ||
} | ||
|
||
@staticmethod | ||
def from_mode(mode): | ||
return SlfMeasurementCommand.MEASUREMENT_COMMANDS[mode] | ||
|
||
|
||
# See https://webstamm.sensirion.com/ | ||
# product identifier is specified in datasheet | ||
SLF3x_PRODUCT_NAME = { | ||
0x070302: 'SLF3S_1300', | ||
0x070303: 'SLF3S_600', | ||
0x070304: 'SLF3C_1300F', | ||
0x070305: 'SLF3S_4000B', | ||
} | ||
|
||
LD20_PRODUCT_NAME = { | ||
0x070102: 'LD20_2600B', | ||
0x070103: 'LD20-0600L' | ||
} | ||
|
||
SLF3x_PRODUCT_IDS = list(SLF3x_PRODUCT_NAME.keys()) | ||
|
||
LD20_2600B_PRODUCT_IDS = list(LD20_PRODUCT_NAME.keys()) | ||
|
||
|
||
class SlfProduct(Enum): | ||
SLF3x = 'SLF3x' | ||
LD20 = 'LD20-2600B', | ||
|
||
@staticmethod | ||
def from_product_id(product_id): | ||
if product_id in SLF3x_PRODUCT_IDS: | ||
return SlfProduct.SLF3x | ||
elif product_id in LD20_2600B_PRODUCT_IDS: | ||
return SlfProduct.LD20 | ||
return None | ||
|
||
|
||
class SlfProductName: | ||
@staticmethod | ||
def from_product(product: SlfProduct, product_id: int): | ||
if product is SlfProduct.SLF3x: | ||
return SLF3x_PRODUCT_NAME.get(product_id, 'SLF3x') | ||
elif product is SlfProduct.LD20: | ||
return LD20_PRODUCT_NAME.get(product_id, 'LD20') | ||
raise None | ||
|
||
|
||
class SlfLiquiConfig(object): | ||
def __init__(self, config): | ||
""" | ||
:param config: A dictionary of form mode: name, where mode is one of | ||
`~sensirion_uart_scc1.drivers.slf_common.SlfMode` | ||
""" | ||
self._liqui_config = config | ||
self._supported_liquids = list(config.keys()) | ||
|
||
@property | ||
def supported_liqui_modes(self): | ||
return self._supported_liquids | ||
|
||
def liqui_mode_name(self, mode): | ||
return self._liqui_config[mode] | ||
|
||
|
||
SLF_PRODUCT_LIQUI_MAP = { | ||
SlfProduct.SLF3x: SlfLiquiConfig(OrderedDict({ | ||
SlfMode.LIQUI_1: 'Water', | ||
SlfMode.LIQUI_2: 'Isopropyl alcohol', | ||
})), | ||
SlfProduct.LD20: SlfLiquiConfig(OrderedDict({ | ||
SlfMode.LIQUI_1: 'Water', | ||
})) | ||
} | ||
|
||
FLOW_UNIT_PREFIX = { | ||
3: 'n', | ||
4: 'u', | ||
5: 'm', | ||
6: 'c', | ||
7: 'd', | ||
8: '', # 1 | ||
9: '', # 10 | ||
10: 'h', | ||
11: 'k', | ||
12: 'M', | ||
13: 'G' | ||
} | ||
|
||
FLOW_UNIT_TIME_BASE = { | ||
0: '', | ||
1: 'us', | ||
2: 'ms', | ||
3: 's', | ||
4: 'min', | ||
5: 'h', | ||
6: 'day' | ||
} | ||
|
||
FLOW_UNIT_VOLUME = { | ||
0: 'nl', # norm liter | ||
1: 'sl', # standard liter | ||
8: 'l', # liter (typ: liquid flow) | ||
9: 'g', # gram (typ: liquid flow) | ||
} | ||
|
||
|
||
def parse_flow_unit(flow_unit_raw) -> str: | ||
prefix = FLOW_UNIT_PREFIX.get(flow_unit_raw & 0xF, '') | ||
time_base = FLOW_UNIT_TIME_BASE.get((flow_unit_raw >> 4) & 0xF, '') | ||
volume = FLOW_UNIT_VOLUME.get((flow_unit_raw >> 8) & 0xF, '') | ||
return f'{prefix}{volume}/{time_base}' |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1 @@ | ||
# -*- coding: utf-8 -*- |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,23 @@ | ||
# -*- coding: utf-8 -*- | ||
|
||
from typing import Protocol, Optional, runtime_checkable | ||
|
||
|
||
@runtime_checkable | ||
class I2cTransceiver(Protocol): | ||
|
||
def transceive(self, slave_address: int, tx_data: Optional[bytes], rx_length: int, read_delay: float, | ||
timeout: float) -> Optional[bytes]: | ||
""" | ||
Send data to the sensor and receive back a response. This function can be used for sending or receiving only | ||
as well. | ||
:param slave_address: I2c address of the sensor. | ||
:param tx_data: The data to be sent. In case no data shall be sent, this parameter is supposed to be None | ||
:param rx_length: Length of the repsonse of the sensor. If the request does not have a response, this parameter | ||
may be 0. | ||
:param read_delay: Defines the time in seconds that needs to be observed after sending the data before | ||
the read is initiated. | ||
:param timeout: Defines the time after receiving the response from the sensor before the next command may be | ||
sent. | ||
""" |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,15 @@ | ||
# -*- coding: utf-8 -*- | ||
|
||
from typing import Protocol, Optional, runtime_checkable | ||
|
||
|
||
@runtime_checkable | ||
class ShdlcTransceiver(Protocol): | ||
|
||
def transceive(self, command: int, data: bytes, timeout: float = -1.0) -> Optional[bytes]: | ||
"""Wrapper method for legacy shdlc-driver compatibility | ||
:param command: The command to send (one byte) | ||
:param data: byte array of the data to send as arguments to the command | ||
:param timeout: response timeout in seconds (-1 for using default value) | ||
:return: The returned data as bytes | ||
""" |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,32 @@ | ||
# -*- coding: utf-8 -*- | ||
|
||
from struct import pack | ||
from typing import Optional | ||
|
||
from sensirion_uart_scc1.protocols.shdlc_transceiver import ShdlcTransceiver | ||
|
||
|
||
class Scc1I2cTransceiver: | ||
""" | ||
Wrapper that implements the I2cTransceiver protocol. | ||
This wrappers allows to use the public I2c Python drivers drivers wit the SCC1 cable | ||
""" | ||
|
||
def __init__(self, device: ShdlcTransceiver) -> None: | ||
self._scc1 = device | ||
|
||
def transceive(self, slave_address: int, tx_data: Optional[bytes], rx_length: int, read_delay: float, | ||
timeout: float = 0.01) -> bytes: | ||
"""Implements the I2cTransceiver protocol""" | ||
|
||
if tx_data is None: | ||
tx_data = bytearray() | ||
else: | ||
tx_data = bytearray(tx_data) | ||
tx_size = len(tx_data) | ||
cmd_data = bytearray(pack('>BBBH', slave_address, tx_size, rx_length, int(read_delay) * 1000)) | ||
cmd_data.extend(tx_data) | ||
result = self._scc1.transceive(0x2A, cmd_data, timeout) | ||
if not result or rx_length == 0: # no data to return | ||
return bytearray([]) | ||
return result |
Oops, something went wrong.