# Fulmine LABS Eyball
## Overview
This Python code implements a class wrapper around an Anomaly Detection model which can be used to visually check if an image is anomalous or not. The supported architecture for this model is 'Siamese Network'.
In order to perform reduce false negatives the code compares the image against a jury of randomly selected known good images of configurable size 'jury_size'.
If the number of jurors who vote that the image is simlar to the chosen known good image is below a configurable 'threshold' then the code returns a verdict of 'Anomalous', otherwise it returns a verdict of 'Normal'.
If an image path is not specified but screen coordinates are, these will be used instead, enabling direct integration with automated visual checking scripts.

One goal is to use this class as part of automating visual checking of a medical image (PACS) production pipeline, although it could theoretically visually check any type of image on which the model has been trained.

It also has the capability of describing the images, using GPT-4 Turbo Vision, if an OpenAI key is supplied in the 'Eyball-OpenAI_key.txt' file.

## Initialize the Eyball class

predictor = ModelPredictor(siamese_model_path, known_good_images_folder, Eyball_key, threshold, jury_size)

## Example calls

role = "You are a radiology PACS test engineer, analyzing PACS or test process related image anomalies"

image_description_directive = "If the image is obviously not a medical image, state *** ANOMALOUS ***. If it is a typical medical image as acquired by an imaging modality with no additions or enhancements, state *** NORMAL ***. Otherwise, if it is a medical image but it also clearly has textual overlays or annotations or digital or image processing artifacts that could have been added by the PACS image viewer technology, describe those features and append *** ANOMALOUS ***."

verdict = predictor.predict_siamese(test_image_path)

actual_description = predictor.describe_image(test_image_path, None, role, image_description_directive)

## Author
Duncan Henderson
Fulmine Labs LLC


In [1]:
import numpy as np
import os
import io
import cv2
from PIL import Image, ImageGrab
import logging
import random
import base64
import requests
from openai import OpenAI

import numpy as np
import tensorflow as tf
from tensorflow.keras.models import Model, load_model
from tensorflow.keras.layers import Input, Conv2D, MaxPooling2D, Flatten, Dense, Lambda
from tensorflow.keras.preprocessing.image import load_img, img_to_array
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score, classification_report






In [2]:
known_good_images_folder = r"D:\training_images\test\valid"
siamese_model_path = r'models\lung_ct_siamese_network_weights_043024.h5'
Eyball_key=r'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'

jury_size=12
threshold = 0.5

# LLM prompts
role = "You are a radiology PACS test engineer, analyzing PACS or test process related image anomalies"
image_description_directive = "If the image is obviously not a medical image, state *** ANOMALOUS ***. If it is a typical medical image as acquired by an imaging modality with no additions or enhancements, state *** NORMAL ***. Otherwise, if it is a medical image but it also clearly has textual overlays or annotations or digital or image processing artifacts that could have been added by the PACS image viewer technology, describe those features and append *** ANOMALOUS ***."


