Skip to content

Commit

Permalink
don't try to publish ExpiryAnnouncements for new or deleted policies …
Browse files Browse the repository at this point in the history
…(optimization)

Signed-off-by: Thomas Jaeckle <thomas.jaeckle@bosch.io>
  • Loading branch information
thjaeckle committed May 31, 2021
1 parent b50e500 commit b11bff2
Showing 1 changed file with 35 additions and 30 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -33,21 +33,7 @@
import org.eclipse.ditto.base.model.headers.DittoHeaders;
import org.eclipse.ditto.base.model.headers.WithDittoHeaders;
import org.eclipse.ditto.base.model.json.JsonSchemaVersion;
import org.eclipse.ditto.policies.model.Label;
import org.eclipse.ditto.policies.model.Policy;
import org.eclipse.ditto.policies.model.PolicyEntry;
import org.eclipse.ditto.policies.model.PolicyId;
import org.eclipse.ditto.policies.model.PolicyLifecycle;
import org.eclipse.ditto.policies.model.Subject;
import org.eclipse.ditto.policies.model.SubjectAnnouncement;
import org.eclipse.ditto.policies.model.SubjectExpiry;
import org.eclipse.ditto.policies.model.SubjectId;
import org.eclipse.ditto.policies.model.Subjects;
import org.eclipse.ditto.policies.api.PolicyTag;
import org.eclipse.ditto.policies.service.common.config.DittoPoliciesConfig;
import org.eclipse.ditto.policies.service.common.config.PolicyConfig;
import org.eclipse.ditto.policies.service.persistence.actors.strategies.commands.PolicyCommandStrategies;
import org.eclipse.ditto.policies.service.persistence.actors.strategies.events.PolicyEventStrategies;
import org.eclipse.ditto.base.model.signals.commands.Command;
import org.eclipse.ditto.internal.utils.cluster.DistPubSubAccess;
import org.eclipse.ditto.internal.utils.config.DefaultScopedConfig;
import org.eclipse.ditto.internal.utils.persistence.SnapshotAdapter;
Expand All @@ -58,12 +44,26 @@
import org.eclipse.ditto.internal.utils.persistentactors.commands.DefaultContext;
import org.eclipse.ditto.internal.utils.persistentactors.events.EventStrategy;
import org.eclipse.ditto.internal.utils.pubsub.DistributedPub;
import org.eclipse.ditto.policies.api.PolicyTag;
import org.eclipse.ditto.policies.model.Label;
import org.eclipse.ditto.policies.model.Policy;
import org.eclipse.ditto.policies.model.PolicyEntry;
import org.eclipse.ditto.policies.model.PolicyId;
import org.eclipse.ditto.policies.model.PolicyLifecycle;
import org.eclipse.ditto.policies.model.Subject;
import org.eclipse.ditto.policies.model.SubjectAnnouncement;
import org.eclipse.ditto.policies.model.SubjectExpiry;
import org.eclipse.ditto.policies.model.SubjectId;
import org.eclipse.ditto.policies.model.Subjects;
import org.eclipse.ditto.policies.model.signals.announcements.PolicyAnnouncement;
import org.eclipse.ditto.policies.model.signals.announcements.SubjectDeletionAnnouncement;
import org.eclipse.ditto.base.model.signals.commands.Command;
import org.eclipse.ditto.policies.model.signals.commands.exceptions.PolicyNotAccessibleException;
import org.eclipse.ditto.policies.model.signals.events.PolicyEvent;
import org.eclipse.ditto.policies.model.signals.events.SubjectDeleted;
import org.eclipse.ditto.policies.service.common.config.DittoPoliciesConfig;
import org.eclipse.ditto.policies.service.common.config.PolicyConfig;
import org.eclipse.ditto.policies.service.persistence.actors.strategies.commands.PolicyCommandStrategies;
import org.eclipse.ditto.policies.service.persistence.actors.strategies.events.PolicyEventStrategies;

import akka.actor.ActorRef;
import akka.actor.Props;
Expand Down Expand Up @@ -424,20 +424,25 @@ private void announceSubjectDeletion(@Nullable final Policy previousPolicy, @Nul
private void sendPastDueAnnouncementsOfNewSubjects(@Nullable final Policy previousPolicy,
@Nullable final Policy nextPolicy) {

final Set<Subject> previousSubjectIds =
streamAndFlatMapSubjects(previousPolicy, Optional::of).collect(Collectors.toSet());
final Stream<Subject> newSubjectsWithPastDueAnnouncements = streamAndFlatMapSubjects(nextPolicy,
subject -> {
final var pastDue = getAnnouncementInstant(subject)
.filter(instant -> !lastAnnouncement.isBefore(instant))
.isPresent();
if (pastDue && !previousSubjectIds.contains(subject)) {
return Optional.of(subject);
} else {
return Optional.empty();
}
});
publishExpiryAnnouncementsByTimestamp(newSubjectsWithPastDueAnnouncements);
// this only makes sense for previously existing policies:
if (null != previousPolicy &&
previousPolicy.getLifecycle().filter(PolicyLifecycle.DELETED::equals).isPresent()) {

final Set<Subject> previousSubjectIds =
streamAndFlatMapSubjects(previousPolicy, Optional::of).collect(Collectors.toSet());
final Stream<Subject> newSubjectsWithPastDueAnnouncements = streamAndFlatMapSubjects(nextPolicy,
subject -> {
final var pastDue = getAnnouncementInstant(subject)
.filter(instant -> !lastAnnouncement.isBefore(instant))
.isPresent();
if (pastDue && !previousSubjectIds.contains(subject)) {
return Optional.of(subject);
} else {
return Optional.empty();
}
});
publishExpiryAnnouncementsByTimestamp(newSubjectsWithPastDueAnnouncements);
}
}

private static Duration truncateToOneDay(final Duration duration) {
Expand Down

0 comments on commit b11bff2

Please sign in to comment.