In [1]:
import cv2
import time
import os
import numpy as np
from openvino.runtime import Core
from statistics import mean
import node as nd

In [2]:
def getLayers(model_xml="./best.xml", device_name="CPU"):
    """Read and compile the OpenVINO model and expose its first input/output ports.

    Args:
        model_xml: Path to the OpenVINO IR ``.xml`` file. Defaults to the
            ``./best.xml`` path this notebook has always used.
        device_name: OpenVINO device to compile for (e.g. ``"CPU"`` or
            ``"MYRIAD"``). Defaults to ``"CPU"``.

    Returns:
        A ``(input_layer, output_layer, compiled_model)`` tuple, where the two
        layers are ``compiled_model.input(0)`` and ``compiled_model.output(0)``.
    """
    ie = Core()
    model = ie.read_model(model=model_xml)
    compiled_model = ie.compile_model(model=model, device_name=device_name)
    return compiled_model.input(0), compiled_model.output(0), compiled_model

def build_model(is_cuda):
    """Load ``yolov5s.onnx`` through OpenCV DNN and pick the backend/target pair.

    Args:
        is_cuda: When truthy, request the CUDA backend with the FP16 CUDA
            target; otherwise use the plain OpenCV backend on the CPU.

    Returns:
        The configured network object returned by ``cv2.dnn.readNet``.
    """
    net = cv2.dnn.readNet("yolov5s.onnx")

    if not is_cuda:
        print("Running on CPU")
        backend, target = cv2.dnn.DNN_BACKEND_OPENCV, cv2.dnn.DNN_TARGET_CPU
    else:
        print("Attempty to use CUDA")
        backend, target = cv2.dnn.DNN_BACKEND_CUDA, cv2.dnn.DNN_TARGET_CUDA_FP16

    net.setPreferableBackend(backend)
    net.setPreferableTarget(target)
    return net

def detect(image, input_layer, output_layer, compiled_model):
    """Run one inference pass on ``image`` and return the raw predictions.

    The image is converted to a blob scaled by 1/255, resized to
    ``(INPUT_WIDTH, INPUT_HEIGHT)`` with the R and B channels swapped and no
    cropping, then fed to ``compiled_model``.

    Args:
        image: Frame to run detection on.
        input_layer: Accepted for interface symmetry; not used by this function.
        output_layer: Key used to select the result from the model's outputs.
        compiled_model: Callable compiled model that takes a list of blobs.

    Returns:
        The entry of the model's result selected by ``output_layer``.
    """
    network_size = (INPUT_WIDTH, INPUT_HEIGHT)
    blob = cv2.dnn.blobFromImage(image, 1/255.0, network_size, swapRB=True, crop=False)
    results = compiled_model([blob])
    return results[output_layer]

def load_capture(video_path):
    """Open ``video_path`` with OpenCV and return the ``cv2.VideoCapture`` object."""
    return cv2.VideoCapture(video_path)

def load_classes(path="classes.txt"):
    """Read the class-name list, one name per line.

    Args:
        path: Text file holding the class names. Defaults to ``classes.txt``
            in the working directory, the location this notebook has always used.

    Returns:
        A list with one whitespace-stripped string per line of the file, in
        file order. Blank lines are kept (as empty strings) so that list
        positions keep matching the line numbers of the file.
    """
    with open(path, "r") as f:
        return [cname.strip() for cname in f]

def wrap_detection(input_image, output_data, confidence_threshold=0.4,
                   score_threshold=0.25, nms_threshold=0.45):
    """Turn raw detector rows into NMS-filtered class ids, confidences and boxes.

    Each row of ``output_data`` is read as ``[x, y, w, h, confidence,
    score_0, score_1, ...]`` where ``(x, y)`` is the box centre in network
    input coordinates. Boxes are rescaled from the
    ``INPUT_WIDTH x INPUT_HEIGHT`` network space to the pixel space of
    ``input_image``.

    Args:
        input_image: The image that was fed to the detector; only its shape
            is used.
        output_data: 2-D array-like of detection rows as described above.
        confidence_threshold: Rows whose confidence (index 4) is below this
            value are dropped.
        score_threshold: A row is kept only if its best class score is
            strictly greater than this value; the same value is passed to
            ``cv2.dnn.NMSBoxes`` as its score threshold.
        nms_threshold: Overlap threshold passed to ``cv2.dnn.NMSBoxes``.

    Returns:
        ``(class_ids, confidences, boxes)``: three parallel lists for the
        detections that survive non-maximum suppression. Each box is a NumPy
        array ``[left, top, width, height]`` in ``input_image`` pixels.
    """
    # NumPy/OpenCV image shape is (rows, cols, ...) == (height, width, ...).
    # The previous code unpacked it as (width, height), which swapped the
    # scale factors for any non-square image.
    image_height, image_width = input_image.shape[:2]
    x_factor = image_width / INPUT_WIDTH
    y_factor = image_height / INPUT_HEIGHT

    class_ids = []
    confidences = []
    boxes = []

    for row in output_data:
        confidence = row[4]
        if confidence < confidence_threshold:
            continue

        classes_scores = row[5:]
        class_id = int(np.argmax(classes_scores))
        if not classes_scores[class_id] > score_threshold:
            continue

        # Plain Python floats keep the NMSBoxes call independent of the
        # detector's output dtype.
        confidences.append(float(confidence))
        class_ids.append(class_id)

        x, y, w, h = (float(value) for value in row[:4])
        left = int((x - 0.5 * w) * x_factor)
        top = int((y - 0.5 * h) * y_factor)
        width = int(w * x_factor)
        height = int(h * y_factor)
        boxes.append(np.array([left, top, width, height]))

    indexes = cv2.dnn.NMSBoxes(boxes, confidences, score_threshold, nms_threshold)

    # Depending on the OpenCV version NMSBoxes returns a flat array, an
    # (N, 1) array, or an empty tuple; flatten so every entry is a scalar.
    kept = [int(i) for i in np.array(indexes).reshape(-1)]

    result_class_ids = [class_ids[i] for i in kept]
    result_confidences = [confidences[i] for i in kept]
    result_boxes = [boxes[i] for i in kept]

    return result_class_ids, result_confidences, result_boxes

