In [1]:
import json
import pathlib
import pandas as pd
import yaml
import numpy as np
import os
import shutil
import glob
import math
import matplotlib.pyplot as plt

In [2]:
def from_json(json_file):
    """Load a JSON file and return its parsed content.

    Args:
        json_file: Path to the JSON file, as a string or ``pathlib.Path``.

    Returns:
        The parsed JSON document. When it contains an ``'objects'`` entry,
        that entry is replaced by a ``pandas.DataFrame`` built from it.
    """
    # Wrapping in Path lets callers pass either a string or a Path
    raw_bytes = pathlib.Path(json_file).read_bytes()
    parsed = json.loads(raw_bytes)

    # Nothing to convert: hand the plain parsed document back
    if 'objects' not in parsed:
        return parsed

    # The list of per-object dictionaries becomes one row per object
    parsed['objects'] = pd.DataFrame(parsed['objects'])
    return parsed

In [3]:
def create_data_yaml(output_dir, category_dict):
    """Create a data.yaml file in the output directory with the specified structure.

    Args:
        output_dir: Directory that receives ``data.yaml`` (string or
            ``pathlib.Path``); it is created if it does not exist.
        category_dict: Mapping of label name -> integer class code.

    The file lists the image folders of the ``train``, ``val`` and ``test``
    splits, the number of classes (``nc``) and the ``names`` mapping of
    class code -> label name.
    """
    # Accept plain strings as well as Path objects
    output_dir = pathlib.Path(output_dir)

    # Invert label -> code into code -> label, ordered by code for readability
    names_by_code = {
        code: label
        for label, code in sorted(category_dict.items(), key=lambda item: item[1])
    }

    # The split folder names must match the ones written next to this file
    # ('train', 'val', 'test'); the validation entry previously pointed at a
    # 'valid' folder that is never created.
    # NOTE(review): the '../' prefix is kept from the original; confirm how
    # the training tool resolves relative paths.
    data = {
        'train': '../train/images',
        'val': '../val/images',
        'test': '../test/images',
        'nc': len(names_by_code),
        'names': names_by_code,
    }

    # Create the output directory if it doesn't exist
    output_dir.mkdir(parents=True, exist_ok=True)

    file_path = output_dir / 'data.yaml'

    # sort_keys=False keeps the key order defined above
    with open(file_path, 'w') as file:
        yaml.dump(data, file, default_flow_style=False, sort_keys=False)

In [4]:
def process_row(row):
    """Flatten the nested 'bbox' and 'properties' columns of an objects frame.

    Despite its name, ``row`` is used as a DataFrame here (columns are dropped
    with ``axis=1``). Each nested column that is present is removed and
    replaced by one column per key of its values; the input is not modified.
    """
    expanded = row.copy()  # work on a copy so the caller's frame stays untouched

    for nested_col in ('bbox', 'properties'):
        if nested_col not in expanded:
            continue
        # One new column per key found in the nested values
        flattened = expanded[nested_col].apply(pd.Series)
        remaining = expanded.drop(nested_col, axis=1)
        expanded = pd.concat([remaining, flattened], axis=1)

    return expanded

In [5]:
def get_dataset(input_dir):
    """Get a DataFrame representing a dataset.

    Pairs every ``images/*.jpg`` file under ``input_dir`` with the
    ``annotations/*.json`` file that has the same stem.

    Args:
        input_dir: Dataset root (string or ``pathlib.Path``) containing the
            ``images`` and ``annotations`` sub-directories.

    Returns:
        A DataFrame with one row per image/annotation pair: the top-level
        keys of the annotation file, an ``image`` column holding the image
        path, and an ``objects`` column holding the flattened per-object
        DataFrame produced by ``process_row``.

    Raises:
        KeyError: if the resulting frame has no ``objects`` column (for
            example when no image/annotation pair is found).
    """
    # Accept plain strings as well as Path objects
    input_dir = pathlib.Path(input_dir)
    image_dir = input_dir / 'images'
    annotation_dir = input_dir / 'annotations'

    # Index both folders by file stem so images and annotations can be paired
    image_path = {image_file.stem: image_file for image_file in image_dir.glob('*.jpg')}
    annotation_path = {annotation_file.stem: annotation_file for annotation_file in annotation_dir.glob('*.json')}

    # Keep only stems present in both folders. Sorting fixes the row order:
    # iterating the bare set gave an order that could change between runs,
    # which made the later seeded sampling pick different images.
    common_keys = sorted(image_path.keys() & annotation_path.keys())

    # One record per pair: the annotation content plus the image path
    records = [dict(**from_json(annotation_path[key]), image=image_path[key]) for key in common_keys]

    df = pd.DataFrame(records)

    # Flatten the nested bbox/properties columns of every objects frame
    df['objects'] = df['objects'].apply(process_row)

    return df

