Skip to content

Commit

Permalink
microRTPS: timesync: properly apply offsets
Browse files Browse the repository at this point in the history
  • Loading branch information
TSC21 committed Mar 9, 2020
1 parent 3644b3b commit fef82f8
Show file tree
Hide file tree
Showing 3 changed files with 56 additions and 55 deletions.
17 changes: 12 additions & 5 deletions msg/templates/urtps/RtpsTopics.cpp.em
Original file line number Diff line number Diff line change
Expand Up @@ -121,13 +121,15 @@ void RtpsTopics::publish(uint8_t topic_ID, char data_buffer[], size_t len)
st.deserialize(cdr_des);
@[ if topic == 'Timesync' or topic == 'timesync']@
_timesync->processTimesyncMsg(&st);

if (st.sys_id() == 1) {
@[ end if]@
// apply timestamp offset
_timesync->applyOffset(st.timestamp());
_timesync->subtractOffset(st.timestamp());
_@(topic)_pub.publish(&st);
@[ if topic == 'Timesync' or topic == 'timesync']@
if (st.sys_id() == 1)
}
@[ end if]@
_@(topic)_pub.publish(&st);
}
break;
@[end for]@
Expand Down Expand Up @@ -160,11 +162,16 @@ bool RtpsTopics::getMsg(const uint8_t topic_ID, eprosima::fastcdr::Cdr &scdr)
@[ else]@
@(topic) msg = _@(topic)_sub.getMsg();
@[ end if]@
@[ end if]@
@[ if topic == 'Timesync' or topic == 'timesync']@
if (msg.sys_id() == 0) {
@[ end if]@
// apply timestamp offset
_timesync->applyOffset(msg.timestamp());

_timesync->addOffset(msg.timestamp());
msg.serialize(scdr);
@[ if topic == 'Timesync' or topic == 'timesync']@
}
@[ end if]@
ret = true;
_@(topic)_sub.unlockMsg();
}
Expand Down
70 changes: 32 additions & 38 deletions msg/templates/urtps/microRTPS_timesync.cpp.em
Original file line number Diff line number Diff line change
Expand Up @@ -77,8 +77,6 @@ TimeSync::TimeSync()

TimeSync::~TimeSync() { stop(); }

void TimeSync::setNewOffsetCB(std::function<void(int64_t)> callback) { _notifyNewOffset = callback; }

