Skip to content

Commit

Permalink
fix(integration): Fix messages not being received from clients (#343)
Browse files Browse the repository at this point in the history
* Update testing script with stand call

* fix(integration): Messages not getting sent to clients

* Fix test
  • Loading branch information
AndyTWF committed Oct 2, 2021
1 parent e89dd19 commit 7d85c9b
Show file tree
Hide file tree
Showing 6 changed files with 155 additions and 155 deletions.
25 changes: 23 additions & 2 deletions script/IntegrationClientSocketTest.cpp
Expand Up @@ -50,11 +50,32 @@ thread = std::make_shared<std::thread>([clientSocket]() {

int bytesReceived;
std::array<char, 4096> receiveBuffer;
bool sentStandRequest = false;
do {
bytesReceived = recv(clientSocket, &receiveBuffer[0], 4096, 0);
if (bytesReceived > 0) {
LogInfo("Message received: " + std::string(&receiveBuffer[0], &receiveBuffer[bytesReceived]) +
"\n\n");
std::string message = std::string(receiveBuffer.cbegin(), receiveBuffer.cbegin() + bytesReceived);
LogInfo("Message received: " + message + "\n\n");
if (!sentStandRequest) {
nlohmann::json stand{
{"type", "assign_stand"},
{"version", 1},
{"id", "test_message_id"},
{"data",
{{"callsign", "BAW999"},
{"airfield", "EGLL"},
{"stand", "502"},
}},
};
std::string standMessage = stand.dump().c_str();
standMessage.append({'\x1F'});
int sendResult = send(clientSocket, standMessage.c_str(), standMessage.size(), 0);
if (sendResult == SOCKET_ERROR) {
LogError("Failed to send: " + std::to_string(WSAGetLastError()));
}
sentStandRequest = true;
}
}
} while (bytesReceived > 0);
});
thread->detach();
99 changes: 40 additions & 59 deletions src/plugin/integration/ClientInitialisationManager.cpp
@@ -1,21 +1,21 @@
#include "pch/pch.h"
#include "integration/ClientInitialisationManager.h"
#include "integration/InitialisationSuccessMessage.h"
#include "integration/InitialisationFailureMessage.h"
#include "integration/IntegrationClient.h"
#include "integration/MessageInterface.h"
#include "integration/IntegrationConnection.h"
#include "integration/IntegrationClientManager.h"
#include "ClientInitialisationManager.h"
#include "InitialisationFailureMessage.h"
#include "InitialisationSuccessMessage.h"
#include "IntegrationClient.h"
#include "IntegrationClientManager.h"
#include "IntegrationConnection.h"
#include "time/SystemClock.h"

namespace UKControllerPlugin::Integration {

ClientInitialisationManager::ClientInitialisationManager(std::shared_ptr<IntegrationClientManager> clientManager):
clientManager(std::move(clientManager)) {}
ClientInitialisationManager::ClientInitialisationManager(std::shared_ptr<IntegrationClientManager> clientManager)
: clientManager(std::move(clientManager))
{
}

void ClientInitialisationManager::AddConnection(std::shared_ptr<IntegrationConnection> connection)
{
if (this->connections.count(connection)) {
if (this->connections.count(connection) > 0) {
LogWarning("A duplicate integration client was added");
return;
}
Expand All @@ -25,12 +25,9 @@ namespace UKControllerPlugin::Integration {

void ClientInitialisationManager::TimedEventTrigger()
{
for (
auto connection = this->connections.begin();
connection != this->connections.end();
) {
for (auto connection = this->connections.begin(); connection != this->connections.end();) {
// If it's taken too long to initialise, then kill the connection
if (Time::TimeNow() > connection->second + std::chrono::seconds(10)) {
if (Time::TimeNow() > connection->second + INITIALISATION_TIMEOUT) {
LogInfo("Integration has not initialised in time");
this->connections.erase(connection++);
continue;
Expand All @@ -44,29 +41,29 @@ namespace UKControllerPlugin::Integration {
}

if (this->AttemptInitialisation(connection->first, incomingMessages)) {
if (incomingMessages.size() > 1) {
LogWarning("Dropped some messages received before initialisation completed");
}
this->connections.erase(connection++);
} else {
++connection;
}
}
}

size_t ClientInitialisationManager::CountConnections() const
auto ClientInitialisationManager::CountConnections() const -> size_t
{
return this->connections.size();
}

bool ClientInitialisationManager::AttemptInitialisation(
auto ClientInitialisationManager::AttemptInitialisation(
const std::shared_ptr<IntegrationConnection>& connection,
std::queue<std::shared_ptr<MessageInterface>> incomingMessages
)
std::queue<std::shared_ptr<MessageInterface>> incomingMessages) -> bool
{
while (!incomingMessages.empty()) {
auto validationErrors = ValidateMessage(incomingMessages.front());
if (!validationErrors.empty()) {
LogError(
"Invalid integration initialisation message: " + incomingMessages.front()->ToJson().dump()
);
LogError("Invalid integration initialisation message: " + incomingMessages.front()->ToJson().dump());
SendInitialisationFailureMessage(connection, incomingMessages.front(), validationErrors);
incomingMessages.pop();
continue;
Expand All @@ -80,24 +77,19 @@ namespace UKControllerPlugin::Integration {
return false;
}

std::vector<std::string> ClientInitialisationManager::ValidateMessage(
const std::shared_ptr<MessageInterface>& message
)
auto ClientInitialisationManager::ValidateMessage(const std::shared_ptr<MessageInterface>& message)
-> std::vector<std::string>
{
auto errors = ValidateMessageType(message);
auto dataErrors = ValidateMessageData(message);
errors.insert(
errors.end(),
std::make_move_iterator(dataErrors.cbegin()),
std::make_move_iterator(dataErrors.cend())
);
errors.end(), std::make_move_iterator(dataErrors.cbegin()), std::make_move_iterator(dataErrors.cend()));

return errors;
}

std::vector<std::string> ClientInitialisationManager::ValidateMessageType(
const std::shared_ptr<MessageInterface>& message
)
auto ClientInitialisationManager::ValidateMessageType(const std::shared_ptr<MessageInterface>& message)
-> std::vector<std::string>
{
std::vector<std::string> errors;
auto messageType = message->GetMessageType();
Expand All @@ -112,22 +104,20 @@ namespace UKControllerPlugin::Integration {
return errors;
}

std::vector<std::string> ClientInitialisationManager::ValidateMessageData(
const std::shared_ptr<MessageInterface>& message
)
auto ClientInitialisationManager::ValidateMessageData(const std::shared_ptr<MessageInterface>& message)
-> std::vector<std::string>
{
auto messageData = message->GetMessageData();
std::vector<std::string> errors = ValidateIntegrationDetails(messageData);
auto eventSubscriptionErrors = ValidateEventSubscriptions(messageData);
errors.insert(
errors.end(),
std::make_move_iterator(eventSubscriptionErrors.cbegin()),
std::make_move_iterator(eventSubscriptionErrors.cend())
);
std::make_move_iterator(eventSubscriptionErrors.cend()));
return errors;
}

std::vector<std::string> ClientInitialisationManager::ValidateIntegrationDetails(const nlohmann::json& data)
auto ClientInitialisationManager::ValidateIntegrationDetails(const nlohmann::json& data) -> std::vector<std::string>
{
std::vector<std::string> errors;
if (!data.contains("integration_name") || !data.at("integration_name").is_string()) {
Expand All @@ -141,7 +131,7 @@ namespace UKControllerPlugin::Integration {
return errors;
}

std::vector<std::string> ClientInitialisationManager::ValidateEventSubscriptions(const nlohmann::json& data)
auto ClientInitialisationManager::ValidateEventSubscriptions(const nlohmann::json& data) -> std::vector<std::string>
{
std::vector<std::string> errors;
if (!data.contains("event_subscriptions") || !data.at("event_subscriptions").is_array()) {
Expand Down Expand Up @@ -169,42 +159,33 @@ namespace UKControllerPlugin::Integration {

void ClientInitialisationManager::UpgradeToClient(
const std::shared_ptr<IntegrationConnection>& connection,
const std::shared_ptr<MessageInterface>& initialisationMessage
)
const std::shared_ptr<MessageInterface>& initialisationMessage)
{
auto data = initialisationMessage->GetMessageData();
auto client = std::make_shared<IntegrationClient>(
this->nextIntegrationId++,
data.at("integration_name").get<std::string>(),
data.at("integration_version").get<std::string>(),
connection
);
connection);
for (const auto& interestedEvent : data.at("event_subscriptions")) {
client->AddInterestedMessage(
std::make_shared<MessageType>(MessageType{
interestedEvent.at("type").get<std::string>(),
interestedEvent.at("version").get<int>()
})
);
client->AddInterestedMessage(std::make_shared<MessageType>(
MessageType{interestedEvent.at("type").get<std::string>(), interestedEvent.at("version").get<int>()}));
}
this->clientManager->AddClient(client);
}

void ClientInitialisationManager::SendInitialisationSuccessMessage(
const std::shared_ptr<IntegrationConnection>& connection,
const std::shared_ptr<MessageInterface>& initialisationMessage
) {
const std::shared_ptr<MessageInterface>& initialisationMessage)
{
connection->Send(std::make_shared<InitialisationSuccessMessage>(initialisationMessage->GetMessageId()));
}

void ClientInitialisationManager::SendInitialisationFailureMessage(
const std::shared_ptr<IntegrationConnection>& connection,
const std::shared_ptr<MessageInterface>& initialisationMessage,
const std::vector<std::string>& errors
) {
connection->Send(std::make_shared<InitialisationFailureMessage>(
initialisationMessage->GetMessageId(),
errors
));
const std::vector<std::string>& errors)
{
connection->Send(std::make_shared<InitialisationFailureMessage>(initialisationMessage->GetMessageId(), errors));
}
} // namespace UKControllerPlugin::Integration
92 changes: 43 additions & 49 deletions src/plugin/integration/ClientInitialisationManager.h
Expand Up @@ -14,67 +14,61 @@ namespace UKControllerPlugin::Integration {
class ClientInitialisationManager : public TimedEvent::AbstractTimedEvent
{
public:
explicit ClientInitialisationManager(std::shared_ptr<IntegrationClientManager> clientManager);
~ClientInitialisationManager() override = default;
void AddConnection(std::shared_ptr<IntegrationConnection> connection);
void TimedEventTrigger() override;
size_t CountConnections() const;
explicit ClientInitialisationManager(std::shared_ptr<IntegrationClientManager> clientManager);
void AddConnection(std::shared_ptr<IntegrationConnection> connection);
void TimedEventTrigger() override;
[[nodiscard]] auto CountConnections() const -> size_t;

const inline static std::string VALIDATION_ERROR_INVALID_TYPE =
"Initialisation messages must have type \"initialise\"";
const inline static std::string VALIDATION_ERROR_INVALID_TYPE =
"Initialisation messages must have type \"initialise\"";

const inline static std::string VALIDATION_ERROR_INVALID_VERSION =
"Initialisation messages must have version 1";
const inline static std::string VALIDATION_ERROR_INVALID_VERSION =
"Initialisation messages must have version 1";

const inline static std::string VALIDATION_ERROR_INVALID_INTEGRATION_NAME =
"Invalid integration name";
const inline static std::string VALIDATION_ERROR_INVALID_INTEGRATION_NAME = "Invalid integration name";

const inline static std::string VALIDATION_ERROR_INVALID_INTEGRATION_VERSION =
"Invalid integration version";
const inline static std::string VALIDATION_ERROR_INVALID_INTEGRATION_VERSION = "Invalid integration version";

const inline static std::string VALIDATION_ERROR_INVALID_SUBSCRIPTIONS =
"Expected array of integration subscription";
const inline static std::string VALIDATION_ERROR_INVALID_SUBSCRIPTIONS =
"Expected array of integration subscription";

const inline static std::string VALIDATION_ERROR_INVALID_SUBSCRIPTION =
"Invalid subscription - must be an object";
const inline static std::string VALIDATION_ERROR_INVALID_SUBSCRIPTION =
"Invalid subscription - must be an object";

const inline static std::string VALIDATION_ERROR_INVALID_SUBSCRIPTION_TYPE =
"Invalid subscription type";
const inline static std::string VALIDATION_ERROR_INVALID_SUBSCRIPTION_TYPE = "Invalid subscription type";

const inline static std::string VALIDATION_ERROR_INVALID_SUBSCRIPTION_VERSION =
"Invalid subscription version";
const inline static std::string VALIDATION_ERROR_INVALID_SUBSCRIPTION_VERSION = "Invalid subscription version";

private:
bool AttemptInitialisation(
const std::shared_ptr<IntegrationConnection>& connection,
std::queue<std::shared_ptr<MessageInterface>> incomingMessages
);
static std::vector<std::string> ValidateMessage(const std::shared_ptr<MessageInterface>& message);
static std::vector<std::string> ValidateMessageType(const std::shared_ptr<MessageInterface>& message);
static std::vector<std::string> ValidateMessageData(const std::shared_ptr<MessageInterface>& message);
static std::vector<std::string> ValidateIntegrationDetails(const nlohmann::json& data);
static std::vector<std::string> ValidateEventSubscriptions(const nlohmann::json& data);
void UpgradeToClient(
const std::shared_ptr<IntegrationConnection>& connection,
const std::shared_ptr<MessageInterface>& initialisationMessage
);
static void SendInitialisationSuccessMessage(
const std::shared_ptr<IntegrationConnection>& connection,
const std::shared_ptr<MessageInterface>& initialisationMessage
);
static void SendInitialisationFailureMessage(
const std::shared_ptr<IntegrationConnection>& connection,
const std::shared_ptr<MessageInterface>& initialisationMessage,
const std::vector<std::string>& errors
);
auto AttemptInitialisation(
const std::shared_ptr<IntegrationConnection>& connection,
std::queue<std::shared_ptr<MessageInterface>> incomingMessages) -> bool;
static auto ValidateMessage(const std::shared_ptr<MessageInterface>& message) -> std::vector<std::string>;
static auto ValidateMessageType(const std::shared_ptr<MessageInterface>& message) -> std::vector<std::string>;
static auto ValidateMessageData(const std::shared_ptr<MessageInterface>& message) -> std::vector<std::string>;
static auto ValidateIntegrationDetails(const nlohmann::json& data) -> std::vector<std::string>;
static auto ValidateEventSubscriptions(const nlohmann::json& data) -> std::vector<std::string>;
void UpgradeToClient(
const std::shared_ptr<IntegrationConnection>& connection,
const std::shared_ptr<MessageInterface>& initialisationMessage);
static void SendInitialisationSuccessMessage(
const std::shared_ptr<IntegrationConnection>& connection,
const std::shared_ptr<MessageInterface>& initialisationMessage);
static void SendInitialisationFailureMessage(
const std::shared_ptr<IntegrationConnection>& connection,
const std::shared_ptr<MessageInterface>& initialisationMessage,
const std::vector<std::string>& errors);

// Manages clients once fully initialised
std::shared_ptr<IntegrationClientManager> clientManager;
// Manages clients once fully initialised
std::shared_ptr<IntegrationClientManager> clientManager;

// Clients that are fully initialised
std::map<std::shared_ptr<IntegrationConnection>, std::chrono::system_clock::time_point> connections;
// Clients that are fully initialised
std::map<std::shared_ptr<IntegrationConnection>, std::chrono::system_clock::time_point> connections;

// The next unique id for the integration
int nextIntegrationId = 1;
// The next unique id for the integration
int nextIntegrationId = 1;

// How long an integration has to initialise before we drop the connection
const std::chrono::seconds INITIALISATION_TIMEOUT = std::chrono::seconds(10);
};
} // namespace UKControllerPlugin::Integration

0 comments on commit 7d85c9b

Please sign in to comment.