-
Notifications
You must be signed in to change notification settings - Fork 74
/
adafruit_esp32spi_wifimanager.py
executable file
·352 lines (318 loc) · 12.9 KB
/
adafruit_esp32spi_wifimanager.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
# SPDX-FileCopyrightText: Copyright (c) 2019 Melissa LeBlanc-Williams for Adafruit Industries
#
# SPDX-License-Identifier: MIT
"""
`adafruit_esp32spi_wifimanager`
================================================================================
WiFi Manager for making ESP32 SPI as WiFi much easier
* Author(s): Melissa LeBlanc-Williams, ladyada
"""
# pylint: disable=no-name-in-module
from time import sleep
from micropython import const
import adafruit_connection_manager
import adafruit_requests
from adafruit_esp32spi import adafruit_esp32spi
# pylint: disable=too-many-instance-attributes
class ESPSPI_WiFiManager:
    """
    A class to help manage the Wifi connection
    """

    NORMAL = const(1)
    ENTERPRISE = const(2)

    # pylint: disable=too-many-arguments
    def __init__(
        self,
        esp,
        secrets,
        status_pixel=None,
        attempts=2,
        connection_type=NORMAL,
        debug=False,
    ):
        """
        :param ESP_SPIcontrol esp: The ESP object we are using
        :param dict secrets: The WiFi and Adafruit IO secrets dict (See examples)
            The use of secrets.py to populate the secrets dict is deprecated
            in favor of using settings.toml. ``ssid`` is required; ``password``,
            ``ent_ssid``, ``ent_ident``, ``ent_user`` and ``ent_password`` are
            optional.
        :param status_pixel: (Optional) The pixel device - A NeoPixel, DotStar,
            or RGB LED (default=None). The status LED, if given, turns red when
            attempting to connect to a Wi-Fi network or create an access point,
            turning green upon success. Additionally, if given, it will turn blue
            when attempting an HTTP method or returning IP address, turning off
            upon success.
        :type status_pixel: NeoPixel, DotStar, or RGB LED
        :param int attempts: (Optional) Failed attempts before resetting the ESP32 (default=2)
        :param const connection_type: (Optional) Type of WiFi connection: NORMAL or ENTERPRISE
        :param bool debug: (Optional) Print progress information while connecting
            (default=False)
        """
        # Read the settings
        self.esp = esp
        self.debug = debug
        self.ssid = secrets["ssid"]
        self.password = secrets.get("password", None)
        self.attempts = attempts
        self._connection_type = connection_type
        self.statuspix = status_pixel
        self.pixel_status(0)
        # Index of the next (ssid, password) pair handed out by _get_next_ap()
        self._ap_index = 0

        # create requests session
        pool = adafruit_connection_manager.get_radio_socketpool(self.esp)
        ssl_context = adafruit_connection_manager.get_radio_ssl_context(self.esp)
        self._requests = adafruit_requests.Session(pool, ssl_context)

        # Check for WPA2 Enterprise keys in the secrets dictionary and load them if they exist
        self.ent_ssid = secrets.get("ent_ssid", secrets["ssid"])
        self.ent_ident = secrets.get("ent_ident", "")
        self.ent_user = secrets.get("ent_user")
        self.ent_password = secrets.get("ent_password")

    # pylint: enable=too-many-arguments

    def reset(self):
        """
        Perform a hard reset on the ESP32
        """
        if self.debug:
            print("Resetting ESP32")
        self.esp.reset()

    def connect(self):
        """
        Attempt to connect to WiFi using the current settings

        :raises TypeError: if the configured connection type is neither
            NORMAL nor ENTERPRISE
        """
        if self.debug:
            if self.esp.status == adafruit_esp32spi.WL_IDLE_STATUS:
                print("ESP32 found and in idle mode")
            print("Firmware vers.", self.esp.firmware_version)
            print("MAC addr:", [hex(i) for i in self.esp.MAC_address])
            for access_pt in self.esp.scan_networks():
                print(
                    "\t%s\t\tRSSI: %d"
                    % (str(access_pt["ssid"], "utf-8"), access_pt["rssi"])
                )
        if self._connection_type == ESPSPI_WiFiManager.NORMAL:
            self.connect_normal()
        elif self._connection_type == ESPSPI_WiFiManager.ENTERPRISE:
            self.connect_enterprise()
        else:
            raise TypeError("Invalid WiFi connection type specified")

    def _get_next_ap(self):
        """
        Return the next ``(ssid, password)`` pair to try.

        When both ``ssid`` and ``password`` are lists/tuples the pairs are
        handed out round-robin; when both are scalars the single pair is
        returned every time.

        :raises ValueError: if the sequences are empty or differ in length
        :raises NotImplementedError: if only one of the two is a sequence
        """
        ssid_is_seq = isinstance(self.ssid, (tuple, list))
        password_is_seq = isinstance(self.password, (tuple, list))

        if ssid_is_seq and password_is_seq:
            if not self.ssid or not self.password:
                raise ValueError("SSID and Password should contain at least 1 value")
            if len(self.ssid) != len(self.password):
                raise ValueError("The length of SSIDs and Passwords should match")
            access_point = (self.ssid[self._ap_index], self.password[self._ap_index])
            # Advance and wrap so the following call yields the next pair
            self._ap_index += 1
            if self._ap_index >= len(self.ssid):
                self._ap_index = 0
            return access_point

        if ssid_is_seq or password_is_seq:
            raise NotImplementedError(
                "If using multiple passwords, both SSID and Password should be lists or tuples"
            )
        return (self.ssid, self.password)

    def connect_normal(self):
        """
        Attempt a regular style WiFi connection.

        Loops until the ESP32 reports it is connected. After ``attempts``
        consecutive failures the next configured access point is selected
        and the ESP32 is reset before retrying.
        """
        failure_count = 0
        (ssid, password) = self._get_next_ap()
        while not self.esp.is_connected:
            try:
                if self.debug:
                    print("Connecting to AP...")
                self.pixel_status((100, 0, 0))
                # The password is optional (open networks); bytes(None, ...) would
                # raise TypeError, so pass None through exactly as create_ap() does.
                password_bytes = bytes(password, "utf-8") if password else None
                self.esp.connect_AP(bytes(ssid, "utf-8"), password_bytes)
                failure_count = 0
                self.pixel_status((0, 100, 0))
            except OSError as error:
                print("Failed to connect, retrying\n", error)
                failure_count += 1
                if failure_count >= self.attempts:
                    failure_count = 0
                    (ssid, password) = self._get_next_ap()
                    self.reset()

    def create_ap(self):
        """
        Attempt to initialize in Access Point (AP) mode.
        Uses SSID and optional passphrase from the current settings
        Other WiFi devices will be able to connect to the created Access Point
        """
        failure_count = 0
        while not self.esp.ap_listening:
            try:
                if self.debug:
                    print("Waiting for AP to be initialized...")
                self.pixel_status((100, 0, 0))
                if self.password:
                    self.esp.create_AP(
                        bytes(self.ssid, "utf-8"), bytes(self.password, "utf-8")
                    )
                else:
                    self.esp.create_AP(bytes(self.ssid, "utf-8"), None)
                failure_count = 0
                self.pixel_status((0, 100, 0))
            except OSError as error:
                print("Failed to create access point\n", error)
                failure_count += 1
                if failure_count >= self.attempts:
                    failure_count = 0
                    self.reset()
        print("Access Point created! Connect to ssid:\n {}".format(self.ssid))

    def connect_enterprise(self):
        """
        Attempt an enterprise style WiFi connection

        ``ent_user`` and ``ent_password`` must be present in the secrets dict;
        they default to ``None`` and ``bytes(None, "utf-8")`` raises TypeError.
        """
        failure_count = 0

        # Hand the enterprise credentials to the ESP32, then enable enterprise mode
        self.esp.wifi_set_network(bytes(self.ent_ssid, "utf-8"))
        self.esp.wifi_set_entidentity(bytes(self.ent_ident, "utf-8"))
        self.esp.wifi_set_entusername(bytes(self.ent_user, "utf-8"))
        self.esp.wifi_set_entpassword(bytes(self.ent_password, "utf-8"))
        self.esp.wifi_set_entenable()

        # Poll until connected, alternating the status pixel red/green once a second
        while not self.esp.is_connected:
            try:
                if self.debug:
                    print(
                        "Waiting for the ESP32 to connect to the WPA2 Enterprise AP..."
                    )
                self.pixel_status((100, 0, 0))
                sleep(1)
                failure_count = 0
                self.pixel_status((0, 100, 0))
                sleep(1)
            except OSError as error:
                print("Failed to connect, retrying\n", error)
                failure_count += 1
                if failure_count >= self.attempts:
                    failure_count = 0
                    self.reset()

    def _request(self, method, url, **kw):
        """
        Shared implementation of the HTTP verb wrappers.

        Connects first if necessary, shows blue on the status pixel while the
        request is in flight and turns the pixel off once it has returned.

        :param str method: Name of the requests session method to call
            (``"get"``, ``"post"``, ``"put"``, ``"patch"`` or ``"delete"``)
        :param str url: The URL to send the request to
        :param kw: Keyword arguments forwarded unchanged to the session method
        :return: The response from the request
        :rtype: Response
        """
        if not self.esp.is_connected:
            self.connect()
        self.pixel_status((0, 0, 100))
        return_val = getattr(self._requests, method)(url, **kw)
        self.pixel_status(0)
        return return_val

    def get(self, url, **kw):
        """
        Pass the Get request to requests and update status LED

        :param str url: The URL to retrieve data from
        :param kw: (Optional) Keyword arguments forwarded unchanged to requests,
            e.g. ``data``, ``json``, ``headers``, ``stream``
        :return: The response from the request
        :rtype: Response
        """
        return self._request("get", url, **kw)

    def post(self, url, **kw):
        """
        Pass the Post request to requests and update status LED

        :param str url: The URL to post data to
        :param kw: (Optional) Keyword arguments forwarded unchanged to requests,
            e.g. ``data``, ``json``, ``headers``, ``stream``
        :return: The response from the request
        :rtype: Response
        """
        return self._request("post", url, **kw)

    def put(self, url, **kw):
        """
        Pass the put request to requests and update status LED

        :param str url: The URL to PUT data to
        :param kw: (Optional) Keyword arguments forwarded unchanged to requests,
            e.g. ``data``, ``json``, ``headers``, ``stream``
        :return: The response from the request
        :rtype: Response
        """
        return self._request("put", url, **kw)

    def patch(self, url, **kw):
        """
        Pass the patch request to requests and update status LED

        :param str url: The URL to PATCH data to
        :param kw: (Optional) Keyword arguments forwarded unchanged to requests,
            e.g. ``data``, ``json``, ``headers``, ``stream``
        :return: The response from the request
        :rtype: Response
        """
        return self._request("patch", url, **kw)

    def delete(self, url, **kw):
        """
        Pass the delete request to requests and update status LED

        :param str url: The URL to send the DELETE request to
        :param kw: (Optional) Keyword arguments forwarded unchanged to requests,
            e.g. ``data``, ``json``, ``headers``, ``stream``
        :return: The response from the request
        :rtype: Response
        """
        return self._request("delete", url, **kw)

    def ping(self, host, ttl=250):
        """
        Pass the Ping request to the ESP32, update status LED, return response time

        :param str host: The hostname or IP address to ping
        :param int ttl: (Optional) The Time To Live in milliseconds for the packet (default=250)
        :return: The response time in milliseconds
        :rtype: int
        """
        if not self.esp.is_connected:
            self.connect()
        self.pixel_status((0, 0, 100))
        response_time = self.esp.ping(host, ttl=ttl)
        self.pixel_status(0)
        return response_time

    def ip_address(self):
        """
        Returns a formatted local IP address, update status pixel.
        """
        if not self.esp.is_connected:
            self.connect()
        # Keep the pixel blue for the duration of the lookup, like the other
        # operations, instead of toggling it on and off before the lookup starts.
        self.pixel_status((0, 0, 100))
        address = self.esp.pretty_ip(self.esp.ip_address)
        self.pixel_status(0)
        return address

    def pixel_status(self, value):
        """
        Change Status Pixel if it was defined

        :param value: The value to set the Board's status LED to
        :type value: int or 3-value tuple
        """
        if self.statuspix:
            # Devices exposing a ``color`` attribute are set through it;
            # anything else is driven through ``fill()``.
            if hasattr(self.statuspix, "color"):
                self.statuspix.color = value
            else:
                self.statuspix.fill(value)

    def signal_strength(self):
        """
        Returns receiving signal strength indicator in dBm
        """
        if not self.esp.is_connected:
            self.connect()
        return self.esp.rssi