## AI Inference on a video stream
This notebook is a simple example of how to use DeGirum PySDK to do AI inference on a video stream.

This notebook works with the following inference options:

1. Run inference on DeGirum Cloud Platform;
2. Run inference on DeGirum AI Server deployed on a localhost or on some computer in your LAN or VPN;
3. Run inference on DeGirum ORCA accelerator directly installed on your computer.

To try different options, set `hw_location` to the appropriate value in the configuration cell below.

You also need to specify your cloud API access token in the [env.ini](../../env.ini) file, located two directory levels above this notebook.

You can change `video_source` to the index of a local web camera, the URL of an RTSP stream, the URL of a YouTube video, or the path to another video file.


In [None]:
# make sure degirum-tools package is installed:
# `pip show` succeeds when the package is already present; the `||` runs
# `pip install` only when that first command fails
!pip show degirum-tools || pip install degirum-tools

#### Specify where you want to run your inferences, model zoo url, model name and video source

In [4]:
# Inference configuration
#
# hw_location - where the inference runs:
#     '@cloud'    - DeGirum cloud
#     '@local'    - local machine
#     IP address  - AI server inference
# model_zoo_url - url/path for model zoo:
#     cloud_zoo_url     - valid for @cloud, @local, and ai server inference options
#     ''                - ai server serving models from local folder
#     path to json file - single model zoo in case of @local inference
# model_name - name of the model for running AI inference
# video_source - video source for inference:
#     camera index for local web camera
#     URL of RTSP stream
#     URL of YouTube Video
#     path to video file (mp4 etc)
hw_location = '@cloud'
model_zoo_url = 'https://cs.degirum.com/degirum/ultralytics_v6'
model_name = 'yolov8n_silu_coco--640x640_float_openvino_cpu_1'
video_source = '../../images/example_video.mp4'

In [5]:
import degirum as dg, degirum_tools
# configure for Google Colab
degirum_tools.configure_colab()

# connect to the AI inference engine selected by hw_location / model_zoo_url;
# the token is obtained through degirum_tools.get_token() (env.ini file)
zoo = dg.connect(hw_location, model_zoo_url, degirum_tools.get_token())

# load the AI model named by model_name, with probabilities shown on the overlay
model = zoo.load_model(model_name, overlay_show_probabilities=True)

#### The rest of the cells below should run without any modifications

In [10]:

# AI prediction loop: run the model over the video source and show each result
# Press 'x' or 'q' to stop
with degirum_tools.Display("AI Camera") as display:
    result_stream = degirum_tools.predict_stream(model, video_source)
    for inference_result in result_stream:
        display.show(inference_result)

Successfully opened video stream '../../images/example_video.mp4'


In [None]:
# inspect the overlay color currently set on the loaded model
# (as the last expression of the cell its value is displayed by the notebook)
model.overlay_color

In [11]:
import supervision as sv
import numpy as np
# Annotate inference results with supervision: boxes, blur, then labels
box_annotator = sv.BoundingBoxAnnotator()
label_annotator = sv.LabelAnnotator()
blur_annotator = sv.BlurAnnotator()

# Press 'x' or 'q' to stop
with degirum_tools.Display("AI Camera") as display:
    for inference_result in degirum_tools.predict_stream(model, video_source):
        objects = inference_result.results
        if len(objects):
            # convert model results into supervision detections
            detections = sv.Detections(
                np.array([obj["bbox"] for obj in objects]),
                confidence=np.array([obj["score"] for obj in objects]),
                class_id=np.array([obj["category_id"] for obj in objects]),
            )
            # build labels from the detection arrays directly, so the code does not
            # depend on how many fields iterating over `sv.Detections` yields
            labels = [
                f"{model.label_dictionary[class_id]} {confidence:0.2f}"
                for class_id, confidence in zip(
                    detections.class_id, detections.confidence
                )
            ]
        else:
            # frame without detections: nothing to convert
            detections = sv.Detections.empty()
            labels = []

        annotated_frame = box_annotator.annotate(
            scene=inference_result.image.copy(), detections=detections
        )
        annotated_frame = blur_annotator.annotate(
            scene=annotated_frame, detections=detections
        )
        annotated_labeled_frame = label_annotator.annotate(
            scene=annotated_frame, detections=detections, labels=labels
        )
        display.show(annotated_labeled_frame)
            

