In [1]:
import os
import os.path as osp
import numpy as np
from mmcv import imread
import mmengine
from tqdm.auto import tqdm


from pycocotools import mask as maskutils

def binary_mask_to_rle_np(binary_mask):
     # Create a copy of the original mask
    thickened_mask = np.copy(binary_mask)

    # Use numpy slicing to mark pixels above and below the current mask
    # thickened_mask[:-1, :] |= binary_mask[1:, :]  # Mark the pixel above
    # thickened_mask[1:, :] |= binary_mask[:-1, :]  # Mark the pixel below
    binary_mask = np.asfortranarray(thickened_mask.astype(np.uint8))
    rle = {"counts": [], "size": list(binary_mask.shape)}
    area = np.sum(binary_mask)
    flattened_mask = binary_mask.ravel(order="F")
    diff_arr = np.diff(flattened_mask)
    nonzero_indices = np.where(diff_arr != 0)[0] + 1
    lengths = np.diff(np.concatenate(([0], nonzero_indices, [len(flattened_mask)])))

    # note that the odd counts are always the numbers of zeros
    if flattened_mask[0] == 1:
        lengths = np.concatenate(([0], lengths))

    rle["counts"] = lengths.tolist()

    return rle, area

def binary_mask_to_rle(binary_lead_mask):
    """
    Converts a mask to COCO's RLE format and calculates the area.

    Parameters:
    - lead_mask: 2D numpy array of the mask.
    - threshold: The threshold to binarize the mask.

    Returns:
    - rle: Dictionary representing the run-length encoding in COCO format.
    - area: Integer representing the area of the mask.
    """


    # Step 2: Encode the binary mask using maskutils
    rle = maskutils.encode(np.asfortranarray(binary_lead_mask.astype(np.uint8)))
    rle['counts'] = rle['counts'].decode('utf-8')
    # Step 3: Calculate the area
    area = maskutils.area(rle)

    return rle, area


def convert_ecg_to_coco(data_path, bbox_dir, mask_dir, out_file):
    # bbox_dir = osp.join(data_path, 'lead_bounding_box')
    # mask_dir = osp.join(data_path, 'masks')  # Path to mask directory
    annotations = []
    images = []
    obj_count = 0
    for entry in tqdm(os.listdir(data_path), desc='Processing file'):
        if entry.endswith(".png"):
            img_name = entry[:-4]
            bbox_path = osp.join(bbox_dir, f'{img_name}.txt')
            mask_path = osp.join(mask_dir, f'{img_name}.png')  
            
            with open(bbox_path, 'r') as file:
                lines = file.readlines()
                bbox_coords = []
                for i in range(13):
                    bbox_coords.append([int(float(coord)) for coord in lines[i].strip().split(',')[:4]])
            
            mask = imread(mask_path, flag='unchanged')
            
            if mask is None:
                print(f"Failed to read mask for {mask_path}")
                continue

            images.append(dict(
                id=int(entry[:5]),
                file_name=entry,
                height=1700,
                width=2200))
            
            for i in range(13):
                x_min, y_min, x_max, y_max = bbox_coords[i]
                y_min, y_max = 1700 - y_max, 1700 - y_min
                assert y_min <= y_max and x_min <= x_max, "Coordinates are invalid!"
                
                # Extract the sub-region from the mask
                sub_region = mask[y_min:y_max, x_min:x_max]

                # Apply threshold to create a binary mask
                binary_mask = sub_region > 0
                # plt.imshow(binary_mask)

                # Prepare a full-size binary mask for visualization
                full_binary_mask = np.zeros_like(mask, dtype=bool)
                full_binary_mask[y_min:y_max, x_min:x_max] = binary_mask
                
                
                rle, area = binary_mask_to_rle_np(full_binary_mask)

                
                data_anno = dict(
                    image_id=int(entry[:5]),
                    id=obj_count,
                    category_id=0,
                    bbox=[x_min, y_min, x_max - x_min, y_max-y_min],
                    area=area,
                    iscrowd=0,
                    segmentation=rle
                )
                
                annotations.append(data_anno)
                obj_count += 1

                # if visualize:
                    # Visualize the mask and polygon
                    # visualize_masks(lead_mask, x_coords, y_coords, poly, f'{entry[:5]}_{i}')

    coco_format_json = dict(
        images=images,
        annotations=annotations,
        categories=[{'id': 0, 'name': 'ecg_lead'}])
    mmengine.dump(coco_format_json, out_file)




  from .autonotebook import tqdm as notebook_tqdm


