In [50]:
import json
import numpy as np
import quaternion
import math
import os
import cv2
import open3d as o3d
import natsort 
import shutil
from PIL import Image


# Root directory of the generated KITTI-style dataset. Paths are built by plain
# string concatenation throughout the notebook, so keep the trailing slash.
output_path='/home/server-003/dl/mmdetection3d/data/superb/'

# Input directories: one sub-folder per dataset, holding the raw assets
# (images, point clouds, manifest JSON) and the label JSON files respectively.
asset_path ='/home/server-003/workspace/data/superb_mobis/jupyter_apps/asset/'
label_path ='/home/server-003/workspace/data/superb_mobis/jupyter_apps/label/'

mode = "lidar"  #lidar,cam,both

# Number of samples to split; filled in by the copy cell once the image,
# point-cloud and calibration counts agree.
num_files =0
# Split ratios. The split cell reads train_ratio and val_ratio; the test set is
# whatever remains (test_ratio is not referenced by the visible split code).
train_ratio = 0.7
test_ratio=0.1
val_ratio=0.2
# Maximum number of dataset folders to process (None disables the limit).
data_limit=1
# When False, JPG images are re-encoded as PNG while being copied.
allow_jpg=False

# Reference image used only to obtain the image size for placeholder 2D boxes.
reference_image_path = asset_path+'0a10c009-dc8b-4479-80f4-49db9e60b473/image_00000001_00000001.jpg'
img = cv2.imread(reference_image_path)
# cv2.imread signals failure by returning None instead of raising.
if img is None:
    raise FileNotFoundError(f"Could not read reference image: {reference_image_path}")
img_height,img_width = img.shape[:2]

# Accumulates every distinct class name seen while converting labels.
class_name_arr=np.array([])

In [51]:
# !python -m pip install --upgrade --force-reinstall numpy-quaternion
# !conda install -c conda-forge quaternion
#https://quaternion.readthedocs.io/en/latest/
#!pip install natsort

In [52]:
'''
convert calibration info to kitti form
'''
def convert_calib_suite2kitti(calib_path,output_path,calib_idx):
    """Write one KITTI-style calibration file per camera entry of a manifest JSON.

    Walks ``manifest.frames[*].images[*]`` in the JSON at ``calib_path`` and,
    for every image entry, writes ``<output_path>training/calib/<index>.txt``.
    The index is ``calib_idx`` plus the number of entries written so far,
    zero-padded to six digits.

    Args:
        calib_path: Path of the manifest JSON file.
        output_path: Dataset root. It is used as a plain string prefix, so it
            must already end with a path separator.
        calib_idx: Index assigned to the first calibration file written.

    Returns:
        int: Number of calibration files written.

    Raises:
        OSError: If the manifest cannot be opened (re-raised after the error
            message has been printed).
    """
    train_output_path = output_path+'training/calib/'

    try:
        os.makedirs(train_output_path, exist_ok=True)
    except OSError:
        print("Error: Failed to create the directory.")

    try:
        # Context manager guarantees the manifest handle is released.
        with open(calib_path) as manifest_file:
            manifest = json.load(manifest_file)
    except OSError:
        print("Error: Failed to open the file")
        raise

    written = 0
    for frame in manifest.get("manifest").get('frames'):
        for cam in frame.get("images"):
            position = cam.get('position')
            translation = np.array(
                [position.get('x'), position.get('y'), position.get('z')]
            ).reshape(3, 1)

            heading = cam.get('heading')
            q = np.quaternion(
                heading.get('qw'), heading.get('qx'), heading.get('qy'), heading.get('qz')
            )

            # The stored position/heading is inverted to obtain the matrix that
            # is written as Tr_velo_to_cam: [R^-1 | -R^-1 t].
            # NOTE(review): presumably the manifest stores the camera pose in
            # the lidar frame - confirm against the data specification.
            rot_matrix = np.linalg.inv(quaternion.as_rotation_matrix(q))
            kitti_style_extrinsic = np.concatenate(
                [rot_matrix, -np.matmul(rot_matrix, translation).reshape(3, 1)], axis=1
            )

            cx = cam.get('cx')
            cy = cam.get('cy')
            skew = cam.get('skew')
            fx = cam.get('fx')
            fy = cam.get('fy')

            projection = cam.get('new_camera_matrix')
            pro_cx = projection.get('cx')
            pro_fx = projection.get('fx')
            pro_skew = projection.get('skew')
            pro_cy = projection.get('cy')
            pro_fy = projection.get('fy')

            # P0-P2 share the raw intrinsics; P3 carries 'new_camera_matrix'.
            intrinsic_row = f'{fx} {skew} {cx} 0.0 0.0 {fy} {cy} 0.0 0.0 0.0 1.0 0.0'
            extrinsic_row = ' '.join(f'{value}' for value in kitti_style_extrinsic.flatten())

            with open(train_output_path+str(calib_idx+written).zfill(6)+".txt", 'w') as f:
                f.write(f'P0: {intrinsic_row}\n')
                f.write(f'P1: {intrinsic_row}\n')
                f.write(f'P2: {intrinsic_row}\n')
                f.write(f'P3: {pro_fx} {pro_skew} {pro_cx} 0.0 0.0 {pro_fy} {pro_cy} 0.0 0.0 0.0 1.0 0.0\n')
                f.write('R0_rect: 1 0 0 0 1 0 0 0 1\n')
                f.write(f'Tr_velo_to_cam: {extrinsic_row}\n')
                f.write('Tr_imu_to_velo: 1 0 0 0 0 1 0 0 0 0 1 0\n')
            written += 1

    return written

