Skip to content

Commit

Permalink
Make deleted snapshots empty.
Browse files Browse the repository at this point in the history
Signed-off-by: Yufei Cai <yufei.cai@bosch.io>
  • Loading branch information
yufei-cai committed Sep 5, 2021
1 parent abfe3fe commit 6ee8904
Show file tree
Hide file tree
Showing 11 changed files with 206 additions and 62 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,14 @@
*/
package org.eclipse.ditto.connectivity.service.messaging.persistence;

import org.eclipse.ditto.json.JsonObject;
import java.util.Optional;

import org.eclipse.ditto.connectivity.model.Connection;
import org.eclipse.ditto.connectivity.model.ConnectionLifecycle;
import org.eclipse.ditto.internal.utils.persistence.mongo.AbstractMongoSnapshotAdapter;
import org.eclipse.ditto.json.JsonField;
import org.eclipse.ditto.json.JsonObject;
import org.eclipse.ditto.json.JsonValue;
import org.slf4j.LoggerFactory;

/**
Expand All @@ -27,6 +32,23 @@ public ConnectionMongoSnapshotAdapter() {
super(LoggerFactory.getLogger(ConnectionMongoSnapshotAdapter.class));
}

@Override
protected boolean isDeleted(final Connection snapshotEntity) {
return snapshotEntity.hasLifecycle(ConnectionLifecycle.DELETED);
}

@Override
protected JsonField getDeletedLifecycleJsonField() {
final var field = Connection.JsonFields.LIFECYCLE;
return JsonField.newInstance(field.getPointer().getRoot().orElseThrow(),
JsonValue.of(ConnectionLifecycle.DELETED.name()), field);
}

@Override
protected Optional<JsonField> getRevisionJsonField(final Connection entity) {
return Optional.empty();
}

@Override
protected Connection createJsonifiableFrom(final JsonObject jsonObject) {
return ConnectionMigrationUtil.connectionFromJsonWithMigration(jsonObject);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,19 +17,22 @@
import java.util.Arrays;
import java.util.LinkedList;
import java.util.List;
import java.util.Objects;
import java.util.Queue;
import java.util.concurrent.TimeUnit;

import javax.annotation.Nullable;

import org.eclipse.ditto.base.model.headers.DittoHeaders;
import org.eclipse.ditto.connectivity.model.Connection;
import org.eclipse.ditto.connectivity.model.ConnectionConfigurationInvalidException;
import org.eclipse.ditto.connectivity.model.ConnectionId;
import org.eclipse.ditto.connectivity.model.ConnectionLifecycle;
import org.eclipse.ditto.connectivity.service.messaging.persistence.ConnectionMongoSnapshotAdapter;
import org.eclipse.ditto.connectivity.model.signals.commands.modify.OpenConnection;
import org.eclipse.ditto.connectivity.model.signals.events.ConnectionCreated;
import org.eclipse.ditto.connectivity.model.signals.events.ConnectionDeleted;
import org.eclipse.ditto.connectivity.model.signals.events.ConnectivityEvent;
import org.eclipse.ditto.connectivity.service.messaging.persistence.ConnectionMongoSnapshotAdapter;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
Expand Down Expand Up @@ -109,8 +112,7 @@ public void testRecoveryOfDeletedConnectionsWithoutSnapshot() {
expectTerminated(underTest);

// verify snapshot was saved with DELETED lifecycle
final Connection deletedConnection = setLifecycleDeleted(connectionCreated.getConnection());
final SnapshotOffer snapshot = getSnapshotOffer(deletedConnection, 2); // created + deleted = 2
final SnapshotOffer snapshot = getSnapshotOffer(null, 2); // created + deleted = 2
final Queue<Object> expected = new LinkedList<>(Arrays.asList(snapshot, RecoveryCompleted.getInstance()));
actorSystem.actorOf(RecoverActor.props(connectionId, getRef(), expected));
expectMsgEquals("recovered");
Expand Down Expand Up @@ -155,7 +157,7 @@ private Connection setLifecycleDeleted(final Connection connection) {
.build();
}

private SnapshotOffer getSnapshotOffer(final Connection deletedConnection, final int sequenceNr) {
private SnapshotOffer getSnapshotOffer(@Nullable final Object deletedConnection, final int sequenceNr) {
final SnapshotMetadata metadata = new SnapshotMetadata(PERSISTENCE_ID_PREFIX + connectionId, sequenceNr, 0);
return new SnapshotOffer(metadata, deletedConnection);
}
Expand Down Expand Up @@ -216,7 +218,7 @@ private void checkSnapshotOffer(final SnapshotOffer snapshotOffer) {
fail("expected sequence nr: " + expected.metadata().sequenceNr() + " but got: " +
snapshotOffer.metadata().sequenceNr());
}
if (!expected.snapshot().equals(SNAPSHOT_ADAPTER.fromSnapshotStore(snapshotOffer))) {
if (!Objects.equals(expected.snapshot(), SNAPSHOT_ADAPTER.fromSnapshotStore(snapshotOffer))) {
fail("expected: " + expected.snapshot() + " but got: " + snapshotOffer.snapshot());
}
} else {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
/*
* Copyright (c) 2021 Contributors to the Eclipse Foundation
*
* See the NOTICE file(s) distributed with this work for additional
* information regarding copyright ownership.
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License 2.0 which is available at
* http://www.eclipse.org/legal/epl-2.0
*
* SPDX-License-Identifier: EPL-2.0
*/
package org.eclipse.ditto.connectivity.service.messaging.persistence;

