Skip to content
This repository has been archived by the owner on Jul 8, 2022. It is now read-only.

Commit

Permalink
Pipeline and Pipefilters can be reset for re-use
Browse files Browse the repository at this point in the history
  • Loading branch information
Ahmet Bilgili committed Mar 4, 2016
1 parent 8e24893 commit 6db9e3f
Show file tree
Hide file tree
Showing 13 changed files with 158 additions and 18 deletions.
5 changes: 5 additions & 0 deletions livre/core/pipeline/Executable.h
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,11 @@ class Executable
*/
virtual Futures getPreconditions() const { return Futures(); }

/**
* Resets the executable
*/
virtual void reset() {}

};

}
Expand Down
6 changes: 6 additions & 0 deletions livre/core/pipeline/Future.h
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,12 @@ class Future
*/
const AsyncData& getAsyncData() const;

/**
* @param future is the future to be checked with
* @return true if both futures are same
*/
bool operator==( const Future& future ) const { return _impl == future._impl; }

private:

friend bool livre::waitForAny( const Futures& future );
Expand Down
3 changes: 1 addition & 2 deletions livre/core/pipeline/FutureMap.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,6 @@ struct FutureMapImpl
if( portName == ALL_PORTS )
{
Futures futures;
futures.reserve( _futureMap.size( ));
for( const auto& pair: _futureMap )
futures.push_back( pair.second );
return futures;
Expand Down Expand Up @@ -163,7 +162,7 @@ Futures OutFutureMap::getFutures() const

Future OutFutureMap::getFuture( const std::string& portName ) const
{
return _impl->getFutures( portName )[ 0 ];
return _impl->getFutures( portName ).front();
}

bool OutFutureMap::isReady( const std::string& portName ) const
Expand Down
19 changes: 19 additions & 0 deletions livre/core/pipeline/InputPort.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,20 @@ struct InputPort::Impl
_futures.push_back( outputPort.getPromise()->getFuture( ));
}

void disconnect( const OutputPort& outputPort )
{
Futures::iterator it = _futures.begin();
while( it != _futures.end())
{
if( *it == outputPort.getPromise()->getFuture( ))
{
_futures.erase( it );
return;
}
++it;
}
}

Futures _futures;
const PortInfo _info;
};
Expand All @@ -78,6 +92,11 @@ void InputPort::connect( const OutputPort& outputPort )
_impl->connect( outputPort );
}

void InputPort::disconnect( const OutputPort& outputPort )
{
_impl->disconnect( outputPort );
}

const std::string& InputPort::getName() const
{
return _impl->getName();
Expand Down
6 changes: 6 additions & 0 deletions livre/core/pipeline/InputPort.h
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,12 @@ class InputPort
*/
void connect( const OutputPort& inputPort );

/**
* Disconnects an output port from input port
* @param inputPort input port
*/
void disconnect( const OutputPort& inputPort );

private:

struct Impl;
Expand Down
26 changes: 25 additions & 1 deletion livre/core/pipeline/OutputPort.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,10 @@ namespace
Therefore, the get/set/query operations on promises/futures causes deadlocks
when used with wait_for_any( futurelist ) if futurelist is including the
future to be queried. With below implementation no locking is needed between
future operations. A higher granularity can be added for checking whet */
future operations. A higher granularity can be added for checking whether
the list includes the future or not, but simply exiting the wait_for_any
operation when lock is owned is simpler.
*/

ReadWriteMutex waitForAnyLock;
}
Expand Down Expand Up @@ -88,6 +91,13 @@ struct AsyncData::Impl
_future.wait();
}

void reset()
{
ConstPortDataPromise newPromise;
_promise.swap( newPromise );
_future = _promise.get_future();
}

ConstPortDataPromise _promise;
mutable ConstPortDataFuture _future;
const std::string _name;
Expand Down Expand Up @@ -124,6 +134,10 @@ void AsyncData::wait() const
_impl->wait();
}

void AsyncData::reset()
{
_impl->reset();
}

struct OutputPort::Impl
{
Expand All @@ -148,6 +162,11 @@ struct OutputPort::Impl
return _info.name;
}

void reset()
{
_data.reset();
}

const PortInfo _info;
AsyncData _data;
PromisePtr _portPromise;
Expand Down Expand Up @@ -181,6 +200,11 @@ void OutputPort::connect( InputPort& inputPort )
inputPort.connect( *this );
}

void OutputPort::reset()
{
_impl->reset();
}