In [6]:
def label_to_code(df):
    """Assign an integer class code to every distinct object label.

    Args:
        df: DataFrame whose ``objects`` column holds one inner DataFrame per
            row. Inner frames without a ``label`` column are skipped.

    Returns:
        ``(df, mapping)`` where ``mapping`` is a dict of label -> code. Every
        inner frame that has a ``label`` column is modified in place and gains
        a ``label_code`` column.
    """
    # Gather the labels of every inner frame
    all_labels = []
    for df_inner in df['objects']:
        if 'label' in df_inner.columns:
            all_labels.extend(df_inner['label'])

    # Sort before numbering so a label always gets the same code. Enumerating
    # the bare set gave codes that could differ between interpreter sessions.
    # key=str keeps the sort well-defined even for non-string labels.
    code_by_label = {
        label: code for code, label in enumerate(sorted(set(all_labels), key=str))
    }

    # Add the code column to each inner frame (in place)
    for df_inner in df['objects']:
        if 'label' in df_inner.columns:
            df_inner['label_code'] = df_inner['label'].map(code_by_label)

    return df, code_by_label

In [7]:
def label_to_code_opt(df):
    """Loop-free variant of the label encoding step.

    Labels are numbered in order of first appearance across the inner
    ``objects`` frames; missing labels get no code. The ``objects`` column of
    ``df`` is replaced by frames carrying an extra ``label_code`` column.

    Returns:
        ``(df, mapping)`` where ``mapping`` is a dict of label -> code.
    """
    def _labels_of(objects):
        # Frames without a 'label' column contribute no labels
        return objects['label'].tolist() if 'label' in objects.columns else []

    # One list of labels per row, flattened into a single Series
    flat_labels = df['objects'].apply(_labels_of).explode()
    distinct = flat_labels.dropna().unique()

    # Number the distinct labels in the order they were first seen
    label_to_code = dict(zip(distinct, range(len(distinct))))

    def _with_codes(objects):
        # Leave frames without labels as they are
        if 'label' not in objects.columns:
            return objects
        codes = objects['label'].map(label_to_code)
        return objects.assign(label_code=codes)

    df['objects'] = df['objects'].apply(_with_codes)

    return df, label_to_code

In [8]:
def filter_dataframe(df, num_classes):
    """Filter the DataFrame based on the num_classes variable.

    Keeps only entries whose ``label_code`` is below ``num_classes``.

    Args:
        df: Either a flat frame with a ``label_code`` column, or a frame whose
            ``objects`` column holds one inner DataFrame per row.
        num_classes: Exclusive upper bound on ``label_code``; ``-1`` disables
            filtering and returns ``df`` unchanged.

    Returns:
        The filtered frame. For the nested layout every row is kept and only
        its inner ``objects`` frame is filtered, so a row may end up with an
        empty objects frame. The input frames are not modified.

    Raises:
        KeyError: if ``df`` has neither a ``label_code`` nor an ``objects``
            column.
    """
    if num_classes == -1:
        return df

    # Flat layout (or an unknown layout, which keeps raising the original
    # KeyError on 'label_code')
    if 'label_code' in df.columns or 'objects' not in df.columns:
        return df[df['label_code'] < num_classes]

    # Nested layout: label_code lives inside each inner objects frame, so
    # indexing the outer frame by 'label_code' would raise KeyError.
    def _keep_low_codes(objects):
        if 'label_code' not in objects.columns:
            return objects
        return objects[objects['label_code'] < num_classes]

    return df.assign(objects=df['objects'].apply(_keep_low_codes))

In [9]:
def reduce_data(df, sample_ratio=100, random_state=42):
    """Return a random sample of the rows of ``df``.

    Args:
        df: Frame to sample from.
        sample_ratio: Percentage of rows to keep (100 keeps every row; the
            rows come back in sampled order, not the original order).
        random_state: Seed forwarded to ``DataFrame.sample``. The default of
            42 reproduces the previously hard-coded seed.

    Returns:
        A new DataFrame holding the sampled rows.
    """
    # DataFrame.sample expects a fraction, the caller passes a percentage
    return df.sample(frac=sample_ratio / 100, random_state=random_state)

