<a href="https://colab.research.google.com/github/donbcolab/composable_vlms/blob/main/notebooks/vision_model_evaluation.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
!pip install -q pycocotools Pillow tqdm transformers torch wandb timm

In [None]:
import os
from google.colab import userdata
import wandb

# Copy the 'WANDB_API_KEY' value returned by Colab's `userdata` store into the
# process environment under the same name.
os.environ.update(WANDB_API_KEY=userdata.get('WANDB_API_KEY'))

In [None]:
# # Step 1: List the contents of the current directory
# print("Listing directory contents:")
# !ls

# # Step 2: Verify the file name and ensure it matches in the command
# print("Running vision-model-evaluation.py:")
# !python vision-model-evaluation.py


## Block 1: Imports and Configuration

In [None]:
import json
import logging
import os
import zipfile

import pandas as pd
import requests
import torch
import wandb
from PIL import Image, ImageDraw
from pycocotools.coco import COCO
from sklearn.metrics import precision_score, recall_score, f1_score, average_precision_score
from tqdm import tqdm
from transformers import AutoModelForObjectDetection, AutoModelForImageSegmentation, AutoProcessor

# Set up logging.
# The root logger is configured explicitly rather than through
# logging.basicConfig(): basicConfig() silently does nothing when the root
# logger already has handlers, and re-running this cell would otherwise attach
# one more console handler per run (printing every message several times).
root_logger = logging.getLogger()
for stale_handler in list(root_logger.handlers):
    root_logger.removeHandler(stale_handler)
    stale_handler.close()
root_logger.setLevel(logging.INFO)

# Timestamped records go to the log file ...
file_handler = logging.FileHandler('model_evaluation.log')
file_handler.setFormatter(logging.Formatter('%(asctime)s - %(levelname)s - %(message)s'))
root_logger.addHandler(file_handler)

# ... and the bare message is echoed to the console.
console_handler = logging.StreamHandler()
console_handler.setLevel(logging.INFO)
root_logger.addHandler(console_handler)

# Check if GPU is available
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
logging.info(f"Using device: {device}")

# Configuration shared by every cell below (also passed to wandb.init as the run config).
CONFIG = {
    # COCO annotation JSON; prepare_data() downloads the dataset when this file is missing.
    "annotation_file": '/content/annotations/instances_train2017.json',
    # Directory that load_image() joins with each image's 'file_name'.
    "image_directory": '/content/train2017/',
    # Upper bound on how many selected image ids prepare_data() keeps.
    "num_images": 500,
    "batch_size": 8,  # Images per inference call; reduce to manage GPU memory
    # Checkpoint names handed to from_pretrained() in load_models().
    "detection_model_name": "facebook/detr-resnet-50",
    "segmentation_model_name": "facebook/detr-resnet-50-panoptic",
}

def load_image(image_info):
    """Load one dataset image from disk as an RGB PIL image.

    Args:
        image_info: Mapping with a 'file_name' entry; the name is joined onto
            CONFIG['image_directory'].

    Returns:
        A fully loaded ``PIL.Image.Image`` in mode "RGB".

    Raises:
        OSError: If the file is missing or cannot be decoded.
    """
    image_path = os.path.join(CONFIG['image_directory'], image_info['file_name'])
    logging.info(f"Loading image from {image_path}")
    # Image.open() is lazy and keeps the file handle open. Converting inside
    # the context manager forces the pixel data to be read and lets the handle
    # be released right away. The conversion also normalises grayscale/CMYK
    # files to the three channels the models are fed.
    with Image.open(image_path) as image_file:
        return image_file.convert("RGB")


## Block 2: Dataset Preparation

In [None]:
def download_file(url, dest_path, chunk_size=1024 * 1024, timeout=60):
    """Download ``url`` to ``dest_path`` without holding the payload in memory.

    The response is streamed to a temporary ``<dest_path>.part`` file and only
    renamed to ``dest_path`` once the transfer completed, so an interrupted
    download never leaves a truncated archive under the final name.

    Args:
        url: Address to fetch.
        dest_path: Final location of the downloaded file.
        chunk_size: Number of bytes requested per streamed chunk.
        timeout: Seconds passed to ``requests.get`` as its ``timeout``.

    Raises:
        requests.HTTPError: If the server answers with an error status.
        requests.RequestException: On connection problems or timeouts.
        OSError: If the destination cannot be written.
    """
    logging.info(f"Downloading from {url}...")
    partial_path = f"{dest_path}.part"
    try:
        # stream=True defers the body download so it can be written chunk by
        # chunk; the archives fetched by this notebook are far too large to
        # buffer via response.content.
        with requests.get(url, stream=True, timeout=timeout) as response:
            response.raise_for_status()  # Raise an error for bad status codes
            with open(partial_path, "wb") as f:
                for chunk in response.iter_content(chunk_size=chunk_size):
                    if chunk:  # skip keep-alive chunks
                        f.write(chunk)
        os.replace(partial_path, dest_path)
    finally:
        # Only present here when the download failed part-way.
        if os.path.exists(partial_path):
            os.remove(partial_path)
    logging.info(f"Downloaded to {dest_path}")

