In [3]:
import os
import json
import shutil
import pandas as pd
import yaml
import random
from PIL import Image

# Data preparation

This notebook contains all data preparation steps for the different models used.

## 1. Data preparation for using YOLO bounding boxes

We need to convert and reorganize the dataset into YOLO format. This involves:  
1. **Creating the required directory structure** (`images/` and `labels/` for `train`, `val`, `test`).  
2. **Extracting bounding boxes** from JSON annotations, filtering only rectangles.  
3. **Normalizing coordinates** to YOLO format (`class x_center y_center width height`).  
4. **Copying images and saving annotations** in the correct locations.  


### 1.1. Dataset containing only fractures

In [None]:
# Source directory (raw export) and destination directory (YOLO layout)
raw_data_dir = "raw data"
yolo_data_dir = "data_object_detection_yolo"

# Build <yolo_data_dir>/{images,labels}/{train,val,test}; folders that
# already exist are left as they are (exist_ok=True).
for split in ("train", "val", "test"):
    for kind in ("images", "labels"):
        os.makedirs(os.path.join(yolo_data_dir, kind, split), exist_ok=True)

# Function to convert bounding boxes to YOLO format
def convert_bbox_to_yolo(img_w, img_h, bbox):
    """Convert a two-corner rectangle into a YOLO label line for class 0.

    Args:
        img_w: Image width in pixels.
        img_h: Image height in pixels.
        bbox: Sequence whose first two items are opposite corners of the
            rectangle, each given as ``(x, y)`` in pixels. The corners may
            be listed in either order.

    Returns:
        A string ``"0 x_center y_center width height"`` where all four
        values are divided by the image size and formatted with six decimals.

    Raises:
        ZeroDivisionError: If ``img_w`` or ``img_h`` is 0.
    """
    (x_a, y_a), (x_b, y_b) = bbox[0], bbox[1]

    # Do not rely on the annotation listing top-left first: a reversed pair
    # would otherwise produce a negative width/height.
    x_min, x_max = min(x_a, x_b), max(x_a, x_b)
    y_min, y_max = min(y_a, y_b), max(y_a, y_b)

    # Clamp to the image so the normalised values stay inside [0, 1] even
    # when a box was drawn slightly past the image border.
    x_min, x_max = min(max(x_min, 0), img_w), min(max(x_max, 0), img_w)
    y_min, y_max = min(max(y_min, 0), img_h), min(max(y_max, 0), img_h)

    # Convert to YOLO format (centre point + size, relative to image size)
    x_center = ((x_min + x_max) / 2) / img_w
    y_center = ((y_min + y_max) / 2) / img_h
    bbox_width = (x_max - x_min) / img_w
    bbox_height = (y_max - y_min) / img_h

    return f"0 {x_center:.6f} {y_center:.6f} {bbox_width:.6f} {bbox_height:.6f}"

# Process each dataset split: write one YOLO label file per annotated image
# and copy the image next to it in the YOLO folder layout.
IMAGE_EXTENSIONS = (".jpg", ".png", ".jpeg")
ANNOTATION_SUFFIX = ".json"

