In [None]:
!pip install ultralytics onnx onnxruntime onnxruntime-tools opencv-python numpy tqdm

In [2]:
cp /kaggle/input/models/rambhattaee22b047/best3/pytorch/default/1/best3.pt /kaggle/working/best3.pt

In [None]:
# Export the best3.pt checkpoint to ONNX using the Ultralytics CLI.
# presumably this writes best3.onnx next to best3.pt (the next cell reads that name) — TODO confirm
!yolo export model=best3.pt format=onnx

In [24]:
# Run onnxruntime's quantization pre-processing step on best3.onnx and write the
# result to pre2.onnx, passing --skip_symbolic_shape TRUE.
# NOTE(review): main() further down copies a different model (pre.onnx from the
# input mount) to /kaggle/working/pre2.onnx — if this cell runs in /kaggle/working
# its output is overwritten; confirm which model is meant to be quantized.
!python -m onnxruntime.quantization.preprocess --input best3.onnx --output pre2.onnx --skip_symbolic_shape TRUE

In [None]:
import argparse
import os
import time
import cv2
import numpy as np
import onnxruntime
import shutil # <--- ADD THIS IMPORT
from onnxruntime.quantization import QuantFormat, QuantType, quantize_static, CalibrationDataReader, CalibrationMethod

# The calibration data reader, the benchmark helper and the quantization driver are defined in full below.



# --- 0. EAGER PREPROCESSING HELPER (loads every image into memory at once) ---
def _preprocess_images(images_folder: str, height: int, width: int):
    """Loads and preprocesses images strictly for YOLO (RGB, 0-1 norm, CHW)."""
    valid_extensions = {".jpg", ".jpeg", ".png"}
    image_names = [f for f in os.listdir(images_folder) if any(f.lower().endswith(ext) for ext in valid_extensions)]
    
    unconcatenated_batch_data = []

    for image_name in image_names:
        image_filepath = os.path.join(images_folder, image_name)
        img = cv2.imread(image_filepath)
        if img is None: continue
        
        # Exact YOLO Preprocessing for 640p
        img = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)
        img = cv2.resize(img, (width, height))
        img = img.astype(np.float32) / 255.0
        nchw_data = np.transpose(img, (2, 0, 1)) 
        
        unconcatenated_batch_data.append(nchw_data)

    return unconcatenated_batch_data


# --- 1. MEMORY-SAFE LAZY DATA READER ---
class YOLODataReader(CalibrationDataReader):
    """Calibration data reader that decodes one image per ``get_next`` call.

    Only the image file names are held in memory. Each image is read on
    demand, converted from BGR to RGB, resized to the model's input
    height/width, scaled to float32 in [0, 1] and returned in NCHW layout
    with a batch dimension of 1.

    Args:
        calibration_image_folder: Directory containing .jpg/.jpeg/.png files.
        model_path: ONNX model whose first input supplies the input name and
            the target height/width (dimensions 2 and 3 of its shape).

    Raises:
        ValueError: If the model's input height or width is not a fixed integer.
    """

    VALID_EXTENSIONS = (".jpg", ".jpeg", ".png")

    def __init__(self, calibration_image_folder: str, model_path: str):
        self.image_folder = calibration_image_folder

        # Keep only file names in RAM. Sorting makes the calibration order
        # reproducible, since os.listdir returns entries in arbitrary order.
        self.image_names = sorted(
            f for f in os.listdir(calibration_image_folder)
            if f.lower().endswith(self.VALID_EXTENSIONS)
        )
        self.iterator = iter(self.image_names)

        # Read the input name and spatial size from the model itself.
        session = onnxruntime.InferenceSession(model_path, providers=['CPUExecutionProvider'])
        model_input = session.get_inputs()[0]
        shape = model_input.shape
        self.height, self.width = shape[2], shape[3]
        self.input_name = model_input.name

        # Symbolic (dynamic) dimensions are not integers; fail here with a
        # clear message instead of later inside cv2.resize.
        if not (isinstance(self.height, int) and isinstance(self.width, int)):
            raise ValueError(
                f"Model input '{self.input_name}' has a non-static spatial shape {shape}; "
                "export the model with a fixed image size before calibration."
            )

    def get_next(self):
        """Return the next ``{input_name: array}`` feed, or None when exhausted.

        Files that OpenCV cannot decode are skipped. The loop resumes from the
        shared iterator, so skipping never grows the call stack no matter how
        many unreadable files appear in a row.
        """
        for image_name in self.iterator:
            image_filepath = os.path.join(self.image_folder, image_name)
            img = cv2.imread(image_filepath)
            if img is None:
                continue  # corrupted / unreadable file

            # YOLO preprocessing: BGR -> RGB, resize, 0-1 float32, HWC -> CHW
            img = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)
            img = cv2.resize(img, (self.width, self.height))
            img = img.astype(np.float32) / 255.0
            nchw_data = np.transpose(img, (2, 0, 1))

            return {self.input_name: np.expand_dims(nchw_data, axis=0)}

        return None  # calibration data exhausted

    def rewind(self):
        """Restart iteration from the first image name."""
        self.iterator = iter(self.image_names)

