-
Notifications
You must be signed in to change notification settings - Fork 0
/
revisor.py
2489 lines (2022 loc) · 76.4 KB
/
revisor.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
import os
os.environ['TF_CPP_MIN_LOG_LEVEL'] = '2'
import warnings
warnings.filterwarnings("ignore", category=DeprecationWarning)
warnings.filterwarnings("ignore", category=UserWarning)
from datetime import timedelta, datetime
from threading import Thread, Event
from shapely.geometry import Point
import pycuda.driver as cuda
from queue import Queue
from time import sleep
import numpy as np
import threading
import psutil
import shutil
import csv
import cv2
import sys
import gc
from cfg import cfg
from components.person_orientation import get_person_orientation, get_voting_orientation, interpolate_bboxes
from components.visualization import get_caps_overlays, draw_ballot_boxes, draw_voter_bbox
from evopose2d.preprocessing import get_joints_dict, get_joints_confs, prepare_evo_input
from get_videos import get_local_videos, get_boxes_rec_vid
from trackers.tracker_attributes import TrackingVoter
from yolact.utils.functions import MovingAverage
from utils import (
get_avg_box_coefs, get_voting_day_times, get_voted_ballot_box,
voter_stopped_near_box, get_yolo_trt_bboxes, del_ballot_boxes,
append_row_to_csv, get_bbox_params, save_output_json,
create_folder, read_csv, detect_hardware,
replace_dict_frame_ids,
)
from utils_video import (
get_stream_params, frame_drop_needed, get_rec_zone_coords,
vid_saver, get_cap_polygons, upscale_boxes,
convert_to_coordinates
)
from components.global_variables import (
# Global variables:
processing_boxes_tasks, processing_counting_tasks, taken_gpus,
processed_joints_queues_info, vote_times, counters, frames_memory,
evo_batches, ballot_boxes_data, cap_polygons_data,
# Global locks:
vote_times_lock, processing_boxes_tasks_lock, processing_counting_tasks_lock,
frames_memory_lock, counters_lock, taken_gpus_lock, saving_lock,
processed_joints_queues_info_lock, videos_counted_lock,
evo_batches_lock, ballot_boxes_lock
)
# Number of videos fully processed so far; shared across VotesCounter
# threads and incremented under videos_counted_lock.
videos_counted = 0
# Make sure garbage collection is enabled — worker threads below create
# many short-lived frame/batch objects and call gc.collect() explicitly.
gc.enable()
def main():
    """
    Validate the configured data source and run one full pass of the
    pipeline: directory checks, base queue/thread setup, then local
    video processing (box detection + turnout counting).
    """
    mode = cfg.data_source
    if mode not in ('local', 'api'):
        raise ValueError(
            f'Revisor mode ("{cfg.data_source}") is not allowed'
        )
    if mode == 'api':
        raise AssertionError(
            '\nFully automatic "API" mode is not implemented in this version.'
            '\nContact us if you would like to process your data using the Revisor.'
        )
    import torch

    # Release any cached GPU memory before starting the run.
    torch.cuda.empty_cache()
    # Create temp, stats and outputs directories if there aren't any.
    check_dirs()
    # Build the ballot-box recognition, vote-saving and API queues,
    # together with their service threads and stop events.
    queues, threads, events = initialize_base_queues()
    print('Pipeline | Initialized')
    # Local mode (cfg.data_source='local'): find videos of the target
    # directories listed in target_dirs.csv, detect ballot boxes, count
    # voter turnout.  Dataset-maker modes instead save person samples
    # (lid-zone intersections or every tracked person for ReID).  The
    # fully automatic API mode is not revealed in this version.
    process_local_data(queues, threads, events)
    # Clear torch GPU memory before the caller restarts the pipeline.
    torch.cuda.empty_cache()
    print('Pipeline | Restarting...')
