## Testing image upload

In [16]:
from PIL import Image
import os

image_path = os.path.join("../", "../", "tests","test_image_2.jpg")
image = Image.open(image_path)

In [39]:
from datetime import datetime
import requests
import io

url = "https://europe-west3-pressure-watcher-ea41a.cloudfunctions.net/process-image"

# Convert image to bytes
img_byte_arr = io.BytesIO()
image.save(img_byte_arr, format='JPEG')
img_byte_arr = img_byte_arr.getvalue()

timestamp = datetime.now()
filename = f"{timestamp.strftime("%Y%m%d_%H%M%S_%f")}.jpg" # Example output: 20240101_123456.png

files = {'image': (filename, img_byte_arr, 'image/jpeg')}
data = {
    'image': filename,
    'timestamp': str(timestamp),
    'device_id': "jupyter_test",
    'min_value': 0,
    'max_value': 160,
    'unit': "PSI"
}

response = requests.post(url, files=files, data=data)

print(response.status_code)
print(response.text)

200
{"detections":{"confidence":{"center":0.893,"gauge":0.988,"max":0.861,"min":0.654,"tip":0.812},"found":["gauge","center","max","tip","min"]},"device_id":"jupyter_test","firestore_id":"RsGjORXEda5c5DEKUYr4","measurement":91.22,"status":"success","storage":{"bucket":"analog-gauge-images","full_url":"gs://analog-gauge-images/jupyter_test/20251111/20251111_132651_016266.jpg.jpg","image_path":"jupyter_test/20251111/20251111_132651_016266.jpg.jpg"},"timestamp":"2025-11-11 13:26:51.016266","unit":"PSI"}



# Mock data

In [42]:
import os
from time import sleep
import numpy as np

for i in range(30):
    for f in os.listdir("mock_data"):
        random_choice = np.random.choice([1,2,3])
        if random_choice != 3:
            if 'ipynb' in f: continue
            image_path = os.path.join("mock_data", f)
            image = Image.open(image_path)
        
            # Convert image to bytes
            img_byte_arr = io.BytesIO()
            image.save(img_byte_arr, format='JPEG')
            img_byte_arr = img_byte_arr.getvalue()
            
            timestamp = datetime.now()
            filename = f"{timestamp.strftime("%Y%m%d_%H%M%S_%f")}.jpg" # Example output: 20240101_123456.png
            
            files = {'image': (filename, img_byte_arr, 'image/jpeg')}
            data = {
                'image': filename,
                'timestamp': str(timestamp),
                'device_id': "demo",
                'min_value': -1,
                'max_value': 3,
                'unit': "bar"
            }
        
            response = requests.post(url, files=files, data=data)
            
            sleep(10)
        else:
            continue
    

In [17]:
"""
YOLO Model Inference Module

This module provides functionality for loading a YOLO model and detecting
objects in images, specifically designed for analog gauge reading applications.

Classes detected:
    - center: Center point of the gauge
    - gauge: The entire gauge body
    - max: Maximum value marker on the gauge
    - min: Minimum value marker on the gauge
    - tip: Tip of the gauge needle

Usage:
    from yolo import GaugeDetector
    
    detector = GaugeDetector('path/to/yolo_best.pt')
    detections = detector.predict('path/to/image.jpg')
    
    # Access detections by class name
    center_box = detections['center']
    tip_box = detections['tip']
"""

from ultralytics import YOLO
from typing import Dict, Tuple
import os
from PIL.Image import Image as PILImage


class Detection:
    """
    Represents a single object detection with bounding box and metadata.
    
    Attributes:
        class_name (str): Name of the detected class
        confidence (float): Detection confidence score (0-1)
        bbox (Tuple[float, float, float, float]): Bounding box coordinates (x1, y1, x2, y2)
    """
    
    def __init__(self, class_name: str, confidence: float, bbox: Tuple[float, float, float, float]):
        """
        Initialize a Detection object.
        
        Args:
            class_name: Name of the detected class
            confidence: Confidence score between 0 and 1
            bbox: Bounding box as (x1, y1, x2, y2) coordinates
        """
        self.class_name = class_name
        self.confidence = confidence
        self.bbox = bbox
    
    def __repr__(self) -> str:
        return (f"Detection(class='{self.class_name}', "
                f"confidence={self.confidence:.2%}, "
                f"bbox={[f'{x:.0f}' for x in self.bbox]})")


