In [None]:
import kagglehub
from transformers import Idefics2Processor, Idefics2ForConditionalGeneration, BitsAndBytesConfig

import cv2
import torch
from PIL import Image
import pandas as pd
import os
os.environ["YOLO_VERBOSE"] = "False"  # Set before importing ultralytics. NOTE(review): the name suggests this controls log verbosity rather than downloads - confirm.
from ultralytics import YOLO
from tqdm import tqdm
import json
# IoU a detected box must exceed to be matched to a labelled box (read by get_ground_truth).
iou_threshold = 0.5

# Evaluation of the model

In [None]:
# Fetch the "stanislavlevendeev/hazmat-detection" dataset through kagglehub and
# display the path it returned.
path_private = kagglehub.dataset_download("stanislavlevendeev/hazmat-detection")
path_private


In [None]:
# Fetch the "stanislavlevendeev/haz-mat-signs" dataset through kagglehub and
# display the path it returned.
path_public = kagglehub.dataset_download("stanislavlevendeev/haz-mat-signs")
path_public

In [None]:
# Label table of the first dataset. The added "Checked" flag starts as False and is
# set to True by get_ground_truth for every labelled box matched by a detection.
df_private = pd.read_csv( os.path.join(path_private, "labels_dataframe.csv"))
df_private["Checked"] = False
df_private.head()

In [None]:
# Label table of the second dataset, with the same "Checked" bookkeeping flag
# (False until get_ground_truth matches the row to a detection).
df_public = pd.read_csv(os.path.join(path_public, "images_with_boxes.csv"))
df_public["Checked"] = False
df_public.head()

In [None]:
# Per "Absolute Frame": number of non-null entries in every other column.
df_private.groupby("Absolute Frame").count()

In [None]:
# Keep only the frames that carry more than one distinct "Code" value.
filtered_df = df_private.groupby("Absolute Frame").filter(
    lambda group: group["Code"].nunique() > 1
)

# Display per-frame counts for the filtered DataFrame
filtered_df.groupby("Absolute Frame").count()

In [None]:
# Lookup table used by get_description, which reads its "number" and "description" columns.
df_un_numbers = pd.read_csv(os.path.join("./data", "un-number-labels.csv"))

In [None]:
# Load the YOLO detection weights from the local ./data/yolo folder and move the
# model to CUDA when it is available, otherwise to the CPU. `device` is reused by
# the OCR model and by perform_ocr.
model_detection =  YOLO('./data/yolo/yolo11x_earlystopping.pt')
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model_detection.to(device)


In [None]:
# Processor (tokenizer + image preprocessing) for the Idefics2 vision-language model.
processor = Idefics2Processor.from_pretrained("HuggingFaceM4/idefics2-8b")

# 4-bit NF4 quantization with double quantization; matmuls are computed in float16.
quantization_config = BitsAndBytesConfig(
    load_in_4bit=True,
    bnb_4bit_quant_type="nf4",
    bnb_4bit_use_double_quant=True,
    bnb_4bit_compute_dtype=torch.float16,
)
print('Processor loaded')
print('Device:', device)

# `device_map` already places the quantized weights on `device`. The former
# follow-up `model_ocr.to(device)` was redundant and, depending on the installed
# transformers/bitsandbytes versions, raises for bitsandbytes-quantized models,
# so it is intentionally not called here.
model_ocr = Idefics2ForConditionalGeneration.from_pretrained(
    "HuggingFaceM4/idefics2-8b",
    torch_dtype=torch.float16,
    device_map=device,
    quantization_config=quantization_config,
    # attn_implementation="flash_attention_2",
)


In [None]:
# Instruction sent to the OCR model together with every cropped placard image.
# It asks for two comma-separated values (upper part first, lower part second),
# with "0" standing in for a part that cannot be read.
prompt = """
Analyze the image and extract two key values:

    The UN number visible on the upper part of the placard.
    The code visible on the lower part of the placard, located below the horizontal line separating the two sections.

Both codes are printed in black. If either the upper or lower part cannot be detected, replace the missing value with "0." Output the extracted values as plain text, separated by a comma if multiple codes are present. No additional context or formatting is needed.

Input Examples:

    {98 {line} 4567}
    (not found, {line}, 8901)
    {101 {line} 3345}
    (not found, {line}, {not found})
    {45 {line} 2789}
    {22 {line} 5678}

Desired Output:

    98, 4567
    0, 8901
    101, 3345
    0, 0
    45, 2789
    22, 5678

Expected Transformation:

    For each input example, extract the UN number and the code below the horizontal line.
    If either part is missing (i.e., "not found"), replace it with 0.
    Output the extracted values as plain text, separated by a comma, without any additional context or formatting.
"""
# Single user turn: the text instruction followed by one image placeholder.
messages = [
    {
        "role": "user",
        "content": [
            {
                "type": "text",
                "text": prompt},
            {"type": "image"},
        ],
    }
]
# Render the chat into the prompt string that perform_ocr hands to the processor.
text = processor.apply_chat_template(messages, add_generation_prompt=True)

