-
Notifications
You must be signed in to change notification settings - Fork 2
/
bx_transforms.py
1908 lines (1457 loc) · 77.3 KB
/
bx_transforms.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
#!/usr/bin/env python
import sys
import re
import uuid
import json
import traceback
import datetime
from urllib.parse import unquote_plus
from collections import namedtuple
from snap import snap, common
from snap import core
# from snap.loggers import transform_logger as log
# from sqlalchemy.sql import text
import git
# import constants as const
from sqlalchemy.orm.exc import NoResultFound
from sqlalchemy import and_, or_
from bxcommon import ListOutputResponder
'''
TODO: if a job's core information changes AFTER the job has been accepted, auto-generate message(s) for the courier
containing the updated information
TODO: figure out credentials and groups (add Sasha)
--aws secrets manager?
TODO: phased jobs require the concept of "teams" == any jobdata pushed to S3 should have a "team_size" attribute
TODO: if a courier accepts only one phase of an advertised job:
1. continue broadcasting with an updated notice that only the remaining phase is required, and
2. update the job's status to "accepted_partial".
Once all phases are accepted, ensure that the multiple assigned couriers are notified of their respective
assignments.
'''
SMSCommandSpec = namedtuple('SMSCommandSpec', 'command definition synonyms tag_required')
SMSGeneratorSpec = namedtuple('SMSGeneratorSpec', 'command definition specifier filterchar')
SMSPrefixSpec = namedtuple('SMSPrefixSpec', 'command definition defchar')
SYSTEM_ID = 'bxlog'
NETWORK_ID = 'ccr'
SMS_SYSTEM_COMMAND_SPECS = {
'on': SMSCommandSpec(command='on', definition='Courier coming on duty', synonyms=[], tag_required=False),
'off': SMSCommandSpec(command='off', definition='Courier going off duty', synonyms=[], tag_required=False),
'bid': SMSCommandSpec(command='bid', definition='Bid for a delivery job', synonyms=[], tag_required=True),
'acc': SMSCommandSpec(command='acc', definition='Accept a delivery job', synonyms=['ac'], tag_required=True),
'dt': SMSCommandSpec(command='dt', definition='Detail (find out what a particular job entails)', synonyms=[], tag_required=True),
'ert': SMSCommandSpec(command='ert', definition='En route to either pick up or deliver for a job', synonyms=['er'], tag_required=True),
'can': SMSCommandSpec(command='can', definition='Cancel (courier can no longer complete an accepted job)', synonyms=[], tag_required=True),
'mdel': SMSCommandSpec(command='mdel', definition='Delete a user message', synonyms=[], tag_required=False),
'fin': SMSCommandSpec(command='fin', definition='Finished a delivery', synonyms=['f'], tag_required=True),
'911': SMSCommandSpec(command='911', definition='Courier is having a problem and needs assistance', synonyms=[], tag_required=False),
'hlp': SMSCommandSpec(command='hlp', definition='Display help prompts', synonyms=['?'], tag_required=False)
}
SMS_GENERATOR_COMMAND_SPECS = {
'my': SMSGeneratorSpec(command='my', definition='List my pending (already accepted) jobs', specifier='.', filterchar='?'),
'opn': SMSGeneratorSpec(command='opn', definition='List open (available) jobs', specifier='.', filterchar='?'),
'awd': SMSGeneratorSpec(command='awd', definition='List my awarded (but not yet accepted) jobs', specifier='.', filterchar='?'),
'prg': SMSGeneratorSpec(command='prg', definition='List jobs in progress', specifier='.', filterchar='?'),
'msg': SMSGeneratorSpec(command='msg', definition='Display messages belonging to user', specifier='.', filterchar='?'),
'bst': SMSGeneratorSpec(command='bst', definition='Bidding status (jobs you have bid on)', specifier='.', filterchar='?')
}
SMS_PREFIX_COMMAND_SPECS = {
'$': SMSPrefixSpec(command='$', definition='create a user-defined macro', defchar=':'),
'@': SMSPrefixSpec(command='@', definition="send a message to a user's log via his or her handle", defchar=' '),
'&': SMSPrefixSpec(command='&', definition='create a user handle for yourself', defchar=' '),
'#': SMSPrefixSpec(command='#', definition='look up an abbreviation', defchar=':')
}
SMS_RESPONSES = {
'assign_job': 'Thank you for responding -- job tag {tag} has been assigned to you.',
'assigned_to_other': 'Another courier in the network responded first, but thank you for stepping up.'
}
ABBREVIATIONS = {
'etrm': 'East Tremont',
'mhvn': 'Mott Haven',
'bsty': 'Bed-Stuy',
'ftgr': 'Fort Greene',
'bhts': 'Brooklyn Heights',
'fltb': 'Flatbush',
'bwck': 'Bushwick',
'wbrg': 'Williamsburg',
'grpt': 'Greenpoint',
'dtbk': 'Downtown Brooklyn',
'ctnh': 'Clinton Hill',
}
REPLY_ASSIGN_JOB_TPL = 'Thank you for responding -- job tag {tag} has been assigned to you.'
REPLY_ASSIGNED_TO_OTHER = 'Another courier in the network responded first, but thank you for stepping up.'
REPLY_NOT_IN_NETWORK = "You have sent a control message to our logistics network, but you haven't been registered as one of our couriers."
REPLY_GET_INVOLVED_TPL = "If you'd like to become a courier, please send email to {contact_email} and someone will contact you."
REPLY_CMD_FORMAT = "You have texted a command that requires a job tag. Text the job tag, a space, and then the command."
REPLY_CMD_HELP_AVAILABLE = 'Text "help" to the target number to get a list of command strings and what they do.'
REPLY_INVALID_TAG_TPL = 'The job tag you have specified (%s) appears to be invalid.'
JOB_STATUS_BROADCAST = 0
JOB_STATUS_AWARDED = 1
JOB_STATUS_ACCEPTED = 3
JOB_STATUS_IN_PROGRESS = 4
JOB_STATUS_COMPLETED = 5
def generate_assign_job_reply(**kwargs):
return REPLY_ASSIGN_JOB_TPL.format(**kwargs)
def generate_get_involved_reply(**kwargs):
return REPLY_GET_INVOLVED_TPL.format(**kwargs)
def copy_fields_from(source_dict, *fields):
output_dict = {}
for field in fields:
output_dict[field] = source_dict.get(field)
return output_dict
def generate_job_tag(name):
id = uuid.uuid4()
raw_tag = '%s-%s' % (name, id)
return raw_tag.replace('_', '-')
def is_valid_job_tag(tag):
# TODO: get a regex going for this
if not tag.startswith(SYSTEM_ID):
return False
if tag.find(' ') != -1:
return False
return True
def normalize_mobile_number(number_string):
return number_string.lstrip('+').lstrip('1').replace('(', '').replace(')', '').replace('-', '').replace('.', '').replace(' ', '')
class ObjectFactory(object):
@classmethod
def create_courier(cls, db_svc, **kwargs):
Courier = db_svc.Base.classes.couriers
return Courier(**kwargs)
@classmethod
def create_courier_transport_method(cls, db_svc, **kwargs):
CourierTransportMethod = db_svc.Base.classes.courier_transport_methods
return CourierTransportMethod(**kwargs)
@classmethod
def create_courier_borough(cls, db_svc, **kwargs):
CourierBorough = db_svc.Base.classes.courier_boroughs
return CourierBorough(**kwargs)
@classmethod
def create_client(cls, db_svc, **kwargs):
Client = db_svc.Base.classes.clients
return Client(**kwargs)
@classmethod
def create_job(cls, db_svc, **kwargs):
Job = db_svc.Base.classes.job_data
kwargs['created_ts'] = datetime.datetime.now()
return Job(**kwargs)
@classmethod
def create_job_status(cls, db_svc, **kwargs):
JobStatus = db_svc.Base.classes.job_status
return JobStatus(**kwargs)
@classmethod
def create_job_bid(cls, db_svc, **kwargs):
JobBid = db_svc.Base.classes.job_bids
kwargs['write_ts'] = datetime.datetime.now()
return JobBid(**kwargs)
@classmethod
def create_bidding_window(cls, db_svc, **kwargs):
policy = {
"limit_type": "time_seconds",
"limit": 15,
}
BiddingWindow = db_svc.Base.classes.bidding_windows
kwargs['open_ts'] = datetime.datetime.now()
kwargs['policy'] = policy
return BiddingWindow(**kwargs)
@classmethod
def create_macro(cls, db_svc, **kwargs):
UserMacro = db_svc.Base.classes.user_macros
return UserMacro(**kwargs)
@classmethod
def create_user_log(cls, db_svc, **kwargs):
UserLog = db_svc.Base.classes.messages
kwargs['created_ts'] = datetime.datetime.now()
return UserLog(**kwargs)
@classmethod
def create_user_handle(cls, db_svc, **kwargs):
UserHandle = db_svc.Base.classes.user_handle_maps
return UserHandle(**kwargs)
@classmethod
def create_job_assignment(cls, db_svc, **kwargs):
JobAssignment = db_svc.Base.classes.job_assignments
return JobAssignment(**kwargs)
def lookup_transport_method_ids(name_array, session, db_svc):
TransportMethod = db_svc.Base.classes.transport_methods
ids = []
for name in name_array:
# one() will throw an exception if no match
try:
result = session.query(TransportMethod).filter(TransportMethod.value == name).one()
ids.append(result.id)
except NoResultFound:
pass
return ids
def lookup_borough_ids(name_array, session, db_svc):
Borough = db_svc.Base.classes.boroughs
ids = []
for name in name_array:
try:
result = session.query(Borough).filter(Borough.value == name).one()
ids.append(result.id)
except NoResultFound:
pass
return ids
def lookup_payment_method_id(name, session, db_svc):
PaymentMethod = db_svc.Base.classes.lookup_payment_methods
try:
method = session.query(PaymentMethod).filter(PaymentMethod.value == name).one()
return method.id
except NoResultFound:
return None
def lookup_couriers_by_status(status, session, db_svc):
couriers = []
Courier = db_svc.Base.classes.couriers
resultset = session.query(Courier).filter(Courier.duty_status == status) # inactive is 0, active is 1
for record in resultset:
couriers.append({
'id': record.id,
'first_name': record.first_name,
'last_name': record.last_name,
'mobile_number': record.mobile_number
})
return couriers
def lookup_job_data_by_tag(tag, session, db_svc):
Jobdata = db_svc.Base.classes.job_data
try:
job = session.query(Jobdata).filter(and_(Jobdata.job_tag == tag,
Jobdata.deleted_ts == None)).one()
return job
except NoResultFound:
return None
def lookup_courier_by_id(courier_id, session, db_svc):
Courier = db_svc.Base.classes.couriers
try:
return session.query(Courier).filter(Courier.id == courier_id).one()
except NoResultFound:
return None
def lookup_bidding_window_by_id(window_id, session, db_svc):
BiddingWindow = db_svc.Base.classes.bidding_windows
try:
return session.query(BiddingWindow).filter(BiddingWindow.id == window_id).one()
except NoResultFound:
return None
def lookup_open_bidding_window_by_job_tag(job_tag, session, db_svc):
current_time = datetime.datetime.now()
BiddingWindow = db_svc.Base.classes.bidding_windows
try:
return session.query(BiddingWindow).filter(and_(BiddingWindow.job_tag == job_tag,
BiddingWindow.open_ts <= current_time,
or_(BiddingWindow.close_ts == None,
BiddingWindow.close_ts > current_time))).one()
except NoResultFound:
return None
def lookup_open_bidding_windows(session, db_svc):
current_time = datetime.datetime.now()
BiddingWindow = db_svc.Base.classes.bidding_windows
resultset = session.query(BiddingWindow).filter(and_(BiddingWindow.open_ts <= current_time,
or_(BiddingWindow.close_ts == None,
BiddingWindow.close_ts > current_time))).all()
for record in resultset:
yield record
def lookup_live_courier_handle(courier_id, session, db_svc):
UserHandle = db_svc.Base.classes.user_handle_maps
try:
return session.query(UserHandle).filter(and_(UserHandle.user_id == courier_id,
UserHandle.expired_ts == None)).one()
except NoResultFound:
return None
def lookup_courier_by_handle(handle, session, db_svc):
HandleMap = db_svc.Base.classes.user_handle_maps
try:
handle_map = session.query(HandleMap).filter(and_(HandleMap.handle == handle,
HandleMap.expired_ts == None)).one()
return lookup_courier_by_id(handle_map.user_id, session, db_svc)
except NoResultFound:
return None
def lookup_current_job_status(job_tag, session, db_svc):
JobStatus = db_svc.Base.classes.job_status
try:
return session.query(JobStatus).filter(and_(JobStatus.job_tag == job_tag,
JobStatus.expired_ts == None)).one()
except NoResultFound:
return None
def lookup_user_job_bid(job_tag, courier_id, session, db_svc):
JobBid = db_svc.Base.classes.job_bids
try:
return session.query(JobBid).filter(and_(JobBid.job_tag == job_tag,
JobBid.courier_id == courier_id,
JobBid.expired_ts == None)).one()
except NoResultFound:
return None
def lookup_bid_by_id(bid_id, session, db_svc):
JobBid = db_svc.Base.classes.job_bids
try:
return session.query(JobBid).filter(and_(JobBid.id == bid_id,
JobBid.expired_ts == None)).one()
except NoResultFound:
return None
def job_is_available(job_tag, session, db_svc):
JobStatus = db_svc.Base.classes.job_status
try:
# status 0 is "broadcast" (available for bidding)
session.query(JobStatus).filter(and_(JobStatus.expired_ts == None,
JobStatus.job_tag == job_tag,
JobStatus.status == JOB_STATUS_BROADCAST)).one()
return True
except NoResultFound:
return False
def job_is_awarded(job_tag, session, db_svc):
JobStatus = db_svc.Base.classes.job_status
try:
# status 1 is "awarded" (bidding is complete and there is a winner)
session.query(JobStatus).filter(and_(JobStatus.expired_ts == None,
JobStatus.job_tag == job_tag,
JobStatus.status == JOB_STATUS_AWARDED)).one()
return True
except NoResultFound:
return False
def job_belongs_to_courier(job_tag, courier_id, session, db_svc):
JobAssignment = db_svc.Base.classes.job_assignments
try:
session.query(JobAssignment).filter(and_(JobAssignment.job_tag == job_tag,
JobAssignment.courier_id == courier_id)).one()
return True
except NoResultFound:
return False
def list_user_bids(courier_id, session, db_svc):
BidWindow = db_svc.Base.classes.bidding_windows
JobBid = db_svc.Base.classes.job_bids
bids = []
for window, bid in session.query(BidWindow, JobBid).filter(and_(JobBid.bidding_window_id == BidWindow.id,
BidWindow.close_ts == None,
JobBid.courier_id == courier_id,
JobBid.expired_ts == None)).all():
bids.append({
'bid_id': bid.id,
'window_id': window.id,
'job_tag': bid.job_tag,
'timestamp': bid.write_ts
})
return bids
def list_awarded_jobs(courier_id, session, db_svc):
jobs = []
JobBid = db_svc.Base.classes.job_bids
for bid in session.query(JobBid).filter(and_(JobBid.courier_id == courier_id,
JobBid.expired_ts == None,
JobBid.accepted_ts != None)).all():
if job_is_awarded(bid.job_tag, session, db_svc):
job = lookup_job_data_by_tag(bid.job_tag, session, db_svc)
if not job:
raise Exception('There is an orphan job in this dataset. Please contact your administrator.')
else:
jobs.append(job)
return jobs
'''
# status 1 is "awarded" (granted to winning bidder, but not yet accepted)
status_record = session.query(JobStatus).filter(and_(JobStatus.expired_ts == None,
JobStatus.job_tag == job_tag,
JobStatus.status == 1)).one()
'''
def list_accepted_jobs(courier_id, session, db_svc):
JobAssignment = db_svc.Base.classes.job_assignments
JobStatus = db_svc.Base.classes.job_status
# resultset = session.query(JobAssignment).filter(JobAssignment.courier_id == dlg_context.courier.id).all()
jobs = []
for ja, stat in session.query(JobAssignment,
JobStatus).filter(and_(JobStatus.job_tag == JobAssignment.job_tag,
JobStatus.status == JOB_STATUS_ACCEPTED,
JobStatus.expired_ts == None,
JobAssignment.courier_id == courier_id)).all():
jobs.append(stat)
return jobs
def list_available_jobs(session, db_svc):
jobs = []
JobStatus = db_svc.Base.classes.job_status
resultset = session.query(JobStatus).filter(and_(JobStatus.expired_ts == None,
JobStatus.status == 0)).all()
for record in resultset:
jobs.append(record)
return jobs
def list_in_progress_jobs_for_courier(courier_id, session, db_svc):
jobs = []
JobAssignment = db_svc.Base.classes.job_assignments
JobStatus = db_svc.Base.classes.job_status
for jas, jstat in session.query(JobAssignment, JobStatus).filter(and_(JobAssignment.job_tag == JobStatus.job_tag,
JobStatus.expired_ts == None,
JobStatus.status == JOB_STATUS_IN_PROGRESS,
JobAssignment.courier_id == courier_id)).all():
jobs.append(jstat)
return jobs
def prepare_courier_record(input_data, session, db_svc):
output_record = copy_fields_from(input_data, 'first_name', 'last_name', 'email')
output_record['mobile_number'] = normalize_mobile_number(input_data['mobile_number'])
output_record['duty_status'] = 0 # 0 is inactive, 1 is active
return output_record
def prepare_bid_window_record(input_data, session, db_svc):
output_record = copy_fields_from(input_data, 'job_tag')
job = lookup_job_data_by_tag(input_data['job_tag'], session, db_svc)
if not job:
raise Exception('no job found with tag %s.' % input_data['job_tag'])
output_record['job_id'] = job.id
return output_record
def prepare_job_record(input_data, session, db_svc):
output_record = copy_fields_from(input_data,
'client_id',
'delivery_address',
'delivery_borough',
'delivery_zip',
'delivery_neighborhood',
'pickup_address',
'pickup_borough',
'pickup_neighborhood',
'pickup_zip',
'items',
'delivery_window_open',
'delivery_window_close')
borough_tag = input_data['delivery_borough'].lstrip().rstrip().lower().replace(' ', '_')
output_record['payment_method'] = lookup_payment_method_id(input_data['payment_method'], session, db_svc)
output_record['job_tag'] = generate_job_tag('bxlog_%s_%s' % (borough_tag, input_data['delivery_zip']))
return output_record
def ok_status(message, **kwargs):
result = {
'status': 'ok',
'message': message
}
if kwargs:
result['data'] = kwargs
return json.dumps(result)
def exception_status(err, **kwargs):
result = {
'status': 'error',
'error_type': err.__class__.__name__,
'message': 'an exception of type %s occurred: %s' % (err.__class__.__name__, str(err))
}
if kwargs:
result.update(**kwargs)
return json.dumps(result)
def update_job_status(job_tag, new_status, session, db_svc):
current_time = datetime.datetime.now()
current_status_record = lookup_current_job_status(job_tag, session, db_svc)
if current_status_record.status == new_status:
return False
print('### updating the status for job tag %s...' % job_tag)
# expire the existing status
current_status_record.expired_ts = current_time
session.add(current_status_record)
# add a new status record
new_job_status = ObjectFactory.create_job_status(db_svc,
job_tag=job_tag,
status=new_status,
write_ts=current_time)
session.add(new_job_status)
return True
def ping_func(input_data, service_objects, **kwargs):
repo = git.Repo(search_parent_directories=True)
sha = repo.head.object.hexsha
return core.TransformStatus(ok_status('The BXLOGIC web listener is alive.', commit_id=sha))
def new_courier_func(input_data, service_objects, **kwargs):
db_svc = service_objects.lookup('postgres')
courier_id = None
with db_svc.txn_scope() as session:
methods = [m.lstrip().rstrip() for m in input_data['transport_methods'].split(',')]
transport_method_ids = lookup_transport_method_ids(methods, session, db_svc)
boroughs = [b.lstrip().rstrip() for b in input_data['boroughs'].split(',')]
borough_ids = lookup_borough_ids(boroughs, session, db_svc)
raw_record = prepare_courier_record(input_data, session, db_svc)
courier = ObjectFactory.create_courier(db_svc, **raw_record)
session.add(courier)
session.flush()
courier_id = courier.id
for id in transport_method_ids:
session.add(ObjectFactory.create_courier_transport_method(db_svc,
courier_id=courier.id,
transport_method_id=id))
for id in borough_ids:
session.add(ObjectFactory.create_courier_borough(db_svc,
courier_id=courier.id,
borough_id=id))
return core.TransformStatus(ok_status('new Courier created', id=courier_id))
def new_job_func(input_data, service_objects, **kwargs):
db_svc = service_objects.lookup('postgres')
job_id = None
raw_record = None
with db_svc.txn_scope() as session:
try:
raw_record = prepare_job_record(input_data, session, db_svc)
job = ObjectFactory.create_job(db_svc, **raw_record)
session.add(job)
session.flush()
job_id = job.id
status_record = ObjectFactory.create_job_status(db_svc,
job_tag=raw_record['job_tag'],
status=0,
write_ts=datetime.datetime.now())
session.add(status_record)
# we've created a job, now open it up for bids
bidding_window = ObjectFactory.create_bidding_window(db_svc,
job_id=job_id,
job_tag=raw_record['job_tag'])
session.add(bidding_window)
# now push the job notification to S3, which will broadcast the event
# to the courier network
raw_record['id'] = job_id
pipeline_svc = service_objects.lookup('job_pipeline')
s3_svc = service_objects.lookup('s3')
pipeline_svc.post_job_notice(raw_record['job_tag'],
s3_svc,
job_data=raw_record)
return core.TransformStatus(ok_status('new Job created', data=raw_record))
except Exception as err:
session.rollback()
return core.TransformStatus(exception_status(err), False, message=str(err))
def new_client_func(input_data, service_objects, **kwargs):
db_svc = service_objects.lookup('postgres')
client_id = None
with db_svc.txn_scope() as session:
client = ObjectFactory.create_client(db_svc, **input_data)
session.add(client)
session.flush()
client_id = client.id
input_data['id'] = client_id
return core.TransformStatus(ok_status('new Client created', data=input_data))
def lookup_sms_command(cmd_string):
for key, cmd_spec in SMS_SYSTEM_COMMAND_SPECS.items():
if cmd_string == key:
return cmd_spec
if cmd_string in cmd_spec.synonyms:
return cmd_spec
return None
def lookup_generator_command(cmd_string):
for key, cmd_spec in SMS_GENERATOR_COMMAND_SPECS.items():
delimiter = cmd_spec.specifier
filter_char = cmd_spec.filterchar
print('the filter character is [%s].' % filter_char)
if cmd_string.split(delimiter)[0] == key:
return cmd_spec
if cmd_string.split(filter_char)[0] == key:
return cmd_spec
return None
class UnrecognizedSMSCommand(Exception):
def __init__(self, cmd_string):
super().__init__(self, 'Invalid SMS command %s' % cmd_string)
class IncompletePrefixCommand(Exception):
def __init__(self, cmd_string):
super().__init__(self, 'Incomplete prefix command %s' % cmd_string)
SystemCommand = namedtuple('SystemCommand', 'job_tag cmdspec modifiers')
GeneratorCommand = namedtuple('GeneratorCommand', 'cmd_string cmdspec modifiers')
PrefixCommand = namedtuple('PrefixCommand', 'mode name body cmdspec')
CommandInput = namedtuple('CommandInput', 'cmd_type cmd_object') # command types: generator, syscommand, prefix
def parse_sms_message_body(raw_body):
job_tag = None
command_string = None
modifiers = []
# make sure there's no leading whitespace, then see what we've got
body = unquote_plus(raw_body).lstrip().rstrip().lower()
print('\n\n###__________ inside parse logic. Raw message body is:')
print(body)
print('###__________\n\n')
if body.startswith('bxlog-'):
# remove the URL encoded whitespace chars;
# remove any trailing/leading space chars as well
tokens = [token.lstrip().rstrip() for token in body.split(' ') if token]
print('###------ message tokens: %s' % common.jsonpretty(tokens))
job_tag = tokens[0]
if len(tokens) == 2:
command_string = tokens[1].lower()
if len(tokens) > 2:
command_string = tokens[1].lower()
modifiers = tokens[2:]
print('#--------- looking up system SMS command: %s...' % command_string)
command_spec = lookup_sms_command(command_string)
if command_spec:
return CommandInput(cmd_type='syscommand', cmd_object=SystemCommand(job_tag=job_tag,
cmdspec=command_spec,
modifiers=modifiers))
raise UnrecognizedSMSCommand(command_string)
elif body[0] in SMS_PREFIX_COMMAND_SPECS.keys():
prefix = body[0]
prefix_spec = SMS_PREFIX_COMMAND_SPECS[prefix]
print('### probable prefix command "%s". Body length is %d.' % (prefix, len(body)))
if len(body) == 1:
raise IncompletePrefixCommand(command_string)
raw_body = body[1:].lower()
defchar_index = raw_body.find(prefix_spec.defchar)
# when a prefix command is issued containing a defchar, that is its "extended" mode
if defchar_index > 0:
command_mode = 'extended'
command_name = raw_body[0:defchar_index]
command_data = raw_body[defchar_index+1:]
# prefix commands issued without the defchar are running in "simple" mode
else:
command_mode = 'simple'
command_name = raw_body
command_data = None
return CommandInput(cmd_type='prefix',
cmd_object=PrefixCommand(mode=command_mode,
name=command_name,
body=command_data,
cmdspec=prefix_spec))
else:
tokens = [token.lstrip().rstrip() for token in body.split(' ') if token]
command_string = tokens[0].lower()
modifiers = tokens[1:]
# see if we received a generator
# (a command which generates a list or a slice of a list)
print('******************* LOOKING UP GENERATOR CMD for string [%s]...' % command_string)
command_spec = lookup_generator_command(command_string)
if command_spec:
print('###------------ detected GENERATOR-type command: %s' % command_string)
return CommandInput(cmd_type='generator',
cmd_object=GeneratorCommand(cmd_string=command_string,
cmdspec=command_spec,
modifiers=modifiers))
# if we didn't find a generator, perhaps the user issued a regular sms comand
command_spec = lookup_sms_command(command_string)
if command_spec:
print('###------------ detected system command: %s' % command_string)
return CommandInput(cmd_type='syscommand',
cmd_object=SystemCommand(job_tag=job_tag,
cmdspec=command_spec,
modifiers=modifiers))
raise UnrecognizedSMSCommand(command_string)
def lookup_courier_by_mobile_number(mobile_number, session, db_svc):
Courier = db_svc.Base.classes.couriers
try:
return session.query(Courier).filter(Courier.mobile_number == mobile_number).one()
except:
return None
def lookup_macro(courier_id, macro_name, session, db_svc):
Macro = db_svc.Base.classes.user_macros
try:
return session.query(Macro).filter(Macro.user_id == courier_id, Macro.name == macro_name).one()
except NoResultFound:
return None
def courier_is_on_duty(courier_id, session, db_svc):
Courier = db_svc.Base.classes.couriers
try:
courier = session.query(Courier).filter(Courier.id == courier_id).one()
if courier.duty_status == 1:
return True
elif courier.duty_status == 0:
return False
else:
raise Exception('Unrecognized courier duty_status value %s.' % courier.duty_status)
except NoResultFound:
# TODO: maybe raise some more hell if we got an invalid courier ID,
# but this is fine for now
return False
def courier_has_bid(courier_id, job_tag, session, db_svc):
JobBid =db_svc.Base.classes.job_bids
Courier = db_svc.Base.classes.couriers
try:
bid = session.query(JobBid).filter(and_(JobBid.courier_id == courier_id,
JobBid.job_tag == job_tag,
JobBid.expired_ts == None,
JobBid.accepted_ts == None)).one()
return True
except NoResultFound:
return False
def compile_help_string():
lines = []
lines.append('________')
lines.append('[ LIST commands ]:')
for key, cmd_spec in SMS_GENERATOR_COMMAND_SPECS.items():
lines.append('%s : %s' % (key, cmd_spec.definition))
lines.append('________')
lines.append('[ GENERAL commands ]:')
for key, cmd_spec in SMS_SYSTEM_COMMAND_SPECS.items():
lines.append('%s : %s' % (key, cmd_spec.definition))
lines.append('________')
lines.append('[ PREFIX commands ]:')
for key, cmd_spec in SMS_PREFIX_COMMAND_SPECS.items():
lines.append('%s : %s' % (key, cmd_spec.definition))
lines.append('________')
return '\n\n'.join(lines)
def handle_delete_user_message(cmd_object, dlg_context, service_registry, **kwargs):
# TODO: figure out messages tagging logic
# and then fill in the blanks
#
return 'placeholder for deleting user message'
def handle_on_duty(cmd_object, dlg_context, service_registry, **kwargs):
db_svc = service_registry.lookup('postgres')
with db_svc.txn_scope() as session:
courier = lookup_courier_by_id(dlg_context.courier.id, session, db_svc)
if courier.duty_status == 1:
return ' '.join([
'Hello %s, you are already on the duty roster.' % dlg_context.courier.first_name,
'The system will automatically notify you when a job is posted.'
])
else:
courier.duty_status = 1
session.flush()
return ' '.join([
'Hello %s, welcome to the on-call roster.' % dlg_context.courier.first_name,
'Reply to advertised job tags with the tag and "acc" to accept a job.',
'Text "hlp" or "?" at any time to see the command codes.'
])
def handle_off_duty(cmd_object, dlg_context, service_registry, **kwargs):
# TODO remove courier from any open bidding pools
db_svc = service_registry.lookup('postgres')
with db_svc.txn_scope() as session:
courier = lookup_courier_by_id(dlg_context.courier.id, session, db_svc)
if courier.duty_status == 0:
return ' '.join([
'Hello %s, you are already off duty.' % dlg_context.courier.first_name,
'Enjoy the downtime!'
])
else:
courier.duty_status = 0
session.flush()
return ' '.join([
'Hello %s, you are now leaving the on-call roster.' % dlg_context.courier.first_name,
'Thank you for your service. Have a good one!'
])
def handle_bid_for_job(cmd_object, dlg_context, service_registry, **kwargs):
if not cmd_object.job_tag:
return 'Bid for a job by texting the job tag, space, and "bid".'
if not is_valid_job_tag(cmd_object.job_tag):
return REPLY_INVALID_TAG_TPL % cmd_object.job_tag
db_svc = service_registry.lookup('postgres')
with db_svc.txn_scope() as session:
# make sure the job is open
if not job_is_available(cmd_object.job_tag, session, db_svc):
return ' '.join(['The job with tag:',
cmd_object.job_tag,
'is not in the pool of available jobs.',
'Text "opn" for a list of open jobs.'
])
if not courier_is_on_duty(dlg_context.courier.id, session, db_svc):
# automatically place this courier on the duty roster
payload = {
'id': dlg_context.courier.id,
'status': 1 # 1 means on-duty
}
transform_status = update_courier_status_func(payload, service_registry, **kwargs)
if not transform_status.ok:
print(transform_status)
return 'There was an error attempting to auto-update your duty status. Please contact your administrator.'
# only one bid per user (TODO: pluggable bidding policy)
if courier_has_bid(dlg_context.courier.id, cmd_object.job_tag, session, db_svc):
return ' '.join([
'You have already bid on the job:',
cmd_object.job_tag,
"Once the bid window closes, we'll text you if you get the assignment.",
"Good luck!"
])
bidding_window = lookup_open_bidding_window_by_job_tag(cmd_object.job_tag, session, db_svc)
if not bidding_window:
return ' '.join([
"Sorry, the bidding window for job:",
cmd_object.job_tag,
"has closed."
])
# job exists and is available, so bid for it
kwargs['job_tag'] = cmd_object.job_tag
bid = ObjectFactory.create_job_bid(db_svc,
bidding_window_id=bidding_window.id,
courier_id=dlg_context.courier.id,
job_tag=cmd_object.job_tag)
session.add(bid)
return ' '.join([
"Thank you! You've made a bid to accept job:",
cmd_object.job_tag,
"If you get the assignment, we'll text you when the bidding window closes."
])
def handle_accept_job(cmd_object, dlg_context, service_registry, **kwargs):
# TODO: verify (non-stale) assignment, update status table
current_time = datetime.datetime.now()
job_tag = cmd_object.job_tag
if not job_tag:
return 'To accept a job assignment, text the job tag, a space, and "acc".'
db_svc = service_registry.lookup('postgres')
with db_svc.txn_scope() as session:
try:
# first, does this user even own this job?
job_bid = lookup_user_job_bid(job_tag, dlg_context.courier.id, session, db_svc)
if not job_bid:
return 'Sorry, it appears the job with tag %s is either expired or not yours to accept.' % job_tag
if lookup_open_bidding_window_by_job_tag(job_tag, session, db_svc):
return 'Sorry -- the bidding window for this job is still open.'
jobstat = lookup_current_job_status(job_tag, session, db_svc)
if jobstat.status == JOB_STATUS_ACCEPTED: