Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Deferred connection evaluation #41

Closed
wants to merge 9 commits into from
1 change: 1 addition & 0 deletions src/kdbindings/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ set(HEADERS
property.h
property_updater.h
signal.h
connection_evaluator.h
utils.h
)

Expand Down
65 changes: 65 additions & 0 deletions src/kdbindings/connection_evaluator.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
#pragma once

#include <functional>
#include <list>
#include <mutex>

namespace KDBindings {

/**
* @brief Manages and evaluates conditions for deferred connections.
LeonMatthesKDAB marked this conversation as resolved.
Show resolved Hide resolved
*
* The ConnectionEvaluator class is responsible for managing and evaluating conditions
* that determine when deferred connections should be executed. It provides mechanisms
* to evaluate connections based on specific criteria. This class is
* used in conjunction with the Signal class to establish and control connections.
LeonMatthesKDAB marked this conversation as resolved.
Show resolved Hide resolved
*/
class ConnectionEvaluator
phyBrackets marked this conversation as resolved.
Show resolved Hide resolved
{

public:
ConnectionEvaluator() = default;

ConnectionEvaluator(const ConnectionEvaluator &) noexcept = default;
phyBrackets marked this conversation as resolved.
Show resolved Hide resolved

ConnectionEvaluator &operator=(const ConnectionEvaluator &) noexcept = default;

ConnectionEvaluator(ConnectionEvaluator &&other) noexcept = delete;
phyBrackets marked this conversation as resolved.
Show resolved Hide resolved

ConnectionEvaluator &operator=(ConnectionEvaluator &&other) noexcept = delete;

/**
* @brief Evaluate and execute deferred connections.
phyBrackets marked this conversation as resolved.
Show resolved Hide resolved
*
* This function is responsible for evaluating and executing deferred connections.
* It locks the `connectionsMutex` to ensure thread safety while accessing the list
phyBrackets marked this conversation as resolved.
Show resolved Hide resolved
* of deferred connections. It then copies the list of connections to avoid
* interference with reentrant emissions. Finally, it iterates through the copied
* connections and executes each one by calling the associated function object.
*/
void evaluateDeferredConnections()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Overall I think the approach of simply copying (or moving out of) the connections is quite cool, as it allows to quickly release the mutex again.

However, this approach conflicts with deleting connections.
When the following happens asynchronously

  • emit a deferred signal
  • disconnect the connection
  • evaluateDeferredConnections

The connection should either be evaluated before the disconnect call returns, or it should not be evaluated, if the disconnect call returns before evaluateDeferredConnections is called. As it's highly likely that the connection includes dangling pointers after the call to disconnect the connection.

So if an evaluation is in progress the call to disconnect cannot return until the evaluation has completely finished.

If you still want to keep the approach of simply moving the connections over to release the mutex early, you could add a second mutex for "disconnect", which would be held longer, whilst the mutex for adding new connections could already be released.
However, this double-locking may be less performant in the end, depending on how many connections are queued, as we now need to lock two mutexes when evaluating and when disconnecting.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I really have a doubt about this, how i am thinking about is, so follow the order
-> emit a deferred signal
-> disconnect the connection
-> evaluateDeferredConnections

disconnecting the signal after the emit or before the evaluation wouldn't be have any issue, as evaluation depends on the list of connections in the evaluator which has already queued up through the call to emit. Call to disconnect removes the connection from the m_connections of the Signal class, which is fine.

I can only think about the issue when evaluation or disconnect either be used in mulithreading context. Or i might be thinking wrong?

I am adding the test case for this, to look up.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Like we discussed with @seanharmer and @lemirep , both approaches are feasible.
Removing queued invocations on disconnect is really a lot better to protect from dangling references.
But as Sean noted, it might just be something that needs to be expected with a new paradigm.

So if you're not sure what to do yet, feel free to explore your initial implementation a bit further (i.e. still evaluating queued invocations even after disconnect).
I'd just like to see the implications of this in use in an actual application.
Basically: How easy is it to foot-gun yourself with it?

{
std::list<std::function<void()>> copiedConnections;
{
std::lock_guard<std::mutex> lock(connectionsMutex);
copiedConnections = connections;
phyBrackets marked this conversation as resolved.
Show resolved Hide resolved
}

for (auto &connection : copiedConnections) {
connection();
}
}

private:
template<typename...>
friend class Signal;

void addConnection(std::function<void()> connection)
phyBrackets marked this conversation as resolved.
Show resolved Hide resolved
{
connections.push_back(connection);
}

std::list<std::function<void()>> connections;
std::mutex connectionsMutex;
phyBrackets marked this conversation as resolved.
Show resolved Hide resolved
};
} // namespace KDBindings
44 changes: 43 additions & 1 deletion src/kdbindings/signal.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,9 @@
#include <stdexcept>
#include <type_traits>
#include <utility>
#include <mutex>

