In [None]:
from typing import List

from PIL import Image 
from datasets import load_dataset
import os 
from PIL import Image, ImageChops, ImageFilter
from tqdm.auto import tqdm 
from typing import Tuple
import numpy as np 
from segment.utils import convert_coco_polygons_to_mask

In [None]:
# Hugging Face dataset repository that provides the per-image mask metadata.
repo_id = "jordandavis/fashion_people_detections"
workers = os.cpu_count()

# Keep the Hugging Face cache under <parent of the working directory>/SEGMENT/hf_cache.
PROJECT_ROOT = os.path.join(os.path.dirname(os.getcwd()), "SEGMENT")
CACHE_DIR = os.path.join(PROJECT_ROOT, 'hf_cache')

# Non-streaming load: later cells call len(ds) and ds.map(...) on the result.
# NOTE(review): trust_remote_code=True allows the dataset repository to run its own
# loading code on this machine -- confirm the repository is trusted.
ds = load_dataset(
    repo_id,
    split="train",
    trust_remote_code=True,
    cache_dir=CACHE_DIR,
    streaming=False,
)

# Grab one sample row for ad-hoc inspection in later cells.
rows = iter(ds)
row = next(rows)

In [None]:
def combine_masks(mask_images: List[Image.Image]) -> "Image.Image | None":
    """
    Merge several mask images into a single image.

    The masks are folded together with ``ImageChops.lighter``, which keeps the
    lighter value of the two images at every pixel. This is not a bitwise OR.

    NOTE(review): where masks of different colors overlap, the lighter-value
    rule presumably yields a per-channel mix rather than either input color;
    confirm that is acceptable for class-colored masks.

    :param mask_images: list of PIL images to merge (assumed to share size and mode -- TODO confirm)
    :return: the merged image, the single input image itself when only one is
             given, or None when the list is empty
    """
    if not mask_images:
        return None  # nothing to merge; callers must handle the missing mask

    # Start from the first mask and fold the remaining ones in. With a single
    # mask the loop body never runs and that mask is returned unchanged.
    combined_mask = mask_images[0]
    for mask in mask_images[1:]:
        combined_mask = ImageChops.lighter(combined_mask, mask)

    return combined_mask

In [None]:
# RGB color used to paint each segmentation class (key order is preserved).
class_colors = {
    'background': (0, 0, 0),      # black
    'hair': (255, 0, 0),          # red
    'face': (0, 255, 0),          # green
    'neck': (0, 0, 255),          # blue
    'arm': (255, 255, 0),         # yellow
    'hand': (255, 0, 255),        # magenta
    'back': (0, 255, 255),        # cyan
    'leg': (255, 128, 0),         # orange
    'foot': (128, 0, 128),        # purple
    'outfit': (75, 0, 130),       # dark purple (indigo)
    'person': (128, 128, 128),    # gray
    'phone': (255, 192, 203),     # pink
}

def get_color(class_name):
    """
    Look up the RGB color registered for a class name.

    :param class_name: key to look up in ``class_colors``
    :return: tuple, the RGB color for the class; light gray (200, 200, 200)
             when the class has no entry (a warning is printed in that case)
    """
    known_color = class_colors.get(class_name)
    if known_color is not None:
        return known_color

    # Unknown class: tell the user and fall back to light gray.
    print(f"Warning: No color defined for class '{class_name}'. Using default color.")
    return (200, 200, 200)

def colorize_mask(mask: np.ndarray, color: Tuple) -> Image.Image:
    """
    Paint the non-zero pixels of a mask with a single color.

    :param mask: array indexed as (row, column); it is cast to uint8 and scaled
                 by 255 before the non-zero test
    :param color: three channel values written to every selected pixel
    :return: image built from a 3-channel uint8 array that is black everywhere else
    """
    scaled = mask.astype(np.uint8) * 255
    n_rows, n_cols = scaled.shape[0], scaled.shape[1]

    # Start from an all-black 3-channel canvas and fill only the masked pixels.
    canvas = np.zeros((n_rows, n_cols, 3), dtype=np.uint8)
    canvas[scaled > 0] = color

    return Image.fromarray(canvas)

def colorize_and_combine(md, class_specific_color=True):
    """
    Rasterize every non-'person' annotation, color it, and merge the results.

    :param md: iterable of annotation dicts; the 'label' and 'polygons' keys are read
    :param class_specific_color: when True each label gets its color from
                                 ``get_color``; otherwise a random color is drawn
                                 per annotation (unseeded, so not reproducible)
    :return: whatever ``combine_masks`` returns for the list of colored masks
    """
    colored_layers = []

    for annotation in md:
        label = annotation.get('label')
        if label != 'person':
            # Polygons are rasterized onto a fixed 1024 x 1024 canvas.
            binary_mask = convert_coco_polygons_to_mask(annotation.get('polygons'), 1024, 1024)

            # Same color for every instance of a class, or a fresh random color.
            color = get_color(label) if class_specific_color else np.random.randint(0, 256, size=3)

            colored_layers.append(colorize_mask(binary_mask, color))

    return combine_masks(colored_layers)

