Skip to content

Commit

Permalink
TileDB: Add append option to writer (#2577)
Browse files Browse the repository at this point in the history
* TileDB: Add append option to writer

* tiledb: add check that attributes exists when appending data
  • Loading branch information
normanb authored and abellgithub committed Jun 21, 2019
1 parent 7d6b575 commit 62143d0
Show file tree
Hide file tree
Showing 4 changed files with 127 additions and 36 deletions.
3 changes: 3 additions & 0 deletions doc/stages/writers.tiledb.rst
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,9 @@ compression
compression_level
TileDB compression level for chosen compression [Optional]

append
Append to an existing TileDB array with the same schema [Optional]

stats
Dump query stats to stdout [Optional]

Expand Down
99 changes: 63 additions & 36 deletions plugins/tiledb/io/TileDBWriter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -176,6 +176,8 @@ void TileDBWriter::addArgs(ProgramArgs& args)
m_compressor);
args.add("compression_level", "TileDB compression level",
m_compressionLevel, -1);
args.add("append", "Append to existing TileDB array",
m_append, false);
}


Expand All @@ -190,73 +192,98 @@ void TileDBWriter::initialize()
m_ctx.reset(new tiledb::Context());

// If the array already exists on disk, throw an error
if (tiledb::Object::object(*m_ctx, m_arrayName).type() ==
tiledb::Object::Type::Array)
throwError("Array already exists.");

m_schema.reset(new tiledb::ArraySchema(*m_ctx, TILEDB_SPARSE));

if (!m_compressor.empty())
if (!m_append)
{
tiledb::Filter filter = createFilter(*m_ctx, m_compressor);
filter.set_option(TILEDB_COMPRESSION_LEVEL, m_compressionLevel);
m_filterList.reset(new tiledb::FilterList(*m_ctx));
m_filterList->add_filter(filter);
m_schema->set_coords_filter_list(*m_filterList);
if (tiledb::Object::object(*m_ctx, m_arrayName).type() ==
tiledb::Object::Type::Array)
throwError("Array already exists.");
m_schema.reset(new tiledb::ArraySchema(*m_ctx, TILEDB_SPARSE));

if (!m_compressor.empty())
{
tiledb::Filter filter = createFilter(*m_ctx, m_compressor);
filter.set_option(TILEDB_COMPRESSION_LEVEL, m_compressionLevel);
m_filterList.reset(new tiledb::FilterList(*m_ctx));
m_filterList->add_filter(filter);
m_schema->set_coords_filter_list(*m_filterList);
}
}
}


void TileDBWriter::ready(pdal::BasePointTable &table)
{
// get a list of all the dimensions & their types and add to schema
// x,y,z will be tiledb dimensions other pdal dimensions will be
// tiledb attributes
tiledb::Domain domain(*m_ctx);
auto layout = table.layout();
auto all = layout->dims();
double dimMin = std::numeric_limits<double>::lowest();
double dimMax = std::numeric_limits<double>::max();

domain.add_dimension(tiledb::Dimension::create<double>(*m_ctx, "X",
{{dimMin, dimMax}}, m_x_tile_size))
.add_dimension(tiledb::Dimension::create<double>(*m_ctx, "Y",
{{dimMin, dimMax}}, m_y_tile_size))
.add_dimension(tiledb::Dimension::create<double>(*m_ctx, "Z",
{{dimMin, dimMax}}, m_z_tile_size));

// get a list of all the dimensions & their types and add to schema
// x,y,z will be tiledb dimensions other pdal dimensions will be
// tiledb attributes
if (!m_append)
{
tiledb::Domain domain(*m_ctx);
double dimMin = std::numeric_limits<double>::lowest();
double dimMax = std::numeric_limits<double>::max();

domain.add_dimension(tiledb::Dimension::create<double>(*m_ctx, "X",
{{dimMin, dimMax}}, m_x_tile_size))
.add_dimension(tiledb::Dimension::create<double>(*m_ctx, "Y",
{{dimMin, dimMax}}, m_y_tile_size))
.add_dimension(tiledb::Dimension::create<double>(*m_ctx, "Z",
{{dimMin, dimMax}}, m_z_tile_size));

m_schema->set_domain(domain).set_order(
{{TILEDB_ROW_MAJOR, TILEDB_ROW_MAJOR}});
m_schema->set_capacity(m_tile_capacity);
}
else
{
m_array.reset(new tiledb::Array(*m_ctx, m_arrayName, TILEDB_WRITE));
}

for (const auto& d : all)
{
std::string dimName = layout->dimName(d);
if ((dimName != "X") && (dimName != "Y") && (dimName != "Z"))
{
Dimension::Type type = layout->dimType(d);
tiledb::Attribute att = createAttribute(*m_ctx, dimName, type);
if (!m_compressor.empty())
att.set_filter_list(*m_filterList);
m_schema->add_attribute(att);
if (!m_append)
{
tiledb::Attribute att = createAttribute(*m_ctx, dimName, type);
if (!m_compressor.empty())
att.set_filter_list(*m_filterList);
m_schema->add_attribute(att);
}
else
{
// check attribute exists in original tiledb array
auto attrs = m_array->schema().attributes();
auto it = attrs.find(dimName);
if (it == attrs.end())
throwError("Attribute " + dimName + " does not exist in original array.");
}

m_attrs.emplace_back(dimName, d, type);
// Size the buffers.
m_attrs.back().m_buffer.resize(
m_cache_size * Dimension::size(type));
}
}

m_schema->set_domain(domain).set_order(
{{TILEDB_ROW_MAJOR, TILEDB_ROW_MAJOR}});
m_schema->set_capacity(m_tile_capacity);
if (!m_append)
{
tiledb::Array::create(m_arrayName, *m_schema);
m_array.reset(new tiledb::Array(*m_ctx, m_arrayName, TILEDB_WRITE));
}

tiledb::Array::create(m_arrayName, *m_schema);
m_array.reset(new tiledb::Array(*m_ctx, m_arrayName, TILEDB_WRITE));
m_query.reset(new tiledb::Query(*m_ctx, *m_array));
m_query->set_layout(TILEDB_UNORDERED);
}