In [None]:

def get_un_number(image, bbox):
    """Cut one detected placard out of a frame and read its two numbers.

    Args:
        image: OpenCV image array in BGR channel order (it is converted to RGB here).
        bbox: crop box in (x_min, y_min, x_max, y_max) order, passed to ``Image.crop``.

    Returns:
        The pair returned by ``perform_ocr`` for the cropped region.
    """
    rgb_pixels = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
    placard = Image.fromarray(rgb_pixels).crop(bbox)
    return perform_ocr(placard)
def calculate_iou(box1, box2):
    """
    Intersection over Union (IoU) of two axis-aligned boxes.

    Both boxes are given as (x_min, y_min, x_max, y_max). Returns
    intersection / union, or 0 when the union has no area.
    """
    def _area(box):
        # width * height of a single box
        return (box[2] - box[0]) * (box[3] - box[1])

    # Corners of the overlapping rectangle (if any).
    left, top = max(box1[0], box2[0]), max(box1[1], box2[1])
    right, bottom = min(box1[2], box2[2]), min(box1[3], box2[3])

    # Negative extents mean the boxes do not overlap on that axis.
    overlap_width = max(0, right - left)
    overlap_height = max(0, bottom - top)
    intersection_area = overlap_width * overlap_height

    union_area = _area(box1) + _area(box2) - intersection_area

    # Two degenerate boxes: nothing to divide by.
    if union_area == 0:
        return 0

    return intersection_area / union_area
def perform_ocr(image):
    """Ask the OCR model for the two numbers printed on a cropped placard.

    The module-level chat prompt ``text`` is sent together with ``image``; the
    decoded reply is expected to look like ``"<upper>, <lower>"``.

    Args:
        image: the cropped placard image handed to the processor.

    Returns:
        A ``(un_number, hin_number)`` tuple of strings. NOTE: positionally the
        first element is the value the prompt requests from the upper part of
        the placard and the second the value from the lower part; callers in
        this notebook rely on that order. Missing fields are padded with
        ``'0000'``.
    """
    inputs = processor(images=image, text=text, return_tensors="pt").to(device)
    generated_ids = model_ocr.generate(**inputs, max_new_tokens=500)
    generated_text = processor.batch_decode(generated_ids, skip_special_tokens=True)[0]

    # The decoded sequence holds the prompt followed by the reply after the
    # "Assistant:" marker. Without the marker there is no reply to parse, so
    # fall back to the same defaults used for padding instead of raising
    # IndexError in the middle of an evaluation run.
    parts = generated_text.split("Assistant:")
    if len(parts) < 2:
        return '0000', '0000'
    assistant_output = parts[1].strip()

    # Comma-separated fields; strip whitespace and stray dots (e.g. "0.").
    numbers = [number.strip().replace('.', '') for number in assistant_output.split(",")]
    # Pad so that two values can always be unpacked.
    numbers.extend(['0000', '0000'])
    un_number, hin_number = numbers[:2]
    return un_number, hin_number

def get_bboxes(image_path):
    """Read an image with OpenCV and run the detection model on it.

    Returns:
        ``(results, img)``: the detector output and the image array that was read.
    """
    frame = cv2.imread(image_path)
    detections = model_detection(frame)
    return detections, frame

