In [None]:
from pcdet.datasets.kitti.kitti_dataset import *
from pcdet.datasets.dataset import *
import yaml
from easydict import EasyDict
from pathlib import Path
from pcdet.utils import common_utils

dataset_cfg = EasyDict(yaml.safe_load(open('/home/rlab10/OpenPCDet/tools/cfgs/dataset_configs/kitti_dataset.yaml')))
class_names = ['Car', 'Pedestrian', 'Cyclist']
file_path = '/home/rlab10/OpenPCDet/pcdet/datasets/kitti/kitti_dataset.py' 
ROOT_DIR = (Path(file_path).resolve().parent / '../../../').resolve()
data_path = ROOT_DIR / 'data' / 'kitti'
save_path = ROOT_DIR / 'data' / 'kitti'
kitti_infos = []
num_features = len(dataset_cfg.POINT_FEATURE_ENCODING.src_feature_list)

def create_kitti_infos(dataset_cfg, class_names, data_path, save_path, workers=4):
    from time import sleep
    
    dataset = KittiDataset(dataset_cfg=dataset_cfg, class_names=class_names, root_path=data_path, training=False, logger=common_utils.create_logger())
    
    train_split, val_split = 'train', 'val'
    num_features = len(dataset_cfg.POINT_FEATURE_ENCODING.src_feature_list)

    train_filename = save_path / ('kitti_infos_%s.pkl' % train_split)
    val_filename = save_path / ('kitti_%s_dataset.pkl' % val_split)
    test_filename = save_path / 'kitti_infos_test.pkl'

    print('\n' + '-' * 36 + 'Start to generate data infos' + '-' * 37)
    print('---------------CAUTION: Source code is configured to serve as Augmentor NOT training-----------------')

    dataset.set_split(train_split)
    # ensure that get_infos() processes the single scene, NOTE: get_infos() collects infos about all classes (except 'DontCare'), filter unwanted classes with param `used_classes` in create_groundtruth_database.
    kitti_infos_train = dataset.get_infos(num_workers=workers, has_label=True, count_inside_pts=True, num_features=num_features)
    with open(train_filename, 'wb') as f:
        pickle.dump(kitti_infos_train, f)
    print('Kitti info train file is saved to %s\n' % train_filename)
    sleep(3)

    dataset.set_split(val_split)
    # ensure that mode 'test' will process the single scene with PointFeatureEncoder, DataProcessor, FOV_FLAG
    dataset.training = False
    allowed_classes = class_names
    kitti_val_dataset = dataset.get_infos_val(num_workers=workers, has_label=True, count_inside_pts=True, num_features=num_features, class_names=allowed_classes)
    with open(val_filename, 'wb') as f:
        pickle.dump(kitti_val_dataset, f)
    print('Kitti info val file is saved to %s\n' % val_filename)
    sleep(3)

    dataset.set_split('test')
    kitti_infos_test = dataset.get_infos(num_workers=workers, has_label=False, count_inside_pts=False)
    with open(test_filename, 'wb') as f:
       pickle.dump(kitti_infos_test, f)
    print('Kitti info test file is saved to %s\n' % test_filename)
    sleep(3)

    print('\n---------------Start creating groundtruth database for later data augmentation-------------------------')
    print('---------------CAUTION: Source code is configured to serve as Augmentor NOT training-------------------')
    print('---------------No DataProcessor and PointFeatureEncoder required, handled by training data creation----')
    
    # Input the 'kitti_infos_train.pkl' to generate gt_database (cutted objects of samples)
    dataset.set_split(train_split)
    dataset.create_groundtruth_database(info_path=train_filename, used_classes=class_names, split=train_split)
    print(f'---------------These groundtruth {train_split} objects are randomly inserted into samples (augmentation)-------')
    print('-' * 41 + 'Data preparation Done' + '-' * 41)

