Skip to content

Commit

Permalink
Add aggregator address as header for policy announcements
Browse files Browse the repository at this point in the history
Signed-off-by: Yannic Klem <yannic.klem@bosch.io>
  • Loading branch information
Yannic92 committed Aug 19, 2022
1 parent 8c915ff commit 637c826
Showing 1 changed file with 13 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import org.eclipse.ditto.base.model.common.DittoDuration;
import org.eclipse.ditto.base.model.common.HttpStatus;
import org.eclipse.ditto.base.model.exceptions.DittoRuntimeException;
import org.eclipse.ditto.base.model.headers.DittoHeaderDefinition;
import org.eclipse.ditto.base.model.headers.DittoHeaders;
import org.eclipse.ditto.base.model.headers.DittoHeadersBuilder;
import org.eclipse.ditto.base.model.signals.Signal;
Expand All @@ -47,7 +48,9 @@
import akka.NotUsed;
import akka.actor.AbstractFSM;
import akka.actor.ActorRef;
import akka.actor.Address;
import akka.actor.Props;
import akka.cluster.Cluster;
import akka.japi.pf.FSMStateFunctionBuilder;
import scala.util.Random$;

Expand Down Expand Up @@ -86,6 +89,7 @@ public final class SubjectExpiryActor extends AbstractFSM<SubjectExpiryState, No
private boolean deleted;
private Instant deleteAt;
private boolean acknowledged;
private final Address selfRemoteAddress;

@SuppressWarnings("unused")
private SubjectExpiryActor(final PolicyId policyId,
Expand Down Expand Up @@ -117,6 +121,7 @@ private SubjectExpiryActor(final PolicyId policyId,
deleted = false;
deleteAt = subject.getExpiry().map(SubjectExpiry::getTimestamp).orElseGet(Instant::now);
acknowledged = false;
selfRemoteAddress = Cluster.get(getContext().systemImpl()).selfUniqueAddress().address();
}

/**
Expand Down Expand Up @@ -469,7 +474,14 @@ private boolean handleSignalWithAckregator(final PolicyAnnouncement<?> announcem
.info("Publishing PolicyAnnouncement with ack requests: <{}>", announcement.getType());
log.withCorrelationId(announcement)
.debug("Publishing PolicyAnnouncement with ack requests: <{}>", announcement);
policyAnnouncementPub.publishWithAcks(announcement, ACK_EXTRACTOR, aggregatorActor);

final String addressSerializationFormat = aggregatorActor.path()
.toSerializationFormatWithAddress(selfRemoteAddress);
final PolicyAnnouncement<?> adjustedSignal =
announcement.setDittoHeaders(announcement.getDittoHeaders().toBuilder()
.putHeader(DittoHeaderDefinition.DITTO_ACKREGATOR_ADDRESS.getKey(), addressSerializationFormat)
.build());
policyAnnouncementPub.publishWithAcks(adjustedSignal, ACK_EXTRACTOR, aggregatorActor);

return false;
}
Expand Down

0 comments on commit 637c826

Please sign in to comment.