-
Notifications
You must be signed in to change notification settings - Fork 2
/
afynotifier.py
236 lines (234 loc) · 12.6 KB
/
afynotifier.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
#!/usr/bin/env python2.6
# Copyright (c) 2004-2014 GoPivotal, Inc. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
# -----
"""This module provides access to Affinity's notification service. An application can request
notifications when specific PINs change, or when specific classes change (i.e. when PINs are
classified or un-classified, as well as when already classified PINs change). An application
can also emit standard queries to retrieve all changes since time x. The current implementation
uses a 'comet' pattern; in the future, a more efficient networking solution will be
implemented."""
from __future__ import with_statement
from affinity import *
import copy
import json
import logging
try:
import multiprocessing # Optional (for more flexible Event object).
import affinityinproc # Optional (for inproc execution of Affinity; see the 'ext' subdirectory).
except:
pass
import os
import socket
import threading
import urllib2
import uuid
# TODO: auth
# TODO: support qnames for class registration
class AfyNotifier(object):
"""Client-side notification manager. Accepts registrations for specific PINs and classes,
and invokes the specified handlers (callbacks) upon receiving Affinity notifications."""
class _ThreadCtx(object):
"""[internal] Thread context for the threads that wait for notifications."""
def __init__(self, pCriterion, pHandler, pHandlerData, pCreateThread, pNotifier):
self.mServerToken = None # The token returned at registration time by the server, to identify a specific notification channel.
self.mCriterion = pCriterion # The criterion used at registration time (more 'human-readable' identifier of the notification).
self.mHandler = pHandler # The handler/callback provided by the client, to be called upon notification.
self.mHandlerData = pHandlerData # Additional data provided by the client, to be returned to him upon notification.
try:
self.mFinished = multiprocessing.Event() # allows Ctrl+C
except:
self.mFinished = threading.Event()
self.mThread = None # Contexts attached to the "group" context don't have their own thread.
if pCreateThread:
self.mThread = threading.Thread(name="AfyNotifier.%s" % pCriterion, target=AfyNotifier._staticEntryPoint, args=(pNotifier, self))
self.mThread.setDaemon(1) # to allow Ctrl+C
def start(self, pServerToken):
self.mServerToken = pServerToken
if self.mThread != None:
self.mThread.start()
finished = property(fget=lambda c: (c.mFinished.__class__.__dict__.has_key("isSet") and c.mFinished.isSet()) or (c.mFinished.__class__.__dict__.has_key("is_set") and c.mFinished.is_set()), fset=lambda c,v: c.mFinished.set(), doc="Turns true when this thread is signaled to end.")
def __init__(self):
self.mDbConnection = None
self.mClientID = "python:%s:%s" % (os.getpid(), uuid.uuid4().hex)
self.mClassNames = {} # Classname-based registrations; the value is a notification handler callback.
self.mPIDs = {} # PID-based registrations; the value is a notification handler callback.
self.mTokens = {} # Mixed registrations, by server token.
self.mLock = threading.Lock() # Synchronization, to allow unregistration from the callbacks.
self.mPendingCtxs = set() # Threads currently waiting for a http response.
self.mGroupCtx = None # Thread used for all notifications grouped by clientid.
self.mAffinityInproc = AffinityConnection.isInproc()
def open(self, pDbConnection):
"Initializer for the notifier."
if not isinstance(pDbConnection, AffinityConnection):
raise Exception("Invalid pDbConnection passed to AfyNotifier.")
self.mDbConnection = pDbConnection
self.mGroupCtx = AfyNotifier._ThreadCtx(self.mClientID, AfyNotifier._groupHandler, self, True, self) # Thread allowing to group together all notifications.
self.mGroupCtx.start(None)
def close(self):
"Terminator for the notifier."
# Terminate the group ctx thread, and wait for completion.
self.mGroupCtx.finished = True
self.mGroupCtx.mThread.join()
# Wait for completion of all pending threads.
lPendingCtxs = None
with self.mLock:
lPendingCtxs = copy.copy(self.mPendingCtxs)
for i in lPendingCtxs:
i.mThread.join()
# Report any dangling registration.
for i in self.mClassNames.items():
if len(i[1]) > 0:
logging.warn("notification registration still active: %s" % i)
for i in self.mPIDs.items():
if len(i[1]) > 0:
logging.warn("notification registration still active: %s" % i)
def registerClass(self, pClassName, pHandler, pHandlerData=None, pGroupNotifs=True):
"Registration of notifications for class-related changes."
def _regci():
if self.mAffinityInproc: return affinityinproc.regnotif(self.mDbConnection._s(None), pClassName, None, self.mClientID)
return self._callServer("http://%s/db/?i=regnotif¬ifparam=%s&type=class&clientid=%s" % (self.mDbConnection.host(), pClassName, self.mClientID))
lThreadCtx = AfyNotifier._ThreadCtx(pClassName, pHandler, pHandlerData, not pGroupNotifs, self)
lResult = json.loads(_regci())
if lResult and len(lResult.keys()) > 0:
with self.mLock:
lToken = lResult.keys()[0]
if not self.mClassNames.has_key(pClassName):
self.mClassNames[pClassName] = []
self.mClassNames[pClassName].append(lThreadCtx)
self.mTokens[lToken] = lThreadCtx
lThreadCtx.start(lToken)
def unregisterClass(self, pClassName, pHandler):
def _unregci(_pServerToken):
if self.mAffinityInproc: affinityinproc.unregnotif(self.mDbConnection._s(None), _pServerToken, self.mClientID); return
self._callServer("http://%s/db/?i=unregnotif¬ifparam=%s" % (self.mDbConnection.host(), _pServerToken))
lCtx = None
with self.mLock:
if not self.mClassNames.has_key(pClassName):
return
for iCtx in xrange(len(self.mClassNames[pClassName])):
lCtx = self.mClassNames[pClassName][iCtx]
if lCtx.mHandler == pHandler:
del self.mClassNames[pClassName][iCtx]
del self.mTokens[lCtx.mServerToken]
break
if lCtx:
_unregci(lCtx.mServerToken)
lCtx.finished = True
def registerPIN(self, pLocalPID, pHandler, pHandlerData=None, pGroupNotifs=True):
"Registration of notifications for PIN-related changes."
def _regpi(_pCriterion):
if self.mAffinityInproc: return affinityinproc.regnotif(self.mDbConnection._s(None), None, _pCriterion, self.mClientID)
return self._callServer("http://%s/db/?i=regnotif¬ifparam=%s&type=pin&clientid=%s" % (self.mDbConnection.host(), _pCriterion, self.mClientID))
lCriterion = AfyNotifier.serializeLocalPID(pLocalPID)
lThreadCtx = AfyNotifier._ThreadCtx(lCriterion, pHandler, pHandlerData, not pGroupNotifs, self)
lResult = json.loads(_regpi(lCriterion))
if lResult and len(lResult.keys()) > 0:
with self.mLock:
lToken = lResult.keys()[0]
if not self.mPIDs.has_key(lCriterion):
self.mPIDs[lCriterion] = []
self.mPIDs[lCriterion].append(lThreadCtx)
self.mTokens[lToken] = lThreadCtx
lThreadCtx.start(lToken)
def unregisterPIN(self, pLocalPID, pHandler):
def _unregpi(_pServerToken):
if self.mAffinityInproc: affinityinproc.unregnotif(self.mDbConnection._s(None), _pServerToken, self.mClientID); return
self._callServer("http://%s/db/?i=unregnotif¬ifparam=%s" % (self.mDbConnection.host(), _pServerToken))
lCtx = None
with self.mLock:
lCriterion = AfyNotifier.serializeLocalPID(pLocalPID)
if not self.mPIDs.has_key(lCriterion):
return
for iCtx in xrange(len(self.mPIDs[lCriterion])):
lCtx = self.mPIDs[lCriterion][iCtx]
if lCtx.mHandler == pHandler:
del self.mPIDs[lCriterion][iCtx]
del self.mTokens[lCtx.mServerToken]
break
if lCtx:
_unregpi(lCtx.mServerToken)
lCtx.finished = True
def _callServer(self, pURL, pTimeout=None):
assert not self.mAffinityInproc
lT1 = time.time()
logging.debug(pURL)
try:
lResult = urllib2.urlopen(urllib2.Request(pURL, headers={"Authorization":"Basic %s" % self.mDbConnection.basicauth()}), timeout=pTimeout).read()
logging.debug("received response (%ss)...\n url: %s\n response: %s\n" % (time.time() - lT1, pURL, lResult))
return lResult
except socket.timeout as ex:
logging.info(repr(ex))
return None
except Exception as ex:
logging.error(repr(ex))
return None
def _addPendingCtx(self, pThreadCtx):
with self.mLock:
self.mPendingCtxs.add(pThreadCtx)
def _removePendingCtx(self, pThreadCtx):
with self.mLock:
self.mPendingCtxs.discard(pThreadCtx)
def _entryPoint(self, pThreadCtx):
def _waiti(_pArg):
if self.mAffinityInproc: return affinityinproc.waitnotif(self.mDbConnection._s(None), _pArg, 5000)
return self._callServer("http://%s/db/?i=waitnotif&%s&timeout=5000" % (self.mDbConnection.host(), _pArg), pTimeout=10)
while not pThreadCtx.finished:
self._addPendingCtx(pThreadCtx)
if pThreadCtx.mServerToken:
lUrlArg = "notifparam=%s" % pThreadCtx.mServerToken
lRawRet = _waiti(lUrlArg)
if lRawRet != None:
lRet = json.loads(lRawRet)
lRetKeys = lRet.keys()
if len(lRetKeys) > 0 and lRetKeys[0] != "timeout":
if lRetKeys[0] != pThreadCtx.mServerToken:
logging.warn("unexpected response for token %s: %s" % (pThreadCtx.mServerToken, lRetKeys[0]))
pThreadCtx.mHandler(pThreadCtx.mHandlerData, lRet[lRetKeys[0]])
else:
lUrlArg = "clientid=%s" % pThreadCtx.mCriterion
lRawRet = _waiti(lUrlArg)
if lRawRet != None:
lRet = json.loads(lRawRet)
lRetKeys = lRet.keys()
if len(lRetKeys) > 0 and lRetKeys[0] != "timeout":
pThreadCtx.mHandler(pThreadCtx.mHandlerData, lRet)
self._removePendingCtx(pThreadCtx)
@staticmethod
def _staticEntryPoint(*args, **kwargs):
"The thread entry point for all _ThreadCtx.mThread objects."
args[0]._entryPoint(args[1])
@staticmethod
def _groupHandler(pSelf, pNotifData):
"The notification handler for notifications that are grouped by clientid; allows to use a single connection per client process for all notifications."
lHandlers = []
logging.debug("got group notif data: %s" % pNotifData)
with pSelf.mLock:
for iT in pNotifData.items():
logging.debug("token %s: %s notifications" % (iT[0], len(iT[1])))
try:
if pSelf.mTokens.has_key(iT[0]) and None == pSelf.mTokens[iT[0]].mThread:
lHandlers.append([pSelf.mTokens[iT[0]].mHandler, pSelf.mTokens[iT[0]].mHandlerData, iT[1]])
except Exception as ex:
logging.warn("caught exception during notification: %s" % repr(ex))
for iH in lHandlers:
logging.debug("forwarded group notification for %s" % iH[2])
iH[0](iH[1], iH[2])
@staticmethod
def serializeLocalPID(pLocalPID):
"Normalized representation of PIDs for notification purposes."
if isInteger(pLocalPID):
return "%x" % pLocalPID
return pLocalPID
AFYNOTIFIER = AfyNotifier()