In [1]:
import os
import glob
from PIL import Image
import onnx
import random
import gc
import onnxruntime as ort
from onnxruntime.quantization import quantize_static, CalibrationDataReader, QuantType, QuantFormat
from onnxruntime.quantization.calibrate import CalibrationMethod

import time
import onnxruntime as ort
import numpy as np
import pandas as pd

from tqdm import tqdm

!python3 -m onnxruntime.quantization.preprocess --input model_efficientnetv2s_11.onnx --output model_efficientnetv2s_11-infer.onnx

In [2]:
IMG_SIZE = 224
MEAN = np.array([0.485, 0.456, 0.406], dtype=np.float32).reshape(3,1,1)
STD  = np.array([0.229, 0.224, 0.225], dtype=np.float32).reshape(3,1,1)

# Preprocessing function for the images
def preprocess(p):
    img = Image.open(p).convert("RGB").resize((IMG_SIZE, IMG_SIZE), Image.BILINEAR)
    x = np.asarray(img, dtype=np.float32) / 255.0
    x = (x.transpose(2,0,1) - MEAN) / STD
    img.close()
    return x[np.newaxis, :]  # NCHW

# Calibration reader class with balanced class sampling
class RandomSampleCalibrationDataReader(CalibrationDataReader):
    def __init__(self, img_dir, model_path, batch_size=1, sample_size=1000, seed=42, csv_path=None):
        sess = ort.InferenceSession(model_path, providers=["CPUExecutionProvider"])
        self.input_name = sess.get_inputs()[0].name
        del sess
        gc.collect()
        
        random.seed(seed)
        
        if csv_path:
            if not os.path.exists(csv_path):
                print(f"Warning: CSV file not found at '{csv_path}'")
                print("Falling back to directory-based sampling")
            else:
                try:
                    # Load paths from CSV with balanced sampling
                    # Try semicolon separator first (common in European locales), then comma
                    try:
                        df = pd.read_csv(csv_path, sep=',')
                    except:
                        df = pd.read_csv(csv_path)
                    
                    # Ensure required columns exist
                    if 'parking' not in df.columns or 'path' not in df.columns:
                        raise ValueError(f"CSV must have 'parking' and 'path' columns. Found: {df.columns.tolist()}")
                    
                    # Group by class
                    classes = df['parking'].unique()
                    samples_per_class = sample_size // len(classes)
                    
                    balanced_paths = []
                    class_counts = {}  # Track counts during sampling
                    
                    for cls in classes:
                        cls_df = df[df['parking'] == cls]
                        n_samples = min(len(cls_df), samples_per_class)
                        sampled = cls_df.sample(n=n_samples, random_state=seed)
                        sampled_paths = sampled['path'].tolist()
                        balanced_paths.extend(sampled_paths)
                        class_counts[cls] = len(sampled_paths)
                    
                    # Shuffle the balanced dataset
                    random.shuffle(balanced_paths)
                    self.paths = balanced_paths[:sample_size]
                    
                    print(f"Loaded {len(self.paths)} balanced samples from CSV: {csv_path}")
                    for cls in classes:
                        print(f"  Class '{cls}': {class_counts[cls]} samples")
                    
                    # Successfully loaded from CSV, skip fallback
                    self.batch_size = batch_size
                    self.idx = 0
                    return
                    
                except Exception as e:
                    print(f"Error loading CSV: {e}")
                    print("Falling back to directory-based sampling")
        
        # Fallback to directory-based balanced sampling
        # Try to detect class subdirectories
        subdirs = [d for d in os.listdir(img_dir) if os.path.isdir(os.path.join(img_dir, d))]
        
        if subdirs and len(subdirs) > 1:
            # Sample from subdirectories (assuming they represent classes)
            samples_per_class = sample_size // len(subdirs)
            balanced_paths = []
            
            for subdir in subdirs:
                subdir_path = os.path.join(img_dir, subdir)
                class_paths = sorted(sum([glob.glob(os.path.join(subdir_path, f"*{ext}")) 
                                         for ext in [".jpg",".jpeg",".png"]], []))
                n_samples = min(len(class_paths), samples_per_class)
                balanced_paths.extend(random.sample(class_paths, n_samples))
            
            random.shuffle(balanced_paths)
            self.paths = balanced_paths[:sample_size]
            print(f"Loaded {len(self.paths)} balanced samples from {len(subdirs)} class directories")
        else:
            # No class structure detected, use random sampling
            self.paths = sorted(sum([glob.glob(os.path.join(img_dir, f"*{ext}")) 
                                    for ext in [".jpg",".jpeg",".png"]], []))
            self.paths = random.sample(self.paths, min(len(self.paths), sample_size))
            print(f"Warning: No class structure detected. Using {len(self.paths)} random samples")
        
        self.batch_size = batch_size
        self.idx = 0

    def get_next(self):
        if self.idx >= len(self.paths):
            return None
        
        batch = []
        while len(batch) < self.batch_size and self.idx < len(self.paths):
            img_data = preprocess(self.paths[self.idx])
            self.idx += 1
            if img_data is not None:
                batch.append(img_data)
        
        if not batch:
            return None
            
        data = np.concatenate(batch, axis=0)
        
        # Progress indicator
        if self.idx % 50 == 0:
            print(f"Processed {self.idx}/{len(self.paths)} calibration images")
        
        return {self.input_name: data}

