Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions source_common/comms/comms_interface.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ class CommsInterface
*
* @return Returns @c true if connected, @c false otherwise.
*/
virtual bool is_connected() = 0;
virtual bool isConnected() = 0;

/**
* @brief Get the service endpoint address for the named service.
Expand All @@ -79,7 +79,7 @@ class CommsInterface
*
* @return The service address, or @c NO_ENDPOINT if service is not found.
*/
virtual EndpointID get_endpoint_id(
virtual EndpointID getEndpointID(
const std::string& name) = 0;

/**
Expand All @@ -91,7 +91,7 @@ class CommsInterface
* @param endpoint The address of the destination service.
* @param data The data to transmit.
*/
virtual void tx_async(
virtual void txAsync(
EndpointID endpoint,
std::unique_ptr<MessageData> data) = 0;

Expand Down Expand Up @@ -122,7 +122,7 @@ class CommsInterface
*
* @return The response message data payload.
*/
virtual std::unique_ptr<MessageData> tx_rx(
virtual std::unique_ptr<MessageData> txRx(
EndpointID endpoint,
std::unique_ptr<MessageData> data) = 0;
};
Expand Down
46 changes: 23 additions & 23 deletions source_common/comms/comms_message.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -59,10 +59,10 @@ enum class MessageType: uint8_t {
*/
typedef struct __attribute__((packed))
{
uint8_t message_type; // Is this tx_async (0), tx (1), or tx_rx (2)?
uint8_t endpoint_id; // The endpoint service address.
uint64_t message_id; // The unique message ID for a tx_rx pair.
uint32_t payload_size; // The size of the payload in bytes.
uint8_t messageType; // Is this tx_async (0), tx (1), or tx_rx (2)?
uint8_t endpointID; // The endpoint service address.
uint64_t messageID; // The unique message ID for a tx_rx pair.
uint32_t payloadSize; // The size of the payload in bytes.
} MessageHeader;

/**
Expand All @@ -74,53 +74,53 @@ class Message: public Task
/**
* @brief Construct a new message.
*
* @param endpoint_id The destination endpoint.
* @param message_type The type of the message.
* @param message_id The sequence ID of the message.
* @param transmit_data The data to transmit.
* @param endpointID The destination endpoint.
* @param messageType The type of the message.
* @param messageID The sequence ID of the message.
* @param transmitData The data to transmit.
*/
Message(
EndpointID endpoint_id,
MessageType message_type,
MessageID message_id,
std::unique_ptr<MessageData> transmit_data) :
endpoint_id(endpoint_id),
message_type(message_type),
message_id(message_id),
transmit_data(std::move(transmit_data)) { }
EndpointID endpointID,
MessageType messageType,
MessageID messageID,
std::unique_ptr<MessageData> transmitData) :
endpointID(endpointID),
messageType(messageType),
messageID(messageID),
transmitData(std::move(transmitData)) { }

/**
* @brief The type of the message.
*/
EndpointID endpoint_id;
EndpointID endpointID;

/**
* @brief The type of the message.
*/
MessageType message_type;
MessageType messageType;

/**
* @brief The sequence ID of the message.
*
* Only required if @c message_type is @c TX_RX and we have to match a
* Only required if @c messageType is @c TX_RX and we have to match a
* response to a triggering message.
*/
MessageID message_id;
MessageID messageID;

/**
* @brief The data to transmit.
*
* Can be reset and data discarded once the data is transmitted.
*/
std::unique_ptr<MessageData> transmit_data;
std::unique_ptr<MessageData> transmitData;

/**
* @brief The data that was received.
*
* Only present if @c message_type is @c TX_RX and we have received a
* Only present if @c messageType is @c TX_RX and we have received a
* response from the host.
*/
std::unique_ptr<MessageData> response_data;
std::unique_ptr<MessageData> responseData;
};

}
62 changes: 31 additions & 31 deletions source_common/comms/comms_module.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ namespace Comms

