# Test PointPillars on ARCS data

In [None]:
# Libraries
import bbox
import itertools
import math
import os
import random
import sys
import time
import numpy as np
import pandas as pd
from contextlib import redirect_stdout
from mmdet3d.apis import LidarDet3DInferencer
from pathlib import Path

In [None]:
# Initialize inferencer
inferencer = LidarDet3DInferencer('pointpillars_kitti-3class')

In [None]:
# Directory paths
ARCS_LABELS = '../data/eval_data/labels'
ARCS_DATA_DIR = '../data/eval_data/arcs'
PREV_ARCS_AZIMUTH_FILTERED_DATA_DIR = '../data/eval_data/arcs_azimuth_filtered_01'
ARCS_AZIMUTH_FILTERED_DATA_DIR = '../data/eval_data/arcs_azimuth_filtered'
ARCS_LABEL_FILTERED_DATA_DIR = '../data/eval_data/arcs_label_filtered'

## Evaluation functions

In [None]:
# The 3D bbox object requires quaternion values. This function converts the yaw to these values
def yaw_to_quaternion(yaw):
    """
    Converts a yaw angle (rotation about the z-axis) to quaternion coordinates.
    
    Parameters:
    - yaw (float): The yaw angle in radians.
    
    Returns:
    - tuple of (rw, rx, ry, rz): The quaternion representation of the yaw.
    """
    # Compute the components of the quaternion
    rw = math.cos(yaw / 2.0)
    rz = math.sin(yaw / 2.0)
    
    # rx and ry are zero because the rotation is only about the z-axis
    return (rw, 0, 0, rz)

In [None]:
# This function takes x, y, z, dx, dy, dz, yaw and returns a 3D bbox object from the bbox package
# Yaw is in radians
def get3DBbox(x, y, z, dx, dy, dz, yaw):
    # Example usage:
    rw, rx, ry, rz = yaw_to_quaternion(yaw)
    bbox_obj = bbox.BBox3D(x, y, z, length=dx, width=dy, height=dz, rw=rw, rx=rx, ry=ry, rz=rz)
    return bbox_obj

In [None]:
# Takes two bounding boxes, returns the IoU
def calculate_iou(bbox1, bbox2):
    bbox_obj1 = get3DBbox(*bbox1)
    bbox_obj2 = get3DBbox(*bbox2)
    return bbox.metrics.jaccard_index_3d(bbox_obj1, bbox_obj2)

In [None]:
evaluation_area = [21.0175, 11.717, -0.3925, 40.035, 55.9349, 7.535, 0.5]

In [None]:
# Returns true if the pending box overlaps the evaluation area
def is_in_eval_area(pending_bbox):
    iou = calculate_iou(evaluation_area, pending_bbox)
    if iou > 0:
        return True
    return False
#     return True

In [None]:
# _evaluate frame takes a prediction dictionary output by the model, and a list of ground truths
# from the label file, and returns the TPs, FPs, and FNs for one lidar frame
def _evaluate_frame(predictions, ground_truths, iou_threshold=0.25):
    TPs, FPs, FNs = 0, 0, len(ground_truths)
    used_gt = set()
    confidence_labels = []
    
    for pred in predictions['predictions']:
        for label, score, bbox in zip(pred['labels_3d'], pred['scores_3d'], pred['bboxes_3d']):
            # Only count the prediction if it's within the evaluation area
            if is_in_eval_area(bbox):
                best_iou = 0
                best_gt = None
                for gt in ground_truths:
                    iou = calculate_iou(bbox, gt[8:])
                    if iou > best_iou:
                        best_iou = iou
                        best_gt = tuple(gt)

                if best_iou > iou_threshold:
                    if best_gt not in used_gt:
                        used_gt.add(best_gt)
                        TPs += 1
                        FNs -= 1
                        confidence_labels.append((score, 1))
                    else:
                        FPs += 1
                        confidence_labels.append((score, 0))
                else:
                    FPs += 1
    return TPs, FPs, FNs, confidence_labels

