Skip to content

Commit

Permalink
#5231: Redesign MessageBus to increase performance when sending large…
Browse files Browse the repository at this point in the history
… volumes of messages through the pipe.
  • Loading branch information
codereader committed Jun 26, 2020
1 parent 8abc3c2 commit d967ebd
Show file tree
Hide file tree
Showing 16 changed files with 119 additions and 23 deletions.
49 changes: 35 additions & 14 deletions include/imessagebus.h
Expand Up @@ -19,6 +19,10 @@ class IMessage
public:
virtual ~IMessage() {}

// Returns a unique ID of this message, see IMessage::Type enum
// for well-known types
virtual std::size_t getId() const = 0;

// Returns true if this message has been handled
bool isHandled() const
{
Expand All @@ -29,13 +33,31 @@ class IMessage
{
_handled = handled;
}

public:
// Pre-defined message type IDs.
// Plugin code can define their own IDs in the range of 1000+
enum Type : std::size_t
{
AutomaticMapSaveRequest,
CommandExecutionFailed,
GameConfigNeeded,
LongRunningOperation,
MapFileOperation,
Notification,
TextureChanged,

UserDefinedMessagesGoHigherThanThis = 999,
};
};

/**
* General-purpose handler used to send and process message to registered listeners.
* Client code can send arbitrary message objects as long as they are deriving from
* IMessage.
* To receive the message, client code needs to add a listener.
* Listeners need to subscribe to each message type they're interested in.
* To receive the message, client code needs to add a listener, which can either
* listen for a specific type of messages or all of them.
*/
class IMessageBus
{
Expand All @@ -45,19 +67,19 @@ class IMessageBus
typedef std::function<void(IMessage&)> Listener;

/**
* Register a listener which gets called with the message as argument.
* An ID of the listener will be returned which can be used for unsubscription.
* Registers a listener that is only called when the give message type
* is sent across the wire. Use the returned ID to unsubscribe the listener.
*/
virtual std::size_t addListener(const Listener& listener) = 0;
virtual std::size_t addListener(std::size_t messageType, const Listener& listener) = 0;

/**
* Unsubscribe the given listener.
*/
virtual void removeListener(std::size_t listenerId) = 0;

/**
* Send the given message along the wire. Clients need to construct
* the message themselves, and check the possible result.
* Send the given message to the given channel. The channel ID refers to
* a given message type and must have been acquired using getChannelId() beforehand.
*/
virtual void sendMessage(IMessage& message) = 0;
};
Expand All @@ -73,22 +95,21 @@ class TypeListener :
public:
TypeListener(void (*specialisedFunc)(T&)) :
std::function<void(T&)>(specialisedFunc)
{}
{
static_assert(std::is_base_of<IMessage, T>::value, "Listener must accept IMessage types");
}

TypeListener(const std::function<void(T&)>& specialisedFunc) :
std::function<void(T&)>(specialisedFunc)
{}
{
static_assert(std::is_base_of<IMessage, T>::value, "Listener must accept IMessage types");
}

// Fulfills the Listener function signature
void operator()(IMessage& message)
{
// Perform a type cast and dispatch to our base class' call operator
try
{
std::function<void(T&)>::operator()(dynamic_cast<T&>(message));
}
catch (std::bad_cast&)
{}
std::function<void(T&)>::operator()(static_cast<T&>(message));
}
};

Expand Down
5 changes: 5 additions & 0 deletions libs/messages/AutomaticMapSaveRequest.h
Expand Up @@ -16,6 +16,11 @@ class AutomaticMapSaveRequest :
AutomaticMapSaveRequest()
{}

std::size_t getId() const override
{
return Type::AutomaticMapSaveRequest;
}