In [17]:
fp32_model = "model_efficientnetv2s_11-infer.onnx"          
int8_model = "efficientnetv2s_11_int8_qdq.onnx"      
calib_dir  = "/home/jupyter/mnt/datasets/scooters_data/"
calib_csv = "data/parking_label_train.csv"

dr = RandomSampleCalibrationDataReader(calib_dir, fp32_model, batch_size=24, sample_size=300, seed=42, csv_path=calib_csv)

# Perform the quantization
quantize_static(
        model_input=fp32_model,
        model_output=int8_model,
        calibration_data_reader=dr,
        calibrate_method=CalibrationMethod.MinMax,  # You can also try Entropy or MinMax
        per_channel=False, # because opset should be 11, not 13<
        weight_type=QuantType.QInt8,                    
        activation_type=QuantType.QUInt8,               
        reduce_range=False,
        quant_format=QuantFormat.QDQ,
        op_types_to_quantize=["Conv", "MatMul"]
    )
# Cleanup and validate
del dr
gc.collect()

Loaded 300 balanced samples from CSV: data/parking_label_train.csv
  Class 'inside': 100 samples
  Class 'hard_to_say': 100 samples
  Class 'outside': 100 samples
Processed 300/300 calibration images


0

In [18]:
# Validate model (skip strict opset checks for quantized models)
try:
    model = onnx.load(int8_model)
    onnx.checker.check_model(model, skip_opset_compatibility_check=True)
    print("✓ Quantized model saved and validated:", int8_model)
except Exception as e:
    # Model might be valid but checker is too strict - verify it loads in ONNX Runtime
    print(f"Note: ONNX checker warning (can be ignored): {e}")
    try:
        sess = ort.InferenceSession(int8_model, providers=["CPUExecutionProvider"])
        print("✓ Quantized model saved and loadable:", int8_model)
        del sess
    except Exception as load_error:
        print(f"✗ Error: Model cannot be loaded: {load_error}")
        raise

✓ Quantized model saved and validated: efficientnetv2s_11_int8_qdq.onnx


### Check quality 

In [19]:
img_path = "/home/jupyter/mnt/datasets/scooters_data/b698239449d24ded86cace09b20841b7.jpeg"
x = preprocess(img_path)

def run(model):
    sess = ort.InferenceSession(model, providers=["CPUExecutionProvider"])
    name_in  = sess.get_inputs()[0].name
    name_out = sess.get_outputs()[0].name
    for _ in range(5): sess.run([name_out], {name_in: x})
    t = []
    for _ in range(20):
        s = time.time(); y = sess.run([name_out], {name_in: x}); t.append(time.time()-s)
    return np.array(t), y[0]

t_fp32, y_fp32 = run("model_efficientnetv2s_11-infer.onnx")
t_int8, y_int8 = run("efficientnetv2s_11_int8_qdq.onnx")

print("FP32 ms median:", np.median(t_fp32)*1000)
print("INT8 ms median:", np.median(t_int8)*1000)
print("Cosine sim logits:", (y_fp32.flatten()@y_int8.flatten())/(np.linalg.norm(y_fp32.flatten())*np.linalg.norm(y_int8.flatten())))

FP32 ms median: 18.711328506469727
INT8 ms median: 22.83501625061035
Cosine sim logits: 0.99468005


In [3]:
CSV_PATH= "data/parking_label_test.csv"

IMG_SIZE = 224
NUM_CLASSES = 3
N_SAMPLES = 8978

LABEL_MAP = {
    "hard_to_say": 0,
    "inside": 1,
    "outside": 2
}

def preprocess_image_from_path(img_path, img_size=IMG_SIZE):
    try:
        img = Image.open(img_path).convert("RGB")
        img = img.resize((img_size, img_size))
        img_array = np.array(img).astype(np.float32) / 255.0
        img_array = np.transpose(img_array, (2, 0, 1))
        img_array = np.expand_dims(img_array, axis=0)
        return img_array
    except Exception as e:
        print(f"[Ошибка] Не удалось обработать {img_path}: {e}")
        return None


def predict_image(img_array):
    outputs = session.run([output_name], {input_name: img_array})
    preds = outputs[0]
    predicted_class = int(np.argmax(preds, axis=1)[0])
    return predicted_class

In [22]:
# model_efficientnetv2s
from concurrent.futures import ThreadPoolExecutor, as_completed