bool waitForAny( const Futures& futures )
{
if( futures.empty( ))
Expand Down
10 changes: 10 additions & 0 deletions livre/core/pipeline/OutputPort.h
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,11 @@ class AsyncData
*/
void wait() const;

/**
* Resets the promise/future
*/
void reset();

private:

/**
Expand Down Expand Up @@ -113,6 +118,11 @@ class OutputPort
*/
void connect( InputPort& inputPort );

/**
* Resets the promise/future
*/
void reset();

private:

struct Impl;
Expand Down
16 changes: 15 additions & 1 deletion livre/core/pipeline/PipeFilter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,6 @@ struct PipeFilter::Impl
Futures getPostconditions() const
{
Futures futures;
futures.reserve(_outputMap.size( ));
for( const auto& pair: _outputMap )
{
const Future& outputFuture = pair.second->getPromise()->getFuture();
Expand Down Expand Up @@ -203,6 +202,16 @@ struct PipeFilter::Impl
inputPort->connect( *outputPort );
}

void reset()
{
for( auto& pair: _readyPortsMap )
_inputMap[ pair.first ]->disconnect( *pair.second );

_readyPortsMap.clear();
for( const auto& pair: _outputMap )
pair.second->reset();
}

PipeFilter& _pipeFilter;
const std::string _name;
const servus::uint128_t _id;
Expand Down Expand Up @@ -241,6 +250,11 @@ const servus::uint128_t& PipeFilter::getId() const
return _impl->_id;
}

void PipeFilter::reset()
{
_impl->reset();
}

const std::string& PipeFilter::getName() const
{
return _impl->_name;
Expand Down
5 changes: 5 additions & 0 deletions livre/core/pipeline/PipeFilter.h
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,11 @@ class PipeFilter : public Executable
*/
const servus::uint128_t& getId() const;

/**
* Resets the filter. At this point pipe filter execution should be complete.
*/
void reset();

private:

friend class Pipeline;
Expand Down
11 changes: 11 additions & 0 deletions livre/core/pipeline/Pipeline.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,12 @@ struct Pipeline::Impl
return _waitFutures;
}

void reset()
{
for( auto pair: _executableMap )
pair.second->reset();
}

~Impl()
{}

Expand Down Expand Up @@ -185,6 +191,11 @@ void Pipeline::execute()
return _impl->execute();
}

void Pipeline::reset()
{
_impl->reset();
}