In [9]:
class ModelPredictor:

    def __init__(self, siamese_model_path, known_good_images_folder, api_key, threshold=0.5, jury_size=12):
        self.siamese_model_path = siamese_model_path
        self.known_good_images_folder = known_good_images_folder
        self.api_key = api_key
        self.client = OpenAI(api_key=api_key)
        self.siamese_model = self.load_siamese_model(siamese_model_path)
        self.threshold = threshold
        self.jury_size = jury_size
        self.known_good_images = self.preload_known_good_images()
        self.headers = {
            "Content-Type": "application/json",
            "Authorization": f"Bearer {self.api_key}"
        }

    def preload_known_good_images(self):
        # Your existing method to preload images
        print("Preloading known good images...")
        image_paths = []
        for root, dirs, files in os.walk(self.known_good_images_folder):
            for file in files:
                if file.lower().endswith(('.png', '.jpg', '.jpeg')):
                    full_path = os.path.join(root, file)
                    image_paths.append(full_path)
        return image_paths
        # Cache known good images if needed here
        
    # Continue to define ModelPredictor class
    def load_siamese_model(self, siamese_model_path):
        # Define the base network architecture
        def initialize_base_network(input_shape):
            input = Input(shape=input_shape)
            x = Conv2D(64, (3, 3), activation='relu')(input)
            x = MaxPooling2D((2, 2))(x)
            x = Conv2D(128, (3, 3), activation='relu')(x)
            x = MaxPooling2D((2, 2))(x)
            x = Flatten()(x)
            x = Dense(128, activation='relu')(x)
            return Model(input, x)

        # Rebuild the Siamese network architecture
        input_shape = (152, 152, 1)
        base_network = initialize_base_network(input_shape)
        input_a = Input(shape=input_shape)
        input_b = Input(shape=input_shape)
        processed_a = base_network(input_a)
        processed_b = base_network(input_b)
        distance = Lambda(lambda tensors: tf.sqrt(tf.reduce_sum(tf.square(tensors[0] - tensors[1]), axis=1, keepdims=True)))([processed_a, processed_b])
        model = Model([input_a, input_b], distance)
        model.load_weights(siamese_model_path)  # Load the saved model or weights
        print("Siamese model loaded successfully.")
        return model

    def predict_siamese(self, image_path=None, coordinates=None):
        if coordinates:
            # Capture the screen if coordinates are provided
            captured_image = self.capture_screen(coordinates)
            # Convert the captured image to grayscale and resize it
            image = cv2.cvtColor(captured_image, cv2.COLOR_BGR2GRAY)
            image = cv2.resize(image, (152, 152))
        elif image_path:
            # Process the image from file path
            image = self.preprocess_image(image_path)
        else:
            raise ValueError("Either image_path or coordinates must be provided.")
        
        image = np.expand_dims(image, axis=0)  # Adjust as necessary for the model input
    
        # Randomly select a subset of known good images to compare against
        selected_good_images = random.sample(self.known_good_images, min(self.jury_size, len(self.known_good_images)))
        votes = []
    
        for known_good_image_path in selected_good_images:
            known_good_image = self.preprocess_image(known_good_image_path)
            known_good_image = np.expand_dims(known_good_image, axis=0)  # Adjust as necessary
    
            # Prepare the pair
            image_pair = [image, known_good_image]
    
            # Make prediction
            prediction_distance = self.siamese_model.predict(image_pair)
            is_similar = prediction_distance < self.threshold  # Threshold to determine similarity
    
            # Debugging output
            print(f"Comparing {image_path if image_path else 'screen capture'} with {known_good_image_path}: Distance = {prediction_distance}, Similar = {is_similar}")
            votes.append(is_similar)
    
        # Calculate the majority vote
        num_similar = sum(votes)
        majority_similar = num_similar > len(votes) / 2
        print(f"Total votes for 'Similar': {num_similar}/{len(votes)}. Final verdict: {'Normal' if majority_similar else 'Anomalous'}")
    
        return majority_similar

    def compare_to_known_images(self, captured_image, threshold=0.5, jury_size=3):
        processed_captured_image = self.preprocess_data(captured_image)
        verdicts = []

        for _ in range(jury_size):
            comparison_image = np.random.choice(self.known_good_images)
            prediction = self.siamese_model.predict([processed_captured_image, comparison_image])
            verdicts.append(prediction < threshold)

        # Determine if the majority verdict is 'similar' or 'dissimilar'
        final_verdict = sum(verdicts) >= jury_size / 2
        return final_verdict
    
    def evaluate_image(self, coordinates):
        # Capture the image from screen coordinates
        captured_image = self.capture_screen(coordinates)

        # Compare to known images to get a verdict
        is_normal = self.compare_to_known_images(captured_image)

        return is_normal

    def preprocess_image(self, img_path: str, target_size=(152, 152)):
        try:
            image = load_img(img_path, target_size=target_size, color_mode='grayscale')
            image = img_to_array(image)
            image /= 255.0  # Normalize to [0, 1]
            if image.shape[-1] == 1:  # Check if image is grayscale
                image = image.squeeze(-1)  # Remove the channels dimension if grayscale
        except FileNotFoundError as e:
            print(f"Failed to open image at {img_path}: {e}")
            return None
        except Exception as e:
            print(f"Error processing image at {img_path}: {e}")
            return None
        return image 

    def capture_screen(self, coordinates):
        """ Capture the screen area defined by coordinates. """
        screenshot = ImageGrab.grab(bbox=coordinates)
        return np.array(screenshot, dtype=np.uint8)  # Ensure dtype is uint8

    def encode_image(self, image):
        """ Encode image array to base64 string. """
        if isinstance(image, np.ndarray):
            # Convert numpy array to PIL Image if it's not already one
            image = Image.fromarray(image.astype('uint8'), 'RGB')
        buffer = io.BytesIO()
        image.save(buffer, format="JPEG")
        return base64.b64encode(buffer.getvalue()).decode('utf-8')

    # def describe_image(self, image_path=None, coordinates=None, role_description="User", image_description_directive="Describe the image"):
    #     """ Describe an image either from a file or from screen coordinates. """
    #     if image_path:
    #         image = self.preprocess_image(image_path)
    #     elif coordinates:
    #         image = self.capture_screen(coordinates)
    #     else:
    #         raise ValueError("Either image_path or coordinates must be provided.")
    
    #     base64_image = self.encode_image(image)
    
    #     payload = {
    #         "model": "gpt-4-turbo",
    #         "messages": [
    #             {
    #                 "role": "system",
    #                 "content": role_description
    #             },
    #             {
    #                 "role": "user",
    #                 "content": [
    #                     {
    #                         "type": "text",
    #                         "text": image_description_directive
    #                     },
    #                     {
    #                         "type": "image_url",
    #                         "image_url": {
    #                             "url": f"data:image/jpeg;base64,{base64_image}"
    #                         }
    #                     }
    #                 ]
    #             }
    #         ],
    #         "max_tokens": 300
    #     }
    
    #     response = requests.post("https://api.openai.com/v1/chat/completions", headers=self.headers, json=payload)
    #     if response.status_code != 200:
    #         print("Error from API:", response.status_code, response.text)
    #         return None
    
    #     try:
    #         description = response.json()['choices'][0]['message']['content']
    #         print("Image Description:", description)
    #         return description
    #     except KeyError as e:
    #         print("Failed to parse API response:", response.json())
    #         raise e

    def preprocess_and_encode_image(self, image_path=None, coordinates=None):
        """ Load an image from a path or capture screen, preprocess, and encode it. """
        if image_path:
            image = self.preprocess_image(image_path)
        elif coordinates:
            image = self.capture_screen(coordinates)
            image = self.preprocess_image(image)  # Assuming preprocess can handle numpy arrays directly
        else:
            raise ValueError("Either image_path or coordinates must be provided.")
    
        base64_image = self.encode_image(image)
        return base64_image




    def describe_image(self, image_path=None, coordinates=None, role_description="User", image_description_directive="Describe the image"):
        if image_path:
            image = self.preprocess_image(image_path)
        elif coordinates:
            image = self.capture_screen(coordinates)
        else:
            raise ValueError("Either image_path or coordinates must be provided.")
    
        if image is None:
            raise ValueError("Failed to load or process image.")
    
        # try:
        #     #print("Image dtype before conversion:", image.dtype)  # Debugging output
        #     #print("Image shape before conversion:", image.shape)  # Debugging output
        #     image = Image.fromarray(image.astype('uint8'), 'RGB')
        # except ValueError as e:
        #     print("Error during image encoding:", e)
        #     return None
    
        base64_image = self.encode_image(image)
    
    
        # Ensure image is properly formatted as a numpy array if not done in preprocess
        if not isinstance(image, np.ndarray):
            raise ValueError("Processed image must be a numpy array.")
    
        # Encode the processed image to base64
        base64_image = self.encode_image(image)
        
        # Construct payload
        payload = {
            "model": "gpt-4-turbo",
            "messages": [
                {
                    "role": "system",
                    "content": role_description
                },
                {
                    "role": "user",
                    "content": [
                        {
                            "type": "text",
                            "text": image_description_directive
                        },
                        {
                            "type": "image_url",
                            "image_url": {
                                "url": f"data:image/jpeg;base64,{base64_image}"
                            }
                        }
                    ]
                }
            ],
            "max_tokens": 300
        }
    
        # Send request
        response = requests.post("https://api.openai.com/v1/chat/completions", headers=self.headers, json=payload)
        if response.status_code != 200:
            print("Error from API:", response.status_code, response.text)
            return None
    
        try:
            description = response.json()['choices'][0]['message']['content']
            print("Image Description:", description)
            return description
        except KeyError as e:
            print("Failed to parse API response:", response.json())
            raise e