def process_local_data(queues, threads, events):
    """
    Count voter turnout of videos stored locally.

    Args:
        queues: pipeline queues created by initialize_base_queues().
        threads: service threads created by initialize_base_queues().
        events: stop events for the service threads.
    """
    # Parse target video directories and put required data in "tasks" dict
    tasks = get_local_videos()
    # Detect ballot boxes on each station
    for uik_id, uik in tasks.items():
        # Find video to detect ballot boxes
        video_path = get_boxes_rec_vid(uik['videos'])
        are_boxes, objects_num, objects, objects_images, camera_quality = \
            recognize_ballot_boxes(
                queues, video_path, uik['cam_id'],
                uik['region_number'], uik['station_number'], uik['cam_number'],
                'boxes', uik_id
            )
        # Cache detection results on the task so counting can use them.
        tasks[uik_id]['are_boxes'] = are_boxes
        tasks[uik_id]['boxes_number'] = objects_num
        tasks[uik_id]['boxes'] = objects
        tasks[uik_id]['boxes_images'] = objects_images
        tasks[uik_id]['camera_quality'] = camera_quality
    # Start turnout counting
    uiks_threads = {}
    if tasks:
        # Initialize counting threads, models and queues
        print('Pipeline | Initializing counting models...')
        evo_ctx, yolo_ctx, queues, threads, yolo_detector = initialize_counting(
            queues, threads, gpu_id=0
        )
        print('Pipeline | Counting models initialized!')
        for uik_id, task in tasks.items():
            # Make sure we have free counting thread available
            uiks_threads = wait_for_free_slot(uiks_threads)
            # If none boxes are found, skip voting station.
            # In ReID dataset maker mode we do not use ballot boxes.
            if not task['are_boxes'] and not cfg.reid_dataset_maker_mode:
                append_row_to_csv(cfg.processed_videos_csv, [task['target_dir']])
                continue
            # Distribute a unique queue of analyzed
            # pose coordinates data to the task
            distribute_task(uik_id)
            # Create new turnout counting thread.
            # block=True: process cameras strictly one by one.
            uiks_threads[uik_id] = count_voters(
                uik_id, task, queues, yolo_ctx, yolo_detector, block=True
            )
        # Join all created threads
        print('Pipeline | Clearing counting models...')
        join_counting(queues, threads)
        # Drop the detector before popping its GPU contexts so no live
        # reference outlives the CUDA context it was built on.
        del yolo_detector
        clear_gpu_context(yolo_ctx)
        clear_gpu_context(evo_ctx)
        uiks_threads = join_all_counting_threads(uiks_threads)
    print('Pipeline | Joining API...')
    join_pipeline(queues, threads, events, join_counting_pipeline=False)
def recognize_ballot_boxes(
        queues, video_path, cam_id,
        region_number, station_number, cam_number,
        task_type, video_id, gpu_id=0,
        video_info=None, seconds_to_skip=0
):
    """
    Detect ballot boxes for one camera in a background thread and wait
    for the result.

    Args:
        queues: pipeline queues; uses 'find_boxes', 'recognized_boxes', 'api'.
        video_path: path of the video used for box detection.
        cam_id: camera identifier, tracked in the global processing_boxes_tasks.
        region_number: region number (used for logging).
        station_number: polling station number (used for logging).
        cam_number: camera number at the station (used for logging).
        task_type: task label put on the 'find_boxes' queue (e.g. 'boxes').
        video_id: identifier of the video / task.
        gpu_id: GPU the detector thread should use.
        video_info: optional video metadata dict; when given, seconds_to_skip
            is recomputed from the station's voting-day times.
        seconds_to_skip: seconds to skip from the start of the video.

    Returns:
        Tuple (are_boxes, objects_num, objects, objects_images, camera_quality).
        All elements are None when nothing was recognized or in ReID
        dataset maker mode (boxes are not used there).
    """
    are_boxes = None
    objects_num = None
    objects = None
    objects_images = None
    camera_quality = None
    # ReID dataset maker mode collects every person and ignores ballot boxes.
    if cfg.reid_dataset_maker_mode:
        return are_boxes, objects_num, objects, objects_images, camera_quality
    print(
        'R {}, UIK {}, CAM {} |\tRecognizing boxes\t| '
        'Recognizing boxes...'.format(
            region_number, station_number, cam_number
        )
    )
    boxes_thread = BoxesDetector(
        queues['find_boxes'], queues['recognized_boxes'],
        queues['api'], gpu_id
    )
    # Fix: Thread.setDaemon() is deprecated since Python 3.10;
    # assign the daemon attribute instead (same behavior).
    boxes_thread.daemon = True
    boxes_thread.start()
    if video_info is not None:
        seconds_to_skip, _ = get_voting_day_times(video_info)
    # Register the camera as "in progress" before queueing the task so the
    # polling loop below does not exit early.
    with processing_boxes_tasks_lock:
        processing_boxes_tasks.append(cam_id)
    queues['find_boxes'].put((
        task_type, video_id, cam_id,
        video_path, region_number, station_number,
        cam_number, seconds_to_skip
    ))
    boxes_thread.join()
    sleep(0.1)
    # Drain the result queue until both queues are empty and the detector
    # has deregistered the camera.
    while True:
        with processing_boxes_tasks_lock:
            processing_boxes_uiks = len(processing_boxes_tasks)
        if queues['find_boxes'].qsize() == 0 and \
                queues['recognized_boxes'].qsize() == 0 and \
                processing_boxes_uiks == 0:
            break
        (are_boxes, objects_num, objects, objects_images, camera_quality) = \
            queues['recognized_boxes'].get()
        sleep(0.05)
    if camera_quality is not None:
        print(
            'R {}, UIK {}, CAM {} |\tRecognizing boxes\t| '
            'Found {} box(-es)! Camera quality {:.2f}%'.format(
                region_number, station_number, cam_number,
                objects_num, camera_quality * 100
            )
        )
    if cfg.data_source == 'api' and camera_quality is None:
        # Post error with boxes recognition to API
        pass
    return are_boxes, objects_num, objects, objects_images, camera_quality