In [10]:
def to_yolo(df):
    """Convert corner-style bounding boxes to normalized centre/size boxes.

    Args:
        df: Frame with ``image``, ``width``, ``height`` and ``objects``
            columns. Each ``objects`` cell is a DataFrame with ``label_code``,
            ``xmin``, ``ymin``, ``xmax`` and ``ymax`` columns (or an empty
            frame for an image without objects).

    Returns:
        A DataFrame with columns ``image`` and ``objects``; each ``objects``
        cell is a frame with columns ``class``, ``xc``, ``yc``, ``w``, ``h``
        where the x values are divided by the image width and the y values by
        the image height.
    """
    yolo_columns = ['class', 'xc', 'yc', 'w', 'h']
    converted = []

    for image_name, objects, width, height in zip(
            df['image'], df['objects'], df['width'], df['height']):
        if len(objects) == 0:
            # Image without objects: keep the same columns, zero rows
            image_df = pd.DataFrame(columns=yolo_columns)
        else:
            # Compute every box of the image at once instead of building and
            # concatenating one single-row DataFrame per object.
            xmin = objects['xmin'].astype(float)
            xmax = objects['xmax'].astype(float)
            ymin = objects['ymin'].astype(float)
            ymax = objects['ymax'].astype(float)
            image_df = pd.DataFrame({
                'class': objects['label_code'],
                # centre and size, normalized to the image dimensions
                'xc': (xmax + xmin) / 2 / width,
                'yc': (ymax + ymin) / 2 / height,
                'w': (xmax - xmin) / width,
                'h': (ymax - ymin) / height,
            }).reset_index(drop=True)

        # One (image, boxes) pair per input row
        converted.append((image_name, image_df))

    return pd.DataFrame(converted, columns=['image', 'objects'])

In [11]:
def split_data(df, train_ratio=0.7, val_ratio=0.2, random_state=42):
    """Shuffle ``df`` and split it into train, validation and test frames.

    Args:
        df: Frame to split.
        train_ratio: Fraction of rows assigned to the training set.
        val_ratio: Fraction of rows assigned to the validation set; the rows
            left over form the test set.
        random_state: Seed for the shuffle, so the split is reproducible.
            Pass ``None`` for a different split on every call.

    Returns:
        ``(train_df, val_df, test_df)``.

    Raises:
        ValueError: if a ratio is negative or the two ratios sum to more
            than 1.
    """
    if train_ratio < 0 or val_ratio < 0 or train_ratio + val_ratio > 1 + 1e-9:
        raise ValueError(
            'train_ratio and val_ratio must be non-negative and sum to at most 1')

    # Shuffle row positions rather than index labels: with repeated labels,
    # label-based selection would return a row once per matching label and
    # leak rows across the splits.
    rng = np.random.default_rng(random_state)
    positions = rng.permutation(len(df))

    # Sizes of the train and val sets; test receives whatever remains
    train_size = int(train_ratio * len(df))
    val_size = int(val_ratio * len(df))

    train_pos, val_pos, test_pos = np.split(positions, [train_size, train_size + val_size])

    return df.iloc[train_pos], df.iloc[val_pos], df.iloc[test_pos]

In [12]:
def create_folders_and_labels(train_df, val_df, test_df, base_dir):
    """Write the three splits to disk as images plus one label file per image.

    Args:
        train_df, val_df, test_df: Frames with an ``image`` column (path of
            the image file) and an ``objects`` column holding a DataFrame
            with ``class``, ``xc``, ``yc``, ``w`` and ``h`` columns.
        base_dir: Output root; ``<base_dir>/<split>/images`` and
            ``<base_dir>/<split>/labels`` are created for ``train``, ``val``
            and ``test``.

    Each image is copied into its split's ``images`` folder and a ``.txt``
    file with the same stem is written into ``labels``, one line per object:
    ``class xc yc w h``. A summary of the file counts found in each folder is
    printed at the end.
    """
    splits = {'train': train_df, 'val': val_df, 'test': test_df}

    # Create the main directories
    for set_name in splits:
        os.makedirs(os.path.join(base_dir, set_name, 'images'), exist_ok=True)
        os.makedirs(os.path.join(base_dir, set_name, 'labels'), exist_ok=True)

    for set_name, df in splits.items():
        images_dir = os.path.join(base_dir, set_name, 'images')
        labels_dir = os.path.join(base_dir, set_name, 'labels')
        for _, row in df.iterrows():
            # Copy the image file to the 'images' directory
            shutil.copy(row["image"], images_dir)

            # Strip only the final extension: cutting at the first dot would
            # turn 'frame.v2.jpg' into 'frame', so the label would no longer
            # match its image and different images could share one label file.
            stem = os.path.splitext(os.path.basename(row["image"]))[0]
            label_file_path = os.path.join(labels_dir, f'{stem}.txt')
            with open(label_file_path, 'w') as f:
                for _, inner_row in row['objects'].iterrows():
                    f.write(f'{int(inner_row["class"])} {inner_row["xc"]} {inner_row["yc"]} {inner_row["w"]} {inner_row["h"]}\n')

    # Print the number of labels and images in each directory (counts every
    # file present, including files left over from earlier runs)
    for set_name in splits:
        num_images = len(os.listdir(os.path.join(base_dir, set_name, 'images')))
        num_labels = len(os.listdir(os.path.join(base_dir, set_name, 'labels')))
        print(f'{set_name}: {num_images} images, {num_labels} labels')

