Skip to content

Commit

Permalink
BRAYNS-595 Refactor network loop. (#1217)
Browse files Browse the repository at this point in the history
  • Loading branch information
Adrien4193 committed Nov 27, 2023
1 parent 3257147 commit 61b6aea
Show file tree
Hide file tree
Showing 46 changed files with 359 additions and 660 deletions.
45 changes: 1 addition & 44 deletions brayns/Brayns.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -83,31 +83,6 @@ class NetworkStartup
return !uri.empty();
}
};

class RateLimiter
{
public:
explicit RateLimiter(brayns::Duration period):
_period(period)
{
}

void wait()
{
auto now = brayns::Clock::now();
auto delta = now - _lastCall;
if (delta >= _period)
{
return;
}
std::this_thread::sleep_for(_period - delta);
_lastCall = brayns::Clock::now();
}

private:
brayns::Duration _period;
brayns::TimePoint _lastCall = brayns::Clock::now();
};
} // namespace

namespace brayns
Expand All @@ -130,24 +105,11 @@ Brayns::Brayns(int argc, const char **argv):

Log::info("Loading plugins.");
_pluginManager.loadPlugins();

if (_network)
{
Log::info("Starting network manager.");
_network->start();
}
}

Brayns::~Brayns()
{
if (_network)
{
_network->stop();
}
_loaderRegistry = {};
// make sure that plugin objects are removed first, as plugins are
// destroyed before the engine, but plugin destruction still should have
// a valid engine and _api (aka this object).
_pluginManager.destroyPlugins();
}

Expand All @@ -163,12 +125,7 @@ void Brayns::runAsService()
throw std::runtime_error("Trying to run a service without URI");
}
Log::info("Brayns service started.");
auto limiter = RateLimiter(std::chrono::milliseconds(1));
while (_engine.isRunning())
{
_network->update();
limiter.wait();
}
_network->run();
}

Engine &Brayns::getEngine()
Expand Down
10 changes: 0 additions & 10 deletions brayns/engine/core/Engine.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -192,16 +192,6 @@ EngineFactories &Engine::getFactories() noexcept
return _factories;
}

void Engine::setRunning(bool keepRunning) noexcept
{
_keepRunning = keepRunning;
}

bool Engine::isRunning() const noexcept
{
return _keepRunning;
}

const ParametersManager &Engine::getParametersManager() const noexcept
{
return _params;
Expand Down
13 changes: 0 additions & 13 deletions brayns/engine/core/Engine.h
Original file line number Diff line number Diff line change
Expand Up @@ -84,16 +84,6 @@ class Engine
*/
EngineFactories &getFactories() noexcept;

/**
* @brief Sets wether the engine should keep running or not
*/
void setRunning(bool running) noexcept;

/**
* @brief Returns wether the engine is running or not
*/
bool isRunning() const noexcept;

/**
* @brief Returns the system parameters manager
*/
Expand Down Expand Up @@ -131,8 +121,5 @@ class Engine
Renderer _renderer;

EngineFactories _factories;

// Run flag
bool _keepRunning{true};
};
} // namespace brayns
6 changes: 6 additions & 0 deletions brayns/network/INetworkInterface.h
Original file line number Diff line number Diff line change
Expand Up @@ -47,5 +47,11 @@ class INetworkInterface
* @note The network manager already calls it on each update.
*/
virtual void poll() = 0;

/**
* @brief Stop the network loop.
*
*/
virtual void stop() = 0;
};
} // namespace brayns
110 changes: 85 additions & 25 deletions brayns/network/NetworkManager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
#include <brayns/network/entrypoint/EntrypointBuilder.h>
#include <brayns/network/socket/ClientSocket.h>
#include <brayns/network/socket/ServerSocket.h>
#include <brayns/network/socket/SocketListener.h>
#include <brayns/network/task/TaskDispatcher.h>

#include <brayns/network/entrypoints/AddClipPlaneEntrypoint.h>
#include <brayns/network/entrypoints/AddClippingGeometryEntrypoint.h>
Expand All @@ -43,7 +43,6 @@
#include <brayns/network/entrypoints/ClearRenderablesEntrypoint.h>
#include <brayns/network/entrypoints/ColorRampEntrypoint.h>
#include <brayns/network/entrypoints/EnableSimulationEntrypoint.h>
#include <brayns/network/entrypoints/ExitLaterEntrypoint.h>
#include <brayns/network/entrypoints/ExportGBuffersEntrypoint.h>
#include <brayns/network/entrypoints/FramebufferEntrypoint.h>
#include <brayns/network/entrypoints/GetLoadersEntrypoint.h>
Expand Down Expand Up @@ -73,7 +72,7 @@ class CoreEntrypointRegistry
static void registerEntrypoints(
brayns::INetworkInterface &interface,
brayns::PluginAPI &api,
const brayns::EntrypointRegistry &entrypoints,
brayns::EntrypointRegistry &entrypoints,
brayns::TaskManager &tasks)
{
auto &parameters = api.getParametersManager();
Expand Down Expand Up @@ -112,7 +111,6 @@ class CoreEntrypointRegistry
builder.add<brayns::ClearRenderablesEntrypoint>(models, simulation);
builder.add<brayns::ColorModelEntrypoint>(models);
builder.add<brayns::EnableSimulationEntrypoint>(models);
builder.add<brayns::ExitLaterEntrypoint>(engine);
builder.add<brayns::ExportGBuffersEntrypoint>(engine, token);
builder.add<brayns::GetApplicationParametersEntrypoint>(application);
builder.add<brayns::GetCameraViewEntrypoint>(engine);
Expand Down Expand Up @@ -141,7 +139,7 @@ class CoreEntrypointRegistry
builder.add<brayns::GetSimulationParametersEntrypoint>(simulation);
builder.add<brayns::InspectEntrypoint>(engine);
builder.add<brayns::InstantiateModelEntrypoint>(models);
builder.add<brayns::QuitEntrypoint>(engine);
builder.add<brayns::QuitEntrypoint>(interface);
builder.add<brayns::RegistryEntrypoint>(entrypoints);
builder.add<brayns::RemoveModelEntrypoint>(models, simulation);
builder.add<brayns::RenderImageEntrypoint>(engine);
Expand Down Expand Up @@ -195,34 +193,89 @@ class SocketFactory
return std::make_unique<brayns::ServerSocket>(parameters, std::move(listener));
}
};

