## PyTorch Sample

In [None]:
from torchvision.io.image import decode_image
from torchvision.models.detection import fasterrcnn_resnet50_fpn_v2, FasterRCNN_ResNet50_FPN_V2_Weights
from torchvision.utils import draw_bounding_boxes
from torchvision.transforms.functional import to_pil_image

# Load the sample photo from disk as an image tensor.
img = decode_image("pexels-jeffrey-czum-2346165.jpg")

# Step 1: Initialize model with the best available weights
weights = FasterRCNN_ResNet50_FPN_V2_Weights.DEFAULT
model = fasterrcnn_resnet50_fpn_v2(weights=weights, box_score_thresh=0.9)
model.eval()

# Step 2: Initialize the inference transforms
preprocess = weights.transforms()

# Step 3: Apply inference preprocessing transforms
batch = [preprocess(img)]

# Step 4: Use the model and visualize the prediction
# The model receives a one-image list; [0] selects that image's prediction.
prediction = model(batch)[0]
# Translate the predicted label indices into category names from the weights metadata.
labels = [weights.meta["categories"][i] for i in prediction["labels"]]
# NOTE(review): assumes Helvetica.ttf is available to the drawing routine — confirm.
box = draw_bounding_boxes(img, boxes=prediction["boxes"],
                          labels=labels,
                          colors="red",
                          width=4, font="Helvetica.ttf", font_size=30)