class GaugeDetector:
    """
    YOLO-based detector for analog gauge components.
    
    This class handles loading a YOLO model and running inference on images
    to detect gauge components (center, tip, min, max markers).
    """
    
    def __init__(self, model_path: str):
        """
        Initialize the GaugeDetector with a trained YOLO model.
        
        Args:
            model_path: Path to the YOLO model weights file (.pt)
            
        Raises:
            FileNotFoundError: If the model file doesn't exist
            ValueError: If the model cannot be loaded
        """
        if not os.path.exists(model_path):
            raise FileNotFoundError(f"Model file not found: {model_path}")
        
        try:
            self.model = YOLO(model_path)
            self.class_names = self.model.names
            print("=" * 60)
            print("MODEL LOADED")
            print("=" * 60)
            print(f"Classes: {self.class_names}")
            print(f"Number of classes: {len(self.class_names)}")
            print("=" * 60)
        except Exception as e:
            raise ValueError(f"Failed to load model: {e}")
    
    def predict(self, image: PILImage, verbose: bool = True) -> Dict[str, Detection]:
        """
        Run inference on an image and return the highest confidence detection for each class.
        
        This method detects all objects in the image and returns only the detection
        with the highest confidence score for each class. This is important for
        gauge reading where multiple false positives might occur.
        
        Args:
            image_path: Path to the input image
            verbose: If True, print detection information
            
        Returns:
            Dictionary mapping class names to Detection objects. Only includes
            classes that were detected in the image.
            
        Raises:
            FileNotFoundError: If the image file doesn't exist
            
        Example:
            detections = detector.predict('gauge.jpg')
            if 'tip' in detections:
                tip_bbox = detections['tip'].bbox
                confidence = detections['tip'].confidence
        """
        
        # Run inference
        results = self.model(image)
        
        # Dictionary to store the highest confidence detection for each class
        best_detections: Dict[str, Detection] = {}
        
        # Process all detections
        for box in results[0].boxes:
            class_id = int(box.cls[0])
            class_name = self.class_names[class_id]
            confidence = box.conf[0].item()
            x1, y1, x2, y2 = box.xyxy[0].tolist()
            
            # Keep only the highest confidence detection for each class
            if class_name not in best_detections or confidence > best_detections[class_name].confidence:
                best_detections[class_name] = Detection(class_name, confidence, (x1, y1, x2, y2))
        
        if verbose:
            print(f"\nFound {len(best_detections)} unique class(es):")
            for i, (class_name, detection) in enumerate(best_detections.items(), 1):
                print(f"\nDetection {i}:")
                print(f"  Class: {detection.class_name}")
                print(f"  Confidence: {detection.confidence:.2%}")
                print(f"  Box: [{detection.bbox[0]:.0f}, {detection.bbox[1]:.0f}, "
                      f"{detection.bbox[2]:.0f}, {detection.bbox[3]:.0f}]")
        
        return best_detections
    
    def show_results(self, image: PILImage) -> None:
        """
        Run inference and display the image with bounding boxes.
        
        Args:
            image_path: Path to the input image
        """
        results = self.model(image)
        results[0].show()

In [18]:
"""
Analog Gauge Reader Module

This module extracts measurements from analog gauges by analyzing the detected
bounding boxes of gauge components (center, tip, min, max markers).

The measurement is calculated by:
1. Finding the center point of each detected component
2. Computing vectors from the gauge center to other components
3. Calculating the angle between the needle tip and min marker
4. Computing the maximum angle range between min and max markers
5. Converting the angle to a measurement value using linear interpolation

Usage:
    from yolo import GaugeDetector
    from gauge_reader import GaugeReader

    # Detect gauge components
    detector = GaugeDetector('yolo_best.pt')
    detections = detector.predict('gauge_image.jpg')

    # Read measurement
    reader = GaugeReader(min_value=0, max_value=160, unit="PSI")
    measurement = reader.read_gauge(detections)
    print(f"Reading: {measurement:.2f} PSI")
"""

import numpy as np
import math
from typing import Dict, Tuple, Optional
#from yolo import Detection


