-
Notifications
You must be signed in to change notification settings - Fork 0
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
A first set of features for the scc1 is added. This incldues: - A basic driver for the SCC1 sensor cable that is based on the sensirion_shdc_driver package - A i2c transceiver that can be used to communicate with public python drivers - A driver based on the sf06 api of the SCC1 sensor cable. - Basic tests to work with attached hardware
- Loading branch information
Showing
11 changed files
with
690 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1 @@ | ||
# -*- coding: utf-8 -*- |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,199 @@ | ||
# -*- coding: utf-8 -*- | ||
# Copyright 2024, Sensirion AG, Switzerland. All rights reserved. | ||
# | ||
# Software source code and algorithms are Sensirion confidential | ||
# information and trade secrets. Licensee shall protect confidentiality | ||
# of software source code and algorithms. | ||
# | ||
# Licensee shall not distribute software and/or algorithms. | ||
# | ||
# Software and algorithms are licensed for use with | ||
# Sensirion sensors only. | ||
# | ||
# Software and algorithms are provided "AS IS" and any and | ||
# all express or implied warranties are disclaimed. | ||
# | ||
# THIS SOFTWARE IS PROVIDED BY SENSIRION "AS IS" AND ANY EXPRESS OR | ||
# IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES | ||
# OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. | ||
# IN NO EVENT SHALL SENSIRION BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, | ||
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED | ||
# TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR | ||
# PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF | ||
# LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING | ||
# NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, | ||
# EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. | ||
|
||
import math | ||
import struct | ||
import time | ||
from typing import List, Tuple, Optional, Any | ||
|
||
from sensirion_uart_scc1.drivers.slf_common import SlfMeasurementCommand, SlfMode, SLF_PRODUCT_LIQUI_MAP, SlfProduct | ||
from sensirion_uart_scc1.scc1_shdlc_device import Scc1ShdlcDevice | ||
|
||
|
||
class Scc1Sf06: | ||
""" | ||
Scc1 SF06 Sensor Driver | ||
The Scc1 provides features to support the sf06 liquid flow sensors. This driver accesses the sensor through the | ||
API specified by scc1. | ||
""" | ||
SENSOR_TYPE = 3 # SF06 | ||
START_MEASUREMENT_DELAY_S = 0.015 | ||
|
||
def __init__(self, device: Scc1ShdlcDevice, liquid_mode: SlfMode = SlfMode.LIQUI_1) -> None: | ||
self._scc1 = device | ||
self._liquid_config = SLF_PRODUCT_LIQUI_MAP[SlfProduct.SLF3x] | ||
self._serial_number, self._product_id = self._get_serial_number_and_product_id() | ||
self._is_measuring = False | ||
self._sampling_interval_ms = 100 # Default 10Hz | ||
self._liquid_mode = liquid_mode | ||
self._measurement_command = SlfMeasurementCommand.from_mode(self._liquid_mode) | ||
|
||
@property | ||
def serial_number(self) -> int: | ||
""" | ||
:return: The serial number as integer | ||
""" | ||
return self._serial_number | ||
|
||
@property | ||
def product_id(self) -> int: | ||
""" | ||
:return: The product identifier as integer | ||
""" | ||
return self._product_id | ||
|
||
@property | ||
def liquid_mode(self) -> SlfMode: | ||
""" | ||
Liquid measurement mode | ||
""" | ||
return self._liquid_mode | ||
|
||
@liquid_mode.setter | ||
def liquid_mode(self, mode: SlfMode) -> None: | ||
""" | ||
Set liquid measurement mode | ||
:param mode: One of the liquid measurement modes | ||
""" | ||
if not isinstance(mode, SlfMode): | ||
raise ValueError("Invalid liquid mode: {}".format(mode)) | ||
assert not self._is_measuring, "Set liquid mode not allowed while measurement is running" | ||
self._liquid_mode = mode | ||
self._measurement_command = SlfMeasurementCommand.from_mode(self._liquid_mode) | ||
|
||
@property | ||
def liquid_mode_name(self) -> str: | ||
""" | ||
:return: Name of current liquid measurement mode | ||
""" | ||
return self.get_liquid_mode_name(self._liquid_mode) | ||
|
||
def get_liquid_mode_name(self, mode: SlfMode) -> str: | ||
""" | ||
:param mode: A liquid mode | ||
:return: Get name of a specific liquid measurement mode | ||
""" | ||
return self._liquid_config.liqui_mode_name(mode) | ||
|
||
@property | ||
def sampling_interval_ms(self) -> int: | ||
""" | ||
Sampling interval for synchroneous measurement | ||
:return: Current internal sampling interval | ||
""" | ||
return self._sampling_interval_ms | ||
|
||
@sampling_interval_ms.setter | ||
def sampling_interval_ms(self, interval_ms: int): | ||
""" | ||
Set sampling interval for continuous measurement | ||
This will not be applied while measurement is running | ||
:param interval_ms: The requested measurement interval in milli-seconds | ||
""" | ||
self._sampling_interval_ms = interval_ms | ||
|
||
def get_serial_number(self) -> int: | ||
""" | ||
:return: The sensor serial number | ||
""" | ||
return self._serial_number | ||
|
||
def get_flow_unit_and_scale(self, command: int) -> Optional[Tuple[int, int]]: | ||
""" | ||
Get the scale factor, unit and sensor sanity check result of the sensor for the given argument. | ||
(only available on some SF06 sensor products) | ||
:param command: The 16-bit command to read flow unit and scale factor for | ||
:return: A tuple with (scale_factor, flow_unit), None if command is not supported | ||
""" | ||
args = list(struct.pack('>h', command)) | ||
data = self._scc1.transceive(0x53, args, 0.01) | ||
if len(data) != 6: | ||
return None | ||
scale, unit, _ = struct.unpack('>HHH', data) | ||
return scale, unit | ||
|
||
def get_last_measurement(self) -> Optional[Tuple[int, int, int]]: | ||
""" | ||
Read current measurement and starts internal continuous measurement with configured interval, if not | ||
already started. | ||
""" | ||
data = self._scc1.transceive(0x35, [0x03], 0.01) | ||
if not data: | ||
# Measurement not ready | ||
return None | ||
struct.unpack('>hhH', data) | ||
|
||
def start_continuous_measurement(self, interval_ms=0) -> None: | ||
""" | ||
Start a continuous measurement with a give interval. | ||
:param interval_ms: measurement interval in milliseconds | ||
""" | ||
if self._is_measuring: | ||
return | ||
data = bytearray() | ||
data.extend(bytearray(struct.pack('>H', int(interval_ms)))) | ||
data.extend(bytearray(struct.pack('>H', int(self._measurement_command)))) | ||
self._scc1.transceive(0x33, data, 0.01) | ||
time.sleep(self.START_MEASUREMENT_DELAY_S) | ||
self._is_measuring = True | ||
|
||
def stop_continuous_measurement(self) -> None: | ||
"""Stop continuous measurement""" | ||
if not self._is_measuring: | ||
return | ||
self._scc1.transceive(0x34, [], 0.01) | ||
self._is_measuring = False | ||
|
||
def read_extended_buffer(self) -> Tuple[int, int, List[Tuple[Any, ...]]]: | ||
""" | ||
Read out measurement buffer | ||
:return: A tuple with (bytes_remaining, bytes_los, data) | ||
""" | ||
data = self._scc1.transceive(0x36, [0x03], 0.01) | ||
bytes_lost = int(struct.unpack('>I', data[:4])[0]) | ||
bytes_remaining = int(struct.unpack('>H', data[4:6])[0]) | ||
num_signals = struct.unpack('>H', data[6:8])[0] | ||
num_packets = int(len(data[8:]) / 2 / num_signals) | ||
buffer = list(struct.unpack(">" + "hhh" * num_packets, data[8:])) | ||
m = math.modf(len(buffer) / num_signals) | ||
if m[0]: | ||
raise IOError("Received unexpected amount of data") | ||
num_samples = int(m[1]) | ||
# Output buffer a list of tuples with (flow, temp, flags). | ||
out = [tuple(buffer[i * num_signals:i * num_signals + num_signals]) | ||
for i in range(num_samples)] | ||
return bytes_remaining, bytes_lost, out | ||
|
||
def _get_serial_number_and_product_id(self) -> Tuple[int, int]: | ||
""" | ||
:return: The sensor serial number and product id as tuple | ||
""" | ||
data = self._scc1.transceive(0x50, [], 0.01) | ||
data = data.rstrip(b'\x00').decode('utf-8') | ||
product_id = int(data[:8], 16) | ||
serial_number = int(data[8:], 16) | ||
return serial_number, product_id |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,185 @@ | ||
# -*- coding: utf-8 -*- | ||
# Copyright 2024, Sensirion AG, Switzerland. All rights reserved. | ||
# | ||
# Software source code and algorithms are Sensirion confidential | ||
# information and trade secrets. Licensee shall protect confidentiality | ||
# of software source code and algorithms. | ||
# | ||
# Licensee shall not distribute software and/or algorithms. | ||
# | ||
# Software and algorithms are licensed for use with | ||
# Sensirion sensors only. | ||
# | ||
# Software and algorithms are provided "AS IS" and any and | ||
# all express or implied warranties are disclaimed. | ||
# | ||
# THIS SOFTWARE IS PROVIDED BY SENSIRION "AS IS" AND ANY EXPRESS OR | ||
# IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES | ||
# OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. | ||
# IN NO EVENT SHALL SENSIRION BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, | ||
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED | ||
# TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR | ||
# PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF | ||
# LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING | ||
# NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, | ||
# EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. | ||
|
||
""" | ||
This file contains channels, gas modes and product specifications used by multiple SLF products | ||
""" | ||
from collections import OrderedDict | ||
from enum import Enum | ||
|
||
|
||
class SlfMode(Enum): | ||
LIQUI_0 = 'Liquid 0' | ||
LIQUI_1 = 'Liquid 1' | ||
LIQUI_2 = 'Liquid 2' | ||
LIQUI_3 = 'Liquid 3' | ||
LIQUI_4 = 'Liquid 4' | ||
LIQUI_5 = 'Liquid 5' | ||
LIQUI_6 = 'Liquid 6' | ||
LIQUI_7 = 'Liquid 7' | ||
LIQUI_8 = 'Liquid 8' | ||
|
||
|
||
class SlfMeasurementCommand(object): | ||
MEASUREMENT_COMMANDS = { | ||
SlfMode.LIQUI_0: 0x3603, | ||
SlfMode.LIQUI_1: 0x3608, | ||
SlfMode.LIQUI_2: 0x3615, | ||
SlfMode.LIQUI_3: 0x361E, | ||
SlfMode.LIQUI_4: 0x3624, | ||
SlfMode.LIQUI_5: 0x362F, | ||
SlfMode.LIQUI_6: 0x3632, | ||
SlfMode.LIQUI_7: 0x3639, | ||
SlfMode.LIQUI_8: 0x3646, | ||
} | ||
|
||
@staticmethod | ||
def from_mode(mode): | ||
return SlfMeasurementCommand.MEASUREMENT_COMMANDS[mode] | ||
|
||
|
||
# See https://webstamm.sensirion.com/ | ||
# product identifier is specified in datasheet | ||
SLF3x_PRODUCT_NAME = { | ||
0x070302: 'SLF3S_1300', | ||
0x070303: 'SLF3S_600', | ||
0x070304: 'SLF3C_1300F', | ||
0x070305: 'SLF3S_4000B', | ||
} | ||
|
||
LD20_PRODUCT_NAME = { | ||
0x070102: 'LD20_2600B', | ||
0x070103: 'LD20-0600L' | ||
} | ||
|
||
LD25_PRODUCT_NAME = { | ||
0x070105: 'LD25_2600B' | ||
} | ||
|
||
SLF3x_PRODUCT_IDS = list(SLF3x_PRODUCT_NAME.keys()) | ||
|
||
LD20_2600B_PRODUCT_IDS = list(LD20_PRODUCT_NAME.keys()) | ||
|
||
LD25_2600B_PRODUCT_IDS = list(LD25_PRODUCT_NAME.keys()) | ||
|
||
|
||
class SlfProduct(Enum): | ||
SLF3x = 'SLF3x' | ||
LD20 = 'LD20-2600B', | ||
LD25 = 'LD25-2600B' | ||
|
||
@staticmethod | ||
def from_product_id(product_id): | ||
if product_id in SLF3x_PRODUCT_IDS: | ||
return SlfProduct.SLF3x | ||
elif product_id in LD20_2600B_PRODUCT_IDS: | ||
return SlfProduct.LD20 | ||
elif product_id in LD25_2600B_PRODUCT_IDS: | ||
return SlfProduct.LD25 | ||
|
||
return None | ||
|
||
|
||
class SlfProductName: | ||
@staticmethod | ||
def from_product(product: SlfProduct, product_id: int): | ||
if product is SlfProduct.SLF3x: | ||
return SLF3x_PRODUCT_NAME.get(product_id, 'SLF3x') | ||
elif product is SlfProduct.LD20: | ||
return LD20_PRODUCT_NAME.get(product_id, 'LD20') | ||
elif product is SlfProduct.LD25: | ||
return LD25_PRODUCT_NAME.get(product_id, 'LD25') | ||
|
||
raise None | ||
|
||
|
||
class SlfLiquiConfig(object): | ||
def __init__(self, config): | ||
""" | ||
:param config: A dict of form mode: name, where mode is one of | ||
`~shdlc_devices.fluebrig.drivers.slf_common.SlfMode` | ||
""" | ||
self._liqui_config = config | ||
self._supported_liquis = list(config.keys()) | ||
|
||
@property | ||
def supported_liqui_modes(self): | ||
return self._supported_liquis | ||
|
||
def liqui_mode_name(self, mode): | ||
return self._liqui_config[mode] | ||
|
||
|
||
SLF_PRODUCT_LIQUI_MAP = { | ||
SlfProduct.SLF3x: SlfLiquiConfig(OrderedDict({ | ||
SlfMode.LIQUI_1: 'Water', | ||
SlfMode.LIQUI_2: 'Isopropyl alcohol', | ||
})), | ||
SlfProduct.LD20: SlfLiquiConfig(OrderedDict({ | ||
SlfMode.LIQUI_1: 'Water', | ||
})), | ||
SlfProduct.LD25: SlfLiquiConfig(OrderedDict({ | ||
SlfMode.LIQUI_1: 'Water', | ||
})), | ||
} | ||
|
||
FLOW_UNIT_PREFIX = { | ||
3: 'n', | ||
4: 'u', | ||
5: 'm', | ||
6: 'c', | ||
7: 'd', | ||
8: '', # 1 | ||
9: '', # 10 | ||
10: 'h', | ||
11: 'k', | ||
12: 'M', | ||
13: 'G' | ||
} | ||
|
||
FLOW_UNIT_TIME_BASE = { | ||
0: '', | ||
1: 'us', | ||
2: 'ms', | ||
3: 's', | ||
4: 'min', | ||
5: 'h', | ||
6: 'day' | ||
} | ||
|
||
FLOW_UNIT_VOLUME = { | ||
0: 'nl', # norm liter | ||
1: 'sl', # standard liter | ||
8: 'l', # liter (typ: liquid flow) | ||
9: 'g', # gram (typ: liquid flow) | ||
} | ||
|
||
|
||
def parse_flow_unit(flow_unit_raw) -> str: | ||
prefix = FLOW_UNIT_PREFIX.get(flow_unit_raw & 0xF, '') | ||
time_base = FLOW_UNIT_TIME_BASE.get((flow_unit_raw >> 4) & 0xF, '') | ||
volume = FLOW_UNIT_VOLUME.get((flow_unit_raw >> 8) & 0xF, '') | ||
return f'{prefix}{volume}/{time_base}' |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1 @@ | ||
# -*- coding: utf-8 -*- |
Oops, something went wrong.