Skip to content

Commit

Permalink
Prototype of EPT processing with parallelization of point processing …
Browse files Browse the repository at this point in the history
…rather than just for remote fetching.
  • Loading branch information
connormanning committed Apr 13, 2019
1 parent fc5e6bb commit d98e0b0
Show file tree
Hide file tree
Showing 2 changed files with 95 additions and 54 deletions.
130 changes: 81 additions & 49 deletions io/EptReader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -511,18 +511,16 @@ PointViewSet EptReader::run(PointViewPtr view)

m_pool->add([this, &view, &key, nodeId]()
{
uint64_t startId(0);
const std::vector<PointId> added(readNode(*view, key, nodeId));

if (m_info->dataType() == EptInfo::DataType::Laszip)
startId = readLaszip(*view, key, nodeId);
else
startId = readBinary(*view, key, nodeId);
std::lock_guard<std::mutex> lock(m_mutex);
m_index.insert(m_index.end(), added.begin(), added.end());

// Read addon information after the native data, we'll possibly
// overwrite attributes.
for (const auto& addon : m_addons)
{
readAddon(*view, key, *addon, startId);
readAddon(*view, key, *addon, added);
}
});

Expand All @@ -531,14 +529,29 @@ PointViewSet EptReader::run(PointViewPtr view)

m_pool->await();

// Our view has unpopulated points which were not matched by the bounds or
// origin query. Cull them from the resulting point view.
PointViewPtr culled(view->makeNew());
for (const PointId i : m_index)
culled->appendPoint(*view, i);

log()->get(LogLevel::Debug) << "Done reading!" << std::endl;

PointViewSet views;
views.insert(view);
views.insert(culled);
return views;
}

