In [None]:
%pip install -q "openvino>=2024.0.0" "nncf>=2.9.0"
%pip install -q "torch>=2.1" "torchvision>=0.16" "ultralytics==8.2.24" onnx opencv-python tqdm --extra-index-url https://download.pytorch.org/whl/cpu

from pathlib import Path
# Fetch `notebook_utils` module
import requests
from PIL import Image
from ultralytics import YOLO

r = requests.get(
    url="https://raw.githubusercontent.com/openvinotoolkit/openvino_notebooks/latest/utils/notebook_utils.py",
)

open("notebook_utils.py", "w").write(r.text)
from notebook_utils import download_file, VideoPlayer

# Download a test sample
IMAGE_PATH = Path("./data/coco_bike.jpg")
download_file(
    url="https://storage.openvinotoolkit.org/repositories/openvino_notebooks/data/data/image/coco_bike.jpg",
    filename=IMAGE_PATH.name,
    directory=IMAGE_PATH.parent,
)

models_dir = Path("./models")
models_dir.mkdir(exist_ok=True)



SEG_MODEL_NAME = "yolov8n-seg"

seg_model = YOLO(models_dir / f"{SEG_MODEL_NAME}.pt")
label_map = seg_model.model.names

res = seg_model(IMAGE_PATH)
Image.fromarray(res[0].plot()[:, :, ::-1])

In [None]:
# instance segmentation model
seg_model_path = models_dir / f"{SEG_MODEL_NAME}_openvino_model/{SEG_MODEL_NAME}.xml"
if not seg_model_path.exists():
    seg_model.export(format="openvino", dynamic=True, half=True)

In [None]:
import ipywidgets as widgets
import openvino as ov

core = ov.Core()

device = widgets.Dropdown(
    options=core.available_devices + ["AUTO"],
    value="AUTO",
    description="Device:",
    disabled=False,
)

device

In [4]:
core = ov.Core()
seg_ov_model = core.read_model(seg_model_path)

ov_config = {}
if device.value != "CPU":
    seg_ov_model.reshape({0: [1, 3, 640, 640]})
if "GPU" in device.value or ("AUTO" in device.value and "GPU" in core.available_devices):
    ov_config = {"GPU_DISABLE_WINOGRAD_CONVOLUTION": "YES"}
seg_compiled_model = core.compile_model(seg_ov_model, device.value, ov_config)

In [5]:
import torch


def infer(*args):
    result = seg_compiled_model(args)
    return torch.from_numpy(result[0]), torch.from_numpy(result[1])


seg_model.predictor.inference = infer
seg_model.predictor.model.pt = False

In [None]:
res = seg_model(IMAGE_PATH)
Image.fromarray(res[0].plot()[:, :, ::-1])

In [None]:
from zipfile import ZipFile

from ultralytics.data.utils import DATASETS_DIR


DATA_URL = "http://images.cocodataset.org/zips/val2017.zip"
LABELS_URL = "https://github.com/ultralytics/yolov5/releases/download/v1.0/coco2017labels-segments.zip"
CFG_URL = "https://raw.githubusercontent.com/ultralytics/ultralytics/v8.1.0/ultralytics/cfg/datasets/coco.yaml"

OUT_DIR = DATASETS_DIR

DATA_PATH = OUT_DIR / "val2017.zip"
LABELS_PATH = OUT_DIR / "coco2017labels-segments.zip"
CFG_PATH = OUT_DIR / "coco.yaml"

download_file(DATA_URL, DATA_PATH.name, DATA_PATH.parent)
download_file(LABELS_URL, LABELS_PATH.name, LABELS_PATH.parent)
download_file(CFG_URL, CFG_PATH.name, CFG_PATH.parent)

if not (OUT_DIR / "coco/labels").exists():
    with ZipFile(LABELS_PATH, "r") as zip_ref:
        zip_ref.extractall(OUT_DIR)
    with ZipFile(DATA_PATH, "r") as zip_ref:
        zip_ref.extractall(OUT_DIR / "coco/images")

In [8]:
import numpy as np
from tqdm.notebook import tqdm
from ultralytics.utils.metrics import ConfusionMatrix