In [10]:
predictor = ModelPredictor(siamese_model_path, known_good_images_folder, Eyball_key, threshold, jury_size)

Siamese model loaded successfully.
Preloading known good images...


In [11]:
# Capture and evaluate an area of the screen ...

left = 10
right = 500
top = 10
bottom = 200

predictor.predict_siamese(coordinates=(left, top, right, bottom))

predictor.describe_image(coordinates=(left, top, right, bottom), role_description=role, image_description_directive=image_description_directive)

Comparing screen capture with D:\training_images\test\valid\zoomed\randomized_wl\c7de3c88-90f9-41a7-95b7-e6827e99d95c_0.png: Distance = [[53.70162]], Similar = [[False]]
Comparing screen capture with D:\training_images\test\valid\zoomed\randomized_wl\13231a60-86f8-4f4a-b695-f4b252425385_0.png: Distance = [[53.70162]], Similar = [[False]]
Comparing screen capture with D:\training_images\test\valid\dummy_class\2eb738fa-250b-422b-8ef7-a5a174582c21.png: Distance = [[53.70162]], Similar = [[False]]
Comparing screen capture with D:\training_images\test\valid\randomized_wl\cropped\e4a3e90d-e038-41ca-a549-adc72c61fb71_1.png: Distance = [[53.60519]], Similar = [[False]]
Comparing screen capture with D:\training_images\test\valid\zoomed\randomized_wl\107ff504-d5ac-44fb-94ac-bb36f0651db1_1.png: Distance = [[53.70162]], Similar = [[False]]
Comparing screen capture with D:\training_images\test\valid\zoomed\randomized_wl\c7fd2b40-c72a-4322-92bf-04a82b00cb9b_1.png: Distance = [[53.70162]], Similar = 

