-
Notifications
You must be signed in to change notification settings - Fork 422
/
streamer.py
512 lines (413 loc) · 15.7 KB
/
streamer.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
# -*- coding: utf-8 -*-
import base64
import copy
import functools
import json
import logging
import operator
import os
import random
import struct
import unicodedata
import weakref
import gevent
from jsonpointer import resolve_pointer
from jsonschema import validate
from pyramid.config import aslist
from pyramid.events import ApplicationCreated
from pyramid.events import subscriber
from pyramid.httpexceptions import HTTPBadRequest, HTTPForbidden
from pyramid.threadlocal import get_current_request
from ws4py.exc import HandshakeError
from ws4py.websocket import WebSocket as _WebSocket
from ws4py.server.wsgiutils import WebSocketWSGIApplication
from h import queue
from h._compat import text_type
from h.api import auth
from h.api import storage
log = logging.getLogger(__name__)
def uni_fold(text):
    """Fold ``text`` for case- and accent-insensitive comparison.

    Bytes are decoded as UTF-8 first. Text is lower-cased, NFKD-normalised
    and stripped of combining characters. Values of any other type are
    returned unchanged.
    """
    if isinstance(text, bytes):
        text = text_type(text, "utf-8")

    if isinstance(text, text_type):
        decomposed = unicodedata.normalize('NFKD', text.lower())
        return u"".join(ch for ch in decomposed
                        if not unicodedata.combining(ch))

    return text
# JSON Schema that incoming ``filter`` messages from websocket clients are
# checked against (via ``validate`` in ``WebSocket._process_message``) before
# a FilterHandler is built from them.
#
# NOTE(review): ``actions`` and the clause ``items`` list their member keys
# directly instead of under ``properties``, and ``"value": "object"`` is not a
# schema, so a JSON Schema validator presumably ignores those member
# constraints (e.g. the operator enum is likely not enforced). ``"optional"``
# is a Draft 3 keyword. Confirm before relying on any of these being checked.
filter_schema = {
    "type": "object",
    "properties": {
        "name": {"type": "string", "optional": True},
        # How clause results are combined; the value is looked up as a
        # FilterHandler method of the same name.
        "match_policy": {
            "type": "string",
            "enum": ["include_any", "include_all",
                     "exclude_any", "exclude_all"]
        },
        # Which event actions the client wants to be notified about.
        "actions": {
            "create": {"type": "boolean", "default": True},
            "update": {"type": "boolean", "default": True},
            "delete": {"type": "boolean", "default": True},
        },
        # Each clause compares the value at a JSON pointer in the annotation
        # against ``value`` using ``operator``.
        "clauses": {
            "type": "array",
            "items": {
                "field": {"type": "string", "format": "json-pointer"},
                "operator": {
                    "type": "string",
                    "enum": ["equals", "matches", "lt", "le", "gt", "ge",
                             "one_of", "first_of", "match_of",
                             "lene", "leng", "lenge", "lenl", "lenle"]
                },
                "value": "object",
                "options": {"type": "object", "default": {}}
            }
        },
    },
    "required": ["match_policy", "clauses", "actions"]
}
# Comparison symbol represented by each length operator name that may appear
# in a filter clause.
len_operators = dict(
    lene="=",
    leng=">",
    lenge=">=",
    lenl="<",
    lenle="<=",
)
def first_of(a, b):
    """Return True if the first element of ``a`` equals ``b``.

    An empty ``a`` (for example an annotation field holding an empty list)
    simply does not match. Indexing it would raise IndexError in the middle
    of event fan-out.
    """
    if not a:
        return False
    return a[0] == b


# Expose on the ``operator`` module so FilterHandler can look it up by name.
setattr(operator, 'first_of', first_of)
def match_of(a, b):
    """Return True if at least one item of ``b`` is contained in ``a``."""
    return any(candidate in a for candidate in b)


# Expose on the ``operator`` module so FilterHandler can look it up by name.
operator.match_of = match_of
def lene(a, b):
    """Return True if ``a`` has exactly ``b`` elements."""
    size = len(a)
    return size == b


operator.lene = lene
def leng(a, b):
    """Return True if ``a`` has more than ``b`` elements."""
    size = len(a)
    return size > b


operator.leng = leng
def lenge(a, b):
    """Return True if ``a`` has at least ``b`` elements."""
    size = len(a)
    return size >= b


operator.lenge = lenge
def lenl(a, b):
    """Return True if ``a`` has fewer than ``b`` elements."""
    size = len(a)
    return size < b


operator.lenl = lenl
def lenle(a, b):
    """Return True if ``a`` has at most ``b`` elements."""
    size = len(a)
    return size <= b