Successfully opened video stream '../../images/example_video.mp4'


In [None]:
import cv2
import supervision as sv
import numpy as np
class ZoneCounter:
    """
    Class to count detected object bounding boxes in polygon zones.

    Zones are created lazily from the first inference result (the frame size is
    taken from `result.image`). When an OpenCV window name is supplied, a mouse
    callback is installed so zones can be dragged (left button) or reshaped by
    dragging a corner (right button).
    """

    # Triggering position within the bounding box
    CENTER = sv.Position.CENTER
    CENTER_LEFT = sv.Position.CENTER_LEFT
    CENTER_RIGHT = sv.Position.CENTER_RIGHT
    TOP_CENTER = sv.Position.TOP_CENTER
    TOP_LEFT = sv.Position.TOP_LEFT
    TOP_RIGHT = sv.Position.TOP_RIGHT
    BOTTOM_LEFT = sv.Position.BOTTOM_LEFT
    BOTTOM_CENTER = sv.Position.BOTTOM_CENTER
    BOTTOM_RIGHT = sv.Position.BOTTOM_RIGHT

    def __init__(
        self,
        zone_polygons,
        zone_colors,
        *,
        triggering_position=BOTTOM_CENTER,
        window_name=None,
    ):
        """Constructor

        Args:
            zone_polygons - list of polygons to count objects in; each polygon is a list of points (x,y)
            zone_colors - list of colors, one per polygon, passed to the zone annotators;
                colors are paired with zones by position, so extra entries on either side are ignored
            triggering_position - the position within the bounding box that triggers the zone
            window_name - optional OpenCV window name to configure for interactive zone adjustment
        """

        self._wh = None
        self._zones = None
        self._win_name = window_name
        self._mouse_callback_installed = False
        self._triggering_position = triggering_position
        self._polygons = [
            np.array(polygon, dtype=np.int32) for polygon in zone_polygons
        ]
        self._zone_annotators = None
        # keep a list so a single annotator can be rebuilt by index later
        self._zone_colors = list(zone_colors)
        # create the GUI state up front so the mouse callback never sees it missing
        self._gui_state = {"dragging": None, "offset": None, "update": -1}

    def _lazy_init(self, result):
        """
        Complete deferred initialization steps
            - initialize polygon zones from model result object
            - initialize zone annotators
            - install mouse callback

        Args:
            result - model result object; its `image` attribute provides the frame size
        """
        if self._zones is None:
            self._wh = (result.image.shape[1], result.image.shape[0])
            self._zones = [
                sv.PolygonZone(polygon, self._wh, self._triggering_position)
                for polygon in self._polygons
            ]
        if self._zone_annotators is None:
            self._zone_annotators = [
                sv.PolygonZoneAnnotator(zone, color)
                for zone, color in zip(self._zones, self._zone_colors)
            ]
        if not self._mouse_callback_installed and self._win_name is not None:
            self._install_mouse_callback()

    def _rebuild_zone(self, idx):
        """Recreate the zone (and its annotator) at given index from the current polygon

        Args:
            idx - zone index; negative value means "nothing to update"
        """
        if idx < 0 or self._wh is None or self._zones is None:
            return
        self._zones[idx] = sv.PolygonZone(
            self._polygons[idx], self._wh, self._triggering_position
        )
        # the annotator holds a reference to its zone object, so it has to be
        # recreated too, otherwise it keeps drawing the replaced zone
        if self._zone_annotators is not None and idx < len(self._zone_annotators):
            self._zone_annotators[idx] = sv.PolygonZoneAnnotator(
                self._zones[idx], self._zone_colors[idx]
            )

    def window_attach(self, win_name):
        """Attach OpenCV window for interactive zone adjustment by installing mouse callback
        Args:
            win_name - OpenCV window name to attach to
        """

        self._win_name = win_name
        # the callback is (re)installed by the next _lazy_init() call
        self._mouse_callback_installed = False

    def count(self, res, detections):
        """
        Count detected object bounding boxes in polygon zones

        Args:
            res - model result object (used for deferred initialization)
            detections - supervision detections to test against the zones
        Returns:
            list of object counts found in each polygon zone
        """
        self._lazy_init(res)
        if self._zones is not None:
            return [
                (int(zone.trigger(detections).sum()) if len(detections) > 0 else 0)
                for zone in self._zones
            ]
        return None

    def annotate(self, image, zone_counts):
        """
        Draw polygon zones and counts on given image using the zone annotators

        Args:
            image - image to draw on
            zone_counts - list of object counts found in each polygon zone;
                when None, each annotator is called without an explicit label
        Returns:
            annotated image (the input image is returned unchanged when the
            annotators are not initialized yet)
        """
        if self._zone_annotators is None:
            return image

        if zone_counts is None:
            labels = [None] * len(self._zone_annotators)
        else:
            # annotator labels are text, so convert counts to strings
            labels = [str(zone_count) for zone_count in zone_counts]

        # apply the annotators one after another on the same image
        for zone_annotator, label in zip(self._zone_annotators, labels):
            image = zone_annotator.annotate(image, label)
        return image

    def display(self, result, image, zone_counts):
        """
        Display polygon zones and counts on given image using OpenCV drawing

        Args:
            result - result object to take display settings from
            image - image to display on
            zone_counts - list of object counts found in each polygon zone
        Returns:
            annotated image
        """

        def color_complement(color):
            # take the first color when a list of colors is given, reverse the
            # channel order and invert every channel
            adj_color = (color[0] if isinstance(color, list) else color)[::-1]
            return tuple([255 - c for c in adj_color])

        zone_color = color_complement(result.overlay_color)
        background_color = color_complement(result.overlay_fill_color)

        for zi in range(len(self._polygons)):
            cv2.polylines(
                image, [self._polygons[zi]], True, zone_color, result.overlay_line_width
            )
            degirum_tools.Display.put_text(
                image,
                f"Zone {zi}: {zone_counts[zi]}",
                self._polygons[zi][0],
                zone_color,
                background_color,
                cv2.FONT_HERSHEY_PLAIN,
                result.overlay_font_scale,
            )
        return image

    @staticmethod
    def _mouse_callback(event, x, y, flags, self):
        """Mouse callback for OpenCV window for interactive zone operations

        Registered through cv2.setMouseCallback() with the ZoneCounter instance
        as the user parameter, which is why `self` is the last argument.

        Args:
            event - OpenCV mouse event code
            x, y - mouse position
            flags - OpenCV event flags (not used)
            self - ZoneCounter instance passed as callback parameter
        """

        click_point = np.array((x, y))
        state = self._gui_state

        def begin_drag(target, idx):
            # finish any pending zone update before starting a new drag
            self._rebuild_zone(state["update"])
            state["dragging"] = target
            state["offset"] = click_point
            state["update"] = idx

        if event == cv2.EVENT_LBUTTONDOWN:
            # left button inside a polygon: drag the whole polygon
            for idx, polygon in enumerate(self._polygons):
                if cv2.pointPolygonTest(polygon, (x, y), False) > 0:
                    begin_drag(polygon, idx)
                    break

        elif event == cv2.EVENT_RBUTTONDOWN:
            # right button near a corner: drag only the first matching corner;
            # `pt` is a row of the polygon array, so moving it edits the polygon
            corner = next(
                (
                    (idx, pt)
                    for idx, polygon in enumerate(self._polygons)
                    for pt in polygon
                    if np.linalg.norm(pt - click_point) < 10
                ),
                None,
            )
            if corner is not None:
                begin_drag(corner[1], corner[0])

        elif event == cv2.EVENT_MOUSEMOVE:
            if state["dragging"] is not None:
                # in-place addition moves the dragged polygon / corner itself
                state["dragging"] += click_point - state["offset"]
                state["offset"] = click_point

        elif event in (cv2.EVENT_LBUTTONUP, cv2.EVENT_RBUTTONUP):
            state["dragging"] = None
            self._rebuild_zone(state["update"])
            state["update"] = -1

    def _install_mouse_callback(self):
        """Install the mouse callback on the attached OpenCV window (best effort)"""
        try:
            # reset GUI state before the callback can be invoked
            self._gui_state = {"dragging": None, "offset": None, "update": -1}
            cv2.setMouseCallback(self._win_name, ZoneCounter._mouse_callback, self)  # type: ignore[attr-defined]
            self._mouse_callback_installed = True
        except Exception:
            # deliberately ignored: the installed flag stays False, so
            # _lazy_init() tries again on the next call
            pass

