# Person Detection and Tracking with state machine and audio output

In [40]:
import base64
from IPython.display import Image, display
import matplotlib.pyplot as plt

def mm(graph):
    """Render a Mermaid diagram inline via the mermaid.ink image service.

    The diagram source is UTF-8 encoded, base64 encoded, and appended to the
    mermaid.ink image URL; the resulting URL is shown as an IPython image.

    Args:
        graph: Mermaid diagram source text.
    """
    encoded = base64.b64encode(graph.encode("utf8")).decode("ascii")
    display(Image(url="https://mermaid.ink/img/" + encoded))

# Draw the intended greeting state diagram with the mm() helper.
# NOTE(review): PersonTrackingStateMachine (defined later in this notebook)
# returns to IDLE when the cooldown expires and never assigns a HELLO state,
# so the COOLDOWN --> HELLO --> COOLDOWN edges below may be out of date —
# confirm which of the two is the intended design.
mm("""
graph LR
    IDLE[Idle]
    COOLDOWN[Cooldown]
    HELLO[Hello]

    IDLE --> COOLDOWN
    COOLDOWN --> HELLO
    HELLO --> COOLDOWN
""")

### Imports

In [41]:
import collections
import sys
import time
import numpy as np
import cv2
import matplotlib.pyplot as plt
import openvino as ov
import notebook_utils as utils
from deepsort_utils.tracker import Tracker
from deepsort_utils.nn_matching import NearestNeighborDistanceMetric
from deepsort_utils.detection import Detection, compute_color_for_labels, xywh_to_xyxy, xywh_to_tlwh, tlwh_to_xyxy

### Model loader class

In [42]:
class ModelLoader:
    """Read a model through an OpenVINO core, set its batch size, and compile it.

    Attributes:
        model: The model returned by ``core.read_model``.
        input_layer: The model's first input.
        input_shape: Shape of ``input_layer`` as read before any reshape.
        height: ``input_shape[2]`` (assumes an NCHW layout — TODO confirm).
        width: ``input_shape[3]`` (assumes an NCHW layout — TODO confirm).
        compiled_model: The model compiled for ``device``.
        output_layer: First output of the compiled model.
    """

    def __init__(self, model_path, core, batchsize=1, device="AUTO"):
        """Load, reshape and compile the model.

        Args:
            model_path: Path handed to ``core.read_model``.
            core: Object providing ``read_model`` and ``compile_model``.
            batchsize: Value written into dimension 0 of every model input.
            device: Device name handed to ``core.compile_model``.
        """
        model = core.read_model(model=model_path)
        self.model = model

        # The input geometry is captured before the reshape below.
        first_input = model.input(0)
        self.input_layer = first_input
        self.input_shape = first_input.shape
        self.height, self.width = self.input_shape[2], self.input_shape[3]

        # Overwrite the leading (batch) dimension of each input in turn.
        for model_input in model.inputs:
            new_shape = model_input.partial_shape
            new_shape[0] = batchsize
            model.reshape({model_input: new_shape})

        self.compiled_model = core.compile_model(model=model, device_name=device)
        self.output_layer = self.compiled_model.output(0)

    def predict(self, input):
        """Run the compiled model on ``input`` and return its first output."""
        return self.compiled_model(input)[self.output_layer]

### Frame processor class