for split in ["train", "val", "test"]:
    img_src_dir = os.path.join(raw_data_dir, split, "img")
    ann_src_dir = os.path.join(raw_data_dir, split, "ann")

    img_dest_dir = os.path.join(yolo_data_dir, "images", split)
    ann_dest_dir = os.path.join(yolo_data_dir, "labels", split)

    # Lookup table: image name without extension -> image file name
    image_files = {
        os.path.splitext(f)[0]: f
        for f in os.listdir(img_src_dir)
        if f.endswith(IMAGE_EXTENSIONS)
    }

    for json_file in os.listdir(ann_src_dir):
        if not json_file.endswith(ANNOTATION_SUFFIX):
            continue

        # Annotations are expected to be named "<image file name>.json".
        # Strip only the trailing ".json" and then a trailing image extension;
        # chained str.replace() would also remove matches in the middle of a name.
        annotated_name = json_file[: -len(ANNOTATION_SUFFIX)]
        base_name, inner_ext = os.path.splitext(annotated_name)
        if inner_ext not in IMAGE_EXTENSIONS:
            base_name = annotated_name

        # Find the corresponding image file
        if base_name not in image_files:
            print(f"Skipping {json_file}: No matching image found")
            continue

        img_name = image_files[base_name]
        img_path = os.path.join(img_src_dir, img_name)
        json_path = os.path.join(ann_src_dir, json_file)

        # Load annotation JSON (explicit UTF-8 so the result does not depend
        # on the platform's default text encoding)
        with open(json_path, "r", encoding="utf-8") as f:
            data = json.load(f)

        img_width, img_height = data["size"]["width"], data["size"]["height"]

        # Keep rectangles only; other geometry types are ignored
        yolo_annotations = [
            convert_bbox_to_yolo(img_width, img_height, obj["points"]["exterior"])
            for obj in data["objects"]
            if obj["geometryType"] == "rectangle"
        ]

        # Save YOLO annotations (no label file is written when there is no rectangle)
        if yolo_annotations:
            yolo_label_path = os.path.join(ann_dest_dir, base_name + ".txt")
            with open(yolo_label_path, "w") as f:
                f.write("\n".join(yolo_annotations))

        # Copy image to YOLO dataset
        shutil.copy(img_path, img_dest_dir)


Let's make sure that we have the same number of images and annotations. This can also be compared to our `02_raw_data_analysis.ipynb`.

In [4]:
# Tally image files and label files per split and show them side by side
splits = ["train", "val", "test"]
image_suffixes = (".jpg", ".png", ".jpeg")

images_per_split = [
    sum(name.endswith(image_suffixes) for name in os.listdir(os.path.join(yolo_data_dir, "images", s)))
    for s in splits
]
labels_per_split = [
    sum(name.endswith(".txt") for name in os.listdir(os.path.join(yolo_data_dir, "labels", s)))
    for s in splits
]

data_counts = {
    "Split": list(splits),
    "Images": images_per_split,
    "Annotations": labels_per_split,
}

df_counts = pd.DataFrame(data_counts)
df_counts

Unnamed: 0,Split,Images,Annotations
0,train,574,574
1,val,82,82
2,test,61,61


Let's display the new data structure:

In [5]:
# Print the folder tree below yolo_data_dir, indenting four spaces per level
for current_dir, _subdirs, file_names in os.walk(yolo_data_dir):
    # Depth = number of path separators left once the root prefix is removed
    depth = current_dir.replace(yolo_data_dir, '').count(os.sep)
    pad = "    " * depth
    print(f"{pad}{os.path.basename(current_dir)}/")
    # Preview at most two files per folder
    for file_name in file_names[:2]:
        print(f"{pad}    {file_name}")

data_object_detection_yolo/
    data.yaml
    images/
        test/
            IMG0003297.jpg
            IMG0003298.jpg
        train/
            IMG0000019.jpg
            IMG0000025.jpg
        val/
            IMG0003733.jpg
            IMG0003734.jpg
    labels/
        train.cache
        val.cache
        test/
            IMG0003297.txt
            IMG0003298.txt
        train/
            IMG0000019.txt
            IMG0000025.txt
        val/
            IMG0003733.txt
            IMG0003734.txt


In the next step, we need to create the yaml file necessary to use the YOLO model.

In [6]:
# Where the dataset description for the YOLO model is written
yaml_path = os.path.join("data_object_detection_yolo", "data.yaml")

# Image folders per split plus the class definition
data_yaml = dict(
    train="images/train",
    val="images/val",
    test="images/test",   # Optional
    nc=1,                 # Number of classes
    names=["fractured"],  # Class names
)

