In [None]:
from pathlib import Path
from matplotlib import pyplot as plt
from scipy.spatial.distance import euclidean
from glob import glob
from concurrent.futures import ThreadPoolExecutor
import numpy as np
import seaborn as sns
import shutil
import cv2 as cv

In [None]:
def load_labels(labels):
    with open(labels) as f:
        labels = f.read()
    labels = labels.strip().split("\n")
    labels = [line.strip().split() for line in labels]
    segments = []
    for line in labels:
        class_ = int(line[0])
        contour = np.array([[float(line[i]), float(line[i + 1])] for i in range(1, len(line), 2)])
        segments.append([class_, contour])
    return segments

def normal_to_image(image_shape, contour):
    h, w, _ = image_shape
    return (contour * (w, h)).astype(int)

def draw_labels(image, labels):
    if type(image) == str:
        image = cv.imread(image)
    if type(labels) == str:
        labels = load_labels(labels)
        for segment in labels:
            segment[1] = normal_to_image(image.shape, segment[1])
    for class_, contour in labels:
        image = cv.polylines(image, [contour], True, (255, 0, 0))
        image = cv.putText(image, str(class_), contour[0], cv.FONT_HERSHEY_SIMPLEX, 0.5, (255, 0, 0))
    return image

In [None]:
# Credit: ChatGPT
def perpendicular_distance(point, start, end):
    """Calculate the perpendicular distance from a point to a line segment."""
    if np.all(start == end):
        return euclidean(point, start)
    
    line_vec = end - start
    point_vec = point - start
    line_len = np.dot(line_vec, line_vec)
    t = max(0, min(1, np.dot(point_vec, line_vec) / line_len))
    projection = start + t * line_vec
    return euclidean(point, projection)

# Credit: ChatGPT
def rdp(contour, epsilon):
    """
    Applies the Ramer-Douglas-Peucker (RDP) algorithm to simplify a contour.
    
    Parameters:
        contour (np.ndarray): A Nx2 array of (x, y) coordinates.
        epsilon (float): Tolerance for point reduction (higher -> more aggressive).
    
    Returns:
        np.ndarray: Simplified contour as a Nx2 array.
    """
    if len(contour) < 3:
        return contour

    # Find the point with the maximum perpendicular distance
    start, end = contour[0], contour[-1]
    distances = np.array([perpendicular_distance(p, start, end) for p in contour[1:-1]])
    
    max_idx = np.argmax(distances)
    max_dist = distances[max_idx]
    
    if max_dist > epsilon:
        max_idx += 1  # Offset for skipping the first point
        # Recursive RDP on both segments
        left = rdp(contour[:max_idx+1], epsilon)
        right = rdp(contour[max_idx:], epsilon)
        return np.vstack((left[:-1], right))
    else:
        return np.array([start, end])

In [None]:
def image_names(path):
    files = glob(str(path / "images" / "*.jpg"))
    return [Path(f).stem for f in files]

def image_path(path, image, check=False):
    path = path / "images" / f"{image}.jpg"
    if check and not path.exists():
        return None
    else:
        path.parent.mkdir(parents=True, exist_ok=True)
        return path

def labels_path(path, image, check=False):
    path = path / "labels" / f"{image}.txt"
    if check and not path.exists():
        return None
    else:
        path.parent.mkdir(parents=True, exist_ok=True)
        return path

def filter_image(epsilon, min_, max_, src, dst, image):
    if (labels := labels_path(src, image, check=True)) is None:
        return
    labels = load_labels(labels)
    segments = []
    for class_, contour in labels:
        simple = rdp(contour, epsilon)
        if min_ <= simple.shape[0] and simple.shape[0] <= max_:
            segments.append([class_] + contour.flatten().tolist())
    if len(segments) > 0:
        shutil.copy(image_path(src, image), image_path(dst, image))
        with open(labels_path(dst, image), "w") as f:
            for segment in segments:
                segment = [str(x) for x in segment]
                f.write(" ".join(segment) + "\n")

def equal_slices(n, xs):
    if len(xs) < n:
        raise ValueError()
    slice_len = len(xs) // n
    remainder = len(xs) % n
    slices = [slice(slice_len * i, slice_len * (i + 1)) for i in range(n)]
    if remainder > 0:
        slices[-1] = slice(slice_len * (n - 1), slice_len * n + remainder)
    return slices

In [None]:
dataset_path = Path("datasets/coco2017/train")
filtered_path = Path("datasets/coco2017-filtered/train")
contour_epsilon = 0.05
contour_min = 4
contour_max = 7
chunks = 40

def filter_chunk(images):
    for image in images:
        filter_image(contour_epsilon, contour_min, contour_max, dataset_path, filtered_path, image)

if filtered_path.exists():
    shutil.rmtree(filtered_path)
with ThreadPoolExecutor(max_workers=chunks) as executor:
    images = image_names(dataset_path)
    slices = equal_slices(chunks, images)
    futures = [executor.submit(filter_chunk, images[s]) for s in slices]
    for future in futures:
        future.result()