In [43]:
class FrameProcessor:
    """Image pre-processing, detection decoding and box drawing helpers."""

    def preprocess(self, frame, height, width):
        """Turn one image into a float32 batch of size one.

        The image is resized to ``(width, height)``, its axes are permuted
        with ``(2, 0, 1)`` (presumably HWC -> CHW — confirm against the
        model layout), and a leading batch axis is added.
        """
        resized = cv2.resize(frame, (width, height))
        channels_first = resized.transpose((2, 0, 1))
        return np.expand_dims(channels_first, axis=0).astype(np.float32)

    def batch_preprocess(self, img_crops, height, width):
        """Preprocess every crop and concatenate the results along axis 0."""
        prepared = [self.preprocess(crop, height, width) for crop in img_crops]
        return np.concatenate(prepared, axis=0)

    def process_results(self, h, w, results, thresh=0.5):
        """Decode raw detector output into boxes, scores and labels.

        Args:
            h: Frame height in pixels, used to scale y coordinates.
            w: Frame width in pixels, used to scale x coordinates.
            results: Detector output, a [1, 1, N, 7] tensor whose rows are
                ``(_, label, score, xmin, ymin, xmax, ymax)`` with box
                coordinates normalized to [0, 1].
            thresh: Only rows with ``score > thresh`` are kept.

        Returns:
            ``(boxes, scores, labels)`` as NumPy arrays; each box is
            ``[center_x, center_y, width, height]`` in pixels. With no
            detections, ``boxes`` has shape ``(0, 4)`` and the other two are
            empty.
        """
        boxes, labels, scores = [], [], []
        for row in results.reshape(-1, 7):
            _, label, score, xmin, ymin, xmax, ymax = row
            if not score > thresh:
                continue
            # Scale the normalized corners into a pixel-space center/size box.
            center_x = (xmin + xmax) / 2 * w
            center_y = (ymin + ymax) / 2 * h
            box_width = (xmax - xmin) * w
            box_height = (ymax - ymin) * h
            boxes.append([center_x, center_y, box_width, box_height])
            labels.append(int(label))
            scores.append(float(score))

        if not boxes:
            # Keep the (0, 4) shape so callers can still slice columns.
            return np.array([]).reshape(0, 4), np.array([]), np.array([])
        return np.array(boxes), np.array(scores), np.array(labels)

    def draw_boxes(self, img, bbox, identities=None):
        """Draw each box and its identity label onto ``img`` and return it.

        Args:
            img: Image to draw on (modified by the OpenCV drawing calls).
            bbox: Iterable of ``(x1, y1, x2, y2)`` boxes.
            identities: Optional per-box ids; every box is labelled 0 when
                omitted.
        """
        for idx, box in enumerate(bbox):
            x1, y1, x2, y2 = (int(coord) for coord in box)
            track_id = 0 if identities is None else int(identities[idx])
            color = compute_color_for_labels(track_id)
            label = '{}{:d}'.format("", track_id)
            text_w, text_h = cv2.getTextSize(label, cv2.FONT_HERSHEY_PLAIN, 2, 2)[0]

            # Box outline, then a filled tag in the top-left corner for the id.
            cv2.rectangle(img, (x1, y1), (x2, y2), color, 2)
            tag_corner = (x1 + text_w + 3, y1 + text_h + 4)
            cv2.rectangle(img, (x1, y1), tag_corner, color, -1)
            cv2.putText(
                img,
                label,
                (x1, y1 + text_h + 4),
                cv2.FONT_HERSHEY_PLAIN,
                1.6,
                [255, 255, 255],
                2
            )
        return img

    # def cosin_metric(self, x1, x2):
    #     return np.dot(x1, x2) / (np.linalg.norm(x1) * np.linalg.norm(x2))

### Initialize tracker function

In [44]:
def init_tracker(
    metric_name="cosine",
    nn_budget=100,
    max_cosine_distance=0.6,
    max_iou_distance=0.7,
    max_age=70,
    n_init=3
):
    """Build a ``Tracker`` backed by a ``NearestNeighborDistanceMetric``.

    Args:
        metric_name: First argument of ``NearestNeighborDistanceMetric``.
        nn_budget: Third argument of ``NearestNeighborDistanceMetric``.
        max_cosine_distance: Second argument of
            ``NearestNeighborDistanceMetric``.
        max_iou_distance: Forwarded to ``Tracker``.
        max_age: Forwarded to ``Tracker``.
        n_init: Forwarded to ``Tracker``.

    Returns:
        The newly constructed ``Tracker``.
    """
    return Tracker(
        NearestNeighborDistanceMetric(metric_name, max_cosine_distance, nn_budget),
        max_iou_distance=max_iou_distance,
        max_age=max_age,
        n_init=n_init,
    )

In [45]:
from IPython.display import clear_output
from playsound import playsound
import threading

# Path of the audio file that PersonTrackingStateMachine.play_hello_audio
# hands to playsound as the greeting.
# NOTE(review): this is an absolute, machine-specific path — it must be
# changed before running on another machine.
existing_audio_path = "/home/acer/workspace/openvino-applications/welcome.mp3"  # Replace with the actual path