In [53]:
'''
convert label info to kitti form

form : {class_name} {truncated} {occluded} {alpha} {bbox[0]} {bbox[1]} {bbox[2]} {bbox[3]} {height} {width} {length} {x} {y} {z} {rotation_y}

'''
# 3965, total kinds of classes
def get_calib_param(filepath):
    """Read the ``Tr_velo_to_cam`` matrix from a KITTI-style calibration file.

    Every relevant line has the form ``<key>: <whitespace-separated numbers>``.
    Lines without a ``:`` (for example blank lines) are skipped instead of
    aborting the parse.

    Args:
        filepath: Path of the calibration text file.

    Returns:
        numpy.ndarray: The ``Tr_velo_to_cam`` values as a ``(3, 4)`` float
        array. If the key occurs more than once, the last occurrence wins.

    Raises:
        ValueError: If the file has no ``Tr_velo_to_cam`` entry, or the entry
            does not hold exactly 12 numbers.
    """
    RT = None
    with open(filepath, "r") as f:
        for line in f:
            key, separator, val = line.partition(':')
            if not separator:
                # Not a "key: value" line (blank or malformed) - ignore it.
                continue
            if key.strip() == 'Tr_velo_to_cam':
                # str.split() discards the leading space and trailing newline.
                RT = np.array(val.split(), dtype=np.float64).reshape(3, 4)

    if RT is None:
        raise ValueError(f"'Tr_velo_to_cam' not found in calibration file: {filepath}")
    return RT


