Skip to content

Commit

Permalink
Merge remote-tracking branch 'origin/master' into issue-2001
Browse files Browse the repository at this point in the history
  • Loading branch information
abellgithub committed Jul 2, 2018
2 parents d0dbf13 + a3135db commit c5abf08
Show file tree
Hide file tree
Showing 4 changed files with 169 additions and 7 deletions.
3 changes: 2 additions & 1 deletion pdal/StageExtensions.cpp
Expand Up @@ -133,7 +133,8 @@ void StageExtensions::load()
}
}

void StageExtensions::set(const std::string& stage, const StringList& exts)
PDAL_DLL void StageExtensions::set(const std::string& stage,
const StringList& exts)
{
std::lock_guard<std::mutex> lock(m_mutex);
if (Utils::startsWith(stage, "readers."))
Expand Down
2 changes: 1 addition & 1 deletion pdal/StageExtensions.hpp
Expand Up @@ -45,7 +45,7 @@ class StageExtensions
public:
StageExtensions(LogPtr log);

void set(const std::string& stage, const StringList& exts);
PDAL_DLL void set(const std::string& stage, const StringList& exts);
std::string defaultReader(const std::string& filename);
std::string defaultWriter(const std::string& filename);
private:
Expand Down
8 changes: 4 additions & 4 deletions pdal/Streamable.cpp
Expand Up @@ -140,10 +140,10 @@ void Streamable::execute(StreamPointTable& table)
}
else
{
for (auto s2 : s->m_inputs)
for (auto bi = s->m_inputs.rbegin(); bi != s->m_inputs.rend(); bi++)
{
StreamableList newStages(stages);
newStages.push_front(dynamic_cast<Streamable *>(s2));
newStages.push_front(dynamic_cast<Streamable *>(*bi));
lists.push_front(newStages);
}
}
Expand All @@ -152,8 +152,8 @@ void Streamable::execute(StreamPointTable& table)
lastRunStages.done(table);
break;
}
stages = lists.back();
lists.pop_back();
stages = lists.front();
lists.pop_front();
s = stages.front();
}
}
Expand Down
163 changes: 162 additions & 1 deletion test/unit/StreamingTest.cpp
@@ -1,5 +1,4 @@
/******************************************************************************
* Copyright (c) 2011, Michael P. Gerlek (mpg@flaxen.com)
*
* All rights reserved.
*
Expand Down Expand Up @@ -108,6 +107,168 @@ TEST(Streaming, filter)
EXPECT_EQ(cnt, 400);
}

namespace
{

class R : public Reader, public Streamable
{

public:
R() : m_entered(false)
{}

std::string getName() const
{ return "readers.r"; }

private:
virtual bool processOne(PointRef& point)
{
if (!m_entered)
{
std::cout << tag();
m_entered = true;
return true;
}
return false;
}

virtual point_count_t read(PointViewPtr, point_count_t)
{
std::cout << tag();
return 0;
}
private:
bool m_entered;
};

class F : public Filter, public Streamable
{

public:
std::string getName() const
{ return "filters.f"; };

private:
virtual bool processOne(PointRef& point)
{
std::cout << tag();
return true;
}

virtual void filter(PointView& view)
{
std::cout << tag();
}
};

} // unnamed pdal

TEST(Streaming, order)
{

StaticPluginInfo const f_info
{
"filters.f",
"F Filter",
"",
{}
};

CREATE_STATIC_STAGE(F, f_info)

StaticPluginInfo const r_info
{
"readers.r",
"R Reader",
"",
{}
};

CREATE_STATIC_STAGE(R, r_info)

std::string pipeline =
R"(
{
"pipeline" : [
{
"type": "readers.r",
"tag": "G"
},
{
"type": "readers.r",
"tag": "H"
},
{
"type": "readers.r",
"tag": "D"
},
{
"type": "filters.f",
"tag": "E",
"inputs": [ "G", "H" ]
},
{
"type": "readers.r",
"tag": "F"
},
{
"type": "filters.f",
"tag": "B",
"inputs": [ "D", "E", "F" ]
},
{
"type": "readers.r",
"tag": "C"
},
{
"type": "filters.f",
"tag": "A",
"inputs": [ "B", "C" ]
}
]
}
)";

/**
Tree representation of the pipeline above:
G H
\ /
D E F
\ | /
\ | /
B C
| /
A
**/

std::ostringstream oss;

// Order of traversal based on one point from each source.
auto ctx = Utils::redirect(std::cout, oss);
std::istringstream iss(pipeline);
PipelineManager mgr;
mgr.readPipeline(iss);
FixedPointTable t(10000);
mgr.executeStream(t);
Utils::restore(std::cout, ctx);
std::string output(oss.str());
EXPECT_NE(output.find("DBAGEBAHEBAFBACA"), std::string::npos);

// In non-stream mode we get a letter for each point view.
oss.clear();
oss.str("");
std::cerr << oss.str() << "!\n";
iss.seekg(0);
ctx = Utils::redirect(std::cout, oss);
PipelineManager mgr2;
mgr2.readPipeline(iss);
mgr2.execute();
output = oss.str();
Utils::restore(std::cout, ctx);
EXPECT_NE(output.find("DGHEEFBBBBCAAAAA"), std::string::npos);
}

// Test that SRS changes aren't repeated when new input is processed.
TEST(Streaming, issue_2009)
{
Expand Down

0 comments on commit c5abf08

Please sign in to comment.