In [None]:
def parse_ground_truths(label_path):
    labels = []
    with open(label_path, 'r') as file:
        for line in file:
            parts = line.strip().split()
            bbox = []
            # Add the category as a string
            bbox.append(parts[0])
            # Extract the bounding box dimensions and location as 
            bbox = bbox + [float(value) for value in parts[1:15]]  
            bbox[2] = int(bbox[2])
            # Only include the bbox if it's in the evaluation area
            if is_in_eval_area(bbox[8:]):
                labels.append(bbox)
    return labels

In [None]:
generated_problem_file_list = []

In [None]:
# _evaluate frame takes a lidar and label file path and
# and returns the TPs, FPs, and FNs for one lidar frame
def evaluate_frame(lidar_file_path, label_file_path):
    inputs = dict(points=str(lidar_file_path))
    
    # Get predictions
    try:
        predictions = inferencer(inputs)
            # Get ground truths
        ground_truths = parse_ground_truths(label_file_path)
        num_ground_truths = len(ground_truths)

        TPs, FPs, FNs, confidence_labels = _evaluate_frame(predictions, ground_truths)
    except:
        # Add file id to the problem file list
        print(str(lidar_file_path))
        lidar_filename = os.path.basename(lidar_file_path)
        file_id, extension = os.path.splitext(lidar_filename)
        generated_problem_file_list.append(file_id)
        TPs, FPs, FNs, num_ground_truths, confidence_labels = 0, 0, 0, 0, []
    
    return TPs, FPs, FNs, num_ground_truths, confidence_labels

In [None]:
# Problem files. I'll find out why later
problem_file_ids = ['000522', '003066', '000556', '004340', '001016', '004224', '005318', '004344', '001014', '000522',
                    '003066', '000556', '004340', '001016', '004224', '005318', '004344', '001014', '001027', '006440',
                    '001026', '000869', '001031', '000530', '004928', '000548', '000553', '004290', '006437', '000548',
                    '000553', '004290', '006437', '000542', '001015', '006443', '005320', '000542', '001015', '006443', 
                    '005320']

In [None]:
def print_summary(list_file_ids):
    for id in list_file_ids:
        print(f'\n_____________________________________________________________')
        
        file_path = f'{ARCS_LABEL_FILTERED_DATA_DIR}/{id}.bin'
        label_path = f'{ARCS_LABELS}/{id}.txt' 

        # Open point cloud file
        points = np.fromfile(file_path, dtype=np.float32).reshape(-1, 4)
        df_points = pd.DataFrame(points, columns=['x', 'y', 'z', 'intensity'])

        # Print the DataFrame summary
        print(f"Summary for file ID {id}:")
        print('length: ' + str(len(df_points)))
        print("DataFrame Info:")
        df_points.info()
        print("\nDataFrame Description:")
        print(df_points.describe())
        print(f'\nlabels:')
        # Print labels
        with open(label_path, 'r') as file:
            for line in file:
                print(line)

## Metrics calculation functions

In [None]:
def calculate_precision_recall(predictions, num_ground_truths):
    # Sort by confidence score in descending order
    predictions.sort(key=lambda x: x[0], reverse=True)
    
    tp = 0
    fp = 0
    total_positives = sum(is_tp for _, is_tp in predictions)
    
    precisions = []
    recalls = []
    
    for conf, is_tp in predictions:
        if is_tp:
            tp += 1
        else:
            fp += 1
        
        precision = tp / (tp + fp)
        recall = tp / num_ground_truths
        
        precisions.append(precision)
        recalls.append(recall)
    
    return precisions, recalls

In [None]:
def calculate_ap(precisions, recalls):
    # Calculate AP using the trapezoidal rule to compute the area under the curve
    ap = 0
    for i in range(1, len(recalls)):
        ap += (recalls[i] - recalls[i-1]) * (precisions[i] + precisions[i-1]) / 2
    return ap