operator.lenle = lenle
class FilterHandler(object):
    """Decide whether an annotation event matches a client-supplied filter.

    ``filter_json`` is the ``filter`` payload a websocket client sent, after
    the caller has validated it against ``filter_schema``. This class reads
    its ``actions``, ``clauses`` and ``match_policy`` keys.
    """

    def __init__(self, filter_json):
        self.filter = filter_json

    # Maps the operator names a client may use in a clause to the name of the
    # function on the ``operator`` module implementing it. The non-standard
    # ones (first_of, match_of, len*) are attached to ``operator`` elsewhere
    # in this module.
    operators = {
        'equals': 'eq',
        'matches': 'contains',
        'lt': 'lt',
        'le': 'le',
        'gt': 'gt',
        'ge': 'ge',
        'one_of': 'contains',
        'first_of': 'first_of',
        'match_of': 'match_of',
        'lene': 'lene',
        'leng': 'leng',
        'lenge': 'lenge',
        'lenl': 'lenl',
        'lenle': 'lenle',
    }

    @staticmethod
    def _fold(value):
        """Apply ``uni_fold`` to ``value``, or to each element of a list."""
        if isinstance(value, list):
            return [uni_fold(item) for item in value]
        return uni_fold(value)

    def evaluate_clause(self, clause, target):
        """Return True if ``target`` satisfies the single filter ``clause``.

        ``clause['field']`` is a JSON pointer into ``target`` or a list of
        pointers; a list matches if any one of its pointers matches. Values
        on both sides are folded with ``uni_fold`` before comparing.

        A clause does not match if its field is absent from the target or
        its operator is not recognised.
        """
        field = clause['field']

        if isinstance(field, list):
            # Evaluate each pointer on a shallow copy of the clause.
            # Evaluation never mutates the clause, so no deep copy is needed.
            return any(self.evaluate_clause(dict(clause, field=single), target)
                       for single in field)

        op_name = self.operators.get(clause['operator'])
        if op_name is None:
            # NOTE: filter_schema's clause ``items`` are not wrapped in
            # ``properties``, so the operator enum is presumably not enforced.
            # An exception here would propagate out of process_message and
            # interrupt delivery to the remaining sockets. Treat an unknown
            # operator as a non-matching clause instead.
            return False

        field_value = resolve_pointer(target, field, None)
        if field_value is None:
            return False

        cval = self._fold(clause['value'])
        fval = self._fold(field_value)

        # The natural argument order is op(field_value, clause_value), e.g.
        # ``created ge '2001.01.01'`` -> ge(target['created'], '2001.01.01').
        # ``contains`` takes (container, item), so for one_of/matches with a
        # list value the arguments are swapped. The exception is when the
        # field itself is a list (e.g. ``tags matches 'b'``), because an
        # annotation can have many tags.
        reversed_order = (
            (isinstance(cval, list) or isinstance(fval, list))
            and clause['operator'] in ('one_of', 'matches')
            and not isinstance(field_value, list)
        )

        if reversed_order:
            lval, rval = cval, fval
        else:
            lval, rval = fval, cval

        return getattr(operator, op_name)(lval, rval)

    # match_policies

    def include_any(self, target):
        """Match if at least one clause matches."""
        return any(self.evaluate_clause(clause, target)
                   for clause in self.filter['clauses'])

    def include_all(self, target):
        """Match if every clause matches."""
        return all(self.evaluate_clause(clause, target)
                   for clause in self.filter['clauses'])

    def exclude_all(self, target):
        """Match unless every clause matches."""
        return not all(self.evaluate_clause(clause, target)
                       for clause in self.filter['clauses'])

    def exclude_any(self, target):
        """Match unless at least one clause matches."""
        return not any(self.evaluate_clause(clause, target)
                       for clause in self.filter['clauses'])

    def _action_enabled(self, action):
        """Return True if the filter subscribes to events of ``action``.

        ``actions`` is normally a mapping of action name to boolean, as
        described by filter_schema. An explicit ``false`` therefore disables
        the action; a bare presence check would ignore it. A list of action
        names is also accepted.
        """
        actions = self.filter['actions']
        if isinstance(actions, dict):
            return bool(actions.get(action))
        return action in actions

    def match(self, target, action=None):
        """Return True if ``target`` passes this filter for ``action``.

        A missing action or ``'past'`` skips the action check. With no
        clauses, every target matches. Otherwise the configured
        ``match_policy`` method decides.
        """
        if action and action != 'past' and not self._action_enabled(action):
            return False
        if not self.filter['clauses']:
            return True
        return getattr(self, self.filter['match_policy'])(target)