In [None]:
class PredictStreamResult:
    """
    Wrapper around a model inference result which additionally carries the
    detections, labels and optional zone counting data computed for it.

    Attributes not defined on the wrapper are forwarded to the wrapped result.
    """

    def __init__(self, res, detections, labels, zone_counter=None, zone_counts=None):
        """Constructor

        Args:
            res - wrapped model result object
            detections - detections built from the result
            labels - list of text labels, one per detection
            zone_counter - optional zone counter object used to annotate the overlay
            zone_counts - optional list of object counts per zone
        """
        self._result = res
        self._detections = detections
        self._labels = labels
        self._zone_counts = zone_counts
        self._zone_counter = zone_counter

    def __getattr__(self, item):
        """Forward lookups of unknown attributes to the wrapped result"""
        # `_result` is missing only on a not-yet-initialized instance (e.g. while
        # the object is being copied); fail cleanly instead of recursing forever
        if item == "_result":
            raise AttributeError(item)
        return getattr(self._result, item)

    @property
    def image_overlay(self):
        """Overlay image of the wrapped result, with zones drawn when a zone counter is set"""
        if self._zone_counter is None:
            return self._result.image_overlay
        # use the wrapper's own counts: `self.zone_counts` would be forwarded to the
        # wrapped result, and an AttributeError raised inside a property makes Python
        # fall back to __getattr__, silently returning the un-annotated overlay
        return self._zone_counter.annotate(
            self._result.image_overlay.copy(), self._zone_counts
        )

