Skip to content

Commit

Permalink
AP_DDS: Added ROS 2 service support
Browse files Browse the repository at this point in the history
  • Loading branch information
arshPratap committed Aug 9, 2023
1 parent b0cede3 commit 073ec4e
Show file tree
Hide file tree
Showing 6 changed files with 250 additions and 25 deletions.
156 changes: 134 additions & 22 deletions libraries/AP_DDS/AP_DDS_Client.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,11 @@
#include <GCS_MAVLink/GCS.h>
#include <AP_BattMonitor/AP_BattMonitor.h>
#include <AP_AHRS/AP_AHRS.h>
#include <AP_Arming/AP_Arming.h>

#include "AP_DDS_Client.h"
#include "AP_DDS_Topic_Table.h"
#include "AP_DDS_Service_Table.h"

static constexpr uint16_t DELAY_TIME_TOPIC_MS = 10;
static constexpr uint16_t DELAY_BATTERY_STATE_TOPIC_MS = 1000;
Expand All @@ -19,6 +22,7 @@ static constexpr uint16_t DELAY_LOCAL_VELOCITY_TOPIC_MS = 33;
static constexpr uint16_t DELAY_GEO_POSE_TOPIC_MS = 33;
static constexpr uint16_t DELAY_CLOCK_TOPIC_MS = 10;
static char WGS_84_FRAME_ID[] = "WGS-84";

// https://www.ros.org/reps/rep-0105.html#base-link
static char BASE_LINK_FRAME_ID[] = "base_link";

Expand All @@ -29,6 +33,8 @@ sensor_msgs_msg_Joy AP_DDS_Client::rx_joy_topic {};
tf2_msgs_msg_TFMessage AP_DDS_Client::rx_dynamic_transforms_topic {};
geometry_msgs_msg_TwistStamped AP_DDS_Client::rx_velocity_control_topic {};

#define debug(fmt, args ...) do { if (debug_level) { GCS_SEND_TEXT(MAV_SEVERITY_INFO, "DDS Client: " fmt, ## args); } } while (0)