# Topics on the NSQ queue whose messages the WebSocket server consumes:
# annotation events and user (session) events respectively.
ANNOTATIONS_TOPIC, USER_TOPIC = 'annotations', 'user'
class WebSocket(_WebSocket):
    """A ws4py websocket connection to a single streamer client.

    Every instance is registered in the class-level ``instances`` weak set so
    that ``process_message`` can fan queue events out to all open
    connections. Clients send JSON messages to configure what they receive:
    a ``filter`` message installs a FilterHandler, and a ``client_id``
    message records the client's id.
    """

    # All instances of WebSocket, allowing us to iterate over open websockets
    instances = weakref.WeakSet()

    origins = []

    # Instance attributes
    client_id = None
    filter = None
    request = None
    query = None

    def __init__(self, *args, **kwargs):
        # Default ws4py's heartbeat_freq to 30 seconds unless the caller
        # passed one.
        kwargs.setdefault('heartbeat_freq', 30.0)
        super(WebSocket, self).__init__(*args, **kwargs)
        self.request = get_current_request()

    def __new__(cls, *args, **kwargs):
        # The constructor arguments are for __init__ only. Forwarding them
        # to object.__new__ raises TypeError on Python 3 when a class
        # overrides __new__ (Python 2 merely warns).
        instance = super(WebSocket, cls).__new__(cls)
        cls.instances.add(instance)
        return instance

    def opened(self):
        # Release the database transaction
        self.request.tm.commit()

    def _expand_clauses(self, payload):
        """Expand, in place, the values of every clause on the ``/uri`` field."""
        for clause in payload['clauses']:
            if clause['field'] == '/uri':
                self._expand_uris(clause)

    def _expand_uris(self, clause):
        """Replace ``clause['value']`` with a list of equivalent URIs."""
        uris = clause['value']
        expanded = set()

        if not isinstance(uris, list):
            uris = [uris]

        # FIXME: this is a temporary hack to allow us to disable URI
        # equivalence support on the streamer while we debug a number of
        # issues related to connection pool exhaustion for the websocket
        # server. -NS 2016-02-19
        if self.request.feature('ops_disable_streamer_uri_equivalence'):
            expanded.update(uris)
        else:
            for item in uris:
                expanded.update(storage.expand_uri(item))

        clause['value'] = list(expanded)

    def received_message(self, msg):
        # Handle each incoming message inside the request's transaction
        # manager.
        with self.request.tm:
            self._process_message(msg)

    def _process_message(self, msg):
        """Apply a client ``filter`` or ``client_id`` message to this socket.

        On any error the message is logged, the socket is closed and the
        exception is re-raised.
        """
        try:
            data = json.loads(msg.data)
            msg_type = data.get('messageType', 'filter')

            if msg_type == 'filter':
                payload = data['filter']

                # Let's try to validate the schema
                validate(payload, filter_schema)

                # Add backend expands for clauses
                self._expand_clauses(payload)

                self.filter = FilterHandler(payload)
            elif msg_type == 'client_id':
                self.client_id = data.get('value')
        except:
            # TODO: clean this up, catch specific errors, narrow the scope
            log.exception("Parsing filter: %s", msg)
            self.close()
            raise
def handle_annotation_event(message, socket):
    """
    Get message about annotation event `message` to be sent to `socket`.

    Inspects the embedded annotation event and decides whether or not the
    passed socket should receive notification of the event.

    Returns None if the socket should not receive any message about this
    annotation event, otherwise a dict containing information about the event.
    """
    action = message['action']

    # Read events are never forwarded. Check before deserializing the
    # annotation so they cost nothing, and a malformed annotation on a read
    # event cannot raise (which would requeue the message).
    if action == 'read':
        return None

    annotation = storage.annotation_from_dict(message['annotation'])

    # Skip the socket whose client id matches the event's src_client_id
    # (presumably the client that originated the change).
    if message['src_client_id'] == socket.client_id:
        return None

    # Annotations flagged ``nipsa`` only go to the socket whose authenticated
    # user is the annotation's ``user``.
    if annotation.get('nipsa') and (
            socket.request.authenticated_userid != annotation.get('user', '')):
        return None

    if not _authorized_to_read(socket.request, annotation):
        return None

    # We don't send anything until we have received a filter from the client
    if socket.filter is None:
        return None

    if not socket.filter.match(annotation, action):
        return None

    return {
        'payload': [annotation],
        'type': 'annotation-notification',
        'options': {'action': action},
    }
def handle_user_event(message, socket):
    """
    Build the notification for user event `message` destined for `socket`.

    Only the socket authenticated as the event's ``userid`` gets a message;
    every other socket gets None. The returned dict describes a
    session change.
    """
    if socket.request.authenticated_userid == message['userid']:
        # Session-change events carry the whole session model so clients can
        # refresh themselves without further API requests.
        return {
            'type': 'session-change',
            'action': message['type'],
            'model': message['session_model'],
        }
    return None
