Skip to content

Commit

Permalink
also persist "revision" in journal entries to make it possible to rea…
Browse files Browse the repository at this point in the history
…d when replaying events

Signed-off-by: Thomas Jaeckle <thomas.jaeckle@bosch.io>
  • Loading branch information
thjaeckle committed Sep 27, 2022
1 parent 5720136 commit 2d4c43c
Show file tree
Hide file tree
Showing 3 changed files with 12 additions and 27 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -13,19 +13,18 @@
package org.eclipse.ditto.internal.utils.persistence.mongo;

import java.util.Set;
import java.util.function.Predicate;

import javax.annotation.Nullable;

import org.bson.BsonDocument;
import org.bson.BsonValue;
import org.eclipse.ditto.base.model.exceptions.DittoRuntimeException;
import org.eclipse.ditto.base.model.headers.DittoHeaders;
import org.eclipse.ditto.base.model.json.FieldType;
import org.eclipse.ditto.base.model.json.JsonSchemaVersion;
import org.eclipse.ditto.base.model.signals.events.Event;
import org.eclipse.ditto.base.model.signals.events.EventRegistry;
import org.eclipse.ditto.base.model.signals.events.EventsourcedEvent;
import org.eclipse.ditto.json.JsonField;
import org.eclipse.ditto.json.JsonObject;
import org.eclipse.ditto.json.JsonParseException;
import org.eclipse.ditto.json.JsonValue;
Expand All @@ -44,10 +43,6 @@ public abstract class AbstractMongoEventAdapter<T extends Event<?>> implements E

private static final Logger LOGGER = LoggerFactory.getLogger(AbstractMongoEventAdapter.class);

private static final Predicate<JsonField> IS_REVISION = field -> field.getDefinition()
.filter(EventsourcedEvent.JsonFields.REVISION::equals)
.isPresent();

@Nullable protected final ExtendedActorSystem system;
protected final EventRegistry<T> eventRegistry;

Expand All @@ -69,11 +64,10 @@ public String manifest(final Object event) {

@Override
public Object toJournal(final Object event) {
if (event instanceof Event) {
final Event<?> theEvent = (Event<?>) event;
if (event instanceof Event<?> theEvent) {
final JsonSchemaVersion schemaVersion = theEvent.getImplementedSchemaVersion();
final JsonObject jsonObject = performToJournalMigration(
theEvent.toJson(schemaVersion, IS_REVISION.negate())
theEvent.toJson(schemaVersion, FieldType.regularOrSpecial())
);
final BsonDocument bson = DittoBsonJson.getInstance().parse(jsonObject);
final Set<String> tags = theEvent.getDittoHeaders().getJournalTags();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,22 +12,17 @@
*/
package org.eclipse.ditto.things.service.persistence.serializer;

import java.util.Objects;
import java.util.function.Predicate;

import javax.annotation.Nullable;

import org.eclipse.ditto.json.JsonField;
import org.eclipse.ditto.json.JsonObject;
import org.eclipse.ditto.json.JsonPointer;
import org.eclipse.ditto.base.model.json.FieldType;
import org.eclipse.ditto.base.model.json.JsonSchemaVersion;
import org.eclipse.ditto.policies.model.Policy;
import org.eclipse.ditto.internal.utils.persistence.mongo.AbstractMongoEventAdapter;
import org.eclipse.ditto.internal.utils.persistence.mongo.DittoBsonJson;
import org.eclipse.ditto.base.model.signals.events.Event;
import org.eclipse.ditto.base.model.signals.events.EventsourcedEvent;
import org.eclipse.ditto.base.model.signals.events.GlobalEventRegistry;
import org.eclipse.ditto.internal.utils.persistence.mongo.AbstractMongoEventAdapter;
import org.eclipse.ditto.internal.utils.persistence.mongo.DittoBsonJson;
import org.eclipse.ditto.json.JsonObject;
import org.eclipse.ditto.json.JsonPointer;
import org.eclipse.ditto.policies.model.Policy;
import org.eclipse.ditto.things.model.signals.events.ThingEvent;

import akka.actor.ExtendedActorSystem;
Expand All @@ -38,10 +33,6 @@
*/
public final class ThingMongoEventAdapter extends AbstractMongoEventAdapter<ThingEvent<?>> {

private static final Predicate<JsonField> IS_REVISION = field -> field.getDefinition()
.map(definition -> Objects.equals(definition, EventsourcedEvent.JsonFields.REVISION))
.orElse(false);

private static final JsonPointer POLICY_IN_THING_EVENT_PAYLOAD = ThingEvent.JsonFields.THING.getPointer()
.append(JsonPointer.of(Policy.INLINED_FIELD_NAME));

Expand All @@ -57,11 +48,10 @@ protected JsonObject performToJournalMigration(final JsonObject jsonObject) {

@Override
public Object toJournal(final Object event) {
if (event instanceof Event) {
final Event<?> theEvent = (Event<?>) event;
if (event instanceof Event<?> theEvent) {
final JsonSchemaVersion schemaVersion = theEvent.getImplementedSchemaVersion();
final JsonObject jsonObject =
theEvent.toJson(schemaVersion, IS_REVISION.negate().and(FieldType.regularOrSpecial()))
theEvent.toJson(schemaVersion, FieldType.regularOrSpecial())
// remove the policy entries from thing event payload
.remove(POLICY_IN_THING_EVENT_PAYLOAD);
final DittoBsonJson dittoBsonJson = DittoBsonJson.getInstance();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,11 +74,12 @@ public void toJournalReturnsBsonDocument() {
.setAttributes(Attributes.newBuilder().set("hello", "cloud").build())
.build();
final ThingCreated thingCreated =
ThingCreated.of(thing, 0, Instant.parse("2021-02-24T14:17:37.581679843Z"), DittoHeaders.empty(), null);
ThingCreated.of(thing, 1L, Instant.parse("2021-02-24T14:17:37.581679843Z"), DittoHeaders.empty(), null);

final String journalEntry = "{\n" +
" \"type\" : \"things.events:thingCreated\",\n" +
" \"_timestamp\" : \"2021-02-24T14:17:37.581679843Z\",\n" +
" \"revision\" : 1,\n" +
" \"_metadata\" : null,\n" +
" \"thingId\" : \"pap.th.tMJyAjktUVP:YlmZXbTQ\",\n" +
" \"thing\" : {\n" +
Expand Down

0 comments on commit 2d4c43c

Please sign in to comment.