# --- 2. BENCHMARK TOOL ---
def benchmark(model_path):
    """Time CPU inference of an ONNX model on an all-zero input and print the result.

    The session is limited to 4 intra-op threads. The input tensor is built
    from the model's first input shape; a non-integer (symbolic) batch
    dimension is replaced by 1. Three warm-up runs are followed by 15 timed
    runs, and the mean latency and the derived FPS are printed.

    Args:
        model_path: Path to the ONNX model to benchmark.

    Returns:
        None. Results are printed.
    """
    opts = onnxruntime.SessionOptions()
    opts.intra_op_num_threads = 4  # Simulate quad-core

    # The keyword must be `sess_options`; an unrecognised keyword such as
    # `sess_opts` is not applied, which leaves the thread limit unset.
    session = onnxruntime.InferenceSession(
        model_path, sess_options=opts, providers=['CPUExecutionProvider']
    )
    model_input = session.get_inputs()[0]
    input_name = model_input.name
    shape = model_input.shape
    batch = shape[0] if isinstance(shape[0], int) else 1

    warmup_runs = 3
    runs = 15
    input_data = np.zeros((batch, shape[1], shape[2], shape[3]), np.float32)
    feed = {input_name: input_data}

    print(f"Benchmarking shape: {input_data.shape}")

    for _ in range(warmup_runs):
        session.run(None, feed)  # None requests all model outputs

    total_ms = 0.0
    for _ in range(runs):
        start = time.perf_counter()
        session.run(None, feed)
        total_ms += (time.perf_counter() - start) * 1000

    avg_ms = total_ms / runs
    print(f"Avg Latency: {avg_ms:.2f} ms | Est. FPS: {1000/avg_ms:.2f}\n")

# --- 3. MAIN EXECUTION ---
def main():
    """Quantize the pre-processed YOLO ONNX model to INT8 and benchmark both models.

    Steps:
      1. Copy the model from the read-only input mount to the working directory.
      2. Build a YOLODataReader over the calibration image folder.
      3. Run static QDQ quantization (uint8 activations, int8 weights,
         per-channel, entropy calibration), excluding the listed head nodes.
      4. Benchmark the FP32 copy and the INT8 output.

    Raises:
        ValueError: If the calibration folder contains no .jpg/.jpeg/.png files.
    """
    # 1. Define Paths
    read_only_model = "/kaggle/input/models/rambhattaee22b047/pre-onnx/onnx/default/1/pre.onnx"
    writable_model = "/kaggle/working/pre2.onnx"
    output_int8_model = "/kaggle/working/int8_best3.onnx"
    calibration_dir = "/kaggle/input/datasets/rambhattaee22b047/calibration2"

    # 2. Copy the model to a writable directory so ONNX can create its temporary files.
    # copyfile copies contents only; copy2 would also copy the source's permission
    # bits onto the destination, which defeats the point of making a writable copy.
    # NOTE(review): this overwrites /kaggle/working/pre2.onnx; the earlier
    # preprocess cell also writes a file named pre2.onnx — confirm which is intended.
    print(f"Copying model to writable directory: {writable_model}")
    shutil.copyfile(read_only_model, writable_model)

    print("Initializing YOLO Data Reader...")
    dr = YOLODataReader(calibration_dir, writable_model)
    if not dr.image_names:
        # Fail early with an actionable message instead of deep inside calibration.
        raise ValueError(f"No calibration images (.jpg/.jpeg/.png) found in {calibration_dir}")

    # Activations are asymmetric uint8 and weights symmetric int8, i.e. U8S8.
    print("Starting high-accuracy static quantization (QDQ U8S8 + Entropy + Per-Channel)...")

    cpu_extra_options = {
        "ActivationSymmetric": False,
        "WeightSymmetric": True,
        "CalibMaxIntermediateOutputs": 10
    }

    # Nodes under /model.23 that are left unquantized.
    nodes_to_exclude = [
        '/model.23/Concat_1',
        '/model.23/Sigmoid',
        '/model.23/Concat_3',
        '/model.23/Concat_6',
        '/model.23/Split',
        '/model.23/ReduceMax',
        '/model.23/TopK',
        '/model.23/GatherElements',
        '/model.23/Gather_3',
    ]

    # 3. Run Quantization on the writable model
    quantize_static(
        model_input=writable_model,
        model_output=output_int8_model,
        calibration_data_reader=dr,
        quant_format=QuantFormat.QDQ,
        weight_type=QuantType.QInt8,
        activation_type=QuantType.QUInt8,
        per_channel=True,
        calibrate_method=CalibrationMethod.Entropy,
        extra_options=cpu_extra_options,
        nodes_to_exclude=nodes_to_exclude
    )

    print("Quantization complete!\n")

    # 4. Compare latency before and after quantization
    print("--- BENCHMARK: FP32 MODEL ---")
    benchmark(writable_model)

    print("--- BENCHMARK: INT8 MODEL ---")
    benchmark(output_int8_model)