def convert_label_suite2kitti(label_path,label_output, label_idx):
    """Convert one suite label JSON into KITTI-style label text files.

    An empty ``<label_output>training/label_2/<index>.txt`` is created for each
    of the ``frame_count`` frames, then one line per annotated object frame is
    appended to the file with index ``label_idx + frame['num']``. Box centres
    are transformed with the ``Tr_velo_to_cam`` matrix stored in
    ``<label_output>training/calib/<index>.txt``.

    Reads the notebook globals ``mode``, ``img_width`` and ``img_height``, and
    appends newly seen class names to the global ``class_name_arr``.

    Args:
        label_path: Path of the label JSON file.
        label_output: Dataset root. It is used as a plain string prefix, so it
            must already end with a path separator.
        label_idx: Index assigned to frame 0 of this label file.

    Returns:
        int: The ``frame_count`` reported by the label JSON.

    Raises:
        OSError: If the label JSON cannot be opened (re-raised after the error
            message has been printed).
        NotImplementedError: If an annotation is reached while ``mode`` is not
            ``"lidar"``; only placeholder 2D fields are implemented.
    """
    global class_name_arr

    # Calibration files live under the same dataset root as the labels.
    calib_path = label_output+'training/calib/'
    label_output = label_output+"training/label_2/"

    try:
        os.makedirs(label_output, exist_ok=True)
    except OSError:
        print("Error: Failed to create the directory.")

    try:
        with open(label_path) as label_file:
            jsonString = json.load(label_file)
    except OSError:
        print("Error: Failed to open the file")
        raise

    frame_num = jsonString.get("label").get('asset').get("info").get("frame_count")

    # Create (or truncate) one label file per frame so that frames without any
    # annotation still get an empty file.
    for i in range(frame_num):
        try:
            with open(label_output+str(label_idx+i).zfill(6)+".txt", 'w'):
                pass
        except OSError:
            print("Fail to create file")

    try:
        jsonArray = jsonString.get("label_info").get('result').get("objects")
    except AttributeError:
        jsonArray = None
    if jsonArray is None:
        print("can't find objects")
        return frame_num

    #TODO check in suite label file
    truncated =0.0
    occluded = 0

    # In lidar mode the 2D fields are placeholders: alpha 0 and a box covering
    # the whole reference image. Other modes have no implementation.
    if mode == "lidar":
        placeholder_2d = (0, [0,0,img_width,img_height])
    else:
        placeholder_2d = None

    # Homogeneous 4x4 lidar->camera matrices, keyed by calibration index, so a
    # calibration file is parsed once rather than once per annotation.
    calib_cache = {}

    for obj in jsonArray:
        class_name = obj.get('class_name')
        # class_name_arr records the name exactly as it appears in the JSON.
        if str(class_name) not in class_name_arr.tolist():
            class_name_arr= np.append(class_name_arr,[class_name])
        # The label line is whitespace-delimited, so whitespace inside a class
        # name is removed (e.g. 'Heavy Equipment' -> 'HeavyEquipment').
        label_class = ''.join(str(class_name).split())

        for frame in obj.get("frames") or []:
            if placeholder_2d is None:
                raise NotImplementedError(
                    f"mode={mode!r}: alpha/bbox are only implemented for mode 'lidar'"
                )
            alpha, bbox = placeholder_2d

            idx = frame.get('num')

            ann = frame.get('annotation').get('coord')
            size = ann.get('size')
            height = np.round(size.get('z'),5)
            width = np.round(size.get('y'),5)
            length = np.round(size.get('x'),5)
            # TODO : check coordinate , lidar or cam? -> cam

            calib_idx = label_idx+idx
            if calib_idx not in calib_cache:
                lidar2cam_3x4 = get_calib_param(calib_path+str(calib_idx).zfill(6)+".txt")
                calib_cache[calib_idx] = np.concatenate(
                    [lidar2cam_3x4, np.array([[0,0,0,1]])], axis=0
                )
            lidar2cam_RT = calib_cache[calib_idx]

            position = ann.get('position')
            lidar_coor_pose = np.array([
                np.round(position.get('x'),5),
                np.round(position.get('y'),5),
                # Half the height is subtracted from z (original note:
                # "height/2 : gravity center").
                np.round(position.get('z'),5)-height/2,
                1,
            ])
            cam_coor_pose = np.matmul(lidar2cam_RT,lidar_coor_pose)

            rotation = ann.get('rotation_quaternion')
            q = np.quaternion(
                rotation.get('w'), rotation.get('x'), rotation.get('y'), rotation.get('z')
            )
            # NOTE(review): element [1] of as_euler_angles() is written as the
            # last label field; confirm that this angle is the intended
            # KITTI rotation_y for the library's Euler convention.
            lidar_coor_RPY = np.round(quaternion.as_euler_angles(q),5)

            with open(label_output+str(label_idx+idx).zfill(6)+".txt", 'a') as f:
                f.write(f'{label_class} {truncated} {occluded} {alpha} {bbox[0]} {bbox[1]} {bbox[2]} {bbox[3]} {height} {width} {length} {cam_coor_pose[0]} {cam_coor_pose[1]} {cam_coor_pose[2]} {lidar_coor_RPY[1]}\n')
    return frame_num

