/
code.py
executable file
·210 lines (179 loc) · 6.86 KB
/
code.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
# SPDX-FileCopyrightText: 2020 Brent Rubell for Adafruit Industries
#
# SPDX-License-Identifier: MIT
import time
import board
import busio
from digitalio import DigitalInOut
import neopixel
from adafruit_esp32spi import adafruit_esp32spi, adafruit_esp32spi_wifimanager
from adafruit_io.adafruit_io import IO_HTTP
from simpleio import map_range
from adafruit_pm25.uart import PM25_UART
# Uncomment below for PMSA003I Air Quality Breakout
# from adafruit_pm25.i2c import PM25_I2C
import adafruit_bme280
### Configure Sensor ###
# Return environmental sensor readings in degrees Celsius
# (passed to read_bme(); False converts the temperature to Fahrenheit)
USE_CELSIUS = False
# Interval the sensor publishes to Adafruit IO, in minutes
PUBLISH_INTERVAL = 10
### WiFi ###
# Get wifi details and more from a secrets.py file
# Besides being handed to the WiFi manager, this script reads the keys
# "aio_user", "aio_key", "latitude", "longitude" and "elevation" from it.
try:
    from secrets import secrets
except ImportError:
    # Tell the user what is missing, then re-raise so the script stops here
    print("WiFi secrets are kept in secrets.py, please add them there!")
    raise
# AirLift FeatherWing
# Control lines for the ESP32: chip select on D13, reset on D12, ready on D11
esp32_cs = DigitalInOut(board.D13)
esp32_reset = DigitalInOut(board.D12)
esp32_ready = DigitalInOut(board.D11)
# SPI bus shared with the ESP32 (clock, MOSI, MISO)
spi = busio.SPI(board.SCK, board.MOSI, board.MISO)
esp = adafruit_esp32spi.ESP_SPIcontrol(spi, esp32_cs, esp32_ready, esp32_reset)
# Single on-board NeoPixel handed to the WiFi manager
# (presumably used as its connection status indicator -- confirm in the driver docs)
status_light = neopixel.NeoPixel(board.NEOPIXEL, 1, brightness=0.2)
# WiFi manager built from the ESP32 interface and the credentials in secrets
wifi = adafruit_esp32spi_wifimanager.ESPSPI_WiFiManager(esp, secrets, status_light)
# Connect to a PM2.5 sensor over UART
# No reset pin is wired up, so None is passed to the driver
reset_pin = None
uart = busio.UART(board.TX, board.RX, baudrate=9600)
pm25 = PM25_UART(uart, reset_pin)
# Create i2c object (100 kHz bus on the board's SCL/SDA pins)
i2c = busio.I2C(board.SCL, board.SDA, frequency=100000)
# Connect to a BME280 over I2C
# bme_sensor is the object read_bme() takes temperature and humidity from
bme_sensor = adafruit_bme280.Adafruit_BME280_I2C(i2c)
# Uncomment below for PMSA003I Air Quality Breakout
# (also uncomment the matching PM25_I2C import at the top of the file)
# pm25 = PM25_I2C(i2c, reset_pin)
# Uncomment below for BME680
# import adafruit_bme680
# bme_sensor = adafruit_bme680.Adafruit_BME680_I2C(i2c)
### Sensor Functions ###
def calculate_aqi(pm_sensor_reading):
    """Returns a calculated air quality index (AQI)
    and category as a tuple.

    The reading is matched against the PM2.5 concentration bands below and
    the whole-number part of the reading is mapped linearly onto that band's
    AQI range with ``map_range``.

    The bands are contiguous: a reading that falls between two published
    breakpoints (for example 12.05, which lies between 12.0 and 12.1) is
    assigned to the lower band instead of being rejected as invalid. This
    matters because averaged sensor samples are rarely exact to one decimal.

    NOTE: The AQI returned by this function should ideally be measured
    using the 24-hour concentration average. Calculating a AQI without
    averaging will result in higher AQI values than expected.

    :param float pm_sensor_reading: Particulate matter sensor value.
    :return: ``(aqi_value, category)``. ``(-1, None)`` is returned, and a
        message printed, when the reading is negative, not a number, or
        500.5 and above.
    """
    # Each row: (exclusive upper limit of the band,
    #            Clow, Chigh  -> concentration breakpoints fed to map_range,
    #            Ilow, Ihigh  -> AQI breakpoints fed to map_range,
    #            category name)
    aqi_bands = (
        (12.1, 0, 12, 0, 50, "Good"),
        (35.5, 12, 35, 51, 100, "Moderate"),
        (55.5, 36, 55, 101, 150, "Unhealthy for Sensitive Groups"),
        (150.5, 56, 150, 151, 200, "Unhealthy"),
        (250.5, 151, 250, 201, 300, "Very Unhealthy"),
        (350.5, 251, 350, 301, 400, "Hazardous"),
        (500.5, 351, 500, 401, 500, "Hazardous"),
    )
    # The >= test also filters out NaN, for which every comparison is False
    if pm_sensor_reading >= 0.0:
        for limit, c_low, c_high, i_low, i_high, aqi_cat in aqi_bands:
            if pm_sensor_reading < limit:
                # AQI calculation using the band's breakpoints
                aqi_val = map_range(
                    int(pm_sensor_reading), c_low, c_high, i_low, i_high
                )
                return aqi_val, aqi_cat
    print("Invalid PM2.5 concentration")
    return -1, None