In [2]:

convert_ecg_to_coco(
    '/scratch/hshang/moody/official-phase-mins-eth/TeamCode/tests/resources/training_data/train',
    '/scratch/hshang/moody/official-phase-mins-eth/TeamCode/tests/resources/training_data/train/bboxes',
    '/scratch/hshang/moody/official-phase-mins-eth/TeamCode/tests/resources/training_data/train/masks',
    '/scratch/hshang/moody/official-phase-mins-eth/TeamCode/tests/resources/training_data/train/annotation_coco.json')
convert_ecg_to_coco(
    '/scratch/hshang/moody/official-phase-mins-eth/TeamCode/tests/resources/training_data/val',
    '/scratch/hshang/moody/official-phase-mins-eth/TeamCode/tests/resources/training_data/val/bboxes',
    '/scratch/hshang/moody/official-phase-mins-eth/TeamCode/tests/resources/training_data/val/masks',
    '/scratch/hshang/moody/official-phase-mins-eth/TeamCode/tests/resources/training_data/val/annotation_coco.json')

Processing file: 100%|██████████| 17/17 [00:00<00:00, 20.27it/s]
Processing file: 100%|██████████| 11/11 [00:00<00:00, 25.90it/s]


In [3]:
import os
import numpy as np
import imageio
from tqdm import tqdm

def crop_and_save_images(image_path, mask_path, bbox_path, img_save_path, mask_save_path):
    """
    Crops images and corresponding masks based on bounding boxes and saves them to specified directories.

    Parameters:
    - image_path: Path to the directory containing images.
    - mask_path: Path to the directory containing masks.
    - bbox_path: Path to the directory containing bounding boxes.
    - img_save_path: Path to the directory where cropped images will be saved.
    - mask_save_path: Path to the directory where cropped masks will be saved.
    """

    # Ensure save directories exist
    os.makedirs(img_save_path, exist_ok=True)
    os.makedirs(mask_save_path, exist_ok=True)

    for filename in tqdm(os.listdir(image_path)):
        if filename.endswith(".png"):
            # Load image and mask
            image = imageio.imread(os.path.join(image_path, filename))
            mask = imageio.imread(os.path.join(mask_path, filename))
            
            # Load bounding boxes
            bbox = np.loadtxt(os.path.join(bbox_path, filename.replace(".png", ".txt")), delimiter=',')
            
            # Ensure bbox is 2D array even if only one bounding box is present
            if bbox.ndim == 1:
                bbox = np.expand_dims(bbox, axis=0)
            
            for i in range(len(bbox)):
                # Read bounding box coordinates
                x1, y1, x2, y2 = bbox[i][:4]
                
                # Adjust for image dimensions
                x1, y1, x2, y2 = int(x1), int(y1), int(x2), int(y2)
                
                y1 = image.shape[0] - y1
                y2 = image.shape[0] - y2
                
                # Crop the image and mask
                cropped_image = image[y2:y1, x1:x2, :3]
                cropped_mask = mask[y2:y1, x1:x2]
                
                # Save the cropped image and mask
                base_filename = os.path.splitext(filename)[0]
                imageio.imwrite(os.path.join(img_save_path, f"{base_filename}_{i}.png"), cropped_image)
                imageio.imwrite(os.path.join(mask_save_path, f"{base_filename}_{i}.png"), cropped_mask)
                
        else:
            continue



In [5]:
# Example usage:
image_path = "/scratch/hshang/moody/official-phase-mins-eth/TeamCode/tests/resources/training_data/train"
mask_path = "/scratch/hshang/moody/official-phase-mins-eth/TeamCode/tests/resources/training_data/train/masks"
bbox_path = "/scratch/hshang/moody/official-phase-mins-eth/TeamCode/tests/resources/training_data/train/bboxes"
img_save_path = "/scratch/hshang/moody/official-phase-mins-eth/TeamCode/tests/resources/training_data/cropped_img"
mask_save_path = "/scratch/hshang/moody/official-phase-mins-eth/TeamCode/tests/resources/training_data/cropped_masks"

crop_and_save_images(image_path, mask_path, bbox_path, img_save_path, mask_save_path)


  image = imageio.imread(os.path.join(image_path, filename))
  mask = imageio.imread(os.path.join(mask_path, filename))
100%|██████████| 18/18 [00:01<00:00, 16.20it/s]