def count_voters(task_id, task, queues, yolo_ctx, yolo_detector, block=False):
    """
    Start turnout counting thread.

    Args:
        task_id: unique identification number of the task.
        task: dictionary with task data.
        queues: counting queues.
        yolo_ctx: GPU context of the YOLO TRT model.
        yolo_detector: YOLO TRT model object.
        block: whether to block code execution until thread will end processing.
            You should set to True only if cfg.parallel_counting_videos == 1.
            (When the Revisor processes cameras one by one)

    Returns:
        task_thread: created thread's object
    """
    print(
        'TASK_ID {}, R {}, UIK {}, CAM {} |\tCounting\t| '
        'Preparing for counting...'.format(
            task_id, task['region_number'], task['station_number'],
            task['cam_number']
        )
    )
    task_thread = VotesCounter(
        task_id, task['uik_id'], task, queues, yolo_ctx, yolo_detector
    )
    # Fix: Thread.setDaemon() is deprecated since Python 3.10;
    # assign the daemon attribute instead (same behavior).
    task_thread.daemon = True
    task_thread.start()
    if block:
        # Wait until turnout counting thread will end processing task's videos
        task_thread.join()
    else:
        # Give the thread time to initialize its models before the caller
        # launches the next one.
        sleep(10)
    return task_thread