In [None]:
# Flat output folder for the colored masks; created if it does not exist yet.
# NOTE(review): absolute, machine-specific path -- consider deriving it from a configurable root.
mask_dir = '/home/ubuntu/SPAICE/SEGMENT/datasets/fashion_people_detection/masks'
os.makedirs(mask_dir, exist_ok=True)


def save_color_mask(row):
    """
    Render the colored mask for one dataset row and save it as a PNG in ``mask_dir``.

    The file name is the part of the row's 'image_id' before its first '.',
    with '.png' appended. Nothing is written when no mask is produced.

    :param row: mapping with 'mask_metadata' and 'image_id' entries
    :return: None
    """
    annotations = row.get('mask_metadata')

    # Everything before the first '.' of image_id becomes the file stem.
    mask_filename = f"{row.get('image_id').partition('.')[0]}.png"

    color_mask = colorize_and_combine(annotations)
    if not color_mask:
        return

    color_mask.save(os.path.join(mask_dir, mask_filename))

In [None]:
# Leave one core free, but always request at least one worker:
# os.cpu_count() may return None, and on a single-core host
# `os.cpu_count() - 1` would be 0, which is not a usable worker count.
num_proc = max(1, (os.cpu_count() or 1) - 1)

# Write one colored mask per row.
# * A tqdm bar created in this process cannot be relied on here: with num_proc > 1
#   the mapped function runs in worker processes, so its updates happen on copies.
#   `desc` labels the progress bar that Dataset.map shows itself.
# * save_color_mask is run only for its side effect (writing PNG files), so cached
#   map results must not be reused -- otherwise a re-run could skip the writes.
processed_dataset = ds.map(
    save_color_mask,
    num_proc=num_proc,
    load_from_cache_file=False,
    desc="Processing rows",
)

print(f"Processed {len(processed_dataset)} rows")

In [None]:
# Images are already split into <image_dir>/train and <image_dir>/val; mirror that
# split for the masks, which were all written flat into mask_dir.
image_dir = '/home/ubuntu/SPAICE/SEGMENT/datasets/fashion_people_detection/images'

train_mask_dir = os.path.join(mask_dir, 'train')
val_mask_dir = os.path.join(mask_dir, 'val')
os.makedirs(train_mask_dir, exist_ok=True)
os.makedirs(val_mask_dir, exist_ok=True)

for dir_type in ['train', 'val']:
    # Named so the builtins `dir` is not shadowed for the rest of the notebook.
    split_image_dir = os.path.join(image_dir, dir_type)
    # The destination folder is the same for every file of this split.
    mask_dest_dir = os.path.join(mask_dir, dir_type)

    for image_file in tqdm(os.listdir(split_image_dir), desc=f'Moving Masks To {dir_type} dir'):
        # Masks are named after the part of the image file name before its first '.'.
        mask_name = image_file.split('.')[0] + '.png'
        source_path = os.path.join(mask_dir, mask_name)
        if not os.path.exists(source_path):
            print(f"Mask Not Found: {mask_name}")
            continue

        dest_path = os.path.join(mask_dest_dir, mask_name)
        os.rename(source_path, dest_path)


In [None]:
# Inspect the 'box' field of the first mask_metadata entry of the sample row
# (the same field get_lines reads when writing bounding boxes).
row.get('mask_metadata')[0].get('box')

In [None]:
def get_lines(md, image_width, image_height, bbox=True, polygons=False):
    """
    Generate YOLO format lines for bounding boxes, polygons, or both.

    Entries labelled 'person' are skipped. For each remaining entry a bounding
    box line is emitted first (when requested and a box is present), followed
    by a polygon line (when requested and polygons are present). All values
    are divided by the image size and written with six decimals.

    :param md: Iterable of annotation dicts (keys read: "label", "label_id",
               "box", "polygons"); None is treated as no annotations
    :param image_width: Width of the image
    :param image_height: Height of the image
    :param bbox: Boolean, whether to include bounding boxes (default True)
    :param polygons: Boolean, whether to include polygons (default False)
    :return: List of YOLO format lines
    """
    lines = []
    if md is None:
        return lines

    # Divisor that maps (x, y) pixel pairs to normalized coordinates.
    image_size = np.array([image_width, image_height])

    for row in md:
        # don't include person since it overlaps with other masks
        if row.get("label") == 'person':
            continue

        label_id = row.get("label_id")

        # Process bounding box if required
        if bbox:
            bbox_coords = row.get("box")
            if bbox_coords:
                # The box is read as [x_min, y_min, width, height]: the center is
                # the corner plus half the size (presumably COCO xywh -- TODO confirm).
                x_center = (bbox_coords[0] + bbox_coords[2] / 2) / image_width
                y_center = (bbox_coords[1] + bbox_coords[3] / 2) / image_height
                width = bbox_coords[2] / image_width
                height = bbox_coords[3] / image_height

                lines.append(
                    f"{label_id} {x_center:.6f} {y_center:.6f} {width:.6f} {height:.6f}"
                )

        # Process polygons if required
        if polygons:
            coco_polygons = row.get("polygons")
            if coco_polygons:
                if len(coco_polygons) > 1:
                    # NOTE(review): merge_multi_segment is neither defined nor imported
                    # in the visible notebook; it must be in scope for multi-part
                    # polygons or this branch raises NameError.
                    points = np.concatenate(merge_multi_segment(coco_polygons), axis=0)
                else:
                    # Single polygon: reshape its coordinates into (x, y) pairs.
                    points = np.array(coco_polygons[0]).reshape(-1, 2)

                yolo_polygons = (points / image_size).reshape(-1).tolist()
                yolo_polygons_str = " ".join([f"{coord:.6f}" for coord in yolo_polygons])
                lines.append(f"{label_id} {yolo_polygons_str}")

    return lines

