In [1]:
import os
import csv
import json
import shutil
import random
from collections import defaultdict

In [2]:
# Original DeepFashion2 data: images and their per-image JSON annotations.
TRAIN_IMG_DIR = "../data/original/train/image"
TRAIN_ANNO_DIR = "../data/original/train/annos"
VAL_IMG_DIR = "../data/original/validation/image"
VAL_ANNO_DIR = "../data/original/validation/annos"

# Metadata of the subsets that already exist; their images are excluded
# from the new test set.
TRAIN_CSV = "../data/subset/train/classification_metadata.csv"
VAL_CSV = "../data/subset/val/classification_metadata.csv"

# Output locations for the new test set.
TEST_IMG_DIR = "../data/subset/test/images"
TEST_ANNO_DIR = "../data/subset/test/annotations"
TEST_CSV = "../data/subset/test/classification_metadata.csv"

# Maximum number of images sampled for each category.
CLASS_SAMPLE_COUNT = 100

# Seed for Python's global `random` generator, named so it is easy to find
# and change in one place.
RANDOM_SEED = 42

# category_id -> human-readable category name written to the CSV.
category_map = {
    1: "short_sleeve_top",
    2: "long_sleeve_top",
    3: "short_sleeve_outwear",
    4: "long_sleeve_outwear",
    5: "vest",
    6: "sling",
    7: "shorts",
    8: "trousers",
    9: "skirt",
    10: "short_sleeve_dress",
    11: "long_sleeve_dress",
    12: "vest_dress",
    13: "sling_dress",
}

random.seed(RANDOM_SEED)

# Create the output directories up front so the copy step can write into them.
os.makedirs(TEST_IMG_DIR, exist_ok=True)
os.makedirs(TEST_ANNO_DIR, exist_ok=True)

In [3]:
def load_used_images(csv_path):
    """Collect the distinct ``image_filename`` values listed in a metadata CSV.

    Args:
        csv_path: Path to a CSV file whose header contains an
            ``image_filename`` column.

    Returns:
        A set with one entry per distinct filename; empty when the file has
        a header but no data rows.
    """
    with open(csv_path, 'r', newline='') as handle:
        return {record["image_filename"] for record in csv.DictReader(handle)}

# Filenames already claimed by the existing train/val subsets; the new test
# set must not reuse any of them.
# NOTE(review): entries are bare filenames, so a name that occurs in both the
# train and validation folders cannot be told apart here - confirm that is
# acceptable for this dataset.
used_train = load_used_images(TRAIN_CSV)
used_val = load_used_images(VAL_CSV)
used_images = used_train | used_val

In [4]:
def find_dominant_item(annotation_data):
    """Return the ``category_id`` of the item with the largest bounding box.

    Items are read from the keys ``item1``, ``item2``, ... and scanning stops
    at the first missing number. Each item must provide ``category_id`` and a
    four-value ``bounding_box`` unpacked as ``x1, y1, x2, y2``.

    Args:
        annotation_data: Mapping loaded from one annotation file.

    Returns:
        The winning ``category_id``, or ``None`` when there is no ``item1``
        or when no item has a bounding-box area greater than zero. On equal
        areas the lowest-numbered item wins.
    """
    best_area = 0
    best_category = None
    index = 1
    while f"item{index}" in annotation_data:
        entry = annotation_data[f"item{index}"]
        category = entry["category_id"]
        left, top, right, bottom = entry["bounding_box"]
        box_area = (right - left) * (bottom - top)
        # Strict comparison: ties keep the earlier item, zero areas never win.
        if box_area > best_area:
            best_area, best_category = box_area, category
        index += 1
    return best_category

def parse_dataset(img_dir, anno_dir):
    """Index one dataset split by the category of each image's dominant item.

    Every ``*.json`` file in ``anno_dir`` is paired with the ``.jpg`` of the
    same stem in ``img_dir``. An annotation is skipped when its image file is
    missing or when ``find_dominant_item`` returns ``None`` for it.

    Args:
        img_dir: Directory holding the ``.jpg`` images.
        anno_dir: Directory holding the ``.json`` annotation files.

    Returns:
        A ``defaultdict(list)`` mapping ``category_id`` to a list of
        ``(img_file, anno_file)`` filename pairs in ascending filename order.
    """
    category_index = defaultdict(list)
    # os.listdir() yields entries in arbitrary, platform-dependent order.
    # Sorting keeps the index, and the seeded shuffle applied to it later,
    # identical from one machine or run to the next.
    anno_files = sorted(f for f in os.listdir(anno_dir) if f.endswith(".json"))
    for anno_file in anno_files:
        image_id = os.path.splitext(anno_file)[0]
        img_file = image_id + ".jpg"

        # Skip annotations whose image is not present.
        if not os.path.isfile(os.path.join(img_dir, img_file)):
            continue

        # Decode explicitly as UTF-8 instead of the locale's default encoding.
        with open(os.path.join(anno_dir, anno_file), 'r', encoding="utf-8") as f:
            data = json.load(f)

        # Only the dominant item (largest bbox) decides the category.
        cat_id = find_dominant_item(data)
        if cat_id is not None:
            category_index[cat_id].append((img_file, anno_file))
    return category_index

