-
Notifications
You must be signed in to change notification settings - Fork 2
/
bx_eventhandlers.py
231 lines (172 loc) · 7.57 KB
/
bx_eventhandlers.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
#!/usr/bin/env python
import sys
import dateutil.parser
import traceback
import json
import time
import random
import datetime
from contextlib import ContextDecorator
from snap import common
#from mercury import journaling as jrnl
from bx_services import S3Key
class UnrecognizedJobType(Exception):
    """Raised when a job's type cannot be determined from its job tag."""
    def __init__(self, job_tag):
        # BUGFIX: the original called super().__init__(self, msg), which put
        # the exception instance itself into Exception.args and made str(err)
        # render a tuple instead of the message. Pass only the message.
        super().__init__('Could not determine type for job %s' % job_tag)
class NoHandlerRegisteredForJobType(Exception):
    """Raised when no handler function is registered for a given job type."""
    def __init__(self, job_type):
        # BUGFIX: the original called super().__init__(self, msg), which put
        # the exception instance itself into Exception.args and made str(err)
        # render a tuple instead of the message. Pass only the message.
        super().__init__('No handler function registered for job type "%s"' % job_type)
class start_timer(ContextDecorator):
    """Context manager / decorator that tracks elapsed wall-clock time.

    Entering the context records the current time; poll_seconds() reports
    whole seconds elapsed since then, and reset() restarts the clock.
    Exceptions are never suppressed on exit.
    """

    def __init__(self):
        super().__init__()
        self.start_time = None  # set on __enter__ / reset()

    def __enter__(self):
        self.reset()
        return self

    def poll_seconds(self):
        """Return whole seconds elapsed since the timer was (re)started."""
        elapsed = time.time() - self.start_time
        return int(elapsed)

    def reset(self):
        """Restart the clock from now."""
        self.start_time = time.time()

    def __exit__(self, *exc_info):
        # Returning False propagates any exception raised in the body.
        return False
'''
def get_handler_for_job_type(job_type):
handler_func = JOB_DISPATCH_TABLE.get(job_type)
if not handler_func:
raise NoHandlerRegisteredForJobType(job_type)
return handler_func
'''
def arbitrate(bidder_list, service_registry):
    """Select the winning bidder(s) from bidder_list.

    Decide which bidder gets assigned a job, using a simple uniform random
    selector. This is only for the proof of concept; smarter (and
    user-pluggable) arbitration methods will replace it later.

    :param bidder_list: list of bidder records (dicts from the API layer)
    :param service_registry: service lookup object (unused in this PoC selector)
    :return: a single-element list containing the winner, or an empty list
             when there are no bidders.
    """
    print('#####------- Arbitrating bid data:')
    print(common.jsonpretty(bidder_list))
    if not bidder_list:
        # Guard: random selection over an empty list would raise ValueError;
        # an empty result matches the callers' `if len(winners)` checks.
        return []
    # NOTE: the original reseeded the global RNG with time.time() on every
    # call. That is unnecessary (Python seeds the RNG at startup) and can
    # repeat selections for calls landing in the same clock tick, so it
    # was removed.
    return [random.choice(bidder_list)]
def trigger_arbitration(service_registry, **kwargs):
    """Scan all open bidding windows and run arbitration on any that have
    reached their policy limit (a bid-count threshold or an elapsed-time
    threshold), awarding the job to the arbitration winner via the API.

    :param service_registry: registry providing the 'job_mgr_api' service
    :param kwargs: unused here; accepted for dispatch-table call compatibility
    :raises Exception: on an unrecognized bidding-window policy limit_type
    """
    current_time = datetime.datetime.now()
    # scan ALL open bidding windows
    api_service = service_registry.lookup('job_mgr_api')
    response = api_service.get_open_bid_windows()
    bid_windows = response.json()['data']['bidding_windows']
    print('###----- Retrieved open bid windows from API endpoint:')
    print(bid_windows)
    # for each open window, see who has bid;
    for bwindow in bid_windows:
        job_tag = bwindow['job_tag']
        window_id = bwindow['bidding_window_id']
        if bwindow['policy']['limit_type'] == 'num_bids':
            # Policy type 1: window closes once enough bids have come in.
            print('++ Policy limit is %d bids.' % int(bwindow['policy']['limit']))
            json_bidder_data = api_service.get_active_job_bids(job_tag)
            bidding_users = json_bidder_data.json()['data']['bidders']
            num_bids = len(bidding_users)
            policy_limit_bids = int(bwindow['policy']['limit'])
            if num_bids >= policy_limit_bids:
                winners = arbitrate(bidding_users, service_registry)
                if len(winners):
                    print('!!!!!!!!!!! WE HAVE A WINNER !!!!!!!!!!!!!!!!!!')
                    print(common.jsonpretty(winners))
                    api_service.award_job(window_id, winners)
                else:
                    print('### No winner determined in the arbitration round ending %s.' % current_time.isoformat())
        elif bwindow['policy']['limit_type'] == 'time_seconds':
            # Policy type 2: window closes after a fixed number of seconds.
            # see how long the window has been open;
            window_opened_at = dateutil.parser.parse(bwindow['open_ts'])
            # NOTE(review): .seconds ignores whole days of elapsed time
            # (wraps at 24h) — .total_seconds() may be intended; confirm
            # windows can never stay open more than a day.
            window_open_duration = (current_time - window_opened_at).seconds
            policy_limit_seconds = int(bwindow['policy']['limit'])
            if window_open_duration >= policy_limit_seconds:
                json_bidder_data = api_service.get_active_job_bids(job_tag)
                bid_data = json_bidder_data.json()['data']['bidders']
                if len(bid_data):
                    winners = arbitrate(bid_data, service_registry)
                    if len(winners):
                        print('!!!!!!!!!!! WE HAVE A WINNER !!!!!!!!!!!!!!!!!!')
                        api_service.award_job(window_id, winners)
                    else:
                        print('### No winner determined in the arbitration round ending %s.' % current_time.isoformat())
                else:
                    print('### No more bidders in this round.')
        else:
            # raise hell; we don't support that
            raise Exception('Unrecognized bidding window policy limit_type: %s' % bwindow['policy']['limit_type'])
    # use the policy data embedded in the bidding window to decide whether
    # to trigger (manual | automatic) arbitration
