### Import libraries

In [None]:
import os
import json
import time
import random
import numpy as np
import statistics
from pathlib import Path
from PIL import Image
import torch
from torch.utils.data import Dataset
from torchvision import transforms

In [None]:
import onnxruntime as ort
from mlperf_loadgen import (
    TestSettings, TestScenario, TestMode,
    QuerySample, QuerySampleResponse,
    StartTest, StartTestWithLogSettings, QuerySamplesComplete,
    ConstructQSL, ConstructSUT, DestroyQSL, DestroySUT,
    LogSettings, LoggingMode
)

In [None]:
# --- Benchmark configuration ---

# Phases to execute
RUN_ACCURACY = True      # True -> run the accuracy test
RUN_PERFORMANCE = True   # True -> run the performance test

# Number of samples passed to the model per inference call
BATCH_SIZE = 8

# How many images to evaluate; None means "use every available image".
# A small value such as 100 or 1000 is handy for quick tests.
NUM_IMAGES = None

# Lower bound on the test duration, in milliseconds (performance test)
MIN_DURATION_MS = 10_000  # 10 seconds, suitable for quick tests

# Lower bound on the number of queries (small value for quick tests)
MIN_QUERY_COUNT = 50

In [None]:
# Define source paths

# === Paths ===
# Relative: everything is anchored to the directory the kernel is in when
# this cell runs.
base_dir = Path(os.getcwd())
dataset_dir = base_dir / "dataset/imagenet"
image_dir = dataset_dir / "val_flat"
map_file = dataset_dir / "val_map.txt"
results_dir = base_dir / "results/resnet50/Offline"
# The working directory is switched to results_dir at the end of this cell,
# so a bare relative file name would be resolved inside the results folder.
# Anchor the model path to base_dir to keep it valid after the chdir.
onnx_model_path = str(base_dir / "resnet50_fp32.onnx")

#Absolutes
#base_dir = Path("C:/Users/iisc/npucloud_userdata/giulio-m-polimi/ryzenaisw/image_classification/mlperf_inference")
#dataset_dir = Path("C:/Users/iisc/npucloud_userdata/giulio-m-polimi/ryzenaisw/image_classification/mlperf_inference/dataset")
#image_dir = Path("C:/Users/iisc/npucloud_userdata/giulio-m-polimi/ryzenaisw/image_classification/mlperf_inference/dataset/imagenet/val_flat/")
#map_file = Path("C:/Users/iisc/npucloud_userdata/giulio-m-polimi/ryzenaisw/image_classification/mlperf_inference/dataset/imagenet/val_map.txt")
#results_dir = Path("C:/Users/iisc/npucloud_userdata/giulio-m-polimi/ryzenaisw/image_classification/mlperf_inference/results/resnet50/Offline")
#onnx_model_path = Path("C:/Users/iisc/npucloud_userdata/giulio-m-polimi/ryzenaisw/image_classification/mlperf_inference/resnet50_fp32.onnx")


# Ensure results directory exists
results_dir.mkdir(parents=True, exist_ok=True)

# The test functions write their logs and result files to ".", so make the
# results directory current. The original directory is remembered so it can
# be restored after the run.
original_working_dir = os.getcwd()
os.chdir(results_dir)
#print(f"Changed working directory to: {os.getcwd()}")

### Dataset

In [None]:
class ImagenetDataset:
    """Cache of preprocessed images addressed by sample index.

    Args:
        image_paths: Sequence of image file paths; position is the sample index.
        labels: Sequence of ground-truth labels, parallel to ``image_paths``.
        transform: Optional callable applied to each RGB PIL image. Its result
            must support ``.unsqueeze(0).numpy()``. When omitted, the
            module-level ``transform`` is looked up at load time, which keeps
            existing two-argument construction working.
    """

    def __init__(self, image_paths, labels, transform=None):
        self.image_paths = image_paths
        self.labels = labels
        self.transform = transform
        # sample index -> (array with a leading batch axis of 1, label)
        self.cache = {}

    def __len__(self):
        """Return the number of image paths known to the dataset."""
        return len(self.image_paths)

    def load_samples(self, indices):
        """Read, preprocess and cache the samples at ``indices``."""
        # Resolve the preprocessing callable lazily so a module-level
        # ``transform`` defined after this class is still picked up.
        preprocess = self.transform if self.transform is not None else transform
        for idx in indices:
            # The context manager closes the underlying file promptly;
            # convert() returns an independent, fully loaded image.
            with Image.open(self.image_paths[idx]) as img:
                rgb = img.convert("RGB")
            tensor = preprocess(rgb).unsqueeze(0).numpy()
            self.cache[idx] = (tensor, self.labels[idx])

    def unload_samples(self, indices):
        """Drop the samples at ``indices`` from the cache; unknown indices are ignored."""
        for idx in indices:
            self.cache.pop(idx, None)

    def get_sample(self, idx):
        """Return the cached ``(tensor, label)`` pair for ``idx``.

        Raises:
            KeyError: If the sample has not been loaded.
        """
        return self.cache[idx]

In [None]:
# === Load val_map.txt ===
# Each non-empty line is split on whitespace; field 0 is used as the image
# file name and field 1 as the integer label. Blank lines (for example a
# trailing newline at the end of the file) are skipped instead of crashing
# on a missing field further down.
with open(map_file) as f:
    entries = [fields for fields in (line.split() for line in f) if fields]

# Keep track of the original dataset size
full_dataset_size = len(entries)

# === Apply NUM_IMAGES limitation if set ===
if NUM_IMAGES is not None and NUM_IMAGES < len(entries):
    print(f" Using subset of {NUM_IMAGES} images (from {len(entries)} total)")
    # A private generator seeded with 42 draws the same subset as seeding the
    # global generator would, without resetting global random state for any
    # other code in the kernel.
    entries = random.Random(42).sample(entries, NUM_IMAGES)

image_paths = [image_dir / e[0] for e in entries]
ground_truth = [int(e[1]) for e in entries]
sample_indices = list(range(len(image_paths)))

print(f" Dataset size: {len(image_paths)} images")

# === Preprocessing ===
# Resize, center-crop to 224, convert to a tensor and normalize per channel.
transform = transforms.Compose([
    transforms.Resize(256),
    transforms.CenterCrop(224),
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485, 0.456, 0.406],
                         std=[0.229, 0.224, 0.225])
])

# === Dataset Creation ===
dataset = ImagenetDataset(image_paths, ground_truth)

### ONNX Runtime

In [None]:
available_providers = ort.get_available_providers()
print(f" Available ONNX providers: {available_providers}")

# Pick the first available provider whose name hints at an accelerator.
# The "Vitis" tag already matches VitisAIExecutionProvider, so no separate
# branch is needed for it. CUDAExecutionProvider contains none of the tags
# and is therefore checked explicitly.
ACCELERATOR_TAGS = ("NPU", "GPU", "Vitis")
npu_providers = [
    p for p in available_providers
    if any(tag in p for tag in ACCELERATOR_TAGS)
]
if npu_providers:
    providers = [npu_providers[0], "CPUExecutionProvider"]
    print(f" Using NPU acceleration with {npu_providers[0]}")
elif "CUDAExecutionProvider" in available_providers:
    providers = ["CUDAExecutionProvider", "CPUExecutionProvider"]
    print(" Using CUDA Execution Provider")
else:
    providers = ["CPUExecutionProvider"]
    print("⚠️ No acceleration provider found, using CPU only")

print(f" Loading ONNX model: {onnx_model_path}")
session = ort.InferenceSession(onnx_model_path, providers=providers)
# Only the first input and first output of the model are used.
input_name = session.get_inputs()[0].name
output_name = session.get_outputs()[0].name
print(f" Model loaded with input name: {input_name}, output name: {output_name}")

### Performance Metrics Collection

In [None]:
# Module-level state shared by the SUT callback and the test runners.
predictions = dict()             # sample index -> {"top1": int, "top5": [int, ...]}
issued_sample_indices = set()    # indices of every sample answered so far
latencies, batch_times = [], []  # per-sample / per-batch durations, in seconds
total_samples_processed = 0      # running count of completed samples
start_time = end_time = None     # wall-clock bounds of a run; None until set

## `issue_queries(query_samples)`
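Runs inference on the queried samples in mini-batches, records the top-1/top-5 predictions and timing data, and reports each sample's completion to LoadGen. It does not return anything.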

In [None]:
def issue_queries(query_samples):
    """Run inference for a LoadGen query and report each sample as complete.

    The cached tensors of the requested samples are processed in chunks of at
    most ``BATCH_SIZE``. For every chunk the function records the batch time
    and an evenly divided per-sample latency, stores the top-1 and top-5
    predictions in ``predictions``, and signals completion to LoadGen one
    sample at a time.

    Args:
        query_samples: Sequence of query objects exposing ``.index`` (the
            sample index into ``dataset``) and ``.id`` (the identifier echoed
            back in the response).

    Side effects:
        Updates the module-level ``predictions``, ``issued_sample_indices``,
        ``latencies``, ``batch_times``, ``total_samples_processed``,
        ``start_time`` (only if still ``None``) and ``end_time``.
    """
    global total_samples_processed, start_time, end_time

    if start_time is None:
        start_time = time.time()

    indices = [qs.index for qs in query_samples]
    tensors = [dataset.get_sample(i)[0] for i in indices]

    for i in range(0, len(tensors), BATCH_SIZE):
        # perf_counter is monotonic and high resolution, which matters for
        # short intervals; start_time/end_time stay on time.time() because
        # they are compared with timestamps taken elsewhere.
        batch_start = time.perf_counter()

        # Slicing clamps at the end of the list, so the final chunk may
        # simply be shorter than BATCH_SIZE.
        stop = i + BATCH_SIZE
        # Each cached tensor carries a leading batch axis of size 1, so
        # stacking along axis 0 yields one array per mini-batch.
        mini_batch = np.vstack(tensors[i:stop])
        batch_indices = indices[i:stop]
        batch_queries = query_samples[i:stop]

        # Run inference
        outputs = session.run([output_name], {input_name: mini_batch})[0]
        batch_latency = time.perf_counter() - batch_start

        # Record batch processing time
        batch_times.append(batch_latency)

        # Per-sample latency (batch time spread evenly over the batch)
        per_sample_latency = batch_latency / len(batch_indices)
        latencies.extend([per_sample_latency] * len(batch_indices))

        # Highest score per row, and the five highest in descending order
        top1_preds = np.argmax(outputs, axis=1)
        top5_preds = np.argsort(outputs, axis=1)[:, -5:][:, ::-1]

        # Send responses
        for query, sample_idx, top1, top5 in zip(batch_queries, batch_indices, top1_preds, top5_preds):
            issued_sample_indices.add(sample_idx)
            predictions[sample_idx] = {
                "top1": int(top1),
                "top5": [int(x) for x in top5]
            }
            # No payload is attached to the response (data pointer and size 0).
            response = QuerySampleResponse(query.id, 0, 0)
            QuerySamplesComplete([response])
            total_samples_processed += 1

            # Progress indicator; reports the actual running sample count.
            if total_samples_processed % 500 == 0:
                print(f"Processed {total_samples_processed} samples...", end="\r")

    end_time = time.time()

## `flush_queries()`
Ensures all pending inference requests are processed (currently does nothing).

## `load_samples_to_ram(sample_indices)`
Loads the selected samples into RAM to prepare them for inference.

## `unload_samples_from_ram(sample_indices)`
Removes the specified samples from RAM to free up memory.

In [None]:
def flush_queries():
    """No-op flush callback; passed to ConstructSUT together with issue_queries."""
    pass

def load_samples_to_ram(sample_indices):
    """Load callback registered with ConstructQSL.

    Delegates to ``dataset.load_samples``, which preprocesses each indexed
    image and stores it in the dataset's in-memory cache.
    """
    dataset.load_samples(sample_indices)

def unload_samples_from_ram(sample_indices):
    """Unload callback registered with ConstructQSL.

    Delegates to ``dataset.unload_samples``, which removes the indexed
    samples from the dataset's in-memory cache.
    """
    dataset.unload_samples(sample_indices)

## `run_accuracy_test()`
Executes the accuracy evaluation by running inference on test data and comparing results to ground truth.

In [None]:
def run_accuracy_test():
    """Run the Offline scenario in AccuracyOnly mode and compute accuracy.

    Resets the collected predictions, configures LoadGen logging and test
    settings, runs the test and then evaluates the predictions with
    ``calculate_accuracy``.

    Returns:
        dict: The result of ``calculate_accuracy()``.
    """
    global predictions, issued_sample_indices
    predictions = {}
    issued_sample_indices = set()

    print("\n Running ACCURACY test...")

    # Configure logging
    log_settings = LogSettings()
    log_settings.log_output.outdir = "."  # Use current directory (which is now results_dir)
    log_settings.log_output.copy_summary_to_stdout = True
    log_settings.log_output.copy_detail_to_stdout = True

    # Check available logging modes and use an appropriate one
    available_modes = dir(LoggingMode)
    if "ASYNC_WRITE_BACK" in available_modes:
        log_settings.log_mode = LoggingMode.ASYNC_WRITE_BACK
    elif "AsyncWriteBack" in available_modes:
        log_settings.log_mode = LoggingMode.AsyncWriteBack
    else:
        # Neither spelling exists: leave log_mode at its default
        print("Warning: ASYNC_WRITE_BACK logging mode not found, using default mode")

    # Configure test settings for Offline scenario
    settings = TestSettings()
    settings.scenario = TestScenario.Offline
    settings.mode = TestMode.AccuracyOnly
    settings.min_query_count = MIN_QUERY_COUNT
    settings.min_duration_ms = MIN_DURATION_MS

    # Expected-QPS hint: the rate at which the whole sample set would be
    # consumed in exactly MIN_DURATION_MS.
    settings.offline_expected_qps = len(sample_indices) / (MIN_DURATION_MS / 1000)

    sut = ConstructSUT(issue_queries, flush_queries)
    try:
        qsl = ConstructQSL(
            len(sample_indices), min(1024, len(sample_indices)),
            load_samples_to_ram,
            unload_samples_from_ram
        )
        try:
            # Pass the log settings explicitly; plain StartTest would ignore
            # everything configured on log_settings above.
            StartTestWithLogSettings(sut, qsl, settings, log_settings)

            # Calculate accuracy
            return calculate_accuracy()
        finally:
            # Release the LoadGen handles even if the test or scoring fails
            DestroyQSL(qsl)
    finally:
        DestroySUT(sut)

## `run_performance_test()`
Measures the inference speed and throughput of the model under test conditions.

In [None]:
def run_performance_test():
    """Run the Offline scenario in PerformanceOnly mode and save statistics.

    Resets all collected metrics, configures LoadGen logging and test
    settings, runs the test and then summarizes the timings with
    ``save_performance_stats``.

    Note:
        ``start_time`` is taken immediately before the test is started, so the
        reported duration covers everything the test run does, not only the
        time spent inside ``issue_queries``.

    Returns:
        dict: The result of ``save_performance_stats()``.
    """
    global latencies, batch_times, total_samples_processed, start_time, end_time
    global predictions, issued_sample_indices

    predictions = {}
    issued_sample_indices = set()
    latencies = []
    batch_times = []
    total_samples_processed = 0
    start_time = None
    end_time = None

    print("\n Running PERFORMANCE test...")

    # Configure logging
    log_settings = LogSettings()
    log_settings.log_output.outdir = "."  # Use current directory (which is now results_dir)
    log_settings.log_output.copy_summary_to_stdout = True
    log_settings.log_output.copy_detail_to_stdout = True

    # Check available logging modes and use an appropriate one
    available_modes = dir(LoggingMode)
    if "ASYNC_WRITE_BACK" in available_modes:
        log_settings.log_mode = LoggingMode.ASYNC_WRITE_BACK
    elif "AsyncWriteBack" in available_modes:
        log_settings.log_mode = LoggingMode.AsyncWriteBack
    else:
        # Neither spelling exists: leave log_mode at its default
        print("Warning: ASYNC_WRITE_BACK logging mode not found, using default mode")

    # Configure test settings for Offline scenario
    settings = TestSettings()
    settings.scenario = TestScenario.Offline
    settings.mode = TestMode.PerformanceOnly
    settings.min_query_count = MIN_QUERY_COUNT
    settings.min_duration_ms = MIN_DURATION_MS

    # Use the same number of samples as the selected dataset subset
    performance_count = len(sample_indices)

    # Expected-QPS hint: the rate at which the whole sample set would be
    # consumed in exactly MIN_DURATION_MS.
    settings.offline_expected_qps = performance_count / (MIN_DURATION_MS / 1000)

    sut = ConstructSUT(issue_queries, flush_queries)
    try:
        qsl = ConstructQSL(
            performance_count, min(1024, performance_count),
            load_samples_to_ram,
            unload_samples_from_ram
        )
        try:
            # Start the wall-clock timer right before the test begins
            print("Starting performance test timer...")
            start_time = time.time()

            # Pass the log settings explicitly; plain StartTest would ignore
            # everything configured on log_settings above.
            StartTestWithLogSettings(sut, qsl, settings, log_settings)

            # Ensure we captured the end time
            if end_time is None:
                end_time = time.time()

            # Fallback estimates. The average is computed once up front so
            # both fallbacks can use it; previously the batch-time fallback
            # relied on a variable assigned only inside the latency fallback.
            if total_samples_processed > 0:
                avg_latency = (end_time - start_time) / total_samples_processed
                if not latencies:
                    print("No latencies collected during test, estimating from total time")
                    latencies = [avg_latency] * total_samples_processed
                if not batch_times:
                    print("No batch times collected, using dummy values")
                    batch_times = [avg_latency * BATCH_SIZE]

            # Save performance statistics
            print(f" Processed {total_samples_processed} samples in {end_time - start_time:.2f} seconds")
            return save_performance_stats()
        finally:
            # Release the LoadGen handles even if the test or reporting fails
            DestroyQSL(qsl)
    finally:
        DestroySUT(sut)

## `calculate_accuracy()`
Computes accuracy metrics (e.g., top-1, top-5) based on the inference results.

In [None]:
def calculate_accuracy():
    """Score the collected predictions against ``ground_truth`` and save them.

    Writes a human-readable ``accuracy.txt`` and a JSON file named
    ``mlperf_log_accuracy.json`` into the current working directory.

    NOTE(review): ``mlperf_log_accuracy.json`` looks like a file name LoadGen
    itself may use for its accuracy log; writing here replaces any existing
    file of that name. Confirm this is intended.

    Returns:
        dict: ``top1_accuracy`` and ``top5_accuracy`` as percentages plus the
        number of scored ``samples``; all zeros when nothing was predicted.
    """
    if not predictions:
        print(" No predictions collected for accuracy calculation")
        return {"top1_accuracy": 0, "top5_accuracy": 0, "samples": 0}

    n_scored = len(predictions)

    # Count hits in a single pass over the stored predictions
    top1_hits = 0
    top5_hits = 0
    for sample_idx, pred in predictions.items():
        expected = ground_truth[sample_idx]
        top1_hits += int(pred.get("top1", -1) == expected)
        top5_hits += int(expected in pred.get("top5", []))

    top1_acc = top1_hits / n_scored * 100
    top5_acc = top5_hits / n_scored * 100

    # Plain-text report
    accuracy_txt = Path("accuracy.txt")
    report_lines = [
        f"Top-1 Accuracy: {top1_acc:.2f}%\n",
        f"Top-5 Accuracy: {top5_acc:.2f}%\n",
        f"Total samples: {n_scored}\n",
    ]
    with accuracy_txt.open("w") as report:
        report.writelines(report_lines)
    print(f"\n Accuracy written to {accuracy_txt}")

    # JSON report
    acc_json = Path("mlperf_log_accuracy.json")
    with acc_json.open("w") as report:
        json.dump({"top1": top1_acc, "top5": top5_acc}, report, indent=2)

    return dict(
        top1_accuracy=top1_acc,
        top5_accuracy=top5_acc,
        samples=n_scored,
    )


## `save_performance_stats()`
Gathers and stores performance statistics such as throughput, per-sample latency, and batch processing time.

In [None]:
def save_performance_stats():
    """Summarize the collected timings, print them and write them to disk.

    Reads the module-level ``latencies``, ``batch_times``,
    ``total_samples_processed``, ``start_time`` and ``end_time``. Writes
    ``performance.txt`` and ``performance_stats.json`` to the current working
    directory, and creates a basic ``mlperf_log_summary.txt`` only if no file
    of that name exists yet.

    Returns:
        dict: Totals, throughput, and latency/batch statistics in
        milliseconds; an empty dict when no latencies were collected.
    """
    if not latencies:
        print(" No performance data collected")
        return {}

    # Calculate statistics
    test_duration = end_time - start_time if start_time and end_time else 0
    throughput = total_samples_processed / test_duration if test_duration > 0 else 0

    # latencies is known to be non-empty here; only batch_times may still
    # need an estimate.
    if len(batch_times) == 0:
        print(" No batch time data collected, using estimates")
        batch_times.append(test_duration / (total_samples_processed / BATCH_SIZE) if total_samples_processed > 0 else 0)

    # One percentile call for all three ranks; plain floats keep the stats
    # dictionary JSON-serializable regardless of the NumPy scalar type.
    p90, p95, p99 = (float(v) * 1000 for v in np.percentile(latencies, [90, 95, 99]))

    stats = {
        "total_samples": total_samples_processed,
        "test_duration_seconds": test_duration,
        "throughput_samples_per_second": throughput,
        "latency_stats": {
            "mean": statistics.mean(latencies) * 1000,  # ms
            "median": statistics.median(latencies) * 1000,  # ms
            "min": min(latencies) * 1000,  # ms
            "max": max(latencies) * 1000,  # ms
            "p90": p90,  # ms
            "p95": p95,  # ms
            "p99": p99,  # ms
        },
        "batch_stats": {
            "mean": statistics.mean(batch_times) * 1000,  # ms
            "median": statistics.median(batch_times) * 1000,  # ms
            "min": min(batch_times) * 1000,  # ms
            "max": max(batch_times) * 1000,  # ms
        }
    }
    latency = stats["latency_stats"]
    batch = stats["batch_stats"]

    # Explicitly print key performance metrics
    print("\n PERFORMANCE SUMMARY:")
    print(f"Total samples: {stats['total_samples']}")
    print(f"Duration: {stats['test_duration_seconds']:.2f} seconds")
    print(f"Throughput: {stats['throughput_samples_per_second']:.2f} samples/second")
    print(f"Average latency: {latency['mean']:.2f} ms/sample")
    print(f"P90 latency: {latency['p90']:.2f} ms")

    # Save performance results
    perf_txt = Path("performance.txt")
    with open(perf_txt, "w") as f:
        f.write("===== PERFORMANCE RESULTS =====\n\n")
        f.write("Scenario: Offline\n")
        f.write(f"Total samples processed: {stats['total_samples']}\n")
        f.write(f"Test duration: {stats['test_duration_seconds']:.2f} seconds\n")
        f.write(f"Throughput: {stats['throughput_samples_per_second']:.2f} samples/second\n\n")

        f.write("===== LATENCY (ms) =====\n")
        f.write(f"Mean: {latency['mean']:.2f}\n")
        f.write(f"Median: {latency['median']:.2f}\n")
        f.write(f"Min: {latency['min']:.2f}\n")
        f.write(f"Max: {latency['max']:.2f}\n")
        f.write(f"90th percentile: {latency['p90']:.2f}\n")
        f.write(f"95th percentile: {latency['p95']:.2f}\n")
        f.write(f"99th percentile: {latency['p99']:.2f}\n\n")

        f.write("===== BATCH PROCESSING TIME (ms) =====\n")
        f.write(f"Mean: {batch['mean']:.2f}\n")
        f.write(f"Median: {batch['median']:.2f}\n")
        f.write(f"Min: {batch['min']:.2f}\n")
        f.write(f"Max: {batch['max']:.2f}\n")

    print(f"\n Performance statistics written to {perf_txt}")

    # Save detailed performance JSON
    perf_json = Path("performance_stats.json")
    with open(perf_json, "w") as f:
        json.dump(stats, f, indent=2)

    # Write a basic summary only when no summary file is present, so an
    # existing one is never overwritten.
    summary_file = Path("mlperf_log_summary.txt")
    if not summary_file.exists():
        print("⚠️ MLPerf log summary not found, creating a basic version")
        with open(summary_file, "w") as f:
            f.write("MLPerf Inference - ResNet50 - Offline Scenario\n")
            f.write(f"Samples: {stats['total_samples']}\n")
            f.write(f"Throughput: {stats['throughput_samples_per_second']:.2f} samples/sec\n")
            f.write(f"Mean latency: {latency['mean']:.2f} ms\n")
            f.write(f"90th percentile latency: {latency['p90']:.2f} ms\n")

    return stats

### Run the tests

In [None]:
results = {
    "test_time": time.strftime("%Y-%m-%d %H:%M:%S"),
    "dataset": {
        "size": len(image_paths),
        "full_dataset_size": full_dataset_size,
        "batch_size": BATCH_SIZE
    }
}

print(f"\n MLPerf Inference ResNet50 - Offline Scenario")
print(f"Dataset size: {len(image_paths)} images, Batch size: {BATCH_SIZE}")

# The test functions write logs and result files to ".". Enter results_dir
# explicitly so this cell can be re-run on its own, and always restore the
# original directory, even when a test raises.
os.chdir(results_dir)
try:
    # Run accuracy test if enabled
    if RUN_ACCURACY:
        accuracy_results = run_accuracy_test()
        results["accuracy"] = accuracy_results

    # Run performance test if enabled
    if RUN_PERFORMANCE:
        performance_stats = run_performance_test()
        results["performance"] = performance_stats

    print("\n MLPerf Inference test completed successfully")
    print(f"Results saved to: {os.getcwd()}")

    # Check if the required MLPerf log files were generated
    summary_file = Path("mlperf_log_summary.txt")
    detail_file = Path("mlperf_log_detail.json")

    if not summary_file.exists():
        print(f" MLPerf log summary file not found at: {summary_file}")

    if not detail_file.exists():
        print(f" MLPerf log detail file not found at: {detail_file}")

    # Display brief summary of results
    if RUN_ACCURACY and "accuracy" in results:
        acc = results["accuracy"]
        print(f"\nTop-1 Accuracy: {acc['top1_accuracy']:.2f}% ({acc['samples']} samples)")
        print(f"Top-5 Accuracy: {acc['top5_accuracy']:.2f}%")

    if RUN_PERFORMANCE and "performance" in results:
        perf = results["performance"]
        # An empty dict means no performance data was collected
        if perf and "throughput_samples_per_second" in perf:
            print(f"\nThroughput: {perf['throughput_samples_per_second']:.2f} samples/second")
            print(f"Average latency: {perf['latency_stats']['mean']:.2f} ms/sample")
            print(f"P90 latency: {perf['latency_stats']['p90']:.2f} ms")
finally:
    # change back to original dir
    os.chdir(original_working_dir)

results

🔄 Changed working directory to: C:\Users\iisc\npucloud_userdata\giulio-m-polimi\ryzenaisw\image_classification\mlperf_inference\results\resnet50\Offline
📊 Dataset size: 50000 images
📋 Available ONNX providers: ['VitisAIExecutionProvider', 'DmlExecutionProvider', 'CPUExecutionProvider']
✅ Using NPU acceleration with VitisAIExecutionProvider
🔄 Loading ONNX model: C:\Users\iisc\npucloud_userdata\giulio-m-polimi\ryzenaisw\image_classification\mlperf_inference\resnet50_fp32.onnx
✅ Model loaded with input name: input, output name: output

🚀 MLPerf Inference ResNet50 - Offline Scenario
Dataset size: 50000 images, Batch size: 8

🔍 Running ACCURACY test...
📥 Loading 1024 samples into RAM...
✅ Sample loading complete
📥 Loading 1024 samples into RAM...
✅ Sample loading complete
📥 Loading 1024 samples into RAM...
✅ Sample loading complete
📥 Loading 1024 samples into RAM...
✅ Sample loading complete
📥 Loading 1024 samples into RAM...
✅ Sample loading complete
📥 Loading 1024 samples into RAM...
✅ Sa

{'test_time': '2025-05-16 19:00:38',
 'dataset': {'size': 50000, 'full_dataset_size': 50000, 'batch_size': 8},
 'accuracy': {'top1_accuracy': 76.146,
  'top5_accuracy': 92.872,
  'samples': 50000},
 'performance': {'total_samples': 55000,
  'test_duration_seconds': 1053.5606064796448,
  'throughput_samples_per_second': 52.203926059627804,
  'latency_stats': {'mean': 18.94911579652266,
   'median': 19.091039896011353,
   'min': 8.903145790100098,
   'max': 27.698993682861328,
   'p90': 19.856035709381104,
   'p95': 19.87442374229431,
   'p99': 20.48429846763611},
  'batch_stats': {'mean': 151.59292637218127,
   'median': 152.72831916809082,
   'min': 71.22516632080078,
   'max': 221.59194946289062}}}

In [12]:
# Debug check: show the kernel's current working directory.
print(os.getcwd())

C:\Users\iisc\npucloud_userdata\giulio-m-polimi\ryzenaisw\image_classification\mlperf_inference\results\resnet50\Offline