# Display the reference image size as (height, width); these globals supply the
# placeholder 2D box written by convert_label_suite2kitti in lidar mode.
img_height,img_width 
                           

(1280, 2048)

In [54]:
'''
move pcd file and img file to each folder and rename as order
'''
def copy_pcd_img(mode,path,output_path,idx):
    """Copy one image or point-cloud file into the KITTI training layout.

    Both ``training/image_2/`` and ``training/velodyne/`` are created under
    ``output_path`` if they are missing. The file is stored as the six-digit,
    zero-padded ``idx`` plus an extension chosen by ``mode``:

    * ``"jpg"`` - written to ``image_2``; re-encoded as PNG when the global
      ``allow_jpg`` equals ``False``, otherwise copied as ``.jpg``.
    * ``"png"`` - copied to ``image_2``.
    * ``"pcd"`` / ``"bin"`` - copied to ``velodyne``.

    Any other ``mode`` only creates the directories.
    """
    image_dir = output_path+'training/image_2/'
    cloud_dir = output_path+'training/velodyne/'

    # Each directory gets its own guarded attempt, so a failure on one does
    # not prevent the attempt on the other.
    for directory in (image_dir, cloud_dir):
        try:
            if not os.path.exists(directory):
                os.makedirs(directory)
        except OSError:
            print("Error: Failed to create the directory.")

    stem = str(idx).zfill(6)

    if mode == "jpg":
        if allow_jpg ==False:
            rgb_image = Image.open(path).convert('RGB')
            rgb_image.save(image_dir+stem+".png", 'png')
        else:
            shutil.copy2(path, image_dir+stem+".jpg")
        return

    # Remaining modes are plain copies that differ only in folder and suffix.
    plain_copies = (
        ("png", image_dir, ".png"),
        ("pcd", cloud_dir, ".pcd"),
        ("bin", cloud_dir, ".bin"),
    )
    for kind, folder, suffix in plain_copies:
        if mode == kind:
            shutil.copy2(path, folder+stem+suffix)
            break



In [55]:
###calib, file movement exec###

# Process the dataset folders in natural order so that running indices are
# assigned deterministically.
asset_path_list = natsort.natsorted(os.listdir(asset_path))

img_idx=0
pcd_idx=0

calib_num=0

for data_cnt,path in enumerate(asset_path_list):
    if data_limit is not None:
        # Check the limit before logging so only processed folders are listed.
        if data_cnt == data_limit:
            break
        print(f"{data_cnt} : {path}")

    dataset_dir = asset_path+path
    files = natsort.natsorted(os.listdir(dataset_dir))

    # Pass 1: calibration. Every manifest JSON yields one calib file per entry.
    json_file_path=None
    for file in files:
        if file.endswith('.json'):
            json_file_path = os.path.join(dataset_dir, file)
            calib_num+=convert_calib_suite2kitti(json_file_path,output_path,calib_num)

    # Pass 2: images and point clouds, matched on the real file extension so
    # that names merely containing e.g. '.png' are not picked up.
    for file in files:
        extension = os.path.splitext(file)[1]
        source_file_path = os.path.join(dataset_dir, file)

        if extension in ('.png', '.jpg'):
            copy_pcd_img(extension[1:],source_file_path,output_path,img_idx)
            img_idx+=1
        elif extension in ('.bin', '.pcd'):
            #TODO : preprocession (for .bin)
            copy_pcd_img(extension[1:],source_file_path,output_path,pcd_idx)
            pcd_idx+=1