// Deny this request to prevent the save from happening
void denyWithReason(const std::string& reason)
{
Expand Down
5 changes: 5 additions & 0 deletions libs/messages/CommandExecutionFailed.h
Expand Up @@ -22,6 +22,11 @@ class CommandExecutionFailedMessage :
_exception(exception)
{}

std::size_t getId() const override
{
return Type::CommandExecutionFailed;
}

std::string getMessage() const
{
return _exception.what();
Expand Down
5 changes: 5 additions & 0 deletions libs/messages/GameConfigNeededMessage.h
Expand Up @@ -27,6 +27,11 @@ class ConfigurationNeeded :
ConfigurationNeeded()
{}

std::size_t getId() const override
{
return Type::GameConfigNeeded;
}

const GameConfiguration& getConfig() const
{
return _config;
Expand Down
5 changes: 5 additions & 0 deletions libs/messages/LongRunningOperationMessage.h
Expand Up @@ -38,6 +38,11 @@ class LongRunningOperationMessage :
_message(message)
{}

std::size_t getId() const override
{
return Type::LongRunningOperation;
}

OperationEvent getType() const
{
return _event;
Expand Down
5 changes: 5 additions & 0 deletions libs/messages/MapFileOperation.h
Expand Up @@ -76,6 +76,11 @@ class FileOperation :
}
}

std::size_t getId() const override
{
return IMessage::Type::MapFileOperation;
}

const std::string& getText() const
{
return _message;
Expand Down
5 changes: 5 additions & 0 deletions libs/messages/NotificationMessage.h
Expand Up @@ -36,6 +36,11 @@ class NotificationMessage :
return _message;
}

std::size_t getId() const override
{
return IMessage::Type::Notification;
}

Type getType() const
{
return _type;
Expand Down
5 changes: 5 additions & 0 deletions libs/messages/TextureChanged.h
Expand Up @@ -18,6 +18,11 @@ class TextureChangedMessage :
TextureChangedMessage()
{}

std::size_t getId() const override
{
return Type::TextureChanged;
}

// Convenience method
static void Send()
{
Expand Down
4 changes: 2 additions & 2 deletions radiant/RadiantApp.cpp
Expand Up @@ -146,9 +146,9 @@ void RadiantApp::onStartupEvent(wxCommandEvent& ev)
// In first-startup scenarios the game configuration is not present
// in which case the GameManager will dispatch a message asking
// for showing a dialog or similar. Connect the listener.
_coreModule->get()->getMessageBus().addListener(
_coreModule->get()->getMessageBus().addListener(radiant::IMessage::Type::GameConfigNeeded,
radiant::TypeListener(ui::GameSetupDialog::HandleGameConfigMessage));

// Pick up all the statically defined modules and register them
module::internal::StaticModuleList::RegisterModules();

Expand Down
1 change: 1 addition & 0 deletions radiant/ui/AutoSaveRequestHandler.h
Expand Up @@ -17,6 +17,7 @@ class AutoSaveRequestHandler
AutoSaveRequestHandler()
{
_msgSubscription = GlobalRadiantCore().getMessageBus().addListener(
radiant::IMessage::Type::AutomaticMapSaveRequest,
radiant::TypeListener<map::AutomaticMapSaveRequest>(
sigc::mem_fun(this, &AutoSaveRequestHandler::handleRequest)));
}
Expand Down
2 changes: 1 addition & 1 deletion radiant/ui/LongRunningOperationHandler.h
Expand Up @@ -26,7 +26,7 @@ class LongRunningOperationHandler
LongRunningOperationHandler() :
_level(0)
{
GlobalRadiantCore().getMessageBus().addListener(
GlobalRadiantCore().getMessageBus().addListener(radiant::IMessage::Type::LongRunningOperation,
radiant::TypeListener<radiant::LongRunningOperationMessage>(
std::bind(&LongRunningOperationHandler::onMessage, this, std::placeholders::_1)
)
Expand Down
1 change: 1 addition & 0 deletions radiant/ui/MapFileProgressHandler.h
Expand Up @@ -24,6 +24,7 @@ class MapFileProgressHandler
MapFileProgressHandler()
{
_msgSubscription = GlobalRadiantCore().getMessageBus().addListener(
radiant::IMessage::Type::MapFileOperation,
radiant::TypeListener<map::FileOperation>(
sigc::mem_fun(this, &MapFileProgressHandler::handleFileOperation)));
}
Expand Down
3 changes: 3 additions & 0 deletions radiant/ui/UserInterfaceModule.cpp
Expand Up @@ -158,13 +158,16 @@ void UserInterfaceModule::initialiseModule(const ApplicationContext& ctx)
initialiseEntitySettings();

_execFailedListener = GlobalRadiantCore().getMessageBus().addListener(
radiant::IMessage::Type::CommandExecutionFailed,
radiant::TypeListener<radiant::CommandExecutionFailedMessage>(
sigc::mem_fun(this, &UserInterfaceModule::handleCommandExecutionFailure)));

_textureChangedListener = GlobalRadiantCore().getMessageBus().addListener(
radiant::IMessage::Type::TextureChanged,
radiant::TypeListener(UserInterfaceModule::HandleTextureChanged));

_notificationListener = GlobalRadiantCore().getMessageBus().addListener(
radiant::IMessage::Type::Notification,
radiant::TypeListener(UserInterfaceModule::HandleNotificationMessage));

// Initialise the AAS UI
Expand Down
1 change: 1 addition & 0 deletions radiant/ui/surfaceinspector/SurfaceInspector.cpp
Expand Up @@ -688,6 +688,7 @@ void SurfaceInspector::_preShow()

// Get notified about texture changes
_textureMessageHandler = GlobalRadiantCore().getMessageBus().addListener(
radiant::IMessage::Type::TextureChanged,
radiant::TypeListener<radiant::TextureChangedMessage>(
sigc::mem_fun(this, &SurfaceInspector::handleTextureChangedMessage)));

Expand Down
15 changes: 15 additions & 0 deletions radiantcore/brush/BrushModule.cpp
Expand Up @@ -17,6 +17,7 @@
#include "ipreferencesystem.h"
#include "module/StaticModule.h"
#include "messages/TextureChanged.h"
#include "debugging/ScopedDebugTimer.h"

#include "selection/algorithm/Primitives.h"

Expand Down Expand Up @@ -142,10 +143,24 @@ void BrushModuleImpl::shutdownModule()
destroy();
}

void benchmarkMessagebus(const cmd::ArgumentList& args)
{
int count = args[0].getInt();

ScopedDebugTimer timer("TextureChangedMessage");

for (int i = 0; i < count; ++i)
{
radiant::TextureChangedMessage::Send();
}
}

void BrushModuleImpl::registerBrushCommands()
{
GlobalEventManager().addRegistryToggle("TogTexLock", RKEY_ENABLE_TEXTURE_LOCK);

GlobalCommandSystem().addCommand("BenchmarkMessagebus", benchmarkMessagebus, { cmd::ARGTYPE_INT });

GlobalCommandSystem().addCommand("BrushMakePrefab", selection::algorithm::brushMakePrefab, { cmd::ARGTYPE_INT, cmd::ARGTYPE_INT | cmd::ARGTYPE_OPTIONAL });
GlobalCommandSystem().addCommand("BrushMakeSided", selection::algorithm::brushMakeSided, { cmd::ARGTYPE_INT });

Expand Down
31 changes: 25 additions & 6 deletions radiantcore/messagebus/MessageBus.h
Expand Up @@ -2,6 +2,7 @@

#include <mutex>
#include "imessagebus.h"
#include "itextstream.h"

namespace radiant
{
Expand All @@ -11,7 +12,12 @@ class MessageBus :
{
private:
std::recursive_mutex _lock;
std::map<std::size_t, Listener> _listeners;

// Listener and its registration handle
typedef std::pair<std::size_t, Listener> ListenerPlusId;

// Maps message types to Listeners
std::multimap<std::size_t, ListenerPlusId> _listeners;

bool _processingMessage;
std::size_t _nextId;
Expand All @@ -21,12 +27,12 @@ class MessageBus :
_nextId(1)
{}

std::size_t addListener(const Listener& listener) override
std::size_t addListener(std::size_t messageType, const Listener& listener) override
{
std::lock_guard<std::recursive_mutex> guard(_lock);

auto id = _nextId++;
_listeners.emplace(id, listener);
_listeners.emplace(messageType, std::make_pair(id, listener));

return id;
}
Expand All @@ -35,16 +41,29 @@ class MessageBus :
{
std::lock_guard<std::recursive_mutex> guard(_lock);

_listeners.erase(listenerId);
for (auto it = _listeners.begin(); it != _listeners.end(); ++it)
{
if (it->second.first == listenerId)
{
_listeners.erase(it);
return;
}
}

rWarning() << "MessageBus: Could not locate listener with ID " << listenerId << std::endl;
}

void sendMessage(IMessage& message) override
{
std::lock_guard<std::recursive_mutex> guard(_lock);

for (auto it = _listeners.begin(); it != _listeners.end(); /* in-loop */)
// Get the message type ID
auto msgId = message.getId();

for (auto it = _listeners.lower_bound(msgId);
it != _listeners.end() && it != _listeners.upper_bound(msgId); /* in-loop */)
{
(*it++).second(message);
(*it++).second.second(message);
}
}
};
Expand Down

0 comments on commit d967ebd

Please sign in to comment.