## Read Config

In [1]:
import os
# Automatically read from your env variables
FIFTYONE_PORT = int(os.getenv("FIFTYONE_PORT", 5151))
FIFTYONE_URI = os.getenv("FIFTYONE_URI", "0.0.0.0")

## Utils Function

In [2]:
import fiftyone as fo

def ext(path):
    _, file_ext = os.path.splitext(path)
    return file_ext

def read_yolo_label(path):
    with open(path, "r") as f:
        data = f.read().splitlines()
    
    bboxes = []
    labels = []
    for obj in data:
        obj = obj.split()
        
        label = int(obj[0])
        cx = float(obj[1])
        cy = float(obj[2])
        w = float(obj[3])
        h = float(obj[4])
        
        x = cx - w/2
        y = cy - h/2
        
        bbox = [x, y, w, h]
        bboxes.append(bbox)
        labels.append(label)
    
    return bboxes, labels

def read_yolo_pred(path, xyxy=False):
    with open(path, "r") as f:
        data = f.read().splitlines()
    
    bboxes = []
    labels = []
    scores = []
    for obj in data:
        obj = obj.split()
        
        label = int(obj[0])
        try:
            score = float(obj[5])
        except:
            score = 1.0
        
        if xyxy:
            x1 = int(obj[1])
            y1 = int(obj[2])
            x2 = int(obj[3])
            y2 = int(obj[4])
            
            x = x1
            y = y1
            w = x2 - x1
            h = y2 - y1
        else:
            cx = float(obj[1])
            cy = float(obj[2])
            w = float(obj[3])
            h = float(obj[4])

            x = cx - w/2
            y = cy - h/2
        
        bbox = [x, y, w, h]
        bboxes.append(bbox)
        labels.append(label)
        scores.append(score)
    
    return bboxes, labels, scores

def create_dataset(imgs_path, labels_path, classes=None, name="eval_det", xyxy=False):
    samples = []
    for img_path, label_path in zip(imgs_path, labels_path):
        sample = fo.Sample(filepath=img_path)
        gt_bboxes, gt_labels = read_yolo_label(label_path)
        gt_detections = [fo.Detection(label=str(label), bounding_box=bbox) for label, bbox in zip(gt_labels, gt_bboxes)]
        sample["ground_truth"] = fo.Detections(detections=gt_detections)
        samples.append(sample)
    print(f'{len(samples)} images found')
    # Create dataset
    dataset = fo.Dataset(name, overwrite=True)
    dataset.add_samples(samples)
    return dataset

def fo_eval_det_dataset(imgs_path, labels_path, preds_path, classes=None, name="eval_det", xyxy=False):
    samples = []
    for img_path, label_path, pred_path in zip(imgs_path, labels_path, preds_path):
        sample = fo.Sample(filepath=img_path)
        gt_bboxes, gt_labels = read_yolo_label(label_path)
        pred_bboxes, pred_labels, pred_scores = read_yolo_pred(pred_path, xyxy=xyxy)
        
        gt_detections = [fo.Detection(label=str(label), bounding_box=bbox) for label, bbox in zip(gt_labels, gt_bboxes)]
        pred_detections = [fo.Detection(label=str(label), bounding_box=bbox, confidence=score) for label, bbox, score in zip(pred_labels, pred_bboxes, pred_scores)]
        
        sample["ground_truth"] = fo.Detections(detections=gt_detections)
        sample["predictions"] = fo.Detections(detections=pred_detections)
        samples.append(sample)

    print(f'{len(samples)} images found')
    
    # Create dataset
    # fo.core.dataset.delete_dataset(name, verbose=True)
    # dataset = fo.Dataset(name, overwrite=True)
    # fo.delete_non_persistent_datasets(verbose=True)
    dataset = fo.Dataset(name, overwrite=True)
    dataset.add_samples(samples)
    
    return dataset

## Load Dataset

In [5]:
import glob

imgs_dir = "IVTXX17BBXX191024-01_Vehicle Detection Short Distance 5 & 17 Classes Dataset/images"
labels_dir = "IVTXX17BBXX191024-01_Vehicle Detection Short Distance 5 & 17 Classes Dataset/labels"

txt_exts = [".txt"]
imgs_path = sorted([img_path for img_path in sorted(glob.glob(f"{imgs_dir}/**/*.*", recursive=True)) if ext(img_path) not in txt_exts])
labels_path = sorted([label_path for label_path in sorted(glob.glob(f"{labels_dir}/**/*.*", recursive=True)) if ext(label_path) in txt_exts])

assert len(imgs_path)==len(labels_path)

# fiftyone dataset
dataset = create_dataset(imgs_path, labels_path, name="vehicle_dataset")

9224 images found
 100% |███████████████| 9224/9224 [17.8s elapsed, 0s remaining, 416.3 samples/s]      


In [6]:
dataset.persistent = True

In [4]:
dataset = fo.load_dataset("vehicle_dataset")
dataset

Name:        vehicle_dataset
Media type:  image
Num samples: 9224
Persistent:  True
Tags:        []
Sample fields:
    id:           fiftyone.core.fields.ObjectIdField
    filepath:     fiftyone.core.fields.StringField
    tags:         fiftyone.core.fields.ListField(fiftyone.core.fields.StringField)
    metadata:     fiftyone.core.fields.EmbeddedDocumentField(fiftyone.core.metadata.ImageMetadata)
    ground_truth: fiftyone.core.fields.EmbeddedDocumentField(fiftyone.core.labels.Detections)

