In [None]:
import os
import json

def load_data(*file_paths):
    """Merge several JSON annotation files into a single dictionary.

    The 'annotations' and 'images' lists of every file are concatenated in
    argument order. 'categories' is copied from a file only while no non-empty
    category list has been stored yet, i.e. later files are assumed to carry
    the same categories as the first one that provides them.

    Returns a dict with the keys 'annotations', 'images' and 'categories'.
    """
    combined = {'annotations': [], 'images': [], 'categories': []}
    for json_path in file_paths:
        with open(json_path, 'r') as handle:
            content = json.load(handle)
        combined['annotations'] += content['annotations']
        combined['images'] += content['images']
        # Only fill in categories while none have been recorded so far.
        if not combined['categories'] and 'categories' in content:
            combined['categories'] = content['categories']
    return combined

def remove_duplicate_annotations(data):
    """Keep only the first annotation seen for every image_id.

    The 'annotations' entry of ``data`` is replaced in place (original order is
    preserved) and the same ``data`` object is returned.
    """
    first_per_image = {}
    for entry in data['annotations']:
        # setdefault stores the entry only if this image_id is new;
        # dicts keep insertion order, so the first-seen order is retained.
        first_per_image.setdefault(entry['image_id'], entry)
    data['annotations'] = list(first_per_image.values())
    return data

# Annotation files to merge (train / test / val of split 1)
json_path_1 = "data/ap-10k/annotations/ap10k-train-split1.json"
json_path_2 = "data/ap-10k/annotations/ap10k-test-split1.json"
json_path_3 = "data/ap-10k/annotations/ap10k-val-split1.json"

# Merge the three files, then keep only the first annotation of every image
data = remove_duplicate_annotations(load_data(json_path_1, json_path_2, json_path_3))

print(len(data['annotations']), "unique annotations after cleanup.")

annotations, images, categories = data["annotations"], data["images"], data["categories"]

# Lookup tables: image id -> image record, category id -> category record
images_dict = dict((image["id"], image) for image in images)
categories_dict = dict((cat["id"], cat) for cat in categories)

# Root folder that receives the per-annotation JSON files
base_dir = "data/ap-10k/ImageAnnotation"

# Function to pad the file name
def pad_filename(filename, width=17):
    """Left-pad ``filename`` with zeros until it is ``width`` characters long.

    Args:
        filename: File name (including extension) to pad.
        width: Target length of the whole name. The default of 17 keeps the
            previous hard-coded behaviour, e.g. "1.json" -> "000000000001.json".

    Returns:
        The zero-padded name; names already at least ``width`` long are
        returned unchanged.
    """
    return filename.zfill(width)

new_annotations = []

# Merge every annotation with its image and category records, then write one
# JSON file per merged record to base_dir/<supercategory>/<name>/.
for annotation in annotations:
    image_id = annotation["image_id"]
    category_id = annotation.get("category_id")

    # Skip annotations whose image or category record is missing.
    if image_id not in images_dict or category_id not in categories_dict:
        continue

    category = categories_dict[category_id]
    # NOTE: the image fields are unpacked after the annotation fields, so keys
    # present in both (notably "id") end up holding the image's value.
    new_annotation = {**annotation, **images_dict[image_id],
                      "name": category["name"],
                      "supercategory": category["supercategory"]}
    new_annotations.append(new_annotation)

    supercategory = new_annotation.get("supercategory")
    name = new_annotation.get("name")

    # Without both folder components there is nowhere to put the file.
    if not (supercategory and name):
        continue

    # exist_ok avoids the separate existence check and its race condition.
    directory_path = os.path.join(base_dir, supercategory, name)
    os.makedirs(directory_path, exist_ok=True)

    # The file name is the merged record's "id" (the image id, see the note
    # above), or "unknown" when absent, zero-padded by pad_filename.
    file_name = pad_filename(str(new_annotation.get("id", "unknown")) + ".json")
    file_path = os.path.join(directory_path, file_name)

    # Write the merged data to a JSON file
    with open(file_path, 'w') as file:
        json.dump(new_annotation, file, indent=4)

print("All JSON files have been saved and renamed.")


In [None]:
# split the images into different folders according to the supercategory and name

import os
import shutil