def save_data_list(data_list=None, save_path=None, root_path=None, sample_id_list=None, augmentors=None):
    root_path = root_path if root_path is not None else Path(dataset_cfg.DATA_PATH) 
    split = dataset_cfg.DATA_SPLIT['train']
    split_dir = root_path / 'ImageSets' / (split + '.txt')
    sample_id_list = [x.strip() for x in open(split_dir).readlines()] if split_dir.exists() else None
    
    train_split = 'train'
    train_filename = save_path / ('kitti_%s_dataset.pkl' % train_split)

    aug_config_list = augmentors
    num_features = len(dataset_cfg.POINT_FEATURE_ENCODING.src_feature_list)
    
    print('\n' + '-' * 35 + 'Start to save data infos(original+augmented)' + '-' * 37)

    with open(train_filename, 'wb') as f:
        pickle.dump(data_list, f)
    
    for sample_idx in sample_id_list:
        applied_augmentations = [str(name) for name in aug_config_list]
        aug_str = ', '.join(applied_augmentations)
        print(f"{split} sample_idx: {sample_idx} (original, {aug_str})")
 
    print('Kitti info train/aug file is saved to %s' % train_filename)
    print('-' * 49 + 'Data saving Done' + '-' * 51 + '\n') 


# Step 1 : Create the data_infos, only validation data_infos and gt_database are important. 
# The val data get postprocessed through DataProcessor, PointFeatureEncoder, also includes only points of FOV.
# The gt_database is necessary for successfully creating augmented training samples.
create_kitti_infos(dataset_cfg, class_names, data_path, save_path, workers=4)

# Step 2: Create the training set with data augmentation
dataset = KittiDataset(dataset_cfg=dataset_cfg, class_names=class_names, root_path=data_path, training=True) # the training flag allows data augmentation before training

# Step 3: Call the member method to catch information
dataset.dataset_w_all_infos = dataset.get_infos(num_workers=4, has_label=True, count_inside_pts=True, num_features=num_features)


# Cath the first entry of the dataset
# TODO: How to split train and val, saving one in a list, the other in dict
# Solution: let val as .pkl and use train+aug as .pkl too?
#data_list = dataset[0].copy()
#save_data_list_to_pkl(data_list=data_list, save_path=save_path)


In [None]:
# Step 4: save it
dataset_as_list = []

for idx in range(len(dataset)):
    data, applied_augmentors = dataset[idx]
    # debug
    #sample_idx = data[0]['frame_id']
    #print(f"{sample_idx}")
    dataset_as_list.append(data)   
    # dataset_as_list.append(dataset[idx])

save_data_list(data_list=dataset_as_list, save_path=save_path, root_path=None, sample_id_list=None, augmentors=applied_augmentors)

In [None]:
# Test für eine 3D-BB in der Szene
# first_sample = dataset[0][0]
# gt_boxes = first_sample['gt_boxes']
# points = first_sample['points']

# print("GT Boxes:", len(gt_boxes))
# print("Points:", len(points))

# Test für mehrere 3D-BB in der Szene
first_sample = dataset[2][0]
gt_boxes = first_sample['gt_boxes']
points = first_sample['points']

print("GT Boxes:", len(gt_boxes))
print("GT Boxes Shape:", gt_boxes.shape)
print("Points:", len(points))
print("Points Shape:", points.shape)

# Test für mehrere 3D-BB in der Szene mit einer Kollision
# first_sample = dataset[2][0]
# gt_boxes = first_sample['gt_boxes'] = np.array([
#     [25.299036, 0.70895809, -0.68842334, 3.2, 1.66, 1.61, 0.019203663, 1.0],  # Box 1
#     [25.5, 0.8, -0.68842334, 3.2, 1.66, 1.61, 0.0, 1.0],                      # Box 2 (überlappt mit Box 1)
#     [34.37772, 12.651429, -0.60237646, 1.95, 0.5, 1.72, -3.1107965, 3.0]      # Box 3 (keine Überlappung)
# ], dtype=np.float32)

# points = first_sample['points']

# print("GT Boxes:", len(gt_boxes))
# print("GT Boxes Shape:", gt_boxes.shape)
# print("Points:", len(points))
# print("Points Shape:", points.shape)