if img_idx != pcd_idx:
    print("The number of pcd file and img file is not equal!!")

print(f'img num : {img_idx}')
print(f'pcd num : {pcd_idx}')
print(f'calib num : {calib_num}')

# num_files is only updated when all three counts agree; otherwise it keeps its
# previous value and the split cell works from that.
if img_idx == pcd_idx == calib_num:
    print("The number of files is Equal!")
    num_files=img_idx
else:
    print("The number of files is Not Equal!")




0 : 0a10c009-dc8b-4479-80f4-49db9e60b473
1 : 0aafd40e-85d8-496b-bfb8-03afbe333caf
img num : 24
pcd num : 24
calib num : 24
The number of files is Equal!


In [56]:
#####label exec

# Label folders are visited in natural order, mirroring the asset cell, so
# label indices line up with the image / point-cloud / calibration indices.
label_path_list = natsort.natsorted(os.listdir(label_path))

label_idx=0

for label_data_cnt,path in enumerate(label_path_list):
    # The limit is governed by data_limit (None disables it).
    if data_limit is not None and label_data_cnt == data_limit:
        break

    label_dir = label_path+path
    for file in natsort.natsorted(os.listdir(label_dir)):
        if file.endswith('.json'):
            json_file_path = os.path.join(label_dir, file)
            # convert_label_suite2kitti opens and parses the JSON itself, so
            # it is not loaded a second time here.
            label_idx+=convert_label_suite2kitti(json_file_path,output_path, label_idx)

print(f'num of label : {label_idx}')
if img_idx == label_idx:
    print("The number of files is Equal!")
else:
    print("The number of files is Not Equal!")

print(class_name_arr)

num of label : 24
The number of files is Equal!
['Car' 'Truck']




In [57]:
# num_files=3000

In [58]:
#### split train, test

# Create the split-list directory and the testing/ sub-folders.
try:
    for sub_dir in ('ImageSets/', 'testing/image_2/', 'testing/velodyne/', 'testing/calib/'):
        os.makedirs(output_path+sub_dir, exist_ok=True)
except OSError:
    print("Error: Failed to create the directory.")

image_sets_dir = output_path+'ImageSets/'

# The four list files are opened together and closed automatically, even if
# the loop below raises.
with open(image_sets_dir+'test'+".txt", 'w') as f1, \
        open(image_sets_dir+'train'+".txt", 'w') as f2, \
        open(image_sets_dir+'val'+".txt", 'w') as f3, \
        open(image_sets_dir+'trainval'+".txt", 'w') as f4:
    for idx in range(num_files):
        stem = str(idx).zfill(6)

        if idx < num_files*(train_ratio+val_ratio):
            # The leading indices form train+val; the first train_ratio share
            # of all samples is train, the rest of this range is val.
            f4.write(f'{stem}\n')
            if idx < num_files*train_ratio:
                f2.write(f'{stem}\n')
            else:
                f3.write(f'{stem}\n')
            continue

        # Remaining indices are the test set: list them and move their image,
        # point cloud and calibration from training/ to testing/.
        f1.write(f'{stem}\n')
        try:
            for sub_dir, extensions in (('image_2/', ('.png', '.jpg')),
                                        ('velodyne/', ('.bin', '.pcd'))):
                # Move the first variant that exists in training/. The source
                # existence check must look in training/, not testing/.
                for extension in extensions:
                    source = output_path+'training/'+sub_dir+stem+extension
                    if os.path.isfile(source):
                        shutil.move(source, output_path+'testing/'+sub_dir+stem+extension)
                        break
            shutil.move(output_path+'training/calib/'+stem+".txt", output_path+'testing/calib/'+stem+".txt")
        except OSError:
            # shutil.Error derives from OSError, so it is covered as well.
            print("Fail to move")