# create a mapping dict to store the {supercategory}/{name} for each image id
id_to_category = {}
for item in new_annotations:
    supercategory = item.get("supercategory")
    name = item.get("name")
    try:
        image_id = int(item.get("id"))
    except (TypeError, ValueError):
        # no usable numeric id: the record cannot be matched to an image file
        continue
    if supercategory and name:
        id_to_category[image_id] = os.path.join(supercategory, name)

# image folder path
img_folder = "data/ap-10k/data"

# iterate through the image folder
for img_file in os.listdir(img_folder):
    img_path = os.path.join(img_folder, img_file)
    if not os.path.isfile(img_path):
        continue

    # extract the image id: the part of the stem after its first 7 characters
    try:
        img_id = int(img_file.split(".")[0][7:])
    except ValueError:
        # hidden files or names without a numeric id (e.g. ".DS_Store")
        continue

    # find the corresponding {supercategory}/{name}
    category_path = id_to_category.get(img_id)
    if not category_path:
        continue

    # create the target directory
    target_directory = os.path.join("data/ap-10k/JPEGImages", category_path)
    os.makedirs(target_directory, exist_ok=True)

    # move the image to the target directory
    target_path = os.path.join(target_directory, img_file)
    shutil.move(img_path, target_path)

# delete the original folder, data/ap-10k/data
# NOTE: this also deletes every file that was not moved above (images without
# a matching annotation); the operation cannot be undone.
shutil.rmtree("data/ap-10k/data")

print("All images have been organized according to their respective categories.")

In [28]:
import os
import json

base_path = "data/ap-10k/ImageAnnotation"

def load_list_from_file(file_path):
    """Read a text file and return its lines with surrounding whitespace removed."""
    stripped_lines = []
    with open(file_path, 'r') as handle:
        for raw_line in handle:
            stripped_lines.append(raw_line.strip())
    return stripped_lines

def split_json_files(json_files):
    """Partition ``json_files`` into (train, eval, test) slices.

    The last 30 entries (or fewer, if the list is shorter) form the test set,
    up to 20 of the entries before them form the eval set, and whatever
    remains at the front is the train set.
    """
    total = len(json_files)
    n_test = min(30, total)                      # never more than what exists
    n_eval = min(20, max(0, total - n_test))     # taken from what is left
    n_train = max(0, total - n_test - n_eval)    # remainder, never negative

    eval_end = n_train + n_eval
    train_part = json_files[:n_train]
    eval_part = json_files[n_train:eval_end]
    test_part = json_files[-n_test:]
    return train_part, eval_part, test_part

def save_list_to_file(file_path, items):
    """Write every item on its own newline-terminated line, replacing the file's content."""
    with open(file_path, 'w') as handle:
        handle.writelines(f"{entry}\n" for entry in items)

is_crowd_list = load_list_from_file("data/ap-10k_is_crowd.txt")
# Set copy of the crowd names for constant-time membership tests.
is_crowd_names = set(is_crowd_list)

for root, _, files in os.walk(base_path):
    # Only folders exactly two levels below base_path are processed.
    if root.count(os.sep) != base_path.count(os.sep) + 2:
        continue

    # NOTE: `files` is in the order os.walk reports it, which is not sorted;
    # the resulting split therefore depends on the directory listing order.
    json_list = [os.path.join(root, f) for f in files if f.endswith(".json")]
    filtered_json_list = []

    for json_file in json_list:
        with open(json_file, 'r') as f:
            json_data = json.load(f)

        # File name without directory and extension. str.strip('.json') must not
        # be used for this: it strips any of the characters ".", "j", "s", "o",
        # "n" from both ends instead of removing the suffix.
        json_stem = os.path.splitext(os.path.basename(json_file))[0]

        if json_stem in is_crowd_names:
            # Crowd annotations are flagged on disk and left out of every split.
            json_data["is_crowd"] = 1
            with open(json_file, 'w') as f:
                json.dump(json_data, f)
        elif json_data["num_keypoints"] >= 3:
            filtered_json_list.append(json_file)

    train_json_list, eval_json_list, test_json_list = split_json_files(filtered_json_list)

    # Save the lists to their respective files
    save_list_to_file(os.path.join(root, "train_filtered.txt"), train_json_list)
    save_list_to_file(os.path.join(root, "val_filtered.txt"), eval_json_list)
    save_list_to_file(os.path.join(root, "test_filtered.txt"), test_json_list)

print("JSON file splitting complete.")

