-
Notifications
You must be signed in to change notification settings - Fork 1
/
pophttp.py
179 lines (151 loc) · 7.49 KB
/
pophttp.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
#!/usr/bin/env python
import socket
from time import time
from collections import defaultdict
import sys
import logging
try:
import lifx
from config import Config, ConfigError, format_template
except ImportError:
from . import lifx
from .config import Config, ConfigError, format_template
import yaml
from urllib.request import urlopen, HTTPError, Request
from urllib.error import URLError
from http.client import BadStatusLine
from argparse import ArgumentParser
# Module-wide logger. It is instantiated directly instead of being obtained
# from logging.getLogger(), so it is a standalone logger: only the handlers
# explicitly added to it (see the __main__ block) receive its records.
log = logging.Logger('')
class MessageHandler:
    """Turn LIFX packets received from Pop bridges into HTTP requests.

    State is kept per sender IP so that the power/colour message pair can be
    assembled and repeated packets can be dropped before an action fires.
    """

    class PopBridgeMessageState:
        """Latest power and colour messages seen from a single sender."""

        def __init__(self):
            self.power_msg = None
            self.color_msg = None
            # The Pop sends multiple of the same message over several
            # seconds; this timestamp is used to remove the duplicates.
            self.last_triggered = None

        def reset(self):
            """Forget both stored messages and the last trigger time."""
            self.power_msg = None
            self.color_msg = None
            self.last_triggered = None

    def __init__(self, config):
        """Remember *config* and start with empty per-sender state.

        *config* must provide ``get_target_for_switch`` and
        ``get_endpoint_for_url``, which ``trigger_action`` calls.
        """
        self.config = config
        # One PopBridgeMessageState per sender IP, created on first use.
        self.bridge_states = defaultdict(self.PopBridgeMessageState)
        # (sender_ip, (power_msg, color_msg)) of the most recent trigger.
        self.last_trigger = None

    @staticmethod
    def _encode_body(body):
        """Return *body* UTF-8 encoded, or ``None`` when there is no body."""
        if body is None:
            return None
        return body.encode('utf-8')

    def handle_msg(self, sender_ip, packet):
        """Record *packet* from *sender_ip* and fire the action when due.

        Nothing happens until both a SetPower and a SetColor message are
        stored for the sender; repeats of an already-triggered pair are
        ignored, as is a pair that another sender triggered last.
        """
        bridge_state = self.bridge_states[sender_ip]

        # State older than 5 seconds is considered stale and discarded.
        if bridge_state.last_triggered is not None and time() - bridge_state.last_triggered >= 5:
            bridge_state.reset()

        if packet.code == lifx.Message.Light_Get.code:
            bridge_state.reset()
        elif packet.code == lifx.Message.Light_SetPower.code:
            # A different power message invalidates whatever was collected.
            if bridge_state.power_msg is not None and packet != bridge_state.power_msg:
                bridge_state.reset()
            bridge_state.power_msg = packet
        elif packet.code == lifx.Message.Light_SetColor.code:
            # A different colour message invalidates whatever was collected.
            if bridge_state.color_msg is not None and packet != bridge_state.color_msg:
                bridge_state.reset()
            bridge_state.color_msg = packet

        if bridge_state.power_msg is None or bridge_state.color_msg is None:
            # The Pop bridge sends a pair of messages: power & color. We need
            # both to know the full state of what it is sending.
            return

        if bridge_state.last_triggered is None:
            bridge_state.last_triggered = time()
        elif time() - bridge_state.last_triggered < 15:
            # This pair has already triggered recently; drop the duplicate.
            return

        this_trigger = (bridge_state.power_msg, bridge_state.color_msg)
        if (self.last_trigger is not None
                and self.last_trigger[1] == this_trigger
                and self.last_trigger[0] != sender_ip):
            # There are multiple Pop bridges and another bridge has already
            # triggered this message.
            return

        self.last_trigger = (sender_ip, this_trigger)
        self.trigger_action(sender_ip, bridge_state.power_msg, bridge_state.color_msg)

    def trigger_action(self, sender_ip, power_msg, color_msg):
        """Send the HTTP request(s) configured for this power/colour pair.

        Looks up the targets for the switch state, merges in any endpoint
        defaults for the target URL, renders the body template and performs
        the request. Request failures are logged and do not propagate, so one
        failing target does not prevent the remaining ones from being tried.
        """
        is_on = power_msg.level == lifx.DevicePower.ON
        log_extra = dict(sender_ip=sender_ip)

        targets = self.config.get_target_for_switch(
            power=is_on,
            hue=color_msg.hue,
            saturation=color_msg.saturation,
            brightness=color_msg.brightness,
            kelvin=color_msg.kelvin,
        )

        if not targets:
            log.warning(
                'request %dh,%ds,%db,%dk,%s not mapped to a URL',
                color_msg.hue,
                color_msg.saturation,
                color_msg.brightness,
                color_msg.kelvin,
                'on' if is_on else 'off',
                extra=log_extra,
            )
            # Nothing to call; also avoids iterating a None result.
            return

        for target in targets:
            endpoint = self.config.get_endpoint_for_url(target.url)
            method = target.method
            headers = target.headers

            if endpoint is not None:
                # Endpoint settings act as defaults; target headers win.
                method = endpoint.method or method
                headers = dict(endpoint.headers)
                headers.update(target.headers)

            body = format_template(
                target.body,
                power=is_on,
                hue=color_msg.hue,
                saturation=color_msg.saturation,
                brightness=color_msg.brightness,
                kelvin=color_msg.kelvin,
            )

            start = time()
            try:
                req = Request(target.url, data=self._encode_body(body), headers=headers, method=method)
                with urlopen(req) as resp:
                    log.info('resp %d in %dms %s', resp.code, (time() - start) * 1000, target.url, extra=log_extra)
            except HTTPError as err:
                log.error('resp %d in %dms %s', err.code, (time() - start) * 1000, target.url, extra=log_extra)
            except (BadStatusLine, URLError, OSError) as err:
                # BadStatusLine also includes RemoteDisconnected; OSError
                # covers connection resets and timeouts that urlopen does not
                # wrap in URLError.
                log.error('%s in %dms %s', err, (time() - start) * 1000, target.url, extra=log_extra)
