-
Notifications
You must be signed in to change notification settings - Fork 0
/
lug_nut_counter.py
151 lines (124 loc) · 5.45 KB
/
lug_nut_counter.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
import os
import asyncio
import cv2
import sys
import signal
import time
import uuid
from edge_impulse_linux.image import ImageImpulseRunner
from azure.iot.device.aio import IoTHubDeviceClient
from azure.iot.device import Message
import cli_parser
# Parse the command-line options once at import time; the functions below
# read the result through this module-level name.
cli_args = cli_parser.parser.parse_args()
# Module-level handle checked by sigint_handler(); starts out unset.
runner = None
# Colour tuples handed to cv2.rectangle for the two detection classes.
# NOTE(review): whether these are read as BGR or RGB depends on the channel
# order of the frames they are drawn on -- confirm against the frame source.
WHEEL_COLOR = (200, 10, 0)
LUG_NUT_COLOR = (0, 250, 15)
def now():
    """Return the current wall-clock time in whole milliseconds since the epoch."""
    seconds = time.time()
    return round(seconds * 1000)
def _probe_camera(port: int) -> bool:
    """Report whether the capture device on ``port`` delivers a frame.

    Prints a one-line description of the camera when it does. The capture
    handle is released on every path, including a failed read.
    """
    camera = cv2.VideoCapture(port)
    try:
        # read() returns (success, frame); an unopened device never succeeds,
        # so checking isOpened() first only skips a pointless read.
        if not camera.isOpened() or not camera.read()[0]:
            return False
        print("Camera found on port {}. Resolution = ({} x {}), "
              "name = '{}'".format(port,
                                   camera.get(cv2.CAP_PROP_FRAME_WIDTH),
                                   camera.get(cv2.CAP_PROP_FRAME_HEIGHT),
                                   camera.getBackendName()))
        return True
    finally:
        camera.release()


def get_camera() -> int:
    """Return the capture index of the camera to use.

    When ``cli_args.port`` is set, only that port is probed. Otherwise ports
    0 through 4 are scanned and the single port that delivers a frame is
    returned.

    Returns:
        The OpenCV capture index of the usable camera.

    Raises:
        Exception: if the requested port delivers no frame, if the scan
            finds no camera, or if the scan finds more than one camera.
    """
    if cli_args.port is not None:
        if not _probe_camera(cli_args.port):
            raise Exception("Couldn't initialize camera")
        return cli_args.port

    ports = []
    for port in range(5):
        if _probe_camera(port):
            ports.append(port)

    if len(ports) > 1:
        raise Exception("More than one camera found! Use the -p option.")
    if not ports:
        raise Exception("No cameras found!")
    return ports[0]
def sigint_handler(sig, frame):
    """SIGINT handler: stop the module-level runner if one is set, then exit.

    Args:
        sig: signal number supplied by the ``signal`` module (unused).
        frame: interrupted stack frame supplied by the ``signal`` module
            (unused).

    Raises:
        SystemExit: always, with exit status 0.
    """
    print('Interrupted')
    active_runner = runner
    if active_runner:
        active_runner.stop()
    raise SystemExit(0)


# Install the handler so Ctrl-C goes through sigint_handler().
signal.signal(signal.SIGINT, sigint_handler)
def _draw_detection(img, bb):
    """Draw one bounding box and its label onto ``img`` in place."""
    color = WHEEL_COLOR if bb['label'] == 'tire' else LUG_NUT_COLOR
    top_left = (bb['x'], bb['y'])
    bottom_right = (bb['x'] + bb['width'], bb['y'] + bb['height'])
    cv2.rectangle(img, top_left, bottom_right, color, 2)
    # The label goes just below the bottom-left corner of the box.
    cv2.putText(img, bb['label'], (bb['x'], bb['y'] + bb['height'] + 10),
                cv2.FONT_HERSHEY_SIMPLEX, 0.5, (20, 240, 80), 2)


