Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 22 additions & 8 deletions include/sampleflow/consumers/action.h
Original file line number Diff line number Diff line change
Expand Up @@ -94,8 +94,8 @@ namespace SampleFlow
*
* The implementation of this class is thread-safe, i.e., its
* consume() member function can be called concurrently and from multiple
* threads. However, it ensures that the action is executed only once
* at a time.
* threads. A constructor argument determines whether the action is executed
* only once at any given time, or whether it can be executed concurrently.
*
*
* @tparam InputType The C++ type used for the samples $x_k$.
Expand All @@ -107,13 +107,19 @@ namespace SampleFlow
/**
* Constructor. Take the action (a function object) as argument.
*
* The second argument (defaulted to ParallelMode::synchronous) indicates
* The second argument determines whether the action needs to be
* protected by a mutex or whether the action can be executed multiple
* times concurrently if there are several samples coming in in short
* succession on different threads.
*
* The third argument (defaulted to ParallelMode::synchronous) indicates
* whether incoming samples should be processed immediately, on the current
* thread, or can be deferred to (possibly out of order) processing on
* a separate thread. See ParallelMode for more information. Whether this
* is possible or not depends on what the `action` function does.
*/
Action (const std::function<void (InputType, AuxiliaryData)> &action,
const bool allow_concurrent_action = false,
const ParallelMode supported_parallel_modes = ParallelMode::synchronous);

/**
Expand All @@ -140,21 +146,25 @@ namespace SampleFlow

private:
/**
* A mutex used to lock access to all member variables when running
* on multiple threads.
* A mutex used to synchronize the call to the action function, if so
* desired by the caller.
*/
mutable std::mutex mutex;

const bool allow_concurrent_action;

const std::function<void (InputType, AuxiliaryData)> action_function;
};


template <typename InputType>
Action<InputType>::
Action (const std::function<void (InputType, AuxiliaryData)> &action,
const bool allow_concurrent_action,
const ParallelMode supported_parallel_modes)
:
Consumer<InputType>(supported_parallel_modes),
allow_concurrent_action (allow_concurrent_action),
action_function (action)
{}

Expand All @@ -173,9 +183,13 @@ namespace SampleFlow
Action<InputType>::
consume (InputType sample, AuxiliaryData aux_data)
{
std::lock_guard<std::mutex> lock(mutex);

action_function (std::move(sample), std::move(aux_data));
if (allow_concurrent_action)
action_function (std::move(sample), std::move(aux_data));
else
{
std::lock_guard<std::mutex> lock(mutex);
action_function (std::move(sample), std::move(aux_data));
}
}
}
}
Expand Down