def test(
    model: ov.Model,
    core: ov.Core,
    data_loader: torch.utils.data.DataLoader,
    validator,
    num_samples: int = None,
):
    """
    OpenVINO YOLOv8 model accuracy validation function. Runs model validation on dataset and returns metrics
    Parameters:
        model (Model): OpenVINO model
        data_loader (torch.utils.data.DataLoader): dataset loader
        validator: instance of validator class
        num_samples (int, *optional*, None): validate model only on specified number samples, if provided
    Returns:
        stats: (Dict[str, float]) - dictionary with aggregated accuracy metrics statistics, key is metric name, value is metric value
    """
    validator.seen = 0
    validator.jdict = []
    validator.stats = dict(tp_m=[], tp=[], conf=[], pred_cls=[], target_cls=[])
    validator.batch_i = 1
    validator.confusion_matrix = ConfusionMatrix(nc=validator.nc)
    model.reshape({0: [1, 3, -1, -1]})
    num_outputs = len(model.outputs)
    compiled_model = core.compile_model(model)
    for batch_i, batch in enumerate(tqdm(data_loader, total=num_samples)):
        if num_samples is not None and batch_i == num_samples:
            break
        batch = validator.preprocess(batch)
        results = compiled_model(batch["img"])
        if num_outputs == 1:
            preds = torch.from_numpy(results[compiled_model.output(0)])
        else:
            preds = [
                torch.from_numpy(results[compiled_model.output(0)]),
                torch.from_numpy(results[compiled_model.output(1)]),
            ]
        preds = validator.postprocess(preds)
        validator.update_metrics(preds, batch)
    stats = validator.get_stats()
    return stats


def print_stats(stats: np.ndarray, total_images: int, total_objects: int):
    """
    Helper function for printing accuracy statistic
    Parameters:
        stats: (Dict[str, float]) - dictionary with aggregated accuracy metrics statistics, key is metric name, value is metric value
        total_images (int) -  number of evaluated images
        total objects (int)
    Returns:
        None
    """
    print("Boxes:")
    mp, mr, map50, mean_ap = (
        stats["metrics/precision(B)"],
        stats["metrics/recall(B)"],
        stats["metrics/mAP50(B)"],
        stats["metrics/mAP50-95(B)"],
    )
    # Print results
    print("    Best mean average:")
    s = ("%20s" + "%12s" * 6) % (
        "Class",
        "Images",
        "Labels",
        "Precision",
        "Recall",
        "mAP@.5",
        "mAP@.5:.95",
    )
    print(s)
    pf = "%20s" + "%12i" * 2 + "%12.3g" * 4  # print format
    print(pf % ("all", total_images, total_objects, mp, mr, map50, mean_ap))
    if "metrics/precision(M)" in stats:
        s_mp, s_mr, s_map50, s_mean_ap = (
            stats["metrics/precision(M)"],
            stats["metrics/recall(M)"],
            stats["metrics/mAP50(M)"],
            stats["metrics/mAP50-95(M)"],
        )
        # Print results
        print("    Macro average mean:")
        s = ("%20s" + "%12s" * 6) % (
            "Class",
            "Images",
            "Labels",
            "Precision",
            "Recall",
            "mAP@.5",
            "mAP@.5:.95",
        )
        print(s)
        pf = "%20s" + "%12i" * 2 + "%12.3g" * 4  # print format
        print(pf % ("all", total_images, total_objects, s_mp, s_mr, s_map50, s_mean_ap))

In [9]:
from ultralytics.utils import DEFAULT_CFG
from ultralytics.cfg import get_cfg
from ultralytics.data.converter import coco80_to_coco91_class
from ultralytics.data.utils import check_det_dataset
from ultralytics.utils import ops

args = get_cfg(cfg=DEFAULT_CFG)
args.data = str(CFG_PATH)

In [None]:
seg_validator = seg_model.task_map[seg_model.task]["validator"](args=args)
seg_validator.data = check_det_dataset(args.data)
seg_validator.stride = 32
seg_data_loader = seg_validator.get_dataloader(OUT_DIR / "coco/", 1)

seg_validator.is_coco = True
seg_validator.class_map = coco80_to_coco91_class()
seg_validator.names = seg_model.model.names
seg_validator.metrics.names = seg_validator.names
seg_validator.nc = seg_model.model.model[-1].nc
seg_validator.nm = 32
seg_validator.process = ops.process_mask
seg_validator.plot_masks = []
NUM_TEST_SAMPLES = 300
fp_seg_stats = test(seg_ov_model, core, seg_data_loader, seg_validator, num_samples=NUM_TEST_SAMPLES)
print_stats(fp_seg_stats, seg_validator.seen, seg_validator.nt_per_class.sum())

