Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

DDS: Added subscriber support #23629

Merged
merged 1 commit into from Jun 1, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
116 changes: 91 additions & 25 deletions libraries/AP_DDS/AP_DDS_Client.cpp
Expand Up @@ -401,6 +401,32 @@ bool AP_DDS_Client::start(void)
return true;
}

// read function triggered at every subscription callback
void AP_DDS_Client::on_topic (uxrSession* uxr_session, uxrObjectId object_id, uint16_t request_id, uxrStreamId stream_id, struct ucdrBuffer* ub, uint16_t length, void* args)
{
(void) uxr_session; (void) object_id; (void) request_id; (void) stream_id; (void) length;
/*
TEMPLATE for reading to the subscribed topics
1) Store the read contents into the ucdr buffer
2) Deserialize the said contents into the topic instance
*/
sensor_msgs_msg_Joy* topic = nullptr;
const bool success = sensor_msgs_msg_Joy_deserialize_topic(ub, topic);

if (success == false || topic == nullptr) {
return;
}

uint32_t* count_ptr = (uint32_t*) args;
(*count_ptr)++;
if (topic->axes_size >= 4) {
GCS_SEND_TEXT(MAV_SEVERITY_INFO,"Receivied sensor_msgs/Joy: %f, %f, %f, %f",
topic->axes[0], topic->axes[1], topic->axes[2], topic->axes[3]);
} else {
GCS_SEND_TEXT(MAV_SEVERITY_INFO,"Receivied sensor_msgs/Joy: Insufficient axes size ");
}
}

/*
main loop for DDS thread
*/
Expand Down Expand Up @@ -438,6 +464,10 @@ bool AP_DDS_Client::init()
GCS_SEND_TEXT(MAV_SEVERITY_INFO,"DDS Client: Transport Initialization failed");
return false;
}

// Register topic callbacks
uxr_set_topic_callback(&session, AP_DDS_Client::on_topic, &count);