In [None]:
test_dir = Path(ARCS_DATA_DIR)
files = [f for f in os.listdir(test_dir) if f.endswith('.bin')]
random.shuffle(files)
files = files[:400]
files = [file for file in files if file[:-4] not in problem_file_ids]
print(files)

In [None]:
def evaluate_dataset(dataset_path, num_frames):
    label_dir = Path(ARCS_LABELS)
    dataset_dir = Path(dataset_path)
    
    total_TPs, total_FPs, total_FNs = 0, 0, 0
    num_ground_truths = 0
    confidence_labels = []

    start = time.time()
    for bin_file in files:
#     for bin_file in itertools.islice(dataset_dir.iterdir(), num_frames):
#     for bin_file in dataset_dir.iterdir():
        if str(bin_file).endswith('.bin'):
            print('.', end='')

            lidar_filename = os.path.basename(bin_file)
            # Split the filename from the extension ('006428', '.txt')
            file_id, extension = os.path.splitext(lidar_filename)
            if file_id not in problem_file_ids:
                label_filename = file_id + '.txt'

                # Make file paths
                lidar_file_path = Path(dataset_dir, lidar_filename)
                label_file_path = Path(label_dir, label_filename)
                
#                 print('evaluating: ' + str(lidar_file_path))
                
                TPs, FPs, FNs, num_frame_ground_truths, frame_confidence_labels = evaluate_frame(lidar_file_path, label_file_path)

                total_TPs += TPs
                total_FPs += FPs
                total_FNs += FNs
                num_ground_truths += num_frame_ground_truths
                confidence_labels += frame_confidence_labels
            else:
                print('skipping ' + file_id)
    end = time.time()
        
    return total_TPs, total_FPs, total_FNs, num_ground_truths, confidence_labels, (end - start) / num_frames

In [None]:
def test_pointpillars(dataset_path, dataset_name, num_frames=200):
    print('Evaluating dataset: ' + dataset_path)
    # Get all TP, FP, and FN in the dataset
    TPs, FPs, FNs, num_ground_truths, confidence_labels, time_per_frame = evaluate_dataset(dataset_path, num_frames)
    # Get metrics
    precision = TPs / (TPs + FPs) if TPs + FPs > 0 else 0
    recall = TPs / (TPs + FNs) if TPs + FNs > 0 else 0
    f1_score = 2 * (precision * recall) / (precision + recall) if (precision + recall) > 0 else 0
    
    # Placeholder for average precision calculation
    precisions, recalls = calculate_precision_recall(confidence_labels, num_ground_truths)
    average_precision = calculate_ap(precisions, recalls)

    # Organize results into a dictionary
    results = {
        'Dataset': dataset_name,
        'Precision': precision,
        'Recall': recall,
        'F1 Score': f1_score,
        'Average Precision': average_precision,
        'time_per_frame': time_per_frame
    }
    print(results)

    return results

In [None]:
%%capture
# Run tests on ARCS
results = test_pointpillars(ARCS_DATA_DIR, 'ARCS')

In [None]:
%%capture
# Run tests on ARCS filtered
azimuth_filter_results = test_pointpillars(PREV_ARCS_AZIMUTH_FILTERED_DATA_DIR, 'Azimuth Filtered ARCS')

In [None]:
%%capture
# Run tests on ARCS filtered
new_filter_results = test_pointpillars(ARCS_AZIMUTH_FILTERED_DATA_DIR, 'New Azimuth Filtered ARCS')

In [None]:
%%capture
# Run tests on ARCS label filtered
label_filter_results = test_pointpillars(ARCS_LABEL_FILTERED_DATA_DIR, 'Label Filtered ARCS')

In [None]:
print(generated_problem_file_list)

## Show results

In [None]:
# print(results)
# print(filter_results)
# print(label_filter_results)

In [None]:
# Create DataFrame
results_df = pd.DataFrame([results, azimuth_filter_results, new_filter_results, label_filter_results])
display(results_df)

In [None]:
print(results_df.to_string(index=False))