JSON file splitting complete.


In [None]:
import os
import random
import json
import numpy as np
import itertools

# Set the seed for reproducibility
random.seed(42)

def generate_pairs(base_path, file_name, output_folder, N_multiplier=None):  # intra-species
    """Create source/target pair files from the annotation lists of each species folder.

    For every folder exactly two levels below ``base_path`` the list file
    ``file_name`` is read (one annotation JSON path per line). All 2-combinations
    of the shuffled list are formed and a pair is kept when the summed product
    of the halved visibility flags of its keypoints is at least 3. The kept
    pairs are sampled with the global ``random`` module and each sampled pair is
    written to ``output_folder`` as ``<src>-<trg>:<folder name>.json`` holding
    the two annotation paths.

    Args:
        base_path: Root directory to walk.
        file_name: Name of the list file expected in every processed folder.
        output_folder: Directory receiving the pair files (created on demand).
        N_multiplier: When given, at most ``N_multiplier * len(list)`` pairs are
            sampled per folder; when ``None`` every kept pair is written.

    Prints the total number of pair files written.
    """
    N_total_pairs = 0
    visibility_cache = {}

    def _visibility(json_path):
        # Load the visibility column of each annotation once, not once per pair.
        if json_path not in visibility_cache:
            with open(json_path) as f:
                keypoints = json.load(f)["keypoints"]
            visibility_cache[json_path] = np.array(keypoints).reshape(-1, 3)[:, -1]
        return visibility_cache[json_path]

    for root, dirs, files in os.walk(base_path):
        # Process only subdirectories that are two levels deep
        if root.count(os.sep) != base_path.count(os.sep) + 2:
            continue

        with open(os.path.join(root, file_name), 'r') as file:
            json_list = [line.strip() for line in file]

        random.shuffle(json_list)
        all_possible_pairs = list(itertools.combinations(json_list, 2))

        # Upper bound on the number of sampled pairs for this folder.
        if N_multiplier is not None:
            N = int(N_multiplier * len(json_list))
        else:
            N = len(all_possible_pairs)

        possible_pairs = [
            (src, trg)
            for src, trg in all_possible_pairs
            if (_visibility(src) / 2 * _visibility(trg) / 2).sum() >= 3
        ]

        # Adjust N based on the number of possible pairs
        N = min(N, len(possible_pairs))
        pairs_sampled = random.sample(possible_pairs, N) if N > 0 else []

        if pairs_sampled:
            os.makedirs(output_folder, exist_ok=True)

        for src_json_path, trg_json_path in pairs_sampled:
            src_json_name = os.path.basename(src_json_path).split(".")[0]
            trg_json_name = os.path.basename(trg_json_path).split(".")[0]
            category_name = os.path.basename(os.path.dirname(src_json_path))
            new_json_file_name = f"{src_json_name}-{trg_json_name}:{category_name}.json"
            new_json_path = os.path.join(output_folder, new_json_file_name)

            data = {"src_json_path": src_json_path, "trg_json_path": trg_json_path}
            with open(new_json_path, 'w') as f:
                json.dump(data, f, indent=4)

            N_total_pairs += 1

    print(f"Total {N_total_pairs} pairs generated for {file_name}")

# Root folder holding the per-species annotation folders and their *_filtered.txt lists
base_path = "data/ap-10k/ImageAnnotation"
# Training set: the number of pairs sampled per species is capped (multiplier 50 per listed file)
generate_pairs(base_path, "train_filtered.txt", 'data/ap-10k/PairAnnotation/trn', N_multiplier=50)
# Test and validation sets: no multiplier, so every pair passing the visibility filter is written
generate_pairs(base_path, "test_filtered.txt", 'data/ap-10k/PairAnnotation/test')
generate_pairs(base_path, "val_filtered.txt", 'data/ap-10k/PairAnnotation/val')


In [None]:
import os
import random
import json
import numpy as np
import itertools

# Set the seed for reproducibility
random.seed(42)