# Write the YAML file in block style (one key per line)
with open(yaml_path, "w") as yaml_file:
    yaml.dump(data_yaml, stream=yaml_file, default_flow_style=False)

print(f"data.yaml created at: {yaml_path}")

data.yaml created at: data_object_detection_yolo\data.yaml


### 1.2. Dataset containing a balanced number of fractured/not fractured images

This time we also add randomly selected not fractured images to the new dataset and create empty annotation files for them, since they have no bounding box information. While training a model, we encountered a corrupt image from the "not fractured" folder; therefore, we check that every image is valid when creating the new dataset directory.

In [22]:
# Source directories (raw export) and destination directory (YOLO layout)
raw_data_dir = "raw data"
not_fractured_root = os.path.join(raw_data_dir, "not fractured")
not_fractured_img_dir = os.path.join(not_fractured_root, "img")
not_fractured_ann_dir = os.path.join(not_fractured_root, "ann")
yolo_data_dir = "data_object_detection_incl_not_fractured_yolo"

# Seed Python's random module for the sampling done further down
RANDOM_SEED = 42
random.seed(RANDOM_SEED)

# Build <yolo_data_dir>/{images,labels}/{train,val,test}; existing folders are kept
for split in ("train", "val", "test"):
    for kind in ("images", "labels"):
        os.makedirs(os.path.join(yolo_data_dir, kind, split), exist_ok=True)

# Validation results keyed by image path, so every file is checked only once
valid_images_cache = {}
corrupt_images_logged = set()  # File names already reported, to avoid duplicate messages

def is_valid_image(img_path):
    """Return True if the image at ``img_path`` passes verification and decoding.

    The result is stored in ``valid_images_cache`` and reused on later calls
    with the same path. A failing file is reported once per file name.
    """
    try:
        return valid_images_cache[img_path]
    except KeyError:
        pass  # Not checked yet

    try:
        # Format check first; the file is then reopened for the decode step
        # (Pillow requires a fresh open after verify()).
        with Image.open(img_path) as probe:
            probe.verify()
        with Image.open(img_path) as decoded:
            decoded.convert("RGB")  # Forces a full load, which catches truncated data
    except (OSError, SyntaxError) as err:
        img_name = os.path.basename(img_path)
        if img_name not in corrupt_images_logged:
            print(f"Corrupt image skipped: {img_name} - Error: {err}")
            corrupt_images_logged.add(img_name)
        valid_images_cache[img_path] = False
        return False

    valid_images_cache[img_path] = True
    return True

# Names of 'not fractured' images already assigned to a split
used_not_fractured = set()

# Collect every 'not fractured' image file that passes the validity check
all_not_fractured = []
for candidate in os.listdir(not_fractured_img_dir):
    if not candidate.endswith((".jpg", ".png", ".jpeg")):
        continue
    if is_valid_image(os.path.join(not_fractured_img_dir, candidate)):
        all_not_fractured.append(candidate)

# Process each dataset split: convert the fractured images and their
# annotations, then add an equal number of 'not fractured' images with
# empty label files.
IMAGE_EXTENSIONS = (".jpg", ".png", ".jpeg")
ANNOTATION_SUFFIX = ".json"