In [None]:
from segment.utils import resize_image_pil
# Flat output folder for the YOLO label files; created if it does not exist yet.
# NOTE(review): absolute, machine-specific path -- consider deriving it from a configurable root.
labels_dir = '/home/ubuntu/SPAICE/SEGMENT/datasets/fashion_people_detection/labels'
os.makedirs(labels_dir, exist_ok=True)


def save_labels(row):
    """
    Write the bounding-box label file for one dataset row into ``labels_dir``.

    The lines come from ``get_lines`` with a fixed 1024 x 1024 image size and
    boxes only. The file name is the part of the row's 'image_id' before its
    first '.', with '.txt' appended. No file is written when there are no lines.

    :param row: mapping with 'mask_metadata' and 'image_id' entries
    :return: None
    """
    label_lines = get_lines(row.get('mask_metadata'), 1024, 1024, bbox=True, polygons=False)

    # Everything before the first '.' of image_id becomes the file stem.
    label_filename = f"{row.get('image_id').partition('.')[0]}.txt"

    if not label_lines:
        return

    with open(os.path.join(labels_dir, label_filename), 'w') as label_file:
        label_file.write("\n".join(label_lines))


In [None]:

# Leave one core free, but always request at least one worker:
# os.cpu_count() may return None, and on a single-core host
# `os.cpu_count() - 1` would be 0, which is not a usable worker count.
num_proc = max(1, (os.cpu_count() or 1) - 1)

# Write one label file per row.
# * A tqdm bar created in this process cannot be relied on here: with num_proc > 1
#   the mapped function runs in worker processes, so its updates happen on copies.
#   `desc` labels the progress bar that Dataset.map shows itself.
# * save_labels is run only for its side effect (writing .txt files), so cached
#   map results must not be reused -- otherwise a re-run could skip the writes.
processed_dataset = ds.map(
    save_labels,
    num_proc=num_proc,
    load_from_cache_file=False,
    desc="Processing rows",
)

print(f"Processed {len(processed_dataset)} rows")


In [None]:
# Images are already split into <image_dir>/train and <image_dir>/val; mirror that
# split for the label files, which were all written flat into labels_dir.
image_dir = '/home/ubuntu/SPAICE/SEGMENT/datasets/fashion_people_detection/images'

# Dedicated names so the mask split folders defined earlier are not overwritten.
train_labels_dir = os.path.join(labels_dir, 'train')
val_labels_dir = os.path.join(labels_dir, 'val')
os.makedirs(train_labels_dir, exist_ok=True)
os.makedirs(val_labels_dir, exist_ok=True)

for dir_type in ['train', 'val']:
    # Named so the builtins `dir` is not shadowed for the rest of the notebook.
    split_image_dir = os.path.join(image_dir, dir_type)
    # The destination folder is the same for every file of this split.
    labels_dest_dir = os.path.join(labels_dir, dir_type)

    for image_file in tqdm(os.listdir(split_image_dir), desc=f'Moving Labels To {dir_type} dir'):
        # Labels are named after the part of the image file name before its first '.'.
        label_name = image_file.split('.')[0] + '.txt'
        source_path = os.path.join(labels_dir, label_name)
        if not os.path.exists(source_path):
            # save_labels writes no file for rows without label lines, so a
            # missing label is reported and skipped rather than treated as fatal.
            print(f"Label Not Found: {label_name}")
            continue

        dest_path = os.path.join(labels_dest_dir, label_name)
        os.rename(source_path, dest_path)


In [None]:
# Spot check: build the path of one specific image in the train split and
# report whether that file exists on disk.
image_name = '294325360b4b1ac6c000911abe6c9340.jpg'
im_path = os.path.join(image_dir, 'train',image_name)
os.path.exists(im_path)

In [None]:
# Open the image at im_path; as the cell's last expression the result is displayed.
Image.open(im_path)