# Coffee Level Detection Batch Inference

This notebook runs inference on all images in a given directory using a trained coffeeCNN model and saves results to a JSON file matching the format of `compiled_coffee_level_annotations.json`.

In [1]:
# 1. Import Required Libraries
!pip install -q ./coffee_level_detection
import os
import json
from datetime import datetime
from tqdm import tqdm
from coffee_level_detection.inference.tools import infer_coffee_level


[0m

In [5]:
# 2. Configure Paths for the Pre-trained Inference Model
# Directory whose image files are listed and sent through the model.
image_dir = "processed_images"  # Change as needed
# Path to the trained model file handed to load_model() in the inference cell.
model_path = "coffeeCNN.pth"  # Change as needed

# The model itself is not loaded in this cell: the inference cell loads it once via
# load_model(model_path, device) and reuses that instance for every batch.


In [None]:
# 3. Run Inference on All Images in image_dir (with batching & concurrency, optimized model loading)

import concurrent.futures
from coffee_level_detection.inference.tools import load_model, infer_coffee_level_batch

# Accumulates one annotation dictionary per image; reset on every run of this cell.
results = []
# Provenance fields stamped onto every annotation entry.
annotator = "inference"
version = "1.0"

# os.listdir returns entries in arbitrary order, so sort the filenames to make
# the batch composition reproducible from one run to the next.
image_files = sorted(
    f for f in os.listdir(image_dir) if f.lower().endswith(('.jpg', '.png'))
)
print(f"Found {len(image_files)} images in {image_dir}")

# --- Batching & Concurrency Parameters ---
batch_size = 32  # Change as needed
max_workers = 8  # Change as needed

# Passed to both load_model() and infer_coffee_level_batch().
# NOTE(review): None presumably lets the library pick a device - confirm.
device = None  # Set to 'cuda' or 'cpu' if needed
# Load the model once so every batch shares the same instance.
model = load_model(model_path, device)

def process_batch(batch):
    """
    Run inference on one batch of images and build prediction-only annotations.

    Args:
        batch (list): List of image filenames (relative to ``image_dir``) to process

    Returns:
        list: One annotation dictionary per filename. When inference fails for
            the batch, every entry has ``coffee_level`` set to ``None``.
    """
    batch_paths = [os.path.join(image_dir, fname) for fname in batch]
    try:
        # Pull out 'preds' inside the try block so that a malformed result is
        # handled the same way as a failing inference call.
        coffee_levels = infer_coffee_level_batch(batch_paths, model, device)['preds']
    except Exception as e:
        print(f"Error processing batch: {e}")
        # Plain list of placeholders: the loop below iterates it directly, so
        # the fallback must have the same shape as the extracted predictions.
        coffee_levels = [None] * len(batch)
    return [
        {
            "filename": fname,
            "coffee_level": coffee_level,
            "timestamp": datetime.now().isoformat(),
            "annotator": annotator,
            "version": version
        }
        for fname, coffee_level in zip(batch, coffee_levels)
    ]

def process_batch_with_confidence(batch):
    """
    Process a batch of images and extract predictions with confidence scores.

    The inference result is expected to be a mapping with the keys ``'preds'``,
    ``'conf'`` and ``'prob'``, each holding one item per image in the batch.

    Args:
        batch (list): List of image filenames (relative to ``image_dir``) to process

    Returns:
        list: Exactly one annotation dictionary per filename. On success each
            entry carries the prediction, confidence and probability
            distribution. If anything fails for the batch, every entry has
            ``coffee_level`` set to ``None`` plus an ``"error"`` message.
    """
    batch_paths = [os.path.join(image_dir, fname) for fname in batch]

    try:
        # Get inference results with probabilities and confidence
        inference_results = infer_coffee_level_batch(batch_paths, model, device)

        # Extract components from the returned dictionary
        predictions = inference_results['preds']
        confidences = inference_results['conf']
        probabilities = inference_results['prob']

        # Build the successful entries in a list local to the try block, so a
        # failure part-way through never leaks partial results that would be
        # duplicated by the error entries below.
        batch_results = []
        for fname, pred, conf, prob_dist in zip(batch, predictions, confidences, probabilities):
            batch_results.append({
                "filename": fname,
                "coffee_level": int(pred) if pred is not None else None,
                "confidence": float(conf) if conf is not None else 0.0,
                # Duck-type on tolist() instead of checking for a numpy array:
                # this needs no numpy import and also converts other array-like
                # objects into JSON-serializable lists.
                "probability_distribution": prob_dist.tolist() if hasattr(prob_dist, "tolist") else prob_dist,
                "timestamp": datetime.now().isoformat(),
                "annotator": annotator,
                "version": version
            })
        return batch_results

    except Exception as e:
        print(f"Error processing batch: {e}")
        error_message = str(e)
        # Create error entries for all files in batch
        return [
            {
                "filename": fname,
                "coffee_level": None,
                "confidence": 0.0,
                "probability_distribution": None,
                "timestamp": datetime.now().isoformat(),
                "annotator": annotator,
                "version": version,
                "error": error_message
            }
            for fname in batch
        ]

# Split image_files into batches
def batchify(lst, n):
    """Yield consecutive slices of ``lst`` that each hold at most ``n`` items."""
    starts = range(0, len(lst), n)
    yield from (lst[start:start + n] for start in starts)

# Materialize the batches up front so the progress bar knows the total count.
batches = list(batchify(image_files, batch_size))

with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor:
    # executor.map runs the batches concurrently but yields their results in
    # submission order, so `results` follows the order of image_files instead
    # of depending on which worker thread happens to finish first.
    ordered_batch_results = executor.map(process_batch_with_confidence, batches)
    for batch_results in tqdm(ordered_batch_results, total=len(batches), desc="Running inference batches"):
        results.extend(batch_results)


Found 41608 images in processed_images


Running inference batches: 100%|██████████| 1301/1301 [01:08<00:00, 19.06it/s]


In [7]:
# 4. Compile Results into JSON Format
from collections import Counter

# Collect the predicted level of every entry, skipping entries without a prediction.
levels = [
    level
    for level in (entry["coffee_level"] for entry in results)
    if level is not None
]

# Assemble the report in one step: overall count, creation time,
# per-level tally and the full list of annotation entries.
output_json = {
    "total_annotated": len(results),
    "session_timestamp": datetime.now().isoformat(),
    "level_distribution": dict(Counter(levels)),
    "annotation_data": results,
}


In [8]:
# 5. Save Results to JSON File
output_path = "inference_coffee_level_annotations.json"  # Change as needed
# Serialize before opening the file: opening in "w" mode truncates it at once,
# so a serialization error raised while streaming would wipe out any earlier
# output and leave a partial, invalid JSON file behind.
serialized = json.dumps(output_json, indent=2)
with open(output_path, "w", encoding="utf-8") as f:
    f.write(serialized)
print(f"Saved inference results to {output_path}")

Saved inference results to inference_coffee_level_annotations.json
