Skip to content

Commit

Permalink
Add basic features
Browse files Browse the repository at this point in the history
A first set of features for the scc1 is added. This incldues:
- A basic driver for the SCC1 sensor cable that is based on the
  sensirion_shdc_driver package
- A i2c transceiver that can be used to communicate with public python
  drivers
- A driver based on the sf06 api of the SCC1 sensor cable.
- Basic tests to work with attached hardware
  • Loading branch information
Rol-la committed Feb 13, 2024
1 parent b242ba5 commit 65493d5
Show file tree
Hide file tree
Showing 11 changed files with 578 additions and 0 deletions.
1 change: 1 addition & 0 deletions sensirion_uart_scc1/drivers/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
# -*- coding: utf-8 -*-
175 changes: 175 additions & 0 deletions sensirion_uart_scc1/drivers/scc1_sf06.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,175 @@
# -*- coding: utf-8 -*-

import math
import struct
import time
from typing import List, Tuple, Optional, Any, cast

from sensirion_uart_scc1.drivers.slf_common import SlfMeasurementCommand, SlfMode, SLF_PRODUCT_LIQUI_MAP, SlfProduct
from sensirion_uart_scc1.scc1_shdlc_device import Scc1ShdlcDevice


class Scc1Sf06:
"""
Scc1 SF06 Sensor Driver
The Scc1 provides features to support the sf06 liquid flow sensors. This driver accesses the sensor through the
API specified by scc1.
"""
SENSOR_TYPE = 3 # SF06
START_MEASUREMENT_DELAY_S = 0.015

def __init__(self, device: Scc1ShdlcDevice, liquid_mode: SlfMode = SlfMode.LIQUI_1) -> None:
self._scc1 = device
self._liquid_config = SLF_PRODUCT_LIQUI_MAP[SlfProduct.SLF3x]
self._serial_number, self._product_id = self._get_serial_number_and_product_id()
self._is_measuring = False
self._sampling_interval_ms = 100 # Default 10Hz
self._liquid_mode = liquid_mode
self._measurement_command = SlfMeasurementCommand.from_mode(self._liquid_mode)

@property
def serial_number(self) -> int:
"""
:return: The serial number as integer
"""
return self._serial_number

@property
def product_id(self) -> int:
"""
:return: The product identifier as integer
"""
return self._product_id

@property
def liquid_mode(self) -> SlfMode:
"""
Liquid measurement mode
"""
return self._liquid_mode

@liquid_mode.setter
def liquid_mode(self, mode: SlfMode) -> None:
"""
Set liquid measurement mode
:param mode: One of the liquid measurement modes
"""
if not isinstance(mode, SlfMode):
raise ValueError("Invalid liquid mode: {}".format(mode))
assert not self._is_measuring, "Set liquid mode not allowed while measurement is running"
self._liquid_mode = mode
self._measurement_command = SlfMeasurementCommand.from_mode(self._liquid_mode)

@property
def liquid_mode_name(self) -> str:
"""
:return: Name of current liquid measurement mode
"""
return self.get_liquid_mode_name(self._liquid_mode)

def get_liquid_mode_name(self, mode: SlfMode) -> str:
"""
:param mode: A liquid mode
:return: Get name of a specific liquid measurement mode
"""
return self._liquid_config.liqui_mode_name(mode)

@property
def sampling_interval_ms(self) -> int:
"""
Sampling interval for synchroneous measurement
:return: Current internal sampling interval
"""
return self._sampling_interval_ms

@sampling_interval_ms.setter
def sampling_interval_ms(self, interval_ms: int):
"""
Set sampling interval for continuous measurement
This will not be applied while measurement is running
:param interval_ms: The requested measurement interval in milli-seconds
"""
self._sampling_interval_ms = interval_ms

def get_serial_number(self) -> int:
"""
:return: The sensor serial number
"""
return self._serial_number

def get_flow_unit_and_scale(self, command: int) -> Optional[Tuple[int, int]]:
"""
Get the scale factor, unit and sensor sanity check result of the sensor for the given argument.
(only available on some SF06 sensor products)
:param command: The 16-bit command to read flow unit and scale factor for
:return: A tuple with (scale_factor, flow_unit), None if command is not supported
"""
args = list(struct.pack('>h', command))
data = self._scc1.transceive(0x53, args, 0.01)
if len(data) != 6:
return None
scale, unit, _ = struct.unpack('>HHH', data)
return scale, unit