@[if ros2_distro]@
void TimeSync::start(const Timesync_Publisher* pub) {
@[else]@
Expand Down Expand Up @@ -138,7 +136,7 @@ bool TimeSync::addMeasurement(int64_t local_t1_ns, int64_t remote_t2_ns, int64_t
}

if (_num_samples >= WINDOW_SIZE) {
if (std::abs(measurement_offset - _offset_ns) > TRIGGER_RESET_THRESHOLD_NS) {
if (std::abs(measurement_offset - _offset_ns.load()) > TRIGGER_RESET_THRESHOLD_NS) {
_request_reset_counter++;
std::cout << std::endl << "Timesync offset outlier, discarding" << std::endl;
return false;
Expand All @@ -148,24 +146,10 @@ bool TimeSync::addMeasurement(int64_t local_t1_ns, int64_t remote_t2_ns, int64_t
}

if (_num_samples == 0) {
_offset_ns = measurement_offset;
updateOffset(measurement_offset);
_skew_ns_per_sync = 0;
}

{
int64_t local_t2 = remote_t2_ns - _offset_ns;
int64_t time_there = local_t2 - local_t1_ns;

int64_t remote_t3 = local_t3_ns + _offset_ns;
int64_t time_back = remote_t3 - remote_t2_ns;

if (std::abs(time_back - time_there) > 3ll * 1000ll * 1000ll) {
std::cout << "trip there: " << time_there / 1e6f << "ms trip back: " << time_back / 1e6f
<< "ms , discarding" << std::endl;
return false;
}
}

// ignore if rtti > 10ms
if (rtti > 15ll * 1000ll * 1000ll) {
std::cout << std::endl
Expand All @@ -177,9 +161,9 @@ bool TimeSync::addMeasurement(int64_t local_t1_ns, int64_t remote_t2_ns, int64_t
double alpha = ALPHA_INITIAL * (1. - schedule) + ALPHA_FINAL * schedule;
double beta = BETA_INTIIAL * (1. - schedule) + BETA_FINAL * schedule;

int64_t offset_prev = _offset_ns;
_offset_ns = static_cast<int64_t>((_skew_ns_per_sync + _offset_ns) * (1. - alpha) + measurement_offset * alpha);
_skew_ns_per_sync = static_cast<int64_t>(beta * (_offset_ns - offset_prev) + (1. - beta) * _skew_ns_per_sync);
int64_t offset_prev = _offset_ns.load();
updateOffset(static_cast<int64_t>((_skew_ns_per_sync + _offset_ns.load()) * (1. - alpha) + measurement_offset * alpha));
_skew_ns_per_sync = static_cast<int64_t>(beta * (_offset_ns.load() - offset_prev) + (1. - beta) * _skew_ns_per_sync);

_num_samples++;

Expand All @@ -188,51 +172,61 @@ bool TimeSync::addMeasurement(int64_t local_t1_ns, int64_t remote_t2_ns, int64_t

@[if 1.5 <= fastrtpsgen_version <= 1.7]@
@[ if ros2_distro]@
void TimeSync::processTimesyncMsg(const @(package)::msg::dds_::Timesync_* msg) {
void TimeSync::processTimesyncMsg(@(package)::msg::dds_::Timesync_* msg) {
@[ else]@
void TimeSync::processTimesyncMsg(const timesync_* msg) {
void TimeSync::processTimesyncMsg(timesync_* msg) {
@[ end if]@
@[else]@
@[ if ros2_distro]@
void TimeSync::processTimesyncMsg(const @(package)::msg::Timesync* msg) {
void TimeSync::processTimesyncMsg(@(package)::msg::Timesync* msg) {
@[ else]@
void TimeSync::processTimesyncMsg(const timesync* msg) {
void TimeSync::processTimesyncMsg(timesync* msg) {
@[ end if]@
@[end if]@
if (msg->sys_id() == 1 && msg->seq() != _last_remote_msg_seq && msg->tc1() > 0) {
_last_remote_msg_seq = msg->seq();
if (msg->sys_id() == 1 && msg->seq() != _last_remote_msg_seq) {
if (msg->tc1() > 0) {
_last_remote_msg_seq = msg->seq();

if (!addMeasurement(msg->ts1(), msg->tc1(), getMonoRawTimeNSec())) {
std::cerr << "Offset not updated" << std::endl;
}

int64_t now_time = getSystemMonoNanos();
} else if (msg->tc1() == 0) {
_last_remote_msg_seq = msg->seq();

bool added = addMeasurement(msg->ts1(), msg->tc1(), now_time);
if (added && _notifyNewOffset) _notifyNewOffset(_offset_ns);
// std::cout << "Offset: " << _offset_ns << std::endl;
msg->timestamp() = getMonoTimeUSec();
msg->sys_id() = 0;
msg->seq()++;
msg->tc1() = getMonoRawTimeNSec();

_timesync_pub.publish(msg);
}
}
}

@[if 1.5 <= fastrtpsgen_version <= 1.7]@
@[ if ros2_distro]@
@(package)::msg::dds_::Timesync_ TimeSync::newTimesyncMsg() {
@(package)::msg::dds_::Timesync_ msg{};
@(package)::msg::dds_::Timesync_ msg{};
@[ else]@
timesync_ TimeSync::newTimesyncMsg() {
timesync_ msg{};
timesync_ msg{};
@[ end if]@
@[else]@
@[ if ros2_distro]@
@(package)::msg::Timesync TimeSync::newTimesyncMsg() {
@(package)::msg::Timesync msg{};
@(package)::msg::Timesync msg{};
@[ else]@
timesync TimeSync::newTimesyncMsg() {
timesync msg{};
timesync msg{};
@[ end if]@
@[end if]@
msg.timestamp() = getMonoTime();
applyOffset(msg.timestamp());

msg.timestamp() = getMonoTimeUSec();
msg.sys_id() = 0;
msg.seq() = _last_msg_seq;
msg.tc1() = 0;
msg.ts1() = getSystemMonoNanos();
msg.ts1() = getMonoRawTimeNSec();

_last_msg_seq++;

Expand Down
24 changes: 12 additions & 12 deletions msg/templates/urtps/microRTPS_timesync.h.em
Original file line number Diff line number Diff line change
Expand Up @@ -99,51 +99,51 @@ public:
void reset();
void stop();

void setNewOffsetCB(std::function<void(int64_t)> callback);

/**
* Get clock monotonic time (raw) in nanoseconds
*/
inline int64_t getSystemMonoNanos() {
inline int64_t getMonoRawTimeNSec() {
timespec t;
clock_gettime(CLOCK_MONOTONIC_RAW, &t);
return (int64_t)t.tv_sec * 1000000000ll + t.tv_nsec;
return (int64_t)t.tv_sec * 1000000000LL + t.tv_nsec;
}

/**
* Get system monotonic time in microseconds
*/
inline uint64_t getMonoTime() {
inline int64_t getMonoTimeUSec() {
timespec t;
clock_gettime(CLOCK_MONOTONIC, &t);
return (t.tv_sec * 1000000000ll + t.tv_nsec) / 1000ULL;
return (int64_t)(t.tv_sec * 1000000000LL + t.tv_nsec) / 1000LL;
}

bool addMeasurement(int64_t local_t1_ns, int64_t remote_t2_ns, int64_t local_t3_ns);

@[if 1.5 <= fastrtpsgen_version <= 1.7]@
@[ if ros2_distro]@
void processTimesyncMsg(const @(package)::msg::dds_::Timesync_* msg);
void processTimesyncMsg(@(package)::msg::dds_::Timesync_* msg);

@(package)::msg::dds_::Timesync_ newTimesyncMsg();
@[ else]@
void processTimesyncMsg(const timesync_* msg);
void processTimesyncMsg(timesync_* msg);

timesync_ newTimesyncMsg();
@[ end if]@
@[else]@
@[ if ros2_distro]@
void processTimesyncMsg(const @(package)::msg::Timesync* msg);
void processTimesyncMsg(@(package)::msg::Timesync* msg);

@(package)::msg::Timesync newTimesyncMsg();
@[ else]@
void processTimesyncMsg(const timesync* msg);
void processTimesyncMsg(timesync* msg);

timesync newTimesyncMsg();
@[ end if]@
@[end if]@

inline void applyOffset(uint64_t timestamp) { timestamp += _offset_ns.load(); }
inline int64_t getOffset() { return _offset_ns.load(); }
inline void addOffset(uint64_t& timestamp) { timestamp = (timestamp * 1000LL + _offset_ns.load()) / 10000ULL; }
inline void subtractOffset(uint64_t& timestamp) { timestamp = (timestamp * 1000LL - _offset_ns.load()) / 10000ULL; }

private:
std::atomic<int64_t> _offset_ns;
Expand All @@ -165,5 +165,5 @@ private:
std::unique_ptr<std::thread> _send_timesync_thread;
std::atomic<bool> _request_stop{false};

std::function<void(int64_t)> _notifyNewOffset;
inline void updateOffset(const uint64_t& offset) { _offset_ns.store(offset, std::memory_order_relaxed); }
};

0 comments on commit fef82f8

Please sign in to comment.