class Detection:
    """
    Represents a single object detection with bounding box and metadata.

    Attributes:
        class_name (str): Name of the detected class
        confidence (float): Detection confidence score (0-1)
        bbox (Tuple[float, float, float, float]): Bounding box coordinates (x1, y1, x2, y2)
    """

    def __init__(self, class_name: str, confidence: float, bbox: Tuple[float, float, float, float]):
        """
        Initialize a Detection object.

        Args:
            class_name: Name of the detected class
            confidence: Confidence score between 0 and 1
            bbox: Bounding box as (x1, y1, x2, y2) coordinates
        """
        self.class_name = class_name
        self.confidence = confidence
        self.bbox = bbox

    def __repr__(self) -> str:
        return (f"Detection(class='{self.class_name}', "
                f"confidence={self.confidence:.2%}, "
                f"bbox={[f'{x:.0f}' for x in self.bbox]})")

class GaugeReader:
    """
    Reads measurements from analog gauges using detected component positions.

    This class performs geometric calculations to determine the gauge reading
    based on the positions of the needle tip relative to the min/max markers.
    """

    def __init__(self, min_value: float = 0, max_value: float = 100, unit: str = "units"):
        """
        Initialize the GaugeReader with gauge specifications.

        Args:
            min_value: The value corresponding to the minimum position on the gauge
            max_value: The value corresponding to the maximum position on the gauge
            unit: The unit of measurement (e.g., "PSI", "°C", "RPM")

        Example:
            reader = GaugeReader(min_value=0, max_value=160, unit="PSI")
        """
        self.min_value = min_value
        self.max_value = max_value
        self.unit = unit

    @staticmethod
    def get_bbox_center(bbox: Tuple[float, float, float, float]) -> np.ndarray:
        """
        Calculate the center point of a bounding box.

        Args:
            bbox: Bounding box coordinates as (x1, y1, x2, y2)

        Returns:
            Numpy array containing the center coordinates [xc, yc]

        Example:
            center = get_bbox_center((0, 0, 10, 10))  # Returns [5.0, 5.0]
        """
        x1, y1, x2, y2 = bbox
        xc = x1 + (x2 - x1) / 2
        yc = y1 + (y2 - y1) / 2
        return np.array([xc, yc])

    @staticmethod
    def get_angle_between_vectors(v1: np.ndarray, v2: np.ndarray, in_degrees: bool = True) -> float:
        """
        Calculate the angle between two vectors.

        Args:
            v1: First vector as numpy array
            v2: Second vector as numpy array
            in_degrees: If True, return angle in degrees; if False, return in radians

        Returns:
            Angle between the vectors

        Note:
            The angle is always positive and between 0 and 180 degrees (or 0 and π radians).
        """
        # Normalize vectors to avoid numerical issues
        v1_norm = np.linalg.norm(v1)
        v2_norm = np.linalg.norm(v2)

        if v1_norm == 0 or v2_norm == 0:
            return 0.0

        # Calculate dot product and clamp to [-1, 1] to avoid numerical errors with arccos
        cos_angle = np.clip(np.dot(v1, v2) / (v1_norm * v2_norm), -1.0, 1.0)
        angle_rad = np.arccos(cos_angle)

        if in_degrees:
            return math.degrees(angle_rad)
        return angle_rad

    def read_gauge(self, detections: Dict[str, Detection], verbose: bool = True) -> Optional[float]:
        """
        Calculate the gauge reading from detected components.

        This method requires detections for 'center', 'tip', 'min', and 'max' classes.
        It calculates the needle position relative to the gauge range and converts
        it to a measurement value.

        Args:
            detections: Dictionary mapping class names to Detection objects
            verbose: If True, print detailed calculation information

        Returns:
            The calculated measurement value, or None if required detections are missing

        Raises:
            ValueError: If required detections are missing

        Example:
            measurement = reader.read_gauge(detections)
            if measurement is not None:
                print(f"Gauge reading: {measurement:.2f} {reader.unit}")
        """
        # Check for required detections
        required_classes = ['center', 'tip', 'min', 'max']
        missing_classes = [cls for cls in required_classes if cls not in detections]

        if missing_classes:
            raise ValueError(
                f"Missing required detections: {', '.join(missing_classes)}. "
                f"Cannot calculate gauge reading without all components."
            )

        # Extract center points of each bounding box
        center = self.get_bbox_center(detections['center'].bbox)
        tip = self.get_bbox_center(detections['tip'].bbox)
        min_marker = self.get_bbox_center(detections['min'].bbox)
        max_marker = self.get_bbox_center(detections['max'].bbox)

        if verbose:
            print("\nComponent Centers:")
            print(f"  Center: {center}")
            print(f"  Tip:    {tip}")
            print(f"  Min:    {min_marker}")
            print(f"  Max:    {max_marker}")

        # Calculate radius vectors from gauge center to each component
        r_tip = tip - center
        r_min = min_marker - center
        r_max = max_marker - center

        # Calculate the angle between min and max markers
        # This represents the full range of the gauge
        min_max_angle = self.get_angle_between_vectors(r_min, r_max, in_degrees=True)

        # The gauge typically spans more than 180 degrees, so we take the reflex angle
        # BUG FIX: Changed from 360 - angle to handle gauges that span less than 180 degrees
        # We now check which interpretation makes sense based on gauge design
        if min_max_angle < 180:
            # Gauge spans less than 180 degrees - use the angle directly
            max_angle = min_max_angle
        else:
            # This shouldn't happen with arccos, but kept for safety
            max_angle = min_max_angle

        # However, most analog gauges span more than 180 degrees (e.g., 270 degrees)
        # So we typically want the reflex angle
        # BUG FIX: The original calculation assumed this, which is correct for most gauges
        max_angle = 360 - min_max_angle

        # Calculate the angle between tip and min marker
        # This represents the current needle position
        tip_angle = self.get_angle_between_vectors(r_tip, r_min, in_degrees=True)

        if verbose:
            print(f"\nAngle Calculations:")
            print(f"  Full gauge range: {round(max_angle)}°")
            print(f"  Needle position:  {round(tip_angle)}°")

        # Calculate the proportion of the gauge range covered by the needle
        tip_proportion = tip_angle / max_angle

        # Convert proportion to measurement value using linear interpolation
        measurement = tip_proportion * (self.max_value - self.min_value) + self.min_value

        if verbose:
            print(f"\nGauge Reading:")
            print(f"  Needle at {round(tip_proportion)} of full range")
            print(f"  Measurement: {measurement:.1f} {self.unit}")

        return measurement

    def read_gauge_from_image(self, image_path: str, detector, verbose: bool = True) -> Optional[float]:
        """
        Convenience method to detect and read gauge in one step.

        Args:
            image_path: Path to the gauge image
            detector: GaugeDetector instance for running inference
            verbose: If True, print detailed information

        Returns:
            The calculated measurement value, or None if reading fails

        Example:
            from yolo import GaugeDetector
            from gauge_reader import GaugeReader

            detector = GaugeDetector('yolo_best.pt')
            reader = GaugeReader(min_value=0, max_value=160, unit="PSI")

            measurement = reader.read_gauge_from_image('gauge.jpg', detector)
        """
        try:
            # Run detection
            detections = detector.predict(image_path, verbose=verbose)

            # Calculate measurement
            measurement = self.read_gauge(detections, verbose=verbose)

            return measurement

        except (ValueError, FileNotFoundError) as e:
            print(f"Error reading gauge: {e}")
            return None