In [6]:
session = fo.launch_app(dataset)

## Load Model

In [None]:
import fiftyone.zoo as foz

# Load zoo model
model = foz.load_zoo_model("resnext50-32x4d-imagenet-torch")

Downloading model from 'https://download.pytorch.org/models/resnext50_32x4d-7cdf4587.pth'...
 100% |████|  766.3Mb/766.3Mb [13.5s elapsed, 0s remaining, 66.3Mb/s]      


Downloading: "https://download.pytorch.org/models/resnext50_32x4d-7cdf4587.pth" to /root/.cache/torch/hub/checkpoints/resnext50_32x4d-7cdf4587.pth
100%|█████████████████████████████████████████████████████████████████████████████████████████████████████| 95.8M/95.8M [00:13<00:00, 7.36MB/s]


In [None]:
img_embeddings = dataset.compute_embeddings(model)

In [None]:
import numpy as np

np.save("full_frame_embeddings.npy", img_embeddings)

## Visualize Embedding Result

In [None]:
from fiftyone import ViewField as F
import fiftyone.brain as fob
import fiftyone.core.utils as fou

def compute_visualization_classification(
    dataset,
    embeddings,
    method="umap",
    brain_key="umap_embeddings",
    ):
    # Compute 2D representation using pre-computed embeddings
    viz_results = fob.compute_visualization(
        dataset,
        embeddings=embeddings,
        num_dims=2,
        method=method,
        brain_key=brain_key,
        verbose=True,
        seed=51)
    return viz_results

def compute_visualization_detection(
    dataset,
    embeddings,
    patches_field="ground_truth"
        ):
    # Compute 2D representation using pre-computed embeddings
    viz_results = fob.compute_visualization(
        dataset,
        patches_field=patches_field,
        embeddings=embeddings,
        num_dims=2,
        brain_key="image_embeddings",
        verbose=True,
        seed=51)
    return viz_results

def compute_uniqueness(dataset, embeddings):
    fob.compute_uniqueness(dataset, embeddings=embeddings)
    dataset.sort_by("uniqueness", reverse=True)
    return dataset

def plot_img_embedding(viz_result, labels="unlabeled"):
    plot = viz_result.visualize(labels=labels)
    return plot

## Plot Full Embeddings

In [None]:
full_viz_results = compute_visualization_classification(dataset, img_embeddings, method="tsne", brain_key="tsne_img_embeddings")

In [None]:
plot_full_frame = plot_img_embedding(full_viz_results, labels="full_frame")
fo.close_app()
plot_full_frame.show()
session = fo.launch_app(dataset, port=FIFTYONE_PORT)
session.plots.attach(plot_full_frame)

In [None]:
plot_full_frame = plot_img_embedding(full_viz_results, labels="full_frame")
fo.close_app()
session = fo.launch_app(dataset, port=FIFTYONE_PORT)
session.plots.attach(plot_full_frame)

## Compute Detection

In [None]:
patches_viz_results = compute_visualization_detection(dataset, patches_embeddings)

In [None]:
plot_patches_embd = plot_img_embedding(patches_viz_results, labels="ground_truth.detections")
fo.close_app()
plot_patches_emb.show()
session = fo.launch_app(dataset, port=FIFTYONE_PORT)
session.plots.attach(plot_patches_embd)

## Compute Uniqueness

In [None]:
dataset = compute_uniqueness(dataset, img_embeddings)

In [None]:
fo.close_app()
session = fo.launch_app(dataset, port=FIFTYONE_PORT)

## Compute Similarity

In [None]:
similarity_results = fob.compute_similarity(
    dataset, model="resnext50-32x4d-imagenet-torch", patches_field="ground_truth", brain_key="gt_sim"
)

In [None]:
fob.compute_similarity(
    dataset, model="resnext50-32x4d-imagenet-torch", brain_key="gt_sim_img"
)

In [None]:
dataset.persistent = True

In [None]:
session.dataset = dataset
session.show()

In [None]:
# evaluate
results = dataset.evaluate_detections(
    "predictions",
    gt_field="ground_truth",
    eval_key="eval",
)

# Print a classification report for the classes
results.print_report()

# Print some statistics about the total TP/FP/FN counts
print("TP: %d" % dataset.sum("eval_tp"))
print("FP: %d" % dataset.sum("eval_fp"))
print("FN: %d" % dataset.sum("eval_fn"))

## Split Dataset

In [None]:
def train_test_split(dataset, percentage=0.25):
    temp_dataset = dataset.clone()
    test_size = int(len(dataset) * percentage)
    test_samples = temp_dataset.take(test_size)
    temp_dataset.delete_samples(test_samples)
    test_dataset = test_samples.clone()
    test_dataset.persistent = True
    temp_dataset.persistent = True
    return temp_dataset, test_dataset

## Split Main Dataset

In [None]:
unique_dataset_samples = dataset.match(F("uniqueness") > 0.6)
common_dataset_samples = dataset.match(F("uniqueness") < 0.6)

In [None]:
unique_dataset_samples

## Export Dataset