def generate_cross_species_pairs(base_path, file_name, output_folder, N_pairs_per_combination):
    """Create pair files whose two annotations come from different species of one family.

    Folders one level below ``base_path`` are treated as families and their
    direct subfolders as species. For every family with more than one species,
    the list files ``file_name`` of each species combination are crossed, the
    direction of each pair is flipped at random, and a pair is kept when the
    summed product of the halved keypoint visibility flags is at least 3.
    From the kept pairs of the whole family at most ``N_pairs_per_combination``
    are sampled and written to ``output_folder`` as
    ``<src>-<trg>:<family folder name>.json``.

    Args:
        base_path: Root directory to walk.
        file_name: Name of the list file expected in every species folder.
        output_folder: Directory receiving the pair files (created on demand).
        N_pairs_per_combination: Maximum number of pairs sampled per family.

    Prints the total number of sampled pairs.
    """
    N_total_pairs = 0

    # family folder -> list of its direct species subfolders
    subfolder_path = {}
    for root, dirs, files in os.walk(base_path):
        if root.count(os.sep) == base_path.count(os.sep) + 1:
            subfolder_path[root] = []
            for subroot, subdirs, subfiles in os.walk(root):
                if subroot.count(os.sep) == root.count(os.sep) + 1:
                    subfolder_path[root].append(subroot)

    visibility_cache = {}

    def _visibility(json_path):
        # Load the halved visibility column of each annotation only once.
        if json_path not in visibility_cache:
            with open(json_path) as f:
                keypoints = json.load(f)["keypoints"]
            visibility_cache[json_path] = np.array(keypoints).reshape(-1, 3)[:, -1] / 2
        return visibility_cache[json_path]

    def _read_list(species_folder):
        with open(os.path.join(species_folder, file_name), 'r') as list_file:
            return [line.strip() for line in list_file]

    for key, value in subfolder_path.items():
        if len(value) <= 1:  # the family needs more than one species
            continue

        total_cross_species_pairs = []
        for species_a, species_b in itertools.combinations(value, 2):
            json_list_a = _read_list(species_a)
            json_list_b = _read_list(species_b)
            # Flip source and target with probability 0.5. The flipped tuple has
            # to be stored: rebinding a loop variable would leave the list as is.
            total_cross_species_pairs.extend(
                (trg, src) if random.random() > 0.5 else (src, trg)
                for src, trg in itertools.product(json_list_a, json_list_b)
            )

        possible_pairs = [
            (src, trg)
            for src, trg in total_cross_species_pairs
            if (_visibility(src) * _visibility(trg)).sum() >= 3
        ]

        N = min(N_pairs_per_combination, len(possible_pairs))
        pairs_sampled = random.sample(possible_pairs, N)

        if pairs_sampled:
            os.makedirs(output_folder, exist_ok=True)

        for src_json_path, trg_json_path in pairs_sampled:
            src_json_name = os.path.basename(src_json_path).split(".")[0]
            trg_json_name = os.path.basename(trg_json_path).split(".")[0]
            # Two levels up from the annotation file is the family folder.
            category_name = os.path.basename(os.path.dirname(os.path.dirname(src_json_path)))
            new_json_file_name = f"{src_json_name}-{trg_json_name}:{category_name}.json"
            new_json_path = os.path.join(output_folder, new_json_file_name)

            data = {
                "src_json_path": src_json_path,
                "trg_json_path": trg_json_path
            }

            with open(new_json_path, 'w') as f:
                json.dump(data, f, indent=4)

        N_total_pairs += N

    print(f"Total {N_total_pairs} pairs for {file_name}")

# Root folder holding the family/species annotation folders
base_path = "data/ap-10k/ImageAnnotation"
# Validation set: sample at most 400 cross-species pairs per family
generate_cross_species_pairs(base_path, "val_filtered.txt", 'data/ap-10k/PairAnnotation/val_cross_species', 400)
# Test set: sample at most 900 cross-species pairs per family
generate_cross_species_pairs(base_path, "test_filtered.txt", 'data/ap-10k/PairAnnotation/test_cross_species', 900)


In [None]:
import os
import random
import json
import numpy as np
import itertools
from tqdm import tqdm

# Set seed for reproducibility
random.seed(42)

