Skip to content

Commit

Permalink
[eclipse-ditto#964] add comment; rename test methods; always persist …
Browse files Browse the repository at this point in the history
…empty event for open connections on ping with empty journal tag.

Empty event should be persisted regardless of the "alwaysAlive" flag
because the "alwaysAlive" flag is set at the end of recovery to
"isDesiredStateOpen()".

Signed-off-by: Yufei Cai <yufei.cai@bosch.io>
  • Loading branch information
yufei-cai committed Mar 3, 2021
1 parent e0279b2 commit 00ec59b
Show file tree
Hide file tree
Showing 4 changed files with 9 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -385,10 +385,11 @@ protected void processPingCommand(final PingCommand ping) {
.map(JsonValue::asString)
.orElse(null);

if (journalTag != null && journalTag.isEmpty() && isDesiredStateOpen() && !alwaysAlive) {
if (journalTag != null && journalTag.isEmpty() && isDesiredStateOpen()) {
// persistence actor was sent a "ping" with empty journal tag:
// build in adding the "always-alive" tag here by persisting an "empty" event which is just tagged to be
// "always alive"
// "always alive". Stop persisting the empty event once every open connection has a tagged event, when
// the persistence ping actor will have a non-empty journal tag configured.
final EmptyEvent
emptyEvent = new EmptyEvent(entityId, EmptyEvent.EFFECT_ALWAYS_ALIVE, getRevisionNumber() + 1,
DittoHeaders.newBuilder()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -181,7 +181,8 @@ public Source<String, NotUsed> getJournalPids(final int batchSize, final Duratio
* Retrieve all unique PIDs in journals selected by a provided {@code tag}.
* Does its best not to create long-living cursors on the database by reading {@code batchSize} events per query.
*
* @param tag the Tag name the journal entries have to contain in order to be selected.
* @param tag the Tag name the journal entries have to contain in order to be selected, or an empty string to select
* all journal entries.
* @param batchSize how many events to read in one query.
* @param maxIdleTime how long the stream is allowed to idle without sending any element. Bounds the number of
* retries with exponential back-off.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -112,9 +112,8 @@ protected AbstractShardedPersistenceActor(final I entityId, final SnapshotAdapte
entity = getEventStrategy().handle((E) event, entity, getRevisionNumber());
onEntityModified();
})
.match(EmptyEvent.class, event -> {
log.withCorrelationId(event).debug("Recovered EmptyEvent: <{}>", event);
})
.match(EmptyEvent.class,
event -> log.withCorrelationId(event).debug("Recovered EmptyEvent: <{}>", event))
.build();

handleCleanups = super.createReceive();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ public void conversionBetweenCorrelationIdAndPersistenceIdIsOneToOne() {
}

@Test
public void testRecoverConnections() {
public void pingPersistenceActors() {
new TestKit(actorSystem) {{
final TestProbe probe = new TestProbe(actorSystem);

Expand Down Expand Up @@ -100,7 +100,7 @@ public void testRecoverConnections() {
}

@Test
public void testRecoverConnectionsIsNotStartedTwice() {
public void testPersistenceActorIsNotPingedTwice() {
new TestKit(actorSystem) {{
final TestProbe probe = new TestProbe(actorSystem);

Expand Down

0 comments on commit 00ec59b

Please sign in to comment.