Skip to content

Commit

Permalink
DDS: Added subscriber support
Browse files Browse the repository at this point in the history
  • Loading branch information
arshPratap committed Apr 27, 2023
1 parent f4c5852 commit 7d6bb64
Show file tree
Hide file tree
Showing 4 changed files with 159 additions and 22 deletions.
101 changes: 79 additions & 22 deletions libraries/AP_DDS/AP_DDS_Client.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -295,6 +295,26 @@ void AP_DDS_Client::update_topic(geometry_msgs_msg_PoseStamped& msg)
}
}

// read function triggered at every subscription callback
void AP_DDS_Client::on_topic (uxrSession* uxr_session, uxrObjectId object_id, uint16_t request_id, uxrStreamId stream_id, struct ucdrBuffer* ub, uint16_t length, void* args)
{
(void) uxr_session; (void) object_id; (void) request_id; (void) stream_id; (void) length;
/*
TEMPLATE for reading to the subscribed topics
1) Store the read contents into the ucdr buffer
2) Deserialize the said contents into the topic instance
*/
//HelloWorld topic;
//HelloWorld_deserialize_topic(ub, &topic);

//printf("Received topic: %s, id: %i\n", topic.message, topic.index);

//uint32_t* count_ptr = (uint32_t*) args;
//(*count_ptr)++;
GCS_SEND_TEXT(MAV_SEVERITY_INFO,"Recieved Data");
}

/*
start the DDS thread
*/
Expand Down Expand Up @@ -440,35 +460,72 @@ bool AP_DDS_Client::create()
const char* topic_ref = topics[i].topic_profile_label;
const auto topic_req_id = uxr_buffer_create_topic_ref(&session,reliable_out,topic_id,participant_id,topic_ref,UXR_REPLACE);

// Publisher
const uxrObjectId pub_id = {
.id = topics[i].pub_id,
.type = UXR_PUBLISHER_ID
};
const char* pub_xml = "";
const auto pub_req_id = uxr_buffer_create_publisher_xml(&session,reliable_out,pub_id,participant_id,pub_xml,UXR_REPLACE);

// Data Writer
const char* data_writer_ref = topics[i].dw_profile_label;
const auto dwriter_req_id = uxr_buffer_create_datawriter_ref(&session,reliable_out,topics[i].dw_id,pub_id,data_writer_ref,UXR_REPLACE);

// Status requests
constexpr uint8_t nRequests = 3;
const uint16_t requests[nRequests] = {topic_req_id, pub_req_id, dwriter_req_id};
uint16_t requests[nRequests];
constexpr int requestTimeoutMs = nRequests * maxTimeMsPerRequestMs;
uint8_t status[nRequests];
if (!uxr_run_session_until_all_status(&session, requestTimeoutMs, requests, status, nRequests)) {
GCS_SEND_TEXT(MAV_SEVERITY_ERROR,"XRCE Client: Topic/Pub/Writer session request failure for index 'TODO'");
for (int s = 0 ; s < nRequests; s++) {
GCS_SEND_TEXT(MAV_SEVERITY_ERROR,"XRCE Client: Status '%d' result '%u'", s, status[s]);

if(strlen(topics[i].dw_profile_label)) {
// Publisher
const uxrObjectId pub_id = {
.id = topics[i].pub_id,
.type = UXR_PUBLISHER_ID
};
const char* pub_xml = "";
const auto pub_req_id = uxr_buffer_create_publisher_xml(&session,reliable_out,pub_id,participant_id,pub_xml,UXR_REPLACE);

// Data Writer
const char* data_writer_ref = topics[i].dw_profile_label;
const auto dwriter_req_id = uxr_buffer_create_datawriter_ref(&session,reliable_out,topics[i].dw_id,pub_id,data_writer_ref,UXR_REPLACE);

// save the request statuses
requests[0] = topic_req_id;
requests[1] = pub_req_id;
requests[2] = dwriter_req_id;

if (!uxr_run_session_until_all_status(&session, requestTimeoutMs, requests, status, nRequests)) {
GCS_SEND_TEXT(MAV_SEVERITY_ERROR,"XRCE Client: Topic/Pub/Writer session request failure for index 'TODO'");
for (int s = 0 ; s < nRequests; s++) {
GCS_SEND_TEXT(MAV_SEVERITY_ERROR,"XRCE Client: Status '%d' result '%u'", s, status[s]);
}
// TODO add a failure log message sharing the status results
return false;
} else {
GCS_SEND_TEXT(MAV_SEVERITY_INFO,"XRCE Client: Topic/Pub/Writer session pass for index 'TOOO'");
}
// TODO add a failure log message sharing the status results
return false;
} else {
GCS_SEND_TEXT(MAV_SEVERITY_INFO,"XRCE Client: Topic/Pub/Writer session pass for index 'TOOO'");
}
else if(strlen(topics[i].dr_profile_label)) {
// Subscriber
const uxrObjectId sub_id = {
.id = topics[i].sub_id,
.type = UXR_SUBSCRIBER_ID
};
const char* sub_xml = "";
const auto sub_req_id = uxr_buffer_create_subscriber_xml(&session,reliable_out,sub_id,participant_id,sub_xml,UXR_REPLACE);

// Data Reader
const char* data_reader_ref = topics[i].dr_profile_label;
const auto dreader_req_id = uxr_buffer_create_datareader_ref(&session,reliable_out,topics[i].dr_id,sub_id,data_reader_ref,UXR_REPLACE);

// save the request statuses
requests[0] = topic_req_id;
requests[1] = sub_req_id;
requests[2] = dreader_req_id;

if (!uxr_run_session_until_all_status(&session, requestTimeoutMs, requests, status, nRequests)) {
GCS_SEND_TEXT(MAV_SEVERITY_ERROR,"XRCE Client: Topic/Sub/Reader session request failure for index '%d'",(int)i);
for (int s = 0 ; s < nRequests; s++) {
GCS_SEND_TEXT(MAV_SEVERITY_ERROR,"XRCE Client: Status '%d' result '%u'", s, status[s]);
}
// TODO add a failure log message sharing the status results
return false;
} else {
GCS_SEND_TEXT(MAV_SEVERITY_INFO,"XRCE Client: Topic/Sub/Reader session pass for index '%d'",(int)i);
uxr_buffer_request_data(&session, reliable_out, topics[i].dr_id, reliable_in, &delivery_control);
}
}
}

return true;
}

