Skip to content

Commit

Permalink
Make handler functions return a value indicating success
Browse files Browse the repository at this point in the history
  • Loading branch information
achim-k committed Apr 27, 2023
1 parent 1c76a20 commit 1fc7ba0
Show file tree
Hide file tree
Showing 4 changed files with 313 additions and 174 deletions.
25 changes: 25 additions & 0 deletions foxglove_bridge_base/include/foxglove_bridge/server_interface.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,31 @@ constexpr size_t DEFAULT_SEND_BUFFER_LIMIT_BYTES = 10000000UL; // 10 MB

using MapOfSets = std::unordered_map<std::string, std::unordered_set<std::string>>;

template <typename IdType>
class ExeptionWithId : public std::runtime_error {
public:
explicit ExeptionWithId(IdType id, const std::string& what_arg)
: std::runtime_error(what_arg)
, _id(id) {}

IdType id() const {
return _id;
}

private:
IdType _id;
};

class ChannelError : public ExeptionWithId<ChannelId> {
using ExeptionWithId::ExeptionWithId;
};
class ClientChannelError : public ExeptionWithId<ClientChannelId> {
using ExeptionWithId::ExeptionWithId;
};
class ServiceError : public ExeptionWithId<ServiceId> {
using ExeptionWithId::ExeptionWithId;
};