#include <kdbindings/connection_evaluator.h>
#include <kdbindings/genindex_array.h>
#include <kdbindings/utils.h>

Expand Down Expand Up @@ -242,6 +244,22 @@ class Signal
return m_connections.insert({ slot });
}

// Establish a deferred connection between signal and slot, where ConnectionEvaluator object
// used to queue all the connection to evaluate later. The returned
// value can be used to disconnect the function again.
phyBrackets marked this conversation as resolved.
Show resolved Hide resolved
Private::GenerationalIndex connectDeferred(ConnectionEvaluator &evaluator, std::function<void(Args...)> const &slot)
{
std::lock_guard<std::mutex> lock(connectionMutex);
auto connection = [&evaluator, slot](Args... args) {
auto lambda = [slot, args...]() {
slot(args...);
};

evaluator.addConnection(lambda);
};
return m_connections.insert({ connection, true });
}

// Disconnects a previously connected function
void disconnect(const Private::GenerationalIndex &id) override
LeonMatthesKDAB marked this conversation as resolved.
Show resolved Hide resolved
{
Expand Down Expand Up @@ -284,6 +302,8 @@ class Signal
// Calls all connected functions
void emit(Args... p) const
{
std::lock_guard<std::mutex> lock(connectionMutex);

const auto numEntries = m_connections.entriesSize();

// This loop can tolerate signal handles being disconnected inside a slot,
Expand All @@ -294,18 +314,21 @@ class Signal
if (index) {
const auto con = m_connections.get(*index);

if (!con->blocked)
if (!con->blocked) {
con->slot(p...);
}
}
}
}

private:
struct Connection {
std::function<void(Args...)> slot;
bool isDeferred{ false };
LeonMatthesKDAB marked this conversation as resolved.
Show resolved Hide resolved
bool blocked{ false };
};
mutable Private::GenerationalIndexArray<Connection> m_connections;
mutable std::mutex connectionMutex;
LeonMatthesKDAB marked this conversation as resolved.
Show resolved Hide resolved
};

public:
Expand Down Expand Up @@ -349,6 +372,25 @@ class Signal
return ConnectionHandle{ m_impl, m_impl->connect(slot) };
}

/**
* @brief Establishes a deferred connection between the provided evaluator and slot.
*
* This function allows connecting an evaluator and a slot such that the slot's execution
* is deferred until the conditions evaluated by the `evaluator` are met.
*
* First argument to the function is reference to the `ConnectionEvaluator` responsible for determining
* when the slot should be executed.
*
* @return An instance of ConnectionHandle, that can be used to disconnect
* or temporarily block the connection.
*/
ConnectionHandle connectDeferred(ConnectionEvaluator &evaluator, std::function<void(Args...)> const &slot)
{
ensureImpl();

return ConnectionHandle(m_impl, m_impl->connectDeferred(evaluator, slot));
}

/**
* A template overload of Signal::connect that makes it easier to connect arbitrary functions to this
* Signal.
Expand Down
53 changes: 53 additions & 0 deletions tests/signal/tst_signal.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,11 @@

#include "kdbindings/utils.h"
#include <kdbindings/signal.h>
#include <kdbindings/connection_evaluator.h>

#include <stdexcept>
#include <string>
#include <thread>

#define DOCTEST_CONFIG_IMPLEMENT_WITH_MAIN
#include <doctest.h>
Expand Down Expand Up @@ -77,6 +79,57 @@ TEST_CASE("Signal connections")
REQUIRE(lambdaCalled == true);
}

SUBCASE("Deferred Connections")
phyBrackets marked this conversation as resolved.
Show resolved Hide resolved
{
ConnectionEvaluator evaluator;
Signal<int> signal1;
Signal<std::string> signal2;
Signal<int, std::string> signal3;
int val = 4;

std::thread thread1([&] {
signal1.connectDeferred(evaluator, [&val](int value) {
val--;
});
});

std::thread thread2([&] {
signal2.connectDeferred(evaluator, [&val](std::string) {
val++;
});
});

// Wait for threads to finish connecting
thread1.join();
thread2.join();

const auto result = signal3.connectDeferred(evaluator, [&val](int, std::string) {
val++;
});

REQUIRE(result.isActive());

signal3.disconnect(result);

// Emit signals(reentrant) in different threads
std::thread thread3([&] {
signal1.emit(42);
});

std::thread thread4([&] {
signal2.emit("Hi");
});

thread3.join();
thread4.join();

REQUIRE(!result.isActive());
signal3.emit(1, "Hi"); // it does not affect, as the signal3 already disconnected

evaluator.evaluateDeferredConnections();
REQUIRE(val == 4);
phyBrackets marked this conversation as resolved.
Show resolved Hide resolved
}

SUBCASE("A signal with arguments can be connected to a lambda and invoked with l-value args")
{
Signal<std::string, int> signal;
Expand Down