class PersonTrackingStateMachine:
    """Greet newly tracked people, with a cooldown between greetings.

    States (stored as strings in ``self.state``):
        "IDLE": the next batch containing a previously unseen track id
            triggers the greeting, starts the cooldown and moves to
            "COOLDOWN".
        "COOLDOWN": unseen ids are recorded as processed but not greeted;
            ``update_cooldown_timer`` moves back to "IDLE" once the cooldown
            has elapsed.
        "HELLO_AFTER_COOLDOWN": legacy state name. Nothing in this class
            assigns it; it is still accepted and treated exactly like "IDLE"
            so code that sets it by hand keeps working.

    Attributes:
        state: Current state name.
        cooldown_start_time: ``time.monotonic()`` reading taken when the
            cooldown last started.
        cooldown_duration: Cooldown length in seconds.
        audio_path: Greeting audio file, or ``None`` to use the module-level
            ``existing_audio_path``.
        processed_ids: Track ids that have already been seen.
        sound_thread: Thread playing the most recent greeting, if any.
    """

    # States in which an unseen id triggers a greeting.
    _GREETING_STATES = ("IDLE", "HELLO_AFTER_COOLDOWN")

    def __init__(self, cooldown_duration=10, audio_path=None):
        """Start in "IDLE" with no processed ids.

        Args:
            cooldown_duration: Seconds to wait after a greeting before the
                next one may be played.
            audio_path: Audio file to play as the greeting; ``None`` falls
                back to the module-level ``existing_audio_path``.
        """
        self.state = "IDLE"
        self.cooldown_start_time = 0
        self.cooldown_duration = cooldown_duration  # in seconds
        self.audio_path = audio_path
        self.processed_ids = []
        self.sound_thread = None

    def start_cooldown_timer(self):
        """Record the moment the cooldown begins."""
        # A monotonic clock keeps the cooldown correct even if the system
        # wall clock is adjusted while it is running.
        self.cooldown_start_time = time.monotonic()

    def update_cooldown_timer(self):
        """Print the remaining cooldown and return to "IDLE" when it hits zero.

        Does nothing while the state is "IDLE".
        """
        if self.state == "IDLE":
            return

        elapsed_time = time.monotonic() - self.cooldown_start_time
        remaining_time = max(0, self.cooldown_duration - elapsed_time)
        print(f"Cooldown timer: {remaining_time:.2f} seconds")

        if remaining_time == 0:
            self.state = "IDLE"
            print("Cooldown timer reached zero. Transitioning to IDLE.")

    def play_hello_audio(self):
        """Play the greeting audio on a background thread.

        The thread is kept in ``self.sound_thread`` so that
        ``stop_sound_thread`` can wait for it.
        """
        path = existing_audio_path if self.audio_path is None else self.audio_path
        # playsound runs on its own thread so this call returns immediately.
        self.sound_thread = threading.Thread(target=playsound, args=(path,))
        self.sound_thread.start()

    def stop_sound_thread(self):
        """Wait for the current greeting thread, if one is still running.

        Despite the name this does not interrupt playback; it joins the
        thread and returns once it has finished.
        """
        if self.sound_thread and self.sound_thread.is_alive():
            self.sound_thread.join()

    def process_detections(self, new_ids):
        """Feed the track ids seen in the current frame to the state machine.

        Args:
            new_ids: Iterable of hashable track ids visible in this frame.
        """
        unseen_ids = list(set(new_ids) - set(self.processed_ids))

        if self.state == "COOLDOWN":
            # Arrivals during the cooldown are remembered but not greeted.
            self.processed_ids.extend(unseen_ids)
        elif self.state in self._GREETING_STATES and unseen_ids:
            print("Hello")
            self.play_hello_audio()  # Play the existing audio
            self.processed_ids.extend(unseen_ids)
            self.start_cooldown_timer()
            self.state = "COOLDOWN"