class VotesCounter(threading.Thread):
    """
    Polling station's camera processing thread
    """
    def __init__(self, task_id, uik_id, task_info, queues, yolo_ctx, yolo_detector):
        """
        Args:
            task_id: unique id of this counting task; keys into the global
                counters/evo_batches/processing_counting_tasks dicts.
            uik_id: id of the polling station.
            task_info: dict with task data ('region_number', 'station_number',
                'cam_number', 'cam_id', 'videos', 'boxes', 'camera_quality', ...).
            queues: counting pipeline queues shared with worker threads.
            yolo_ctx: GPU context of the YOLO TRT model.
            yolo_detector: YOLO TRT model object.
        """
        threading.Thread.__init__(self)
        from yolov4_trt.utils.yolo_classes import get_cls_dict
        self.task_id = task_id
        self.uik_id = uik_id
        # Set per-video while processing in 'api' mode; used for cleanup.
        self.video_id = None
        self.video_path = None
        self.queues = queues
        self.task_info = task_info
        # Snapshot the reference to the shared mapping under its lock.
        with processed_joints_queues_info_lock:
            self.processed_joints_queues_info = processed_joints_queues_info
        self.ctx = yolo_ctx
        self.yolo_detector = yolo_detector
        # COCO class-id -> name mapping (80 classes) for YOLO detections.
        self.yolo_classes = get_cls_dict(80)

    def run(self):
        """
        Process every video of the task: register shared state, pick the
        least loaded GPU, run process_video_stream() per video, write stats
        rows, and always deregister/clean up in the finally block.
        """
        cuda.init()
        global videos_counted
        # Register this task in the shared pipeline state (each under its lock).
        with evo_batches_lock:
            evo_batches[self.task_id] = {}
        with processing_counting_tasks_lock:
            processing_counting_tasks[self.task_id] = True
        with vote_times_lock:
            vote_times[(
                self.task_id,
                self.uik_id,
                self.task_info['cam_id'],
                self.task_info['region_number'],
                self.task_info['station_number']
            )] = []
        with counters_lock:
            counters[self.task_id] = {
                'votes': 0,
                'backwards_votes': 0,
                'rejected_votes': 0,
                'cap_intersections': 0,
            }
        # Pick the least loaded GPU in the system.
        with taken_gpus_lock:
            gpu_id = min(taken_gpus, key=taken_gpus.get)
            taken_gpus[gpu_id] += 1
        try:
            print(
                'TASK_ID {}, R {}, UIK {}, CAM {} |\tCounting\t| '
                'Thread created'.format(
                    self.task_id,
                    self.task_info['region_number'],
                    self.task_info['station_number'],
                    self.task_info['cam_number']
                ))
            # Find vote number of current dir - for dataset maker mode only
            if cfg.dataset_maker_mode or cfg.reid_dataset_maker_mode:
                files_list = os.listdir(
                    os.path.join(cfg.source_videos_dir, self.task_info['target_dir'])
                )
                if files_list:
                    # Resume the vote numbering from existing dataset videos
                    # named '<region>_<station>_<cam>_..._<voteNum>.<ext>'.
                    dataset_videos = os.listdir(os.path.join(cfg.dataset_dir, 'videos'))
                    current_target_files_name = '{}_{}_{}'.format(
                        self.task_info['region_number'], self.task_info['station_number'],
                        self.task_info['cam_number']
                    )
                    existing_votes = []
                    for vid_filename in dataset_videos:
                        if current_target_files_name in vid_filename:
                            existing_vote_num = int(
                                (vid_filename.split('_')[-1]).split('.')[0]
                            )
                            existing_votes.append(existing_vote_num)
                    with counters_lock:
                        counters[self.task_id]['votes'] = \
                            max(existing_votes) + 1 if existing_votes else 0
            if cfg.data_source == 'local':
                region_num, uik_num, camera_num, camera_quality = \
                    self.task_info['region_number'], self.task_info['station_number'], \
                    self.task_info['cam_number'], self.task_info['camera_quality']
                processed_files_csv_path = os.path.join(
                    cfg.processed_videos_dir,
                    '{}_{}_{}_processed_videos.csv'.format(
                        region_num, uik_num, camera_num
                    )
                )
                # Find mean dist and width parameters of ballot boxes
                dist_avg, width_avg = None, None
                if not cfg.reid_dataset_maker_mode:
                    dist_avg, width_avg = get_avg_box_coefs(self.task_info['boxes'])
                files_total = len(self.task_info['videos'].keys())
                for files_processed, (file_id, video_info) in enumerate(
                        self.task_info['videos'].items()):
                    # Add to processed videos csv current video filename.
                    # For dataset maker mode only
                    if cfg.dataset_maker_mode or cfg.reid_dataset_maker_mode:
                        append_row_to_csv(
                            processed_files_csv_path,
                            [video_info['filename']]
                        )
                    # Snapshot counters so per-video deltas can be reported.
                    # NOTE(review): these reads are not under counters_lock —
                    # presumably safe because only this thread writes its
                    # own task_id entry; confirm.
                    votes_at_start_vid = counters[self.task_id]['votes']
                    rej_votes_at_start_vid = counters[self.task_id]['rejected_votes']
                    box_intersections_at_start_vid = counters[self.task_id]['cap_intersections']
                    backwards_votes_at_start_vid = counters[self.task_id]['backwards_votes']
                    start = datetime.now()
                    # Count voter turnout
                    try:
                        video_stream = cv2.VideoCapture(video_info['path'])
                        process_video_stream(
                            video_stream=video_stream,
                            uik_data=self.task_info,
                            file_info=video_info,
                            yolo_classes=self.yolo_classes,
                            yolo_detector=self.yolo_detector,
                            task_id=self.task_id,
                            uiks_processed_joints_queues=self.processed_joints_queues_info,
                            gpu_id=gpu_id,
                            queues=self.queues
                        )
                    except Exception as e:
                        # Best-effort: a broken video must not abort the task.
                        print(
                            'Counting local | Skipping video {}. Exception: {}'.format(
                                video_info['filename'], e
                            )
                        )
                    # Per-video deltas for the stats csv.
                    votes_per_video = counters[self.task_id]['votes'] - votes_at_start_vid
                    rejected_votes_per_video = \
                        counters[self.task_id]['rejected_votes'] - rej_votes_at_start_vid
                    box_intersections_per_video = \
                        counters[self.task_id]['cap_intersections'] - box_intersections_at_start_vid
                    backwards_votes_per_video = \
                        counters[self.task_id]['backwards_votes'] - backwards_votes_at_start_vid
                    processing_time = (datetime.now() - start).total_seconds()
                    append_row_to_csv(cfg.stats_filepath, [
                        '{}_{}'.format(region_num, uik_num), video_info['filename'],
                        votes_per_video, rejected_votes_per_video,
                        box_intersections_per_video, counters[self.task_id]['votes'],
                        backwards_votes_per_video, camera_quality, dist_avg, width_avg,
                        processing_time
                    ])
                    # Add to processed videos csv current video filename
                    if not cfg.dataset_maker_mode and not cfg.reid_dataset_maker_mode:
                        append_row_to_csv(processed_files_csv_path, [video_info['filename']])
                    # Rough ETA from the last video's processing time.
                    estimated_time_left = int((files_total - files_processed) * processing_time)
                    progress = files_processed / files_total if files_total != 0 else 1.0
                    print(
                        'TASK_ID {}, R {}, UIK {}, CAM {} |\tCounting\t| '
                        'Video processed, Processing time {} sec\n'
                        'Progress {:.2f}%, ETA {} sec'.format(
                            self.task_id,
                            self.task_info['region_number'],
                            self.task_info['station_number'],
                            self.task_info['cam_number'],
                            processing_time,
                            progress * 100,
                            estimated_time_left
                        ))
                    video_stream.release()
                # Mark the whole target directory as processed.
                append_row_to_csv(cfg.processed_videos_csv, [self.task_info['target_dir']])
            elif cfg.data_source == 'api':
                for files_processed, (file_id, video_info) in enumerate(self.task_info['videos'].items()):
                    processing_start_time = datetime.now()
                    self.video_id, self.video_path = video_info['video_id'], video_info['path']
                    video_stream = cv2.VideoCapture(self.video_path)
                    # - Find number of seconds to skip from start of the video
                    #   (if video starts earlier than opening hour of the station)
                    # - Find second when we need to stop processing
                    #   (if some part of the vid > closing hour of the station)
                    seconds_to_skip, stop_time = get_voting_day_times(video_info)
                    # Count voter turnout
                    process_video_stream(
                        video_stream=video_stream,
                        uik_data=self.task_info,
                        file_info=video_info,
                        yolo_classes=self.yolo_classes,
                        yolo_detector=self.yolo_detector,
                        task_id=self.task_id,
                        uiks_processed_joints_queues=self.processed_joints_queues_info,
                        gpu_id=gpu_id,
                        queues=self.queues,
                        seconds_to_skip=seconds_to_skip,
                        stop_time=stop_time
                    )
                    processing_time = (
                        datetime.now() - processing_start_time
                    ).total_seconds()
                    video_stream.release()
                    print(
                        'TASK_ID {}, R {}, UIK {}, CAM {} |\tCounting\t| '
                        'Video processed, Processing time {} sec'.format(
                            self.task_id,
                            self.task_info['region_number'],
                            self.task_info['station_number'],
                            self.task_info['cam_number'],
                            processing_time
                        )
                    )
        except Exception as e:
            print('Counting | Exception:', e)
            if cfg.data_source == 'api':
                # Post error to API
                pass
        finally:
            # Always deregister the task and release shared resources,
            # whether processing succeeded or failed.
            with videos_counted_lock:
                videos_counted += 1
            with taken_gpus_lock:
                taken_gpus[gpu_id] -= 1
            if cfg.data_source == 'api':
                # Post to API that current tas has been finished
                pass
            with processed_joints_queues_info_lock:
                if self.task_id in processed_joints_queues_info.keys():
                    processed_joints_queues_info.pop(self.task_id)
            # Delete source videos if specified
            if cfg.delete_processed_videos:
                if cfg.data_source == 'local':
                    if os.path.exists(self.task_info['videos_dir']):
                        shutil.rmtree(self.task_info['videos_dir'])
                elif cfg.data_source == 'api' and self.video_path is not None:
                    os.remove(self.video_path)
            with processing_counting_tasks_lock:
                processing_counting_tasks[self.task_id] = False
            gc.collect()
            print(
                'TASK_ID {}, R {}, UIK {}, CAM {} |\tCounting\t| '
                'Task processed! Thread joined'.format(
                    self.task_id, self.task_info['region_number'],
                    self.task_info['station_number'],
                    self.task_info['cam_number']
                )
            )
            # Terminate only this thread (SystemExit is caught by threading).
            sys.exit()