def get_predictions(image_path):
    """Return the labelled rows that belong to an image.

    File names containing an underscore are treated as ``<video>_<frame>.<ext>``
    and looked up in ``df_private`` (``Source == <video>.mp4`` and matching
    ``Relative Frame``). Any other name is treated as ``<image_id>.<ext>`` and
    looked up in ``df_public`` by ``image_id``.

    Args:
        image_path: path to the image; both ``\\`` and ``/`` separators are accepted.

    Returns:
        ``(rows, pub)``: the matching slice of the label table and a flag that
        is True when the rows come from ``df_public``.
    """
    # Splitting on "\\" alone leaves the whole path in place on POSIX systems,
    # which breaks both the underscore test and the id parsing below.
    image_name = image_path.replace("\\", "/").rsplit("/", 1)[-1]

    if '_' in image_name:
        video_name, frame_part = image_name.split('_')[:2]
        frame_id = int(frame_part.split('.')[0])
        predictions = df_private[
            (df_private['Source'] == video_name + '.mp4')
            & (df_private['Relative Frame'] == frame_id)
        ]
        return predictions, False

    image_id = image_name.split('.')[0]
    predictions = df_public[df_public['image_id'] == int(image_id)]
    return predictions, True
def get_ground_truth(image_path, bbox: tuple[float, float, float, float]):
    """Find the labelled code for a detected box.

    The labelled rows of the image are scanned in order; the first one whose
    box overlaps ``bbox`` with an IoU above the module-level ``iou_threshold``
    is marked as ``Checked`` in its source table and its code is returned.

    Args:
        image_path: path of the image the detection came from.
        bbox: detected box as (x_min, y_min, x_max, y_max).

    Returns:
        The matched row's code split on ``'/'`` (a list of strings), or ``None``
        when the image has no labelled rows or none of them overlaps enough.
    """
    labelled_rows, from_public = get_predictions(image_path)
    if labelled_rows is None or labelled_rows.empty:
        return None

    # The two label tables name their box columns differently.
    present = labelled_rows.columns
    corner_columns = [
        upper_name if upper_name in present else fallback_name
        for upper_name, fallback_name in (
            ('XTL', 'box_xtl'),
            ('YTL', 'box_ytl'),
            ('XBR', 'box_xbr'),
            ('YBR', 'box_ybr'),
        )
    ]

    for row_label, record in labelled_rows.iterrows():
        labelled_box = tuple(record[column] for column in corner_columns)
        overlap = calculate_iou(bbox, labelled_box)

        if overlap > iou_threshold:
            # Flag the row in the original table so unmatched labels can be
            # reported afterwards.
            if from_public:
                df_public.loc[row_label, 'Checked'] = True
            else:
                df_private.loc[row_label, 'Checked'] = True
            matched_code = record['Code'] if 'Code' in record else record["code"]
            return matched_code.split('/')

    # No labelled box overlapped the detection strongly enough.
    return None
def get_description(un_number):
    """Look up the description stored for a number in ``df_un_numbers``.

    Args:
        un_number: anything ``int()`` accepts (e.g. ``'1005'`` or ``1005``).

    Returns:
        The ``description`` of the first row whose ``number`` equals the
        integer value, or ``None`` when the value cannot be converted to an
        integer or no row matches.
    """
    try:
        un_number = int(un_number)
    except (TypeError, ValueError, OverflowError):
        # None / lists raise TypeError, non-numeric text and NaN raise
        # ValueError, infinities raise OverflowError: none has a description.
        return None
    description = df_un_numbers[df_un_numbers['number'] == un_number]
    if not description.empty:
        return description['description'].values[0]
    return None
def store_result(res, output_path='full-pipeline-yolo-evaluationnn.json'):
    """Write an evaluation result to disk as JSON.

    Args:
        res: JSON-serialisable object (the notebook passes ``test_metrics``).
        output_path: destination file; it is overwritten if it exists. The
            default is the file name this function has always written to.
    """
    print('Saving evaluation')
    with open(output_path, 'w') as f:
        json.dump(res, f)
    print('Evaluation saved')
    

In [None]:
# Running totals plus one record per detection / missed label, filled by test_path.
test_metrics = {"metrics": [], "success": 0, "failure": 0}