def format_yolov5(frame):
    """Pad a 3-channel frame to a square, black-filled ``uint8`` canvas.

    The frame is copied into the top-left corner of a new square image whose
    side equals the larger of the frame's two spatial dimensions; the
    remaining area stays zero.

    Args:
        frame: Image array with shape ``(rows, cols, channels)``.

    Returns:
        A new ``(side, side, 3)`` ``np.uint8`` array containing the frame.
    """
    height, width, _channels = frame.shape
    side = max(width, height)
    canvas = np.zeros((side, side, 3), dtype=np.uint8)
    canvas[:height, :width] = frame
    return canvas

In [3]:
# Network input resolution: detect() resizes every frame to this size and
# wrap_detection() scales boxes back from it.
INPUT_WIDTH = INPUT_HEIGHT = 640

# Detection thresholds.
# NOTE(review): none of these three is referenced by the code visible in this
# notebook; wrap_detection() works with its own 0.4 / 0.25 / 0.45 values.
SCORE_THRESHOLD = 0.2
NMS_THRESHOLD = 0.4
CONFIDENCE_THRESHOLD = 0.7

# Frame number at which the main loop stops collecting detections and
# derives the light positions from what it has gathered so far.
LIGHT_CALIBRATION_FRAME_COUNT = 20

In [4]:
# -----------------------------------------------------------------------------
# Main script: read the video frame by frame, detect lights, calibrate the
# light positions from the first frames, then report state changes through the
# `nd` module while showing an annotated preview window.
# -----------------------------------------------------------------------------
class_list = load_classes()
colors = [(255, 0, 0), (0, 255, 0), (0, 255, 255), (255, 0, 0)]

# Pixel tolerances used when matching a detection box to a light.
LIGHT_X_TOLERANCE = 200               # max |x - light x| for a match (both phases)
CALIBRATION_Y_BELOW_TOLERANCE = 250   # calibration: boxes further above the light are skipped
CALIBRATION_Y_ABOVE_TOLERANCE = 500   # calibration: boxes further below the light replace it
TRACKING_Y_BELOW_TOLERANCE = 100      # tracking: box y must be greater than light y minus this

# build_model() offers an OpenCV-DNN network as an alternative; this script
# runs the OpenVINO model returned by getLayers().
input_layer, output_layer, compiled_model = getLayers()

lights_holder = []  # [[light_id, [x, ...], [y, ...], count], ...] built during calibration
lights = []  # [[light_id, mean x, mean y, count], ...] after calibration
frame_data_holder = []  # Per-frame detections collected before calibration
current_state = []  # Last state reported for each light, by position in `lights`

frame_count = 0
total_frames = 0
fps = -1

capture = load_capture("test_data/chloe_video.MOV")
# Started after model loading so the first FPS reading covers frames only.
start = time.time_ns()