class BoxesDetector(threading.Thread):
    """
    Ballot boxes recognition thread: takes one camera task from its input
    queue, runs box detection on the given video and publishes the result.
    """

    def __init__(self, camera_to_rec_queue, recognized_boxes_queue, api_queue, gpu_id):
        threading.Thread.__init__(self)
        # Input queue with one (task_type, video_id, cam_id, ...) tuple.
        self.camera_to_rec_queue = camera_to_rec_queue
        # Output queue for the detection result tuple.
        self.recognized_boxes_queue = recognized_boxes_queue
        self.api_queue = api_queue
        self.gpu_id = gpu_id

    def run(self):
        import torch
        from ballot_boxes_finder import find_cam_quality

        task = self.camera_to_rec_queue.get()
        (task_type, video_id, cam_id, video_path,
         region_num, uik_num, camera_num, seconds_to_skip) = task
        try:
            if cam_id is not None:
                torch.cuda.empty_cache()
                # Per-camera scratch directory for intermediate files.
                temp_cam_files = os.path.join(
                    cfg.temp_dir, '{}_{}'.format(region_num, uik_num)
                )
                result = find_cam_quality(
                    camera_num, video_path, temp_cam_files,
                    verbose=False, gpu_id=self.gpu_id,
                    start_second=seconds_to_skip
                )
                boxes, objects_num, objects, objects_images, camera_quality = result
                torch.cuda.empty_cache()
                # Remove temp directory if specified
                if cfg.delete_temp_files:
                    shutil.rmtree(temp_cam_files)
                self.recognized_boxes_queue.put(
                    (boxes, objects_num, objects, objects_images, camera_quality)
                )
        except Exception as e:
            print(
                'Recognizing boxes | '
                'Exception occurred while recognizing ballot boxes:', e
            )
        finally:
            # Deregister the camera so the waiting caller can stop polling.
            with processing_boxes_tasks_lock:
                processing_boxes_tasks.remove(cam_id)
            self.camera_to_rec_queue.task_done()
