-
-
Notifications
You must be signed in to change notification settings - Fork 106
/
timestamp_estimator.py
250 lines (204 loc) · 9.88 KB
/
timestamp_estimator.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
#
# Copyright (C) 2014-2016 UAVCAN Development Team <uavcan.org>
#
# This software is distributed under the terms of the MIT License.
#
# Author: Pavel Kirienko <pavel.kirienko@zubax.com>
# Ben Dyer <ben_dyer@mac.com>
#
from __future__ import division, absolute_import, print_function, unicode_literals
import decimal
class SourceTimeResolver:
"""
This class contains logic that recovers absolute value of a remote clock observable via small overflowing
integer samples.
For example, consider a remote system that reports timestamps as a 16-bit integer number of milliseconds that
overflows every 60 seconds (this method is used in SLCAN for example). This class can recover the time difference
in the remote clock domain between two arbitrary timestamps, even if the timestamp variable overflowed more than
once between these events.
"""
def __init__(self, source_clock_overflow_period=None):
"""
Args:
source_clock_overflow_period: Overflow period of the remote clock, in seconds.
If not provided, the remote clock is considered to never
overflow (i.e. absolute).
"""
self.source_clock_overflow_period = \
decimal.Decimal(source_clock_overflow_period) if source_clock_overflow_period else None
if self.source_clock_overflow_period is not None and self.source_clock_overflow_period <= 0:
raise ValueError('source_clock_overflow_period must be positive or None')
# Internal states
self._resolved_time = None
self._prev_source_sample = None
self._prev_target_sample = None
def reset(self):
"""
Resets the internal logic; resolved time will start over.
"""
self._resolved_time = None
self._prev_source_sample = None
self._prev_target_sample = None
def update(self, source_clock_sample, target_clock_sample):
"""
Args:
source_clock_sample: Sample of the source clock, in seconds
target_clock_sample: Sample of the target clock, in seconds
Returns: Resolved absolute source clock value
"""
if self._resolved_time is None or self.source_clock_overflow_period is None:
self._resolved_time = decimal.Decimal(source_clock_sample)
self._prev_source_sample = source_clock_sample
self._prev_target_sample = target_clock_sample
else:
# Time between updates in the target clock domain
tgt_delta = target_clock_sample - self._prev_target_sample
self._prev_target_sample = target_clock_sample
assert tgt_delta >= 0
# Time between updates in the source clock domain
src_delta = source_clock_sample - self._prev_source_sample
self._prev_source_sample = source_clock_sample
# Using the target clock we can resolve the integer ambiguity (number of overflows)
full_cycles = int(round((tgt_delta - src_delta) / float(self.source_clock_overflow_period), 0))
# Updating the source clock now; in two steps, in order to avoid error accumulation in floats
self._resolved_time += decimal.Decimal(full_cycles * self.source_clock_overflow_period)
self._resolved_time += decimal.Decimal(src_delta)
return self._resolved_time
class TimestampEstimator:
"""
Based on "A Passive Solution to the Sensor Synchronization Problem" [Edwin Olson 2010]
https://april.eecs.umich.edu/pdfs/olson2010.pdf
"""
DEFAULT_MAX_DRIFT_PPM = 200
DEFAULT_MAX_PHASE_ERROR_TO_RESYNC = 1.
def __init__(self,
max_rate_error=None,
source_clock_overflow_period=None,
fixed_delay=None,
max_phase_error_to_resync=None):
"""
Args:
max_rate_error: The max drift parameter must be not lower than maximum relative clock
drift in PPM. If the max relative drift is guaranteed to be lower,
reducing this value will improve estimation. The default covers vast
majority of low-cost (and up) crystal oscillators.
source_clock_overflow_period: How often the source clocks wraps over, in seconds.
For example, for SLCAN this value is 60 seconds.
If not provided, the source clock is considered to never wrap over.
fixed_delay: This value will be unconditionally added to the delay estimations.
Represented in seconds. Default is zero.
For USB-interfaced sources it should be safe to use as much as 100 usec.
max_phase_error_to_resync: When this value is exceeded, the estimator will start over.
Defaults to a large value.
"""
self.max_rate_error = float(max_rate_error or (self.DEFAULT_MAX_DRIFT_PPM / 1e6))
self.fixed_delay = fixed_delay or 0
self.max_phase_error_to_resync = max_phase_error_to_resync or self.DEFAULT_MAX_PHASE_ERROR_TO_RESYNC
if self.max_rate_error < 0:
raise ValueError('max_rate_error must be non-negative')
if self.fixed_delay < 0:
raise ValueError('fixed_delay must be non-negative')
if self.max_phase_error_to_resync <= 0:
raise ValueError('max_phase_error_to_resync must be positive')
# This is used to recover absolute source time
self._source_time_resolver = SourceTimeResolver(source_clock_overflow_period=source_clock_overflow_period)
# Refer to the paper for explanations
self._p = None
self._q = None
# Statistics
self._estimated_delay = 0.0
self._resync_count = 0
def update(self, source_clock_sample, target_clock_sample):
"""
Args:
source_clock_sample: E.g. value received from the source system, in seconds
target_clock_sample: E.g. target time sampled when the data arrived to the local system, in seconds
Returns: Event timestamp converted to the target time domain.
"""
pi = float(self._source_time_resolver.update(source_clock_sample, target_clock_sample))
qi = target_clock_sample
# Initialization
if self._p is None:
self._p = pi
self._q = qi
# Sync error - refer to the reference implementation of the algorithm
self._estimated_delay = abs((pi - self._p) - (qi - self._q))
# Resynchronization (discarding known state)
if self._estimated_delay > self.max_phase_error_to_resync:
self._source_time_resolver.reset()
self._resync_count += 1
self._p = pi = float(self._source_time_resolver.update(source_clock_sample, target_clock_sample))
self._q = qi
# Offset options
assert pi >= self._p
offset = self._p - self._q - self.max_rate_error * (pi - self._p) - self.fixed_delay
new_offset = pi - qi - self.fixed_delay
# Updating p/q if the new offset is lower by magnitude
if new_offset >= offset:
offset = new_offset
self._p = pi
self._q = qi
ti = pi - offset
return ti
@property
def estimated_delay(self):
"""Estimated delay, updated in the last call to update()"""
return self._estimated_delay
@property
def resync_count(self):
return self._resync_count
if __name__ == '__main__':
# noinspection PyPackageRequirements
import matplotlib.pyplot as plt
# noinspection PyPackageRequirements
import numpy
import time
if 1:
estimator = TimestampEstimator()
print(estimator.update(.0, 1000.0))
print(estimator.update(.1, 1000.1))
print(estimator.update(.2, 1000.1)) # Repeat
print(estimator.update(.3, 1000.1)) # Repeat
print(estimator.update(.4, 1000.2))
print(estimator.update(.5, 1000.3))
if 1:
# Conversion from Real to Monotonic
estimator = TimestampEstimator(max_rate_error=1e-5,
fixed_delay=1e-6,
max_phase_error_to_resync=1e-2)
print('Initial mono to real:', time.time() - time.monotonic())
while True:
mono = time.monotonic()
real = time.time()
est_real = estimator.update(mono, real)
mono_to_real_offset = est_real - mono
print(mono_to_real_offset)
time.sleep(1)
max_rate_error = None
source_clock_range = 10
delay_min = 0.0001
delay_max = 0.02
num_samples = 200
x = range(num_samples)
delays = numpy.random.uniform(delay_min, delay_max, size=num_samples)
estimator = TimestampEstimator(max_rate_error=max_rate_error, fixed_delay=delay_min,
source_clock_overflow_period=source_clock_range)
source_clocks = []
estimated_times = []
offset_errors = []
estimated_delays = []
for i, delay in enumerate(delays):
source_clock = i
source_clocks.append(source_clock)
target_clock = i + delay
estimated_time = estimator.update(source_clock % source_clock_range, target_clock)
estimated_times.append(estimated_time)
offset_errors.append(estimated_time - source_clock)
estimated_delays.append(estimator.estimated_delay)
fig = plt.figure()
ax1 = fig.add_subplot(211)
ax1.plot(x, numpy.array(delays) * 1e3)
ax1.plot(x, numpy.array(offset_errors) * 1e3)
ax2 = fig.add_subplot(212)
ax2.plot(x, (numpy.array(estimated_times) - numpy.array(source_clocks)) * 1e3)
plt.show()