-
Notifications
You must be signed in to change notification settings - Fork 0
/
Weather.py
87 lines (72 loc) · 2.3 KB
/
Weather.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
# AN2001
from machine import Pin
from dht import *
from Blocky.Pin import getPin
from Blocky.Timer import runtime
import Blocky.uasyncio as asyncio
from Blocky.asyn import Cancellable
class Weather:
    """Wrapper around a DHT-family temperature/humidity sensor on a Blocky port.

    The sensor object is created from the pin that ``getPin(port)`` resolves.
    Readings can be fetched on demand with ``temperature()`` / ``humidity()``;
    in addition a background coroutine (``handler``) re-measures periodically
    and invokes callbacks registered through ``event()`` when a reading
    changes.

    If the port does not resolve to a pin, the instance is left without a
    sensor: no background task is scheduled and the getters return ``None``.
    """

    # Minimum gap between two on-demand measurements, compared against the
    # difference of two runtime() values (presumably milliseconds -- TODO confirm).
    _MIN_POLL_INTERVAL = 2000
    # Sleep between two background polls in handler(), in milliseconds.
    _EVENT_PERIOD_MS = 2500

    def __init__(self, port, module='DHT11'):
        """Create the sensor driver and schedule the background poller.

        Args:
            port: Port identifier passed to ``getPin``.
            module: Driver to use: ``'DHT11'``, ``'DHT22'`` or ``'DHTBase'``.

        Raises:
            NameError: If ``module`` is not one of the supported names.
        """
        # Initialise every attribute before any early exit so the other
        # methods never hit a half-constructed instance.
        self.weather = None
        self.last_poll = runtime()
        self.cb_humidity = []
        self.cb_temperature = []

        pin = getPin(port)
        if pin[0] is None:
            # No usable pin for this port: stay inert.
            return

        # Kept as an if/elif chain so only the selected driver name is looked up.
        if module == 'DHT11':
            self.weather = DHT11(Pin(pin[0]))
        elif module == 'DHT22':
            self.weather = DHT22(Pin(pin[0]))
        elif module == 'DHTBase':
            self.weather = DHTBase(Pin(pin[0]))
        else:
            raise NameError('unsupported weather module: ' + str(module))

        # handler() is a coroutine, so schedule it as a task (same call the
        # handler itself uses for generator-style callbacks).
        loop = asyncio.get_event_loop()
        loop.create_task(self.handler())

    def _measure(self):
        """Trigger one sensor measurement, ignoring driver errors (best effort)."""
        try:
            self.weather.measure()
        except Exception:
            # A failed read keeps the previous values available.
            pass

    def _refresh(self):
        """Re-measure if more than ``_MIN_POLL_INTERVAL`` passed since the last poll."""
        now = runtime()
        if now - self.last_poll > self._MIN_POLL_INTERVAL:
            self.last_poll = now
            self._measure()

    def temperature(self):
        """Return the sensor's temperature reading, or ``None`` without a sensor."""
        if self.weather is None:
            return None
        self._refresh()
        return self.weather.temperature()

    def humidity(self):
        """Return the sensor's humidity reading, or ``None`` without a sensor."""
        if self.weather is None:
            return None
        self._refresh()
        return self.weather.humidity()

    @staticmethod
    def _callback_kind(function):
        """Classify a callback as ``'g'`` (generator) or ``'f'`` (plain function).

        The decision is made from the callable's string representation: it is
        ``'g'`` when the text ``generator`` appears past the first character.
        """
        return 'g' if str(function).find('generator') > 0 else 'f'

    def event(self, type, function):
        """Register ``function`` to be called when a reading changes.

        Args:
            type: ``'temperature'`` or ``'humidity'``; other values are ignored.
            function: Callable receiving the new reading. A previously
                registered callback of the same type is replaced.
        """
        if type == 'temperature':
            self.cb_temperature = [function, self._callback_kind(function)]
        if type == 'humidity':
            self.cb_humidity = [function, self._callback_kind(function)]

    def _dispatch(self, callback, value, tag):
        """Invoke one registered ``[function, kind]`` callback with ``value``.

        Plain functions are called directly; generator-style callbacks are
        wrapped in ``Cancellable`` and scheduled on the event loop. Any
        exception is printed with ``tag`` so the polling loop keeps running.
        """
        function, kind = callback
        try:
            if kind == 'f':
                function(value)
            elif kind == 'g':
                loop = asyncio.get_event_loop()
                loop.create_task(Cancellable(function)(value))
        except Exception as err:
            print(tag, err)

    def _notify_changes(self, prev_temp, prev_humd):
        """Fire callbacks for readings that differ from the previous ones.

        Args:
            prev_temp: Temperature reading taken before the last measurement.
            prev_humd: Humidity reading taken before the last measurement.
        """
        temp = self.weather.temperature()
        if temp != prev_temp and self.cb_temperature:
            self._dispatch(self.cb_temperature, temp, 'weather-event-temp->')

        # Humidity is compared with the previous humidity and dispatched to
        # the humidity callback.
        humd = self.weather.humidity()
        if humd != prev_humd and self.cb_humidity:
            self._dispatch(self.cb_humidity, humd, 'weather-event-humd->')

    async def handler(self):
        """Background loop: sleep, re-measure, and report changed readings."""
        while True:
            prev_temp = self.weather.temperature()
            prev_humd = self.weather.humidity()
            await asyncio.sleep_ms(self._EVENT_PERIOD_MS)
            self._measure()
            self._notify_changes(prev_temp, prev_humd)