Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add PMS5003 Particulate Sensor #346

Merged
merged 10 commits into from
Feb 26, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,8 @@ Hardware support is provided by specific GPIO, Sensor and Stream modules. It's e
- LM75 temperature sensor (`lm75`)
- MCP3008 analog to digital converter (`mcp3008`)
- ADXl345 3-axis accelerometer up to ±16g (`adxl345`)
- PMS5003 particulate sensor (`pms5003`)
- SHT40/SHT41/SHT45 temperature and humidity sensors (`sht4x`)

### Streams

Expand Down
77 changes: 77 additions & 0 deletions mqtt_io/modules/sensor/pms5003.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
"""
PMS5003 Particulate Matter Sensor
"""

import time
from typing import cast
from ...types import CerberusSchemaType, ConfigType, SensorValueType
from . import GenericSensor

REQUIREMENTS = ("plantower",)
CONFIG_SCHEMA: CerberusSchemaType = {
"serial_port": dict(type="string", required=True, empty=False),
}


class Sensor(GenericSensor):
"""
Implementation of Sensor class for the PMS5003 sensor.
"""

SENSOR_SCHEMA: CerberusSchemaType = {
"type": dict(
type="string",
required=False,
empty=False,
default="pm25_std",
allowed=
["pm10_cf1", "pm25_cf1","pm100_cf1",
"pm10_std","pm25_std","pm100_std",
"gr03um","gr05um","gr10um",
"gr25um","gr50um","gr100um"],
)
}

def setup_module(self) -> None:
# pylint: disable=import-outside-toplevel,import-error
import plantower # type: ignore

self.serial_port = self.config["serial_port"]
self.sensor = plantower.Plantower(port=self.serial_port)
self.sensor.mode_change(plantower.PMS_PASSIVE_MODE)
self.sensor.set_to_wakeup()
time.sleep(30) #give fan time to stabilize readings

def get_value(self, sens_conf: ConfigType) -> SensorValueType:
"""
Get the particulate data from the sensor
"""
#turn sensor off if interval between readings is >= 2 minutes
sleep_sensor = sens_conf["interval"] >= 120
if sleep_sensor:
self.sensor.set_to_wakeup()
time.sleep(30)
sens_type = sens_conf["type"]
result = self.sensor.read()
if sleep_sensor:
self.sensor.set_to_sleep()
return cast(
int,
dict(
pm10_cf1=result.pm10_cf1,
pm25_cf1=result.pm25_cf1,
pm100_cf1=result.pm100_cf1,
pm10_std=result.pm10_std,
pm25_std=result.pm25_std,
pm100_std=result.pm100_std,
gr03um=result.gr03um,
gr05um=result.gr05um,
gr10um=result.gr10um,
gr25um=result.gr25um,
gr50um=result.gr50um,
gr100um=result.gr100um
)[sens_type],
)

def cleanup(self) -> None:
self.sensor.set_to_sleep()
57 changes: 57 additions & 0 deletions mqtt_io/modules/sensor/sht4x.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
"""
SHT4x temperature and humidity sensor
"""

from typing import cast

from ...types import ConfigType, SensorValueType
from . import GenericSensor
from ...exceptions import RuntimeConfigError

REQUIREMENTS = ("adafruit-circuitpython-sht4x",)


class Sensor(GenericSensor):
"""
Implementation of Sensor class for sht4x.
"""

SENSOR_SCHEMA = {
"type": dict(
type="string",
required=False,
empty=False,
default="temperature",
allowed=["temperature", "humidity"],
)
}

def setup_module(self) -> None:
# pylint: disable=import-outside-toplevel,import-error
import adafruit_sht4x # type: ignore
import board # type: ignore
import busio # type: ignore

i2c = busio.I2C(board.SCL, board.SDA)
self.sensor = adafruit_sht4x.SHT4x(i2c)

@property
def _temperature(self) -> SensorValueType:
return cast(SensorValueType, self.sensor.temperature)

@property
def _humidity(self) -> SensorValueType:
return cast(SensorValueType, self.sensor.relative_humidity)

def get_value(self, sens_conf: ConfigType) -> SensorValueType:
"""
Get the temperature value from the sensor
"""
if sens_conf["type"] == "temperature":
return self._temperature
if sens_conf["type"] == "humidity":
return self._humidity
raise RuntimeConfigError(
"sht4x sensor '%s' was not configured to return 'temperature' or 'humidity'"
% sens_conf["name"]
)
Loading