-
Notifications
You must be signed in to change notification settings - Fork 1
/
polling_edge_service.py
160 lines (134 loc) · 5.68 KB
/
polling_edge_service.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
from typing import Optional
import re
import urllib3
import threading
import logging
import json
import asyncio
from hashlib import sha256
from typing import List
from featurehub_sdk.edge_service import EdgeService
from featurehub_sdk.featurehub_repository import FeatureHubRepository
from featurehub_sdk.version import sdk_version
# Module-level logger, registered under the 'featurehub_sdk' name.
log = logging.getLogger("featurehub_sdk")
class PollingEdgeService(EdgeService):
    """Edge service that keeps a repository up to date by polling over HTTP.

    Each poll issues a GET against ``<edge_url>features?apiKey=...`` and hands
    the decoded JSON to the repository. After a poll, the next one is scheduled
    on a daemon ``threading.Timer`` for as long as the service is neither
    cancelled nor stopped and the interval is greater than zero.
    """

    _interval: int  # seconds between polls; <= 0 means "do not reschedule"
    _repository: FeatureHubRepository
    _cancel: bool  # set by close() or a 404 response
    _thread: Optional[threading.Timer]  # timer for the next scheduled poll
    _client_eval: bool  # True when the first api key contains '*'
    _stopped: bool  # set when a 236 response is received
    _http: urllib3.PoolManager
    _cache_control_pattern: re.Pattern
    _sha_context: Optional[str]  # sha256 hex digest of the current context header
    _etag: Optional[str]  # last etag received, replayed as if-none-match
    _context: Optional[str]  # current context header, sent as x-featurehub
    _url: str  # polling url including all apiKey query parameters

    def __init__(self, edge_url: str, api_keys: List[str],
                 repository: FeatureHubRepository,
                 interval: int):
        """Prepare the polling service; no request is made until ``poll()``.

        :param edge_url: base url of the edge server; a trailing ``/`` is
            added when it is missing.
        :param api_keys: api keys to poll for; must contain at least one key.
        :param repository: repository that is notified of poll results.
        :param interval: seconds between polls.
        """
        self._interval = interval
        self._repository = repository
        self._cancel = False
        self._stopped = False
        self._thread = None
        self._client_eval = '*' in api_keys[0]
        self._context = None
        self._etag = None
        self._sha_context = None
        self._http = urllib3.PoolManager()
        self._cache_control_pattern = re.compile('max-age=(\\d+)')

        # without the separator the path would be glued onto the host name
        base_url = edge_url if edge_url.endswith('/') else edge_url + '/'
        query = "&".join('apiKey=' + key for key in api_keys)
        self._url = f"{base_url}features?{query}"
        log.debug("polling url %s", self._url)

    # allow us to update the interval of the current polling edge service
    def update_interval(self, interval: int):
        """Change the polling interval, restarting polling if it was cancelled.

        :param interval: new number of seconds between polls.
        """
        self._interval = interval
        was_cancelled = self._cancel
        self._cancel = False

        if was_cancelled:  # if we had cancelled, start polling again
            # poll_with_interval() is a coroutine function: calling it from this
            # synchronous method without awaiting it would never run the poll.
            # Run it via poll_again() on its own daemon thread instead, which
            # also keeps asyncio.run() away from any event loop running here.
            restart = threading.Timer(0, self.poll_again)
            restart.daemon = True
            self._thread = restart
            restart.start()

    # this does the business, calls the remote service and gets the features back
    async def _get_updates(self):
        """Issue one GET against the polling url and act on the response status.

        200 and 236 responses update the etag and polling interval (when the
        headers are present) and pass the decoded body to the repository; a 236
        additionally marks the service as stopped. A 404 notifies the repository
        of failure and cancels polling. Any other status is ignored.

        The urllib3 request is not awaited, so this coroutine blocks its thread
        until the response arrives.
        """
        # TODO: set timeout of tcp requests to 12 seconds, or give users control over it using environ vars
        sha_context = "0" if self._sha_context is None else self._sha_context
        url = f"{self._url}&contextSha={sha_context}"
        log.debug("polling %s", url)

        headers = {
            'X-SDK': 'Python',
            'X-SDK-Version': sdk_version
        }
        if self._etag:
            headers['if-none-match'] = self._etag
        if self._context:
            headers['x-featurehub'] = self._context

        resp = self._http.request(method='GET', url=url, headers=headers)
        log.debug("polling status %s", resp.status)

        if resp.status == 200 or resp.status == 236:
            if 'etag' in resp.headers:
                self._etag = resp.headers['etag']
            if 'cache-control' in resp.headers:
                self._cache_control_polling_interval(resp.headers['cache-control'])

            self._process_successful_results(json.loads(resp.data.decode('utf-8')))

            # if it is a 236, we have been told to stop
            if resp.status == 236:
                self._stopped = True
        elif resp.status == 404:  # no such key
            self._repository.notify("failed", None)
            self._cancel = True
            log.error("Specified API Key does not exist %s", self._url)
        elif resp.status == 503:
            # dacha is busy, just wait
            return
        # otherwise its likely a transient failure, so keep trying

    def _cache_control_polling_interval(self, cache_control: str):
        """Adopt the ``max-age`` of a cache-control header as the new interval.

        The interval is left unchanged when the header has no ``max-age=<digits>``
        directive or when the value is zero.
        """
        found = self._cache_control_pattern.search(cache_control)
        if found is None:
            return

        new_interval = int(found.group(1))
        if new_interval > 0:
            self._interval = new_interval

    def _schedule_next_poll(self):
        """Arm a daemon timer for the next poll if polling should continue."""
        if self._cancel or self._stopped or self._interval <= 0:
            return

        timer = threading.Timer(self._interval, self.poll_again)
        timer.daemon = True  # allow it to just disappear off if the app closes down
        self._thread = timer
        timer.start()

    # this is essentially a repeating task because it "calls itself"
    # another way to do this is with a separate class that is itself a thread descendant
    # which waits for the requisite time, then triggers a callback and then essentially does the same thing
    # if we need a repeating task elsewhere, we should consider refactoring this
    async def poll_with_interval(self):
        """Poll once and then schedule the next poll.

        Does nothing when the service is cancelled or stopped. An exception
        raised by the poll propagates to the caller and nothing is scheduled.
        """
        if self._cancel or self._stopped:
            return

        await self._get_updates()
        self._schedule_next_poll()

    # this ends up being synchronous
    def poll_again(self):
        """Timer callback: run one polling cycle to completion on this thread."""
        try:
            asyncio.run(self.poll_with_interval())
        except Exception:
            # This is the top of the timer thread. If the error escaped, the next
            # timer would never be armed and background polling would silently
            # end, so log it and keep trying on the normal interval.
            log.exception("polling %s failed, will try again", self._url)
            self._schedule_next_poll()

    # async polls, you can choose not to wait for updates
    # if the interval is zero, this will just issue a get updates and stop
    async def poll(self):
        """Clear the cancel flag and start polling, awaiting the first poll."""
        self._cancel = False
        await self.poll_with_interval()

    def client_evaluated(self):
        """Return True when the first api key contains ``*``."""
        return self._client_eval

    def close(self):
        """Cancel polling and discard any pending timer."""
        self._cancel = True
        if self._thread is not None:
            self._thread.cancel()
        self._thread = None

    async def context_change(self, header: str):
        """Record a new context header and poll immediately if it changed.

        :param header: the new context header; its sha256 hex digest is sent as
            the ``contextSha`` query parameter on later polls.
        """
        previous = self._context
        self._context = header
        self._sha_context = sha256(header.encode('utf-8')).hexdigest()
        if previous != header:
            await self._get_updates()

    @property
    def cancelled(self):
        """Whether polling has been cancelled (by close() or a 404 response)."""
        return self._cancel

    @property
    def stopped(self):
        """Whether a 236 response has stopped polling."""
        return self._stopped

    @property
    def interval(self):
        """The current polling interval."""
        return self._interval

    # we get returned a bunch of environments for a GET/Poll API so we need to cycle through them
    # the result is different for streaming
    def _process_successful_results(self, data):
        """Notify the repository of the features of every non-empty entry.

        :param data: decoded JSON body of a poll response, iterated entry by
            entry; each truthy entry must have a ``'features'`` key.
        """
        log.debug("featurehub polling data was %s", data)
        for environment in data:
            if environment:
                self._repository.notify("features", environment['features'])