try:
    # Create the preview window once instead of on every frame.
    cv2.namedWindow("output", cv2.WINDOW_NORMAL)

    while True:

        _, vid_frame = capture.read()
        if vid_frame is None:
            print("End of stream")
            break

        inputImage = format_yolov5(vid_frame)

        outs = detect(inputImage, input_layer, output_layer, compiled_model)

        class_ids, confidences, boxes = wrap_detection(inputImage, outs[0])

        frame_count += 1
        total_frames += 1

        # [class name, confidence, box] for every detection in this frame
        frame_data = [
            [class_list[classid], confidence, box]
            for (classid, confidence, box) in zip(class_ids, confidences, boxes)
        ]

        if total_frames < LIGHT_CALIBRATION_FRAME_COUNT:
            # Still calibrating: keep collecting frame data.
            frame_data_holder.append(frame_data)

        elif total_frames == LIGHT_CALIBRATION_FRAME_COUNT:
            # Enough data collected: cluster the detections into lights.
            print("Calibration Complete")
            for frame in frame_data_holder:
                for frame_line in frame:
                    box_x, box_y = frame_line[2][0], frame_line[2][1]
                    added = False  # Has this detection been attributed to a light?
                    for light in lights_holder:
                        if not abs(box_x - mean(light[1])) < LIGHT_X_TOLERANCE:
                            continue
                        light_y = mean(light[2])
                        if box_y < light_y - CALIBRATION_Y_BELOW_TOLERANCE:
                            # Much smaller Y than this light: skip this light. The
                            # detection may still match a later light or start a new one.
                            continue
                        if box_y < light_y + CALIBRATION_Y_ABOVE_TOLERANCE:
                            # Close enough in Y: add the sample to this light.
                            light[1].append(box_x)
                            light[2].append(box_y)
                            light[3] += 1
                        elif box_y > light_y + CALIBRATION_Y_ABOVE_TOLERANCE:
                            # Much larger Y: restart this light from the detection.
                            light[1] = [box_x]
                            light[2] = [box_y]
                            light[3] = 1
                        # (A Y exactly on the upper bound changes nothing but still
                        # counts as attributed.)
                        added = True
                        break
                    if not added:
                        # No light with a nearby X accepted it: start a new light.
                        lights_holder.append([len(lights_holder), [box_x], [box_y], 1])

            # Aggregate each light and drop those seen in too few frames
            # (count must exceed 1/5 of the calibration frame count).
            lights = [
                [light[0], mean(light[1]), mean(light[2]), light[3]]
                for light in lights_holder
                if light[3] > LIGHT_CALIBRATION_FRAME_COUNT/5
            ]

            # Setup node and lights
            if not os.path.exists("node_data.json"):
                nd.init_node("State University Ave", "000.000.0.0")

            if not os.path.exists("lights.json"):
                for _ in lights:
                    nd.create_light(0, 0)

            # All None, so the first tracked frame always differs and gets reported.
            current_state = [None] * len(lights)

        else:
            # Tracking phase (after calibration).
            # States are indexed by position in `lights`, not by light[0]: the
            # filter above can leave gaps in the ids, and indexing a
            # len(lights)-sized list with them raised IndexError.
            new_state = [0] * len(lights)  # 0 means "no matching detection"
            for slot, light in enumerate(lights):
                for line in frame_data:
                    if (abs(line[2][0] - light[1]) < LIGHT_X_TOLERANCE
                            and line[2][1] > light[2] - TRACKING_Y_BELOW_TOLERANCE):
                        new_state[slot] = line[0]  # First matching detection wins
                        break

            # Report only when something changed since the last reported state.
            if new_state != current_state:
                current_state = new_state
                for i in range(len(current_state)):
                    nd.patch_light(str(i), current_state[i])

        ###############################################################################
        # Beginning of code for displaying cv2 annotated window
        ###############################################################################
        for (classid, confidence, box) in zip(class_ids, confidences, boxes):
            color = colors[0]
            cv2.rectangle(vid_frame, box, color, 2)
            cv2.rectangle(vid_frame, (box[0], box[1] - 20), (box[0] + box[2], box[1]), color, -1)
            cv2.putText(vid_frame, class_list[classid], (box[0], box[1] - 10), cv2.FONT_HERSHEY_SIMPLEX, .5, (0,0,0))

        # Refresh the FPS estimate every 30 frames.
        if frame_count >= 30:
            end = time.time_ns()
            fps = 1000000000 * frame_count / (end - start)
            frame_count = 0
            start = time.time_ns()

        if fps > 0:
            fps_label = "FPS: %.2f" % fps
            cv2.putText(vid_frame, fps_label, (10, 25), cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 0, 255), 2)

        cv2.imshow("output", vid_frame)
        ###############################################################################
        # End of code for displaying cv2 annotated window
        ###############################################################################

        # Any key press ends the run.
        if cv2.waitKey(1) > -1:
            print("finished by user")
            break
finally:
    # Always free the video handle and close the window, including when the
    # cell is interrupted from the notebook.
    capture.release()
    cv2.destroyAllWindows()


Calibration Complete
could not create node
Node not found
ERROR: Node not yet initialized.
Light added
ERROR: Node not yet initialized.
Light added
ERROR: Node not yet initialized.
Light added
ERROR: Node not yet initialized.
Light added
404 Client Error: Not Found for url: http://localhost:3000/api/node/light?nodeId=0
0  -  1
404 Client Error: Not Found for url: http://localhost:3000/api/node/light?nodeId=0
1  -  1
404 Client Error: Not Found for url: http://localhost:3000/api/node/light?nodeId=0
2  -  1
404 Client Error: Not Found for url: http://localhost:3000/api/node/light?nodeId=0
3  -  1
404 Client Error: Not Found for url: http://localhost:3000/api/node/light?nodeId=0
0  -  1
404 Client Error: Not Found for url: http://localhost:3000/api/node/light?nodeId=0
1  -  1
404 Client Error: Not Found for url: http://localhost:3000/api/node/light?nodeId=0
2  -  0
404 Client Error: Not Found for url: http://localhost:3000/api/node/light?nodeId=0
3  -  1
404 Client Error: Not Found for ur

KeyboardInterrupt: 

: 