def generate_cross_family_pairs(base_path, file_name, output_folder, N_pairs_per_combination):
    """Create pair files whose two annotations come from different families.

    Folders one level below ``base_path`` are treated as families; the list
    files ``file_name`` found in their direct subfolders are concatenated per
    family. For every combination of two families all cross pairs are formed,
    a pair is kept when the summed product of the halved keypoint visibility
    flags is at least 3, and at most ``N_pairs_per_combination`` kept pairs are
    sampled and written to ``output_folder`` as ``<src>-<trg>:all.json``.

    Args:
        base_path: Root directory to walk.
        file_name: Name of the list file expected in every species folder.
        output_folder: Directory receiving the pair files (created on demand).
        N_pairs_per_combination: Maximum number of pairs per family combination.

    Prints the total number of sampled pairs.
    """
    N_total_pairs = 0
    subfolder_path = {}

    # Collect all JSON file paths within each family
    for root, dirs, files in os.walk(base_path):
        if root.count(os.sep) == base_path.count(os.sep) + 1:
            subfolder_path[root] = []
            for subroot, subdirs, subfiles in os.walk(root):
                if subroot.count(os.sep) == root.count(os.sep) + 1:
                    with open(os.path.join(subroot, file_name), 'r') as list_file:
                        subfolder_path[root].extend(line.strip() for line in list_file)

    visibility_cache = {}

    def _visibility(json_path):
        # Load the visibility column of each annotation once, not once per pair.
        if json_path not in visibility_cache:
            with open(json_path) as f:
                keypoints = json.load(f)["keypoints"]
            visibility_cache[json_path] = np.array(keypoints).reshape(-1, 3)[:, -1]
        return visibility_cache[json_path]

    # Iterate over combinations of families
    families_combination = list(itertools.combinations(subfolder_path.keys(), 2))

    for family_1, family_2 in tqdm(families_combination, desc="Generating pairs"):
        cross_family_pairs = itertools.product(subfolder_path[family_1], subfolder_path[family_2])

        # Filter out pairs based on visibility criteria
        possible_pairs = [
            (src, trg)
            for src, trg in cross_family_pairs
            if (_visibility(src) / 2 * _visibility(trg) / 2).sum() >= 3
        ]

        # Sample pairs if more are available than needed
        N = min(N_pairs_per_combination, len(possible_pairs))
        pairs_sampled = random.sample(possible_pairs, N)

        if pairs_sampled:
            os.makedirs(output_folder, exist_ok=True)

        # Create annotation files for sampled pairs
        for src_json_path, trg_json_path in pairs_sampled:
            src_json_name = os.path.basename(src_json_path).split(".")[0]
            trg_json_name = os.path.basename(trg_json_path).split(".")[0]
            new_json_file_name = f"{src_json_name}-{trg_json_name}:all.json"
            new_json_path = os.path.join(output_folder, new_json_file_name)

            data = {"src_json_path": src_json_path, "trg_json_path": trg_json_path}
            with open(new_json_path, 'w') as f:
                json.dump(data, f, indent=4)

        # Count once per family combination; adding N inside the loop above
        # would add it N times.
        N_total_pairs += N

    print(f"Total {N_total_pairs} pairs generated for {file_name}")

# Root folder holding the family/species annotation folders
base_path = "data/ap-10k/ImageAnnotation"
# Test set: at most 30 pairs per combination of two families
generate_cross_family_pairs(base_path, "test_filtered.txt", 'data/ap-10k/PairAnnotation/test_cross_family', 30)
# Validation set: at most 20 pairs per combination of two families
generate_cross_family_pairs(base_path, "val_filtered.txt", 'data/ap-10k/PairAnnotation/val_cross_family', 20)


In [None]:
# visualize the key points of ap-10k benchmark

import json
import numpy as np
import torch
from PIL import Image
from glob import glob
from utils.utils_correspondence import resize
from utils.utils_visualization import draw_correspondences_gathered
from utils.utils_dataset import preprocess_kps_pad

np.random.seed(42)

