Skip to content

Commit

Permalink
Invalidate policies cache on policy update; log policy updates in upd…
Browse files Browse the repository at this point in the history
…ater and forwarder.

Signed-off-by: Yufei Cai <yufei.cai@bosch.io>
  • Loading branch information
yufei-cai committed Apr 17, 2022
1 parent 1abf05b commit 215568c
Show file tree
Hide file tree
Showing 2 changed files with 12 additions and 4 deletions.
Expand Up @@ -148,6 +148,9 @@ private void updatePolicyRevision(final LocalWrappedPolicyTag wrappedPolicyTag)
* @param policyReferenceTag the policy reference tag.
*/
private void forwardToThingsUpdater(final PolicyReferenceTag policyReferenceTag) {
log.info("Forwarding <{}> at <{}> to <{}>", policyReferenceTag.getPolicyTag().getEntityId(),
policyReferenceTag.getPolicyTag().getRevision(),
policyReferenceTag.getThingId());
thingsUpdater.tell(policyReferenceTag, ActorRef.noSender());
}

Expand Down Expand Up @@ -183,8 +186,7 @@ private void restartPolicyReferenceTagStream() {
} else {
repeat = Source.repeat(Control.DUMP_POLICY_REVISIONS);
}
killSwitch = repeat
.viaMat(KillSwitches.single(), Keep.right())
killSwitch = repeat.viaMat(KillSwitches.single(), Keep.right())
.mapAsync(1, message ->
Patterns.ask(self, message, ASK_SELF_TIMEOUT).exceptionally(Function.identity()))
.flatMapConcat(this::mapDumpResult)
Expand Down
Expand Up @@ -81,7 +81,8 @@
*/
public final class ThingUpdater extends AbstractFSMWithStash<ThingUpdater.State, ThingUpdater.Data> {

private static final Counter INCORRECT_PATCH_UPDATE_COUNT = DittoMetrics.counter("wildcard_search_incorrect_patch_updates");
private static final Counter INCORRECT_PATCH_UPDATE_COUNT =
DittoMetrics.counter("wildcard_search_incorrect_patch_updates");
private static final Counter UPDATE_FAILURE_COUNT = DittoMetrics.counter("wildcard_search_update_failures");

private static final Duration BLOCK_NAMESPACE_SHUTDOWN_DELAY = Duration.ofMinutes(2);
Expand Down Expand Up @@ -394,6 +395,10 @@ private FSM.State<State, Data> onPolicyReferenceTag(final PolicyReferenceTag pol
log.debug("Received new Policy-Reference-Tag for thing <{}> with revision <{}>, policy-id <{}> and " +
"policy-revision <{}>: <{}>.",
thingId, thingRevision, policyId, policyRevision, policyReferenceTag.asIdentifierString());
} else {
log.info("Got policy update <{}> at revision <{}>. Previous known policy is <{}> at <{}>.",
policyReferenceTag.getPolicyTag().getEntityId(), policyReferenceTag.getPolicyTag().getRevision(),
policyId, policyRevision);
}

acknowledge(policyReferenceTag);
Expand All @@ -402,7 +407,8 @@ private FSM.State<State, Data> onPolicyReferenceTag(final PolicyReferenceTag pol
final var policyIdOfTag = policyTag.getEntityId();
if (!Objects.equals(policyId, policyIdOfTag) || policyRevision < policyTag.getRevision()) {
final var newMetadata = Metadata.of(thingId, thingRevision, policyIdOfTag, policyTag.getRevision(), null)
.withUpdateReason(UpdateReason.POLICY_UPDATE);
.withUpdateReason(UpdateReason.POLICY_UPDATE)
.invalidateCaches(false, true);
return enqueue(newMetadata, data);
} else {
log.debug("Dropping <{}> because my policyId=<{}> and policyRevision=<{}>",
Expand Down

0 comments on commit 215568c

Please sign in to comment.