ONNX_PATH = "model_efficientnetv2s_11.onnx"
N_WORKERS = 30  # Adjust based on your CPU cores

session = ort.InferenceSession(ONNX_PATH, providers=["CPUExecutionProvider"])
input_name = session.get_inputs()[0].name
output_name = session.get_outputs()[0].name

df = pd.read_csv(CSV_PATH)
df = df.iloc[:N_SAMPLES, :]
df = df.rename(columns={df.columns[0]: "parking", df.columns[1]: "path"})

def process_row(row):
    """Process a single row and return results"""
    label_str = str(row["parking"]).strip().lower()
    if label_str not in LABEL_MAP:
        return None, None, f"{label_str=} not in LABEL_MAP"
    
    true_label = LABEL_MAP[label_str]
    img_path = row["path"]
    
    img_array = preprocess_image_from_path(img_path)
    if img_array is None:
        return None, None, f"{img_path} preprocessing failed"
    
    pred = predict_image(img_array)
    return true_label, pred, None

# Parallel processing
correct = 0
total = 0
per_class_correct = {0: 0, 1: 0, 2: 0}
per_class_total = {0: 0, 1: 0, 2: 0}

with ThreadPoolExecutor(max_workers=N_WORKERS) as executor:
    # Submit all tasks
    futures = {executor.submit(process_row, row): idx for idx, row in df.iterrows()}
    
    # Process results as they complete
    for future in tqdm(as_completed(futures), total=len(futures), desc="Обработка изображений"):
        true_label, pred, error = future.result()
        
        if error:
            print(error)
            continue
        
        if pred == true_label:
            correct += 1
            per_class_correct[true_label] += 1
        per_class_total[true_label] += 1
        total += 1

accuracy = correct / total * 100 if total > 0 else 0
print("\n========= РЕЗУЛЬТАТ =========")
print(f"Всего изображений: {total}")
print(f"Общая точность: {accuracy:.2f}%\n")

print("Точность по классам:")
for k, v in per_class_total.items():
    if v > 0:
        acc = per_class_correct[k] / v * 100
        print(f"Класс {k}: {acc:.2f}% ({per_class_correct[k]}/{v})")

Обработка изображений:  26%|██▋       | 2372/8978 [00:46<02:10, 50.79it/s] 


KeyboardInterrupt: 

In [4]:
# efficientnetv2s_int8_qdq
from concurrent.futures import ThreadPoolExecutor, as_completed

ONNX_PATH = "efficientnetv2s_11_int8_qdq.onnx"
N_WORKERS = 30  # Adjust based on your CPU cores

session = ort.InferenceSession(ONNX_PATH, providers=["CPUExecutionProvider"])
input_name = session.get_inputs()[0].name
output_name = session.get_outputs()[0].name

df = pd.read_csv(CSV_PATH)
df = df.iloc[:N_SAMPLES, :]
df = df.rename(columns={df.columns[0]: "parking", df.columns[1]: "path"})

def process_row(row):
    """Process a single row and return results"""
    label_str = str(row["parking"]).strip().lower()
    if label_str not in LABEL_MAP:
        return None, None, f"{label_str=} not in LABEL_MAP"
    
    true_label = LABEL_MAP[label_str]
    img_path = row["path"]
    
    img_array = preprocess_image_from_path(img_path)
    if img_array is None:
        return None, None, f"{img_path} preprocessing failed"
    
    pred = predict_image(img_array)
    return true_label, pred, None

# Parallel processing
correct = 0
total = 0
per_class_correct = {0: 0, 1: 0, 2: 0}
per_class_total = {0: 0, 1: 0, 2: 0}

with ThreadPoolExecutor(max_workers=N_WORKERS) as executor:
    # Submit all tasks
    futures = {executor.submit(process_row, row): idx for idx, row in df.iterrows()}
    
    # Process results as they complete
    for future in tqdm(as_completed(futures), total=len(futures), desc="Обработка изображений"):
        true_label, pred, error = future.result()
        
        if error:
            print(error)
            continue
        
        if pred == true_label:
            correct += 1
            per_class_correct[true_label] += 1
        per_class_total[true_label] += 1
        total += 1

accuracy = correct / total * 100 if total > 0 else 0
print("\n========= РЕЗУЛЬТАТ =========")
print(f"Всего изображений: {total}")
print(f"Общая точность: {accuracy:.2f}%\n")

print("Точность по классам:")
for k, v in per_class_total.items():
    if v > 0:
        acc = per_class_correct[k] / v * 100
        print(f"Класс {k}: {acc:.2f}% ({per_class_correct[k]}/{v})")


Обработка изображений: 100%|██████████| 8978/8978 [01:51<00:00, 80.88it/s] 


Всего изображений: 8978
Общая точность: 65.35%

Точность по классам:
Класс 0: 27.39% (582/2125)
Класс 1: 90.30% (4963/5496)
Класс 2: 23.73% (322/1357)