def load_ap10k_data(path="data/ap-10k", size=840, category='all', split='test_cross_family', subsample=20):
    """Load image paths and keypoints for the pair files of one split.

    Pair files are globbed as ``{path}/PairAnnotation/{split}/*:{category}.json``.
    For every pair the source and target annotation JSONs are read, their
    keypoints are reshaped to rows of three values with the last value (the
    visibility flag) halved, and passed through ``preprocess_kps_pad``.

    Args:
        path: Dataset root directory.
        size: Target size forwarded to ``preprocess_kps_pad``.
        category: Suffix after the colon in the pair file names.
        split: Sub-folder of ``PairAnnotation`` to read.
        subsample: Number of pairs to draw with ``np.random.choice`` (seeded
            with 42); ``None`` or a value <= 0 keeps every pair.

    Returns:
        files: Image paths, alternating source and target per pair.
        kps: Stacked keypoint tensor (two entries per pair), restricted to the
            keypoint indices with a non-zero flag in at least one entry.
        thresholds: Scaled bounding-box sizes; filled only for the splits
            'test' and 'trn'.
        used_kps: Indices of the keypoints kept in ``kps``.
    """
    import os  # local import: the cell defining this function does not import os

    def _image_path(json_path):
        # Swap only the file extension; str.replace("json", "jpg") would also
        # rewrite any other "json" occurring in the path.
        stem, _ = os.path.splitext(json_path)
        return (stem + ".jpg").replace('ImageAnnotation', 'JPEGImages')

    np.random.seed(42)
    pairs = sorted(glob(f'{path}/PairAnnotation/{split}/*:{category}.json'))
    if subsample is not None and subsample > 0:
        # np.random.choice draws with replacement by default, so the same pair
        # can be selected more than once.
        pairs = [pairs[ix] for ix in np.random.choice(len(pairs), subsample)]

    files = []
    kps = []
    thresholds = []
    for pair in pairs:
        with open(pair) as f:
            data = json.load(f)
        source_json_path = data["src_json_path"]
        target_json_path = data["trg_json_path"]
        src_img_path = _image_path(source_json_path)
        trg_img_path = _image_path(target_json_path)

        with open(source_json_path) as f:
            src_file = json.load(f)
        with open(target_json_path) as f:
            trg_file = json.load(f)

        source_bbox = np.asarray(src_file["bbox"])  # l t w h
        target_bbox = np.asarray(trg_file["bbox"])

        source_size = np.array([src_file["width"], src_file["height"]])  # (W, H)
        target_size = np.array([trg_file["width"], trg_file["height"]])  # (W, H)

        source_kps = torch.tensor(src_file["keypoints"]).view(-1, 3).float()
        source_kps[:, -1] /= 2
        source_kps, src_x, src_y, src_scale = preprocess_kps_pad(source_kps, source_size[0], source_size[1], size)

        target_kps = torch.tensor(trg_file["keypoints"]).view(-1, 3).float()
        target_kps[:, -1] /= 2
        target_kps, trg_x, trg_y, trg_scale = preprocess_kps_pad(target_kps, target_size[0], target_size[1], size)

        # NOTE(review): only the exact split names 'test' and 'trn' produce
        # thresholds; for every other split (including the default
        # 'test_cross_family') the returned list stays empty — confirm intended.
        if split == 'test':
            thresholds.append(max(target_bbox[3], target_bbox[2]) * trg_scale)
        elif split == 'trn':
            thresholds.append(max(source_bbox[3], source_bbox[2]) * src_scale)
            thresholds.append(max(target_bbox[3], target_bbox[2]) * trg_scale)

        kps.append(source_kps)
        kps.append(target_kps)
        files.append(src_img_path)
        files.append(trg_img_path)

    kps = torch.stack(kps)
    # Keep only keypoint indices whose flag is non-zero in at least one entry.
    used_kps, = torch.where(kps[:, :, 2].any(dim=0))
    kps = kps[:, used_kps, :]
    return files, kps, thresholds, used_kps


# Load the pairs to visualise; category and split can be changed here
files, kps, thresholds, used_kps = load_ap10k_data(category='all', split='test_cross_family')

# Entries 2*i and 2*i+1 of `files` / `kps` belong to the source and target of pair i
pair_idx = 0
src_slot = 2*pair_idx
trg_slot = 2*pair_idx+1

src_kps = kps[src_slot]
trg_kps = kps[trg_slot]

src_img = resize(Image.open(files[src_slot]).convert('RGB'), 840, resize=True, to_pil=True)
trg_img = resize(Image.open(files[trg_slot]).convert('RGB'), 840, resize=True, to_pil=True)

# Boolean mask of keypoints whose flag is non-zero in both images,
# e.g. tensor([ True,  True,  True,  True,  True,  True,  True,  True, False,  True, True, False, False, False, False])
vis = src_kps[:, 2] * trg_kps[:, 2] > 0
src_kps, trg_kps = src_kps[vis], trg_kps[vis]

# Columns 0 and 1 are swapped before drawing
fig = draw_correspondences_gathered(src_kps[:, [1,0]], trg_kps[:, [1,0]], src_img, trg_img)