# Test für mehrere 3D-BB in der Szene mit mehreren Kollisionen
# first_sample = dataset[2][0]
# gt_boxes = first_sample['gt_boxes'] = np.array([
#     # Box 1
#     [25.299036, 0.70895809, -0.68842334, 3.2, 1.66, 1.61, 0.019203663, 1.0],  # Box 1
#     [25.5, 0.8, -0.68842334, 3.2, 1.66, 1.61, 0.0, 1.0],                      # Box 2 (überlappt mit Box 1)
#     [34.0, 12.0, -0.6, 1.5, 0.5, 1.7, -3.0, 1.0],                             # Box 3 (überlappt mit Box 4)
#     [34.1, 12.1, -0.6, 1.5, 0.5, 1.7, -3.1, 1.0],                             # Box 4 (überlappt mit Box 3)
#     [40.0, 15.0, -0.5, 2.0, 0.5, 1.5, 0.0, 1.0]                               # Box 5 (keine Überlappung)
# ], dtype=np.float32)

# points = first_sample['points']

# print("GT Boxes:", len(gt_boxes))
# print("GT Boxes Shape:", gt_boxes.shape)
# print("Points:", len(points))
# print("Points Shape:", points.shape)

## Test for augmentation functions

In [None]:
import numpy as np
import torch
from functools import partial
import itertools

from pcdet.ops.iou3d_nms.iou3d_nms_utils import *
from pcdet.datasets.augmentor.augmentor_utils import *
from pcdet.datasets.augmentor import augmentor_utils
from pcdet.utils.box_utils import *