async def main():
    """Count lug nuts in camera frames and alert Azure IoT Hub when some are missing.

    Connects the IoT Hub device client, loads the model file named on the
    command line, then classifies camera frames at no more than 20 fps.
    Whenever a frame contains a 'tire' box but fewer 'lug' boxes than
    ``cli_args.count``, a message is sent to IoT Hub.

    Raises:
        Exception: if no connection string is given on the command line or
            in the ``IOTHUB_DEVICE_CONNECTION_STRING`` environment variable.
        KeyboardInterrupt: when 'q' is pressed in the preview window.
    """
    # Fetch the connection string from the command line, falling back to an
    # environment variable.
    conn_str = cli_args.conn_string
    if conn_str is None:
        conn_str = os.getenv("IOTHUB_DEVICE_CONNECTION_STRING")
    if conn_str is None:
        raise Exception("No Azure IoT connection string found!")

    # NOTE(review): the connection string is echoed in full and is presumably
    # a credential -- consider redacting it before it reaches logs.
    banner = ("Running lug nut counter with the following options:\n"
              "Model file: {0}\nLug nut count: {1}\nConnection string: {2}\n")
    print(banner.format(cli_args.model_file[0], cli_args.count, conn_str))

    # Create instance of the device client using the authentication provider
    device_client = IoTHubDeviceClient.create_from_connection_string(conn_str)

    # Connect the device client.
    await device_client.connect()

    # This function will run when the count is not correct
    async def send_alert(lug_nut_count: int):
        msg = Message("Found {0} missing lug nuts!"
                      .format(cli_args.count - lug_nut_count))
        msg.message_id = uuid.uuid4()
        msg.custom_properties["counted"] = lug_nut_count
        # NOTE(review): the payload above is plain text, not JSON -- confirm
        # that consumers do not rely on this content type.
        msg.content_type = "application/json"
        print("Sending message {0} to Azure IoT Hub".format(msg))
        await device_client.send_message(msg)
        print("Message successfully sent!")

    # The model path is resolved relative to this script's directory.
    dir_path = os.path.dirname(os.path.realpath(__file__))
    modelfile = os.path.join(dir_path, cli_args.model_file[0])

    # NOTE(review): this `runner` is local to main(); the module-level
    # `runner` that sigint_handler() checks is not rebound here.
    with ImageImpulseRunner(modelfile) as runner:
        try:
            model_info = runner.init()
            project_info = model_info['project']
            print("Loaded model for {}/{}".format(project_info['owner'],
                                                  project_info['name']))
            labels = model_info['model_parameters']['labels']
            print(f"Labels: {labels}")

            capture_device = get_camera()
            next_frame = 0

            for res, img in runner.classifier(capture_device):
                # Read the clock once so the delay cannot turn negative
                # between the comparison and the sleep; yield to the event
                # loop instead of blocking it.
                delay_ms = next_frame - now()
                if delay_ms > 0:
                    await asyncio.sleep(delay_ms / 1000)

                if "bounding_boxes" in res['result']:
                    found_lug_nuts = 0
                    found_wheel = False
                    for bb in res['result']['bounding_boxes']:
                        print(f"Found {bb['label']} ({bb['value']: 0.2f}) at "
                              f"x={bb['x']}, y={bb['y']}, "
                              f"w={bb['width']}, h={bb['height']}")
                        if bb['label'] == 'lug':
                            found_lug_nuts += 1
                        elif bb['label'] == 'tire':
                            found_wheel = True
                        if cli_args.show_cam:
                            _draw_detection(img, bb)

                    # Refresh the preview once per frame, after every box
                    # has been drawn, so 'q' is polled on each frame.
                    if cli_args.show_cam:
                        cv2.imshow('Camera', img)
                        if cv2.waitKey(1) == ord('q'):
                            raise KeyboardInterrupt("Exited")

                    # Send a message if the count is wrong
                    if found_wheel and found_lug_nuts < cli_args.count:
                        await send_alert(found_lug_nuts)
                        # avoid sending too many messages
                        await asyncio.sleep(2)

                # 20 fps max
                next_frame = now() + 50
        finally:
            if runner:
                runner.stop()
            # `connected` is a read-only property on the device client, not
            # a method; calling it would raise TypeError during cleanup.
            if device_client.connected:
                await device_client.disconnect()
# Script entry point: only when executed directly, run the main() coroutine
# to completion with asyncio.run().
if __name__ == "__main__":
    asyncio.run(main())