# Point Cloud File Format

## PCD

Using lib pypcd4, see [pypcd4/pypcd4.py](https://github.com/MapIV/pypcd4/blob/main/src/pypcd4/pypcd4.py).

## 3rd Part Dependencies

```shell
pip install numpy
pip install pypcd4
```

In [2]:
# Import libraries
import base64
import numpy as np
import pypcd4
import os
import urllib

In [7]:
# Definitions

## CnosDB API Constants
api_sql = 'http://127.0.0.1:8902/api/v1/sql'
api_write_point_cloud = 'http://127.0.0.1:8902/api/v1/point_cloud/write'
api_dump_point_cloud = 'http://127.0.0.1:8902/api/v1/point_cloud/dump'
api_delete_point_cloud = 'http://127.0.0.1:8902/api/v1/point_cloud/delete'
http_headers = {
    'Authorization': 'Basic ' + base64.b64encode(bytes('root:', 'utf-8')).decode('utf-8')
}
database_name_point_cloud = 'pcd'

## Test Constants

base_dir = './pcd_files'
test_source_dir = base_dir + '/test_source'
test_target_dir = base_dir + '/test_target'

test_file_num =10
test_point_num_each_file = 100
test_point_type = 'xyz'
# test_point_type = 'xyzi' # x y z intensity
# test_point_type = 'xyzl' # x y z label
# test_point_type = 'xyzrgb' # x y z rgb
# test_point_type = 'xyzrgbl' # x y z rgb label
# test_point_type = 'xyzil' # x y z rgb intensity label
# test_point_type = 'xyzirgb' # x y z rgb intensity rgb
# test_point_type = 'xyzirgbl' # x y z intensity rgb label
# test_point_type = 'xyzt' # x y z sec nsec
# test_point_type = 'xyzir' # x y z intensity ring
# test_point_type = 'xyzirt' # x y z intensity ring time
# test_point_type = 'xyzit' # x y z intensity timestamp
# test_point_type = 'xyzis' # x y z intensity stamp
# test_point_type = 'xyzisc' # x y z intensity stamp classification
# test_point_type = 'xyzrgbs' # x y z rgb stamp
# test_point_type = 'xyzirgbs' # x y z intensity rgb stamp
# test_point_type = 'xyzirgbsc' # x y z intensity rgb stamp classification
# test_point_type = 'xyziradt' # x y z intensity ring azimuth distance return_type tine_stamp
# test_point_type = 'ouster' # x y z intensity t reflectivity ring ambient range
test_file_encoding = pypcd4.Encoding.ASCII # ascii
# test_file_encoding = Encoding.BINARY # binary
# test_file_encoding = Encoding.BINARY_COMPRESSED # binary_compressed
# test_file_encoding = Encoding.BINARYSCOMPRESSED # binaryscompressed

## Variables & Functions

sql_create_point_cloud_database = 'CREATE DATABASE IF NOT EXISTS %s WITH point_cloud true;' % database_name_point_cloud

def mkdir_ignore_error(path):
    try:
        os.mkdir(path)
    except:
        pass

mkdir_ignore_error(base_dir)
print("Base directory created:", base_dir)
mkdir_ignore_error(test_source_dir)
print("Source directory created:", test_source_dir)
mkdir_ignore_error(test_target_dir)
print("Target directory created:", test_target_dir)

test_file_ids_and_names = [] # [(point_cloud_id, file_name)]

def request_api_v1_sql(sql):
    url = api_sql + '?db=public'
    req = urllib.request.Request(url, method='POST', headers=http_headers, data=sql.encode('utf-8'))
    resp = urllib.request.urlopen(req)
    if resp.status != 200:
        raise Exception('Failed to execute SQL: %s' % sql)
    return resp.read().decode('utf-8')

request_api_v1_sql(sql_create_point_cloud_database)
print("Database created:", database_name_point_cloud)

def request_api_v1_point_cloud_write(id, in_path):
    url = '%s?db=%s&id=%s&fmt=%s' % (api_write_point_cloud, database_name_point_cloud, id, 'pcd')
    print("Uploading point cloud file:", url)
    file_bytes = open(in_path, 'rb').read()
    req = urllib.request.Request(url, method='POST', headers=http_headers, data=file_bytes)
    resp = urllib.request.urlopen(req)
    if resp.status != 200:
        raise Exception('Failed to write point cloud file: id=%s, in_path=%s' % (id, in_path))
    return resp.read().decode('utf-8')

def request_api_v1_point_cloud_dump(id, out_path):
    url = '%s?db=%s&id=%s' % (api_dump_point_cloud, database_name_point_cloud, id)
    print("Downloading point cloud file:", url)
    req = urllib.request.Request(url, method='GET', headers=http_headers)
    resp = urllib.request.urlopen(req)
    if resp.status != 200:
        raise Exception('Failed to dump point cloud file: id=%s, out_path=%s' % (id, out_path))
    file_bytes = resp.read()
    open(out_path, 'wb').write(file_bytes)

def request_api_v1_point_cloud_delete(id):
    url = '%s?db=%s&id=%s' % (api_delete_point_cloud, database_name_point_cloud, id)
    print("Deleting point cloud file:", url)
    req = urllib.request.Request(url, method='POST', headers=http_headers)
    resp = urllib.request.urlopen(req)
    if resp.status != 200:
        raise Exception('Failed to delete point cloud file: id=%s' % id)
    file_bytes = resp.read()
    return resp.read().decode('utf-8')

Base directory created: ./pcd_files
Source directory created: ./pcd_files/test_source
Target directory created: ./pcd_files/test_target
Database created: pcd


In [None]:
# Generate pcd files
for i in range(test_file_num):
    # Create a point cloud with random points
    if test_point_type == 'xyz':
        pc = pypcd4.PointCloud.from_xyz_points(np.random.rand(test_point_num_each_file, 3))
    elif test_point_type == 'xyzi':
        pc = pypcd4.PointCloud.from_xyzi_points(np.random.rand(test_point_num_each_file, 4))
    elif test_point_type == 'xyzl':
        pc = pypcd4.PointCloud.from_xyzl_points(np.random.rand(test_point_num_each_file, 4))
    elif test_point_type == 'xyzrgb':
        pc = pypcd4.PointCloud.from_xyzrgb_points(np.random.rand(test_point_num_each_file, 4))
    else:
        raise Exception('Unsupported point type: %s' % test_point_type)

    # Save the point cloud to a file
    pcd_file_id = 'pcd_%d' % i
    pcd_file_name = 'case_%d.pcd' % i
    test_file_ids_and_names.append((pcd_file_id, pcd_file_name))
    test_source_file_path = '%s/%s' % (test_source_dir, pcd_file_name)
    pc.save(test_source_file_path, encoding=test_file_encoding)
    print('Generated source file:', test_source_file_path)

In [8]:
# Write to CnosDB
for (pcd_file_id, pcd_file_name) in test_file_ids_and_names:
    test_source_file_path = '%s/%s' % (test_source_dir, pcd_file_name)
    request_api_v1_point_cloud_write(pcd_file_id, test_source_file_path)
    print('Uploaded source file:', test_source_file_path)

In [None]:
# Dump from CnosDB
for (pcd_file_id, pcd_file_name) in test_file_ids_and_names:
    test_target_file_path = '%s/%s' % (test_target_dir, pcd_file_name)
    request_api_v1_point_cloud_dump(pcd_file_id, test_target_file_path)
    print('Downloaded target file:', test_target_file_path)

In [None]:
# Compare the original and dumped files
for (pcd_file_id, pcd_file_name) in test_file_ids_and_names:
    test_source_file_path = '%s/%s' % (test_source_dir, pcd_file_name)
    test_target_file_path = '%s/%s' % (test_target_dir, pcd_file_name)
    print('Comparing source and target files:', test_source_file_path, test_target_file_path)
    source_file = pypcd4.PointCloud.from_path(test_source_file_path)
    target_file = pypcd4.PointCloud.from_path(test_target_file_path)
    print('Meta equals? ', source_file.metadata == target_file.metadata)
    print('Data equals? ', np.array_equal(source_file.pc_data, target_file.pc_data))

In [None]:
# Clear CnosDB
for (pcd_file_id, pcd_file_name) in test_file_ids_and_names:
    request_api_v1_point_cloud_delete(pcd_file_id)
    print('Deleted file:', pcd_file_name)