From cc55e8f2edafd30102c31dd4b337deab6badd6c8 Mon Sep 17 00:00:00 2001 From: Rolf Laich Date: Mon, 12 Feb 2024 16:03:00 +0100 Subject: [PATCH] Add basic features A first set of features for the scc1 is added. This incldues: - A basic driver for the SCC1 sensor cable that is based on the sensirion_shdc_driver package - A i2c transceiver that can be used to communicate with public python drivers - A driver based on the sf06 api of the SCC1 sensor cable. - Basic tests to work with attached hardware --- sensirion_uart_scc1/drivers/__init__.py | 1 + sensirion_uart_scc1/drivers/scc1_sf06.py | 199 ++++++++++++++++++ sensirion_uart_scc1/drivers/slf_common.py | 185 ++++++++++++++++ sensirion_uart_scc1/protocols/__init__.py | 1 + .../protocols/i2c_transceiver.py | 47 +++++ .../protocols/shdlc_transceiver.py | 39 ++++ sensirion_uart_scc1/scc1_i2c_transceiver.py | 56 +++++ sensirion_uart_scc1/scc1_shdlc_device.py | 99 +++++++++ tests/conftest.py | 23 ++ tests/test_scc1_shdlc_device.py | 9 + tests/test_sf06.py | 31 +++ 11 files changed, 690 insertions(+) create mode 100644 sensirion_uart_scc1/drivers/__init__.py create mode 100644 sensirion_uart_scc1/drivers/scc1_sf06.py create mode 100644 sensirion_uart_scc1/drivers/slf_common.py create mode 100644 sensirion_uart_scc1/protocols/__init__.py create mode 100644 sensirion_uart_scc1/protocols/i2c_transceiver.py create mode 100644 sensirion_uart_scc1/protocols/shdlc_transceiver.py create mode 100644 sensirion_uart_scc1/scc1_i2c_transceiver.py create mode 100644 sensirion_uart_scc1/scc1_shdlc_device.py create mode 100644 tests/test_scc1_shdlc_device.py create mode 100644 tests/test_sf06.py diff --git a/sensirion_uart_scc1/drivers/__init__.py b/sensirion_uart_scc1/drivers/__init__.py new file mode 100644 index 0000000..40a96af --- /dev/null +++ b/sensirion_uart_scc1/drivers/__init__.py @@ -0,0 +1 @@ +# -*- coding: utf-8 -*- diff --git a/sensirion_uart_scc1/drivers/scc1_sf06.py b/sensirion_uart_scc1/drivers/scc1_sf06.py new file mode 100644 index 0000000..20a81b9 --- /dev/null +++ b/sensirion_uart_scc1/drivers/scc1_sf06.py @@ -0,0 +1,199 @@ +# -*- coding: utf-8 -*- +# Copyright 2024, Sensirion AG, Switzerland. All rights reserved. +# +# Software source code and algorithms are Sensirion confidential +# information and trade secrets. Licensee shall protect confidentiality +# of software source code and algorithms. +# +# Licensee shall not distribute software and/or algorithms. +# +# Software and algorithms are licensed for use with +# Sensirion sensors only. +# +# Software and algorithms are provided "AS IS" and any and +# all express or implied warranties are disclaimed. +# +# THIS SOFTWARE IS PROVIDED BY SENSIRION "AS IS" AND ANY EXPRESS OR +# IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES +# OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. +# IN NO EVENT SHALL SENSIRION BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED +# TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR +# PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF +# LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING +# NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, +# EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +import math +import struct +import time +from typing import Tuple, Optional, Any + +from sensirion_uart_scc1.drivers.slf_common import SlfMeasurementCommand, SlfMode, SLF_PRODUCT_LIQUI_MAP, SlfProduct +from sensirion_uart_scc1.scc1_shdlc_device import Scc1ShdlcDevice + + +class Scc1Sf06: + """ + Scc1 SF06 Sensor Driver + + The Scc1 provides features to support the sf06 liquid flow sensors. This driver accesses the sensor through the + API specified by scc1. + """ + SENSOR_TYPE = 3 # SF06 + START_MEASUREMENT_DELAY_S = 0.015 + + def __init__(self, device: Scc1ShdlcDevice, liquid_mode: SlfMode = SlfMode.LIQUI_1) -> None: + self._scc1 = device + self._liquid_config = SLF_PRODUCT_LIQUI_MAP[SlfProduct.SLF3x] + self._serial_number, self._product_id = self._get_serial_number_and_product_id() + self._is_measuring = False + self._sampling_interval_ms = 100 # Default 10Hz + self._liquid_mode = liquid_mode + self._measurement_command = SlfMeasurementCommand.from_mode(self._liquid_mode) + + @property + def serial_number(self) -> int: + """ + :return: The serial number as integer + """ + return self._serial_number + + @property + def product_id(self) -> int: + """ + :return: The product identifier as integer + """ + return self._product_id + + @property + def liquid_mode(self) -> SlfMode: + """ + Liquid measurement mode + """ + return self._liquid_mode + + @liquid_mode.setter + def liquid_mode(self, mode: SlfMode) -> None: + """ + Set liquid measurement mode + :param mode: One of the liquid measurement modes + """ + if not isinstance(mode, SlfMode): + raise ValueError("Invalid liquid mode: {}".format(mode)) + assert not self._is_measuring, "Set liquid mode not allowed while measurement is running" + self._liquid_mode = mode + self._measurement_command = SlfMeasurementCommand.from_mode(self._liquid_mode) + + @property + def liquid_mode_name(self) -> str: + """ + :return: Name of current liquid measurement mode + """ + return self.get_liquid_mode_name(self._liquid_mode) + + def get_liquid_mode_name(self, mode: SlfMode) -> str: + """ + :param mode: A liquid mode + :return: Get name of a specific liquid measurement mode + """ + return self._liquid_config.liqui_mode_name(mode) + + @property + def sampling_interval_ms(self) -> int: + """ + Sampling interval for synchroneous measurement + :return: Current internal sampling interval + """ + return self._sampling_interval_ms + + @sampling_interval_ms.setter + def sampling_interval_ms(self, interval_ms: int): + """ + Set sampling interval for continuous measurement + This will not be applied while measurement is running + :param interval_ms: The requested measurement interval in milli-seconds + """ + self._sampling_interval_ms = interval_ms + + def get_serial_number(self) -> int: + """ + :return: The sensor serial number + """ + return self._serial_number + + def get_flow_unit_and_scale(self, command: int) -> Optional[Tuple[int, int]]: + """ + Get the scale factor, unit and sensor sanity check result of the sensor for the given argument. + (only available on some SF06 sensor products) + :param command: The 16-bit command to read flow unit and scale factor for + :return: A tuple with (scale_factor, flow_unit), None if command is not supported + """ + args = list(struct.pack('>h', command)) + data = self._scc1.transceive(0x53, args, 0.01) + if len(data) != 6: + return None + scale, unit, _ = struct.unpack('>HHH', data) + return scale, unit + + def get_last_measurement(self) -> Optional[Tuple[int, int, int]]: + """ + Read current measurement and starts internal continuous measurement with configured interval, if not + already started. + """ + data = self._scc1.transceive(0x35, [0x03], 0.01) + if not data: + # Measurement not ready + return None + struct.unpack('>hhH', data) + + def start_continuous_measurement(self, interval_ms=0) -> None: + """ + Start a continuous measurement with a give interval. + :param interval_ms: measurement interval in milliseconds + """ + if self._is_measuring: + return + data = bytearray() + data.extend(bytearray(struct.pack('>H', int(interval_ms)))) + data.extend(bytearray(struct.pack('>H', int(self._measurement_command)))) + self._scc1.transceive(0x33, data, 0.01) + time.sleep(self.START_MEASUREMENT_DELAY_S) + self._is_measuring = True + + def stop_continuous_measurement(self) -> None: + """Stop continuous measurement""" + if not self._is_measuring: + return + self._scc1.transceive(0x34, [], 0.01) + self._is_measuring = False + + def read_extended_buffer(self) -> Tuple[int, int, list[Tuple[Any, ...]]]: + """ + Read out measurement buffer + :return: A tuple with (bytes_remaining, bytes_los, data) + """ + data = self._scc1.transceive(0x36, [0x03], 0.01) + bytes_lost = int(struct.unpack('>I', data[:4])[0]) + bytes_remaining = int(struct.unpack('>H', data[4:6])[0]) + num_signals = struct.unpack('>H', data[6:8])[0] + num_packets = int(len(data[8:]) / 2 / num_signals) + buffer = list(struct.unpack(">" + "hhh" * num_packets, data[8:])) + m = math.modf(len(buffer) / num_signals) + if m[0]: + raise IOError("Received unexpected amount of data") + num_samples = int(m[1]) + # Output buffer a list of tuples with (flow, temp, flags). + out = [tuple(buffer[i * num_signals:i * num_signals + num_signals]) + for i in range(num_samples)] + return bytes_remaining, bytes_lost, out + + def _get_serial_number_and_product_id(self) -> Tuple[int, int]: + """ + :return: The sensor serial number and product id as tuple + """ + data = self._scc1.transceive(0x50, [], 0.01) + data = data.rstrip(b'\x00').decode('utf-8') + product_id = int(data[:8], 16) + serial_number = int(data[8:], 16) + return serial_number, product_id diff --git a/sensirion_uart_scc1/drivers/slf_common.py b/sensirion_uart_scc1/drivers/slf_common.py new file mode 100644 index 0000000..edc6523 --- /dev/null +++ b/sensirion_uart_scc1/drivers/slf_common.py @@ -0,0 +1,185 @@ +# -*- coding: utf-8 -*- +# Copyright 2024, Sensirion AG, Switzerland. All rights reserved. +# +# Software source code and algorithms are Sensirion confidential +# information and trade secrets. Licensee shall protect confidentiality +# of software source code and algorithms. +# +# Licensee shall not distribute software and/or algorithms. +# +# Software and algorithms are licensed for use with +# Sensirion sensors only. +# +# Software and algorithms are provided "AS IS" and any and +# all express or implied warranties are disclaimed. +# +# THIS SOFTWARE IS PROVIDED BY SENSIRION "AS IS" AND ANY EXPRESS OR +# IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES +# OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. +# IN NO EVENT SHALL SENSIRION BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED +# TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR +# PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF +# LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING +# NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, +# EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +""" +This file contains channels, gas modes and product specifications used by multiple SLF products +""" +from collections import OrderedDict +from enum import Enum + + +class SlfMode(Enum): + LIQUI_0 = 'Liquid 0' + LIQUI_1 = 'Liquid 1' + LIQUI_2 = 'Liquid 2' + LIQUI_3 = 'Liquid 3' + LIQUI_4 = 'Liquid 4' + LIQUI_5 = 'Liquid 5' + LIQUI_6 = 'Liquid 6' + LIQUI_7 = 'Liquid 7' + LIQUI_8 = 'Liquid 8' + + +class SlfMeasurementCommand(object): + MEASUREMENT_COMMANDS = { + SlfMode.LIQUI_0: 0x3603, + SlfMode.LIQUI_1: 0x3608, + SlfMode.LIQUI_2: 0x3615, + SlfMode.LIQUI_3: 0x361E, + SlfMode.LIQUI_4: 0x3624, + SlfMode.LIQUI_5: 0x362F, + SlfMode.LIQUI_6: 0x3632, + SlfMode.LIQUI_7: 0x3639, + SlfMode.LIQUI_8: 0x3646, + } + + @staticmethod + def from_mode(mode): + return SlfMeasurementCommand.MEASUREMENT_COMMANDS[mode] + + +# See https://webstamm.sensirion.com/ +# product identifier is specified in datasheet +SLF3x_PRODUCT_NAME = { + 0x070302: 'SLF3S_1300', + 0x070303: 'SLF3S_600', + 0x070304: 'SLF3C_1300F', + 0x070305: 'SLF3S_4000B', +} + +LD20_PRODUCT_NAME = { + 0x070102: 'LD20_2600B', + 0x070103: 'LD20-0600L' +} + +LD25_PRODUCT_NAME = { + 0x070105: 'LD25_2600B' +} + +SLF3x_PRODUCT_IDS = list(SLF3x_PRODUCT_NAME.keys()) + +LD20_2600B_PRODUCT_IDS = list(LD20_PRODUCT_NAME.keys()) + +LD25_2600B_PRODUCT_IDS = list(LD25_PRODUCT_NAME.keys()) + + +class SlfProduct(Enum): + SLF3x = 'SLF3x' + LD20 = 'LD20-2600B', + LD25 = 'LD25-2600B' + + @staticmethod + def from_product_id(product_id): + if product_id in SLF3x_PRODUCT_IDS: + return SlfProduct.SLF3x + elif product_id in LD20_2600B_PRODUCT_IDS: + return SlfProduct.LD20 + elif product_id in LD25_2600B_PRODUCT_IDS: + return SlfProduct.LD25 + + return None + + +class SlfProductName: + @staticmethod + def from_product(product: SlfProduct, product_id: int): + if product is SlfProduct.SLF3x: + return SLF3x_PRODUCT_NAME.get(product_id, 'SLF3x') + elif product is SlfProduct.LD20: + return LD20_PRODUCT_NAME.get(product_id, 'LD20') + elif product is SlfProduct.LD25: + return LD25_PRODUCT_NAME.get(product_id, 'LD25') + + raise None + + +class SlfLiquiConfig(object): + def __init__(self, config): + """ + :param config: A dict of form mode: name, where mode is one of + `~shdlc_devices.fluebrig.drivers.slf_common.SlfMode` + """ + self._liqui_config = config + self._supported_liquis = list(config.keys()) + + @property + def supported_liqui_modes(self): + return self._supported_liquis + + def liqui_mode_name(self, mode): + return self._liqui_config[mode] + + +SLF_PRODUCT_LIQUI_MAP = { + SlfProduct.SLF3x: SlfLiquiConfig(OrderedDict({ + SlfMode.LIQUI_1: 'Water', + SlfMode.LIQUI_2: 'Isopropyl alcohol', + })), + SlfProduct.LD20: SlfLiquiConfig(OrderedDict({ + SlfMode.LIQUI_1: 'Water', + })), + SlfProduct.LD25: SlfLiquiConfig(OrderedDict({ + SlfMode.LIQUI_1: 'Water', + })), +} + +FLOW_UNIT_PREFIX = { + 3: 'n', + 4: 'u', + 5: 'm', + 6: 'c', + 7: 'd', + 8: '', # 1 + 9: '', # 10 + 10: 'h', + 11: 'k', + 12: 'M', + 13: 'G' +} + +FLOW_UNIT_TIME_BASE = { + 0: '', + 1: 'us', + 2: 'ms', + 3: 's', + 4: 'min', + 5: 'h', + 6: 'day' +} + +FLOW_UNIT_VOLUME = { + 0: 'nl', # norm liter + 1: 'sl', # standard liter + 8: 'l', # liter (typ: liquid flow) + 9: 'g', # gram (typ: liquid flow) +} + + +def parse_flow_unit(flow_unit_raw) -> str: + prefix = FLOW_UNIT_PREFIX.get(flow_unit_raw & 0xF, '') + time_base = FLOW_UNIT_TIME_BASE.get((flow_unit_raw >> 4) & 0xF, '') + volume = FLOW_UNIT_VOLUME.get((flow_unit_raw >> 8) & 0xF, '') + return f'{prefix}{volume}/{time_base}' diff --git a/sensirion_uart_scc1/protocols/__init__.py b/sensirion_uart_scc1/protocols/__init__.py new file mode 100644 index 0000000..40a96af --- /dev/null +++ b/sensirion_uart_scc1/protocols/__init__.py @@ -0,0 +1 @@ +# -*- coding: utf-8 -*- diff --git a/sensirion_uart_scc1/protocols/i2c_transceiver.py b/sensirion_uart_scc1/protocols/i2c_transceiver.py new file mode 100644 index 0000000..5170579 --- /dev/null +++ b/sensirion_uart_scc1/protocols/i2c_transceiver.py @@ -0,0 +1,47 @@ +# -*- coding: utf-8 -*- +# Copyright 2024, Sensirion AG, Switzerland. All rights reserved. +# +# Software source code and algorithms are Sensirion confidential +# information and trade secrets. Licensee shall protect confidentiality +# of software source code and algorithms. +# +# Licensee shall not distribute software and/or algorithms. +# +# Software and algorithms are licensed for use with +# Sensirion sensors only. +# +# Software and algorithms are provided "AS IS" and any and +# all express or implied warranties are disclaimed. +# +# THIS SOFTWARE IS PROVIDED BY SENSIRION "AS IS" AND ANY EXPRESS OR +# IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES +# OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. +# IN NO EVENT SHALL SENSIRION BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED +# TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR +# PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF +# LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING +# NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, +# EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +from typing import Protocol, Optional, runtime_checkable + + +@runtime_checkable +class I2cTransceiver(Protocol): + + def transceive(self, slave_address: int, tx_data: Optional[bytes], rx_length: int, read_delay: float, + timeout: float) -> Optional[bytes]: + """ + Send data to the sensor and receive back a response. This function can be used for sending or receiving only + as well. + + :param slave_address: I2c address of the sensor. + :param tx_data: The data to be sent. In case no data shall be sent, this parameter is supposed to be None + :param rx_length: Length of the repsonse of the sensor. If the request does not have a response, this parameter + may be 0. + :param read_delay: Defines the time in seconds that needs to be observed after sending the data before + the read is initiated. + :param timeout: Defines the time after receiving the response from the sensor before the next command may be + sent. + """ diff --git a/sensirion_uart_scc1/protocols/shdlc_transceiver.py b/sensirion_uart_scc1/protocols/shdlc_transceiver.py new file mode 100644 index 0000000..a05c894 --- /dev/null +++ b/sensirion_uart_scc1/protocols/shdlc_transceiver.py @@ -0,0 +1,39 @@ +# -*- coding: utf-8 -*- +# Copyright 2024, Sensirion AG, Switzerland. All rights reserved. +# +# Software source code and algorithms are Sensirion confidential +# information and trade secrets. Licensee shall protect confidentiality +# of software source code and algorithms. +# +# Licensee shall not distribute software and/or algorithms. +# +# Software and algorithms are licensed for use with +# Sensirion sensors only. +# +# Software and algorithms are provided "AS IS" and any and +# all express or implied warranties are disclaimed. +# +# THIS SOFTWARE IS PROVIDED BY SENSIRION "AS IS" AND ANY EXPRESS OR +# IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES +# OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. +# IN NO EVENT SHALL SENSIRION BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED +# TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR +# PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF +# LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING +# NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, +# EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +from typing import Protocol, Optional, runtime_checkable + + +@runtime_checkable +class ShdlcTransceiver(Protocol): + + def transceive(self, command: int, data: bytes, timeout: float = -1.0) -> Optional[bytes]: + """Wrapper method for legacy shdlc-driver compatibility + :param command: The command to send (one byte) + :param data: byte array of the data to send as arguments to the command + :param timeout: response timeout in seconds (-1 for using default value) + :return: The returned data as bytes + """ diff --git a/sensirion_uart_scc1/scc1_i2c_transceiver.py b/sensirion_uart_scc1/scc1_i2c_transceiver.py new file mode 100644 index 0000000..3147b34 --- /dev/null +++ b/sensirion_uart_scc1/scc1_i2c_transceiver.py @@ -0,0 +1,56 @@ +# -*- coding: utf-8 -*- +# Copyright 2024, Sensirion AG, Switzerland. All rights reserved. +# +# Software source code and algorithms are Sensirion confidential +# information and trade secrets. Licensee shall protect confidentiality +# of software source code and algorithms. +# +# Licensee shall not distribute software and/or algorithms. +# +# Software and algorithms are licensed for use with +# Sensirion sensors only. +# +# Software and algorithms are provided "AS IS" and any and +# all express or implied warranties are disclaimed. +# +# THIS SOFTWARE IS PROVIDED BY SENSIRION "AS IS" AND ANY EXPRESS OR +# IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES +# OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. +# IN NO EVENT SHALL SENSIRION BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED +# TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR +# PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF +# LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING +# NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, +# EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +from struct import pack +from typing import Optional + +from sensirion_uart_scc1.protocols.shdlc_transceiver import ShdlcTransceiver + + +class Scc1I2cTransceiver: + """ + Wrapper that implements the I2cTransceiver protocol. + This wrappers allows to use the public I2c Python drivers drivers wit the SCC1 cable + """ + + def __init__(self, device: ShdlcTransceiver) -> None: + self._scc1 = device + + def transceive(self, slave_address: int, tx_data: Optional[bytes], rx_length: int, read_delay: float, + timeout: float = 0.01) -> bytes: + """Implements the I2cTransceiver protocol""" + + if tx_data is None: + tx_data = bytearray() + else: + tx_data = bytearray(tx_data) + tx_size = len(tx_data) + cmd_data = bytearray(pack('>BBBH', slave_address, tx_size, rx_length, int(read_delay) * 1000)) + cmd_data.extend(tx_data) + result = self._scc1.transceive(0x2A, cmd_data, timeout) + if not result or rx_length == 0: # no data to return + return bytearray([]) + return result diff --git a/sensirion_uart_scc1/scc1_shdlc_device.py b/sensirion_uart_scc1/scc1_shdlc_device.py new file mode 100644 index 0000000..4e1b78b --- /dev/null +++ b/sensirion_uart_scc1/scc1_shdlc_device.py @@ -0,0 +1,99 @@ +# -*- coding: utf-8 -*- +# Copyright 2024, Sensirion AG, Switzerland. All rights reserved. +# +# Software source code and algorithms are Sensirion confidential +# information and trade secrets. Licensee shall protect confidentiality +# of software source code and algorithms. +# +# Licensee shall not distribute software and/or algorithms. +# +# Software and algorithms are licensed for use with +# Sensirion sensors only. +# +# Software and algorithms are provided "AS IS" and any and +# all express or implied warranties are disclaimed. +# +# THIS SOFTWARE IS PROVIDED BY SENSIRION "AS IS" AND ANY EXPRESS OR +# IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES +# OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. +# IN NO EVENT SHALL SENSIRION BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED +# TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR +# PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF +# LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING +# NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, +# EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +import logging +from typing import Optional + +from packaging.version import Version +from sensirion_shdlc_driver import ShdlcDevice, ShdlcConnection +from sensirion_shdlc_driver.command import ShdlcCommand + +from sensirion_uart_scc1.protocols.i2c_transceiver import I2cTransceiver +from sensirion_uart_scc1.scc1_i2c_transceiver import Scc1I2cTransceiver + +log = logging.getLogger(__name__) + + +class Scc1ShdlcDevice(ShdlcDevice): + """ + The Scc1 Shdlc device is used to communicate with various sensor using the Sensirion SCC1 sensor cable. + """ + + def __init__(self, connection: ShdlcConnection, slave_address: int) -> None: + super().__init__(connection, slave_address) + self._version = self.get_version() + self._serial_number = self.get_serial_number() + + def __str__(self): + return f"SCC1-{self.serial_number}@{self.com_port}" + + @property + def com_port(self): + return self.connection.port.description.split('@')[0] + + @property + def serial_number(self) -> str: + return self._serial_number + + @property + def firmware_version(self) -> Version: + return Version(str(self._version.firmware)) + + def sensor_reset(self) -> None: + """ + Execute a hard reset on the sensor and check for correct response. Active + continuous/single measurement is stopped and the sensor is left in idle state. + """ + self.transceive(0x66, [], 0.3) + + def transceive(self, command: int, data: bytes, timeout: float = -1.0) -> Optional[bytes]: + """ + Provides a generic way to send shdlc commands. + :param command: The command to send (one byte) + :param data: byte array of the data to send as arguments to the command + :param timeout: response timeout in seconds (-1 for using default value) + :return: The returned data as bytes + """ + if timeout <= 0.0: + timeout = 3.0 + result = self.execute(ShdlcCommand( + id=command, + data=data, + max_response_time=float(timeout) + )) + if not result: + return b'' + return result + + def get_i2c_transceiver(self) -> I2cTransceiver: + """ + An I2cTransceiver object is required in or der to use the cable with public python i2c drivers. + + In general all functionality of the sensors are available in the public python drivers as well. The + throughput of the public python driver will be lower than the throughput that can be achieved with + the sensor specific api of the SCC1 sensor cable. + """ + return Scc1I2cTransceiver(self) diff --git a/tests/conftest.py b/tests/conftest.py index 40a96af..edb11e2 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -1 +1,24 @@ # -*- coding: utf-8 -*- +import pytest +from serial.tools.list_ports import comports +from sensirion_shdlc_driver.connection import ShdlcConnection +from sensirion_shdlc_driver.port import ShdlcSerialPort +from sensirion_uart_scc1.scc1_shdlc_device import Scc1ShdlcDevice + + +@pytest.fixture +def scc1_device(): + for port_info in comports(): + try: + shdlc_port = ShdlcSerialPort(port=port_info.device, baudrate=115200) + with shdlc_port: + try: + device = Scc1ShdlcDevice(ShdlcConnection(shdlc_port), 0) + device.get_product_name() + yield device + device.sensor_reset() + return + except: # noqa + ... + except: # noqa + ... diff --git a/tests/test_scc1_shdlc_device.py b/tests/test_scc1_shdlc_device.py new file mode 100644 index 0000000..07c54e4 --- /dev/null +++ b/tests/test_scc1_shdlc_device.py @@ -0,0 +1,9 @@ +# -*- coding: utf-8 -*- +import pytest +import re + + +@pytest.mark.needs_hardware +def test_scc1_device(scc1_device): + assert scc1_device is not None + assert re.match("SCC1-([^@])*@COM.", str(scc1_device)) diff --git a/tests/test_sf06.py b/tests/test_sf06.py new file mode 100644 index 0000000..0be5315 --- /dev/null +++ b/tests/test_sf06.py @@ -0,0 +1,31 @@ +# -*- coding: utf-8 -*- +import pytest + +from sensirion_uart_scc1.drivers import scc1_sf06 + + +@pytest.fixture +def sf06(scc1_device): + yield scc1_sf06.Scc1Sf06(scc1_device) + + +@pytest.mark.needs_hardware +def test_scc1_sf06_serial_number_and_product_id(sf06): + assert sf06 is not None + assert isinstance(sf06.serial_number, int) + assert isinstance(sf06.product_id, int) + + +@pytest.mark.needs_hardware +def test_scc1_sf06_start_measurements(sf06): + assert sf06 is not None + sf06.start_continuous_measurement(100) + for i in range(3): + remaining, lost, data = sf06.read_extended_buffer() + assert isinstance(remaining, int) + assert isinstance(lost, int) + assert isinstance(data, list) + for record in data: + assert isinstance(record, tuple) + assert len(record) == 3 + sf06.stop_continuous_measurement()