Futures Pipeline::getPreconditions() const
{
return _impl->getConnectedInFutureMap();
Expand Down
5 changes: 5 additions & 0 deletions livre/core/pipeline/Pipeline.h
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,11 @@ class Pipeline : public Executable
*/
void execute() final;

/**
* Resets the pipeline. At this point pipeline execution should be complete.
*/
void reset() final;

private:

/**
Expand Down
2 changes: 1 addition & 1 deletion livre/core/types.h
Original file line number Diff line number Diff line change
Expand Up @@ -240,9 +240,9 @@ typedef std::vector< ConstOutputPortPtr > ConstOutputPorts;
typedef std::vector< ConstInputPortPtr > ConstInputPorts;
typedef std::vector< NameTypePair > NameTypePairs;
typedef std::vector< PortInfo > PortInfos;
typedef std::vector< Future > Futures;
typedef std::vector< PromisePtr > Promises;

typedef std::list< Future > Futures;
typedef std::list< ExecutablePtr > Executables;

template <class T>
Expand Down
62 changes: 49 additions & 13 deletions tests/pipeline/pipeline.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -198,13 +198,14 @@ BOOST_AUTO_TEST_CASE( testConnection )
BOOST_CHECK_EQUAL( outputData.thanksForAllTheFish, 222 );
}

livre::PipelinePtr createPipeline( livre::PipeFilterPtr& pipeOutput,
livre::PipelinePtr createPipeline( livre::PipeFilterPtr& pipeInput,
livre::PipeFilterPtr& pipeOutput,
uint32_t inputValue,
size_t nConvertFilter = 1 )
{
livre::FilterPtr filter( new TestFilter( ));
livre::PipelinePtr pipeline( new livre::Pipeline( ));
livre::PipeFilterPtr pipeInput = pipeline->add( "Producer", filter );
pipeInput = pipeline->add( "Producer", filter );
pipeOutput = pipeline->add( "Consumer", filter );

livre::FilterPtr convertFilter( new ConvertFilter( ));
Expand All @@ -229,9 +230,15 @@ livre::ExecutorPtr createExecutor(const size_t nbOfWorkerThreads )

BOOST_AUTO_TEST_CASE( testSynchronousPipeline )
{
livre::PipeFilterPtr pipeOutput;
const uint32_t inputValue = 90;
livre::PipelinePtr pipeline = createPipeline( pipeOutput, inputValue );
livre::PipeFilterPtr pipeInput;
livre::PipeFilterPtr pipeOutput;

livre::PipelinePtr pipeline = createPipeline( pipeInput,
pipeOutput,
inputValue,
1 );

pipeline->execute();

const livre::OutFutureMap portFutures( pipeOutput->getPostconditions( ));
Expand All @@ -241,9 +248,15 @@ BOOST_AUTO_TEST_CASE( testSynchronousPipeline )

BOOST_AUTO_TEST_CASE( testWaitPipeline )
{
livre::PipeFilterPtr pipeOutput;
const uint32_t inputValue = 90;
livre::PipelinePtr pipeline = createPipeline( pipeOutput, inputValue );
livre::PipeFilterPtr pipeInput;
livre::PipeFilterPtr pipeOutput;

livre::PipelinePtr pipeline = createPipeline( pipeInput,
pipeOutput,
inputValue,
1 );

livre::ExecutorPtr executor = createExecutor( 2 );

const livre::FutureMap pipelineFutures( executor->execute( pipeline->getExecutables( )));
Expand All @@ -256,9 +269,15 @@ BOOST_AUTO_TEST_CASE( testWaitPipeline )

BOOST_AUTO_TEST_CASE( testAsynchronousPipeline )
{
livre::PipeFilterPtr pipeOutput;
const uint32_t inputValue = 90;
livre::PipelinePtr pipeline = createPipeline( pipeOutput, inputValue );
livre::PipeFilterPtr pipeInput;
livre::PipeFilterPtr pipeOutput;

livre::PipelinePtr pipeline = createPipeline( pipeInput,
pipeOutput,
inputValue,
1 );

livre::ExecutorPtr executor = createExecutor( 2 );

executor->execute( pipeline->getExecutables( ));
Expand All @@ -274,11 +293,14 @@ BOOST_AUTO_TEST_CASE( testOneToManyManyToOnePipeline )
{
const size_t convertFilterCount = 10;
const uint32_t inputValue = 90;
livre::PipeFilterPtr pipeInput;
livre::PipeFilterPtr pipeOutput;

livre::PipelinePtr pipeline = createPipeline( pipeOutput,
livre::PipelinePtr pipeline = createPipeline( pipeInput,
pipeOutput,
inputValue,
convertFilterCount );

livre::ExecutorPtr executor = createExecutor( 1 );
executor->execute( pipeline->getExecutables( ));

Expand All @@ -291,17 +313,31 @@ BOOST_AUTO_TEST_CASE( testOneToManyManyToOnePipeline )
{
const size_t convertFilterCount = 10;
const uint32_t inputValue = 90;
livre::PipeFilterPtr pipeInput;
livre::PipeFilterPtr pipeOutput;

livre::PipelinePtr pipeline = createPipeline( pipeOutput,
livre::PipelinePtr pipeline = createPipeline( pipeInput,
pipeOutput,
inputValue,
convertFilterCount );
livre::ExecutorPtr executor = createExecutor( 8 );
const livre::Futures& futures = executor->execute( pipeline->getExecutables( ));

const livre::OutFutureMap portFutures1( pipeOutput->getPostconditions( ));
const OutputData& outputData1 = portFutures1.get< OutputData >( "TestOutputData" );
BOOST_CHECK_EQUAL( outputData1.thanksForAllTheFish, 1761 );

// Reset the pipeline but wait pipeline execution before
const livre::FutureMap futureMap( futures );
futureMap.wait();

pipeline->reset();
executor->execute( pipeline->getExecutables( ));
pipeInput->getPromise( "TestInputData" )->set( InputData( inputValue ));

const livre::OutFutureMap portFutures( pipeOutput->getPostconditions( ));
const OutputData& outputData = portFutures.get< OutputData >( "TestOutputData" );
BOOST_CHECK_EQUAL( outputData.thanksForAllTheFish, 1761 );
const livre::OutFutureMap portFutures2( pipeOutput->getPostconditions( ));
const OutputData& outputData2 = portFutures2.get< OutputData >( "TestOutputData" );
BOOST_CHECK_EQUAL( outputData2.thanksForAllTheFish, 1761 );
}


Expand Down

0 comments on commit 6db9e3f

Please sign in to comment.