ensure delivery of writes immediately following pub match event (#165)
A long-standing bug in Cyclone is that a sample written immediately
after a publication-matched event may never arrive at the reader that
was just matched.  This happened because the reader need not have
completed discovery of the writer by the time the writer discovers the
reader, at which point the reader ignores the sample because it either
doesn't know the writer at all, or it hasn't yet seen a Heartbeat from
it.

That Heartbeat arrives shortly after, but by then it is too late: the
reader simply decides to accept the next sample to be written by the
writer.  (It has no choice, really: either you risk losing some data, or
you request all historical data, which is emphatically not what a
volatile reader is about ...)
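
To make the failure mode concrete, here is a minimal sketch of the
write-on-match pattern that used to be unreliable, using Cyclone's C
API.  The Msg type and Msg_desc stand in for idlc-generated code and
are assumptions, not part of this commit:

#include "dds/dds.h"

/* Stand-in for an idlc-generated type and descriptor (assumed). */
typedef struct Msg { int32_t id; } Msg;
extern const dds_topic_descriptor_t Msg_desc;

static void on_pub_matched (dds_entity_t wr, const dds_publication_matched_status_t st, void *arg)
{
  (void) arg;
  if (st.current_count_change > 0)
  {
    /* Before this fix, a volatile sample written right here could be
       dropped by the just-matched reader because that reader had not
       yet seen a Heartbeat from this writer. */
    Msg m = { .id = 1 };
    (void) dds_write (wr, &m);
  }
}

int main (void)
{
  dds_entity_t pp = dds_create_participant (DDS_DOMAIN_DEFAULT, NULL, NULL);
  dds_entity_t tp = dds_create_topic (pp, &Msg_desc, "Msg", NULL, NULL);
  dds_listener_t *l = dds_create_listener (NULL);
  dds_lset_publication_matched (l, on_pub_matched);
  dds_entity_t wr = dds_create_writer (pp, tp, NULL, l);
  dds_delete_listener (l);
  (void) wr;
  dds_sleepfor (DDS_SECS (10)); /* wait for a subscriber to appear and match */
  dds_delete (pp);
  return 0;
}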

A related issue is the handling of historical data for transient-local
readers: it used to be delivered out of order, which is firstly against
the specification, and secondly against the reasonable expectations of
those who use DDS as a mere publish-subscribe messaging system.  To add
insult to injury, it also didn't completely handle some reordering
issues with disposes ...
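
(For reference, "transient-local reader" here just means a reader
created with transient-local durability; a minimal sketch, where pp and
tp are an existing participant and topic:)

/* Reliable, transient-local reader: receives historical data on match,
   which this commit now delivers in order. */
dds_qos_t *qos = dds_create_qos ();
dds_qset_reliability (qos, DDS_RELIABILITY_RELIABLE, DDS_MSECS (100));
dds_qset_durability (qos, DDS_DURABILITY_TRANSIENT_LOCAL);
dds_entity_t rd = dds_create_reader (pp, tp, qos, NULL);
dds_delete_qos (qos);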

This commit changes the way writers respond to a request for
retransmission from volatile proxy readers and the way the
in-sync/out-of-sync state of a reader with respect to a proxy writer
is used.  The first makes it safe for a Cyclone reader to ask a Cyclone
writer for all data (these details are not covered in the specs, so it
errs on the reasonable side for other vendors, which may cause the data
loss mentioned above): the writer simply sends a Gap message to the
reader for all the sequence numbers prior to the matching.
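
A sketch of the retransmit-side change; handle_retransmit_request,
prd_is_volatile, send_gap and retransmit_from_whc are hypothetical
stand-ins for the actual ddsi routines, with match_seq the first
sequence number written after the match:

/* Hypothetical sketch, not the literal code.  An AckNack from a
   volatile proxy reader asking for samples from before the match is
   answered with a Gap, so a (Cyclone) reader can safely request
   everything from sequence number 1 onwards. */
static void handle_retransmit_request (struct writer *wr, struct proxy_reader *prd,
                                       seqno_t req_base, seqno_t match_seq)
{
  if (prd_is_volatile (prd) && req_base < match_seq)
  {
    send_gap (wr, prd, req_base, match_seq); /* covers [req_base, match_seq) */
    req_base = match_seq;
  }
  retransmit_from_whc (wr, prd, req_base); /* normal retransmit from the writer history cache */
}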

The second changes the rule for switching from out-of-sync to in-sync:
that transition now simply happens once the next sequence number to be
delivered to the reader equals the next sequence number that will be
delivered directly from the proxy-writer object to all readers.  (I.e.,
a much more intuitive notion than reaching some seemingly arbitrary
sequence number.)
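
In code, the new rule is just a comparison of the two reorder
administrations' next sequence numbers.  A sketch, assuming a
hypothetical wrapper (nn_reorder_next_seq and
local_reader_ary_setfastpath_ok appear in the diff below; the wrapper
itself does not):

/* Sketch: a reader match becomes in-sync exactly when its own reorder
   admin has caught up with the proxy writer's reorder admin. */
static void maybe_set_in_sync (struct proxy_writer *pwr, struct pwr_rd_match *m)
{
  if (m->in_sync == PRMSS_OUT_OF_SYNC &&
      nn_reorder_next_seq (m->u.not_in_sync.reorder) == nn_reorder_next_seq (pwr->reorder))
  {
    m->in_sync = PRMSS_SYNC;
    if (--pwr->n_readers_out_of_sync == 0)
      local_reader_ary_setfastpath_ok (&pwr->rdary, true); /* re-enable the fast path */
  }
}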

To avoid duplicates, the rule for delivery straight from a proxy writer
has changed: where samples were previously delivered from the proxy
writer to all matching readers, they are now delivered only to the
matching readers that are in sync.  To avoid ordering problems, the idea
that historical data can be delivered through the asynchronous delivery
path even when the regular data goes through the synchronous delivery
path has been abandoned.  All data now always follows the same path.
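
What the delivery change amounts to, in a sketch (the loop shape and
the helpers deliver_to_reader, first_match, next_match and
lookup_reader are illustrative, and the locking is simplified):

/* Illustrative only.  With the fast path enabled (no out-of-sync
   readers) a sample goes straight to the packed reader array; otherwise
   each match is inspected and out-of-sync readers are skipped, since
   they will receive the sample through their own reorder admin. */
if (pwr->rdary.fastpath_ok)
{
  for (uint32_t i = 0; pwr->rdary.rdary[i] != NULL; i++)
    deliver_to_reader (pwr->rdary.rdary[i], sample);
}
else
{
  struct pwr_rd_match *m;
  for (m = first_match (pwr); m != NULL; m = next_match (pwr, m))
    if (m->in_sync == PRMSS_SYNC)
      deliver_to_reader (lookup_reader (&m->rd_guid), sample);
}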

As these same mechanisms are used for getting historical data into
transient-local readers, the ordering problem for the historical data
also disappeared.

The tests in src/core/xtests/initsampledeliv cover a lot of the
interesting cases: data published before the existence of a reader,
after it, and mixes of volatile and transient-local.  Running them takes
quite a bit of time, and they are not yet integrated into the CI builds
(if ever, because of that time).

Note: the "conservative built-in startup" option has been removed,
because it really makes no sense to keep a vague compatibility option
added a decade ago "just in case" that has never been used ...

Note: the workaround in the src/mpt/tests/basic/procs/hello.c (use
transient-local to ensure delivery of data) has been removed, as has
been its workaround for the already-fixed #146.

Signed-off-by: Erik Boasson <eb@ilities.com>
eboasson committed May 28, 2019
1 parent ffbd3d8 commit 83b1f27
Showing 16 changed files with 675 additions and 170 deletions.
1 change: 0 additions & 1 deletion src/core/ddsi/include/dds/ddsi/q_config.h
@@ -330,7 +330,6 @@ struct config
uint32_t rmsg_chunk_size; /**<< size of a chunk in the receive buffer */
uint32_t rbuf_size; /* << size of a single receiver buffer */
enum besmode besmode;
int conservative_builtin_reader_startup;
int meas_hb_to_ack_latency;
int unicast_response_to_spdp_messages;
int synchronous_delivery_priority_threshold;
4 changes: 3 additions & 1 deletion src/core/ddsi/include/dds/ddsi/q_entity.h
Expand Up @@ -123,7 +123,6 @@ struct pwr_rd_match {
union {
struct {
seqno_t end_of_tl_seq; /* when seq >= end_of_tl_seq, it's in sync, =0 when not tl */
seqno_t end_of_out_of_sync_seq; /* when seq >= end_of_tl_seq, it's in sync, =0 when not tl */
struct nn_reorder *reorder; /* can be done (mostly) per proxy writer, but that is harder; only when state=OUT_OF_SYNC */
} not_in_sync;
} u;
@@ -574,6 +573,9 @@ uint64_t writer_instance_id (const struct nn_guid *guid);
rebuild them all (which only makes sense after previously having emptied them all). */
void rebuild_or_clear_writer_addrsets(int rebuild);


void local_reader_ary_setfastpath_ok (struct local_reader_ary *x, bool fastpath_ok);

#if defined (__cplusplus)
}
#endif
7 changes: 2 additions & 5 deletions src/core/ddsi/src/q_config.c
@@ -509,9 +509,9 @@ static const struct cfgelem unsupp_cfgelems[] = {
{ MOVED("FragmentSize", "CycloneDDS/General/FragmentSize") },
{ LEAF("DeliveryQueueMaxSamples"), 1, "256", ABSOFF(delivery_queue_maxsamples), 0, uf_uint, 0, pf_uint,
BLURB("<p>This element controls the Maximum size of a delivery queue, expressed in samples. Once a delivery queue is full, incoming samples destined for that queue are dropped until space becomes available again.</p>") },
{ LEAF("PrimaryReorderMaxSamples"), 1, "64", ABSOFF(primary_reorder_maxsamples), 0, uf_uint, 0, pf_uint,
{ LEAF("PrimaryReorderMaxSamples"), 1, "128", ABSOFF(primary_reorder_maxsamples), 0, uf_uint, 0, pf_uint,
BLURB("<p>This element sets the maximum size in samples of a primary re-order administration. Each proxy writer has one primary re-order administration to buffer the packet flow in case some packets arrive out of order. Old samples are forwarded to secondary re-order administrations associated with readers in need of historical data.</p>") },
{ LEAF("SecondaryReorderMaxSamples"), 1, "16", ABSOFF(secondary_reorder_maxsamples), 0, uf_uint, 0, pf_uint,
{ LEAF("SecondaryReorderMaxSamples"), 1, "128", ABSOFF(secondary_reorder_maxsamples), 0, uf_uint, 0, pf_uint,
BLURB("<p>This element sets the maximum size in samples of a secondary re-order administration. The secondary re-order administration is per reader in need of historical data.</p>") },
{ LEAF("DefragUnreliableMaxSamples"), 1, "4", ABSOFF(defrag_unreliable_maxsamples), 0, uf_uint, 0, pf_uint,
BLURB("<p>This element sets the maximum number of samples that can be defragmented simultaneously for a best-effort writers.</p>") },
@@ -523,9 +523,6 @@ static const struct cfgelem unsupp_cfgelems[] = {
<li><i>writers</i>: all participants have the writers, but just one has the readers;</li>\n\
<li><i>minimal</i>: only one participant has built-in endpoints.</li></ul>\n\
<p>The default is <i>writers</i>, as this is thought to be compliant and reasonably efficient. <i>Minimal</i> may or may not be compliant but is most efficient, and <i>full</i> is inefficient but certain to be compliant. See also Internal/ConservativeBuiltinReaderStartup.</p>") },
{ LEAF("ConservativeBuiltinReaderStartup"), 1, "false", ABSOFF(conservative_builtin_reader_startup), 0, uf_boolean, 0, pf_boolean,
BLURB("<p>This element forces all DDSI2E built-in discovery-related readers to request all historical data, instead of just one for each \"topic\". There is no indication that any of the current DDSI implementations requires changing of this setting, but it is conceivable that an implementation might track which participants have been informed of the existence of endpoints and which have not been, refusing communication with those that have \"can't\" know.</p>\n\
<p>Should it be necessary to hide DDSI2E's shared discovery behaviour, set this to <i>true</i> and Internal/BuiltinEndpointSet to <i>full</i>.</p>") },
{ LEAF("MeasureHbToAckLatency"), 1, "false", ABSOFF(meas_hb_to_ack_latency), 0, uf_boolean, 0, pf_boolean,
BLURB("<p>This element enables heartbeat-to-ack latency among DDSI2E services by prepending timestamps to Heartbeat and AckNack messages and calculating round trip times. This is non-standard behaviour. The measured latencies are quite noisy and are currently not used anywhere.</p>") },
{ LEAF("UnicastResponseToSPDPMessages"), 1, "true", ABSOFF(unicast_response_to_spdp_messages), 0, uf_boolean, 0, pf_boolean,
2 changes: 1 addition & 1 deletion src/core/ddsi/src/q_debmon.c
@@ -286,7 +286,7 @@ static int print_proxy_participants (struct thread_state1 * const ts1, ddsi_tran
x += cpf (conn, " tl-catchup end_of_tl_seq %lld\n", m->u.not_in_sync.end_of_tl_seq);
break;
case PRMSS_OUT_OF_SYNC:
x += cpf (conn, " out-of-sync end_of_tl_seq %lld end_of_out_of_sync_seq %lld\n", m->u.not_in_sync.end_of_tl_seq, m->u.not_in_sync.end_of_out_of_sync_seq);
x += cpf (conn, " out-of-sync end_of_tl_seq %lld\n", m->u.not_in_sync.end_of_tl_seq);
break;
}
}
80 changes: 60 additions & 20 deletions src/core/ddsi/src/q_entity.c
@@ -235,6 +235,14 @@ void local_reader_ary_remove (struct local_reader_ary *x, struct reader *rd)
ddsrt_mutex_unlock (&x->rdary_lock);
}

void local_reader_ary_setfastpath_ok (struct local_reader_ary *x, bool fastpath_ok)
{
ddsrt_mutex_lock (&x->rdary_lock);
if (x->valid)
x->fastpath_ok = fastpath_ok;
ddsrt_mutex_unlock (&x->rdary_lock);
}

void local_reader_ary_setinvalid (struct local_reader_ary *x)
{
ddsrt_mutex_lock (&x->rdary_lock);
@@ -1492,9 +1500,17 @@ static void proxy_writer_drop_connection (const struct nn_guid *pwr_guid, struct
{
ddsrt_avl_delete (&pwr_readers_treedef, &pwr->readers, m);
if (m->in_sync != PRMSS_SYNC)
pwr->n_readers_out_of_sync--;
{
if (--pwr->n_readers_out_of_sync == 0)
local_reader_ary_setfastpath_ok (&pwr->rdary, true);
}
if (rd->reliable)
pwr->n_reliable_readers--;
/* If no reliable readers left, there is no reason to believe the heartbeats will keep
coming and therefore reset have_seen_heartbeat so the next reader to be created
doesn't get initialised based on stale data */
if (pwr->n_reliable_readers == 0)
pwr->have_seen_heartbeat = 0;
local_reader_ary_remove (&pwr->rdary, rd);
}
ddsrt_mutex_unlock (&pwr->e.lock);
@@ -1775,7 +1791,6 @@ static void proxy_writer_add_connection (struct proxy_writer *pwr, struct reader
{
struct pwr_rd_match *m = ddsrt_malloc (sizeof (*m));
ddsrt_avl_ipath_t path;
seqno_t last_deliv_seq;

ddsrt_mutex_lock (&pwr->e.lock);
if (ddsrt_avl_lookup_ipath (&pwr_readers_treedef, &pwr->readers, &rd->e.guid, &path))
@@ -1794,7 +1809,6 @@
m->rd_guid = rd->e.guid;
m->tcreate = now_mt ();


/* We track the last heartbeat count value per reader--proxy-writer
pair, so that we can correctly handle directed heartbeats. The
only reason to bother is to prevent a directed heartbeat (with
@@ -1813,34 +1827,61 @@
/* These can change as a consequence of handling data and/or
discovery activities. The safe way of dealing with them is to
lock the proxy writer */
last_deliv_seq = nn_reorder_next_seq (pwr->reorder) - 1;
if (!rd->handle_as_transient_local)
if (is_builtin_entityid (rd->e.guid.entityid, NN_VENDORID_ECLIPSE) && !ddsrt_avl_is_empty (&pwr->readers))
{
/* builtins really don't care about multiple copies or anything */
m->in_sync = PRMSS_SYNC;
}
else if (!config.conservative_builtin_reader_startup && is_builtin_entityid (rd->e.guid.entityid, NN_VENDORID_ECLIPSE) && !ddsrt_avl_is_empty (&pwr->readers))
else if (!pwr->have_seen_heartbeat)
{
/* Proxy writer hasn't seen a heartbeat yet: means we have no
clue from what sequence number to start accepting data, nor
where historical data ends and live data begins.
A transient-local reader should always get all historical
data, and so can always start-out as "out-of-sync". Cyclone
refuses to retransmit already ACK'd samples to a Cyclone
reader, so if the other side is Cyclone, we can always start
from sequence number 1.
For non-Cyclone, if the reader is volatile, we have to just
start from the most recent sample, even though that means
the first samples written after matching the reader may be
lost. The alternative not only gets too much historical data
but may also result in "sample lost" notifications because the
writer is (may not be) retaining samples on behalf of this
reader for the oldest samples and so this reader may end up
with a partial set of old-ish samples. Even when both are
using KEEP_ALL and the connection doesn't fail ... */
if (rd->handle_as_transient_local)
m->in_sync = PRMSS_OUT_OF_SYNC;
else if (vendor_is_eclipse (pwr->c.vendor))
m->in_sync = PRMSS_OUT_OF_SYNC;
else
m->in_sync = PRMSS_SYNC;
m->u.not_in_sync.end_of_tl_seq = MAX_SEQ_NUMBER;
}
else if (!rd->handle_as_transient_local)
{
/* builtins really don't care about multiple copies */
/* volatile reader, writer has seen a heartbeat: it's in sync
(there is a risk of it getting some historical data: that
happens to be cached in the writer's reorder admin at this
point) */
m->in_sync = PRMSS_SYNC;
}
else
{
/* normal transient-local, reader is behind proxy writer */
/* transient-local reader; range of sequence numbers is already
known */
m->in_sync = PRMSS_OUT_OF_SYNC;
if (last_deliv_seq == 0)
{
m->u.not_in_sync.end_of_out_of_sync_seq = MAX_SEQ_NUMBER;
m->u.not_in_sync.end_of_tl_seq = MAX_SEQ_NUMBER;
}
else
{
m->u.not_in_sync.end_of_tl_seq = pwr->last_seq;
m->u.not_in_sync.end_of_out_of_sync_seq = last_deliv_seq;
}
DDS_LOG(DDS_LC_DISCOVERY, " - out-of-sync %"PRId64, m->u.not_in_sync.end_of_out_of_sync_seq);
m->u.not_in_sync.end_of_tl_seq = pwr->last_seq;
}
if (m->in_sync != PRMSS_SYNC)
{
DDS_LOG(DDS_LC_DISCOVERY, " - out-of-sync");
pwr->n_readers_out_of_sync++;
local_reader_ary_setfastpath_ok (&pwr->rdary, false);
}
m->count = init_count;
/* Spec says we may send a pre-emptive AckNack (8.4.2.3.4), hence we
schedule it for the configured delay * T_MILLISECOND. From then
@@ -3510,7 +3551,6 @@ void new_proxy_participant
proxypp->plist = nn_plist_dup (plist);
ddsrt_avl_init (&proxypp_groups_treedef, &proxypp->groups);


if (custom_flags & CF_INC_KERNEL_SEQUENCE_NUMBERS)
proxypp->kernel_sequence_numbers = 1;
else
