In [73]:
import cv2
import numpy as np

from pathlib import Path
import os
import sys
import onnx
import onnxruntime
from collections import defaultdict
# Navigate to the parent directory
parent_dir = os.path.abspath(os.path.join(os.getcwd(), os.pardir))

# Add the parent directory to sys.path
if parent_dir not in sys.path:
    sys.path.append(parent_dir)

from src.config import MODEL_PATH, INPUT_SHAPE,OUTPUT_NAME,INPUT_NAME,session,CLASS_NAMES


In [74]:
# Define the path to your model
MODEL_PATH = '/Users/alaindestinkarasira/Documents/Malaria_Diagnosis_/src/models/weights/best.onnx'
SEVERITY_THRESHOLD=10
# Function to load the model and create a session
def load_model(model_path):
    # Load the ONNX model
    model = onnx.load(model_path)
    # Create an ONNX Runtime session
    session = onnxruntime.InferenceSession(model_path)
    return model, session

# Load the model and session
model, session = load_model(MODEL_PATH)

# Get model input and output details
INPUT_NAME = session.get_inputs()[0].name
INPUT_SHAPE = session.get_inputs()[0].shape
OUTPUT_NAME = session.get_outputs()[0].name

# Optionally, load class names from the model metadata
CLASS_NAMES = {}
for prop in model.metadata_props:
    if prop.key == "names" or prop.key == "classes":
        CLASS_NAMES=eval(prop.value)  # Be cautious with eval
        break

In [75]:
print (INPUT_NAME)
print (INPUT_SHAPE)
print (OUTPUT_NAME)
print(CLASS_NAMES)
dets=""

images
[1, 3, 640, 640]
output0
{0: 'PF', 1: 'PM', 2: 'PO', 3: 'PV'}


In [82]:
# Preprocess the image
def preprocess_image(image_path, input_shape):
    image = cv2.imread(image_path)
    image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
    image = cv2.resize(image, (input_shape[2], input_shape[3]))
    image = image.transpose(2, 0, 1)
    image = np.expand_dims(image, 0)
    image = image.astype(np.float32) / 255.0
    return image

# def run_inference(session, image):
#     input_name = session.get_inputs()[0].name
#     output_names = [output.name for output in session.get_outputs()]
#     outputs = session.run(output_names, {input_name: image})
#     return outputs

def run_inference(image,output_name,input_name):
    results = session.run([output_name], {input_name: image})
    # print(results[0].shape)  # Print raw model output for debugging

    return results[0]


# Process detections
# def process_detections(detections, confidence_threshold=0.2):
#     results = []
#     for detection in detections[0]:
#         score = float(detection[4])
#         if score > confidence_threshold:
#             class_id = int(detection[5])
#             class_name = CLASS_NAMES.get(class_id, str(class_id))
#             bbox = detection[0:4].tolist()
#             results.append({
#                 'class': class_id,
#                 'class name': class_name,
#                 'confidence': score,
#                 'bbox': bbox
#             })
#     return results


def process_detections(detections, confidence_threshold=0.4):
    results = []
    for detection in detections[0]:  # Iterate over all detections
        objectness = float(detection[4])
        class_scores = detection[5:]  # The rest are class scores
        class_id = int(np.argmax(class_scores))
        confidence = class_scores[class_id]
        
        if objectness > confidence_threshold and confidence > confidence_threshold:
            class_name = CLASS_NAMES.get(class_id, str(class_id))
            bbox = detection[0:4].tolist()
            results.append({
                'class': class_id,
                'class_name': class_name,
                'confidence': confidence,
                'bbox': bbox
            })
    return results
def process_patient_images(patient_id, image_folder, temp_storage):
    # Ensure the image folder path is a Path object
    image_folder = Path(image_folder)
    
    # Get all image files in the folder
    image_files = [f for f in image_folder.iterdir() if f.is_file() and f.suffix.lower() in ('.jpg', '.jpeg', '.png', '.bmp')]
    
    for image_file in image_files:
        # Generate an image ID based on the filename
        image_id = f"{patient_id}_{image_file.stem}"
        
        # Preprocess the image
        image = preprocess_image(str(image_file), INPUT_SHAPE)
        
        # Run inference
        detections = run_inference(image,OUTPUT_NAME,INPUT_NAME)
        # print(detections)
        
        # Process detections
        results = process_detections(detections, confidence_threshold=0.5)
        
        # Save the results to temporary storage
        temp_storage.save_detection(patient_id, image_id, results)

# def process_patient_images(patient_id, image_folder, temp_storage):
#     # Ensure the image folder path is a Path object
#     image_folder = Path(image_folder)
    
#     # Get all image files in the folder
#     image_files = [f for f in image_folder.iterdir() if f.is_file() and f.suffix.lower() in ('.jpg', '.jpeg', '.png', '.bmp')]
    
#     for image_file in image_files:
#         # Generate an image ID based on the filename
#         image_id = f"{patient_id}_{image_file.stem}"
        
#         print(f"\nProcessing image: {image_file.name}")
        
#         # Preprocess the image
#         image = preprocess_image(str(image_file), INPUT_SHAPE)
        
#         # Run inference
#         detections = run_inference(image, OUTPUT_NAME,INPUT_NAME )
        
#         # Print raw detections
#         print("Raw detections:")
#         for i, detection in enumerate(detections):
#             print(f"Detection {i}:")
#             print(detection)
        
#         # Process detections
#         results = process_detections(detections, CLASS_NAMES, confidence_threshold=0.5)
        
#         # Print processed results
#         print("\nProcessed results:")
#         for result in results:
#             print(f"Class: {result['class_name']}")
#             print(f"Confidence: {result['confidence']:.2f}")
#             print(f"Bounding Box: {result['bbox']}")
#             print()
        