# Build one category -> [(img_file, anno_file), ...] index per original split.
train_index = parse_dataset(TRAIN_IMG_DIR, TRAIN_ANNO_DIR)
val_index   = parse_dataset(VAL_IMG_DIR, VAL_ANNO_DIR)

In [5]:
def filter_remaining(full_index, used_imgs):
    """Drop every pair whose image filename is already in ``used_imgs``.

    Args:
        full_index: Mapping of ``cat_id`` to a list of
            ``(img_file, anno_file)`` pairs.
        used_imgs: Container of image filenames to exclude.

    Returns:
        A plain dict ``{cat_id: [(img_file, anno_file), ...]}`` holding only
        the unused pairs; categories left with no pairs are omitted.
    """
    unused_by_category = (
        (cat_id, [(img, anno) for img, anno in pairs if img not in used_imgs])
        for cat_id, pairs in full_index.items()
    )
    return {cat_id: unused for cat_id, unused in unused_by_category if unused}

# Keep only the images that neither existing subset has already used.
leftover_train = filter_remaining(train_index, used_images)
leftover_val   = filter_remaining(val_index, used_images)

In [6]:
# Pool the train and validation leftovers per category. The category id is
# stored inside every entry so it survives the flattening below.
grouped_leftover = defaultdict(list)
for leftover in (leftover_train, leftover_val):
    for cat_id, items in leftover.items():
        grouped_leftover[cat_id].extend(
            (cat_id, img_file, anno_file) for img_file, anno_file in items
        )

# Shuffle each category's pool and keep at most CLASS_SAMPLE_COUNT entries.
sampled_items = []
for items in grouped_leftover.values():
    random.shuffle(items)
    sampled_items += items[:CLASS_SAMPLE_COUNT]

# Mix the categories so the final list is not grouped by class.
random.shuffle(sampled_items)

In [7]:
def copy_test_items(sampled_list, test_csv_path):
    """Copy the sampled files into the test folders and write their metadata.

    Each image is copied into TEST_IMG_DIR and each annotation into
    TEST_ANNO_DIR under its original filename. One CSV row is written per
    item, after a header row.

    Args:
        sampled_list: Iterable of ``(cat_id, img_file, anno_file)`` tuples.
        test_csv_path: Destination of the metadata CSV; overwritten if present.
    """
    header = ["image_filename", "annotation_filename", "category_id", "category_name"]
    with open(test_csv_path, 'w', newline='') as csv_file:
        rows = csv.writer(csv_file)
        rows.writerow(header)

        for cat_id, img_file, anno_file in sampled_list:
            # Ids missing from category_map get a generated placeholder name.
            label = category_map.get(cat_id, f"cat_{cat_id}")

            # The origin split is inferred from where the image exists.
            # NOTE(review): the train folder is checked first, so a filename
            # present in both splits is always taken from train - confirm
            # that filenames are unique across the two splits.
            from_train = os.path.isfile(os.path.join(TRAIN_IMG_DIR, img_file))
            if from_train:
                img_root, anno_root = TRAIN_IMG_DIR, TRAIN_ANNO_DIR
            else:
                img_root, anno_root = VAL_IMG_DIR, VAL_ANNO_DIR

            shutil.copyfile(
                os.path.join(img_root, img_file),
                os.path.join(TEST_IMG_DIR, img_file),
            )
            shutil.copyfile(
                os.path.join(anno_root, anno_file),
                os.path.join(TEST_ANNO_DIR, anno_file),
            )

            rows.writerow([img_file, anno_file, cat_id, label])

# Materialise the test split: copy the sampled files and write TEST_CSV.
copy_test_items(sampled_items, TEST_CSV)

# NOTE: CLASS_SAMPLE_COUNT is applied per category during sampling, so the
# total number of copied images can be larger than the figure printed here.
print(f"Done creating test subset with up to {CLASS_SAMPLE_COUNT} images!")
print(f"Test CSV: {TEST_CSV}")

Done creating test subset with up to 100 images!
Test CSV: ../data/subset/test/classification_metadata.csv