def extract_zip(file_path, extract_to):
    """Unpack a zip archive and delete the archive afterwards.

    Args:
        file_path: Path of the ``.zip`` file to unpack.
        extract_to: Directory that receives the archive's contents.

    The archive is removed only after extraction finished without raising.
    """
    logging.info(f"Extracting {file_path}...")
    archive = zipfile.ZipFile(file_path, 'r')
    try:
        archive.extractall(extract_to)
    finally:
        archive.close()
    os.remove(file_path)
    logging.info(f"Extracted to {extract_to}")

def download_coco_dataset():
    """Fetch and unpack the COCO 2017 annotations and train images into /content/.

    Each archive is downloaded with download_file() and then unpacked (and
    removed) by extract_zip(); annotations are handled before the images.
    """
    archives = (
        ("http://images.cocodataset.org/annotations/annotations_trainval2017.zip",
         "/content/annotations.zip"),
        ("http://images.cocodataset.org/zips/train2017.zip",
         "/content/train2017.zip"),
    )
    for archive_url, archive_path in archives:
        download_file(archive_url, archive_path)
        extract_zip(archive_path, '/content/')

def prepare_data():
    """Ensure the dataset exists and pick the images to evaluate.

    The dataset download is triggered only when the annotation file named in
    CONFIG is missing (the image directory itself is not checked). Image ids
    are selected by querying the 'person' and 'car' categories and truncated to
    CONFIG["num_images"].

    Returns:
        (images, coco): the image records returned by ``coco.loadImgs`` and
        the ``COCO`` index they were taken from.
    """
    annotation_path = CONFIG["annotation_file"]
    if not os.path.exists(annotation_path):
        download_coco_dataset()

    coco = COCO(annotation_path)
    category_ids = coco.getCatIds(catNms=['person', 'car'])
    # NOTE(review): pycocotools appears to intersect the per-category image
    # sets here (images containing both categories) - confirm if that matters.
    selected_ids = coco.getImgIds(catIds=category_ids)[:CONFIG["num_images"]]

    return coco.loadImgs(selected_ids), coco


## Block 3: Model Loading

In [None]:
def load_models():
    """Load the detection and segmentation checkpoints named in CONFIG.

    Both models are moved to the module-level ``device``; each gets a processor
    loaded from the same checkpoint name.

    Returns:
        (detection_model, detection_processor,
         segmentation_model, segmentation_processor)

    Raises:
        Exception: Any loading failure is logged and then re-raised unchanged.
    """
    try:
        logging.info("Loading detection model...")
        detection_name = CONFIG["detection_model_name"]
        detection_model = AutoModelForObjectDetection.from_pretrained(detection_name).to(device)
        detection_processor = AutoProcessor.from_pretrained(detection_name)

        logging.info("Loading segmentation model...")
        segmentation_name = CONFIG["segmentation_model_name"]
        segmentation_model = AutoModelForImageSegmentation.from_pretrained(segmentation_name).to(device)
        segmentation_processor = AutoProcessor.from_pretrained(segmentation_name)
    except Exception as e:
        logging.error(f"Error loading models: {e}")
        raise
    else:
        return (
            detection_model,
            detection_processor,
            segmentation_model,
            segmentation_processor,
        )


## Block 4: Model Inference