uint64_t EptReader::readLaszip(PointView& dst, const Key& key,
std::vector<PointId> EptReader::readNode(PointView& dst, const Key& key,
const uint64_t nodeId) const
{
if (m_info->dataType() == EptInfo::DataType::Laszip)
return readLaszip(dst, key, nodeId);
else
return readBinary(dst, key, nodeId);
}

std::vector<PointId> EptReader::readLaszip(PointView& dst, const Key& key,
const uint64_t nodeId) const
{
// If the file is remote (HTTP, S3, Dropbox, etc.), getLocalHandle will
Expand All @@ -555,54 +568,69 @@ uint64_t EptReader::readLaszip(PointView& dst, const Key& key,
LasReader reader;
reader.setOptions(options);

std::lock_guard<std::mutex> lock(m_mutex);
reader.prepare(table);

const uint64_t startId(dst.size());
{
std::lock_guard<std::mutex> lock(m_mutex);
reader.prepare(table);
}

uint64_t pointId(0);
for (auto& src : reader.execute(table))
const PointViewSet views(reader.execute(table));
if (views.size() != 1)
{
PointRef pr(*src);
for (uint64_t i(0); i < src->size(); ++i)
{
pr.setPointId(i);
process(dst, pr, nodeId, pointId);
++pointId;
}
throwError("Unexpected view count: " + key.toString());
}

return startId;
auto& src(*views.begin());

PointRef pr(*src);
const uint64_t np(src->size());

return processNode(dst, nodeId, pr, np);
}

uint64_t EptReader::readBinary(PointView& dst, const Key& key,
std::vector<PointId> EptReader::readBinary(PointView& dst, const Key& key,
const uint64_t nodeId) const
{
auto data(m_ep->getBinary("ept-data/" + key.toString() + ".bin"));
ShallowPointTable table(*m_remoteLayout, data.data(), data.size());

PointRef pr(table);
const uint64_t np(table.numPoints());

std::lock_guard<std::mutex> lock(m_mutex);
return processNode(dst, nodeId, pr, np);
}

std::vector<PointId> EptReader::processNode(PointView& dst,
const uint64_t nodeId, PointRef& pr, const uint64_t np) const
{
// Reserve all the points we are about to populate from this node.
std::unique_lock<std::mutex> lock(m_mutex);
const uint64_t startId(dst.size());
for (uint64_t pointId(0); pointId < np; ++pointId)
{
dst.getOrAddPoint(startId + pointId);
}
lock.unlock();

uint64_t pointId(0);
for (uint64_t pointId(0); pointId < table.numPoints(); ++pointId)
std::vector<PointId> included;
included.reserve(np);

for (uint64_t pointId(0); pointId < np; ++pointId)
{
pr.setPointId(pointId);
process(dst, pr, nodeId, pointId);
if (process(dst, pr, nodeId, pointId, startId + pointId))
{
included.push_back(startId + pointId);
}
}

return startId;
return included;
}

void EptReader::process(PointView& dst, PointRef& pr, const uint64_t nodeId,
const uint64_t pointId) const
bool EptReader::process(PointView& dst, PointRef& pr, const uint64_t nodeId,
const uint64_t pointId, const uint64_t dstId) const
{
using D = Dimension::Id;

const point_count_t dstId(dst.size());

const double x = pr.getFieldAs<double>(D::X) *
m_xyzTransforms[0].m_scale.m_val + m_xyzTransforms[0].m_offset.m_val;
const double y = pr.getFieldAs<double>(D::Y) *
Expand All @@ -613,30 +641,34 @@ void EptReader::process(PointView& dst, PointRef& pr, const uint64_t nodeId,
const bool selected = m_queryOriginId == -1 ||
pr.getFieldAs<int64_t>(D::OriginId) == m_queryOriginId;

if (selected && m_queryBounds.contains(x, y, z))
if (!selected || !m_queryBounds.contains(x, y, z))
{
dst.setField(Dimension::Id::X, dstId, x);
dst.setField(Dimension::Id::Y, dstId, y);
dst.setField(Dimension::Id::Z, dstId, z);
return false;
}

dst.setField(Dimension::Id::X, dstId, x);
dst.setField(Dimension::Id::Y, dstId, y);
dst.setField(Dimension::Id::Z, dstId, z);

for (const DimType& dt : m_dimTypes)
for (const DimType& dt : m_dimTypes)
{
if (dt.m_id != D::X && dt.m_id != D::Y && dt.m_id != D::Z)
{
if (dt.m_id != D::X && dt.m_id != D::Y && dt.m_id != D::Z)
{
const double d = pr.getFieldAs<double>(dt.m_id) *
dt.m_xform.m_scale.m_val + dt.m_xform.m_offset.m_val;
const double d = pr.getFieldAs<double>(dt.m_id) *
dt.m_xform.m_scale.m_val + dt.m_xform.m_offset.m_val;

dst.setField(dt.m_id, dstId, d);
}
dst.setField(dt.m_id, dstId, d);
}

dst.setField(m_nodeIdDim, dstId, nodeId);
dst.setField(m_pointIdDim, dstId, pointId);
}

dst.setField(m_nodeIdDim, dstId, nodeId);
dst.setField(m_pointIdDim, dstId, pointId);

return true;
}

void EptReader::readAddon(PointView& dst, const Key& key, const Addon& addon,
const uint64_t pointId) const
const std::vector<PointId>& pointIds) const
{
const uint64_t np(addon.points(key));
if (!np)
Expand All @@ -650,7 +682,7 @@ void EptReader::readAddon(PointView& dst, const Key& key, const Addon& addon,
// for an EPT-read of the full dataset. If the native EPT set already
// contains Classification, then we should overwrite it with zeroes
// where the addon leaves off.
for (uint64_t id(pointId); id < pointId + np; ++id)
for (const PointId id : pointIds)
{
dst.setField(addon.id(), id, 0);
}
Expand All @@ -671,7 +703,7 @@ void EptReader::readAddon(PointView& dst, const Key& key, const Addon& addon,
}

const char* pos(data.data());
for (uint64_t id(pointId); id < pointId + np; ++id)
for (const PointId id : pointIds)
{
dst.setField(addon.id(), addon.type(), id, pos);
pos += dimSize;
Expand Down
19 changes: 14 additions & 5 deletions io/EptReader.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
#include <array>
#include <cstddef>
#include <cstdint>
#include <deque>
#include <memory>
#include <mutex>
#include <set>
Expand Down Expand Up @@ -93,19 +94,27 @@ class PDAL_DLL EptReader : public Reader
const arbiter::Endpoint& ep, std::map<Key, uint64_t>& target,
const Json::Value& current, const Key& key);

uint64_t readLaszip(PointView& view, const Key& key, uint64_t nodeId) const;
uint64_t readBinary(PointView& view, const Key& key, uint64_t nodeId) const;
void process(PointView& view, PointRef& pr, uint64_t nodeId,
uint64_t pointId) const;
std::vector<PointId> readNode(PointView& view, const Key& key,
uint64_t nodeId) const;
std::vector<PointId> readLaszip(PointView& view, const Key& key,
uint64_t nodeId) const;
std::vector<PointId> readBinary(PointView& view, const Key& key,
uint64_t nodeId) const;
std::vector<PointId> processNode(PointView& view, uint64_t nodeId,
PointRef& pr, uint64_t np) const;

bool process(PointView& view, PointRef& pr, uint64_t nodeId,
uint64_t pointId, uint64_t dstId) const;

void readAddon(PointView& dst, const Key& key, const Addon& addon,
uint64_t startId) const;
const std::vector<PointId>& pointIds) const;

std::string m_root;

std::unique_ptr<arbiter::Arbiter> m_arbiter;
std::unique_ptr<arbiter::Endpoint> m_ep;
std::unique_ptr<EptInfo> m_info;
std::deque<PointId> m_index;

class Args
{
Expand Down

0 comments on commit d98e0b0

Please sign in to comment.