for split in ["train", "val", "test"]:
    img_src_dir = os.path.join(raw_data_dir, split, "img")
    ann_src_dir = os.path.join(raw_data_dir, split, "ann")

    img_dest_dir = os.path.join(yolo_data_dir, "images", split)
    ann_dest_dir = os.path.join(yolo_data_dir, "labels", split)

    # Lookup table: image name without extension -> image file name (valid images only)
    image_files = {
        os.path.splitext(f)[0]: f
        for f in os.listdir(img_src_dir)
        if f.endswith(IMAGE_EXTENSIONS) and is_valid_image(os.path.join(img_src_dir, f))
    }
    # Number of 'not fractured' images to draw for this split
    fractured_count = len(image_files)

    # Process fractured images
    for json_file in os.listdir(ann_src_dir):
        if not json_file.endswith(ANNOTATION_SUFFIX):
            continue

        # Annotations are expected to be named "<image file name>.json".
        # Strip only the trailing ".json" and then a trailing image extension;
        # chained str.replace() would also remove matches in the middle of a name.
        annotated_name = json_file[: -len(ANNOTATION_SUFFIX)]
        base_name, inner_ext = os.path.splitext(annotated_name)
        if inner_ext not in IMAGE_EXTENSIONS:
            base_name = annotated_name

        if base_name not in image_files:
            print(f"Skipping {json_file}: No matching valid image found")
            continue

        img_name = image_files[base_name]
        img_path = os.path.join(img_src_dir, img_name)
        json_path = os.path.join(ann_src_dir, json_file)

        # Load annotation JSON (explicit UTF-8 so the result does not depend
        # on the platform's default text encoding)
        with open(json_path, "r", encoding="utf-8") as f:
            data = json.load(f)

        img_width, img_height = data["size"]["width"], data["size"]["height"]
        yolo_annotations = []

        # Process objects (only rectangles)
        for obj in data["objects"]:
            if obj["geometryType"] != "rectangle":
                continue
            bbox = obj["points"]["exterior"]
            x_min, y_min = bbox[0]
            x_max, y_max = bbox[1]
            # YOLO line: class, centre x/y and width/height relative to image size
            yolo_annotations.append(
                f"0 {(x_min + x_max) / 2 / img_width:.6f} "
                f"{(y_min + y_max) / 2 / img_height:.6f} "
                f"{(x_max - x_min) / img_width:.6f} "
                f"{(y_max - y_min) / img_height:.6f}"
            )

        # Save YOLO annotations (no label file is written when there is no rectangle)
        if yolo_annotations:
            yolo_label_path = os.path.join(ann_dest_dir, base_name + ".txt")
            with open(yolo_label_path, "w") as f:
                f.write("\n".join(yolo_annotations))

        # Copy image to YOLO dataset
        shutil.copy(img_path, img_dest_dir)

    # Process 'not fractured' images.
    # Sort the candidates before sampling: the iteration order of a set of
    # strings changes between interpreter sessions (hash randomization), so
    # sampling from list(set(...)) picks different files on every kernel
    # restart even though the seed is fixed.
    available_images = sorted(set(all_not_fractured) - used_not_fractured)

    # Ensure there are enough images to sample
    num_to_sample = min(fractured_count, len(available_images))
    if num_to_sample == 0:
        print(f"Warning: No valid 'not fractured' images available for {split}")
        continue

    selected_images = random.sample(available_images, num_to_sample)

    for img_name in selected_images:
        used_not_fractured.add(img_name)  # Mark image as used so no later split reuses it

        img_path = os.path.join(not_fractured_img_dir, img_name)

        # Copy valid image
        shutil.copy(img_path, img_dest_dir)

        # Create empty annotation file (image without any bounding box)
        yolo_label_path = os.path.join(ann_dest_dir, os.path.splitext(img_name)[0] + ".txt")
        with open(yolo_label_path, "w") as f:
            pass  # Empty file

print("Dataset conversion completed successfully. Corrupt images were skipped.")


Corrupt image skipped: IMG0004028.jpg - Error: image file is truncated (20 bytes not processed)
Corrupt image skipped: IMG0004029.jpg - Error: image file is truncated (22 bytes not processed)
Corrupt image skipped: IMG0004036.jpg - Error: image file is truncated (14 bytes not processed)
Corrupt image skipped: IMG0004070.jpg - Error: image file is truncated (41 bytes not processed)
Corrupt image skipped: IMG0004073.jpg - Error: image file is truncated (3 bytes not processed)
Corrupt image skipped: IMG0004076.jpg - Error: image file is truncated (0 bytes not processed)
Corrupt image skipped: IMG0004079.jpg - Error: image file is truncated (24 bytes not processed)
Corrupt image skipped: IMG0004084.jpg - Error: image file is truncated (5 bytes not processed)
Corrupt image skipped: IMG0004092.jpg - Error: image file is truncated (3 bytes not processed)
Corrupt image skipped: IMG0004098.jpg - Error: image file is truncated (17 bytes not processed)
Corrupt image skipped: IMG0004100.jpg - Erro