/** See header for documentation. */
CommsModule::CommsModule(
const std::string& domain_address
const std::string& domainAddress
) {
sockfd = socket(AF_UNIX, SOCK_STREAM, 0);
if (sockfd < 0)
Expand All @@ -52,17 +52,17 @@ CommsModule::CommsModule(
return;
}

struct sockaddr_un serv_addr {};
serv_addr.sun_family = AF_UNIX;
struct sockaddr_un servAddr {};
servAddr.sun_family = AF_UNIX;

// Copy the domain address, inserting leading NUL needed for abstract UDS
std::strcpy(serv_addr.sun_path + 1, domain_address.c_str());
serv_addr.sun_path[0] = '\0';
std::strcpy(servAddr.sun_path + 1, domainAddress.c_str());
servAddr.sun_path[0] = '\0';

int conn = connect(
sockfd,
reinterpret_cast<const struct sockaddr*>(&serv_addr),
sizeof(serv_addr));
reinterpret_cast<const struct sockaddr*>(&servAddr),
sizeof(servAddr));
if (conn != 0)
{
std::cout << " - ERROR: Client connection failed" << std::endl;
Expand All @@ -77,7 +77,7 @@ CommsModule::CommsModule(

/** See header for documentation. */
CommsModule::CommsModule(
const std::string& host_address,
const std::string& hostAddress,
int port
) {
sockfd = socket(AF_INET, SOCK_STREAM, 0);
Expand All @@ -87,15 +87,15 @@ CommsModule::CommsModule(
return;
}

struct sockaddr_in serv_addr {};
serv_addr.sin_family = AF_INET;
serv_addr.sin_port = htons(port);
serv_addr.sin_addr.s_addr = inet_addr(host_address.c_str());
struct sockaddr_in servAddr {};
servAddr.sin_family = AF_INET;
servAddr.sin_port = htons(port);
servAddr.sin_addr.s_addr = inet_addr(hostAddress.c_str());

int conn = connect(
sockfd,
reinterpret_cast<const struct sockaddr*>(&serv_addr),
sizeof(serv_addr));
reinterpret_cast<const struct sockaddr*>(&servAddr),
sizeof(servAddr));
if (conn != 0)
{
std::cout << " - ERROR: Client connection failed" << std::endl;
Expand Down Expand Up @@ -132,21 +132,21 @@ CommsModule::~CommsModule()
}

/** See header for documentation. */
bool CommsModule::is_connected()
bool CommsModule::isConnected()
{
return sockfd >= 0;
}

/** See header for documentation. */
EndpointID CommsModule::get_endpoint_id(
EndpointID CommsModule::getEndpointID(
const std::string& name
) {
std::lock_guard<std::mutex> lock(registry_lock);
std::lock_guard<std::mutex> lock(registryLock);
if (registry.empty())
{
// Request the registry from the host
auto data = std::make_unique<std::vector<uint8_t>>();
auto resp = tx_rx(0, std::move(data));
auto resp = txRx(0, std::move(data));

// Process the response
while (resp->size())
Expand Down Expand Up @@ -192,7 +192,7 @@ EndpointID CommsModule::get_endpoint_id(
}

/** See header for documentation. */
void CommsModule::tx_async(
void CommsModule::txAsync(
EndpointID endpoint,
std::unique_ptr<MessageData> data
) {
Expand All @@ -202,7 +202,7 @@ void CommsModule::tx_async(
0,
std::move(data));

enqueue_message(std::move(message));
enqueueMessage(std::move(message));
}

/** See header for documentation. */
Expand All @@ -216,44 +216,44 @@ void CommsModule::tx(
0,
std::move(data));

enqueue_message(message);
enqueueMessage(message);
message->wait();
}

/** See header for documentation. */
std::unique_ptr<MessageData> CommsModule::tx_rx(
std::unique_ptr<MessageData> CommsModule::txRx(
EndpointID endpoint,
std::unique_ptr<MessageData> data
) {
auto message = std::make_shared<Message>(
endpoint,
MessageType::TX_RX,
assign_message_id(),
assignMessageID(),
std::move(data));

enqueue_message(message);
enqueueMessage(message);
message->wait();

return std::move(message->response_data);
return std::move(message->responseData);
}

/** See header for documentation. */
MessageID CommsModule::assign_message_id()
MessageID CommsModule::assignMessageID()
{
return next_message_id.fetch_add(1, std::memory_order_relaxed);
return nextMessageID.fetch_add(1, std::memory_order_relaxed);
}

/** See header for documentation. */
void CommsModule::enqueue_message(
void CommsModule::enqueueMessage(
std::shared_ptr<Message> message
) {
message_queue.put(std::move(message));
messageQueue.put(std::move(message));
}

/** See header for documentation. */
std::shared_ptr<Message> CommsModule::dequeue_message()
std::shared_ptr<Message> CommsModule::dequeueMessage()
{
return message_queue.get();
return messageQueue.get();
}

}
Loading