Skip to content

Commit

Permalink
Merge pull request #42 from eEcoLiDAR/branch_plywriter_30
Browse files Browse the repository at this point in the history
Branch plywriter 30
  • Loading branch information
cwmeijer committed Jan 23, 2018
2 parents ac74524 + 5dd0c49 commit c994319
Show file tree
Hide file tree
Showing 5 changed files with 223 additions and 22 deletions.
2 changes: 2 additions & 0 deletions laserchicken/keys.py
Original file line number Diff line number Diff line change
@@ -1 +1,3 @@
point = 'vertex'
point_cloud = 'pointcloud'
provenance = 'log'
6 changes: 1 addition & 5 deletions laserchicken/test_read_ply.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,7 @@ class TestReadPly(unittest.TestCase):
test_file_path = os.path.join(_test_dir, _test_file_name)

def test_nonexistentFile_error(self):
# Catching very general Exception instead of something more specific because of following reason:
# In 3.x a FileNotFoundException is raised while in 2.7 an IOError is raised. The most specific superclass
# is the very general Exception class. See:
# https://docs.python.org/2/library/exceptions.html#exception-hierarchy
# https://docs.python.org/3/library/exceptions.html#exception-hierarchy
# Catch most specific subclass of FileNotFoundException (3.6) and IOError (2.7).
with raises(Exception):
read('nonexistentfile.ply')

Expand Down
83 changes: 78 additions & 5 deletions laserchicken/test_utils.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,80 @@
from laserchicken.keys import point
from laserchicken.keys import *
import numpy as np
import datetime as dt


def generate_test_point_cloud():
pc = {point: {'x': {'type': 'double', 'data': [1, 2, 3]}, 'y': {'type': 'double', 'data': [2, 3, 4]},
'z': {'type': 'double', 'data': [3, 4, 5]}}}
return pc
class SimpleTestData(object):
""" Test data within this class should all be in sync (reflect the same data)."""
@staticmethod
def get_point_cloud():
# This simple_test_point cloud and the simple_test_header should be in sync. Some tests depend on it.
pc = {point: {'x': {'type': 'float', 'data': np.array([1, 2, 3])},
'y': {'type': 'float', 'data': np.array([20, 30, 40])},
'z': {'type': 'float', 'data': np.array([300, 400, 500])}}}
return pc

@staticmethod
def get_header():
# This simple_test_header cloud and the simple_test_point should be in sync. Some tests depend on it.
header = """ply
format ascii 1.0
element vertex 3
property float x
property float y
property float z
"""
return header

@staticmethod
def get_data():
data = """1 20 300
2 30 400
3 40 500
"""
return data


class ComplexTestData(object):
""" Test data within this class should all be in sync (reflect the same data)."""
@staticmethod
def get_point_cloud():
# This complex_test_point cloud and the complex_test_header should be in sync. Some tests depend on it.
pc = {point: {'x': {'type': 'float', 'data': np.array([1, 2, 3, 4, 5])},
'y': {'type': 'float', 'data': np.array([2, 3, 4, 5, 6])},
'z': {'type': 'float', 'data': np.array([3, 4, 5, 6, 7])},
'return': {'type': 'int', 'data': np.array([1, 1, 2, 2, 1])}
},
point_cloud: {'offset': {'type': 'double', 'data': 12.1}},
provenance: [{'time': (dt.datetime(2018, 1, 18, 16, 1, 0)), 'module': 'filter'}]
}
return pc

@staticmethod
def get_header():
# This complex_test_header cloud and the complex_test_point should be in sync. Some tests depend on it.
comment = {"time": dt.datetime(2018, 1, 18, 16, 1, 0), "module": "filter"}
header = """ply
format ascii 1.0
comment [
comment %s
comment ]
element vertex 5
property float x
property float y
property float z
property int return
element pointcloud 1
property double offset
""" % str(comment)
return header

@staticmethod
def get_data():
data = """1 2 3 1
2 3 4 1
3 4 5 2
4 5 6 2
5 6 7 1
12.1
"""
return data
74 changes: 63 additions & 11 deletions laserchicken/test_write_ply.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,31 @@
import shutil
import unittest

import numpy as np
import pytest

from laserchicken.test_utils import SimpleTestData, ComplexTestData
from laserchicken.write_ply import write

from laserchicken.test_utils import generate_test_point_cloud

def read_header(ply):
header = ''
line = ply.readline()
while line.strip() != 'end_header':
header = header + line
line = ply.readline()
return header


def read_data(ply):
data = ''
in_header = True
for line in ply:
if (line == 'end_header\n'):
in_header = False
else:
if not in_header:
data = data + line
return data


class TestWritePly(unittest.TestCase):
Expand All @@ -15,23 +35,55 @@ class TestWritePly(unittest.TestCase):
_test_data_source = 'testdata'
test_file_path = os.path.join(_test_dir, _test_file_name)

@unittest.skip('Production code for writing not yet implemented.')
def test_write_nonExistingFile(self):
""" Should create a file. """
pc = generate_test_point_cloud()
pc = SimpleTestData.get_point_cloud()
write(pc, self.test_file_path)
assert (os.path.exists(self.test_file_path))