Expand Down
17 changes: 17 additions & 0 deletions libraries/AP_DDS/AP_DDS_Client.h
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,20 @@ class AP_DDS_Client
static void update_topic(sensor_msgs_msg_BatteryState& msg, const uint8_t instance);
static void update_topic(geometry_msgs_msg_PoseStamped& msg);

// subscription callback function
static void on_topic(uxrSession* session, uxrObjectId object_id, uint16_t request_id, uxrStreamId stream_id, struct ucdrBuffer* ub, uint16_t length, void* args);

// count of subscribed samples
uint32_t count;

// delivery control parameters
uxrDeliveryControl delivery_control = {
.max_samples = UXR_MAX_SAMPLES_UNLIMITED,
.max_elapsed_time = 0,
.max_bytes_per_second = 0,
.min_pace_period = 0
};

// The last ms timestamp AP_DDS wrote a Time message
uint64_t last_time_time_ms;
// The last ms timestamp AP_DDS wrote a NavSatFix message
Expand Down Expand Up @@ -128,9 +142,12 @@ class AP_DDS_Client
struct Topic_table {
const uint8_t topic_id;
const uint8_t pub_id;
const uint8_t sub_id; // added sub_id fields to avoid confusion
const uxrObjectId dw_id;
const uxrObjectId dr_id; // added dr_id fields to avoid confusion
const char* topic_profile_label;
const char* dw_profile_label;
const char* dr_profile_label;
Generic_serialize_topic_fn_t serialize;
Generic_deserialize_topic_fn_t deserialize;
Generic_size_of_topic_fn_t size_of;
Expand Down
28 changes: 28 additions & 0 deletions libraries/AP_DDS/AP_DDS_Topic_Table.h
Original file line number Diff line number Diff line change
Expand Up @@ -15,51 +15,79 @@ const struct AP_DDS_Client::Topic_table AP_DDS_Client::topics[] = {
{
.topic_id = 0x01,
.pub_id = 0x01,
.sub_id = 0x01,
.dw_id = uxrObjectId{.id=0x01, .type=UXR_DATAWRITER_ID},
.dr_id = uxrObjectId{.id=0x01, .type=UXR_DATAREADER_ID},
.topic_profile_label = "time__t",
.dw_profile_label = "time__dw",
.dr_profile_label = "time__dr",
.serialize = Generic_serialize_topic_fn_t(&builtin_interfaces_msg_Time_serialize_topic),
.deserialize = Generic_deserialize_topic_fn_t(&builtin_interfaces_msg_Time_deserialize_topic),
.size_of = Generic_size_of_topic_fn_t(&builtin_interfaces_msg_Time_size_of_topic),
},
{
.topic_id = 0x02,
.pub_id = 0x02,
.sub_id = 0x02,
.dw_id = uxrObjectId{.id=0x02, .type=UXR_DATAWRITER_ID},
.dr_id = uxrObjectId{.id=0x02, .type=UXR_DATAREADER_ID},
.topic_profile_label = "navsatfix0__t",
.dw_profile_label = "navsatfix0__dw",
.dr_profile_label = "",
.serialize = Generic_serialize_topic_fn_t(&sensor_msgs_msg_NavSatFix_serialize_topic),
.deserialize = Generic_deserialize_topic_fn_t(&sensor_msgs_msg_NavSatFix_deserialize_topic),
.size_of = Generic_size_of_topic_fn_t(&sensor_msgs_msg_NavSatFix_size_of_topic),
},
{
.topic_id = 0x03,
.pub_id = 0x03,
.sub_id = 0x03,
.dw_id = uxrObjectId{.id=0x03, .type=UXR_DATAWRITER_ID},
.dr_id = uxrObjectId{.id=0x03, .type=UXR_DATAREADER_ID},
.topic_profile_label = "statictransforms__t",
.dw_profile_label = "statictransforms__dw",
.dr_profile_label = "",
.serialize = Generic_serialize_topic_fn_t(&tf2_msgs_msg_TFMessage_serialize_topic),
.deserialize = Generic_deserialize_topic_fn_t(&tf2_msgs_msg_TFMessage_deserialize_topic),
.size_of = Generic_size_of_topic_fn_t(&tf2_msgs_msg_TFMessage_size_of_topic),
},
{
.topic_id = 0x04,
.pub_id = 0x04,
.sub_id = 0x04,
.dw_id = uxrObjectId{.id=0x04, .type=UXR_DATAWRITER_ID},
.dr_id = uxrObjectId{.id=0x04, .type=UXR_DATAREADER_ID},
.topic_profile_label = "batterystate0__t",
.dw_profile_label = "batterystate0__dw",
.dr_profile_label = "",
.serialize = Generic_serialize_topic_fn_t(&sensor_msgs_msg_BatteryState_serialize_topic),
.deserialize = Generic_deserialize_topic_fn_t(&sensor_msgs_msg_BatteryState_deserialize_topic),
.size_of = Generic_size_of_topic_fn_t(&sensor_msgs_msg_BatteryState_size_of_topic),
},
{
.topic_id = 0x05,
.pub_id = 0x05,
.sub_id = 0x05,
.dw_id = uxrObjectId{.id=0x05, .type=UXR_DATAWRITER_ID},
.dr_id = uxrObjectId{.id=0x05, .type=UXR_DATAREADER_ID},
.topic_profile_label = "localpose__t",
.dw_profile_label = "localpose__dw",
.dr_profile_label = "",
.serialize = Generic_serialize_topic_fn_t(&geometry_msgs_msg_PoseStamped_serialize_topic),
.deserialize = Generic_deserialize_topic_fn_t(&geometry_msgs_msg_PoseStamped_deserialize_topic),
.size_of = Generic_size_of_topic_fn_t(&geometry_msgs_msg_PoseStamped_size_of_topic),
},
{
.topic_id = 0x06,
.pub_id = 0x06,
.sub_id = 0x06,
.dw_id = uxrObjectId{.id=0x06, .type=UXR_DATAWRITER_ID},
.dr_id = uxrObjectId{.id=0x06, .type=UXR_DATAREADER_ID},
.topic_profile_label = "joy__t",
.dw_profile_label = "",
.dr_profile_label = "joy__dr",
.serialize = Generic_serialize_topic_fn_t(&sensor_msgs_msg_BatteryState_serialize_topic),
.deserialize = Generic_deserialize_topic_fn_t(&sensor_msgs_msg_BatteryState_deserialize_topic),
.size_of = Generic_size_of_topic_fn_t(&sensor_msgs_msg_BatteryState_size_of_topic),
},
};
35 changes: 35 additions & 0 deletions libraries/AP_DDS/dds_xrce_profile.xml
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,13 @@
</historyQos>
</topic>
</data_writer>
<data_reader profile_name="time__dr">
<topic>
<kind>NO_KEY</kind>
<name>rt/ROS2_Time</name>
<dataType>builtin_interfaces::msg::dds_::Time_</dataType>
</topic>
</data_reader>
<topic profile_name="navsatfix0__t">
<name>rt/ROS2_NavSatFix0</name>
<dataType>sensor_msgs::msg::dds_::NavSatFix_</dataType>
Expand Down Expand Up @@ -142,4 +149,32 @@
</historyQos>
</topic>
</data_writer>
<topic profile_name="joy__t">
<name>rt/ROS2_Joy</name>
<dataType>sensor_msgs::msg::dds_::Joy_</dataType>
<historyQos>
<kind>KEEP_LAST</kind>
<depth>5</depth>
</historyQos>
</topic>
<data_reader profile_name="joy__dr">
<!-- <historyMemoryPolicy>PREALLOCATED_WITH_REALLOC</historyMemoryPolicy>
<qos>
<reliability>
<kind>BEST_EFFORT</kind>
</reliability>
<durability>
<kind>VOLATILE</kind>
</durability>
</qos> -->
<topic>
<kind>NO_KEY</kind>
<name>rt/ROS2_Joy</name>
<dataType>sensor_msgs::msg::dds_::Joy_</dataType>
<!-- <historyQos>
<kind>KEEP_LAST</kind>
<depth>5</depth>
</historyQos> -->
</topic>
</data_reader>
</profiles>

0 comments on commit 7d6bb64

Please sign in to comment.