im = to_pil_image(box.detach())
# Last expression of the cell: the annotated image scaled to one third of its size.
im.resize([i//3 for i in im.size])

## Native Python PyTorch

In [None]:
import os
import random
import math
from torchvision.io.image import decode_image
from torchvision.models.detection import fasterrcnn_resnet50_fpn_v2, FasterRCNN_ResNet50_FPN_V2_Weights
from torchvision.models.detection import fcos_resnet50_fpn, FCOS_ResNet50_FPN_Weights
from torchvision.utils import draw_bounding_boxes
from torchvision.transforms.functional import to_pil_image
import torch
from tqdm import tqdm
import logging

def batched(array, size):
    """Yield consecutive lists of up to ``size`` items taken from ``array``.

    Args:
        array: any iterable (list, generator, ...). It is consumed lazily.
        size: maximum number of items per batch. A value below 1 yields nothing.

    Yields:
        Lists of ``size`` items; the final list may be shorter. Empty input yields nothing.

    Exceptions raised by the source iterable propagate to the caller instead of
    being swallowed and silently truncating the stream.
    """
    if size < 1:
        return
    batch = []
    for item in array:
        batch.append(item)
        if len(batch) == size:
            yield batch
            batch = []
    # Emit the trailing partial batch, if any.
    if batch:
        yield batch

def annotate(img, prediction, categories):
    """Draw the predicted boxes on ``img`` and return the result as a PIL image.

    Args:
        img: image passed to ``draw_bounding_boxes`` (presumably a uint8 CHW tensor — confirm).
        prediction: mapping with at least ``"labels"`` and ``"boxes"`` entries.
        categories: sequence indexed by each predicted label to obtain its display name.

    Returns:
        The PIL image produced by ``to_pil_image`` from the annotated tensor.
    """
    class_names = [categories[label] for label in prediction["labels"]]
    # Use the Helvetica font only when the file exists in the working directory.
    drawn = draw_bounding_boxes(
        image=img,
        boxes=prediction['boxes'],
        labels=class_names,
        colors="red",
        width=4,
        font="Helvetica.ttf" if os.path.exists("Helvetica.ttf") else None,
        font_size=30,
    )
    return to_pil_image(drawn.detach())

def detect_annotate_save(images, save_files, preprocess, model, categories, thresh=0.5):
    """Run detection on ``images``, annotate each one and save it to its target path.

    Args:
        images: iterable of images; each is passed to ``preprocess`` and to ``annotate``.
        save_files: output paths, paired with ``images`` by position.
        preprocess: callable applied to every image before inference.
        model: callable taking the list of preprocessed images and returning one
            prediction mapping per image (each must contain ``'scores'``).
        categories: label-index-to-name sequence forwarded to ``annotate``.
        thresh: only detections with a score strictly above this value are drawn.

    Returns:
        None. The annotated images are written to ``save_files``.
    """
    # Inference only: disable autograd so no graph is built for the forward pass.
    with torch.no_grad():
        processed_images = [preprocess(image) for image in images]
        predictions = model(processed_images)

    for image, prediction, fname in zip(images, predictions, save_files):
        # Compute the score mask once and apply it to every field of the prediction.
        keep = prediction['scores'] > thresh
        confident = {k: v[keep] for k, v in prediction.items()}
        annotate(image, confident, categories).save(fname)

def annotate_batch(images, predictions, categories, thresh=0.5):
    """Annotate every image with its detections scoring strictly above ``thresh``.

    Args:
        images: images, paired with ``predictions`` by position.
        predictions: one mapping per image; every value is indexed with the
            boolean mask ``prediction['scores'] > thresh``.
        categories: label-index-to-name sequence forwarded to ``annotate``.
        thresh: score cut-off applied before drawing.

    Returns:
        List of annotated images, in input order.
    """
    def _confident_only(pred):
        # Keep, for each field, only the entries whose score exceeds the cut-off.
        return {field: pred[field][pred['scores'] > thresh] for field in pred}

    return [
        annotate(picture, _confident_only(pred), categories)
        for picture, pred in zip(images, predictions)
    ]

def get_model_and_weights(builder_func=fcos_resnet50_fpn, weight_class=FCOS_ResNet50_FPN_Weights):
    """Build a detector with its default weights and return it in eval mode.

    Args:
        builder_func: model builder, called with ``weights=`` and ``box_score_thresh=0.9``.
        weight_class: weights enum whose ``DEFAULT`` member is used.

    Returns:
        Tuple ``(model, weights)``.
    """
    default_weights = weight_class.DEFAULT
    # NOTE(review): `box_score_thresh` is the keyword the Faster R-CNN sample above uses;
    # confirm that the default FCOS builder honours it (FCOS presumably names its
    # cut-off `score_thresh`).
    detector = builder_func(weights=default_weights, box_score_thresh=0.9)
    detector.eval()
    # Disabled experiment: model.backbone = torch.jit.script(model.backbone)
    return detector, default_weights

def detection2D(files, output_dir='output', batch_size=5):
    """Detect objects in image files and save annotated copies into ``output_dir``.

    Args:
        files: paths of the images to process.
        output_dir: directory for the annotated images; created (including missing
            parents) when absent. Each output keeps the base name of its input file.
        batch_size: number of images sent through the model per step.

    Returns:
        None. Annotated images are written to disk.
    """
    logger = logging.getLogger("detection2D")
    if not os.path.isdir(output_dir):
        logger.warning(f"Output directory '{output_dir}' was not found. Creating directory '{output_dir}'")
        # makedirs, unlike mkdir, also creates missing parent directories.
        os.makedirs(output_dir, exist_ok=True)

    files = list(files)
    jobs = [(f, os.path.join(output_dir, os.path.basename(f))) for f in files]

    logger.info("Loading model")
    model, weights = get_model_and_weights(fasterrcnn_resnet50_fpn_v2, FasterRCNN_ResNet50_FPN_V2_Weights)
    preprocess = weights.transforms()
    categories = weights.meta['categories']

    def infer(images):
        # Inference only: no autograd graph is needed.
        with torch.no_grad():
            return model([preprocess(img) for img in images])

    logger.info("Starting detection")
    for chunk in tqdm(batched(jobs, batch_size), total=math.ceil(len(jobs)/batch_size)):
        paths, save_names = zip(*chunk)
        # Decode per batch so only one batch of images is held in memory at a time.
        images = [decode_image(p) for p in paths]
        predictions = infer(images)
        annotated_images = annotate_batch(images, predictions, categories)
        for img, sn in zip(annotated_images, save_names):
            img.save(sn)

In [None]:
# os.listdir returns entries in arbitrary order; sort so files[:5] is the same subset on every run.
files = sorted(os.path.join("pics", f) for f in os.listdir("pics"))
logging.basicConfig(level=logging.INFO)
detection2D(files[:5],output_dir="output",batch_size=2)

## Export PyTorch to ONNX

In [None]:
# Print the versions of the libraries involved in the ONNX export and inference.
import torch
print(torch.__version__)

import onnxscript
print(onnxscript.__version__)

# NOTE(review): opset18 is not referenced in the cells visible here.
from onnxscript import opset18  # opset 18 is the latest (and only) supported version for now

import onnxruntime
print(onnxruntime.__version__)

In [None]:
# Bind the weights under the correctly spelled name so later cells that read `weights`
# see the weights that belong to this model.
model, weights = get_model_and_weights()

In [None]:
# Trace the model with a single random 1000x1000 RGB image and write it to fcos.onnx.
dummy_input = torch.randn(1, 3, 1000,1000)
torch.onnx.export(
    model,
    dummy_input,
    "fcos.onnx",
    export_params=True,
    opset_version=11,
    do_constant_folding=True,
    input_names=['input'],
    output_names=['boxes', 'scores', 'labels'],
    dynamic_axes={
        'input': {0: 'batch_size', 2: 'height', 3: 'width'},  # Make batch, height, and width dynamic
        # dynamic_axes keys must match declared input/output names. The first axis of
        # every output is the per-image detection count, which varies between images.
        'boxes': {0: 'num_detections'},
        'scores': {0: 'num_detections'},
        'labels': {0: 'num_detections'},
    }
)


In [None]:
# Load the exported file and run ONNX's structural validity check on it.
import onnx
onnx_model = onnx.load("fcos.onnx")
onnx.checker.check_model(onnx_model)

In [None]:
import onnxruntime as ort
import numpy as np 
import torch
class ONNX_model:
    """Thin ONNX Runtime wrapper that is called like the PyTorch detector.

    Calling an instance with a list of images runs the session once per image and
    returns one ``{output_name: torch.Tensor}`` mapping per image.
    """

    def __init__(self, model_file):
        """Open an inference session for ``model_file`` on the available execution providers."""
        # The provider list belongs in `providers`; `provider_options` is meant for
        # per-provider option dicts, so passing the list there does not select providers.
        self.session = ort.InferenceSession(model_file, providers=ort.get_available_providers()) # from https://onnxruntime.ai/docs/api/python/tutorial.html

    def infer(self, input_img):
        """Run the session on a single image and return its outputs keyed by output name.

        Args:
            input_img: one image without a batch axis (tensor or array-like); a
                leading axis of length 1 is added before it is fed to the session.
        """
        if isinstance(input_img, torch.Tensor):
            # Also handles tensors that require grad or live on another device.
            input_img = input_img.detach().cpu().numpy()
        input_img = np.asarray(input_img)
        input_img = input_img.reshape(1, *input_img.shape)
        input_name = self.session.get_inputs()[0].name
        output_names = [out.name for out in self.session.get_outputs()]
        outputs = self.session.run(output_names, {input_name: input_img})
        return {
            name: torch.from_numpy(op) for name, op in zip(output_names, outputs)
        }

    def __call__(self, input_batch):
        """Run ``infer`` on every image of ``input_batch`` and return the list of results."""
        return [self.infer(img) for img in input_batch]

## Test on an individual image

In [None]:
# Two sample photos used to compare the native and ONNX models.
img_files = ["pexels-jeffrey-czum-2346165.jpg", "pexels-lam-kiên-15008127.jpg"]
# img_files = files[:5] 
# Decode each file into an image tensor.
imgs = [decode_image(img_file) for img_file in img_files]

In [None]:
# Load the default detector and its weights (FCOS, per get_model_and_weights' defaults).
model, weights = get_model_and_weights()

In [None]:
# Apply the weights' inference transforms to every test image.
batch = [weights.transforms()(img) for img in imgs] 

### Native PyTorch

In [None]:
# Rebind `model` to the native PyTorch detector for this subsection.
model, weights = get_model_and_weights()

In [None]:
# Probe the prediction keys with a 3-channel (RGB) input, matching the dummy inputs used for export.
model(torch.rand(1,3,1000,1000))[0].keys()

### ONNX

In [None]:
# model = ONNX_model('FasterRCNN_ResNet50_FPN_V2_Weights.COCO_V.onnx')
# Rebind `model` to the ONNX Runtime wrapper around the exported FCOS file.
model = ONNX_model('fcos.onnx')

### Test

In [None]:
# Run whichever `model` was bound last (native or ONNX) on the preprocessed batch.
predictions = model(batch)

In [None]:
# Display the prediction for the first image.
predictions[0]

In [None]:
# Draw every prediction on its image; no score filtering is applied in this cell.
annotated_imgs = [annotate(img, prediction, weights.meta['categories']) for img, prediction in zip(imgs, predictions)]

In [None]:
# Index of the annotated image to preview.
i = 0
# Show it at a quarter of its size; the comprehension uses its own variable name
# so it does not visually shadow the image index.
annotated_imgs[i].resize([side // 4 for side in annotated_imgs[i].size])

In [None]:
# Display the string form of the default FCOS weights (used later to name the exported .onnx file).
str(FCOS_ResNet50_FPN_Weights.DEFAULT)

## Test on a batch of same-size images
(Incomplete)

In [None]:
# Bind the weights under the correctly spelled name so `weights` matches this model.
model, weights = get_model_and_weights()

In [None]:
# Re-export the model to fcos.onnx, this time traced with a batch of `batch_size` random images.
batch_size = 5
dummy_input = torch.randn(batch_size, 3, 1000,1000)
torch.onnx.export(
    model,
    dummy_input,
    "fcos.onnx",
    export_params=True,
    opset_version=11,
    do_constant_folding=True,
    input_names=['input'],
    output_names=['boxes', 'scores', 'labels'], 
    # output_names=['batch_size'], 
    dynamic_axes={
        'input': {2: 'height', 3: 'width'},  # Only height and width are dynamic; the batch axis is not declared dynamic here
        # 'output': {0: 'batch_size'}  # Example: make output batch dimension dynamic
    }
)


In [None]:
# Load the re-exported file and run ONNX's structural validity check on it.
import onnx
onnx_model = onnx.load("fcos.onnx")
onnx.checker.check_model(onnx_model)

In [None]:
import onnxruntime as ort
import numpy as np 
import torch
class ONNX_model:
    """Thin ONNX Runtime wrapper that is called like the PyTorch detector.

    Calling an instance with a list of images runs the session once per image and
    returns one ``{output_name: torch.Tensor}`` mapping per image. ``infer_batch``
    feeds an already-batched input in a single session run.
    """

    def __init__(self, model_file):
        """Open an inference session for ``model_file`` on the available execution providers."""
        # The provider list belongs in `providers`; `provider_options` is meant for
        # per-provider option dicts, so passing the list there does not select providers.
        self.session = ort.InferenceSession(model_file, providers=ort.get_available_providers()) # from https://onnxruntime.ai/docs/api/python/tutorial.html

    def infer(self, input_img):
        """Run the session on a single image and return its outputs keyed by output name.

        Args:
            input_img: one image without a batch axis (tensor or array-like); a
                leading axis of length 1 is added before it is fed to the session.
        """
        if isinstance(input_img, torch.Tensor):
            # Also handles tensors that require grad or live on another device.
            input_img = input_img.detach().cpu().numpy()
        input_img = np.asarray(input_img)
        input_img = input_img.reshape(1, *input_img.shape)
        input_name = self.session.get_inputs()[0].name
        output_names = [out.name for out in self.session.get_outputs()]
        outputs = self.session.run(output_names, {input_name: input_img})
        return {
            name: torch.from_numpy(op) for name, op in zip(output_names, outputs)
        }

    def infer_batch(self, input_img):
        """Feed ``input_img`` to the session unchanged (no batch axis is added).

        Returns:
            The raw list of outputs from ``session.run``, in session output order.
        """
        if isinstance(input_img, torch.Tensor):
            input_img = input_img.detach().cpu().numpy()
        input_img = np.asarray(input_img)
        input_name = self.session.get_inputs()[0].name
        output_names = [out.name for out in self.session.get_outputs()]
        outputs = self.session.run(output_names, {input_name: input_img})
        return outputs

    def __call__(self, input_batch):
        """Run ``infer`` on every image of ``input_batch`` and return the list of results."""
        return [self.infer(img) for img in input_batch]

In [None]:
# Two sample photos used for the batch experiment.
img_files = ["pexels-jeffrey-czum-2346165.jpg", "pexels-lam-kiên-15008127.jpg"]
# img_files = files[:5] 
# Decode each file into an image tensor.
imgs = [decode_image(img_file) for img_file in img_files]

In [None]:
# TODO: assemble the same-size image batch for this section (the section is marked incomplete).
# The original cell held only the dangling assignment `batch =`, which is a SyntaxError;
# `batch` is left untouched here until the batch construction is written.

# Combined Implementation

In [None]:
import os 
import torch
import logging

def get_model_and_weights(builder_func=fcos_resnet50_fpn, weight_class=FCOS_ResNet50_FPN_Weights, get_onnx=False):
    """Return ``(model, weights)`` for a detector, either native PyTorch or ONNX-backed.

    Args:
        builder_func: model builder, called with ``weights=`` and ``box_score_thresh=0.9``.
        weight_class: weights enum whose ``DEFAULT`` member is used.
        get_onnx: when true, return an ``ONNX_model`` loaded from ``"<weights>.onnx"``
            in the working directory; the PyTorch model is exported to that file first
            if it does not exist yet.

    Returns:
        Tuple ``(model, weights)``. ``model`` is the eval-mode PyTorch module, or an
        ``ONNX_model`` when ``get_onnx`` is true.
    """
    logger = logging.getLogger("model_and_weights")
    weights = weight_class.DEFAULT
    if not get_onnx:
        # NOTE(review): confirm the default FCOS builder honours `box_score_thresh`
        # (FCOS presumably names its cut-off `score_thresh`).
        model = builder_func(weights=weights, box_score_thresh=0.9)
        model.eval()
        return model, weights

    # `weights` already is the DEFAULT member, so its string form names the export file.
    model_file = str(weights)+'.onnx'
    if not os.path.exists(model_file):
        logger.info(f"{model_file} not found, exporting pytorch model to {model_file}")
        model, _ = get_model_and_weights(builder_func, weight_class, get_onnx=False)
        dummy_input = torch.randn(1, 3, 1000,1000)
        # One forward pass, without autograd, just to learn the prediction keys in order.
        with torch.no_grad():
            output_names = list(model(dummy_input)[0].keys())
        dynamic_axes = {
            'input': {0: 'batch_size', 2: 'height', 3: 'width'},  # Make batch, height, and width dynamic
        }
        # dynamic_axes keys must be real input/output names; the first axis of every
        # output is the per-image detection count.
        for name in output_names:
            dynamic_axes[name] = {0: 'num_detections'}
        torch.onnx.export(
            model,
            dummy_input,
            model_file,
            export_params=True,
            opset_version=11,
            do_constant_folding=True,
            input_names=['input'],
            output_names=output_names,
            dynamic_axes=dynamic_axes,
        )
    return ONNX_model(model_file), weights
            
            

# def detection2D(files, output_dir='output', batch_size=5, use_onnx=False):
#     logger = logging.getLogger("detection2D")
#     if not os.path.isdir(output_dir): 
#         logger.warning(f"Output directory '{output_dir}' was not found. Creating directory '{output_dir}'")
#         os.mkdir(output_dir)
#     fnames = [os.path.basename(f) for f in files]
#     save_files = [os.path.join(output_dir, p) for p in fnames]
#     imgs = [(decode_image(f), sf) for f, sf in zip(files, save_files)]
#     batches = batched(imgs, batch_size) 
#     batches = (tuple(zip(*b)) for b in batches)
    
#     logger.info("Loading model")
#     model, weights = get_model_and_weights(fasterrcnn_resnet50_fpn_v2, FasterRCNN_ResNet50_FPN_V2_Weights, use_onnx)
#     preprocess = weights.transforms()

    
    
#     logger.info("Starting detection")
#     for batch in tqdm(batches, total=math.ceil(len(imgs)/batch_size)):
#         images, save_names = batch
#         detect_annotate_save(images, save_names, preprocess, model, weights.meta["categories"])


def detection2D(files, output_dir='output', batch_size=5, use_onnx=False):
    """Detect objects in image files and save annotated copies into ``output_dir``.

    Args:
        files: paths of the images to process.
        output_dir: directory for the annotated images; created (including missing
            parents) when absent. Each output keeps the base name of its input file.
        batch_size: number of images sent through the model per step.
        use_onnx: forwarded to ``get_model_and_weights`` to select the ONNX-backed model.

    Returns:
        None. Annotated images are written to disk.
    """
    logger = logging.getLogger("detection2D")
    if not os.path.isdir(output_dir):
        logger.warning(f"Output directory '{output_dir}' was not found. Creating directory '{output_dir}'")
        # makedirs, unlike mkdir, also creates missing parent directories.
        os.makedirs(output_dir, exist_ok=True)

    files = list(files)
    jobs = [(f, os.path.join(output_dir, os.path.basename(f))) for f in files]

    logger.info("Loading model")
    model, weights = get_model_and_weights(fasterrcnn_resnet50_fpn_v2, FasterRCNN_ResNet50_FPN_V2_Weights, use_onnx)
    preprocess = weights.transforms()
    categories = weights.meta['categories']

    def infer(images):
        # Inference only: no autograd graph is needed.
        with torch.no_grad():
            return model([preprocess(img) for img in images])

    logger.info("Starting detection")
    for chunk in tqdm(batched(jobs, batch_size), total=math.ceil(len(jobs)/batch_size)):
        paths, save_names = zip(*chunk)
        # Decode per batch so only one batch of images is held in memory at a time.
        images = [decode_image(p) for p in paths]
        predictions = infer(images)
        annotated_images = annotate_batch(images, predictions, categories)
        for img, sn in zip(annotated_images, save_names):
            img.save(sn)
    

In [None]:
# os.listdir returns entries in arbitrary order; sort so runs process files in a stable order.
files = sorted(os.path.join("pics", f) for f in os.listdir("pics"))


In [None]:
# Run the full file list through the ONNX-backed model; results go to output_onnx/.
logging.basicConfig(level=logging.INFO)
detection2D(files,output_dir="output_onnx",batch_size=2, use_onnx=True) 
# detection2D(files,output_dir="output_torch",batch_size=2, use_onnx=False) 

In [None]:
# Same run with the native PyTorch model, for comparison; results go to output_torch/.
logging.basicConfig(level=logging.INFO)
detection2D(files,output_dir="output_torch",batch_size=2, use_onnx=False) 

# Checkpoint

In [None]:
import os
import random
import math
from torchvision.io.image import decode_image
from torchvision.models.detection import fasterrcnn_resnet50_fpn_v2, FasterRCNN_ResNet50_FPN_V2_Weights
from torchvision.models.detection import fcos_resnet50_fpn, FCOS_ResNet50_FPN_Weights
from torchvision.utils import draw_bounding_boxes
from torchvision.transforms.functional import to_pil_image
import torch
import onnxruntime as ort
import numpy as np 
from tqdm import tqdm
import logging

def batched(array, size):
    """Yield consecutive lists of up to ``size`` items taken from ``array``.

    Args:
        array: any iterable (list, generator, ...). It is consumed lazily.
        size: maximum number of items per batch. A value below 1 yields nothing.

    Yields:
        Lists of ``size`` items; the final list may be shorter. Empty input yields nothing.

    Exceptions raised by the source iterable propagate to the caller instead of
    being swallowed and silently truncating the stream.
    """
    if size < 1:
        return
    batch = []
    for item in array:
        batch.append(item)
        if len(batch) == size:
            yield batch
            batch = []
    # Emit the trailing partial batch, if any.
    if batch:
        yield batch

def annotate(img, prediction, categories):
    """Draw the predicted boxes on ``img`` and return the result as a PIL image.

    Args:
        img: image passed to ``draw_bounding_boxes`` (presumably a uint8 CHW tensor — confirm).
        prediction: mapping with at least ``"labels"`` and ``"boxes"`` entries.
        categories: sequence indexed by each predicted label to obtain its display name.

    Returns:
        The PIL image produced by ``to_pil_image`` from the annotated tensor.
    """
    class_names = [categories[label] for label in prediction["labels"]]
    # Use the Helvetica font only when the file exists in the working directory.
    drawn = draw_bounding_boxes(
        image=img,
        boxes=prediction['boxes'],
        labels=class_names,
        colors="red",
        width=4,
        font="Helvetica.ttf" if os.path.exists("Helvetica.ttf") else None,
        font_size=30,
    )
    return to_pil_image(drawn.detach())

def annotate_batch(images, predictions, categories, thresh=0.5):
    """Annotate every image with its detections scoring strictly above ``thresh``.

    Args:
        images: images, paired with ``predictions`` by position.
        predictions: one mapping per image; every value is indexed with the
            boolean mask ``prediction['scores'] > thresh``.
        categories: label-index-to-name sequence forwarded to ``annotate``.
        thresh: score cut-off applied before drawing.

    Returns:
        List of annotated images, in input order.
    """
    def _confident_only(pred):
        # Keep, for each field, only the entries whose score exceeds the cut-off.
        return {field: pred[field][pred['scores'] > thresh] for field in pred}

    return [
        annotate(picture, _confident_only(pred), categories)
        for picture, pred in zip(images, predictions)
    ]

def filter_prediction(prediction, thresh=0.5):
    """Drop detections whose score is not strictly above ``thresh``.

    Args:
        prediction: a single prediction mapping containing ``'scores'``, or an
            iterable of such mappings (e.g. the list a model returns for a batch).
        thresh: score cut-off.

    Returns:
        A filtered mapping for a single prediction, or a list of filtered mappings
        when an iterable of predictions is given.
    """
    if not isinstance(prediction, dict):
        # Batch case: filter every element with this same function.
        return [filter_prediction(pred, thresh) for pred in prediction]
    keep = prediction['scores'] > thresh
    return {k: v[keep] for k, v in prediction.items()}


class ONNX_model:
    """Thin ONNX Runtime wrapper that is called like the PyTorch detector.

    Calling an instance with a list of images runs the session once per image and
    returns one ``{output_name: torch.Tensor}`` mapping per image.
    """

    def __init__(self, model_file):
        """Open an inference session for ``model_file`` on the available execution providers."""
        # The provider list belongs in `providers`; `provider_options` is meant for
        # per-provider option dicts, so passing the list there does not select providers.
        self.session = ort.InferenceSession(model_file, providers=ort.get_available_providers()) # from https://onnxruntime.ai/docs/api/python/tutorial.html

    def infer(self, input_img):
        """Run the session on a single image and return its outputs keyed by output name.

        Args:
            input_img: one image without a batch axis (tensor or array-like); a
                leading axis of length 1 is added before it is fed to the session.
        """
        if isinstance(input_img, torch.Tensor):
            # Also handles tensors that require grad or live on another device.
            input_img = input_img.detach().cpu().numpy()
        input_img = np.asarray(input_img)
        input_img = input_img.reshape(1, *input_img.shape)
        input_name = self.session.get_inputs()[0].name
        output_names = [out.name for out in self.session.get_outputs()]
        outputs = self.session.run(output_names, {input_name: input_img})
        return {
            name: torch.from_numpy(op) for name, op in zip(output_names, outputs)
        }

    def __call__(self, input_batch):
        """Run ``infer`` on every image of ``input_batch`` and return the list of results."""
        return [self.infer(img) for img in input_batch]


def get_model_and_weights(builder_func=fcos_resnet50_fpn, weight_class=FCOS_ResNet50_FPN_Weights, get_onnx=False):
    """Return ``(model, weights)`` for a detector, either native PyTorch or ONNX-backed.

    Args:
        builder_func: model builder, called with ``weights=`` and ``box_score_thresh=0.9``.
        weight_class: weights enum whose ``DEFAULT`` member is used.
        get_onnx: when true, return an ``ONNX_model`` loaded from ``"<weights>.onnx"``
            in the working directory; the PyTorch model is exported to that file first
            if it does not exist yet.

    Returns:
        Tuple ``(model, weights)``. ``model`` is the eval-mode PyTorch module, or an
        ``ONNX_model`` when ``get_onnx`` is true.
    """
    logger = logging.getLogger("model_and_weights")
    weights = weight_class.DEFAULT
    if not get_onnx:
        # NOTE(review): confirm the default FCOS builder honours `box_score_thresh`
        # (FCOS presumably names its cut-off `score_thresh`).
        model = builder_func(weights=weights, box_score_thresh=0.9)
        model.eval()
        return model, weights

    # `weights` already is the DEFAULT member, so its string form names the export file.
    model_file = str(weights)+'.onnx'
    if not os.path.exists(model_file):
        logger.info(f"{model_file} not found, exporting pytorch model to {model_file}")
        model, _ = get_model_and_weights(builder_func, weight_class, get_onnx=False)
        dummy_input = torch.randn(1, 3, 1000,1000)
        # One forward pass, without autograd, just to learn the prediction keys in order.
        with torch.no_grad():
            output_names = list(model(dummy_input)[0].keys())
        dynamic_axes = {
            'input': {0: 'batch_size', 2: 'height', 3: 'width'},  # Make batch, height, and width dynamic
        }
        # dynamic_axes keys must be real input/output names; the first axis of every
        # output is the per-image detection count.
        for name in output_names:
            dynamic_axes[name] = {0: 'num_detections'}
        torch.onnx.export(
            model,
            dummy_input,
            model_file,
            export_params=True,
            opset_version=11,
            do_constant_folding=True,
            input_names=['input'],
            output_names=output_names,
            dynamic_axes=dynamic_axes,
        )
    return ONNX_model(model_file), weights
            

def detection2D(files, output_dir='output', batch_size=5, builder_and_weights=(fasterrcnn_resnet50_fpn_v2, FasterRCNN_ResNet50_FPN_V2_Weights), use_onnx=False):
    """Detect objects in image files and save annotated copies into ``output_dir``.

    Args:
        files: paths of the images to process.
        output_dir: directory for the annotated images; created (including missing
            parents) when absent. Each output keeps the base name of its input file.
        batch_size: number of images sent through the model per step.
        builder_and_weights: ``(builder_func, weight_class)`` pair unpacked into
            ``get_model_and_weights``.
        use_onnx: forwarded to ``get_model_and_weights`` to select the ONNX-backed model.

    Returns:
        None. Annotated images are written to disk.
    """
    logger = logging.getLogger("detection2D")
    if not os.path.isdir(output_dir):
        logger.warning(f"Output directory '{output_dir}' was not found. Creating directory '{output_dir}'")
        # makedirs, unlike mkdir, also creates missing parent directories.
        os.makedirs(output_dir, exist_ok=True)

    files = list(files)
    jobs = [(f, os.path.join(output_dir, os.path.basename(f))) for f in files]

    logger.info("Loading model")
    model, weights = get_model_and_weights(*builder_and_weights, use_onnx)
    preprocess = weights.transforms()
    categories = weights.meta['categories']

    def infer(images):
        # Inference only: no autograd graph is needed.
        with torch.no_grad():
            return model([preprocess(img) for img in images])

    logger.info("Starting detection")
    for chunk in tqdm(batched(jobs, batch_size), total=math.ceil(len(jobs)/batch_size)):
        paths, save_names = zip(*chunk)
        # Decode per batch so only one batch of images is held in memory at a time.
        images = [decode_image(p) for p in paths]
        predictions = infer(images)
        annotated_images = annotate_batch(images, predictions, categories)
        for img, sn in zip(annotated_images, save_names):
            img.save(sn)
    

In [None]:
# os.listdir returns entries in arbitrary order; sort so files[:5] is the same subset on every run.
files = sorted(os.path.join("pics", f) for f in os.listdir("pics"))


In [None]:
# Run the full file list through the ONNX-backed default detector; results go to output_onnx/.
logging.basicConfig(level=logging.INFO)
detection2D(files,output_dir="output_onnx",batch_size=2, use_onnx=True) 
# detection2D(files,output_dir="output_torch",batch_size=2, use_onnx=False) 

In [None]:
# Smoke-test the ONNX path with the FCOS builder/weights on the first five files.
logging.basicConfig(level=logging.INFO)
detection2D(files[:5],output_dir="output_onnx_fcos_test",batch_size=2, builder_and_weights=(fcos_resnet50_fpn, FCOS_ResNet50_FPN_Weights), use_onnx=True)

# Video Inference

In [None]:
import cv2 
def get_infer_method(model, preproc):
    """Build a callable that preprocesses each image and feeds the resulting list to ``model``.

    Args:
        model: callable taking a list of preprocessed images.
        preproc: callable applied to every image individually.

    Returns:
        A function ``infer(imgs)`` returning whatever ``model`` returns.
    """
    def infer(imgs):
        prepared = list(map(preproc, imgs))
        return model(prepared)

    return infer
def cv2torch(frame):
    """Convert an OpenCV BGR frame into a channel-first RGB torch tensor.

    The frame is colour-converted BGR -> RGB, its axes are reordered with
    ``(2, 0, 1)`` and the resulting array is wrapped with ``torch.from_numpy``.
    """
    rgb_frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
    channel_first = np.transpose(rgb_frame, (2,0,1))
    return torch.from_numpy(channel_first)
def pil2cv2(img):
    """Convert an RGB image (e.g. a PIL image) into a BGR array for OpenCV."""
    rgb_pixels = np.asarray(img)
    return cv2.cvtColor(rgb_pixels, cv2.COLOR_RGB2BGR)
def frame_generator(cap):
    """Yield frames from ``cap.read()`` until it reports failure.

    Args:
        cap: object whose ``read()`` returns a ``(success, frame)`` pair.

    Yields:
        Each successfully read frame, in order.
    """
    while True:
        ok, frame = cap.read()
        if not ok:
            return
        yield frame

In [None]:
def video_detection2d(video_path, output_path, batch_size=2, batch_stride=2):
    """Annotate a video with FCOS (ONNX) detections and write the result to ``output_path``.

    Args:
        video_path: path of the input video.
        output_path: path of the annotated ``mp4v`` output; its directory is created if missing.
        batch_size: number of frames sent through the model per step.
        batch_stride: only every ``batch_stride``-th batch is processed and written
            (the last batch of each group of ``batch_stride``). The default of 2
            processes every other batch; use 1 to process every batch. Skipped
            batches are not written, and the writer keeps the source frame rate,
            so the output contains fewer frames than the input when ``batch_stride > 1``.

    Raises:
        ValueError: if ``batch_stride`` is smaller than 1.
        IOError: if the input video cannot be opened.
    """
    if batch_stride < 1:
        raise ValueError("batch_stride must be >= 1")

    logger = logging.getLogger("video_detection2d")
    output_dir = os.path.dirname(output_path)
    # dirname is '' when output_path has no directory part; creating '' would fail.
    if output_dir and not os.path.isdir(output_dir):
        logger.info(f"output dir {output_dir} not found. Creating output dir {output_dir}")
        os.makedirs(output_dir, exist_ok=True)

    cap = cv2.VideoCapture(video_path)
    out = None
    try:
        if not cap.isOpened():
            raise IOError(f"Could not open video '{video_path}'")

        fps = cap.get(cv2.CAP_PROP_FPS)
        frame_count = cap.get(cv2.CAP_PROP_FRAME_COUNT)
        frame_size = (int(cap.get(cv2.CAP_PROP_FRAME_WIDTH)), int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT)))
        fourcc = cv2.VideoWriter_fourcc(*'mp4v')
        out = cv2.VideoWriter(output_path, fourcc, fps, frame_size)

        model, weights = get_model_and_weights(fcos_resnet50_fpn, FCOS_ResNet50_FPN_Weights, get_onnx=True)
        infer = get_infer_method(model, weights.transforms())
        categories = weights.meta['categories']

        batched_frames = batched(frame_generator(cap), batch_size)
        # Some containers report no frame count; let tqdm run without a total then.
        total = math.ceil(frame_count/batch_size) if frame_count > 0 else None
        for i, batch in enumerate(tqdm(batched_frames, total=total)):
            if i % batch_stride != batch_stride - 1:
                continue
            try:
                imgs = [cv2torch(img) for img in batch]
                preds = infer(imgs)
                annotated_batch = annotate_batch(imgs, preds, categories)
                for annotated in annotated_batch:
                    out.write(pil2cv2(annotated))
            except KeyboardInterrupt:
                # Allow a manual stop while keeping the frames written so far.
                break
    finally:
        # Release the capture and the writer even when processing fails.
        cap.release()
        if out is not None:
            out.release()


In [None]:
# Annotate the sample clip and write the result next to it as vids/out.mp4.
video_detection2d("vids/2048206-hd_1920_1080_30fps.mp4", "vids/out.mp4")