Let's make sure that we have the same number of images and annotations. This can also be compared to our `02_raw_data_analysis.ipynb`; each split should now contain twice as many files as before.

In [23]:
# Collect one (split, image count, label count) row per split
rows = []
for split in ["train", "val", "test"]:
    image_names = [
        name for name in os.listdir(os.path.join(yolo_data_dir, "images", split))
        if name.endswith((".jpg", ".png", ".jpeg"))
    ]
    label_names = [
        name for name in os.listdir(os.path.join(yolo_data_dir, "labels", split))
        if name.endswith(".txt")
    ]
    rows.append((split, len(image_names), len(label_names)))

# Turn the rows into the column-oriented dict used for the table
split_col, image_col, label_col = zip(*rows)
data_counts = {
    "Split": list(split_col),
    "Images": list(image_col),
    "Annotations": list(label_col),
}

df_counts = pd.DataFrame(data_counts)
df_counts

Unnamed: 0,Split,Images,Annotations
0,train,1148,1148
1,val,164,164
2,test,122,122


As before, we again create the yaml file necessary to use the YOLO model.

In [24]:
# The dataset description is written into the current YOLO dataset folder
yaml_path = os.path.join(yolo_data_dir, "data.yaml")

# Image folders per split plus the class definition
data_yaml = dict(
    train="images/train",
    val="images/val",
    test="images/test",   # Optional
    nc=1,                 # Number of classes
    names=["fractured"],  # Class names
)

# Write the YAML file in block style (one key per line)
with open(yaml_path, "w") as yaml_file:
    yaml.dump(data_yaml, stream=yaml_file, default_flow_style=False)

print(f"data.yaml created at: {yaml_path}")

data.yaml created at: data_object_detection_incl_not_fractured_yolo\data.yaml


## 2. Data preparation for image classification

In [None]:
# Source directories (raw export) and destination directory (one folder per class)
raw_data_dir = "raw data"
not_fractured_img_dir = os.path.join(raw_data_dir, "not fractured", "img")
classification_data_dir = "data_image_classification"

# Seed Python's random module for the sampling done further down
RANDOM_SEED = 42
random.seed(RANDOM_SEED)

# Build <classification_data_dir>/{train,val,test}/{fractured,not_fractured};
# existing folders are kept
for split in ("train", "val", "test"):
    for class_folder in ("fractured", "not_fractured"):
        os.makedirs(os.path.join(classification_data_dir, split, class_folder), exist_ok=True)

# Validation results keyed by image path; file names already reported as corrupt
valid_images_cache = {}
corrupt_images_logged = set()

def is_valid_image(img_path):
    """Tell whether the image at ``img_path`` can be verified and converted to RGB.

    Each path is checked only on its first call; the outcome is kept in
    ``valid_images_cache``. A corrupt file is printed once per file name.
    """
    if img_path not in valid_images_cache:
        try:
            # Format check, then a reopen and full decode to catch truncated data
            with Image.open(img_path) as img:
                img.verify()
            with Image.open(img_path) as img:
                img.convert("RGB")
            outcome = True
        except (OSError, SyntaxError) as exc:
            img_name = os.path.basename(img_path)
            if img_name not in corrupt_images_logged:
                print(f"Corrupt image skipped: {img_name} - Error: {exc}")
                corrupt_images_logged.add(img_name)
            outcome = False
        valid_images_cache[img_path] = outcome
    return valid_images_cache[img_path]

# Names of 'not fractured' images already assigned to a split
used_not_fractured = set()