def test_path(path):
    """Run detection + OCR over every image in ``path`` and score the results.

    For each detected box the OCR reading is compared with the labelled code
    returned by ``get_ground_truth``; a detection without a matching label, or
    with a different reading, counts as a failure. Afterwards every labelled
    row of the image that no detection matched (``Checked`` still False) is
    recorded as a failure as well. Results accumulate in the module-level
    ``test_metrics`` dict.

    Args:
        path: directory whose ``.png`` / ``.jpg`` files are evaluated.
    """
    files = [f for f in os.listdir(path) if f.endswith(('.png', '.jpg'))]
    with tqdm(total=len(files), desc="Processing images", unit="image") as pbar:
        for file in files:
            image_path = os.path.join(path, file)
            results, img = get_bboxes(image_path)
            if results is not None:
                for result in results:
                    for box in result.boxes:
                        bbox = box.xyxy[0].tolist()
                        ground_truth = get_ground_truth(image_path, bbox)
                        un_number, hin_number = get_un_number(img, bbox)
                        # Slicing (instead of indexing [0] and [1]) makes a label
                        # without a '/' separator count as a failure rather than
                        # raising IndexError.
                        is_correct = (
                            ground_truth is not None
                            and ground_truth[:2] == [un_number, hin_number]
                        )
                        test_metrics["success" if is_correct else "failure"] += 1
                        test_metrics["metrics"].append({
                            "image": image_path,
                            "bbox": bbox,
                            "ground_truth": ground_truth,
                            "prediction": [un_number, hin_number]
                        })

            # Labels that no detection matched are misses.
            preds, _ = get_predictions(image_path)
            if preds is not None:
                # The two label tables use different column names (same
                # fallback as get_ground_truth); reading 'XTL'/'Code'
                # unconditionally raises KeyError for the other table.
                xtl_column = 'XTL' if 'XTL' in preds.columns else 'box_xtl'
                ytl_column = 'YTL' if 'YTL' in preds.columns else 'box_ytl'
                xbr_column = 'XBR' if 'XBR' in preds.columns else 'box_xbr'
                ybr_column = 'YBR' if 'YBR' in preds.columns else 'box_ybr'
                code_column = 'Code' if 'Code' in preds.columns else 'code'
                for _, row in preds.iterrows():
                    if not row['Checked']:
                        test_metrics["failure"] += 1
                        test_metrics["metrics"].append({
                            "image": image_path,
                            "bbox": None,
                            "actual_bbox": [row[xtl_column], row[ytl_column], row[xbr_column], row[ybr_column]],
                            "ground_truth": [row[code_column]],
                            "prediction": [0, 0]
                        })
            # Update the progress bar and description
            pbar.set_description(f"Processing images | Success: {test_metrics['success']} | Failure: {test_metrics['failure']}")
            pbar.update(1)
        
        

In [None]:
# Quick manual check of the description lookup with a sample number.
print(get_description('1005'))

In [None]:
import gradio as gr
import numpy as np
def analyze_image(img):
    """Detect placards in an uploaded image, read them and annotate the image.

    Every detection is cropped and sent through OCR. If the second OCR value
    has an entry in the description table the box is drawn in green, numbered,
    and added to the result list; otherwise the box is drawn in red.

    Args:
        img: uploaded RGB image (anything ``np.array`` can turn into an image
            array), or ``None`` when nothing was uploaded.

    Returns:
        ``(annotated_image, readings)``: a PIL image with the boxes drawn and a
        list of dicts with the keys ``hin_number``, ``un_number`` and
        ``description``. ``(None, [])`` when ``img`` is ``None``.
    """
    if img is None:
        # np.array(None) cannot be colour-converted; return an empty result.
        return None, []

    # Convert the uploaded image to OpenCV format (BGR)
    img = cv2.cvtColor(np.array(img), cv2.COLOR_RGB2BGR)
    # Draw on a copy: OCR crops must come from the untouched pixels, otherwise
    # rectangles/labels drawn for earlier detections leak into later crops.
    annotated = img.copy()
    un_numbers = []

    results = model_detection(img)
    if results is not None:
        for result in results:
            for box in result.boxes:
                bbox = box.xyxy[0].tolist()
                top_left = (int(bbox[0]), int(bbox[1]))
                bottom_right = (int(bbox[2]), int(bbox[3]))

                # perform_ocr's names are positional: first value = upper part
                # of the placard, second value = lower part. The description
                # lookup and the output keys below follow that order.
                un_number, hin_number = get_un_number(img, bbox)
                desc = get_description(hin_number)
                if desc is not None:
                    # green box, labelled with the index of the reading
                    cv2.rectangle(annotated, top_left, bottom_right, (0, 255, 0), 2)
                    cv2.putText(annotated, f"{len(un_numbers)}", (int(bbox[0]), int(bbox[1] - 10)), cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0, 255, 0), 2)
                    un_numbers.append({
                        "hin_number": un_number,
                        "un_number": hin_number,
                        "description": desc
                    })
                else:
                    # red box: no description found for the reading
                    cv2.rectangle(annotated, top_left, bottom_right, (0, 0, 255), 2)

    # Back to RGB for display.
    annotated = Image.fromarray(cv2.cvtColor(annotated, cv2.COLOR_BGR2RGB))
    return annotated, un_numbers