In [None]:
def model_inference(model, processor, images, task="detection"):
    """Run one batch through ``model`` and post-process the raw outputs.

    Args:
        model: Model called as ``model(**inputs)``.
        processor: Callable that builds the model inputs and exposes the
            ``post_process_*`` method matching ``task``.
        images: Sequence of images exposing ``.height`` and ``.width``.
        task: ``"detection"`` or ``"segmentation"``.

    Returns:
        The processor's post-processed results (one entry per image), or
        ``None`` when the forward pass failed; the failure is logged.

    Raises:
        ValueError: If ``task`` is not one of the supported values.
    """
    # Reject unknown tasks up front instead of failing later with an
    # UnboundLocalError on `results`.
    if task not in ("detection", "segmentation"):
        raise ValueError(f"Unsupported task: {task!r}")

    target_sizes = [(img.height, img.width) for img in images]
    inputs = processor(images=images, return_tensors="pt").to(device)
    try:
        # Inference only: without no_grad() autograd keeps every intermediate
        # activation alive, which needlessly inflates GPU memory use.
        with torch.no_grad():
            outputs = model(**inputs)
    except torch.cuda.OutOfMemoryError:
        logging.error("CUDA out of memory. Reduce the batch size and try again.")
        return None
    except Exception as e:
        logging.error(f"Error during model inference: {e}")
        return None

    if task == "detection":
        results = processor.post_process_object_detection(outputs, target_sizes=target_sizes)
    else:
        results = processor.post_process_panoptic_segmentation(outputs, target_sizes=target_sizes)

    return results


## Block 5: Metrics Calculation

In [None]:
def calculate_iou(box1, box2):
    """Intersection-over-union of two axis-aligned boxes.

    Args:
        box1, box2: 4-element sequences unpacked as ``(x1, y1, x2, y2)``.

    Returns:
        ``intersection / union``, or ``0`` when the union area is not positive.
    """
    left1, top1, right1, bottom1 = box1
    left2, top2, right2, bottom2 = box2

    # Overlap extents are clamped at zero so disjoint boxes contribute nothing.
    overlap_w = max(0, min(right1, right2) - max(left1, left2))
    overlap_h = max(0, min(bottom1, bottom2) - max(top1, top2))
    intersection = overlap_w * overlap_h

    area1 = (right1 - left1) * (bottom1 - top1)
    area2 = (right2 - left2) * (bottom2 - top2)
    union = area1 + area2 - intersection

    if union > 0:
        return intersection / union
    return 0

def calculate_metrics(pred_boxes, gt_boxes, iou_threshold=0.5, scores=None):
    """Per-image detection metrics from predicted and ground-truth boxes.

    Predictions are visited in ranking order and each one is greedily matched
    to the not-yet-matched ground-truth box with the highest IoU above
    ``iou_threshold``; a matched prediction is a true positive, any other a
    false positive. Each ground-truth box can be matched at most once.

    The earlier implementation passed label vectors of different lengths
    (``len(gt_boxes)`` vs ``len(pred_boxes)``) to scikit-learn, which raises
    whenever the two counts differ; the metrics are therefore computed directly.

    Args:
        pred_boxes: Sequence of boxes in the format ``calculate_iou`` expects
            (``x1, y1, x2, y2``). When ``scores`` is omitted the given order is
            used as the ranking, so pass them sorted by confidence.
        gt_boxes: Sequence of ground-truth boxes in the same format.
        iou_threshold: A match requires IoU strictly greater than this value.
        scores: Optional confidences, one per predicted box, used to rank the
            predictions (highest first).

    Returns:
        ``(precision, recall, f1, ap)`` as floats. ``ap`` is the
        non-interpolated average precision: the precision at the rank of each
        true positive, summed and divided by the number of ground-truth boxes.
        Every metric whose denominator would be zero is reported as ``0.0``.

    Raises:
        ValueError: If ``scores`` is given with a length different from
            ``pred_boxes``.
    """
    num_pred = len(pred_boxes)
    num_gt = len(gt_boxes)
    if scores is not None and len(scores) != num_pred:
        raise ValueError("scores must contain exactly one value per predicted box")

    if scores is None:
        ranking = list(range(num_pred))
    else:
        ranking = sorted(range(num_pred), key=lambda idx: scores[idx], reverse=True)

    gt_taken = [False] * num_gt
    hits = []  # 1 for a true positive, 0 for a false positive, in ranking order
    for pred_idx in ranking:
        best_iou, best_gt = iou_threshold, None
        for gt_idx in range(num_gt):
            if gt_taken[gt_idx]:
                continue
            iou = calculate_iou(pred_boxes[pred_idx], gt_boxes[gt_idx])
            if iou > best_iou:
                best_iou, best_gt = iou, gt_idx
        if best_gt is None:
            hits.append(0)
        else:
            gt_taken[best_gt] = True
            hits.append(1)

    true_positives = sum(hits)
    precision = true_positives / num_pred if num_pred else 0.0
    recall = true_positives / num_gt if num_gt else 0.0
    if precision + recall > 0:
        f1 = 2 * precision * recall / (precision + recall)
    else:
        f1 = 0.0

    precision_sum = 0.0
    running_tp = 0
    for rank, hit in enumerate(hits, start=1):
        if hit:
            running_tp += 1
            precision_sum += running_tp / rank
    ap = precision_sum / num_gt if num_gt else 0.0

    return precision, recall, f1, ap