'*** ANOMALOUS ***\n\nThis image is a screenshot of a user interface from JupyterLab, a web-based interactive development environment for notebooks, code, and data. It is not a medical image acquired by any imaging modality, hence it is classified as anomalous in the context of medical imagery analysis.'

In [None]:
# Or (from here on) pass in a captured and saved file

test_image_path = r'C:\temp\engineer_typing3.png'
print("Model predicts", predictor.predict_siamese(image_path=test_image_path))

actual_description = predictor.describe_image(image_path=test_image_path, role_description=role, image_description_directive=image_description_directive)


In [None]:
test_image_path = r'D:\Custom_invalid\cat.jpg'
predictor.predict_siamese(test_image_path)

actual_description = predictor.describe_image(test_image_path, None, role, image_description_directive)


In [None]:
test_image_path = r'D:\custom_test_valid\internet_27f6574b96deb965217cff1aac35fc_gallery.jpg'
print("Model predicts", predictor.predict_siamese(test_image_path))

actual_description = predictor.describe_image(test_image_path, None, role, image_description_directive)


In [None]:
test_image_path = r'D:\custom_test_valid\istockphoto-493741910-612x612.jpg'
print("Model predicts", predictor.predict_siamese(test_image_path))

actual_description = predictor.describe_image(test_image_path, None, role, image_description_directive)


In [None]:
test_image_path = r'D:\custom_test_valid\low-dose-lung-cancer-screening-with-lung-nodules.jpg'
print("Model predicts", predictor.predict_siamese(test_image_path))

actual_description = predictor.describe_image(test_image_path, None, role, image_description_directive)


In [None]:
test_image_path = r'D:\custom_invalid\istockphoto-with_arrow.jpg'
print("Model predicts", predictor.predict_siamese(test_image_path))

actual_description = predictor.describe_image(test_image_path, None, role, image_description_directive)


In [None]:
test_image_path = r'D:\custom_invalid\Lung_abscess_-_CT_with_overlay.jpg'
print("Model predicts", predictor.predict_siamese(test_image_path))

actual_description = predictor.describe_image(test_image_path, None, role, image_description_directive)


In [None]:
test_image_path = r'D:\Custom_invalid\augmented_0abe42cc-623a-46f2-91ee-be4f339ff73b.png'
print("Model predicts", predictor.predict_siamese(test_image_path))

