-
Notifications
You must be signed in to change notification settings - Fork 0
/
service.py
90 lines (76 loc) · 3.29 KB
/
service.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
"""
Main service module
"""
# pylint: disable=C0103,C0301,R0902
import threading
import datetime as dt
from videocapture import VideoCapture
from objdetector import ObjectDetector
import entities as e
class Service:
    """
    Main app class with control loop.

    On construction the service builds its video sources, result sink and
    object detector from the configuration, starts one thread per video
    source, and starts a 'service' thread that runs a detection cycle every
    ``runintervalsec`` seconds until stop() is called.
    """

    def __init__(self, config, logger):
        """
        Class initialization. Automatically starts event service loop thread as its last statement.

        config: mapping read for the keys 'modules', 'cams' (items with 'vsid' and 'uri'),
                'resultsink' and 'nn' (each with 'module' and 'class') and 'runintervalsec'
        logger: logger handed to every created component and used for service messages
        """
        self._stopEvent = False
        # Set by stop() so the interval wait in _mainLoop returns immediately
        # instead of sleeping out the rest of the current interval.
        self._wakeEvent = threading.Event()
        self._detectorFree = True
        self._logger = logger
        self._initfromconfig(config)
        self._mainthread = threading.Thread(target=self._mainLoop, name='service')
        self._mainthread.start()

    def _loadPlugin(self, spec, modulesconfig):
        """
        Instantiates the class spec['class'] found in module spec['module'].

        The class is called with its entry from modulesconfig (looked up by
        class name, None when absent) and the service logger.
        """
        # NOTE: __import__ returns the top-level package for dotted module names,
        # so the class must be reachable as an attribute of that object.
        pluginClass = getattr(__import__(spec['module']), spec['class'])
        return pluginClass(modulesconfig.get(spec['class'], None), self._logger)

    def _startCapture(self, cam):
        """
        Runs cam.start on a dedicated thread named after the video source id.
        """
        threading.Thread(target=cam.start, name=f'vsid-{cam.vsid}', args=()).start()

    def _initfromconfig(self, config):
        """
        Builds video sources, result subscriber and object detector from config
        and starts the capture threads.
        """
        modulesconfig = config['modules']
        # Video sources
        self._cams = [VideoCapture(c['vsid'], c['uri'], self._logger) for c in config['cams']]
        self._logger.info(f"Video sources: {[f'{c.vsid}:{c.uri}' for c in self._cams]}")
        # Result subscriber
        self._detectionResultSubscriber = self._loadPlugin(config['resultsink'], modulesconfig)
        self._logger.info(f'Initialize result subscriber: {type(self._detectionResultSubscriber).__name__}')
        # Neural network
        nn = self._loadPlugin(config['nn'], modulesconfig)
        self._logger.info(f'Initialize neural network: {type(nn).__name__}')
        self._objDetector = ObjectDetector(nn, self._logger)
        self._runinterval = config['runintervalsec']
        self._logger.info(f"Service processing interval: {self._runinterval} sec")
        # Captures are started only after everything else is constructed.
        for cam in self._cams:
            self._startCapture(cam)

    def stop(self):
        """
        stops service loop, then the detector, the result subscriber and all video sources
        """
        self._logger.info('Service stopping...')
        self._stopEvent = True
        # Wake the main loop right away rather than after the current interval.
        self._wakeEvent.set()
        self._objDetector.stop()
        self._detectionResultSubscriber.stop()
        for c in self._cams:
            c.stop()

    def _mainLoop(self):
        """
        Runs one detection cycle per interval until stop() is requested.
        """
        # wait() returns False on timeout (run a cycle) and True once stop() set the event.
        while not self._wakeEvent.wait(self._runinterval) and not self._stopEvent:
            if self._detectorFree:
                self._detectionCycle()
            else:
                self._logger.warning('Detector is busy, skipping detection!')
        self._logger.info('Service stopped')

    def _detectionCycle(self):
        """
        Pushes the current frame of every running video source to the detector,
        replaces sources that are not running, and forwards the detector's
        result to the result subscriber.
        """
        self._detectorFree = False
        try:
            for idx, cam in enumerate(self._cams):
                if cam.isRunning:
                    (hasFrame, img, camid) = cam.currentFrame()
                    if hasFrame:
                        frame = e.CapturedFrame(camid, dt.datetime.now(), img)
                        self._objDetector.pushImage(frame)
                elif not self._stopEvent:
                    # Store the replacement back into the list: otherwise the dead
                    # capture stays registered, a new one is spawned on every cycle
                    # and none of them is ever read from or stopped.
                    replacement = VideoCapture(cam.vsid, cam.uri, self._logger)
                    self._cams[idx] = replacement
                    self._startCapture(replacement)
            dset = self._objDetector.getDetectedObjectsFrame()
            self._detectionResultSubscriber.pushDetectedObjectsFrame(dset)
        finally:
            # Release the busy flag even when a source, the detector or the sink raises.
            self._detectorFree = True

    def join(self):
        """
        waits main event loop thread to return
        """
        self._mainthread.join()