In [46]:
class DetectionPipeline:
    """Detect, re-identify and track people, greeting newly tracked ones.

    Attributes:
        core: The ``ov.Core`` shared by both models.
        detector: ``ModelLoader`` for the person detection model.
        extractor: ``ModelLoader`` for the re-identification model.
        frame_processor: ``FrameProcessor`` used for pre/post-processing.
        tracker: Tracker returned by ``init_tracker()``.
    """

    def __init__(self, detection_model_path, reid_model_path):
        """Load both models and create the tracker.

        Args:
            detection_model_path: Path of the person detection model.
            reid_model_path: Path of the person re-identification model.
        """
        self.core = ov.Core()
        self.detector = ModelLoader(model_path=detection_model_path, core=self.core)
        # batchsize=-1 is written into the batch dimension of the model
        # inputs; presumably this makes the batch dynamic so that any number
        # of crops can be processed at once — confirm.
        self.extractor = ModelLoader(model_path=reid_model_path, core=self.core, batchsize=-1)
        self.frame_processor = FrameProcessor()
        self.tracker = init_tracker()

    def _extract_features(self, frame, bbox_xywh):
        """Crop each detection from ``frame`` and compute its reid features.

        Detections whose crop is empty are dropped, because resizing an
        empty image fails in OpenCV.

        Returns:
            ``(bbox_xywh, features)`` where ``bbox_xywh`` holds only the
            boxes that produced a crop and ``features`` has one row per kept
            box (an empty array when nothing was kept).
        """
        h, w = frame.shape[:2]
        crops = []
        kept = []
        for idx, box in enumerate(bbox_xywh):
            x1, y1, x2, y2 = xywh_to_xyxy(box, h, w)
            crop = frame[y1:y2, x1:x2]
            if crop.size == 0:
                continue
            crops.append(crop)
            kept.append(idx)

        # Keep boxes and features aligned row by row.
        bbox_xywh = bbox_xywh[np.asarray(kept, dtype=int)]

        if crops:
            img_batch = self.frame_processor.batch_preprocess(
                crops, self.extractor.height, self.extractor.width
            )
            features = self.extractor.predict(img_batch)
        else:
            features = np.array([])
        return bbox_xywh, features

    def _confirmed_tracks(self, h, w):
        """Collect confirmed, recently updated tracks.

        Returns:
            ``(rows, track_ids)``: ``rows`` is a list of int32 arrays
            ``[x1, y1, x2, y2, track_id]`` and ``track_ids`` the matching ids.
        """
        rows = []
        track_ids = []
        for track in self.tracker.tracks:
            if not track.is_confirmed() or track.time_since_update > 1:
                continue
            x1, y1, x2, y2 = tlwh_to_xyxy(track.to_tlwh(), h, w)
            track_id = track.track_id
            rows.append(np.array([x1, y1, x2, y2, track_id], dtype=np.int32))
            track_ids.append(track_id)
        return rows, track_ids

    def _draw_stats(self, frame, processing_time, fps):
        """Write the mean inference time and FPS onto ``frame``."""
        # Scale the font with the frame width.
        font_scale = (frame.shape[1] / 1000) * 2
        lines = (
            (f"Inference time: {processing_time:.1f}ms", (20, 30)),
            (f"{fps:.1f} FPS", (20, 60)),
        )
        for text, origin in lines:
            cv2.putText(
                img=frame,
                text=text,
                org=origin,
                fontFace=cv2.FONT_HERSHEY_PLAIN,
                fontScale=font_scale,
                color=(0, 255, 0),
                thickness=2
            )

    # Main processing function to run person tracking.
    def run_person_tracking(self, source=0, flip=False, skip_first_frames=0):
        """Run detection, tracking and greeting until the source ends or ESC.

        Args:
            source: Video source handed to ``utils.VideoPlayer``.
            flip: Forwarded to ``utils.VideoPlayer``.
            skip_first_frames: Forwarded to ``utils.VideoPlayer``.

        ``KeyboardInterrupt`` and ``RuntimeError`` are caught and reported
        with ``print``; the player, the windows and the greeting thread are
        cleaned up in every case.
        """
        player = None
        state_machine = None

        try:
            # Create a video player to play with target fps.
            player = utils.VideoPlayer(
                source=source, size=(700, 450), flip=flip, fps=24, skip_first_frames=skip_first_frames
            )
            # Start capturing.
            player.start()
            title = "Person Tracking"
            cv2.namedWindow(winname=title, flags=cv2.WINDOW_GUI_NORMAL | cv2.WINDOW_AUTOSIZE)

            # Sliding window over the last 200 inference durations.
            processing_times = collections.deque(maxlen=200)
            state_machine = PersonTrackingStateMachine()

            while True:
                clear_output(wait=True)  # Clear the previous output
                # Grab the frame.
                frame = player.next()
                if frame is None:
                    print("Source ended")
                    break

                # Resize the image and change dims to fit neural network input.
                h, w = frame.shape[:2]
                input_image = self.frame_processor.preprocess(
                    frame, self.detector.height, self.detector.width
                )

                # Measure processing time of the detector only.
                start_time = time.perf_counter()
                output = self.detector.predict(input_image)
                stop_time = time.perf_counter()
                processing_times.append(stop_time - start_time)

                # Mean processing time [ms]: seconds -> milliseconds.
                processing_time = np.mean(processing_times) * 1000
                fps = 1000 / processing_time

                # Decode detections, then get a reid feature for each person.
                bbox_xywh, _, _ = self.frame_processor.process_results(h, w, results=output)
                bbox_xywh, features = self._extract_features(frame, bbox_xywh)

                # Wrap the detection and reidentification results together.
                bbox_tlwh = xywh_to_tlwh(bbox_xywh)
                detections = [
                    Detection(tlwh, feature) for tlwh, feature in zip(bbox_tlwh, features)
                ]

                # Predict the position of tracking targets, then update them.
                self.tracker.predict()
                self.tracker.update(detections)

                # Draw boxes for the currently confirmed tracks.
                outputs, new_ids = self._confirmed_tracks(h, w)
                if outputs:
                    outputs = np.stack(outputs, axis=0)
                    frame = self.frame_processor.draw_boxes(
                        frame, outputs[:, :4], outputs[:, -1]
                    )

                print("new_ids: ", new_ids)
                state_machine.process_detections(new_ids)
                state_machine.update_cooldown_timer()

                self._draw_stats(frame, processing_time, fps)

                cv2.imshow(winname=title, mat=frame)
                # escape = 27
                if cv2.waitKey(1) == 27:
                    break

        # ctrl-c
        except KeyboardInterrupt:
            print("Interrupted")
        # Only RuntimeError is reported here; other errors propagate.
        except RuntimeError as e:
            print(e)
        finally:
            if player is not None:
                # Stop capturing.
                player.stop()
            cv2.destroyAllWindows()
            if state_machine is not None:
                # Wait for a greeting that is still playing, on every exit path.
                state_machine.stop_sound_thread()
        
    

