Skip to content

Commit

Permalink
Filtered streaming works.
Browse files Browse the repository at this point in the history
Update RangeFilter to streaming.
Update FauxReader to streaming.
Add real streaming test.
  • Loading branch information
abellgithub committed Oct 28, 2015
1 parent 13d3c7f commit a6222e3
Show file tree
Hide file tree
Showing 5 changed files with 25 additions and 22 deletions.
11 changes: 6 additions & 5 deletions filters/range/RangeFilter.cpp
Expand Up @@ -180,6 +180,7 @@ void RangeFilter::prepared(PointTableRef table)
std::sort(m_range_list.begin(), m_range_list.end());
}


// Determine if a point passes a single range.
bool RangeFilter::dimensionPasses(double v, const Range& r) const
{
Expand All @@ -196,7 +197,7 @@ bool RangeFilter::dimensionPasses(double v, const Range& r) const
// as ORs between ranges of the same dimension and ANDs between ranges
// of different dimensions. This is simple logic, but is probably the most
// common case.
bool RangeFilter::pointPasses(PointView *view, PointId idx) const
bool RangeFilter::processOne(PointRef point)
{
Dimension::Id::Enum lastId = m_range_list.front().m_id;
bool passes = false;
Expand All @@ -216,8 +217,7 @@ bool RangeFilter::pointPasses(PointView *view, PointId idx) const
// a new dimension.
else if (passes)
continue;
double v = view->getFieldAs<double>(r.m_id, idx);
passes = dimensionPasses(v, r);
passes = dimensionPasses(point.getFieldAs<double>(r.m_id), r);
}
return passes;
}
Expand All @@ -232,11 +232,12 @@ PointViewSet RangeFilter::run(PointViewPtr inView)
PointViewPtr outView = inView->makeNew();

for (PointId i = 0; i < inView->size(); ++i)
if (pointPasses(inView.get(), i))
if (processOne(inView->point(i)))
outView->appendPoint(*inView, i);

viewSet.insert(outView);
return viewSet;
}

} // pdal
} // namespace pdal

1 change: 0 additions & 1 deletion filters/range/RangeFilter.hpp
Expand Up @@ -95,7 +95,6 @@ class PDAL_DLL RangeFilter : public pdal::Filter
virtual bool processOne(PointRef point);
virtual PointViewSet run(PointViewPtr view);
bool dimensionPasses(double v, const Range& r) const;
bool pointPasses(PointView *view, PointId idx) const;

RangeFilter& operator=(const RangeFilter&); // not implemented
RangeFilter(const RangeFilter&); // not implemented
Expand Down
2 changes: 1 addition & 1 deletion io/faux/FauxReader.cpp
Expand Up @@ -173,7 +173,6 @@ Dimension::IdList FauxReader::getDefaultDimensions()
return ids;
}


void FauxReader::ready(PointTableRef /*table*/)
{
m_returnNum = 1;
Expand Down Expand Up @@ -234,6 +233,7 @@ bool FauxReader::processOne(PointRef point)
return true;
}


point_count_t FauxReader::read(PointViewPtr view, point_count_t count)
{
for (PointId idx = 0; idx < count; ++idx)
Expand Down
27 changes: 15 additions & 12 deletions src/Stage.cpp
Expand Up @@ -150,53 +150,56 @@ void Stage::execute(FixedPointTable& table)
s = s->m_inputs[0];
}

// Separate the first stage.
// Separate out the first stage.
Stage *reader = stages.front();
// We may have a reader in the filter list, but we treat them in the
// same way.

// Build a list of all stages except the first. We may have a writer in
// this list in addition to filters, but we treat them in the same way.
auto begin = stages.begin();
begin++;
std::copy(begin, stages.end(), std::back_inserter(filters));

for (Stage *s : stages)
s->ready(table);

// Loop until we're finished. We handle the number of points up to
// the capacity of the FixedPointTable that we've been provided.
bool finished = false;
while (!finished)
{
PointId idx = 0;
PointRef point(&table, idx);
point_count_t pointLimit = table.capacity();

// When we get false back from a reader, we're done, so set
// the point limit to the number of points processed in this loop
// of the table.
for (PointId idx = 0; idx < pointLimit; idx++)
{
point.setPointId(idx);
finished = !reader->processOne(point);
if (finished)
pointLimit = idx + 1;
pointLimit = idx;
}
reader->l_done(table);

std::cerr << "Filters list size() = " << filters.size() << "!\n";
std::cerr << "Point limit = " << pointLimit << "!\n";
// When we get a false back from a filter, we're filtering out a
// point, so add it to the list of skips so that it doesn't get
// processed by subsequent filters.
for (Stage *s : filters)
{
std::cerr << "Stage name = " << s->getName() << "!\n";
for (PointId idx = 0; idx < pointLimit; idx++)
{
if (skips[idx])
continue;
point.setPointId(idx);
if (!s->processOne(point))
{
std::cerr << "SKIP for " << idx << "!\n";
skips[idx] = true;
}
else
std::cerr << "No SKIP for " << idx << "!\n";
}
s->l_done(table);
}

// Yes, vector<bool> is terrible. Can do something better later.
for (size_t i = 0; i < skips.size(); ++i)
skips[i] = false;
table.reset();
Expand Down
6 changes: 3 additions & 3 deletions test/unit/StreamingTest.cpp
Expand Up @@ -110,7 +110,7 @@ TEST(Streaming, filter)
int i = point.getFieldAs<int>(Dimension::Id::X);
std::cerr << "Testing " << m_val << " against " << i << "!\n";
EXPECT_EQ(m_val, i);
if (m_val / 100 == 0)
if (m_val % 100 == 0)
m_val += 100;
m_val++;
m_cnt++;
Expand All @@ -127,6 +127,6 @@ TEST(Streaming, filter)
w.setInput(f);

FixedPointTable t(50);
f.prepare(t);
f.execute(t);
w.prepare(t);
w.execute(t);
}

0 comments on commit a6222e3

Please sign in to comment.