def get_last_measurement(self) -> Optional[Tuple[int, int, int]]:
"""
Read current measurement and starts internal continuous measurement with configured interval, if not
already started.
"""
data = self._scc1.transceive(0x35, [0x03], 0.01)
if not data:
# Measurement not ready
return None
return cast(Tuple[int, int, int], struct.unpack('>hhH', data))

def start_continuous_measurement(self, interval_ms=0) -> None:
"""
Start a continuous measurement with a give interval.
:param interval_ms: measurement interval in milliseconds
"""
if self._is_measuring:
return
data = bytearray()
data.extend(bytearray(struct.pack('>H', int(interval_ms))))
data.extend(bytearray(struct.pack('>H', int(self._measurement_command))))
self._scc1.transceive(0x33, data, 0.01)
time.sleep(self.START_MEASUREMENT_DELAY_S)
self._is_measuring = True

def stop_continuous_measurement(self) -> None:
"""Stop continuous measurement"""
if not self._is_measuring:
return
self._scc1.transceive(0x34, [], 0.01)
self._is_measuring = False

def read_extended_buffer(self) -> Tuple[int, int, List[Tuple[Any, ...]]]:
"""
Read out measurement buffer
:return: A tuple with (bytes_remaining, bytes_los, data)
"""
data = self._scc1.transceive(0x36, [0x03], 0.01)
bytes_lost = int(struct.unpack('>I', data[:4])[0])
bytes_remaining = int(struct.unpack('>H', data[4:6])[0])
num_signals = struct.unpack('>H', data[6:8])[0]
num_packets = int(len(data[8:]) / 2 / num_signals)
buffer = list(struct.unpack(">" + "hhh" * num_packets, data[8:]))
m = math.modf(len(buffer) / num_signals)
if m[0]:
raise IOError("Received unexpected amount of data")
num_samples = int(m[1])
# Output buffer a list of tuples with (flow, temp, flags).
out = [tuple(buffer[i * num_signals:i * num_signals + num_signals])
for i in range(num_samples)]
return bytes_remaining, bytes_lost, out

def _get_serial_number_and_product_id(self) -> Tuple[int, int]:
"""
:return: The sensor serial number and product id as tuple
"""
data = self._scc1.transceive(0x50, [], 0.01)
data = data.rstrip(b'\x00').decode('utf-8')
product_id = int(data[:8], 16)
serial_number = int(data[8:], 16)
return serial_number, product_id
145 changes: 145 additions & 0 deletions sensirion_uart_scc1/drivers/slf_common.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,145 @@
# -*- coding: utf-8 -*-

"""
This file contains channels, gas modes and product specifications used by multiple SLF products
"""
from collections import OrderedDict
from enum import Enum


class SlfMode(Enum):
LIQUI_0 = 'Liquid 0'
LIQUI_1 = 'Liquid 1'
LIQUI_2 = 'Liquid 2'
LIQUI_3 = 'Liquid 3'
LIQUI_4 = 'Liquid 4'
LIQUI_5 = 'Liquid 5'
LIQUI_6 = 'Liquid 6'
LIQUI_7 = 'Liquid 7'
LIQUI_8 = 'Liquid 8'


class SlfMeasurementCommand(object):
MEASUREMENT_COMMANDS = {
SlfMode.LIQUI_0: 0x3603,
SlfMode.LIQUI_1: 0x3608,
SlfMode.LIQUI_2: 0x3615,
SlfMode.LIQUI_3: 0x361E,
SlfMode.LIQUI_4: 0x3624,
SlfMode.LIQUI_5: 0x362F,
SlfMode.LIQUI_6: 0x3632,
SlfMode.LIQUI_7: 0x3639,
SlfMode.LIQUI_8: 0x3646,
}

@staticmethod
def from_mode(mode):
return SlfMeasurementCommand.MEASUREMENT_COMMANDS[mode]


# See https://webstamm.sensirion.com/
# product identifier is specified in datasheet
SLF3x_PRODUCT_NAME = {
0x070302: 'SLF3S_1300',
0x070303: 'SLF3S_600',
0x070304: 'SLF3C_1300F',
0x070305: 'SLF3S_4000B',
}

LD20_PRODUCT_NAME = {
0x070102: 'LD20_2600B',
0x070103: 'LD20-0600L'
}

SLF3x_PRODUCT_IDS = list(SLF3x_PRODUCT_NAME.keys())

LD20_2600B_PRODUCT_IDS = list(LD20_PRODUCT_NAME.keys())