# Expose analyze_image as a Gradio app: one image input, annotated image + JSON list as outputs.
demo = gr.Interface(fn=analyze_image, inputs="image", outputs=["image","json"])
demo.launch()

In [None]:
# Evaluate the pipeline on the images of the public dataset, then persist the metrics.
path = os.path.join(path_public, "images")
print(path)
test_path(path)
# test_path(os.path.join("/home", "s3544648", "data", "yolo", "images", "val"))
# test_path(os.path.join("/home", "s3544648", "data", "yolo", "images", "train"))
# test_path(os.path.join(path_public, "images"))
# store_result requires the object to save; calling it without arguments raises TypeError.
store_result(test_metrics)

In [None]:
# Print every recorded metric entry (one per detection / missed label), then save the results.
print(test_metrics["metrics"])
store_result(test_metrics)

In [None]:
# get df_public where checked flag is true
# Rows of df_public whose "Checked" flag is True, i.e. labels that get_ground_truth matched to a detection.
df_public_checked = df_public[df_public["Checked"] == True]
df_public_checked.head()

# Reading the evaluation metrics

In [None]:
# read json
# Read a stored evaluation JSON.
# NOTE(review): this path differs from the file name store_result writes
# ('full-pipeline-yolo-evaluationnn.json') - confirm the file was moved/renamed.
data =  None
with open('./data/evaluation_full_pipeline_yolo.json') as f:
  data = json.load(f)
# Build a DataFrame from the "metrics" list (one row per recorded entry).
df = pd.DataFrame(data['metrics'])
# Add a description column, looked up from the second value of each prediction.
df['description'] = df['prediction'].apply(lambda x: get_description(x[1]))
df.head()
df.shape

## Public Leaderboard

In [None]:
# sort df for images that consist public
# Keep the result rows whose image path contains "public".
# NOTE: this rebinds df_public, which earlier held the label table read by
# get_predictions/get_ground_truth; reload that table before re-running the evaluation.
df_public = df[df['image'].str.contains('public')]
df_public.shape[0]

In [None]:
# look how many of them satisfy ground truth = prediction
# Rows where the stored ground truth equals the stored prediction.
df_correct = df_public[df_public['ground_truth'] == df_public['prediction']]
df_correct.shape

In [None]:
# print ratio of correct predictions
# Share of correct rows among the "public" result rows (raises ZeroDivisionError if there are none).
print(f'Ratio of correct predictions: {df_correct.shape[0] / df_public.shape[0]}')

## Private Leaderboard

In [None]:
# sort df for images that consist public
# Keep the result rows whose image path contains an underscore (treated here as the private set).
# NOTE(review): the match is on the whole path, so an underscore in any directory name also matches - confirm.
# NOTE: this rebinds df_private, which earlier held the label table read by
# get_predictions/get_ground_truth; reload that table before re-running the evaluation.
df_private = df[df['image'].str.contains('_')]
df_private.shape[0]

In [None]:
# look how many of them satisfy ground truth = prediction
# Rows where the stored ground truth equals the stored prediction.
df_correct = df_private[df_private['ground_truth'] == df_private['prediction']]
df_correct.shape

In [None]:
# print ratio of correct predictions
# Share of correct rows among the "private" result rows (raises ZeroDivisionError if there are none).
print(f'Ratio of correct predictions: {df_correct.shape[0] / df_private.shape[0]}')

### Custom metric

In [None]:
# Rows whose prediction is missing (None) or is the literal pair ["0", "0"].
# The test has to run per row: `Series == None or Series == [...]` asks for the
# truth value of a whole Series (ValueError), and comparing a Series with a
# two-element list is a length-mismatched element-wise comparison.
df_predicted = df[
    df['prediction'].apply(
        lambda pred: pred is None or (isinstance(pred, list) and pred == ["0", "0"])
    )
]
df_predicted.shape[0]