In [None]:
# !pip install kagglehub

In [None]:
# Import necessary libraries
import kagglehub
import json
import numpy as np
import matplotlib.pyplot as plt
import random
import shutil
import pandas as pd
from pathlib import Path
from PIL import Image, ImageDraw

In [None]:
# Exploratory: print the documentation of kagglehub's download helper.
help(kagglehub.dataset_download)

# Download the dataset directly from Kaggle

In [None]:

# Fetch the latest published version of the dataset.
# `path` is what a later cell moves into `data_dir`.
path = kagglehub.dataset_download(
    "aalborguniversity/aau-zebrafish-reid"
)

In [4]:
# Local folder that will hold the dataset: ./dataset/<dataset_shortname>
dataset_shortname = "Zebrafish"
data_dir = Path("./dataset", dataset_shortname)
data_dir.mkdir(parents=True, exist_ok=True)

# Sanity check: the folder must exist before anything is moved into it.
assert data_dir.exists(), f"Data directory {data_dir} does not exist"


In [5]:
# Only move the freshly downloaded files when the target folder is still empty;
# otherwise keep whatever is already there.
if any(data_dir.iterdir()):
    print("Data already downloaded in the directory:", data_dir)
else:
    shutil.move(path, str(data_dir))


# Clean the annotations

In [8]:
# Image folder and annotation table inside the "2" sub-directory of the dataset folder.
# NOTE(review): "2" is presumably the dataset version directory created by the move above — confirm.
data_path = data_dir.joinpath("2", "data")
annotation_path = data_dir.joinpath("2", "annotations.csv")

In [None]:
# The annotation file is semicolon-separated.
data_df = pd.read_csv(annotation_path, sep=";")

# Preview the first five rows.
data_df.head(5)

In [None]:
# Compare the number of dataframe rows with the number of entries in the data directory.
def check_image_count(data_df, data_path):
    """Print whether ``data_df`` has as many rows as ``data_path`` has entries.

    Args:
        data_df: Any sized object (normally the annotation dataframe); only
            ``len(data_df)`` is used.
        data_path: Directory whose direct children are counted via ``iterdir()``.
            Every entry is counted, whatever its type.

    Returns:
        None. The outcome is reported on stdout only.
    """
    n_entries = sum(1 for _ in data_path.iterdir())
    n_rows = len(data_df)

    if n_entries != n_rows:
        print("Number of images in the data directory and the dataframe are not equal")
        print("Number of images in the data directory:", n_entries)
        print("Number of images in the dataframe:", n_rows)
        return

    print("Number of images in the data directory and the dataframe are equal", n_rows)

# Raw table: one row per annotated object, compared against the files on disk.
check_image_count(data_df=data_df, data_path=data_path)

In [None]:
# Reshape the raw annotation table (one row per annotated object) into one row per image.
# NOTE: this cell reassigns `data_df` and drops the raw columns it reads, so it cannot be
# re-run on its own output; re-run the `pd.read_csv` cell first if it has to run again.

# The CSV stores four flags in a single comma-separated column; give each flag its own
# column. (These columns are not part of the final column selection below.)
combined_col = "Right,Turning,Occlusion,Glitch"
for idx, col in enumerate(combined_col.split(",")):
    data_df[col] = data_df[combined_col].apply(lambda x: x.split(",")[idx])

# Corner coordinates -> [x_min, y_min, width, height] boxes.
# `.tolist()` yields native Python numbers for every component, so the boxes never mix
# numpy scalars with Python ints and stay JSON-serializable as they are.
xs = data_df["Upper left corner X"]
ys = data_df["Upper left corner Y"]
ws = data_df["Lower right corner X"] - xs
hs = data_df["Lower right corner Y"] - ys
data_df["bbox"] = [list(box) for box in zip(xs.tolist(), ys.tolist(), ws.tolist(), hs.tolist())]

