forked from goktug97/TargetPersonTracker
-
Notifications
You must be signed in to change notification settings - Fork 0
/
zeromq_example.py
130 lines (97 loc) · 3.58 KB
/
zeromq_example.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
import tracker
import cv2
import utils
import numpy as np
import zmq
import ctypes
import time
import struct
import threading
def publisher(args, event):
    """Capture frames and publish them on a ZeroMQ PUB socket.

    Binds a PUB socket to ``tcp://*:5555`` and, until ``event`` is set or the
    capture source stops delivering frames, sends every frame as a five-part
    message: topic string, timestamp (little-endian int64, milliseconds),
    frame index (little-endian int32), JSON metadata (``dtype`` and
    ``shape``) and the raw frame buffer.

    The capture device, the socket and the context are always released when
    the function returns, including when an exception is raised.

    Args:
        args: Options object; ``input``, ``camera_width``, ``camera_height``
            and ``camera_fps`` are read from it.
        event: Object exposing ``is_set()`` (a ``threading.Event`` in this
            file); publishing stops as soon as it is set.
    """
    topic = 'image'

    # Publisher socket
    context = zmq.Context()
    socket = context.socket(zmq.PUB)
    cap = None
    try:
        # Limit the outgoing queue to 5 messages (send high-water mark).
        socket.setsockopt(zmq.SNDHWM, 5)
        socket.bind('tcp://*:5555')

        cap = cv2.VideoCapture(args.input)

        # Camera settings
        cap.set(cv2.CAP_PROP_FRAME_WIDTH, args.camera_width)
        cap.set(cv2.CAP_PROP_FRAME_HEIGHT, args.camera_height)
        if args.camera_fps:
            cap.set(cv2.CAP_PROP_FPS, args.camera_fps)

        index = 0
        while not event.is_set():
            # Stay inside the signed 32-bit range so packing with '<i'
            # below can never overflow on very long runs.
            index = (index + 1) & 0x7FFFFFFF

            ret, frame = cap.read()
            if not ret:
                break

            # Timestamp in ms
            timestamp = int(round(time.time() * 1000))

            # Metadata needed by the receiver to rebuild the array
            md = {
                'dtype': str(frame.dtype),
                'shape': frame.shape
            }

            # Send the multipart message. The integers are packed with an
            # explicit little-endian layout because the subscriber decodes
            # them with struct formats '<q' and '<i'.
            socket.send_string(topic, zmq.SNDMORE)
            socket.send(struct.pack('<q', timestamp), zmq.SNDMORE)
            socket.send(struct.pack('<i', index), zmq.SNDMORE)
            socket.send_json(md, zmq.SNDMORE)
            socket.send(frame, 0, copy=True, track=False)
            # print('Sent frame {}'.format(index))
    finally:
        if cap is not None:
            cap.release()
        # linger=0: drop frames that are still queued so that terminating
        # the context cannot block during shutdown.
        socket.close(linger=0)
        context.term()
class ZMQExample(tracker.Tracker):
    """Example usage of the Tracker class with frames read from ZeroMQ."""

    # Address of the frame publisher this subscriber connects to.
    ENDPOINT = 'tcp://localhost:5555'
    # Frame age in milliseconds above which the connection is restarted.
    LATENCY_THRESHOLD_MS = 10

    def __init__(self, args, event):
        """Initialize the tracker and the subscriber socket.

        Args:
            args: Options object forwarded to ``tracker.Tracker``.
            event: Event object; it is cleared here and set again by
                ``finish``.
        """
        # super() does not depend on the module-level name ``tracker`` still
        # referring to the module at the time of the call.
        super().__init__(args)

        self.event = event
        self.event.clear()

        # Subscriber socket
        self.context = zmq.Context()
        self.socket = self.context.socket(zmq.SUB)
        self.socket.setsockopt(zmq.SUBSCRIBE, b'image')
        # socket.setsockopt(zmq.RCVHWM, 5)
        self.socket.connect(self.ENDPOINT)

    def get_frame(self):
        """Overload get_frame: receive the next frame from the socket.

        Reads the five message parts in order (topic, timestamp, index,
        metadata, frame buffer) and rebuilds the frame array from the buffer
        using the received ``dtype`` and ``shape``.

        Returns:
            tuple: ``(True, frame)``; the frame is also stored in
            ``self.frame``.
        """
        # Sampled before the blocking receive: a frame that only arrives
        # after waiting carries a later timestamp and is not counted as late.
        cur_time = int(time.time() * 1000)

        self.socket.recv_string()  # topic part, value not needed
        timestamp = struct.unpack('<q', self.socket.recv())[0]
        self.socket.recv()  # frame index part, value not needed

        # Receive the frame and reconstruct it with the metadata
        md = self.socket.recv_json(flags=0)
        bframe = self.socket.recv(flags=0, copy=True, track=False)
        flat = np.frombuffer(bframe, dtype=md['dtype'])
        self.frame = flat.reshape(md['shape'])

        # NOTE(review): input == 0 presumably selects a live camera device,
        # and self.args is presumably set by the base class - confirm.
        if self.args.input == 0:
            # Restart the connection if there is a latency
            if (cur_time - timestamp) > self.LATENCY_THRESHOLD_MS:
                self.socket.disconnect(self.ENDPOINT)
                self.socket.connect(self.ENDPOINT)
                print('Latency, The connection has been restarted')

        return True, self.frame

    def output_function(self):
        """Overload output function: print feature counts.

        Returns:
            bool: Always ``True``; returning ``False`` closes tracking.
        """
        # Print length of features around tracked points
        # (descriptors are self.des)
        print('Features around the tracked points: {}'.format(len(self.kps)))

        # Print length of tracked features
        # (descriptors are self.tdes)
        print('Total number of learned features: {}'.format(len(self.tkps)))

        # Return False to close tracking
        return True

    def finish(self):
        """Set the shared event (the publisher loop in this file polls it)."""
        self.event.set()
if __name__ == '__main__':
    from options import args

    event = threading.Event()

    # Deliberately not named ``tracker``: that would rebind the imported
    # ``tracker`` module at module level.
    target_tracker = ZMQExample(args, event)
    tracker_thread = threading.Thread(target=target_tracker.run)
    camera_thread = threading.Thread(target=publisher, args=(args, event))

    # Start camera
    camera_thread.start()
    try:
        # Method name spelling kept exactly as in the original call.
        target_tracker.initiliaze_target()
        tracker_thread.start()
        tracker_thread.join()
    finally:
        # Always tell the publisher to stop, also when target initialisation
        # raises or the tracker thread ends without setting the event;
        # otherwise the non-daemon camera thread keeps the process alive.
        event.set()
        camera_thread.join()