def test_write_sameFileTwice(self):
""" Should not throw an exception. """
pc = generate_test_point_cloud()
write(pc, self.test_file_path)
""" Should throw an exception. """
pc = SimpleTestData.get_point_cloud()
write(pc, self.test_file_path)
# Catch most specific subclass of FileExistsError (3.6) and IOError (2.7).
with pytest.raises(Exception):
write(pc, self.test_file_path)

def test_write_loadTheSameSimpleHeader(self):
""" Writing a simple point cloud and loading it afterwards should result in the same point cloud."""
pc_in = SimpleTestData.get_point_cloud()
header_in = SimpleTestData.get_header()
write(pc_in, self.test_file_path)
with open(self.test_file_path, 'r') as ply:
header_out = read_header(ply)
self.assertMultiLineEqual(header_in, header_out)

def test_write_loadTheSameComplexHeader(self):
""" Writing a complex point cloud and loading it afterwards should result in the same point cloud."""
pc_in = ComplexTestData.get_point_cloud()
header_in = ComplexTestData.get_header()
write(pc_in, self.test_file_path)
with open(self.test_file_path, 'r') as ply:
header_out = read_header(ply)
self.assertMultiLineEqual(header_in, header_out)

def test_write_loadTheSameSimpleData(self):
""" Writing point cloud data and loading it afterwards should result in the same point cloud data. """
pc_in = SimpleTestData.get_point_cloud()
write(pc_in, self.test_file_path)
data_in = SimpleTestData.get_data()
with open(self.test_file_path, 'r') as ply:
data_out = read_data(ply)
self.assertEqual(data_in, data_out)

@unittest.skip('Production code for writing not yet implemented.')
def test_write_loadTheSame(self):
""" Writing a point cloud and loading it afterwards should result in the same point cloud. """
pass
def test_write_loadTheSameComplexData(self):
""" Writing point cloud data and loading it afterwards should result in the same point cloud data. """
pc_in = ComplexTestData.get_point_cloud()
write(pc_in, self.test_file_path)
data_in = ComplexTestData.get_data()
with open(self.test_file_path, 'r') as ply:
data_out = read_data(ply)
self.assertEqual(data_in, data_out)

def setUp(self):
os.mkdir(self._test_dir)
Expand Down
80 changes: 79 additions & 1 deletion laserchicken/write_ply.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,82 @@
import os
import numpy as np

from laserchicken import keys

# Writes the pointcloud data structure to a ply-file
def write(pc, path):
pass
if os.path.exists(path):
# Raise most specific subclass of FileExistsError (3.6) and IOError (2.7).
raise Exception('Cannot write because path {} already exists.'.format(path))
with open(path, 'w') as ply:
write_header(pc,ply)
write_data(pc,ply)

# Writes the ply-header
def write_header(pc,ply):
ply.write("ply" + '\n')
ply.write("format ascii 1.0" + '\n')
write_comment(pc,ply)
for elem_name in get_ordered_elems(pc.keys()):
get_num_elems = (lambda d: len(d["x"].get("data",[]))) if elem_name == keys.point else None
write_elements(pc,ply,elem_name,get_num_elems = get_num_elems)
ply.write("end_header" + '\n')

# Writes the ply-data
def write_data(pc,ply):
delim = ' '
for elem_name in get_ordered_elems(pc.keys()):
props = get_ordered_props(elem_name,pc[elem_name].keys())
num_elems = len(pc[elem_name]["x"].get("data",[])) if elem_name == keys.point else 1
for i in range(num_elems):
for prop in props:
datavalues = pc[elem_name][prop]["data"]
if(isinstance(datavalues,np.ndarray)):
if(prop == props[-1]):
ply.write(formatply(datavalues[i]))
else:
ply.write(formatply(datavalues[i]) + delim)
else:
if(i != 0):
raise Exception("Scalar quantity does not have element at index %d" % i)
ply.write(formatply(datavalues))
ply.write('\n')

# Formatting helper function
def formatply(obj):
return str(obj)

# Defines the element ordering
def get_ordered_elems(elem_names):
if(keys.point in elem_names):
return [keys.point] + sorted([e for e in elem_names if e not in [keys.point,keys.provenance]])
else:
return sorted([e for e in elem_names if e not in [keys.point,keys.provenance]])

# Defines the property ordering for a given element
def get_ordered_props(elem_name,prop_list):
if(elem_name == keys.point):
return ['x','y','z'] + [k for k in sorted(prop_list) if k not in ['x','y','z']]
else:
return sorted(prop_list)

# Writes the comment
# TODO: Use json for this
def write_comment(pc,ply):
log = pc.get(keys.provenance,[])
if(any(log)):
ply.write("comment [" + '\n')
for msg in log:
ply.write("comment " + str(msg) + '\n')
ply.write("comment ]" + '\n')

# Writes elements for the header
def write_elements(pc,ply,elem_name,get_num_elems = None):
if(elem_name in pc):
num_elems = get_num_elems(pc[elem_name]) if get_num_elems else 1
ply.write("element %s %d\n" % (elem_name,num_elems))
keylist = get_ordered_props(elem_name,pc[elem_name].keys())
for key in keylist:
property_type = pc[elem_name][key]["type"]
property_tuple = ("property",property_type,key)
ply.write(" ".join(property_tuple) + '\n')

0 comments on commit c994319

Please sign in to comment.