def server_loop(address, handler):
    """Answer LIFX UDP packets on *address* and feed them to *handler*.

    Binds a UDP socket to port 56700 on *address* and loops forever. Packets
    from senders rejected by ``handler.config.is_ip_allowed`` and packets that
    cannot be decoded are logged at debug level and skipped. Version, light
    state and set-power/set-color requests get a reply, and every decoded
    packet is passed to ``handler.handle_msg``.

    *handler* must expose ``config`` and ``handle_msg(sender_ip, packet)``.
    The function only returns by raising; the socket is closed when it does.
    """
    sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM, socket.IPPROTO_UDP)
    with sock:
        sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        sock.bind((address, 56700))
        print('Server started on %s' % address)

        while True:
            data, sender = sock.recvfrom(4096)
            log_extra = dict(sender_ip=sender[0], sender_port=sender[1])

            # Use the handler's own config rather than a module global, which
            # only exists when this file is run as a script.
            if not handler.config.is_ip_allowed(sender[0]):
                log.debug('recv filtering packet %r', data, extra=log_extra)
                continue

            try:
                packet = lifx.Message.decode(data)
            except Exception as exc:
                # Datagrams come from the network; a malformed one must not
                # stop the server.
                log.debug('recv unable to decode packet %r: %r', data, exc, extra=log_extra)
                continue

            if packet is None:
                log.debug('recv Unknown packet type %r', data, extra=log_extra)
                continue

            log.debug('recv %r (%r)', packet, packet.header, extra=log_extra)

            resp = None
            if packet.code == lifx.Message.Device_GetVersion.code:
                resp = lifx.Message.Device_StateVersion(vendor=1, product=36, version=0)
            elif packet.code == lifx.Message.Light_Get.code:
                resp = lifx.Message.Light_State(hue=0, saturation=655, brightness=65535, kelvin=2500, dim=0, power=65535, label='Pop HTTP', tags=0)
            elif packet.code in (lifx.Message.Light_SetPower.code, lifx.Message.Light_SetColor.code):
                resp = lifx.Message.Device_Acknowledgment()

            if resp is not None:
                log.debug('send %s', str(resp), extra=log_extra)
                sock.sendto(resp.encode(packet.header.target, packet.header.site), sender)

            handler.handle_msg(sender[0], packet)
if __name__ == '__main__':
    # Command-line entry point: parse options, set up logging, load the
    # configuration and run the UDP server until it is interrupted.
    parser = ArgumentParser(description='Make a fake LIFX light to allow the Logitech pop to send web requests')
    parser.add_argument('-v', dest='verbosity', action='count', default=0, help='increase verbosity level')
    parser.add_argument('--config', dest='config', metavar='FILE', default='config.yml', help='path to the configuration YAML file to use')
    args = parser.parse_args()

    # Each -v lowers the handler threshold one step; anything past -vvv is
    # clamped to DEBUG.
    levels = [logging.ERROR, logging.WARNING, logging.INFO, logging.DEBUG]
    ch = logging.StreamHandler()
    ch.setLevel(levels[min(args.verbosity, 3)])
    # The format references sender_ip, so every log call in this module
    # supplies it through `extra`.
    formatter = logging.Formatter('%(asctime)-15s %(sender_ip)s %(message)s')
    ch.setFormatter(formatter)
    log.addHandler(ch)

    try:
        config = Config(args.config)
    except ConfigError as err:
        print(str(err))
        sys.exit(-2)
    except yaml.YAMLError as err:
        # YAMLError is the base class of all PyYAML errors, so scanner and
        # other syntax failures are reported the same way as parser errors.
        print(str(err))
        sys.exit(-1)

    server_loop(config.interface, MessageHandler(config))