# Build every image path explicitly instead of relying on `Path / Series` operator dispatch.
data_df["path"] = data_df["Filename"].map(lambda name: data_path / name)
data_df["Object ID"] = data_df["Object ID"].astype(str)
data_df["label"] = data_df["Annotation tag"]
# Strip only the final extension so file names containing extra dots keep distinct ids.
data_df["image_id"] = data_df["Filename"].apply(lambda x: x.rsplit(".", 1)[0])
data_df = data_df[["image_id", "label", "bbox", "path"]]

# Collect all objects of an image into list-valued columns (one row per image).
data_df = data_df.groupby("image_id").agg({"label": list, "bbox": list, "path": list}).reset_index()

data_df.head()


In [None]:
# After grouping there is one row per image, so the counts are expected to match now.
check_image_count(data_df=data_df, data_path=data_path)

# Convert annotations to COCO format

In [13]:
def convert_to_serializable(obj):
    """Recursively replace numpy values with native Python equivalents.

    Handles every numpy integer, floating and boolean scalar type (not only the
    64/32-bit ones), numpy arrays, and the containers ``list``, ``tuple`` and
    ``dict`` (keys and values). Anything else is returned unchanged.

    Args:
        obj: Arbitrary, possibly nested, Python object.

    Returns:
        The same structure with numpy scalars converted to ``int`` / ``float`` /
        ``bool`` and numpy arrays converted to (nested) lists. Lists and dicts
        are rebuilt rather than modified in place; tuples stay tuples.
    """
    # np.bool_ is not a subclass of np.integer, so it needs its own branch.
    if isinstance(obj, np.bool_):
        return bool(obj)
    if isinstance(obj, np.integer):
        return int(obj)
    if isinstance(obj, np.floating):
        return float(obj)
    if isinstance(obj, np.ndarray):
        # tolist() already yields native scalars for numeric dtypes; recurse so that
        # object arrays holding numpy scalars are cleaned up as well.
        return convert_to_serializable(obj.tolist())
    if isinstance(obj, list):
        return [convert_to_serializable(item) for item in obj]
    if isinstance(obj, tuple):
        return tuple(convert_to_serializable(item) for item in obj)
    if isinstance(obj, dict):
        return {
            convert_to_serializable(key): convert_to_serializable(value)
            for key, value in obj.items()
        }
    return obj

In [14]:
def dataframe_to_coco(df, output_json_path):
    """Write the per-image annotation dataframe to a COCO-style JSON file.

    Args:
        df: DataFrame with one row per image and the columns
            ``image_id``, ``label`` (list of category names), ``bbox`` (list of
            ``[x_min, y_min, width, height]`` boxes, parallel to ``label``) and
            ``path`` (list of image paths; only the first entry is used).
        output_json_path: Destination of the JSON file (overwritten if present).

    Notes:
        * Category ids are 1-based and follow the sorted order of the unique labels.
        * Annotation ids are 1-based and increase in row / object order.
        * Every image is opened once to read its width and height.
        * The whole payload is passed through ``convert_to_serializable`` before
          dumping, so numpy scalars anywhere in it (box coordinates, areas, ...)
          cannot make ``json.dump`` fail.
    """
    # Map every label to a stable, 1-based category id.
    unique_labels = sorted({label for labels in df['label'] for label in labels})
    label_to_id = {label: i + 1 for i, label in enumerate(unique_labels)}

    coco_format = {
        "info": {},
        "licenses": [],
        "images": [],
        "annotations": [],
        "categories": [
            {"id": cat_id, "name": label, "supercategory": "none"}
            for label, cat_id in label_to_id.items()
        ],
    }

    annotation_id = 1
    for _, row in df.iterrows():
        image_id = row['image_id']
        # All rows grouped under one image share the same file; take the first path.
        image_path = row['path'][0]

        # Only the header is needed for the size; the context manager closes the file.
        with Image.open(image_path) as img:
            width, height = img.size

        coco_format["images"].append({
            "id": image_id,
            "file_name": image_path.name,
            "width": width,
            "height": height
        })

        for label, bbox in zip(row['label'], row['bbox']):
            coco_format["annotations"].append({
                "id": annotation_id,
                "image_id": image_id,
                "category_id": label_to_id[label],
                "bbox": bbox,  # [x_min, y_min, width, height]
                "area": bbox[2] * bbox[3],  # width * height
                "iscrowd": 0
            })
            annotation_id += 1

    # Convert once, right before writing, so every field is JSON-safe.
    with open(output_json_path, 'w') as f:
        json.dump(convert_to_serializable(coco_format), f, indent=4)

    print(f"COCO format JSON saved to {output_json_path}")