# Entry point: run the quantization + benchmark pipeline when this code is
# executed as the top-level module.
if __name__ == "__main__":
    main()

In [None]:
from ultralytics import YOLO
import os
import shutil
from glob import glob
from IPython.display import FileLink
from tqdm import tqdm

# --- CONFIGURATION ---
INT8_MODEL_PATH = '/kaggle/working/int8_best3.onnx'
TEST_IMG_DIR = '/kaggle/input/datasets/rambhattaee22b047/calibration2'

# Exact directory that should receive the annotated images (no 'predict' subfolder).
OUTPUT_DIR = '/kaggle/working/int8_ultralytics_results'

MAX_IMAGES = 50                               # upper bound on images to run
IMAGE_EXTENSIONS = ('.jpg', '.jpeg', '.png')  # same set the calibration reader accepts

# 1. Load the INT8 Model
print(f"[*] Loading model: {INT8_MODEL_PATH}")
model = YOLO(INT8_MODEL_PATH, task='detect')

# Start from an empty output directory so the zip never contains old runs.
# Creating it up front also keeps the archive step working when no image is found.
if os.path.exists(OUTPUT_DIR):
    shutil.rmtree(OUTPUT_DIR)
os.makedirs(OUTPUT_DIR)

# Case-insensitive extension match, sorted for a reproducible selection.
image_files = sorted(
    path for path in glob(os.path.join(TEST_IMG_DIR, "*"))
    if path.lower().endswith(IMAGE_EXTENSIONS)
)[:MAX_IMAGES]
if not image_files:
    print(f"[!] No images found in {TEST_IMG_DIR}")

# Ultralytics saves to <project>/<name>, and an empty name falls back to the
# default run name. Splitting OUTPUT_DIR into parent + leaf makes the save
# directory exactly OUTPUT_DIR.
save_project, save_name = os.path.split(OUTPUT_DIR)

# 2. Run inference image by image
print("[*] Running INT8 via Ultralytics engine (CPU)...")
for img_path in tqdm(image_files):
    model.predict(
        source=img_path,
        conf=0.1,
        device='cpu',
        save=True,
        project=save_project,
        name=save_name,
        exist_ok=True,   # reuse the same directory on every call
        verbose=False    # Keeps the notebook output clean
    )

# 3. Zip & Download
print(f"[*] Zipping results from {OUTPUT_DIR}...")
# shutil.make_archive(base_name, format, root_dir) -> writes <base_name>.zip
zip_path = shutil.make_archive(OUTPUT_DIR, 'zip', OUTPUT_DIR)
print(f"✅ Created zip at: {zip_path}")

# Display download link (path relative to the current working directory)
FileLink(os.path.relpath(zip_path))