import static org.assertj.core.api.Assertions.assertThat;

import org.bson.BsonDocument;
import org.eclipse.ditto.connectivity.model.ConnectionLifecycle;
import org.eclipse.ditto.connectivity.service.messaging.TestConstants;
import org.eclipse.ditto.internal.utils.persistence.mongo.DittoBsonJson;
import org.eclipse.ditto.json.JsonObject;
import org.junit.Test;

public class ConnectionMongoSnapshotAdapterTest {

@Test
public void makeEmptySnapshotsForDeletedConnections() {
final var underTest = new ConnectionMongoSnapshotAdapter();
final var deletedSnapshot = (BsonDocument) underTest.toSnapshotStore(
TestConstants.createConnection().toBuilder().lifecycle(ConnectionLifecycle.DELETED).build());
final JsonObject snapshotJson = DittoBsonJson.getInstance().serialize(deletedSnapshot);
assertThat(snapshotJson).containsOnly(underTest.getDeletedLifecycleJsonField());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,19 +15,20 @@
import static org.eclipse.ditto.base.model.common.ConditionChecker.checkNotNull;

import java.text.MessageFormat;
import java.util.Optional;

import javax.annotation.Nullable;
import javax.annotation.concurrent.ThreadSafe;

import org.bson.BsonValue;
import org.eclipse.ditto.json.JsonField;
import org.eclipse.ditto.json.JsonObject;
import org.eclipse.ditto.json.JsonParseException;
import org.eclipse.ditto.base.model.exceptions.DittoJsonException;
import org.eclipse.ditto.base.model.exceptions.DittoRuntimeException;
import org.eclipse.ditto.base.model.json.FieldType;
import org.eclipse.ditto.base.model.json.Jsonifiable;
import org.eclipse.ditto.internal.utils.persistence.SnapshotAdapter;
import org.eclipse.ditto.json.JsonField;
import org.eclipse.ditto.json.JsonObject;
import org.eclipse.ditto.json.JsonParseException;
import org.slf4j.Logger;

import akka.persistence.SelectedSnapshot;
Expand All @@ -48,6 +49,28 @@ protected AbstractMongoSnapshotAdapter(final Logger logger) {
this.logger = logger;
}

/**
* Whether an entity is deleted.
*
* @param snapshotEntity the entity.
* @return whether it has the deleted lifecycle.
*/
protected abstract boolean isDeleted(final T snapshotEntity);

/**
* Return the snapshot JSON field for lifecycle of deleted entities.
*
* @return the empty snapshot JSON.
*/
protected abstract JsonField getDeletedLifecycleJsonField();

/**
* Return the revision JSON field if present in the entity.
*
* @return the revision JSON field.
*/
protected abstract Optional<JsonField> getRevisionJsonField(final T entity);

@Override
public Object toSnapshotStore(final T snapshotEntity) {
final JsonObject json = convertToJson(checkNotNull(snapshotEntity, "snapshot entity"));
Expand Down Expand Up @@ -89,7 +112,13 @@ public T fromSnapshotStore(final SelectedSnapshot selectedSnapshot) {
*/
protected JsonObject convertToJson(final T snapshotEntity) {
checkNotNull(snapshotEntity, "snapshot entity");
return snapshotEntity.toJson(snapshotEntity.getImplementedSchemaVersion(), FieldType.all());
if (isDeleted(snapshotEntity)) {
final var builder = JsonObject.newBuilder().set(getDeletedLifecycleJsonField());
getRevisionJsonField(snapshotEntity).ifPresent(builder::set);
return builder.build();
} else {
return snapshotEntity.toJson(snapshotEntity.getImplementedSchemaVersion(), FieldType.all());
}
}

/**
Expand Down Expand Up @@ -132,7 +161,14 @@ private static JsonObject convertToJson(final BsonValue bsonValue) {
@Nullable
private T tryToCreateJsonifiableFrom(final JsonObject jsonObject) {
try {
return createJsonifiableFrom(jsonObject);
final var deletedLifecycleField = getDeletedLifecycleJsonField();
if (jsonObject.getValue(deletedLifecycleField.getKey())
.filter(deletedLifecycleField.getValue()::equals)
.isPresent()) {
return null; // entity is deleted
} else {
return createJsonifiableFrom(jsonObject);
}
} catch (final JsonParseException | DittoRuntimeException e) {
final String pattern = "Failed to deserialize JSON <{0}>!";
logger.error(MessageFormat.format(pattern, jsonObject), e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,7 @@ public static Policy fromJson(final JsonObject jsonObject) {
.map(ImmutablePolicy::tryToParseModified)
.orElse(null);

final JsonObject readEntries = jsonObject.getValueOrThrow(JsonFields.ENTRIES);
final JsonObject readEntries = jsonObject.getValue(JsonFields.ENTRIES).orElseGet(JsonObject::empty);

final Function<JsonField, PolicyEntry> toPolicyEntry = jsonField -> {
final JsonValue jsonValue = jsonField.getValue();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,17 @@
*/
package org.eclipse.ditto.policies.service.persistence.serializer;

import java.util.Optional;

import javax.annotation.concurrent.ThreadSafe;

import org.eclipse.ditto.internal.utils.persistence.mongo.AbstractMongoSnapshotAdapter;
import org.eclipse.ditto.json.JsonField;
import org.eclipse.ditto.json.JsonObject;
import org.eclipse.ditto.json.JsonValue;
import org.eclipse.ditto.policies.model.PoliciesModelFactory;
import org.eclipse.ditto.policies.model.Policy;
import org.eclipse.ditto.internal.utils.persistence.mongo.AbstractMongoSnapshotAdapter;
import org.eclipse.ditto.policies.model.PolicyLifecycle;
import org.slf4j.LoggerFactory;

/**
Expand All @@ -34,6 +39,25 @@ public PolicyMongoSnapshotAdapter() {
super(LoggerFactory.getLogger(PolicyMongoSnapshotAdapter.class));
}

@Override
protected boolean isDeleted(final Policy snapshotEntity) {
return snapshotEntity.hasLifecycle(PolicyLifecycle.DELETED);
}

@Override
protected JsonField getDeletedLifecycleJsonField() {
final var field = Policy.JsonFields.LIFECYCLE;
return JsonField.newInstance(field.getPointer().getRoot().orElseThrow(),
JsonValue.of(PolicyLifecycle.DELETED.name()), field);
}

@Override
protected Optional<JsonField> getRevisionJsonField(final Policy entity) {
final var field = Policy.JsonFields.REVISION;
return entity.getRevision().map(revision ->
JsonField.newInstance(field.getPointer().getRoot().orElseThrow(), JsonValue.of(revision.toLong())));
}

@Override
protected Policy createJsonifiableFrom(final JsonObject jsonObject) {
return PoliciesModelFactory.newPolicy(jsonObject);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,25 +26,22 @@
import java.util.function.BiFunction;

import org.bson.BsonDocument;
import org.eclipse.ditto.json.JsonObject;
import org.eclipse.ditto.base.model.common.HttpStatus;
import org.eclipse.ditto.base.model.headers.DittoHeaders;
import org.eclipse.ditto.base.model.signals.commands.Command;
import org.eclipse.ditto.base.model.signals.events.AbstractEventsourcedEvent;
import org.eclipse.ditto.base.model.signals.events.EventsourcedEvent;
import org.eclipse.ditto.internal.utils.persistence.SnapshotAdapter;
import org.eclipse.ditto.internal.utils.persistence.mongo.DittoBsonJson;
import org.eclipse.ditto.internal.utils.pubsub.DistributedPub;
import org.eclipse.ditto.internal.utils.test.Retry;
import org.eclipse.ditto.json.JsonObject;
import org.eclipse.ditto.policies.model.PoliciesModelFactory;
import org.eclipse.ditto.policies.model.Policy;
import org.eclipse.ditto.policies.model.PolicyId;
import org.eclipse.ditto.policies.model.PolicyLifecycle;
import org.eclipse.ditto.policies.model.PolicyRevision;
import org.eclipse.ditto.policies.service.persistence.serializer.DefaultPolicyMongoEventAdapter;
import org.eclipse.ditto.policies.service.persistence.serializer.PolicyMongoSnapshotAdapter;
import org.eclipse.ditto.policies.service.persistence.testhelper.Assertions;
import org.eclipse.ditto.policies.service.persistence.testhelper.PoliciesJournalTestHelper;
import org.eclipse.ditto.policies.service.persistence.testhelper.PoliciesSnapshotTestHelper;
import org.eclipse.ditto.internal.utils.persistence.SnapshotAdapter;
import org.eclipse.ditto.internal.utils.persistence.mongo.DittoBsonJson;
import org.eclipse.ditto.internal.utils.pubsub.DistributedPub;
import org.eclipse.ditto.internal.utils.test.Retry;
import org.eclipse.ditto.policies.model.signals.announcements.PolicyAnnouncement;
import org.eclipse.ditto.base.model.signals.commands.Command;
import org.eclipse.ditto.policies.model.signals.commands.exceptions.PolicyNotAccessibleException;
import org.eclipse.ditto.policies.model.signals.commands.modify.CreatePolicy;
import org.eclipse.ditto.policies.model.signals.commands.modify.CreatePolicyResponse;
Expand All @@ -54,11 +51,14 @@
import org.eclipse.ditto.policies.model.signals.commands.modify.ModifyPolicyResponse;
import org.eclipse.ditto.policies.model.signals.commands.query.RetrievePolicy;
import org.eclipse.ditto.policies.model.signals.commands.query.RetrievePolicyResponse;
import org.eclipse.ditto.base.model.signals.events.AbstractEventsourcedEvent;
import org.eclipse.ditto.base.model.signals.events.EventsourcedEvent;
import org.eclipse.ditto.policies.model.signals.events.PolicyCreated;
import org.eclipse.ditto.policies.model.signals.events.PolicyDeleted;
import org.eclipse.ditto.policies.model.signals.events.PolicyModified;
import org.eclipse.ditto.policies.service.persistence.serializer.DefaultPolicyMongoEventAdapter;
import org.eclipse.ditto.policies.service.persistence.serializer.PolicyMongoSnapshotAdapter;
import org.eclipse.ditto.policies.service.persistence.testhelper.Assertions;
import org.eclipse.ditto.policies.service.persistence.testhelper.PoliciesJournalTestHelper;
import org.eclipse.ditto.policies.service.persistence.testhelper.PoliciesSnapshotTestHelper;
import org.junit.Test;
import org.mockito.Mockito;

Expand All @@ -85,7 +85,8 @@ public final class PolicyPersistenceActorSnapshottingTest extends PersistenceAct
private DefaultPolicyMongoEventAdapter eventAdapter;
private PoliciesJournalTestHelper<EventsourcedEvent<?>> journalTestHelper;
private PoliciesSnapshotTestHelper<Policy> snapshotTestHelper;
private Map<Class<? extends Command<?>>, BiFunction<Command<?>, Long, EventsourcedEvent<?>>> commandToEventMapperRegistry;
private Map<Class<? extends Command<?>>, BiFunction<Command<?>, Long, EventsourcedEvent<?>>>
commandToEventMapperRegistry;
private DistributedPub<PolicyAnnouncement<?>> policyAnnouncementPub = Mockito.mock(DistributedPub.class);

@Override
Expand Down Expand Up @@ -166,7 +167,7 @@ public void deletedPolicyIsSnapshotWithCorrectDataAndCanBeRecreated() {
underTest.tell(deletePolicy, getRef());
expectMsgEquals(DeletePolicyResponse.of(policyId, dittoHeadersV2));

final Policy expectedDeletedSnapshot = policyCreated.toBuilder()
final Policy expectedDeletedSnapshot = PoliciesModelFactory.newPolicyBuilder()
.setRevision(2)
.setLifecycle(PolicyLifecycle.DELETED)
.build();
Expand Down Expand Up @@ -418,7 +419,8 @@ private ActorRef createPersistenceActorFor(final PolicyId policyId) {

private EventsourcedEvent<?> toEvent(final Command<?> command, final long revision) {
final Class<? extends Command> clazz = command.getClass();
final BiFunction<Command<?>, Long, EventsourcedEvent<?>> commandToEventFunction = commandToEventMapperRegistry.get(clazz);
final BiFunction<Command<?>, Long, EventsourcedEvent<?>> commandToEventFunction =
commandToEventMapperRegistry.get(clazz);
if (commandToEventFunction == null) {
throw new UnsupportedOperationException("Mapping not yet implemented for type: " + clazz);
}
Expand Down
Loading

0 comments on commit 6ee8904

Please sign in to comment.