def _authorized_to_read(request, annotation):
    """Return True if the passed request is authorized to read the annotation.

    If the annotation belongs to a private group, this will return False if the
    authenticated user isn't a member of that group.

    An annotation whose ``permissions`` are missing, empty, null or lack a
    ``read`` entry is reported to Sentry and evaluated with an empty ``read``
    list. A null value no longer raises.
    """
    permissions = annotation.get('permissions')

    # TODO: remove this when we've diagnosed this issue
    if not permissions or 'read' not in permissions:
        request.sentry.captureMessage(
            'streamer received annotation lacking valid permissions',
            level='warn',
            extra={
                # .get(): a missing id must not turn diagnostics into a crash.
                'id': annotation.get('id'),
                'permissions': json.dumps(permissions),
            })

    read_permissions = (permissions or {}).get('read', [])
    read_principals = auth.translate_annotation_principals(read_permissions)
    return bool(set(read_principals).intersection(request.effective_principals))
def process_message(handler, reader, message):
    """
    Deserialize a queue message and deliver per-socket replies.

    `handler` is called once per open :py:class:`h.streamer.WebSocket`
    with the decoded message and that socket. It returns either `None`
    (send nothing) or a JSON-serializable object to send on that socket, so
    each incoming message maps to at most one outgoing message per socket.

    Exceptions raised here or by `handler` are caught by
    :py:class:`gnsq.Reader`, which requeues the message.
    """
    data = json.loads(message.body)

    # Snapshot the weak set into a list first: connections may open or close
    # while we iterate, and iterating the set directly could then fail with
    # "Set changed size during iteration".
    for socket in list(WebSocket.instances):
        reply = handler(data, socket)
        if reply is not None and not socket.terminated:
            socket.send(json.dumps(reply))
def process_queue(settings, topic, handler):
    """
    Run a blocking queue reader that routes `topic` messages to `handler`.

    A :py:class:`gnsq.Reader` is created on a fresh ephemeral channel and
    started in blocking mode. It is not expected to return. If it does, the
    exit is logged, the reader is closed and this function returns.
    """
    reader = queue.get_reader(settings, topic,
                              'stream-{}#ephemeral'.format(_random_id()))
    reader.on_message.connect(
        receiver=functools.partial(process_message, handler),
        weak=False)
    reader.start(block=True)

    # Reaching this point means the reader thread quit prematurely.
    log.error("queue reader for topic '%s' exited: killing reader", topic)
    reader.close()
def _random_id():
"""Generate a short random string"""
data = struct.pack('Q', random.getrandbits(64))
return base64.urlsafe_b64encode(data).strip(b'=')
@subscriber(ApplicationCreated)
def start_queue_processing(event):
    """
    Spawn greenlets that consume the NSQ topics this server listens to.

    Runs when the application is created. For each subscribed topic, it
    starts a greenlet that calls `process_queue` forever. It returns
    immediately without blocking.
    """
    # Skip this if we're in a script, not actual app startup. See the comment
    # in h.script:main for an explanation.
    if 'H_SCRIPT' in os.environ:
        return

    def _run_forever(settings, topic, handler):
        while True:
            process_queue(settings, topic, handler)

    settings = event.app.registry.settings
    topic_handlers = (
        (ANNOTATIONS_TOPIC, handle_annotation_event),
        (USER_TOPIC, handle_user_event),
    )
    for topic, handler in topic_handlers:
        gevent.spawn(_run_forever, settings, topic, handler)
def websocket(request):
    """Upgrade `request` to a streamer websocket, enforcing an Origin check.

    WebSockets can be opened across origins and send cookies. To stop
    scripts on other sites from using this socket, the Origin header (when
    present) must equal the request's host URL or appear in the ``origins``
    setting. Otherwise the response is 403 Forbidden.
    """
    origin = request.headers.get('Origin')
    allowed = aslist(request.registry.settings.get('origins', ''))

    is_foreign = (origin is not None
                  and origin != request.host_url
                  and origin not in allowed)
    if is_foreign:
        return HTTPForbidden()

    return request.get_response(WebSocketWSGIApplication(handler_cls=WebSocket))
def bad_handshake(exc, request):
    """Exception view for a failed websocket handshake: respond 400."""
    response = HTTPBadRequest()
    return response
def includeme(config):
    """Register the streamer's route and views with a Pyramid configurator.

    Also scans this module so its ``@subscriber`` handlers are registered.
    """
    route = 'ws'
    config.add_route(route, route)
    config.add_view(websocket, route_name=route)
    config.add_view(bad_handshake, context=HandshakeError)
    config.scan(__name__)