In [13]:
def plot_sorted_counts(df: pd.DataFrame, column: str):
    """Show a bar chart of how often each value of ``column`` occurs.

    The bars are ordered by value (not by count).
    """
    # Occurrences per distinct value, ordered by the value itself
    counts_by_value = df[column].value_counts().sort_index()

    ax = counts_by_value.plot.bar()
    ax.set_xlabel('Class')
    ax.set_ylabel('Count')
    ax.set_title('Number of instances per class')

    plt.show()

In [14]:
# Dataset root, and the folder below it that receives the converted dataset
input_dir = pathlib.Path('data')
output_dir = input_dir / 'yolo'

In [15]:
# Get the dataset: one row per image/annotation pair found under
# input_dir/images and input_dir/annotations
df = get_dataset(input_dir)

In [16]:
# Add a 'label_code' column to every objects frame; category_dict maps
# label -> integer code
df, category_dict = label_to_code(df)

In [17]:
# -1 disables filtering: filter_dataframe returns df unchanged for this value
num_classes = -1 
df = filter_dataframe(df, num_classes)

In [18]:
# Create the data.yaml file (split paths, class count, class names) in output_dir
create_data_yaml(output_dir, category_dict)

In [19]:
# Percentage of rows to keep (100 keeps every row)
sample_ratio = 100 
# Reduce percentage of total dataset
df = reduce_data(df, sample_ratio)

In [20]:
# Convert each image's corner boxes to normalized (class, xc, yc, w, h) rows
df_yolo = to_yolo(df)

In [21]:
# Preview the first rows of the converted frame (one row per image)
df_yolo.head()

Unnamed: 0,image,objects
0,data/images/ZMA8CFaAb8T5MN_6NhdvfA.jpg,class xc yc w ...
1,data/images/MWcOkejquLOuuTtwt1qp6A.jpg,class xc yc w ...
2,data/images/2QK-_ZCc4DBE9KphEkoz7A.jpg,class xc yc w ...
3,data/images/CgwKoXSQ7CuT2MU-lyn8mQ.jpg,class xc yc w ...
4,data/images/fJnmrhyhHPjmdTqDjA6mCg.jpg,class xc yc w ...


In [22]:
# Split data into train, val, and test sets
# (default ratios: 0.7 train, 0.2 val, the remainder is test)
train_df, val_df, test_df = split_data(df_yolo)

In [None]:
# Copy the images and write one label file per image under
# output_dir/{train,val,test}/{images,labels}
create_folders_and_labels(train_df, val_df, test_df, output_dir)

In [57]:
# Get the set of image file names written for each split
split_names = ('train', 'val', 'test')
train_images, val_images, test_images = (
    set(os.listdir(os.path.join(output_dir, split, 'images'))) for split in split_names
)

# Check that every image has a label file with the same stem, and vice versa.
# (Comparing a directory listing with its own set can never fail, because a
# directory cannot hold two entries with the same name, so that comparison
# verified nothing.)
for split, images in zip(split_names, (train_images, val_images, test_images)):
    image_stems = {os.path.splitext(name)[0] for name in images}
    label_stems = {
        os.path.splitext(name)[0]
        for name in os.listdir(os.path.join(output_dir, split, 'labels'))
    }
    assert image_stems == label_stems, f"{split}: image and label files do not match"

# Check that there are no duplicates between the subdirectories
assert len(train_images & val_images) == 0
assert len(train_images & test_images) == 0
assert len(val_images & test_images) == 0

print("All checks passed. The images in each subdirectory are unique and there are no duplicates between them.")

All checks passed. The images in each subdirectory are unique and there are no duplicates between them.


In [58]:
# File names present in the images folder of each split
train_images, val_images, test_images = (
    set(os.listdir(os.path.join(output_dir, split, 'images')))
    for split in ('train', 'val', 'test')
)

# File names shared by each pair of splits
train_val_duplicates = train_images & val_images
train_test_duplicates = train_images & test_images
val_test_duplicates = val_images & test_images

# Report the shared names and how many there are, one pair after the other
print(
    f"Train-Val duplicates: {list(train_val_duplicates)}",
    f"Number of Train-Val duplicates: {len(train_val_duplicates)}",
    f"Train-Test duplicates: {list(train_test_duplicates)}",
    f"Number of Train-Test duplicates: {len(train_test_duplicates)}",
    f"Val-Test duplicates: {list(val_test_duplicates)}",
    f"Number of Val-Test duplicates: {len(val_test_duplicates)}",
    sep="\n",
)

Train-Val duplicates: []
Number of Train-Val duplicates: 0
Train-Test duplicates: []
Number of Train-Test duplicates: 0
Val-Test duplicates: []
Number of Val-Test duplicates: 0