class JointsTransformer(threading.Thread):
    """
    Preprocessing of pose estimation model's inputs.
    """
    def __init__(self, joints_queue, joints_to_rec_queues):
        """
        Args:
            joints_queue: input queue of per-voter tracking data.
            joints_to_rec_queues: per-GPU queues feeding JointsEstimator.
        """
        threading.Thread.__init__(self)
        self.joints_queue = joints_queue
        self.joints_to_rec_queues = joints_to_rec_queues
        # Back-pressure limit: max prepared batches allowed to wait per GPU
        # queue before we pause (keeps pose-estimation GPU memory bounded).
        self.poses_max_batches = 2
        # Seconds to sleep between back-pressure checks.
        self.wait_time = 0.05

    def run(self):
        """
        Consume voter tracks, interpolate bboxes, crop/preprocess frames
        into pose-model batches, and hand them to the per-GPU queue.
        A voter_id of 'STOP' shuts the thread down.
        """
        while True:
            (
                task_id, voter_id, voter_bboxes, first_visible_frame, last_visible_frame,
                resized_width, resized_height, uiks_processed_joints_queues, gpu_id
            ) = self.joints_queue.get()
            # Wait until empty slot in preprocessing queue will free up.
            # If pose estimation queue (joints_to_rec_queue) will have
            # a lot of preprocessed data, GPU OOM will be raised.
            while True:
                if voter_id == 'STOP':
                    break
                prepared_joints_q_size = self.joints_to_rec_queues[gpu_id].qsize()
                if prepared_joints_q_size < self.poses_max_batches:
                    if cfg.log_pose_queue_sizes:
                        print('PREP_POSES_Q: {}, POSES_Q: {}'.format(
                            self.joints_queue.qsize(), prepared_joints_q_size)
                        )
                    break
                else:
                    if cfg.log_pose_queue_sizes:
                        print(
                            'Waiting {:.2f} sec for free batch: '
                            'evo_prep_q_size {} > max_q_size {}'.format(
                                self.wait_time, prepared_joints_q_size,
                                self.poses_max_batches
                            )
                        )
                    sleep(self.wait_time)
            try:
                if voter_id is not None and voter_id != 'STOP':
                    # Fill missed bbox coordinates with interpolated values
                    # (from the nearest bboxes)
                    voter_bboxes = interpolate_bboxes(voter_bboxes)
                    # Load from global variable frames which will be used for pose estimation
                    prepared_frames, min_prep_frame_id, max_prep_frame_id = prepare_voting_vid(
                        frames_memory[task_id], first_visible_frame,
                        last_visible_frame, out_type='dict'
                    )
                    if prepared_frames:
                        batches_images, batches_Ms, origins, old_frame_ids = \
                            prepare_evo_input(
                                voter_bboxes, prepared_frames,
                                min_prep_frame_id, max_prep_frame_id,
                                batch_size=cfg.models.pose.batch_size,
                                pose_input_shape=cfg.models.pose.input_shape
                            )
                        # Stash heavy batch data in the global dict; only the
                        # lightweight key tuple travels through the queue.
                        evo_batches[task_id][voter_id] = (
                            batches_images, batches_Ms, origins, old_frame_ids
                        )
                        self.joints_to_rec_queues[gpu_id].put((
                            voter_id, uiks_processed_joints_queues,
                            task_id, gpu_id
                        ))
                elif voter_id == 'STOP':
                    break
            except Exception as e:
                print('JT Exception!', e)
            finally:
                self.joints_queue.task_done()
        # Reached only after the STOP sentinel breaks the loop.
        gc.collect()
        sys.exit()