class SocketListener : public brayns::ISocketListener
{
public:
explicit SocketListener(brayns::NetworkMonitor &monitor):
_monitor(monitor)
{
}

void onConnect(const brayns::ClientRef &client) override
{
_monitor.notifyConnection(client);
}

void onDisconnect(const brayns::ClientRef &client) override
{
_monitor.notifyDisonnection(client);
}

void onRequest(brayns::ClientRequest request) override
{
_monitor.notifyRequest(std::move(request));
}

private:
brayns::NetworkMonitor &_monitor;
};

class NetworkReceiver
{
public:
static void receive(
brayns::NetworkBuffer &buffer,
brayns::ClientManager &clients,
brayns::EntrypointRegistry &entrypoints,
brayns::TaskManager &tasks)
{
for (const auto &client : buffer.connectedClients)
{
brayns::Log::info("New client connected: {}.", client);
clients.add(client);
}
for (auto &request : buffer.requests)
{
brayns::Log::info("New request {}.", request);
brayns::TaskDispatcher::dispatch(std::move(request), entrypoints, tasks);
}
for (const auto &client : buffer.disconnectedClients)
{
brayns::Log::info("Client disconnected itself: {}.", client);
tasks.disconnect(client);
clients.remove(client);
}
}
};
} // namespace

namespace brayns
{
NetworkManager::NetworkManager(PluginAPI &api):
_api(api)
NetworkManager::NetworkManager(PluginAPI &api)
{
auto listener = std::make_unique<brayns::SocketListener>(_clients, _entrypoints, _tasks);
_socket = SocketFactory::createSocket(_api, std::move(listener));
CoreEntrypointRegistry::registerEntrypoints(*this, _api, _entrypoints, _tasks);
auto listener = std::make_unique<SocketListener>(_monitor);
_socket = SocketFactory::createSocket(api, std::move(listener));
CoreEntrypointRegistry::registerEntrypoints(*this, api, _entrypoints, _tasks);
}

void NetworkManager::start()
void NetworkManager::run()
{
_socket->start();
}

void NetworkManager::stop()
{
_running = true;
while (_running)
{
Log::debug("Waiting for incoming messages.");
auto buffer = _monitor.wait();
Log::debug("Processing received messages.");
NetworkReceiver::receive(buffer, _clients, _entrypoints, _tasks);
Log::debug("Running all registered tasks.");
_tasks.run();
}
_socket->stop();
}

void NetworkManager::update()
{
Log::trace("Network update");
_socket->poll();
_tasks.runAllTasks();
_entrypoints.forEach([](auto &entrypoint) { entrypoint.onUpdate(); });
_clients.clear();
_monitor.clear();
_tasks.clear();
}

void NetworkManager::registerEntrypoint(EntrypointRef entrypoint)
Expand All @@ -235,7 +288,14 @@ void NetworkManager::registerEntrypoint(EntrypointRef entrypoint)

void NetworkManager::poll()
{
Log::trace("Poll network requests from plugin or entrypoint");
_socket->poll();
Log::trace("Poll network requests from entrypoint");
auto buffer = _monitor.poll();
NetworkReceiver::receive(buffer, _clients, _entrypoints, _tasks);
}

void NetworkManager::stop()
{
Log::info("Network loop stopped.");
_running = false;
}
} // namespace brayns
27 changes: 11 additions & 16 deletions brayns/network/NetworkManager.h
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
#include <brayns/pluginapi/PluginAPI.h>

#include "INetworkInterface.h"
#include "NetworkMonitor.h"

namespace brayns
{
Expand All @@ -48,22 +49,10 @@ class NetworkManager : public INetworkInterface
explicit NetworkManager(PluginAPI &api);

/**
* @brief Start network server / client to accept incoming requests.
* @brief Start server / client until stop request is received.
*
*/
void start();

/**
* @brief Close all connections.
*
*/
void stop();

/**
* @brief Poll socket and run pending tasks.
*
*/
void update();
void run();

/**
* @brief Register an entrypoint.
Expand All @@ -75,15 +64,21 @@ class NetworkManager : public INetworkInterface
/**
* @brief Poll socket to receive incoming messages.
*
* Automatically called in update().
*/
virtual void poll() override;

/**
* @brief Stop the network loop.
*
*/
virtual void stop() override;

private:
PluginAPI &_api;
std::unique_ptr<ISocket> _socket;
ClientManager _clients;
EntrypointRegistry _entrypoints;
TaskManager _tasks;
bool _running = false;
NetworkMonitor _monitor;
};
} // namespace brayns

0 comments on commit 61b6aea

Please sign in to comment.