def predict_stream(model,
                   video_source,
                   zone_counter=None
                   ):
    """
    Run a model on a video stream and yield wrapped results frame by frame

    Args:
        model - model object to run; its `image_backend` and
            `input_numpy_colorspace` properties are overwritten by this function
        video_source - video source identifier passed to degirum_tools.open_video_stream()
        zone_counter - optional zone counter object; when given, its count() method
            is called for every frame
    Yields:
        PredictStreamResult object for every frame
    """

    # force OpenCV image backend with BGR colorspace
    model.image_backend = "opencv"
    model.input_numpy_colorspace = "BGR"

    with degirum_tools.open_video_stream(video_source) as stream:
        for res in model.predict_batch(degirum_tools.video_source(stream)):
            if len(res.results):
                # convert model results into supervision detections
                detections = sv.Detections(
                    np.array([obj["bbox"] for obj in res.results]),
                    confidence=np.array([obj["score"] for obj in res.results]),
                    class_id=np.array([obj["category_id"] for obj in res.results]),
                )
                # build labels from the detection arrays directly, so the code does
                # not depend on how many fields iterating over `sv.Detections` yields
                labels = [
                    f"{model.label_dictionary[class_id]} {confidence:0.2f}"
                    for class_id, confidence in zip(
                        detections.class_id, detections.confidence
                    )
                ]
            else:
                detections = sv.Detections.empty()
                labels = []

            zone_counts = (
                zone_counter.count(res, detections)
                if zone_counter is not None
                else None
            )

            yield PredictStreamResult(res, detections, labels, zone_counter, zone_counts)

            

In [None]:
# load model
model = zoo.load_model(model_name, overlay_line_width=1)

# define polygon zone coordinates
polygons = [
    [[10, 50], [600, 50], [600, 400], [10, 400]],
]

