Skip to content

Commit

Permalink
Support multi-source streaming.
Browse files Browse the repository at this point in the history
  • Loading branch information
abellgithub committed Dec 16, 2015
1 parent 6d698ab commit 8deeeb7
Show file tree
Hide file tree
Showing 4 changed files with 97 additions and 87 deletions.
2 changes: 2 additions & 0 deletions filters/merge/MergeFilter.hpp
Expand Up @@ -56,6 +56,8 @@ class PDAL_DLL MergeFilter : public MultiFilter
PointViewPtr m_view;

virtual void ready(PointTableRef table);
virtual bool processOne(PointRef& point)
{ return true; }
virtual PointViewSet run(PointViewPtr in);

MergeFilter& operator=(const MergeFilter&); // not implemented
Expand Down
1 change: 1 addition & 0 deletions include/pdal/Stage.hpp
Expand Up @@ -180,6 +180,7 @@ class PDAL_DLL Stage
std::cerr << "Can't run stage = " << getName() << "!\n";
return PointViewSet();
}
void execute(StreamPointTable& table, std::list<Stage *>& stages);
};

PDAL_DLL std::ostream& operator<<(std::ostream& ostr, const Stage&);
Expand Down
52 changes: 41 additions & 11 deletions src/Stage.cpp
Expand Up @@ -155,25 +155,55 @@ PointViewSet Stage::execute(PointTableRef table)
// Streamed execution.
void Stage::execute(StreamPointTable& table)
{
table.finalize();
typedef std::list<Stage *> StageList;

std::list<Stage *> stages;
std::list<Stage *> filters;
std::vector<bool> skips(table.capacity());
SpatialReference srs;
std::list<StageList> lists;
StageList stages;

// Build a list of the stages.
table.finalize();

// Walk from the current stage backwards. As we add each input, copy
// the list of stages and push it on a list. We then pull a list from the
// front of list and keep going. Placing on the back and pulling from the
// front insures that the stages will be executed in the order that they
// were added. If we hit stage with no previous stages, we execute
// the stage list.
// All this often amounts to a bunch of list copying for
// no reason, but it's more simple than what we might otherwise do and
// this should be a nit in the grand scheme of execution time.
//
// As an example, if there are four paths from the end stage (writer) to
// reader stages, there will be four stage lists and execute(table, stages)
// will be called four times.
Stage *s = this;
stages.push_front(s);
while (true)
{
if (s->m_inputs.size() > 1)
throw pdal_error("Can't execute streaming pipeline stages "
"containing multiple inputs.");
stages.push_front(s);
if (s->m_inputs.empty())
execute(table, stages);
else
{
for (auto s2 : s->m_inputs)
{
StageList newStages(stages);
newStages.push_front(s2);
lists.push_front(newStages);
}
}
if (lists.empty())
break;
s = s->m_inputs[0];
stages = lists.back();
lists.pop_back();
s = stages.front();
}
}


void Stage::execute(StreamPointTable& table, std::list<Stage *>& stages)
{
std::vector<bool> skips(table.capacity());
std::list<Stage *> filters;
SpatialReference srs;

// Separate out the first stage.
Stage *reader = stages.front();
Expand Down
129 changes: 53 additions & 76 deletions test/unit/StreamingTest.cpp
Expand Up @@ -36,96 +36,73 @@

#include <pdal/Filter.hpp>
#include <pdal/PointTable.hpp>
#include <SbetReader.hpp>
#include <FauxReader.hpp>
#include <RangeFilter.hpp>
#include <MergeFilter.hpp>
#include <StreamCallbackFilter.hpp>
#include "Support.hpp"

using namespace pdal;

/**
TEST(Streaming, simple)
// This test depends on stages being executed in the order that they were
// added to each parent. If you change order, things will break.
TEST(Streaming, filter)
{
class SimpleFilter : public Filter
{
public:
std::string getName() const
{ return "filters.simple"; }
SimpleFilter() : m_cnt(0)
{}
private:
int m_cnt;
bool processOne(PointRef point)
{
double d = point.getFieldAs<double>(Dimension::Id::X);
std::cerr << "Value[" << m_cnt++ << "] = " << d << "!\n";
return true;
}
};
Options ro1;
ro1.add("bounds", BOX3D(0, 0, 0, 99, 99, 99));
ro1.add("mode", "ramp");
ro1.add("count", 100);
FauxReader r1;
r1.setOptions(ro1);

Options ops;
ops.add("filename", Support::datapath("sbet/autzen_trim.sbet"));
SbetReader r;
r.setOptions(ops);
Options ro2;
ro2.add("bounds", BOX3D(100, 100, 100, 199, 199, 199));
ro2.add("mode", "ramp");
ro2.add("count", 100);
FauxReader r2;
r2.setOptions(ro2);

SimpleFilter f;
f.setInput(r);
Options ro3;
ro3.add("bounds", BOX3D(200, 200, 200, 299, 299, 299));
ro3.add("mode", "ramp");
ro3.add("count", 100);
FauxReader r3;
r3.setOptions(ro3);

FixedPointTable t(100);
f.prepare(t);
f.execute(t);
}
**/
Options ro4;
ro4.add("bounds", BOX3D(300, 300, 300, 399, 399, 399));
ro4.add("mode", "ramp");
ro4.add("count", 100);
FauxReader r4;
r4.setOptions(ro4);

MergeFilter m1;
m1.setInput(r1);
m1.setInput(r2);

TEST(Streaming, filter)
{
Options ro;
ro.add("bounds", BOX3D(1, 1, 1, 1000, 1000, 1000));
ro.add("mode", "ramp");
ro.add("count", 1000);
FauxReader r;
r.setOptions(ro);
MergeFilter m2;
m2.setInput(r3);
m2.setInput(r4);

Options fo;
fo.add("limits", "X(100:200], X(300:400], X(500:600], "
"X(700:800], X(900:1000]");
RangeFilter f;
f.setOptions(fo);
f.setInput(r);
MergeFilter m3;
m3.setInput(m1);
m3.setInput(m2);

class Writer : public Filter
StreamCallbackFilter f;
int cnt = 0;
auto cb = [&cnt](PointRef& point)
{
public:
std::string getName() const
{ return "filterstester"; }
Writer() : m_cnt(0), m_val(101)
{}
private:
int m_cnt;
int m_val;

bool processOne(PointRef& point)
{
int i = point.getFieldAs<int>(Dimension::Id::X);
EXPECT_EQ(m_val, i);
if (m_val % 100 == 0)
m_val += 100;
m_val++;
m_cnt++;
return true;
}

void done(PointTableRef)
{
EXPECT_EQ(m_cnt, 500);
}
static int x(0), y(0), z(0);
EXPECT_EQ(point.getFieldAs<int>(Dimension::Id::X), x++);
EXPECT_EQ(point.getFieldAs<int>(Dimension::Id::Y), y++);
EXPECT_EQ(point.getFieldAs<int>(Dimension::Id::Z), z++);
cnt++;
return true;
};
f.setCallback(cb);
f.setInput(m3);

Writer w;
w.setInput(f);

FixedPointTable t(50);
w.prepare(t);
w.execute(t);
FixedPointTable t(20);
f.prepare(t);
f.execute(t);
EXPECT_EQ(cnt, 400);
}

0 comments on commit 8deeeb7

Please sign in to comment.