-
Notifications
You must be signed in to change notification settings - Fork 776
/
Copy pathcode.py
100 lines (78 loc) · 2.78 KB
/
code.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
# SPDX-FileCopyrightText: 2021 Eva Herrada for Adafruit Industries
# SPDX-License-Identifier: MIT
"""
Controlling a relay with Adafruit IO
Uses:
* Feather board
* https://www.adafruit.com/product/2890
* https://www.adafruit.com/product/4264
* https://www.adafruit.com/product/3191
OR
* https://www.adafruit.com/product/2935
"""
import board
import busio
import adafruit_connection_manager
from adafruit_esp32spi import adafruit_esp32spi
from adafruit_esp32spi import adafruit_esp32spi_wifimanager
import neopixel
import adafruit_minimqtt.adafruit_minimqtt as MQTT
from adafruit_io.adafruit_io import IO_MQTT
from digitalio import DigitalInOut, Direction
### WiFi ###
# Get wifi details and more from a secrets.py file
try:
from secrets import secrets
except ImportError:
print("WiFi secrets are kept in secrets.py, please add them there!")
raise
# If you are using a Relay FeatherWing, make sure this pin corresponds to the pin shorted on the
# bottom of the Relay FeatherWing
RELAY = DigitalInOut(board.D10)
RELAY.direction = Direction.OUTPUT
# If you are using a board with pre-defined ESP32 Pins:
esp32_cs = DigitalInOut(board.D13)
esp32_ready = DigitalInOut(board.D11)
esp32_reset = DigitalInOut(board.D12)
spi = busio.SPI(board.SCK, board.MOSI, board.MISO)
esp = adafruit_esp32spi.ESP_SPIcontrol(spi, esp32_cs, esp32_ready, esp32_reset)
"""Use below for Most Boards"""
status_light = neopixel.NeoPixel(
board.NEOPIXEL, 1, brightness=0.2
) # Uncomment for Most Boards
wifi = adafruit_esp32spi_wifimanager.ESPSPI_WiFiManager(esp, secrets, status_light)
# Define callback functions which will be called when certain events happen.
# pylint: disable=unused-argument
def connected(client):
client.subscribe("lamp")
def subscribe(client, userdata, topic, granted_qos):
# This method is called when the client subscribes to a new feed.
print("Subscribed to {0} with QOS level {1}".format(topic, granted_qos))
def on_lamp(client, topic, message):
RELAY.value = eval(message) # pylint: disable=eval-used
# Connect to WiFi
print("Connecting to WiFi...")
wifi.connect()
print("Connected!")
pool = adafruit_connection_manager.get_radio_socketpool(esp)
ssl_context = adafruit_connection_manager.get_radio_ssl_context(esp)
# Initialize a new MQTT Client object
mqtt_client = MQTT.MQTT(
broker="io.adafruit.com",
username=secrets["aio_username"],
password=secrets["aio_key"],
socket_pool=pool,
ssl_context=ssl_context,
)
# Initialize an Adafruit IO MQTT Client
io = IO_MQTT(mqtt_client)
io.add_feed_callback("lamp", on_lamp)
# Connect the callback methods defined above to Adafruit IO
io.on_connect = connected
io.on_subscribe = subscribe
# Connect to Adafruit IO
print("Connecting to Adafruit IO...")
io.connect()
io.get("lamp")
while True:
io.loop()