## Block 6: Results Visualization

In [None]:
def visualize_results(image, results, model_name, image_id):
    """Draw labelled boxes onto ``image`` and save it as a JPEG.

    Args:
        image: PIL image; it is drawn on in place.
        results: Iterable whose elements are checked with ``'bbox' in element``;
            matching elements must provide ``'bbox'`` (passed to
            ``ImageDraw.rectangle``) and ``'label'`` (drawn as text at the
            box's first two coordinates). Other elements are ignored.
        model_name: Prefix of the output file name.
        image_id: Identifier embedded in the output file name.

    Returns:
        The output path ``"<model_name>_result_<image_id>.jpg"``. The path is
        returned even when saving failed; the failure is only logged.
    """
    canvas = ImageDraw.Draw(image)
    for entry in results:
        if 'bbox' not in entry:
            continue
        box = entry['bbox']
        canvas.rectangle(box, outline="red", width=2)
        canvas.text((box[0], box[1]), entry['label'], fill="red")

    image_path = f"{model_name}_result_{image_id}.jpg"
    try:
        image.save(image_path)
        logging.info(f"Saved visualization: {image_path}")
    except IOError as e:
        logging.error(f"Error saving visualization: {e}")

    return image_path


## Block 7: Model Evaluation

In [None]:
def _coco_xywh_to_xyxy(bbox):
    """Convert a COCO ``[x, y, width, height]`` box to ``[x1, y1, x2, y2]``."""
    x, y, w, h = bbox
    return [x, y, x + w, y + h]


def _extract_predictions(result, model, task):
    """Turn one post-processed result into (boxes, scores, labels), best score first.

    Boxes are ``[x1, y1, x2, y2]`` lists and labels are strings.
    """
    id2label = getattr(getattr(model, "config", None), "id2label", None) or {}

    if task == "detection":
        boxes = result['boxes'].tolist()
        scores = result['scores'].tolist()
        label_ids = result['labels'].tolist()
    else:
        # NOTE(review): assumes the panoptic post-processing output documented
        # by Transformers - 'segmentation' is an (H, W) tensor of segment ids
        # and 'segments_info' lists dicts with 'id', 'label_id' and 'score'.
        # It carries no boxes, so one is derived from each segment's mask.
        segmentation = result['segmentation']
        boxes, scores, label_ids = [], [], []
        for segment in result['segments_info']:
            ys, xs = torch.where(segmentation == segment['id'])
            if xs.numel() == 0:
                continue
            boxes.append([float(xs.min()), float(ys.min()),
                          float(xs.max()) + 1.0, float(ys.max()) + 1.0])
            scores.append(float(segment['score']))
            label_ids.append(int(segment['label_id']))

    order = sorted(range(len(boxes)), key=lambda idx: scores[idx], reverse=True)
    return (
        [boxes[idx] for idx in order],
        [scores[idx] for idx in order],
        [str(id2label.get(label_ids[idx], label_ids[idx])) for idx in order],
    )


def process_batch(batch, model, processor, coco, task):
    """Run ``model`` on one batch and score every image against its COCO annotations.

    Args:
        batch: Image records (dicts with at least 'id' and 'file_name').
        model: Model used for inference.
        processor: Processor belonging to ``model``.
        coco: COCO index used to look up the ground-truth annotations.
        task: ``"detection"`` or ``"segmentation"``; also stored as the
            ``"model"`` field of each result row.

    Returns:
        One dict per image with the keys ``image_id``, ``model``, ``precision``,
        ``recall``, ``f1``, ``ap`` and ``visualization`` (path of the saved
        overlay). An empty list when inference failed for the batch.
    """
    logging.info(f"Processing batch of size {len(batch)} for task: {task}")
    batch_images = [load_image(img) for img in batch]
    results = model_inference(model, processor, batch_images, task)
    if results is None:
        # model_inference() returns None for any inference failure, not just OOM.
        logging.error("Skipping batch because model inference failed.")
        return []

    batch_results = []
    for img, pil_image, result in zip(batch, batch_images, results):
        annotations = coco.loadAnns(coco.getAnnIds(imgIds=img['id']))
        # COCO stores boxes as [x, y, w, h]; the IoU code compares corners.
        gt_boxes = [_coco_xywh_to_xyxy(ann['bbox']) for ann in annotations]

        # Predictions come back ordered by confidence so that rank-based
        # metrics see the most confident box first.
        pred_boxes, _scores, labels = _extract_predictions(result, model, task)
        precision, recall, f1, ap = calculate_metrics(pred_boxes, gt_boxes)

        # visualize_results() expects a list of {'bbox', 'label'} dicts.
        detections = [{'bbox': box, 'label': label} for box, label in zip(pred_boxes, labels)]
        vis_path = visualize_results(pil_image, detections, task, img['id'])

        batch_results.append({
            "image_id": img['id'],
            "model": task,
            "precision": precision,
            "recall": recall,
            "f1": f1,
            "ap": ap,
            "visualization": vis_path
        })
    logging.info(f"Completed processing batch for task: {task}")
    return batch_results