class JointsEstimator(threading.Thread):
    """
    Pose estimation thread
    """
    def __init__(
            self, joints_to_rec_queues, joints_to_analyze_queue,
            gpu_id, ctx, api_queue
    ):
        """
        Args:
            joints_to_rec_queues: per-GPU queues of prepared batch keys.
            joints_to_analyze_queue: output queue consumed by JointsAnalyzer.
            gpu_id: index of the GPU (and queue) this thread serves.
            ctx: CUDA context for the TensorRT engine.
            api_queue: queue for API error reporting.
        """
        threading.Thread.__init__(self)
        self.joints_to_rec_queues = joints_to_rec_queues
        self.joints_to_analyze_queue = joints_to_analyze_queue
        self.gpu_id = gpu_id
        self.ctx = ctx
        self.api_queue = api_queue

    def run(self):
        """
        Run EvoPose2D TensorRT inference on prepared batches until a
        'STOP' sentinel arrives, forwarding raw predictions downstream.
        """
        cuda.init()
        from evopose2d.evo_trt import TrtEVO
        # Initialize EvoPose2D TensorRT engine
        try:
            evo_model = TrtEVO(
                cfg.models.pose.model_path,
                cfg.models.pose.input_shape,
                cfg.models.pose.output_shape,
                batch_size=cfg.models.pose.batch_size,
                cuda_ctx=self.ctx
            )
        except Exception as e:
            # Loading failure is reported later, on first use.
            evo_model = None
        while True:
            (voter_id, uiks_processed_joints_queues, task_id, gpu_id) = \
                self.joints_to_rec_queues[self.gpu_id].get()
            try:
                if voter_id is not None and voter_id != 'STOP':
                    # Receive batches from global variable
                    # (that's faster than receiving from queues)
                    (batches_images, batches_Ms, origins, old_frame_ids) = \
                        evo_batches[task_id][voter_id]
                    if cfg.log_pose_processing:
                        print(
                            'TASK ID {}, GPU ID {} |\tCounting\t| '
                            'Joints rec. thread got {} batches from queue. Person #{}'.format(
                                task_id, gpu_id, len(batches_Ms), voter_id
                            )
                        )
                    raw_preds = []
                    for batch_id, batch_images in enumerate(batches_images):
                        pred = evo_model.detect_batch(batch_images, batches_Ms[batch_id])
                        raw_preds.append(pred)
                    if cfg.log_pose_processing:
                        print(
                            'TASK ID {}, GPU ID {} |\tCounting\t| '
                            'Joints rec. thread processed {} batches of #{} person'.format(
                                task_id, gpu_id, len(batches_Ms), voter_id
                            )
                        )
                    self.joints_to_analyze_queue.put((
                        voter_id, raw_preds, origins, old_frame_ids,
                        uiks_processed_joints_queues, task_id
                    ))
                    # Free the voter's batch data from the global stash.
                    del evo_batches[task_id][voter_id]
                elif voter_id == 'STOP':
                    break
            except Exception as e:
                print('JE Exception!', e)
                # Send errors to API
                if cfg.data_source == 'api':
                    if evo_model is None:
                        err_message = 'trt evo loading error'
                    else:
                        err_message = str(e)
                    # Post error to API
                    pass
            finally:
                self.joints_to_rec_queues[self.gpu_id].task_done()
        # Reached only after the STOP sentinel breaks the loop.
        del evo_model
        gc.collect()
        sys.exit()