class SlfProduct(Enum):
SLF3x = 'SLF3x'
LD20 = 'LD20-2600B',

@staticmethod
def from_product_id(product_id):
if product_id in SLF3x_PRODUCT_IDS:
return SlfProduct.SLF3x
elif product_id in LD20_2600B_PRODUCT_IDS:
return SlfProduct.LD20
return None


class SlfProductName:
@staticmethod
def from_product(product: SlfProduct, product_id: int):
if product is SlfProduct.SLF3x:
return SLF3x_PRODUCT_NAME.get(product_id, 'SLF3x')
elif product is SlfProduct.LD20:
return LD20_PRODUCT_NAME.get(product_id, 'LD20')
raise None


class SlfLiquiConfig(object):
def __init__(self, config):
"""
:param config: A dictionary of form mode: name, where mode is one of
`~sensirion_uart_scc1.drivers.slf_common.SlfMode`
"""
self._liqui_config = config
self._supported_liquids = list(config.keys())

@property
def supported_liqui_modes(self):
return self._supported_liquids

def liqui_mode_name(self, mode):
return self._liqui_config[mode]


SLF_PRODUCT_LIQUI_MAP = {
SlfProduct.SLF3x: SlfLiquiConfig(OrderedDict({
SlfMode.LIQUI_1: 'Water',
SlfMode.LIQUI_2: 'Isopropyl alcohol',
})),
SlfProduct.LD20: SlfLiquiConfig(OrderedDict({
SlfMode.LIQUI_1: 'Water',
}))
}

FLOW_UNIT_PREFIX = {
3: 'n',
4: 'u',
5: 'm',
6: 'c',
7: 'd',
8: '', # 1
9: '', # 10
10: 'h',
11: 'k',
12: 'M',
13: 'G'
}

FLOW_UNIT_TIME_BASE = {
0: '',
1: 'us',
2: 'ms',
3: 's',
4: 'min',
5: 'h',
6: 'day'
}

FLOW_UNIT_VOLUME = {
0: 'nl', # norm liter
1: 'sl', # standard liter
8: 'l', # liter (typ: liquid flow)
9: 'g', # gram (typ: liquid flow)
}


def parse_flow_unit(flow_unit_raw) -> str:
prefix = FLOW_UNIT_PREFIX.get(flow_unit_raw & 0xF, '')
time_base = FLOW_UNIT_TIME_BASE.get((flow_unit_raw >> 4) & 0xF, '')
volume = FLOW_UNIT_VOLUME.get((flow_unit_raw >> 8) & 0xF, '')
return f'{prefix}{volume}/{time_base}'
1 change: 1 addition & 0 deletions sensirion_uart_scc1/protocols/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
# -*- coding: utf-8 -*-
47 changes: 47 additions & 0 deletions sensirion_uart_scc1/protocols/i2c_transceiver.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
# -*- coding: utf-8 -*-
# Copyright 2024, Sensirion AG, Switzerland. All rights reserved.
#
# Software source code and algorithms are Sensirion confidential
# information and trade secrets. Licensee shall protect confidentiality
# of software source code and algorithms.
#
# Licensee shall not distribute software and/or algorithms.
#
# Software and algorithms are licensed for use with
# Sensirion sensors only.
#
# Software and algorithms are provided "AS IS" and any and
# all express or implied warranties are disclaimed.
#
# THIS SOFTWARE IS PROVIDED BY SENSIRION "AS IS" AND ANY EXPRESS OR
# IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES
# OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED.
# IN NO EVENT SHALL SENSIRION BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED
# TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
# PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
# LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
# NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE,
# EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

from typing import Protocol, Optional, runtime_checkable


@runtime_checkable
class I2cTransceiver(Protocol):

def transceive(self, slave_address: int, tx_data: Optional[bytes], rx_length: int, read_delay: float,
timeout: float) -> Optional[bytes]:
"""
Send data to the sensor and receive back a response. This function can be used for sending or receiving only
as well.
:param slave_address: I2c address of the sensor.
:param tx_data: The data to be sent. In case no data shall be sent, this parameter is supposed to be None
:param rx_length: Length of the repsonse of the sensor. If the request does not have a response, this parameter
may be 0.
:param read_delay: Defines the time in seconds that needs to be observed after sending the data before
the read is initiated.
:param timeout: Defines the time after receiving the response from the sensor before the next command may be
sent.
"""
Loading

0 comments on commit 65493d5

Please sign in to comment.