In [47]:
def run(source, detection_model_path, reidentification_model_path):
    """Build a ``DetectionPipeline`` and run person tracking on ``source``.

    Args:
        source: Video source forwarded to ``run_person_tracking``.
        detection_model_path: Path of the person detection model.
        reidentification_model_path: Path of the re-identification model.
    """
    pipeline = DetectionPipeline(
        detection_model_path,
        reidentification_model_path,
    )
    pipeline.run_person_tracking(source)

In [48]:
if __name__ == "__main__":
    # NOTE(review): absolute, machine-specific model paths — adjust them
    # before running on another machine.
    detection_model_path = '/home/acer/workspace/intel_models/intel/person-detection-0202/FP16/person-detection-0202.xml'
    reidentification_model_path = '/home/acer/workspace/intel_models/intel/person-reidentification-retail-0287/FP16/person-reidentification-retail-0287.xml'
    
    # source=2 is forwarded through run() to utils.VideoPlayer; presumably a
    # camera index — confirm for the target machine.
    run(source=2, detection_model_path=detection_model_path, reidentification_model_path=reidentification_model_path)

new_ids:  [1]
Cooldown timer: 4.54 seconds


In [49]:
# Scratch cell: list used to try out list.extend.
a = [1, 2]

In [50]:
# Scratch cell: non-empty list to append onto `a`.
b = [3]

In [51]:
# Scratch cell: extend mutates `a` in place with the items of `b`.
a.extend(b)

In [52]:
# Scratch cell: show `a` after extending it with `b`.
a

[1, 2, 3]

In [53]:
# Scratch cell: empty list, to check what extend does with no items.
c = []

In [54]:
# Scratch cell: extending with an empty list.
a.extend(c)

In [55]:
# Scratch cell: show `a` after extending it with the empty list.
a

[1, 2, 3]