class JointsAnalyzer(threading.Thread):
    """
    Joints coordinates analyzing thread - check for hands intersection
    of the ballot box lid
    """
    def __init__(self, joints_to_analyze_queue, processed_joints_queues):
        """
        Args:
            joints_to_analyze_queue: input queue of raw pose predictions.
            processed_joints_queues: per-task output queues keyed via
                uiks_processed_joints_queues[task_id].
        """
        threading.Thread.__init__(self)
        self.joints_to_analyze_queue = joints_to_analyze_queue
        self.processed_joints_queues = processed_joints_queues

    def run(self):
        """
        Convert raw pose-model output back to source-frame coordinates and
        check whether any target joint (hand) entered a ballot box lid
        polygon; push the per-voter verdict to the task's result queue.
        """
        while True:
            (
                voter_id, batch_predictions, origins,
                old_frame_ids, uiks_processed_joints_queues, task_id
            ) = self.joints_to_analyze_queue.get()
            try:
                if voter_id is not None:
                    intersected_ballot_box, box_intersections_num = [], {}
                    voting_start_frame_id, voting_end_frame_id = None, None
                    intersected_cap = False
                    output_joints, output_confidences = {}, {}
                    image_id = 0
                    # Repeat origin coordinates to add to them cropped joints
                    # coordinates later (for getting source coords)
                    # (17 = number of keypoints per person)
                    origins_repeated = np.repeat(origins, 17, axis=0).reshape(
                        (origins.shape[0], 17, 2)
                    )
                    # Concat evo output to a single array and add cropped coordinates
                    batches_predictions_np = np.concatenate(batch_predictions, axis=0)
                    # assumes predictions are (N, 17, 3) as (x, y, conf) — TODO confirm
                    transformed_x = batches_predictions_np[:, :, :-2] + origins_repeated[:, :, :1]
                    transformed_y = batches_predictions_np[:, :, 1:-1] + origins_repeated[:, :, 1:]
                    transformed_xy = np.concatenate(
                        (transformed_x, transformed_y), axis=2
                    ).astype(int)
                    confs = batches_predictions_np[:, :, -1]
                    for pred_id, pred in enumerate(transformed_xy):
                        # Source frame number
                        old_fr_id = old_frame_ids[image_id]
                        image_id += 1
                        # Convert coordinates and confidences to readable format:
                        # { 'nose': value, ... }
                        output_joints[old_fr_id] = get_joints_dict(pred)
                        output_confidences[old_fr_id] = get_joints_confs(confs[pred_id])
                        target_joints = [
                            output_joints[old_fr_id][t_joint_name] for t_joint_name in cfg.target_joints
                        ]
                        if not cfg.reid_dataset_maker_mode:
                            for ballot_box_id, ballot_box_dict in ballot_boxes_data[task_id].items():
                                for [t_joint_x, t_joint_y] in target_joints:
                                    # Detect if a ballot box lid (cap) contains target joint
                                    joint_point = Point(t_joint_x, t_joint_y)
                                    target_point_in_ballot_box = \
                                        cap_polygons_data[task_id][ballot_box_id].contains(joint_point)
                                    # If a target joint was inside lid zone,
                                    # save that frame id and intersected ballot box id
                                    if target_point_in_ballot_box:
                                        intersected_cap = True
                                        if ballot_box_id not in intersected_ballot_box:
                                            intersected_ballot_box.append(ballot_box_id)
                                        if voting_start_frame_id is None:
                                            voting_start_frame_id = old_fr_id
                                        voting_end_frame_id = old_fr_id
                                # Update ballot boxes intersections counter
                                # NOTE(review): intersected_cap is never reset per
                                # box/frame, so once any box was touched this
                                # increments for every box on later frames —
                                # verify this is intended.
                                if intersected_cap:
                                    if ballot_box_id not in box_intersections_num.keys():
                                        box_intersections_num[ballot_box_id] = 1
                                    else:
                                        box_intersections_num[ballot_box_id] += 1
                    # For ReID dataset maker mode all people in frame are collected
                    if cfg.reid_dataset_maker_mode:
                        intersected_cap = True
                        voting_start_frame_id, voting_end_frame_id = \
                            min(output_joints.keys()), max(output_joints.keys())
                    # Put processed data back to main thread's queue
                    self.processed_joints_queues[uiks_processed_joints_queues[task_id]].put((
                        voter_id, intersected_cap, output_joints,
                        output_confidences, intersected_ballot_box,
                        voting_start_frame_id, voting_end_frame_id,
                        box_intersections_num
                    ))
            except Exception as e:
                print('JA Exception!', e)
            finally:
                self.joints_to_analyze_queue.task_done()
class VotesTransformer(threading.Thread):
"""
Data collecting thread of person who intersected ballot box lid zone by hands
"""
def __init__(self, votes_queue, votes_to_rec_queue, votes_to_analyze_queue):
threading.Thread.__init__(self)
self.votes_queue = votes_queue
self.votes_to_rec_queue = votes_to_rec_queue
self.votes_to_analyze_queue = votes_to_analyze_queue
def run(self):
while True:
(
voter_id, vote_info, vid_info,
file_info, uik_info, task_id
) = self.votes_queue.get()
try:
if voter_id is not None:
# Get from global variable source frames of tracked person
if cfg.show_votes_window or cfg.show_rej_votes_window or cfg.save_votes_vid \
or cfg.save_rejected_votes_vid or cfg.dataset_maker_mode \
or cfg.reid_dataset_maker_mode:
prep_output_frames, min_save_frame_id, max_save_frame_id = prepare_voting_vid(
frames_memory[task_id], vote_info['voting_start_saving_frame_id'],
vote_info['voting_end_saving_frame_id']
)
else:
prep_output_frames, min_save_frame_id, max_save_frame_id = \
None, vote_info['voting_start_saving_frame_id'], \
vote_info['voting_end_saving_frame_id']
if not cfg.reid_dataset_maker_mode:
ballot_boxes = ballot_boxes_data[task_id]
# Replace source frame ids with updated frame ids
new_joints, new_voting_start, new_voting_end = replace_dict_frame_ids(
vote_info['joints'],
min_save_frame_id, max_save_frame_id,
vote_info['voting_start_frame_id'],
vote_info['voting_end_frame_id']
)
new_joints_confidences, _, __ = replace_dict_frame_ids(
vote_info['joints_confidences'],
min_save_frame_id, max_save_frame_id,
vote_info['voting_start_frame_id'],
vote_info['voting_end_frame_id']
)
new_bboxes, _, __ = replace_dict_frame_ids(
vote_info['person_bboxes'],
min_save_frame_id, max_save_frame_id,
vote_info['voting_start_frame_id'],
vote_info['voting_end_frame_id']
)
new_orientations, _, __ = replace_dict_frame_ids(