# Keep only image files (by extension), then only those that pass the validity check
not_fractured_candidates = filter(
    lambda name: name.endswith((".jpg", ".png", ".jpeg")),
    os.listdir(not_fractured_img_dir),
)
all_not_fractured = [
    name for name in not_fractured_candidates
    if is_valid_image(os.path.join(not_fractured_img_dir, name))
]

# Process each dataset split: copy the valid fractured images and add an
# equal number of randomly chosen 'not fractured' images.
for split in ["train", "val", "test"]:
    img_src_dir = os.path.join(raw_data_dir, split, "img")
    img_dest_dir_fractured = os.path.join(classification_data_dir, split, "fractured")
    img_dest_dir_not_fractured = os.path.join(classification_data_dir, split, "not_fractured")

    # Get valid fractured images
    image_files = [
        f for f in os.listdir(img_src_dir)
        if f.endswith((".jpg", ".png", ".jpeg")) and is_valid_image(os.path.join(img_src_dir, f))
    ]
    # Number of 'not fractured' images to draw for this split
    fractured_count = len(image_files)

    # Copy fractured images
    for img_name in image_files:
        img_path = os.path.join(img_src_dir, img_name)
        shutil.copy(img_path, img_dest_dir_fractured)

    # Process 'not fractured' images.
    # Sort the candidates before sampling: the iteration order of a set of
    # strings changes between interpreter sessions (hash randomization), so
    # sampling from list(set(...)) picks different files on every kernel
    # restart even though the seed is fixed.
    available_images = sorted(set(all_not_fractured) - used_not_fractured)
    num_to_sample = min(fractured_count, len(available_images))
    if num_to_sample == 0:
        print(f"Warning: No valid 'not fractured' images available for {split}")
        continue

    selected_images = random.sample(available_images, num_to_sample)

    for img_name in selected_images:
        used_not_fractured.add(img_name)  # Mark image as used so no later split reuses it
        img_path = os.path.join(not_fractured_img_dir, img_name)
        shutil.copy(img_path, img_dest_dir_not_fractured)

print("Dataset conversion for image classification completed successfully. Corrupt images were skipped.")


Corrupt image skipped: IMG0004028.jpg - Error: image file is truncated (20 bytes not processed)
Corrupt image skipped: IMG0004029.jpg - Error: image file is truncated (22 bytes not processed)
Corrupt image skipped: IMG0004036.jpg - Error: image file is truncated (14 bytes not processed)
Corrupt image skipped: IMG0004070.jpg - Error: image file is truncated (41 bytes not processed)
Corrupt image skipped: IMG0004073.jpg - Error: image file is truncated (3 bytes not processed)
Corrupt image skipped: IMG0004076.jpg - Error: image file is truncated (0 bytes not processed)
Corrupt image skipped: IMG0004079.jpg - Error: image file is truncated (24 bytes not processed)
Corrupt image skipped: IMG0004084.jpg - Error: image file is truncated (5 bytes not processed)
Corrupt image skipped: IMG0004092.jpg - Error: image file is truncated (3 bytes not processed)
Corrupt image skipped: IMG0004098.jpg - Error: image file is truncated (17 bytes not processed)
Corrupt image skipped: IMG0004100.jpg - Erro

In [4]:
# Control: number of image files per class folder in each split
split_names = ["train", "val", "test"]
class_folders = {"Fractured": "fractured", "Not Fractured": "not_fractured"}

data_counts = {"Split": list(split_names)}
for column, folder in class_folders.items():
    data_counts[column] = [
        sum(
            name.endswith((".jpg", ".png", ".jpeg"))
            for name in os.listdir(os.path.join(classification_data_dir, split_name, folder))
        )
        for split_name in split_names
    ]

# Convert to DataFrame
df_counts = pd.DataFrame(data_counts)
df_counts

Unnamed: 0,Split,Fractured,Not Fractured
0,train,574,574
1,val,82,82
2,test,61,61
