Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

microRTPS: add timesync #14297

Merged
merged 18 commits into from Mar 10, 2020
Merged
Show file tree
Hide file tree
Changes from 14 commits
Commits
Show all changes
18 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
1 change: 1 addition & 0 deletions msg/CMakeLists.txt
Expand Up @@ -128,6 +128,7 @@ set(msg_files
tecs_status.msg
telemetry_status.msg
test_motor.msg
timesync.msg
timesync_status.msg
trajectory_bezier.msg
trajectory_waypoint.msg
Expand Down
18 changes: 18 additions & 0 deletions msg/templates/uorb_microcdr/microRTPS_client.cpp.em
Expand Up @@ -78,6 +78,9 @@ receive_base_types = [s.short_name for idx, s in enumerate(spec) if scope[idx] =

void* send(void *data);

uint8_t last_remote_msg_seq = 0;
uint8_t last_msg_seq = 0;

@[if send_topics]@
void* send(void* /*unused*/)
{
Expand Down Expand Up @@ -105,13 +108,28 @@ void* send(void* /*unused*/)
@[for idx, topic in enumerate(send_topics)]@
@(send_base_types[idx])_s @(topic)_data;
if (@(topic)_sub.update(&@(topic)_data)) {
@[if topic == 'Timesync' or topic == 'timesync']@
if(@(topic)_data.sys_id == 0 && @(topic)_data.seq != last_remote_msg_seq && @(topic)_data.tc1 == 0) {
TSC21 marked this conversation as resolved.
Show resolved Hide resolved
last_remote_msg_seq = @(topic)_data.seq;

@(topic)_data.timestamp = hrt_absolute_time();
@(topic)_data.sys_id = 1;
@(topic)_data.seq = last_msg_seq;
@(topic)_data.tc1 = hrt_absolute_time() * 1000ULL;
@(topic)_data.ts1 = @(topic)_data.ts1;

last_msg_seq++;
@[end if]@
// copy raw data into local buffer. Payload is shifted by header length to make room for header
serialize_@(send_base_types[idx])(&writer, &@(topic)_data, &data_buffer[header_length], &length);
if (0 < (read = transport_node->write(static_cast<char>(@(rtps_message_id(ids, topic))), data_buffer, length)))
{
total_sent += read;
++sent;
}
@[if topic == 'Timesync' or topic == 'timesync']@
}
@[end if]@
}
@[end for]@
px4_usleep(_options.sleep_ms * 1000);
Expand Down
75 changes: 17 additions & 58 deletions msg/templates/urtps/Publisher.cpp.em
Expand Up @@ -127,67 +127,26 @@ bool @(topic)_Publisher::init()

void @(topic)_Publisher::PubListener::onPublicationMatched(Publisher* pub, MatchingInfo& info)
{
(void)pub;

if (info.status == MATCHED_MATCHING)
{
n_matched++;
std::cout << " - @(topic) publisher matched" << std::endl;
}
else
{
n_matched--;
std::cout << " - @(topic) publisher unmatched" << std::endl;
}
}

void @(topic)_Publisher::run()
{
while(m_listener.n_matched == 0)
{
@[if 1.5 <= fastrtpsgen_version <= 1.7]@
eClock::my_sleep(250); // Sleep 250 ms;
@[else]@
std::this_thread::sleep_for(std::chrono::milliseconds(250)); // Sleep 250 ms
@[end if]@
}

// Publication code
@[if 1.5 <= fastrtpsgen_version <= 1.7]@
@[ if ros2_distro]@
@(package)::msg::dds_::@(topic)_ st;
@[ else]@
@(topic)_ st;
@[ end if]@
@[else]@
@[ if ros2_distro]@
@(package)::msg::@(topic) st;
@[ else]@
@(topic) st;
@[ end if]@
@[end if]@

/* Initialize your structure here */

int msgsent = 0;
char ch = 'y';
do
{
if(ch == 'y')
{
mp_publisher->write(&st); ++msgsent;
std::cout << "Sending sample, count=" << msgsent << ", send another sample?(y-yes,n-stop): ";
}
else if(ch == 'n')
{
std::cout << "Stopping execution " << std::endl;
// The first 6 values of the ID guidPrefix of an entity in a DDS-RTPS Domain
// are the same for all its subcomponents (publishers, subscribers)
bool is_different_endpoint = false;
for (size_t i = 0; i < 6; i++) {
if (pub->getGuid().guidPrefix.value[i] != info.remoteEndpointGuid.guidPrefix.value[i]) {
is_different_endpoint = true;
break;
}
else
{
std::cout << "Command " << ch << " not recognized, please enter \"y/n\":";
}

// If the matching happens for the same entity, do not make a match
if (is_different_endpoint) {
if (info.status == MATCHED_MATCHING) {
n_matched++;
std::cout << " - @(topic) publisher matched" << std::endl;
} else {
n_matched--;
std::cout << " - @(topic) publisher unmatched" << std::endl;
}
}while(std::cin >> ch);
}
}

@[if 1.5 <= fastrtpsgen_version <= 1.7]@
Expand Down
23 changes: 21 additions & 2 deletions msg/templates/urtps/RtpsTopics.cpp.em
Expand Up @@ -73,7 +73,6 @@ bool RtpsTopics::init(std::condition_variable* t_send_queue_cv, std::mutex* t_se
std::cerr << "Failed starting @(topic) subscriber" << std::endl;
return false;
}

@[end for]@
std::cout << "--------------------" << std::endl << std::endl;
@[end if]@
Expand All @@ -83,11 +82,13 @@ bool RtpsTopics::init(std::condition_variable* t_send_queue_cv, std::mutex* t_se
@[for topic in send_topics]@
if (_@(topic)_pub.init()) {
std::cout << "- @(topic) publisher started" << std::endl;
@[ if topic == 'Timesync' or topic == 'timesync']@
_timesync->start(&_@(topic)_pub);
@[ end if]@
} else {
std::cerr << "ERROR starting @(topic) publisher" << std::endl;
return false;
}

@[end for]@
std::cout << "--------------------" << std::endl;
@[end if]@
Expand Down Expand Up @@ -118,7 +119,17 @@ void RtpsTopics::publish(uint8_t topic_ID, char data_buffer[], size_t len)
eprosima::fastcdr::FastBuffer cdrbuffer(data_buffer, len);
eprosima::fastcdr::Cdr cdr_des(cdrbuffer);
st.deserialize(cdr_des);
@[ if topic == 'Timesync' or topic == 'timesync']@
_timesync->processTimesyncMsg(&st);

if (st.sys_id() == 1) {
@[ end if]@
// apply timestamp offset
_timesync->subtractOffset(st.timestamp());
_@(topic)_pub.publish(&st);
@[ if topic == 'Timesync' or topic == 'timesync']@
}
@[ end if]@
}
break;
@[end for]@
Expand Down Expand Up @@ -152,8 +163,16 @@ bool RtpsTopics::getMsg(const uint8_t topic_ID, eprosima::fastcdr::Cdr &scdr)
@(topic) msg = _@(topic)_sub.getMsg();
@[ end if]@
@[ end if]@
@[ if topic == 'Timesync' or topic == 'timesync']@
if (msg.sys_id() == 0) {
@[ end if]@
// apply timestamp offset
_timesync->addOffset(msg.timestamp());
msg.serialize(scdr);
ret = true;
@[ if topic == 'Timesync' or topic == 'timesync']@
}
@[ end if]@
_@(topic)_sub.unlockMsg();
}
break;
Expand Down
5 changes: 5 additions & 0 deletions msg/templates/urtps/RtpsTopics.h.em
Expand Up @@ -57,6 +57,8 @@ recv_topics = [(alias[idx] if alias[idx] else s.short_name) for idx, s in enumer
#include <condition_variable>
#include <queue>

#include "microRTPS_timesync.h"

@[for topic in send_topics]@
#include "@(topic)_Publisher.h"
@[end for]@
Expand All @@ -67,6 +69,7 @@ recv_topics = [(alias[idx] if alias[idx] else s.short_name) for idx, s in enumer
class RtpsTopics {
public:
bool init(std::condition_variable* t_send_queue_cv, std::mutex* t_send_queue_mutex, std::queue<uint8_t>* t_send_queue);
void set_timesync(const std::shared_ptr<TimeSync>& timesync) { _timesync = timesync; };
@[if send_topics]@
void publish(uint8_t topic_ID, char data_buffer[], size_t len);
@[end if]@
Expand All @@ -88,5 +91,7 @@ private:
@(topic)_Subscriber _@(topic)_sub;
@[end for]@

std::shared_ptr<TimeSync> _timesync;

@[end if]@
};
60 changes: 40 additions & 20 deletions msg/templates/urtps/Subscriber.cpp.em
Expand Up @@ -127,30 +127,50 @@ bool @(topic)_Subscriber::init(uint8_t topic_ID, std::condition_variable* t_send

void @(topic)_Subscriber::SubListener::onSubscriptionMatched(Subscriber* sub, MatchingInfo& info)
{
@# Since the time sync runs on the bridge itself, it is required that there is a
@# match between two topics of the same entity
@[if topic != 'Timesync' and topic != 'timesync']@
// The first 6 values of the ID guidPrefix of an entity in a DDS-RTPS Domain
// are the same for all its subcomponents (publishers, subscribers)
bool is_different_endpoint = false;
for (size_t i = 0; i < 6; i++) {
if (sub->getGuid().guidPrefix.value[i] != info.remoteEndpointGuid.guidPrefix.value[i]) {
is_different_endpoint = true;
break;
}
}

// If the matching happens for the same entity, do not make a match
if (is_different_endpoint) {
if (info.status == MATCHED_MATCHING) {
n_matched++;
std::cout << " - @(topic) subscriber matched" << std::endl;
} else {
n_matched--;
std::cout << " - @(topic) subscriber unmatched" << std::endl;
}
}
@[else]@
(void)sub;

if (info.status == MATCHED_MATCHING)
{
if (info.status == MATCHED_MATCHING) {
n_matched++;
std::cout << " - @(topic) subscriber matched" << std::endl;
}
else
{
} else {
n_matched--;
std::cout << " - @(topic) subscriber unmatched" << std::endl;
}
@[end if]@
}

void @(topic)_Subscriber::SubListener::onNewDataMessage(Subscriber* sub)
{
if (n_matched > 0) {
std::unique_lock<std::mutex> has_msg_lock(has_msg_mutex);
if(has_msg.load() == true) // Check if msg has been fetched
{
has_msg_cv.wait(has_msg_lock); // Wait till msg has been fetched
}
has_msg_lock.unlock();


// Take data
if(sub->takeNextData(&msg, &m_info))
{
Expand All @@ -167,18 +187,16 @@ void @(topic)_Subscriber::SubListener::onNewDataMessage(Subscriber* sub)

}
}
}

void @(topic)_Subscriber::run()
{
std::cout << "Waiting for Data, press Enter to stop the Subscriber. "<<std::endl;
std::cin.ignore();
std::cout << "Shutting down the Subscriber." << std::endl;
}
}

bool @(topic)_Subscriber::hasMsg()
{
return m_listener.has_msg.load();
if (m_listener.n_matched > 0) {
return m_listener.has_msg.load();
}

return false;
}

@[if 1.5 <= fastrtpsgen_version <= 1.7]@
Expand All @@ -200,8 +218,10 @@ bool @(topic)_Subscriber::hasMsg()

void @(topic)_Subscriber::unlockMsg()
{
std::unique_lock<std::mutex> has_msg_lock(m_listener.has_msg_mutex);
m_listener.has_msg = false;
has_msg_lock.unlock();
m_listener.has_msg_cv.notify_one();
if (m_listener.n_matched > 0) {
std::unique_lock<std::mutex> has_msg_lock(m_listener.has_msg_mutex);
m_listener.has_msg = false;
has_msg_lock.unlock();
m_listener.has_msg_cv.notify_one();
}
}
10 changes: 10 additions & 0 deletions msg/templates/urtps/microRTPS_agent.cpp.em
Expand Up @@ -70,6 +70,7 @@ recv_topics = [(alias[idx] if alias[idx] else s.short_name) for idx, s in enumer
#include <fastrtps/Domain.h>

#include "microRTPS_transport.h"
#include "microRTPS_timesync.h"
#include "RtpsTopics.h"

#define BUFFER_SIZE 1024
Expand All @@ -92,6 +93,9 @@ Transport_node *transport_node = nullptr;
RtpsTopics topics;
uint32_t total_sent = 0, sent = 0;

// Init timesync
std::shared_ptr<TimeSync> timeSync = std::make_shared<TimeSync>();

struct options {
enum class eTransports
{
Expand Down Expand Up @@ -160,6 +164,7 @@ void signal_handler(int signum)
printf("Interrupt signal (%d) received.\n", signum);
running = 0;
transport_node->close();
timeSync->stop();
}

@[if recv_topics]@
Expand Down Expand Up @@ -188,6 +193,7 @@ void t_send(void*)
/* make room for the header to fill in later */
eprosima::fastcdr::FastBuffer cdrbuffer(&data_buffer[header_length], sizeof(data_buffer)-header_length);
eprosima::fastcdr::Cdr scdr(cdrbuffer);

if (topics.getMsg(topic_ID, scdr))
{
length = scdr.getSerializedDataLength();
Expand Down Expand Up @@ -253,6 +259,8 @@ int main(int argc, char** argv)
std::chrono::time_point<std::chrono::steady_clock> start, end;
@[end if]@

topics.set_timesync(timeSync);

@[if recv_topics]@
topics.init(&t_send_queue_cv, &t_send_queue_mutex, &t_send_queue);
@[end if]@
Expand Down Expand Up @@ -300,5 +308,7 @@ int main(int argc, char** argv)
delete transport_node;
transport_node = nullptr;

timeSync->reset();

return 0;
}