const AP_Param::GroupInfo AP_DDS_Client::var_info[] {

// @Param: _ENABLE
Expand All @@ -47,13 +53,19 @@ const AP_Param::GroupInfo AP_DDS_Client::var_info[] {
// @RebootRequired: True
// @User: Standard
AP_GROUPINFO("_PORT", 2, AP_DDS_Client, udp.port, 2019),

// @Param: _DEBUG
// @DisplayName: DDS Debug level
// @Description: When set to 1 enables debug output. This can be used to diagnose failures.
// @Values: 0:Disabled,1:Enabled
// @RebootRequired: True
// @User: Standard
AP_GROUPINFO("_DEBUG", 3, AP_DDS_Client, debug_level, 0),
#endif

AP_GROUPEND
};

#include "AP_DDS_Topic_Table.h"

void AP_DDS_Client::update_topic(builtin_interfaces_msg_Time& msg)
{
uint64_t utc_usec;
Expand Down Expand Up @@ -409,15 +421,24 @@ bool AP_DDS_Client::start(void)
}

// read function triggered at every subscription callback
void AP_DDS_Client::on_topic (uxrSession* uxr_session, uxrObjectId object_id, uint16_t request_id, uxrStreamId stream_id, struct ucdrBuffer* ub, uint16_t length, void* args)
void AP_DDS_Client::on_topic_trampoline(uxrSession* uxr_session, uxrObjectId object_id, uint16_t request_id, uxrStreamId stream_id, struct ucdrBuffer* ub, uint16_t length,
void* args)
{
AP_DDS_Client *dds = (AP_DDS_Client *)args;
dds->on_topic(uxr_session, object_id, request_id, stream_id, ub, length);
}

void AP_DDS_Client::on_topic(uxrSession* uxr_session, uxrObjectId object_id, uint16_t request_id, uxrStreamId stream_id, struct ucdrBuffer* ub, uint16_t length)
{
(void) uxr_session; (void) object_id; (void) request_id; (void) stream_id; (void) length;
/*
TEMPLATE for reading to the subscribed topics
1) Store the read contents into the ucdr buffer
2) Deserialize the said contents into the topic instance
*/

(void) uxr_session;
(void) request_id;
(void) stream_id;
(void) length;
switch (object_id.id) {
case topics[to_underlying(TopicIndex::JOY_SUB)].dr_id.id: {
const bool success = sensor_msgs_msg_Joy_deserialize_topic(ub, &rx_joy_topic);
Expand All @@ -426,14 +447,13 @@ void AP_DDS_Client::on_topic (uxrSession* uxr_session, uxrObjectId object_id, ui
break;
}

uint32_t* count_ptr = (uint32_t*) args;
(*count_ptr)++;
subscribe_sample_count++;
if (rx_joy_topic.axes_size >= 4) {
GCS_SEND_TEXT(MAV_SEVERITY_INFO,"Received sensor_msgs/Joy: %f, %f, %f, %f",
debug("Received sensor_msgs/Joy: %f, %f, %f, %f",
rx_joy_topic.axes[0], rx_joy_topic.axes[1], rx_joy_topic.axes[2], rx_joy_topic.axes[3]);
// TODO implement joystick RC control to AP
} else {
GCS_SEND_TEXT(MAV_SEVERITY_INFO,"Received sensor_msgs/Joy: Insufficient axes size ");
debug("Received sensor_msgs/Joy: Insufficient axes size ");
}
break;
}
Expand All @@ -442,14 +462,12 @@ void AP_DDS_Client::on_topic (uxrSession* uxr_session, uxrObjectId object_id, ui
if (success == false) {
break;
}
uint32_t* count_ptr = (uint32_t*) args;
(*count_ptr)++;
if (rx_dynamic_transforms_topic.transforms_size > 0) {
GCS_SEND_TEXT(MAV_SEVERITY_INFO,"Received tf2_msgs/TFMessage of length: %u",
debug("Received tf2_msgs/TFMessage of length: %u",
static_cast<unsigned>(rx_dynamic_transforms_topic.transforms_size));
// TODO implement external odometry to AP
} else {
GCS_SEND_TEXT(MAV_SEVERITY_INFO,"Received tf2_msgs/TFMessage: Insufficient size ");
debug("Received tf2_msgs/TFMessage: Insufficient size ");
}
break;
}
Expand All @@ -468,6 +486,64 @@ void AP_DDS_Client::on_topic (uxrSession* uxr_session, uxrObjectId object_id, ui

}

/*
callback on request completion
*/
void AP_DDS_Client::on_request_trampoline(uxrSession* uxr_session, uxrObjectId object_id, uint16_t request_id, SampleIdentity* sample_id, ucdrBuffer* ub, uint16_t length, void* args)
{
AP_DDS_Client *dds = (AP_DDS_Client *)args;
dds->on_request(uxr_session, object_id, request_id, sample_id, ub, length);
}

void AP_DDS_Client::on_request(uxrSession* uxr_session, uxrObjectId object_id, uint16_t request_id, SampleIdentity* sample_id, ucdrBuffer* ub, uint16_t length)
{
(void) request_id;
(void) length;
switch (object_id.id) {
case services[to_underlying(ServiceIndex::ARMING_MOTORS)].rep_id: {
bool arm;
bool result;
const bool deserialize_success = ucdr_deserialize_bool(ub,&arm);
if (deserialize_success == false) {
break;
}

if (arm) {
debug("Request for arming received");
result = AP::arming().arm(AP_Arming::Method::DDS);
} else {
debug("Request for disarming received");
result = AP::arming().disarm(AP_Arming::Method::DDS);
}

const uxrObjectId replier_id = {
.id = services[to_underlying(ServiceIndex::ARMING_MOTORS)].rep_id,
.type = UXR_REPLIER_ID
};

//Todo : Fix the size-handling of services with the help of the functions autogenerated via Micro-XRCE-DDS Gen
uint8_t reply_buffer[8] {};
ucdrBuffer reply_ub;

ucdr_init_buffer(&reply_ub, reply_buffer, sizeof(reply_buffer));
const bool serialize_success = ucdr_serialize_bool(&reply_ub,result);
if (serialize_success == false) {
break;
}

request_sample_count++;

uxr_buffer_reply(uxr_session, reliable_out, replier_id, sample_id, reply_buffer, ucdr_buffer_length(&reply_ub));
if (result) {
debug("DDS Client: Request for Arming/Disarming : SUCCESS");
} else {
debug("DDS Client: Request for Arming/Disarmign : FAIL");
}
break;
}
}
}

/*
main loop for DDS thread
*/
Expand Down Expand Up @@ -507,7 +583,10 @@ bool AP_DDS_Client::init()
}

// Register topic callbacks
uxr_set_topic_callback(&session, AP_DDS_Client::on_topic, &count);
uxr_set_topic_callback(&session, AP_DDS_Client::on_topic_trampoline, this);

// ROS-2 Service : Register service request callbacks
uxr_set_request_callback(&session, AP_DDS_Client::on_request_trampoline, this);

while (!uxr_create_session(&session)) {
GCS_SEND_TEXT(MAV_SEVERITY_INFO,"DDS Client: Initialization waiting...");
Expand Down Expand Up @@ -559,7 +638,7 @@ bool AP_DDS_Client::create()
return false;
}

for (size_t i = 0 ; i < ARRAY_SIZE(topics); i++) {
for (uint16_t i = 0 ; i < ARRAY_SIZE(topics); i++) {
// Topic
const uxrObjectId topic_id = {
.id = topics[i].topic_id,
Expand All @@ -571,7 +650,7 @@ bool AP_DDS_Client::create()
// Status requests
constexpr uint8_t nRequests = 3;
uint16_t requests[nRequests];
constexpr uint16_t requestTimeoutMs = (uint8_t) nRequests * maxTimeMsPerRequestMs;
constexpr uint16_t requestTimeoutMs = nRequests * maxTimeMsPerRequestMs;
uint8_t status[nRequests];

if (strlen(topics[i].dw_profile_label) > 0) {
Expand All @@ -593,14 +672,14 @@ bool AP_DDS_Client::create()
requests[2] = dwriter_req_id;

if (!uxr_run_session_until_all_status(&session, requestTimeoutMs, requests, status, nRequests)) {
GCS_SEND_TEXT(MAV_SEVERITY_ERROR,"XRCE Client: Topic/Pub/Writer session request failure for index 'TODO'");
debug("XRCE Client: Topic/Pub/Writer session request failure for index '%u'",i);
for (uint8_t s = 0 ; s < nRequests; s++) {
GCS_SEND_TEXT(MAV_SEVERITY_ERROR,"XRCE Client: Status '%d' result '%u'", s, status[s]);
debug("XRCE Client: Status '%d' result '%u'", s, status[s]);
}
// TODO add a failure log message sharing the status results
return false;
} else {
GCS_SEND_TEXT(MAV_SEVERITY_INFO,"XRCE Client: Topic/Pub/Writer session pass for index 'TOOO'");
debug("XRCE Client: Topic/Pub/Writer session pass for index '%u'",i);
}
} else if (strlen(topics[i].dr_profile_label) > 0) {
// Subscriber
Expand All @@ -621,18 +700,51 @@ bool AP_DDS_Client::create()
requests[2] = dreader_req_id;

if (!uxr_run_session_until_all_status(&session, requestTimeoutMs, requests, status, nRequests)) {
GCS_SEND_TEXT(MAV_SEVERITY_ERROR,"XRCE Client: Topic/Sub/Reader session request failure for index '%d'",(int)i);
debug("XRCE Client: Topic/Sub/Reader session request failure for index '%u'",i);
for (uint8_t s = 0 ; s < nRequests; s++) {
GCS_SEND_TEXT(MAV_SEVERITY_ERROR,"XRCE Client: Status '%d' result '%u'", s, status[s]);
debug("XRCE Client: Status '%d' result '%u'", s, status[s]);
}
// TODO add a failure log message sharing the status results
return false;
} else {
GCS_SEND_TEXT(MAV_SEVERITY_INFO,"XRCE Client: Topic/Sub/Reader session pass for index '%d'",(int)i);
debug("XRCE Client: Topic/Sub/Reader session pass for index '%u'",i);
uxr_buffer_request_data(&session, reliable_out, topics[i].dr_id, reliable_in, &delivery_control);
}
}
}

// ROS-2 Service : else case for service requests

for (uint16_t i = 0; i < ARRAY_SIZE(services); i++) {

constexpr uint16_t requestTimeoutMs = maxTimeMsPerRequestMs;

if (strlen(services[i].rep_profile_label) > 0) {
const uxrObjectId rep_id = {
.id = services[i].rep_id,
.type = UXR_REPLIER_ID
};
const char* replier_ref = services[i].rep_profile_label;
const auto replier_req_id = uxr_buffer_create_replier_ref(&session, reliable_out, rep_id, participant_id, replier_ref, UXR_REPLACE);

uint16_t request = replier_req_id;
uint8_t status;

if (!uxr_run_session_until_all_status(&session, requestTimeoutMs, &request, &status, 1)) {
debug("XRCE Client: Service/Replier session request failure for index '%u'",i);
debug("XRCE Client: Status result '%u'", status);
// TODO add a failure log message sharing the status results
return false;
} else {
debug("XRCE Client: Service/Replier session pass for index '%u'",i);
uxr_buffer_request_data(&session, reliable_out, rep_id, reliable_in, &delivery_control);
}

} else if (strlen(services[i].req_profile_label) > 0) {
// TODO : Add Similar Code for Requester Profile
}
}

return true;
}

Expand Down
30 changes: 27 additions & 3 deletions libraries/AP_DDS/AP_DDS_Client.h
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ class AP_DDS_Client
private:

AP_Int8 enabled;
AP_Int8 debug_level;

// Serial Allocation
uxrSession session; //Session
Expand Down Expand Up @@ -87,10 +88,16 @@ class AP_DDS_Client
static void update_topic(rosgraph_msgs_msg_Clock& msg);

// subscription callback function
static void on_topic(uxrSession* session, uxrObjectId object_id, uint16_t request_id, uxrStreamId stream_id, struct ucdrBuffer* ub, uint16_t length, void* args);

static void on_topic_trampoline(uxrSession* session, uxrObjectId object_id, uint16_t request_id, uxrStreamId stream_id, struct ucdrBuffer* ub, uint16_t length, void* args);
void on_topic(uxrSession* session, uxrObjectId object_id, uint16_t request_id, uxrStreamId stream_id, struct ucdrBuffer* ub, uint16_t length);
// count of subscribed samples
uint32_t count;
uint32_t subscribe_sample_count;

// service replier callback function
static void on_request_trampoline(uxrSession* session, uxrObjectId object_id, uint16_t request_id, SampleIdentity* sample_id, ucdrBuffer* ub, uint16_t length, void* args);
void on_request(uxrSession* session, uxrObjectId object_id, uint16_t request_id, SampleIdentity* sample_id, ucdrBuffer* ub, uint16_t length);
// count of request samples
uint32_t request_sample_count;

// delivery control parameters
uxrDeliveryControl delivery_control {
Expand Down Expand Up @@ -195,7 +202,24 @@ class AP_DDS_Client
};
static const struct Topic_table topics[];

//! @brief Convenience grouping for a single "channel" of services
struct Service_table {
//! @brief Request ID for the service
const uint8_t req_id;

//! @brief Reply ID for the service
const uint8_t rep_id;

//! @brief Profile Label for the service
const char* srv_profile_label;

//! @brief Profile Label for the service requester
const char* req_profile_label;

//! @brief Profile Label for the service replier
const char* rep_profile_label;
};
static const struct Service_table services[];
};

#endif // AP_DDS_ENABLED
Expand Down
21 changes: 21 additions & 0 deletions libraries/AP_DDS/AP_DDS_Service_Table.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
#include "uxr/client/client.h"

enum class ServiceIndex: uint8_t {
ARMING_MOTORS
};

static inline constexpr uint8_t to_underlying(const ServiceIndex index)
{
static_assert(sizeof(index) == sizeof(uint8_t));
return static_cast<uint8_t>(index);
}

constexpr struct AP_DDS_Client::Service_table AP_DDS_Client::services[] = {
{
.req_id = to_underlying(ServiceIndex::ARMING_MOTORS),
.rep_id = to_underlying(ServiceIndex::ARMING_MOTORS),
.srv_profile_label = "ArmMotorsService",
.req_profile_label = "",
.rep_profile_label = "ArmMotors_Replier",
}
};
36 changes: 36 additions & 0 deletions libraries/AP_DDS/Is-Config/Arm_Motors_DDS_IS_config.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
types:
idls:
- >
struct ArmMotors_Request
{
boolean arm;
};
struct ArmMotors_Response
{
boolean result;
};
systems:
dds: { type: fastdds }
ros2: { type: ros2 }

routes:
dds_server:
server: dds
clients: ros2

services:
arm_motors: {
request_type: ArmMotors_Request,
reply_type: ArmMotors_Response,
route: dds_server,
remap: {
dds: {
topic: ArmMotorsService,
},
ros2: {
request_type: "ardupilot_msgs/ArmMotors:request",
reply_type: "ardupilot_msgs/ArmMotors:response"
}
}
}

0 comments on commit 073ec4e

Please sign in to comment.