struct ServerOptions {
std::vector<std::string> capabilities;
std::vector<std::string> supportedEncodings;
Expand Down
185 changes: 136 additions & 49 deletions foxglove_bridge_base/include/foxglove_bridge/websocket_server.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -197,7 +197,6 @@ class Server final : public ServerInterface<ConnHandle> {
void sendJson(ConnHandle hdl, json&& payload);
void sendJsonRaw(ConnHandle hdl, const std::string& payload);
void sendBinary(ConnHandle hdl, const uint8_t* payload, size_t payloadSize);
void sendStatus(ConnHandle clientHandle, const StatusLevel level, const std::string& message);
void sendStatusAndLogMsg(ConnHandle clientHandle, const StatusLevel level,
const std::string& message);
void unsubscribeParamsWithoutSubscriptions(ConnHandle hdl,
Expand Down Expand Up @@ -236,7 +235,7 @@ inline Server<ServerConfiguration>::Server(std::string name, LogCallback logger,
_server.set_listen_backlog(128);

// Callback queue for handling client requests.
_handlerCallbackQueue = std::make_unique<CallbackQueue>(_logger, 1ul /* 1 thread */);
_handlerCallbackQueue = std::make_unique<CallbackQueue>(_logger, /*numThreads=*/1ul);
}

template <typename ServerConfiguration>
Expand Down Expand Up @@ -533,17 +532,6 @@ inline void Server<ServerConfiguration>::sendBinary(ConnHandle hdl, const uint8_
}
}

template <typename ServerConfiguration>
inline void Server<ServerConfiguration>::sendStatus(ConnHandle clientHandle,
const StatusLevel level,
const std::string& message) {
sendJson(clientHandle, json{
{"op", "status"},
{"level", static_cast<uint8_t>(level)},
{"message", message},
});
}

template <typename ServerConfiguration>
inline void Server<ServerConfiguration>::sendStatusAndLogMsg(ConnHandle clientHandle,
const StatusLevel level,
Expand All @@ -554,7 +542,11 @@ inline void Server<ServerConfiguration>::sendStatusAndLogMsg(ConnHandle clientHa
auto logger = level == StatusLevel::Error ? _server.get_elog() : _server.get_alog();
logger.write(logLevel, logMessage);

sendStatus(clientHandle, level, message);
sendJson(clientHandle, json{
{"op", "status"},
{"level", static_cast<uint8_t>(level)},
{"message", message},
});
}

template <typename ServerConfiguration>
Expand All @@ -565,12 +557,20 @@ inline void Server<ServerConfiguration>::handleMessage(ConnHandle hdl, MessagePt
switch (op) {
case OpCode::TEXT: {
_handlerCallbackQueue->addCallback([this, hdl, msg]() {
handleTextMessage(hdl, msg);
try {
handleTextMessage(hdl, msg);
} catch (const std::exception& e) {
sendStatusAndLogMsg(hdl, StatusLevel::Error, e.what());
}
});
} break;
case OpCode::BINARY: {
_handlerCallbackQueue->addCallback([this, hdl, msg]() {
handleBinaryMessage(hdl, msg);
try {
handleBinaryMessage(hdl, msg);
} catch (const std::exception& e) {
sendStatusAndLogMsg(hdl, StatusLevel::Error, e.what());
}
});
} break;
default:
Expand Down Expand Up @@ -619,6 +619,9 @@ inline void Server<ServerConfiguration>::handleTextMessage(ConnHandle hdl, Messa

switch (Integer(op)) {
case SUBSCRIBE: {
if (!_handlers.subscribeHandler) {
return;
}
for (const auto& sub : payload.at("subscriptions")) {
SubscriptionId subId = sub.at("id");
ChannelId channelId = sub.at("channelId");
Expand All @@ -635,13 +638,21 @@ inline void Server<ServerConfiguration>::handleTextMessage(ConnHandle hdl, Messa
"Channel " + std::to_string(channelId) + " is not available; ignoring subscription");
continue;
}
clientInfo.subscriptionsByChannel.emplace(channelId, subId);
if (_handlers.subscribeHandler) {

try {
_handlers.subscribeHandler(channelId, hdl);
clientInfo.subscriptionsByChannel.emplace(channelId, subId);
} catch (const ChannelError& e) {
sendStatusAndLogMsg(hdl, StatusLevel::Error, e.what());
} catch (...) {
sendStatusAndLogMsg(hdl, StatusLevel::Error, op + ": Failed to execute handler");
}
}
} break;
case UNSUBSCRIBE: {
if (!_handlers.unsubscribeHandler) {
return;
}
for (const auto& subIdJson : payload.at("subscriptionIds")) {
SubscriptionId subId = subIdJson;
const auto& sub = findSubscriptionBySubId(subId);
Expand All @@ -651,14 +662,22 @@ inline void Server<ServerConfiguration>::handleTextMessage(ConnHandle hdl, Messa
" did not exist; ignoring unsubscription");
continue;
}

ChannelId chanId = sub->first;
clientInfo.subscriptionsByChannel.erase(sub);
if (_handlers.unsubscribeHandler) {
try {
_handlers.unsubscribeHandler(chanId, hdl);
clientInfo.subscriptionsByChannel.erase(sub);
} catch (const ChannelError& e) {
sendStatusAndLogMsg(hdl, StatusLevel::Error, e.what());
} catch (...) {
sendStatusAndLogMsg(hdl, StatusLevel::Error, op + ": Failed to execute handler");
}
}
} break;
case ADVERTISE: {
if (!_handlers.clientAdvertiseHandler) {
return;
}
std::unique_lock<std::shared_mutex> clientChannelsLock(_clientChannelsMutex);
auto [clientPublicationsIt, isFirstPublication] =
_clientChannels.emplace(hdl, std::unordered_map<ClientChannelId, ClientAdvertisement>());
Expand All @@ -685,14 +704,22 @@ inline void Server<ServerConfiguration>::handleTextMessage(ConnHandle hdl, Messa
advertisement.topic = topic;
advertisement.encoding = chan.at("encoding").get<std::string>();
advertisement.schemaName = chan.at("schemaName").get<std::string>();
clientPublications.emplace(channelId, advertisement);
clientInfo.advertisedChannels.emplace(channelId);
if (_handlers.clientAdvertiseHandler) {

try {
_handlers.clientAdvertiseHandler(advertisement, hdl);
clientPublications.emplace(channelId, advertisement);
clientInfo.advertisedChannels.emplace(channelId);
} catch (const ClientChannelError& e) {
sendStatusAndLogMsg(hdl, StatusLevel::Error, e.what());
} catch (...) {
sendStatusAndLogMsg(hdl, StatusLevel::Error, op + ": Failed to execute handler");
}
}
} break;
case UNADVERTISE: {
if (!_handlers.clientUnadvertiseHandler) {
return;
}
std::unique_lock<std::shared_mutex> clientChannelsLock(_clientChannelsMutex);
auto clientPublicationsIt = _clientChannels.find(hdl);
if (clientPublicationsIt == _clientChannels.end()) {
Expand All @@ -708,14 +735,18 @@ inline void Server<ServerConfiguration>::handleTextMessage(ConnHandle hdl, Messa
if (channelIt == clientPublications.end()) {
continue;
}
clientPublications.erase(channelIt);
const auto advertisedChannelIt = clientInfo.advertisedChannels.find(channelId);
if (advertisedChannelIt != clientInfo.advertisedChannels.end()) {
clientInfo.advertisedChannels.erase(advertisedChannelIt);
}

if (_handlers.clientUnadvertiseHandler) {
try {
_handlers.clientUnadvertiseHandler(channelId, hdl);
clientPublications.erase(channelIt);
const auto advertisedChannelIt = clientInfo.advertisedChannels.find(channelId);
if (advertisedChannelIt != clientInfo.advertisedChannels.end()) {
clientInfo.advertisedChannels.erase(advertisedChannelIt);
}
} catch (const ClientChannelError& e) {
sendStatusAndLogMsg(hdl, StatusLevel::Error, e.what());
} catch (...) {
sendStatusAndLogMsg(hdl, StatusLevel::Error, op + ": Failed to execute handler");
}
}
} break;
Expand All @@ -728,7 +759,14 @@ inline void Server<ServerConfiguration>::handleTextMessage(ConnHandle hdl, Messa
const auto requestId = payload.find("id") == payload.end()
? std::nullopt
: std::optional<std::string>(payload["id"].get<std::string>());
_handlers.parameterRequestHandler(paramNames, requestId, hdl);

try {
_handlers.parameterRequestHandler(paramNames, requestId, hdl);
} catch (const std::exception& e) {
sendStatusAndLogMsg(hdl, StatusLevel::Error, e.what());
} catch (...) {
sendStatusAndLogMsg(hdl, StatusLevel::Error, op + ": Failed to execute handler");
}
} break;
case SET_PARAMETERS: {
if (!_handlers.parameterChangeHandler) {
Expand All @@ -739,7 +777,13 @@ inline void Server<ServerConfiguration>::handleTextMessage(ConnHandle hdl, Messa
const auto requestId = payload.find("id") == payload.end()
? std::nullopt
: std::optional<std::string>(payload["id"].get<std::string>());
_handlers.parameterChangeHandler(parameters, requestId, hdl);
try {
_handlers.parameterChangeHandler(parameters, requestId, hdl);
} catch (const std::exception& e) {
sendStatusAndLogMsg(hdl, StatusLevel::Error, e.what());
} catch (...) {
sendStatusAndLogMsg(hdl, StatusLevel::Error, op + ": Failed to execute handler");
}
} break;
case SUBSCRIBE_PARAMETER_UPDATES: {
if (!_handlers.parameterSubscriptionHandler) {
Expand All @@ -761,9 +805,17 @@ inline void Server<ServerConfiguration>::handleTextMessage(ConnHandle hdl, Messa
clientSubscribedParams.insert(paramNames.begin(), paramNames.end());
}

if (!paramsToSubscribe.empty()) {
if (paramsToSubscribe.empty()) {
return;
}

try {
_handlers.parameterSubscriptionHandler(paramsToSubscribe,
ParameterSubscriptionOperation::SUBSCRIBE, hdl);
} catch (const std::exception& e) {
sendStatusAndLogMsg(hdl, StatusLevel::Error, e.what());
} catch (...) {
sendStatusAndLogMsg(hdl, StatusLevel::Error, op + ": Failed to execute handler");
}
} break;
case UNSUBSCRIBE_PARAMETER_UPDATES: {
Expand All @@ -783,25 +835,36 @@ inline void Server<ServerConfiguration>::handleTextMessage(ConnHandle hdl, Messa
unsubscribeParamsWithoutSubscriptions(hdl, paramNames);
} break;
case SUBSCRIBE_CONNECTION_GRAPH: {
std::unique_lock<std::shared_mutex> lock(_connectionGraphMutex);
_connectionGraph.subscriptionCount++;
if (!_handlers.subscribeConnectionGraphHandler) {
return;
}

bool subscribeToConnnectionGraph = false;
{
std::unique_lock<std::shared_mutex> lock(_connectionGraphMutex);
_connectionGraph.subscriptionCount++;
subscribeToConnnectionGraph = _connectionGraph.subscriptionCount == 1;
}

if (_connectionGraph.subscriptionCount == 1 && _handlers.subscribeConnectionGraphHandler) {
if (subscribeToConnnectionGraph) {
// First subscriber, let the handler know that we are interested in updates.
_server.get_alog().write(APP, "Subscribing to connection graph updates.");
_handlers.subscribeConnectionGraphHandler(true);
clientInfo.subscribedToConnectionGraph = true;
}

json::array_t publishedTopicsJson, subscribedTopicsJson, advertisedServicesJson;
for (const auto& [name, ids] : _connectionGraph.publishedTopics) {
publishedTopicsJson.push_back(nlohmann::json{{"name", name}, {"publisherIds", ids}});
}
for (const auto& [name, ids] : _connectionGraph.subscribedTopics) {
subscribedTopicsJson.push_back(nlohmann::json{{"name", name}, {"subscriberIds", ids}});
}
for (const auto& [name, ids] : _connectionGraph.advertisedServices) {
advertisedServicesJson.push_back(nlohmann::json{{"name", name}, {"providerIds", ids}});
{
std::shared_lock<std::shared_mutex> lock(_connectionGraphMutex);
for (const auto& [name, ids] : _connectionGraph.publishedTopics) {
publishedTopicsJson.push_back(nlohmann::json{{"name", name}, {"publisherIds", ids}});
}
for (const auto& [name, ids] : _connectionGraph.subscribedTopics) {
subscribedTopicsJson.push_back(nlohmann::json{{"name", name}, {"subscriberIds", ids}});
}
for (const auto& [name, ids] : _connectionGraph.advertisedServices) {
advertisedServicesJson.push_back(nlohmann::json{{"name", name}, {"providerIds", ids}});
}
}

const json jsonMsg = {
Expand All @@ -816,11 +879,19 @@ inline void Server<ServerConfiguration>::handleTextMessage(ConnHandle hdl, Messa
sendJsonRaw(hdl, jsonMsg.dump());
} break;
case UNSUBSCRIBE_CONNECTION_GRAPH: {
if (!_handlers.subscribeConnectionGraphHandler) {
return;
}

if (clientInfo.subscribedToConnectionGraph) {
clientInfo.subscribedToConnectionGraph = false;
std::unique_lock<std::shared_mutex> lock(_connectionGraphMutex);
_connectionGraph.subscriptionCount--;
if (_connectionGraph.subscriptionCount == 0 && _handlers.subscribeConnectionGraphHandler) {
bool unsubscribeFromConnnectionGraph = false;
{
std::unique_lock<std::shared_mutex> lock(_connectionGraphMutex);
_connectionGraph.subscriptionCount--;
unsubscribeFromConnnectionGraph = _connectionGraph.subscriptionCount == 0;
}
if (unsubscribeFromConnnectionGraph) {
_server.get_alog().write(APP, "Unsubscribing from connection graph updates.");
_handlers.subscribeConnectionGraphHandler(false);
}
Expand Down Expand Up @@ -860,6 +931,10 @@ inline void Server<ServerConfiguration>::handleBinaryMessage(ConnHandle hdl, Mes

switch (op) {
case ClientBinaryOpcode::MESSAGE_DATA: {
if (!_handlers.clientMessageHandler) {
return;
}

if (length < 5) {
sendStatusAndLogMsg(hdl, StatusLevel::Error,
"Invalid message length " + std::to_string(length));
Expand All @@ -885,7 +960,7 @@ inline void Server<ServerConfiguration>::handleBinaryMessage(ConnHandle hdl, Mes
return;
}

if (_handlers.clientMessageHandler) {
try {
const auto& advertisement = channelIt->second;
const uint32_t sequence = 0;
const ClientMessage clientMessage{static_cast<uint64_t>(timestamp),
Expand All @@ -895,6 +970,10 @@ inline void Server<ServerConfiguration>::handleBinaryMessage(ConnHandle hdl, Mes
length,
data};
_handlers.clientMessageHandler(clientMessage, hdl);
} catch (const ServiceError& e) {
sendStatusAndLogMsg(hdl, StatusLevel::Error, e.what());
} catch (...) {
sendStatusAndLogMsg(hdl, StatusLevel::Error, "callService: Failed to execute handler");
}
} break;
case ClientBinaryOpcode::SERVICE_CALL_REQUEST: {
Expand Down Expand Up @@ -1263,8 +1342,16 @@ inline void Server<ServerConfiguration>::unsubscribeParamsWithoutSubscriptions(
for (const auto& param : paramsToUnsubscribe) {
_server.get_alog().write(APP, "Unsubscribing from parameter '" + param + "'.");
}
_handlers.parameterSubscriptionHandler(paramsToUnsubscribe,
ParameterSubscriptionOperation::UNSUBSCRIBE, hdl);

try {
_handlers.parameterSubscriptionHandler(paramsToUnsubscribe,
ParameterSubscriptionOperation::UNSUBSCRIBE, hdl);
} catch (const std::exception& e) {
sendStatusAndLogMsg(hdl, StatusLevel::Error, e.what());
} catch (...) {
sendStatusAndLogMsg(hdl, StatusLevel::Error,
"Failed to unsubscribe from one more more parameters");
}
}
}

Expand Down
Loading

0 comments on commit 1fc7ba0

Please sign in to comment.