In [14]:

detector = GaugeDetector(os.path.join("../", "../","models", "yolo_best.pt"))
results = detector(image)

/home/chris/Projects/Analog Gauge Monitoring/google-cloud-functions/process-image
MODEL LOADED
Classes: {0: 'center', 1: 'gauge', 2: 'max', 3: 'min', 4: 'tip'}
Number of classes: 5


    Found GPU0 NVIDIA GeForce GTX 1080 Ti which is of cuda capability 6.1.
    Minimum and Maximum cuda capability supported by this version of PyTorch is
    (7.0) - (12.0)
    
    Please install PyTorch with a following CUDA
    configurations:  12.6 following instructions at
    https://pytorch.org/get-started/locally/
    
NVIDIA GeForce GTX 1080 Ti with CUDA capability sm_61 is not compatible with the current PyTorch installation.
The current PyTorch install supports CUDA capabilities sm_70 sm_75 sm_80 sm_86 sm_90 sm_100 sm_120.
If you want to use the NVIDIA GeForce GTX 1080 Ti GPU with PyTorch, please check the instructions at https://pytorch.org/get-started/locally/






AcceleratorError: CUDA error: no kernel image is available for execution on the device
Search for `cudaErrorNoKernelImageForDevice' in https://docs.nvidia.com/cuda/cuda-runtime-api/group__CUDART__TYPES.html for more information.
CUDA kernel errors might be asynchronously reported at some other API call, so the stacktrace below might be incorrect.
For debugging consider passing CUDA_LAUNCH_BLOCKING=1
Compile with `TORCH_USE_CUDA_DSA` to enable device-side assertions.