#         # Save the results to temporary storage
#         temp_storage.save_detection(patient_id, image_id, results)
        
#         print(f"Results saved for image: {image_file.name}")
#         print("-" * 50)

In [83]:
class TempStorage:
    def __init__(self):
        self.patient_detections = defaultdict(list)

    def save_detection(self, patient_id, image_id, detection_results):
        self.patient_detections[patient_id].append({
            "image_id": image_id,
            "results": detection_results
        })

    def get_patient_detections(self, patient_id):
        return self.patient_detections.get(patient_id, [])

def aggregate_and_assess_severity(image_results, severity_threshold):
    aggregated_results = {}

    # Aggregate data across all images
    for result in image_results:
        for parasite in result["results"]:
            parasite_type = parasite["class_name"]
            confidence = parasite["confidence"]

            if parasite_type not in aggregated_results:
                aggregated_results[parasite_type] = {
                    "count": 0,
                    "total_confidence": 0.0,
                    "instances": 0
                }

            aggregated_results[parasite_type]["count"] += 1
            aggregated_results[parasite_type]["total_confidence"] += confidence
            aggregated_results[parasite_type]["instances"] += 1

    # Calculate average confidence
    for parasite_type, data in aggregated_results.items():
        data["average_confidence"] = (
            data["total_confidence"] / data["instances"]
            if data["instances"] > 0 else 0
        )

    # Determine severity
    total_parasite_count = sum(data["count"] for data in aggregated_results.values())
    severity = "Mild" if total_parasite_count < severity_threshold else "Severe"

    return aggregated_results, severity


In [91]:
# Hardcoded patient ID and image folder path
patient_id = "pat_001"
image_folder = "/Users/alaindestinkarasira/Documents/Malaria_Diagnosis_/data/External"  # Ensure the case matches

# Log the received request and paths
print(f"Processing images for patient ID: {patient_id} in folder: {image_folder}")

# Create a temporary storage instance
temp_storage = TempStorage()

# Process the images for the given patient
process_patient_images(patient_id, image_folder, temp_storage)

# Aggregate results and assess severity
aggregated_results, severity = aggregate_and_assess_severity(
    temp_storage.get_patient_detections(patient_id), 2
)

# Log the results
# print(f"Aggregated Instances: {aggregated_results['instances']}")
print(f"Aggregated Instances: {aggregated_results}")

print(f"Severity: {severity}")


Processing images for patient ID: pat_001 in folder: /Users/alaindestinkarasira/Documents/Malaria_Diagnosis_/data/External
(1, 25200, 9)
(1, 25200, 9)
(1, 25200, 9)
(1, 25200, 9)
(1, 25200, 9)
(1, 25200, 9)
(1, 25200, 9)
(1, 25200, 9)
Parasite Type: PV
  Count: 27
  Average Confidence: 0.95
  Severity: 25.62
  Severity Level: high

Parasite Type: PO
  Count: 20
  Average Confidence: 0.79
  Severity: 15.70
  Severity Level: high

Aggregated Instances: PV
Severity: PO


In [92]:
print(dets)




In [93]:
def aggregate_and_assess_severity(image_results, severity_threshold):
    aggregated_results = {}

    # Aggregate data across all images
    for result in image_results:
        seen_parasites = set()  # To avoid double counting in the same image
        for parasite in result["results"]:
            parasite_type = parasite["class_name"]
            confidence = parasite["confidence"]

            # Unique key to identify each detected instance by class and bounding box
            unique_key = (parasite_type, tuple(parasite["bbox"]))

            if unique_key not in seen_parasites:
                seen_parasites.add(unique_key)

                if parasite_type not in aggregated_results:
                    aggregated_results[parasite_type] = {
                        "count": 0,
                        "total_confidence": 0.0,
                        "instances": 0
                    }

                aggregated_results[parasite_type]["count"] += 1
                aggregated_results[parasite_type]["total_confidence"] += confidence
                aggregated_results[parasite_type]["instances"] += 1

    # Calculate average confidence and severity, and print results
    severity_results = {}
    for parasite_type, data in aggregated_results.items():
        data["average_confidence"] = (
            data["total_confidence"] / data["instances"]
            if data["instances"] > 0 else 0
        )
        # Example severity computation: (count * average_confidence)
        severity = data["count"] * data["average_confidence"]

        # Assess severity based on the threshold
        severity_level = "low"
        if severity > severity_threshold:
            severity_level = "high"
        
        # Store the results
        severity_results[parasite_type] = {
            "severity": severity,
            "severity_level": severity_level,
            "count": data["count"],
            "average_confidence": data["average_confidence"]
        }
        
        # Print the aggregated results and severity level
        print(f"Parasite Type: {parasite_type}")
        print(f"  Count: {data['count']}")
        print(f"  Average Confidence: {data['average_confidence']:.2f}")
        print(f"  Severity: {severity:.2f}")
        print(f"  Severity Level: {severity_level}")
        print()

    return severity_results

In [94]:
aggregate_and_assess_severity(temp_storage.get_patient_detections(patient_id), 2)

Parasite Type: PV
  Count: 27
  Average Confidence: 0.95
  Severity: 25.62
  Severity Level: high

Parasite Type: PO
  Count: 20
  Average Confidence: 0.79
  Severity: 15.70
  Severity Level: high



{'PV': {'severity': 25.621412217617035,
  'severity_level': 'high',
  'count': 27,
  'average_confidence': 0.9489411932450754},
 'PO': {'severity': 15.702557981014252,
  'severity_level': 'high',
  'count': 20,
  'average_confidence': 0.7851278990507126}}