while (!uxr_create_session(&session)) {
GCS_SEND_TEXT(MAV_SEVERITY_INFO,"DDS Client: Initialization waiting...");
hal.scheduler->delay(1000);
Expand Down Expand Up @@ -479,8 +509,8 @@ bool AP_DDS_Client::create()
constexpr uint8_t nRequestsParticipant = 1;
const uint16_t requestsParticipant[nRequestsParticipant] = {participant_req_id};

constexpr int maxTimeMsPerRequestMs = 250;
constexpr int requestTimeoutParticipantMs = nRequestsParticipant * maxTimeMsPerRequestMs;
constexpr uint8_t maxTimeMsPerRequestMs = 250;
constexpr uint16_t requestTimeoutParticipantMs = (uint16_t) nRequestsParticipant * maxTimeMsPerRequestMs;
uint8_t statusParticipant[nRequestsParticipant];
if (!uxr_run_session_until_all_status(&session, requestTimeoutParticipantMs, requestsParticipant, statusParticipant, nRequestsParticipant)) {
GCS_SEND_TEXT(MAV_SEVERITY_ERROR,"XRCE Client: Participant session request failure");
Expand All @@ -497,35 +527,71 @@ bool AP_DDS_Client::create()
const char* topic_ref = topics[i].topic_profile_label;
const auto topic_req_id = uxr_buffer_create_topic_ref(&session,reliable_out,topic_id,participant_id,topic_ref,UXR_REPLACE);

// Publisher
const uxrObjectId pub_id = {
.id = topics[i].pub_id,
.type = UXR_PUBLISHER_ID
};
const char* pub_xml = "";
const auto pub_req_id = uxr_buffer_create_publisher_xml(&session,reliable_out,pub_id,participant_id,pub_xml,UXR_REPLACE);

// Data Writer
const char* data_writer_ref = topics[i].dw_profile_label;
const auto dwriter_req_id = uxr_buffer_create_datawriter_ref(&session,reliable_out,topics[i].dw_id,pub_id,data_writer_ref,UXR_REPLACE);

// Status requests
constexpr uint8_t nRequests = 3;
const uint16_t requests[nRequests] = {topic_req_id, pub_req_id, dwriter_req_id};
constexpr int requestTimeoutMs = nRequests * maxTimeMsPerRequestMs;
uint16_t requests[nRequests];
constexpr uint16_t requestTimeoutMs = (uint8_t) nRequests * maxTimeMsPerRequestMs;
uint8_t status[nRequests];
if (!uxr_run_session_until_all_status(&session, requestTimeoutMs, requests, status, nRequests)) {
GCS_SEND_TEXT(MAV_SEVERITY_ERROR,"XRCE Client: Topic/Pub/Writer session request failure for index 'TODO'");
for (int s = 0 ; s < nRequests; s++) {
GCS_SEND_TEXT(MAV_SEVERITY_ERROR,"XRCE Client: Status '%d' result '%u'", s, status[s]);

if (strlen(topics[i].dw_profile_label) > 0) {
// Publisher
const uxrObjectId pub_id = {
.id = topics[i].pub_id,
.type = UXR_PUBLISHER_ID
};
const char* pub_xml = "";
const auto pub_req_id = uxr_buffer_create_publisher_xml(&session,reliable_out,pub_id,participant_id,pub_xml,UXR_REPLACE);

// Data Writer
const char* data_writer_ref = topics[i].dw_profile_label;
const auto dwriter_req_id = uxr_buffer_create_datawriter_ref(&session,reliable_out,topics[i].dw_id,pub_id,data_writer_ref,UXR_REPLACE);

// save the request statuses
requests[0] = topic_req_id;
requests[1] = pub_req_id;
requests[2] = dwriter_req_id;

if (!uxr_run_session_until_all_status(&session, requestTimeoutMs, requests, status, nRequests)) {
GCS_SEND_TEXT(MAV_SEVERITY_ERROR,"XRCE Client: Topic/Pub/Writer session request failure for index 'TODO'");
for (uint8_t s = 0 ; s < nRequests; s++) {
GCS_SEND_TEXT(MAV_SEVERITY_ERROR,"XRCE Client: Status '%d' result '%u'", s, status[s]);
}
// TODO add a failure log message sharing the status results
return false;
} else {
GCS_SEND_TEXT(MAV_SEVERITY_INFO,"XRCE Client: Topic/Pub/Writer session pass for index 'TOOO'");
}
} else if (strlen(topics[i].dr_profile_label) > 0) {
// Subscriber
const uxrObjectId sub_id = {
.id = topics[i].sub_id,
.type = UXR_SUBSCRIBER_ID
};
const char* sub_xml = "";
const auto sub_req_id = uxr_buffer_create_subscriber_xml(&session,reliable_out,sub_id,participant_id,sub_xml,UXR_REPLACE);

// Data Reader
const char* data_reader_ref = topics[i].dr_profile_label;
const auto dreader_req_id = uxr_buffer_create_datareader_ref(&session,reliable_out,topics[i].dr_id,sub_id,data_reader_ref,UXR_REPLACE);

// save the request statuses
requests[0] = topic_req_id;
requests[1] = sub_req_id;
requests[2] = dreader_req_id;

if (!uxr_run_session_until_all_status(&session, requestTimeoutMs, requests, status, nRequests)) {
GCS_SEND_TEXT(MAV_SEVERITY_ERROR,"XRCE Client: Topic/Sub/Reader session request failure for index '%d'",(int)i);
for (uint8_t s = 0 ; s < nRequests; s++) {
GCS_SEND_TEXT(MAV_SEVERITY_ERROR,"XRCE Client: Status '%d' result '%u'", s, status[s]);
}
// TODO add a failure log message sharing the status results
return false;
} else {
GCS_SEND_TEXT(MAV_SEVERITY_INFO,"XRCE Client: Topic/Sub/Reader session pass for index '%d'",(int)i);
uxr_buffer_request_data(&session, reliable_out, topics[i].dr_id, reliable_in, &delivery_control);
}
// TODO add a failure log message sharing the status results
return false;
} else {
GCS_SEND_TEXT(MAV_SEVERITY_INFO,"XRCE Client: Topic/Pub/Writer session pass for index 'TOOO'");
}
}

return true;
}

Expand Down
19 changes: 19 additions & 0 deletions libraries/AP_DDS/AP_DDS_Client.h
Expand Up @@ -10,6 +10,7 @@
#include "sensor_msgs/msg/NavSatFix.h"
#include "tf2_msgs/msg/TFMessage.h"
#include "sensor_msgs/msg/BatteryState.h"
#include "sensor_msgs/msg/Joy.h"
#include "geometry_msgs/msg/PoseStamped.h"
#include "geometry_msgs/msg/TwistStamped.h"
#include "geographic_msgs/msg/GeoPoseStamped.h"
Expand Down Expand Up @@ -58,6 +59,7 @@ class AP_DDS_Client
sensor_msgs_msg_NavSatFix nav_sat_fix_topic;
tf2_msgs_msg_TFMessage static_transforms_topic;
sensor_msgs_msg_BatteryState battery_state_topic;
sensor_msgs_msg_Joy joy_topic;
geometry_msgs_msg_PoseStamped local_pose_topic;
geometry_msgs_msg_TwistStamped local_velocity_topic;
geographic_msgs_msg_GeoPoseStamped geo_pose_topic;
Expand All @@ -77,6 +79,20 @@ class AP_DDS_Client
static void update_topic(geographic_msgs_msg_GeoPoseStamped& msg);
static void update_topic(rosgraph_msgs_msg_Clock& msg);

// subscription callback function
static void on_topic(uxrSession* session, uxrObjectId object_id, uint16_t request_id, uxrStreamId stream_id, struct ucdrBuffer* ub, uint16_t length, void* args);

// count of subscribed samples
uint32_t count;

// delivery control parameters
uxrDeliveryControl delivery_control {
.max_samples = UXR_MAX_SAMPLES_UNLIMITED,
.max_elapsed_time = 0,
.max_bytes_per_second = 0,
.min_pace_period = 0
};

// The last ms timestamp AP_DDS wrote a Time message
uint64_t last_time_time_ms;
// The last ms timestamp AP_DDS wrote a NavSatFix message
Expand Down Expand Up @@ -163,9 +179,12 @@ class AP_DDS_Client
struct Topic_table {
const uint8_t topic_id;
const uint8_t pub_id;
const uint8_t sub_id; // added sub_id fields to avoid confusion
const uxrObjectId dw_id;
const uxrObjectId dr_id; // added dr_id fields to avoid confusion
const char* topic_profile_label;
const char* dw_profile_label;
const char* dr_profile_label;
Generic_serialize_topic_fn_t serialize;
Generic_deserialize_topic_fn_t deserialize;
Generic_size_of_topic_fn_t size_of;
Expand Down
37 changes: 37 additions & 0 deletions libraries/AP_DDS/AP_DDS_Topic_Table.h
Expand Up @@ -16,81 +16,118 @@ const struct AP_DDS_Client::Topic_table AP_DDS_Client::topics[] = {
{
.topic_id = 0x01,
.pub_id = 0x01,
.sub_id = 0x01,
.dw_id = uxrObjectId{.id=0x01, .type=UXR_DATAWRITER_ID},
.dr_id = uxrObjectId{.id=0x01, .type=UXR_DATAREADER_ID},
.topic_profile_label = "time__t",
.dw_profile_label = "time__dw",
.dr_profile_label = "",
.serialize = Generic_serialize_topic_fn_t(&builtin_interfaces_msg_Time_serialize_topic),
.deserialize = Generic_deserialize_topic_fn_t(&builtin_interfaces_msg_Time_deserialize_topic),
.size_of = Generic_size_of_topic_fn_t(&builtin_interfaces_msg_Time_size_of_topic),
},
{
.topic_id = 0x02,
.pub_id = 0x02,
.sub_id = 0x02,
.dw_id = uxrObjectId{.id=0x02, .type=UXR_DATAWRITER_ID},
.dr_id = uxrObjectId{.id=0x02, .type=UXR_DATAREADER_ID},
.topic_profile_label = "navsatfix0__t",
.dw_profile_label = "navsatfix0__dw",
.dr_profile_label = "",
.serialize = Generic_serialize_topic_fn_t(&sensor_msgs_msg_NavSatFix_serialize_topic),
.deserialize = Generic_deserialize_topic_fn_t(&sensor_msgs_msg_NavSatFix_deserialize_topic),
.size_of = Generic_size_of_topic_fn_t(&sensor_msgs_msg_NavSatFix_size_of_topic),
},
{
.topic_id = 0x03,
.pub_id = 0x03,
.sub_id = 0x03,
.dw_id = uxrObjectId{.id=0x03, .type=UXR_DATAWRITER_ID},
.dr_id = uxrObjectId{.id=0x03, .type=UXR_DATAREADER_ID},
.topic_profile_label = "statictransforms__t",
.dw_profile_label = "statictransforms__dw",
.dr_profile_label = "",
.serialize = Generic_serialize_topic_fn_t(&tf2_msgs_msg_TFMessage_serialize_topic),
.deserialize = Generic_deserialize_topic_fn_t(&tf2_msgs_msg_TFMessage_deserialize_topic),
.size_of = Generic_size_of_topic_fn_t(&tf2_msgs_msg_TFMessage_size_of_topic),
},
{
.topic_id = 0x04,
.pub_id = 0x04,
.sub_id = 0x04,
.dw_id = uxrObjectId{.id=0x04, .type=UXR_DATAWRITER_ID},
.dr_id = uxrObjectId{.id=0x04, .type=UXR_DATAREADER_ID},
.topic_profile_label = "batterystate0__t",
.dw_profile_label = "batterystate0__dw",
.dr_profile_label = "",
.serialize = Generic_serialize_topic_fn_t(&sensor_msgs_msg_BatteryState_serialize_topic),
.deserialize = Generic_deserialize_topic_fn_t(&sensor_msgs_msg_BatteryState_deserialize_topic),
.size_of = Generic_size_of_topic_fn_t(&sensor_msgs_msg_BatteryState_size_of_topic),
},
{
.topic_id = 0x05,
.pub_id = 0x05,
.sub_id = 0x05,
.dw_id = uxrObjectId{.id=0x05, .type=UXR_DATAWRITER_ID},
.dr_id = uxrObjectId{.id=0x05, .type=UXR_DATAREADER_ID},
.topic_profile_label = "localpose__t",
.dw_profile_label = "localpose__dw",
.dr_profile_label = "",
.serialize = Generic_serialize_topic_fn_t(&geometry_msgs_msg_PoseStamped_serialize_topic),
.deserialize = Generic_deserialize_topic_fn_t(&geometry_msgs_msg_PoseStamped_deserialize_topic),
.size_of = Generic_size_of_topic_fn_t(&geometry_msgs_msg_PoseStamped_size_of_topic),
},
{
.topic_id = 0x06,
.pub_id = 0x06,
.sub_id = 0x06,
.dw_id = uxrObjectId{.id=0x06, .type=UXR_DATAWRITER_ID},
.dr_id = uxrObjectId{.id=0x06, .type=UXR_DATAREADER_ID},
.topic_profile_label = "localvelocity__t",
.dw_profile_label = "localvelocity__dw",
.dr_profile_label = "",
.serialize = Generic_serialize_topic_fn_t(&geometry_msgs_msg_TwistStamped_serialize_topic),
.deserialize = Generic_deserialize_topic_fn_t(&geometry_msgs_msg_TwistStamped_deserialize_topic),
.size_of = Generic_size_of_topic_fn_t(&geometry_msgs_msg_TwistStamped_size_of_topic),
},
{
.topic_id = 0x07,
.pub_id = 0x07,
.sub_id = 0x07,
.dw_id = uxrObjectId{.id=0x07, .type=UXR_DATAWRITER_ID},
.dr_id = uxrObjectId{.id=0x07, .type=UXR_DATAREADER_ID},
.topic_profile_label = "geopose__t",
.dw_profile_label = "geopose__dw",
.dr_profile_label = "",
.serialize = Generic_serialize_topic_fn_t(&geographic_msgs_msg_GeoPoseStamped_serialize_topic),
.deserialize = Generic_deserialize_topic_fn_t(&geographic_msgs_msg_GeoPoseStamped_deserialize_topic),
.size_of = Generic_size_of_topic_fn_t(&geographic_msgs_msg_GeoPoseStamped_size_of_topic),
},
{
.topic_id = 0x08,
.pub_id = 0x08,
.sub_id = 0x08,
.dw_id = uxrObjectId{.id=0x08, .type=UXR_DATAWRITER_ID},
.dr_id = uxrObjectId{.id=0x08, .type=UXR_DATAREADER_ID},
.topic_profile_label = "clock__t",
.dw_profile_label = "clock__dw",
.dr_profile_label = "",
.serialize = Generic_serialize_topic_fn_t(&rosgraph_msgs_msg_Clock_serialize_topic),
.deserialize = Generic_deserialize_topic_fn_t(&rosgraph_msgs_msg_Clock_deserialize_topic),
.size_of = Generic_size_of_topic_fn_t(&rosgraph_msgs_msg_Clock_size_of_topic),
},
{
.topic_id = 0x09,
.pub_id = 0x09,
.sub_id = 0x09,
.dw_id = uxrObjectId{.id=0x09, .type=UXR_DATAWRITER_ID},
.dr_id = uxrObjectId{.id=0x09, .type=UXR_DATAREADER_ID},
.topic_profile_label = "joy__t",
.dw_profile_label = "",
.dr_profile_label = "joy__dr",
.serialize = Generic_serialize_topic_fn_t(&sensor_msgs_msg_Joy_serialize_topic),
.deserialize = Generic_deserialize_topic_fn_t(&sensor_msgs_msg_Joy_deserialize_topic),
.size_of = Generic_size_of_topic_fn_t(&sensor_msgs_msg_Joy_size_of_topic),
},
};
25 changes: 25 additions & 0 deletions libraries/AP_DDS/Idl/sensor_msgs/msg/Joy.idl
@@ -0,0 +1,25 @@
// generated from rosidl_adapter/resource/msg.idl.em
// with input from sensor_msgs/msg/Joy.msg
// generated code does not contain a copyright notice

#include "std_msgs/msg/Header.idl"

module sensor_msgs {
module msg {
@verbatim (language="comment", text=
"Reports the state of a joystick's axes and buttons.")
struct Joy {
@verbatim (language="comment", text=
"The timestamp is the time at which data is received from the joystick.")
std_msgs::msg::Header header;

@verbatim (language="comment", text=
"The axes measurements from a joystick.")
sequence<float> axes;

@verbatim (language="comment", text=
"The buttons measurements from a joystick.")
sequence<int32> buttons;
};
};
};
15 changes: 15 additions & 0 deletions libraries/AP_DDS/dds_xrce_profile.xml
Expand Up @@ -223,4 +223,19 @@
</historyQos>
</topic>
</data_writer>
<topic profile_name="joy__t">
<name>rt/ap/joy</name>
<dataType>sensor_msgs::msg::dds_::Joy_</dataType>
<historyQos>
<kind>KEEP_LAST</kind>
<depth>5</depth>
</historyQos>
</topic>
<data_reader profile_name="joy__dr">
<topic>
<kind>NO_KEY</kind>
<name>rt/ap/joy</name>
<dataType>sensor_msgs::msg::dds_::Joy_</dataType>
</topic>
</data_reader>
</profiles>