actual_description = predictor.describe_image(test_image_path, None, role, image_description_directive)


In [None]:
test_image_path = r'C:\temp\medical_image_zoomed_more_resized_modified_aspect_ratio_hairlines.png'
print("Model predicts", predictor.predict_siamese(test_image_path))

actual_description = predictor.describe_image(test_image_path, None, role, image_description_directive)


In [None]:
test_image_path = r'D:\Custom_invalid\internet-gettyimages-1320918955-612x612_small_label.jpg'
print("Model predicts", predictor.predict_siamese(test_image_path))


actual_description = predictor.describe_image(test_image_path, None, role, image_description_directive)


In [None]:
test_image_path = r'D:\Custom_test_valid\internet-gettyimages-1322138871-612x612.jpg'
print("Model predicts", predictor.predict_siamese(test_image_path))

actual_description = predictor.describe_image(test_image_path, None, role, image_description_directive)


In [None]:
def evaluate_methods_simplified(base_folder, sample_size=40, jury_size=12, role="User", image_description_directive="Describe the image"):
    valid_folder = os.path.join(base_folder, 'valid')
    invalid_folder = os.path.join(base_folder, 'invalid')

    # Ensure directories exist
    if not os.path.exists(valid_folder) or not os.path.exists(invalid_folder):
        raise ValueError("One or more image directories do not exist.")

    # List and sample images
    valid_images = random.sample(os.listdir(valid_folder), min(sample_size, len(os.listdir(valid_folder))))
    invalid_images = random.sample(os.listdir(invalid_folder), min(sample_size, len(os.listdir(invalid_folder))))

    # Initialize predictions for both methods
    predictions_siamese = []
    predictions_gpt = []

    # Initialize actual values
    actuals = [1] * len(valid_images) + [0] * len(invalid_images)  # 1 for normal, 0 for anomalous

    # Process sampled valid and invalid images
    for filename in valid_images + invalid_images:
        print ("Filename", filename)
        folder = valid_folder if filename in valid_images else invalid_folder
        image_path = os.path.join(folder, filename)

        # Siamese Network Prediction
        siamese_result = predictor.predict_siamese(image_path)
        predictions_siamese.append(siamese_result)
        print ("Siamese result", siamese_result)

        # GPT Vision Direct Analysis Prediction
        description_result = predictor.describe_image(image_path, None, role, image_description_directive)
        predictions_gpt.append('NORMAL' in description_result)
        print ("API result", description_result)

    # Assuming that True/False predictions from Siamese network are correct and just need flattening:
    predictions_siamese = [int(pred.flatten()[0]) for pred in predictions_siamese]
    
    # Convert GPT predictions from True/False to 0/1 as well:
    predictions_gpt = [int(pred) for pred in predictions_gpt]
    
    # Recalculate the metrics:
    accuracy_s = accuracy_score(actuals, predictions_siamese)
    precision_s = precision_score(actuals, predictions_siamese)
    recall_s = recall_score(actuals, predictions_siamese)
    f1_s = f1_score(actuals, predictions_siamese)
    
    accuracy_g = accuracy_score(actuals, predictions_gpt)
    precision_g = precision_score(actuals, predictions_gpt)
    recall_g = recall_score(actuals, predictions_gpt)
    f1_g = f1_score(actuals, predictions_gpt)
    
    print('Evaluation Results - Siamese Model:', {
        'accuracy': accuracy_s,
        'precision': precision_s,
        'recall': recall_s,
        'f1': f1_s
    })
    print('Evaluation Results - GPT Model:', {
        'accuracy': accuracy_g,
        'precision': precision_g,
        'recall': recall_g,
        'f1': f1_g
    })

    return {
        "siamese": {"accuracy": accuracy_s, "precision": precision_s, "recall": recall_s, "f1": f1_s},
        "gpt": {"accuracy": accuracy_g, "precision": precision_g, "recall": recall_g, "f1": f1_g}
    }




In [None]:
# Example call
base_folder = r'D:\model_comparison_test'
sample_size = 40
try:
    results = evaluate_methods_simplified(base_folder, sample_size, jury_size, role, image_description_directive)
    print("Evaluation Results:", results)
except Exception as e:
    print("Error during evaluation:", str(e))