def evaluate_models(images, coco, models):
    """Evaluate both models over ``images`` in batches of CONFIG["batch_size"].

    Args:
        images: Sequence of image records to evaluate.
        coco: COCO index forwarded to process_batch().
        models: 4-tuple ``(detection_model, detection_processor,
            segmentation_model, segmentation_processor)``.

    Returns:
        A flat list of the per-image result dicts from process_batch(); within
        each batch the detection rows precede the segmentation rows.
    """
    detection_model, detection_processor, segmentation_model, segmentation_processor = models
    batch_size = CONFIG["batch_size"]
    all_results = []

    for start in tqdm(range(0, len(images), batch_size), desc="Processing batches"):
        batch = images[start:start + batch_size]
        batch_number = start // batch_size + 1

        logging.info(f"Running detection on batch {batch_number}")
        all_results += process_batch(batch, detection_model, detection_processor, coco, "detection")

        logging.info(f"Running segmentation on batch {batch_number}")
        all_results += process_batch(batch, segmentation_model, segmentation_processor, coco, "segmentation")

    return all_results


## Block 8: Main Function and JSON Validation

In [None]:
def validate_json_file(file_path):
    """Report whether ``file_path`` contains parseable JSON.

    Returns:
        ``True`` when the file parses, ``False`` (after logging the error) when
        the JSON decoder rejects it. Errors raised while opening or reading the
        file are not caught here.
    """
    try:
        with open(file_path, 'r') as f:
            json.load(f)
    except json.JSONDecodeError as e:
        logging.error(f"JSONDecodeError: {e}")
        return False
    return True

def load_prepared_data(flag_path, annotation_file):
    """Restore a previously saved image selection.

    Args:
        flag_path: JSON file holding an ``"images"`` entry.
        annotation_file: COCO annotation file used to rebuild the index.

    Returns:
        (images, coco): the stored image records and a fresh ``COCO`` index.
    """
    with open(flag_path, 'r') as f:
        images = json.load(f)["images"]
    return images, COCO(annotation_file)

def save_prepared_data(flag_path, images):
    """Persist the selected image records as ``{"images": [...]}`` JSON at ``flag_path``."""
    payload = {"images": images}
    with open(flag_path, 'w') as f:
        json.dump(payload, f)

In [None]:
def main():
    """Run the whole evaluation: prepare data, load models, evaluate, log to W&B.

    Steps:
        1. Start (or resume) the W&B run.
        2. Reuse the cached image selection when its flag file holds valid
           JSON, otherwise prepare the data and write the cache.
        3. Load both models.
        4. Evaluate every batch and log the result table plus one
           visualization image per result row.

    The W&B run is closed even when a step raises.
    """
    wandb.init(project="vision-models-evaluation", config=CONFIG, resume=True)
    try:
        data_prepared_flag = '/content/data_prepared.flag'
        if os.path.exists(data_prepared_flag) and validate_json_file(data_prepared_flag):
            logging.info("Data preparation already done, skipping...")
            images, coco = load_prepared_data(data_prepared_flag, CONFIG["annotation_file"])
        else:
            logging.info("Starting data preparation")
            images, coco = prepare_data()
            save_prepared_data(data_prepared_flag, images)

        # Models exist only in this process's memory, so they are always loaded
        # here. The former "models_loaded" flag file skipped this step on
        # later runs and left `models` undefined.
        logging.info("Loading models")
        models = load_models()

        logging.info("Starting model evaluation")
        results = evaluate_models(images, coco, models)

        # Log results to wandb
        table = wandb.Table(dataframe=pd.DataFrame(results))
        wandb.log({"results": table})

        for result in results:
            wandb.log({f"{result['model']}_visualization": wandb.Image(result['visualization'])})

        logging.info("Evaluation complete")
    finally:
        # Always close the run so a failure does not leave it dangling.
        wandb.finish()

# Entry point: run the full pipeline when this cell (or the exported script)
# is executed directly rather than imported.
if __name__ == "__main__":
    main()
