# MedCellTypes - YOLO Real-Time Inference

**Ref:**
1. https://docs.ultralytics.com/modes/predict/#inference-sources

In [0]:
data = {
  "inputs" : ["Hello, I'm a language model,"],
  "params" : {"max_new_tokens": 10, "temperature": 1}
}

headers = {"Context-Type": "text/json", "Authorization": f"Bearer {API_TOKEN}"}

response = requests.post(
    url=f"{API_ROOT}/serving-endpoints/{endpoint_name}/invocations", json=data, headers=headers
)

print(json.dumps(response.json()))

In [0]:
%pip install -U ultralytics==8.3.40 opencv-python==4.10.0.84 ray==2.39.0
# %pip install -U opencv-python==4.10.0.84 ray==2.39.0 git+https://github.com/BlitzBricksterYY-db/ultralytics.git@main
dbutils.library.restartPython()

## Real-Time Inference (With Model Serving)

**Ref**
1. [Serve on Google Vertex AI blog](https://datatonic.com/insights/deploying-image-segmentation-models-vertex-ai/)

In [0]:
from ultralytics import YOLO
import torch
import mlflow
import torch.distributed as dist
from ultralytics import settings
from mlflow.types.schema import Schema, ColSpec
from mlflow.models.signature import ModelSignature

input_schema = Schema(
    [
        ColSpec("string", "image_source"),
    ]
)
output_schema = Schema([ColSpec("string","class_name"),
                        ColSpec("integer","class_num"),
                        ColSpec("double","confidence")]
                       )

signature = ModelSignature(inputs=input_schema, 
                           outputs=output_schema)

# settings.update({"mlflow":False}) # Specifically, it disables the integration with MLflow. By setting the mlflow key to False, you are instructing the ultralytics library not to use MLflow for logging or tracking experiments.

mlflow.end_run()

In [0]:
############################################################################
## Create YOLOC class to capture model results d a predict() method ##
############################################################################

class YOLOC(mlflow.pyfunc.PythonModel):
  def __init__(self, point_file):
    self.point_file=point_file

  def load_context(self, context):
    from ultralytics import YOLO
    self.model = YOLO(context.artifacts['best_point'])

  def predict(self, context, model_input):
    # ref: https://docs.ultralytics.com/modes/predict/
    return self.model(model_input)
  

#: use model() vs model.predict()
# The model.predict() method in YOLO supports various arguments such as conf, iou, imgsz, device, and more. These arguments allow you to customize the inference process, setting parameters like confidence thresholds, image size, and the device used for computation. Detailed descriptions of these arguments can be found in the inference arguments section.

In [0]:
# load model back
import mlflow
import ultralytics
from ultralytics import YOLO

model_uri = 'runs:/7d7584ec603e4994b746582c64ca76aa/model' # this is pytorch flavour model
# Replace INPUT_EXAMPLE with your own input example to the model
# A valid input example is a data instance suitable for pyfunc prediction
test_path = "/Volumes/mmt_mlops_demos/cv/data/Nuclei_Instance_Dataset/yolo_dataset/val/images/"
input_data = [test_path + f for f in ["human_liver_39.png", "human_melanoma_05.png", "mouse_liver_35.png", "mouse_spleen_02.png"]]
print("We will inference on the following images:", input_data)
model = mlflow.pytorch.load_model(model_uri) # for pytorch flavour model, load using mlflow.pytorch
# results = model(input_data, stream=False) # enable stream produces a generator instead of a list. a List consumes more memory.

# #: (Don't Run this cell, it will fail). The below is not working for image data as 'input_data,' as it Failed to deserialize input string data to JSON. 
# # Verify the model with the provided input data using the logged dependencies.
# # For more details, refer to:
# # https://mlflow.org/docs/latest/models.html#validate-models-before-deployment
# mlflow.models.predict(
#     model_uri=model_uri,
#     input_data=input_data,
#     env_manager="uv",
# )

In [0]:
model()

In [0]:
with mlflow.start_run() as run:
  mlflow.pytorch.log_model(YOLO(str(model.trainer.best)), "model") # this succeeded


In [0]:
YOLO("runs:/4a8cef6f866d414096370c4282dbf55f/artifacts/model/artifacts/best.pt")

In [0]:
import mlflow.pyfunc
from PIL import Image
import torch

class YOLOPyFuncWrapper(mlflow.pyfunc.PythonModel):
    def load_context(self, context):
        self.model = torch.load(context.artifacts["model"])

    def predict(self, context, model_input):
        if isinstance(model_input, list):
            input_data = [Image.open(img) if isinstance(img, str) else img for img in model_input]
        else:
            input_data = [Image.open(model_input) if isinstance(model_input, str) else model_input]
        results = self.model(input_data, stream=False)
        return results
    
model_uri = 'runs:/7d7584ec603e4994b746582c64ca76aa/model'
with mlflow.start_run() as run:    
    # Save the model
    model_path = "yolo_pyfunc_model"
    mlflow.pyfunc.log_model(
        path=model_path,
        python_model=YOLOPyFuncWrapper(),
        artifacts={"model": model_uri}
    )

# # Load the model
# loaded_model = mlflow.pyfunc.load_model(model_path)

# # Predict using the model
# results = loaded_model.predict(input_data)

In [0]:
in_data = input_data[0]

In [0]:
in_data = Image.open(input_data[0])
import numpy as np

in_data_array = np.array(in_data)

In [0]:
in_data_array

In [0]:
model_uri = 'runs:/bdc5d9bc5c04443b97109032d36b4ed8/yolo_pyfunc_model' # logged pytorch model



# Verify the model with the provided input data using the logged dependencies.
# For more details, refer to:
# https://mlflow.org/docs/latest/models.html#validate-models-before-deployment
mlflow.models.predict(
    model_uri=model_uri,
    input_data=in_data_array
)

In [0]:
input_data

In [0]:
import mlflow
import numpy as np

model_uri = 'runs:/4a8cef6f866d414096370c4282dbf55f/model' # logged pyfunc model
model_uri = 'runs:/7d7584ec603e4994b746582c64ca76aa/model' # logged pytorch model


# Replace INPUT_EXAMPLE with your own input example to the model
# A valid input example is a data instance suitable for pyfunc prediction
input_data = np.random.rand(1, 3, 224, 224)  # Example input data, replace with your own
indata = {'image_source': input_data.tolist()}  # Convert numpy array to list for JSON serialization
print(indata)


# Verify the model with the provided input data using the logged dependencies.
# For more details, refer to:
# https://mlflow.org/docs/latest/models.html#validate-models-before-deployment
mlflow.models.predict(
    model_uri=model_uri,
    input_data=indata
)

In [0]:
import mlflow

model_uri = 'runs:/4a8cef6f866d414096370c4282dbf55f/model' # logged pyfunc model
# model_uri = 'runs:/7d7584ec603e4994b746582c64ca76aa/model' # logged pytorch model


# Replace INPUT_EXAMPLE with your own input example to the model
# A valid input example is a data instance suitable for pyfunc prediction
input_data
in_data = 
# Verify the model with the provided input data using the logged dependencies.
# For more details, refer to:
# https://mlflow.org/docs/latest/models.html#validate-models-before-deployment
mlflow.models.predict(
    model_uri=model_uri,
    input_data=input_data
)

In [0]:
os.environ['DATABRICKS_TOKEN'] = db_token = dbutils.secrets.get(scope="yyang_secret_scope", key="pat")

In [0]:
# import os
# import requests
# import numpy as np
# import pandas as pd
# import json

# def create_tf_serving_json(data):
#     return {'inputs': {name: data[name].tolist() for name in data.keys()} if isinstance(data, dict) else data.tolist()}

# def score_model(dataset):
#     url = 'https://adb-984752964297111.11.azuredatabricks.net/serving-endpoints/cell-instance-segmentation/invocations'
#     headers = {'Authorization': f'Bearer {os.environ.get("DATABRICKS_TOKEN")}', 'Content-Type': 'application/json'}
#     ds_dict = {'dataframe_split': dataset.to_dict(orient='split')} if isinstance(dataset, pd.DataFrame) else create_tf_serving_json(dataset)
#     data_json = json.dumps(ds_dict, allow_nan=True)
#     response = requests.request(method='POST', headers=headers, url=url, data=data_json)
#     if response.status_code != 200:
#         raise Exception(f'Request failed with status {response.status_code}, {response.text}')
#     return response.json()

In [0]:
input_data

In [0]:
import numpy as np
import json
import cv2

def convert_image_to_array(img_loc):
  img = cv2.imread(img_loc)
  img_rgb = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)
  return img_rgb

# Convert each input_data to a numpy array
numpy_arrays = [convert_image_to_array(f) for f in input_data]


import json

# Convert numpy arrays to JSON
json_data = json.dumps([array.tolist() for array in numpy_arrays])

display(json_data)


In [0]:
url = 'https://adb-984752964297111.11.azuredatabricks.net/serving-endpoints/cell-instance-segmentation/invocations'
headers = {'Authorization': f'Bearer {os.environ.get("DATABRICKS_TOKEN")}', 'Content-Type': 'application/json'}
print(headers)

In [0]:
requests.request(method='POST', headers=headers, url=url, data=json_data, verify=True)