Skip to content

Commit

Permalink
Add debug logging to Publisher and SubUpdater.
Browse files Browse the repository at this point in the history
Signed-off-by: Yufei Cai <yufei.cai@bosch.io>
  • Loading branch information
yufei-cai committed Jul 24, 2022
1 parent a7f0906 commit 9d140bd
Show file tree
Hide file tree
Showing 2 changed files with 5 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -185,6 +185,7 @@ private void declaredAcksChanged(final RemoteAcksChanged event) {
}

private void topicSubscribersChanged(final Replicator.Changed<?> event) {
log.debug("Topics changed <{}>", event.key());
final Map<ActorRef, scala.collection.immutable.Set<String>> mmap =
CollectionConverters.asJava(((ORMultiMap<ActorRef, String>) event.dataValue()).entries());
final Map<ActorRef, List<Grouped<Long>>> deserializedMMap = mmap.entrySet()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -202,6 +202,7 @@ private boolean checkForLostSubscriber(final Subscribe subscribe, final boolean
}

private void ddataOpSuccess(final DDataOpSuccess<SubscriptionsReader> opSuccess) {
log().debug("DDataOp success seqNr=<{}>", opSuccess.seqNr);
errorCounter = 0;
flushSubAcks(opSuccess.seqNr);
// race condition possible -- some published messages may arrive before the acknowledgement
Expand Down Expand Up @@ -231,6 +232,8 @@ private CompletionStage<SubscriptionsReader> performDDataOp(final boolean localS
final Replicator.WriteConsistency writeConsistency) {
final SubscriptionsReader snapshot = subscriptions.snapshot();
final CompletionStage<Void> ddataOp;
log().debug("Tick seq=<{}> changed=<{}> empty=<{}> writeConsistency=<{}>", seqNr, localSubscriptionsChanged,
subscriptions.isEmpty(), nextWriteConsistency);
if (resetProbability > 0 && Math.random() < resetProbability) {
log().debug("Resetting ddata topics: <{}>", getSelf());
ddataOp = ddata.getWriter().reset(subscriber, subscriptions.export(), writeConsistency);
Expand All @@ -245,6 +248,7 @@ private CompletionStage<SubscriptionsReader> performDDataOp(final boolean localS
final LiteralUpdate nextUpdate = subscriptions.export();
// take snapshot to give to the subscriber; clear accumulated incremental changes.
final var diff = nextUpdate.diff(previousUpdate);
log().debug("diff.isEmpty=<{}>", diff.isEmpty());
if (!diff.isEmpty()) {
ddataOp = ddata.getWriter().put(subscriber, nextUpdate.diff(previousUpdate), writeConsistency);
} else {
Expand Down

0 comments on commit 9d140bd

Please sign in to comment.