bool TileDBWriter::processOne(PointRef& point)
{
auto attrs = m_schema->attributes();

auto attrs = m_array->schema().attributes();
double x = point.getFieldAs<double>(Dimension::Id::X);
double y = point.getFieldAs<double>(Dimension::Id::Y);
double z = point.getFieldAs<double>(Dimension::Id::Z);
Expand Down
1 change: 1 addition & 0 deletions plugins/tiledb/io/TileDBWriter.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@ class PDAL_DLL TileDBWriter : public Writer, public Streamable


bool m_stats;
bool m_append;

BOX3D m_bbox;

Expand Down
60 changes: 60 additions & 0 deletions plugins/tiledb/test/TileDBWriterTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -55,9 +55,11 @@ namespace pdal
options.add("mode", "ramp");
options.add("count", count);
m_reader.setOptions(options);
m_reader2.setOptions(options);
}

FauxReader m_reader;
FauxReader m_reader2;

};

Expand Down Expand Up @@ -129,5 +131,63 @@ namespace pdal
ASSERT_DOUBLE_EQ(subarray[3], 1.0);
ASSERT_DOUBLE_EQ(subarray[5], 1.0);
}

TEST_F(TileDBWriterTest, write_append)
{
tiledb::Context ctx;
tiledb::VFS vfs(ctx);
std::string pth = Support::temppath("tiledb_test_append_out");

Options options;
std::string sidecar = pth + "/pdal.json";
options.add("array_name", pth);
options.add("chunk_size", 80);

if (vfs.is_dir(pth))
{
vfs.remove_dir(pth);
}

TileDBWriter writer;
writer.setOptions(options);
writer.setInput(m_reader);

FixedPointTable table(100);
writer.prepare(table);
writer.execute(table);

// check the sidecar exists so that the execute has completed
EXPECT_TRUE(pdal::Utils::fileExists(sidecar));

options.add("append", true);
TileDBWriter append_writer;
append_writer.setOptions(options);
append_writer.setInput(m_reader2);

FixedPointTable table2(100);
append_writer.prepare(table2);
append_writer.execute(table2);

tiledb::Array array(ctx, pth, TILEDB_READ);
auto domain = array.non_empty_domain<double>();
std::vector<double> subarray;

for (const auto& kv: domain)
{
subarray.push_back(kv.second.first);
subarray.push_back(kv.second.second);
}

tiledb::Query q(ctx, array, TILEDB_READ);
q.set_subarray(subarray);

auto max_el = array.max_buffer_elements(subarray);
std::vector<double> coords(max_el[TILEDB_COORDS].second);
q.set_coordinates(coords);
q.submit();
array.close();

EXPECT_EQ((m_reader.count() * 3) + (m_reader2.count() * 3), coords.size());
}
}

0 comments on commit 62143d0

Please sign in to comment.