# Testimplementierung der Klasse mit random_local_rotation_v2
class DataAugmentor:
    def random_local_rotation_v2(self, data_dict=None, config=None):
        """
        Please check the correctness of it before using. Modified version of random_local_rotation.
        """
        if data_dict is None:
            print('DataAugmentor: random_local_rotation() called with no data_dict')
            return partial(self.random_local_rotation_v2, config=config)
        rot_range = config['LOCAL_ROT_ANGLE']
        if not isinstance(rot_range, list):
            rot_range = [-rot_range, rot_range]

        num_gt_boxes = len(data_dict['gt_boxes'])
        print('DataAugmentor: no. gt_boxes in sample: ', num_gt_boxes)
        collision_count = 0
       
        if num_gt_boxes < 2: # only one bbox in gt_boxes
            print(f'DataAugmentor: {num_gt_boxes} 3D-BB in dict entry. Skip boxes_iou3d_gpu(), applying random local rotation')
            gt_boxes, points, noise_rot, enable = augmentor_utils.local_rotation(
                data_dict['gt_boxes'], data_dict['points'], rot_range=rot_range)
            print(f'DataAugmentor: random local rotation with range: {rot_range}, enabled: {enable}')
            
            data_dict['gt_boxes'] = gt_boxes
            data_dict['points'] = points
            data_dict['noise_loc_rot'] = noise_rot
            print('DataAugmentor: random local rotation completed')
            return data_dict

        else: # several bboxes in gt_boxess
            print(f'DataAugmentor: {num_gt_boxes} 3D-BB in dict entry. Check all IoU with boxes_iou3d_gpu()')
            
            gt_boxes = data_dict['gt_boxes']
            gt_indices = list(range(gt_boxes.shape[0]))
            iou_matrix = torch.zeros((len(gt_indices), len(gt_indices)), dtype=torch.float32).cuda()
            
            if gt_boxes.shape[1] == 8:
                # (N, 8) => (N, 7) [x, y, z, dx, dy, dz, heading]
                gt_boxes = data_dict['gt_boxes'][:, :-1]
            else:
                # IoU for all box pairs (excluding itself)
                for box_a_idx, box_b_idx in itertools.combinations(gt_indices, 2):
                    boxes_a = torch.tensor(gt_boxes[box_a_idx:box_a_idx + 1], dtype=torch.float32).cuda()
                    boxes_b = torch.tensor(gt_boxes[box_b_idx:box_b_idx + 1], dtype=torch.float32).cuda()
                    iou3d = boxes_iou3d_gpu(boxes_a, boxes_b)
                    iou_matrix[box_a_idx, box_b_idx] = iou3d # save results (idx, idx)
            
            overlap_matrix = (iou_matrix > 0).fill_diagonal_(False) # True or False in no._boxes x no._boxes matrix
            overlapping_indices = torch.nonzero(overlap_matrix) # box pairs (i, j)
           
            if overlapping_indices.numel() > 0:  # collision between 3D-BB in scene
                collision_count += overlapping_indices.size(0)
                print(f"DataAugmentor: detected {collision_count} between boxes. Retrying rotation on this pair(s)...")
                overlapping_boxes_set = set()

                for idx in overlapping_indices:
                    box_a_idx, box_b_idx = idx[0].item(), idx[1].item()
                    overlapping_boxes_set.add(box_a_idx)
                    overlapping_boxes_set.add(box_b_idx)
                
                overlapping_boxes = np.array([gt_boxes[i] for i in overlapping_boxes_set], dtype=np.float32)
                non_overlapping_indices = [i for i in gt_indices if i not in overlapping_boxes_set]
                non_overlapping_boxes = np.array([gt_boxes[i] for i in non_overlapping_indices], dtype=np.float32)
                points = data_dict['points']
                points_in_overlap = np.empty((0, points.shape[1]))
                
                # extract point cloud in 3D-BB with IoU
                for gt_box in overlapping_boxes:
                    box_points, _ = get_points_in_box(points, gt_box)
                    points_in_overlap = np.vstack((points_in_overlap, box_points))
                
                # try second rotation
                gt_boxes, points, noise_rot, enable = augmentor_utils.local_rotation(
                    overlapping_boxes, points_in_overlap, rot_range=rot_range)
                print(f'DataAugmentor: local rotation applied to overlapping boxes_idx: {box_a_idx} and {box_b_idx}')
                print(f'DataAugmentor: 2. random local rotation with range: {rot_range}, enabled: {enable}')
                
                # prepare type and check again IoU
                gt_boxes = torch.tensor(gt_boxes, dtype=torch.float32).cuda() # required for boxes_iou3d_gpu
                iou_matrix_after_rotation = torch.zeros((gt_boxes.shape[0], gt_boxes.shape[0]), dtype=torch.float32).cuda()
                
                for box_a_idx, box_b_idx in itertools.combinations(range(gt_boxes.shape[0]), 2):
                    boxes_a = gt_boxes[box_a_idx:box_a_idx + 1]
                    boxes_b = gt_boxes[box_b_idx:box_b_idx + 1]
                    iou3d_after_rotation = boxes_iou3d_gpu(boxes_a, boxes_b)
                    iou_matrix_after_rotation[box_a_idx, box_b_idx] = iou3d_after_rotation
                overlap_exists = (iou3d_after_rotation.item() > 0)

                if overlap_exists:
                    print("DataAugmentor: overlap still exists after second rotation")
                    gt_boxes = gt_boxes[::2].cpu().numpy()
                    print(f"DataAugmentor: removed every second bbox of each overlapping pair to resolve overlap")
                                    
                    all_boxes = np.vstack((gt_boxes, non_overlapping_boxes)).astype(np.float32)
                
                    data_dict['gt_boxes'] = all_boxes
                    data_dict['points'] = points
                    data_dict['noise_loc_rot'] = noise_rot

                    print('DataAugmentor: local rotation on overlapping boxes completed')
                    print(f'DataAugmentor: Total collisions detected: {collision_count}')
                    return data_dict
                
                # no overlap exists
                data_dict['gt_boxes'] = gt_boxes
                data_dict['points'] = points
                data_dict['noise_loc_rot'] = noise_rot

                print('DataAugmentor: local rotation on overlapping boxes completed')
                return data_dict
            
            else:
                print(f'DataAugmentor: no overlaps found between the {num_gt_boxes}-BBs. Applying random local rotation')
                gt_boxes, points, noise_rot, enable = augmentor_utils.local_rotation(
                    data_dict['gt_boxes'], data_dict['points'], rot_range=rot_range)
                print(f'DataAugmentor: random local rotation with range: {rot_range}, enabled: {enable}')
                
                data_dict['gt_boxes'] = gt_boxes
                data_dict['points'] = points
                data_dict['noise_loc_rot'] = noise_rot

                print('DataAugmentor: random local rotation completed')
                return data_dict


# Konfiguration für den Test
config = {
    'LOCAL_ROT_ANGLE': [-0.15707963267, 0.15707963267]
}

# Test ausführen
data_augmentor = DataAugmentor()
data_dict = {
    'gt_boxes': gt_boxes,
    'points': points
}
print("GT Boxes:", len(data_dict['gt_boxes']))
print("Points:", len(data_dict['points']))
result = data_augmentor.random_local_rotation_v2(data_dict=data_dict, config=config)

#print("Resulting Data Dictionary:")
#print(result)