In [None]:
# Write the COCO file into the dataset folder.
output_json_path = data_dir.joinpath("coco_format.json")
dataframe_to_coco(df=data_df, output_json_path=output_json_path)

# Visualize the dataset with boxes

In [142]:

def load_coco_annotations(json_path):
    """Read a COCO annotation file and return its parsed JSON content."""
    with open(json_path, 'r') as handle:
        return json.load(handle)

def get_image_annotations(image_id, coco_data):
    """Return, in file order, every annotation whose ``image_id`` equals ``image_id``."""
    return [
        entry
        for entry in coco_data['annotations']
        if entry['image_id'] == image_id
    ]

def visualize_image_with_boxes(image_path, annotations, coco_data):
    """Show an image with its bounding boxes and category labels drawn in red.

    Args:
        image_path: Path of the image file to open.
        annotations: Iterable of COCO annotation dicts; each needs ``bbox``
            (``[x_min, y_min, width, height]``) and ``category_id``.
        coco_data: COCO dictionary; its ``categories`` list supplies the label
            text. Category ids that are not listed are labelled ``'unknown'``.

    The boxes are drawn onto the in-memory image only; the file on disk is not
    modified. The result is displayed with matplotlib.
    """
    image = Image.open(image_path)
    draw = ImageDraw.Draw(image)

    # Build the id -> name lookup once instead of scanning the category list for
    # every annotation. setdefault keeps the first entry for a duplicated id,
    # matching a front-to-back search.
    category_names = {}
    for cat in coco_data.get('categories', []):
        category_names.setdefault(cat['id'], cat['name'])

    for ann in annotations:
        # COCO bbox format: [x_min, y_min, width, height]
        x_min, y_min, width, height = map(int, ann['bbox'])
        draw.rectangle([x_min, y_min, x_min + width, y_min + height], outline="red", width=2)

        # Put the label just above the box, but never above the top edge of the
        # image, so boxes touching the top still get a visible label.
        label = str(category_names.get(ann['category_id'], 'unknown'))
        draw.text((x_min, max(y_min - 15, 0)), label, fill="red")

    # Display the image
    plt.figure(figsize=(10, 10))
    plt.imshow(image)
    plt.axis('off')
    plt.show()

def visualize_random_images(coco_data, num_images=5, image_dir=None):
    """Display randomly chosen images together with their bounding boxes.

    Args:
        coco_data: COCO dictionary with ``images``, ``annotations`` and
            ``categories``. It is not modified.
        num_images: Maximum number of images to show; capped at the number of
            available images. Values below one show nothing.
        image_dir: Directory containing the image files. ``file_name`` entries
            hold bare file names, so they are joined onto this directory.
            Defaults to the notebook-level ``data_path``.
    """
    images = coco_data['images']
    count = max(0, min(num_images, len(images)))
    base_dir = Path(data_path if image_dir is None else image_dir)

    # random.sample picks without replacement and leaves the caller's list
    # untouched (shuffling in place would reorder coco_data['images']).
    for image_info in random.sample(images, count):
        image_id = image_info['id']
        image_path = base_dir / image_info['file_name']

        # Get annotations for the image
        annotations = get_image_annotations(image_id, coco_data)

        # Visualize the image with bounding boxes
        print(f"Visualizing image: {image_path}")
        visualize_image_with_boxes(image_path, annotations, coco_data)

In [None]:
# Reload the COCO file that was just written.
coco_data = load_coco_annotations(json_path=output_json_path)

# Show five randomly chosen images with their boxes.
visualize_random_images(coco_data=coco_data, num_images=5)