# AI prediction loop
# Press 'x' or 'q' to stop
# Drag zone by left mouse button to move zone
# Drag zone corners by right mouse button to adjust zone shape
with degirum_tools.Display("AI Camera") as display:
    # create zone counter; the display window is attached for interactive zone adjustment
    zone_counter = ZoneCounter(
        polygons,
        zone_colors=[sv.Color(255, 255, 0)],
        triggering_position=ZoneCounter.CENTER,
        window_name=display.window_name,
    )

    # do AI predictions on video stream and show the zone-annotated overlay
    for result in predict_stream(model, video_source, zone_counter=zone_counter):
        display.show(result.image_overlay)

In [None]:
# NOTE(review): evaluates to the bound method without calling it, so the cell only
# displays its repr — looks like a leftover debugging cell; confirm it can be removed
zone_counter.annotate

In [None]:
# NOTE(review): called without arguments, while the zone counter cell constructs
# sv.Color(255, 255, 0) — presumably a leftover experiment; confirm it can be removed
sv.Color()

In [None]:
import supervision as sv
import numpy as np
# Track detected objects with ByteTrack and show boxes labeled with tracker IDs
byte_tracker = sv.ByteTrack()
box_annotator = sv.BoundingBoxAnnotator()
label_annotator = sv.LabelAnnotator()
video_source = '../../images/people_720p.mp4'
frame = 0  # not used by the loop below

# Press 'x' or 'q' to stop
with degirum_tools.Display("AI Camera") as display:
    for inference_result in degirum_tools.predict_stream(model, video_source):
        objects = inference_result.results
        detections = sv.Detections.empty()
        labels = []
        if len(objects):
            # convert model results into supervision detections
            detections = sv.Detections(
                np.array([obj["bbox"] for obj in objects]),
                confidence=np.array([obj["score"] for obj in objects]),
                class_id=np.array([obj["category_id"] for obj in objects]),
            )
            detections = byte_tracker.update_with_detections(detections)
            # the tracker may return no detections; build labels only when there
            # are some, reading the arrays directly so the code does not depend on
            # how many fields iterating over `sv.Detections` yields
            if len(detections):
                labels = [
                    f"#{tracker_id} {model.label_dictionary[class_id]} {confidence:0.2f}"
                    for tracker_id, class_id, confidence in zip(
                        detections.tracker_id,
                        detections.class_id,
                        detections.confidence,
                    )
                ]

        annotated_frame = box_annotator.annotate(
            scene=inference_result.image.copy(), detections=detections
        )
        annotated_labeled_frame = label_annotator.annotate(
            scene=annotated_frame, detections=detections, labels=labels
        )
        display.show(annotated_labeled_frame)
                                

In [None]:
import os
from pathlib import Path

# Root directory of the local COCO dataset. Set the COCO_DIR environment variable
# to point to your own copy; the default keeps the original author's location.
coco_dir = Path(
    os.environ.get(
        "COCO_DIR",
        "C:/Users/ShashiChilappagari/Documents/Python_Scripts/Datasets/coco",
    )
)

# load val2017 images and their annotations as a supervision detection dataset
ds = sv.DetectionDataset.from_coco(
    images_directory_path=str(coco_dir / "images" / "val2017"),
    annotations_path=str(coco_dir / "annotations" / "instances_val2017.json"),
)

In [None]:
# load model
model = zoo.load_model(model_name, overlay_line_width=1)

# define polygon zone coordinates
polygons = [
    [[10, 50], [600, 50], [600, 400], [10, 400]],
]

# AI prediction loop
# Press 'x' or 'q' to stop
# Drag zone by left mouse button to move zone
# Drag zone corners by right mouse button to adjust zone shape
with degirum_tools.Display("AI Camera") as display:
    # create zone counter; the display window is attached for interactive zone adjustment
    zone_counter = degirum_tools.ZoneCounter(
        polygons,
        class_list=["person"],
        triggering_position=degirum_tools.ZoneCounter.CENTER,
        window_name=display.window_name,
    )

    # do AI predictions on video stream
    result_stream = degirum_tools.predict_stream(
        model, video_source, zone_counter=zone_counter
    )
    for inference_result in result_stream:
        display.show(inference_result)