def sample_aq_sensor():
    """Samples PM2.5 sensor
    over a 2.3 second sample rate.

    Reads the module-level ``pm25`` sensor repeatedly until 2.3 seconds have
    passed, pausing one second after each successful read, and averages the
    "pm25 env" value of every reading collected. A read that raises
    ``RuntimeError`` is reported and retried immediately.

    :return: Mean of the collected "pm25 env" readings.
    :raises RuntimeError: If every read inside the sampling window failed,
        so there is nothing to average.
    """
    aq_samples = []
    # initial timestamp
    time_start = time.monotonic()
    # sample pm2.5 sensor over 2.3 sec sample rate
    while time.monotonic() - time_start <= 2.3:
        try:
            aqdata = pm25.read()
        except RuntimeError:
            print("Unable to read from sensor, retrying...")
            continue
        aq_samples.append(aqdata["pm25 env"])
        # pm sensor output rate of 1s
        time.sleep(1)
    if not aq_samples:
        # Without this guard the average below fails with ZeroDivisionError
        raise RuntimeError("No PM2.5 samples could be read from the sensor")
    # average sample reading / # samples
    return sum(aq_samples) / len(aq_samples)
def read_bme(is_celsius=False):
    """Reads the BME280/BME680 environmental sensor and returns
    the temperature and relative humidity as a tuple.

    :param bool is_celsius: When True the temperature is returned as read
        from the sensor (degrees celsius); otherwise it is converted to
        fahrenheit.
    """
    # Humidity is read first, then temperature
    relative_humidity = bme_sensor.humidity
    sensor_temp = bme_sensor.temperature
    if is_celsius:
        return sensor_temp, relative_humidity
    # celsius -> fahrenheit
    return sensor_temp * 1.8 + 32, relative_humidity
# Create an instance of the Adafruit IO HTTP client
# (credentials come from secrets.py; requests go through the WiFi manager)
io = IO_HTTP(secrets["aio_user"], secrets["aio_key"], wifi)
# Describes feeds used to hold Adafruit IO data
# Each lookup returns a mapping whose "key" entry is used when publishing
feed_aqi = io.get_feed("air-quality-sensor.aqi")
feed_aqi_category = io.get_feed("air-quality-sensor.category")
feed_humidity = io.get_feed("air-quality-sensor.humidity")
feed_temperature = io.get_feed("air-quality-sensor.temperature")
# Set up location metadata from secrets.py file
# Sent along with the AQI value only; the other feeds are published without it
location_metadata = {
    "lat": secrets["latitude"],
    "lon": secrets["longitude"],
    "ele": secrets["elevation"],
}
# Minutes counted since the last successful publish
elapsed_minutes = 0
# Minute-of-hour seen on the previous pass; None until the first time fetch
prv_mins = None

# Main loop: roughly every 30 seconds fetch the time from Adafruit IO, count
# the minutes that have gone by, and once PUBLISH_INTERVAL minutes have
# passed sample both sensors and publish the results.
while True:
    try:
        print("Fetching time...")
        cur_time = io.receive_time()
        print("Time fetched OK!")
    except (ValueError, RuntimeError, ConnectionError, OSError) as e:
        print("Failed to fetch time, retrying\n", e)
        wifi.reset()
        wifi.connect()
        continue

    if prv_mins is None:
        # First successful fetch: start counting from the current minute
        prv_mins = cur_time.tm_min
    # Whole minutes since the previous pass. The loop runs about twice a
    # minute, so counting passes would publish twice as often as configured;
    # the modulo keeps the difference correct across the 59 -> 0 rollover.
    minutes_passed = (cur_time.tm_min - prv_mins) % 60
    if minutes_passed:
        prv_mins = cur_time.tm_min
        elapsed_minutes += minutes_passed
        print("%d min elapsed.." % elapsed_minutes)

    if elapsed_minutes >= PUBLISH_INTERVAL:
        print("Sampling AQI...")
        aqi_reading = sample_aq_sensor()
        aqi, aqi_category = calculate_aqi(aqi_reading)
        print("AQI: %d" % aqi)
        print("Category: %s" % aqi_category)

        # temp and humidity
        print("Sampling environmental sensor...")
        temperature, humidity = read_bme(USE_CELSIUS)
        # Label the reading with the unit that was actually requested
        print("Temperature: %0.1f %s" % (temperature, "C" if USE_CELSIUS else "F"))
        print("Humidity: %0.1f %%" % humidity)

        # Publish all values to Adafruit IO
        print("Publishing to Adafruit IO...")
        try:
            io.send_data(feed_aqi["key"], str(aqi), location_metadata)
            io.send_data(feed_aqi_category["key"], aqi_category)
            io.send_data(feed_temperature["key"], str(temperature))
            io.send_data(feed_humidity["key"], str(humidity))
            print("Published!")
        except (ValueError, RuntimeError, ConnectionError, OSError) as e:
            print("Failed to send data to IO, retrying\n", e)
            wifi.reset()
            wifi.connect()
            # elapsed_minutes is left untouched so the next pass samples
            # and publishes again straight away
            continue
        # Reset timer
        elapsed_minutes = 0
    time.sleep(30)