In [None]:
import ipywidgets as widgets

int8_model_seg_path = models_dir / f"{SEG_MODEL_NAME}_openvino_int8_model/{SEG_MODEL_NAME}.xml"

to_quantize = widgets.Checkbox(
    value=True,
    description="Quantization",
    disabled=False,
)

to_quantize

In [12]:
# Fetch skip_kernel_extension module
import requests

r = requests.get(
    url="https://raw.githubusercontent.com/openvinotoolkit/openvino_notebooks/latest/utils/skip_kernel_extension.py",
)
open("skip_kernel_extension.py", "w").write(r.text)

%load_ext skip_kernel_extension

In [None]:
%%skip not $to_quantize.value


import nncf
from typing import Dict


def transform_fn(data_item:Dict):
    """
    Quantization transform function. Extracts and preprocess input data from dataloader item for quantization.
    Parameters:
       data_item: Dict with data item produced by DataLoader during iteration
    Returns:
        input_tensor: Input data for quantization
    """
    input_tensor = seg_validator.preprocess(data_item)['img'].numpy()
    return input_tensor


quantization_dataset = nncf.Dataset(seg_data_loader, transform_fn)

In [None]:
%%skip not $to_quantize.value

ignored_scope = nncf.IgnoredScope(
    names=[
        "__module.model.22.cv3.0.0.conv/aten::_convolution/Convolution",  # in the post-processing subgraph
            "__module.model.22.proto.cv1.conv/aten::_convolution/Convolution",
            "__module.model.22.cv4.0.0.conv/aten::_convolution/Convolution",
            "__module.model.16.conv/aten::_convolution/Convolution",
            "__module.model.22.cv2.0.0.conv/aten::_convolution/Convolution",
            "__module.model.6.cv1.conv/aten::_convolution/Convolution",
            "__module.model.22.cv3.1.1.conv/aten::_convolution/Convolution",
            "__module.model.21.cv2.conv/aten::_convolution/Convolution",
            "__module.model.21.m.0.cv1.conv/aten::_convolution/Convolution",
            "__module.model.22/aten::add/Add_6",
            "__module.model.22/aten::sub/Subtract",
            "__module.model.7.conv/aten::_convolution/Convolution",
            "__module.model.12.cv1.conv/aten::_convolution/Convolution",
            "__module.model.4.cv1.conv/aten::_convolution/Convolution",
            "__module.model.22.cv2.2.1.conv/aten::_convolution/Convolution",
            "__module.model.22.cv2.0.1.conv/aten::_convolution/Convolution",
            "__module.model.22.cv4.2.1.conv/aten::_convolution/Convolution",
            "__module.model.22.dfl.conv/aten::_convolution/Convolution",
            "__module.model.22.cv3.2.2/aten::_convolution/Convolution",
            "__module.model.22.cv3.0.2/aten::_convolution/Convolution",
            "__module.model.15.cv1.conv/aten::_convolution/Convolution",
            "__module.model.5.conv/aten::_convolution/Convolution",
            "__module.model.0.conv/aten::_convolution/Convolution"
    ]
)

# Segmentation model
quantized_seg_model = nncf.quantize(
    seg_ov_model,
    quantization_dataset,
    preset=nncf.QuantizationPreset.MIXED,
    ignored_scope=ignored_scope
)

In [None]:
%%skip not $to_quantize.value

print(f"Quantized segmentation model will be saved to {int8_model_seg_path}")
ov.save_model(quantized_seg_model, str(int8_model_seg_path))

In [16]:
%%skip not $to_quantize.value

device

In [17]:
%%skip not $to_quantize.value

ov_config = {}
if device.value != "CPU":
    quantized_seg_model.reshape({0: [1, 3, 640, 640]})
if "GPU" in device.value or ("AUTO" in device.value and "GPU" in core.available_devices):
    ov_config = {"GPU_DISABLE_WINOGRAD_CONVOLUTION": "YES"}

quantized_seg_compiled_model = core.compile_model(quantized_seg_model, device.value, ov_config)

In [18]:
%%skip not $to_quantize.value


def infer(*args):
    result = quantized_seg_compiled_model(args)
    return torch.from_numpy(result[0]), torch.from_numpy(result[1])

seg_model.predictor.inference = infer

In [None]:
%%skip not $to_quantize.value

res = seg_model(IMAGE_PATH)
display(Image.fromarray(res[0].plot()[:, :, ::-1]))