def handle_job_posted(service_registry, **kwargs):
    '''Broadcast a newly posted job, by SMS, to every available courier so
    they may "bid" to accept it.

    The current JSON format for a job posting is:
    {
        "job_data": {
            # <job_data DB table fields>
        },
        "bid_window": {
            "id": <window_id>,
            "limit_type": "time_seconds" | "num_bids"
            "limit": <limit>
        }
    }
    '''
    job_tag = kwargs['job_data']['job_tag']

    # Text the tag of the available job to every courier who was in the
    # on-call roster at the time the job was posted.
    sms_service = service_registry.lookup('sms')
    api_service = service_registry.lookup('job_mgr_api')

    # get_available_couriers() should return:
    # { "data": "couriers": [{ <data> }, ...]
    courier_response = api_service.get_available_couriers()
    for courier in courier_response.json()['data']['couriers']:
        sms_service.send_sms(courier['mobile_number'], job_tag)
    # done for now
# Maps the top-level S3 "folder" name (the first segment of an uploaded
# object's key) to the handler function invoked for uploads under that
# folder. Each handler is called as handler(service_registry, **payload).
S3_EVENT_DISPATCH_TABLE = {
    'posted': handle_job_posted,   # new job posting -> SMS broadcast to couriers
    'scan': trigger_arbitration    # scan trigger -> arbitrate open bid windows
}
def scan_handler(message, receipt_handle, service_registry):
    """Top-level SCAN event handler: kicks off one bid-arbitration pass.

    The message payload and receipt handle are accepted for handler-signature
    compatibility but are not used here.
    """
    print('### Inside top-level SCAN event handler function.')
    print("### Triggering bid arbitration...")
    trigger_arbitration(service_registry)
def msg_handler(message, receipt_handle, service_registry):
    """Handle an SQS message describing one or more S3 object uploads.

    For each S3 record in the message body, download the uploaded JSON
    payload and dispatch it to the handler registered (in
    S3_EVENT_DISPATCH_TABLE) for the object's top-level key "folder".
    Errors while processing a payload are logged and abort the remaining
    records.

    :param message: raw SQS message dict; its 'Body' is a JSON string with
                    an S3 event notification ('Records' list)
    :param receipt_handle: SQS receipt handle (unused here)
    :param service_registry: registry providing the 's3' service and the
                             services used by downstream handlers
    """
    s3_svc = service_registry.lookup('s3')
    print('### Inside SQS message handler function.')
    print("### message follows:")
    print(common.jsonpretty(message))
    # unpack SQS message to get notification about S3 file upload
    message_body_raw = message['Body']
    message_body = json.loads(message_body_raw)
    for record in message_body['Records']:
        s3_data = record.get('s3')
        if not s3_data:
            # BUGFIX: the original tested `record` here, so a record lacking
            # an 's3' section was NOT skipped and crashed on the subscript
            # lookups below. Skip non-S3 records instead.
            continue
        bucket_name = s3_data['bucket']['name']
        object_key = s3_data['object']['key']
        # TODO: set a limit on file size?
        print('#--- received object upload notification [ bucket: %s, key: %s ]' % (bucket_name, object_key))
        s3key = S3Key(bucket_name, object_key)
        jsondata = None
        try:
            jsondata = s3_svc.download_json(bucket_name, object_key)
            print('### JSON payload data:')
            print(common.jsonpretty(jsondata))
            # we use the name of the top-level S3 "folder" to select the
            # action to perform, by keying into the dispatch table
            channel_id = object_key.split('/')[0]
            handler = S3_EVENT_DISPATCH_TABLE.get(channel_id)
            if not handler:
                raise Exception('no handler registered for S3 upload events to bucket %s with key %s' % (bucket_name, object_key))
            handler(service_registry, **jsondata)
        except Exception as err:
            # Log and stop: remaining records in this message are abandoned
            # (matches the original behavior of returning on first error).
            print('Error handling JSON job data from URI %s.' % s3key.uri)
            print(err)
            traceback.print_exc(file=sys.stdout)
            return
"""
# to time a block of code:
time_log = jrnl.TimeLog()
timer_label = 'job_handler_exec_time: %s' % job_tag
with jrnl.stopwatch(timer_label, time_log):
<code block>
print(time_log.readout)
"""