-
Notifications
You must be signed in to change notification settings - Fork 214
/
ThingsSnapshotTestHelper.java
executable file
·168 lines (144 loc) · 6.86 KB
/
ThingsSnapshotTestHelper.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
/*
* Copyright (c) 2017 Contributors to the Eclipse Foundation
*
* See the NOTICE file(s) distributed with this work for additional
* information regarding copyright ownership.
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License 2.0 which is available at
* http://www.eclipse.org/legal/epl-2.0
*
* SPDX-License-Identifier: EPL-2.0
*/
package org.eclipse.ditto.things.service.persistence.testhelper;
import static java.util.Objects.requireNonNull;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.TimeUnit;
import java.util.function.BiFunction;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.bson.BsonDocument;
import org.eclipse.ditto.things.model.ThingId;
import com.typesafe.config.ConfigFactory;
import org.apache.pekko.actor.ActorRef;
import org.apache.pekko.actor.ActorSystem;
import org.apache.pekko.pattern.Patterns;
import org.apache.pekko.persistence.Persistence;
import org.apache.pekko.persistence.SelectedSnapshot;
import org.apache.pekko.persistence.SnapshotProtocol;
import org.apache.pekko.persistence.SnapshotSelectionCriteria;
import scala.Option;
/**
* Helper class which provides functionality for testing with Pekko persistence snapshots for the things
* services.
* Requires akka-persistence-inmemory (by com.github.dnvriend).
*
* @param <S> the domain specific datatype stored as snapshot
*/
public final class ThingsSnapshotTestHelper<S> {
private static final String SNAPSHOT_PLUGIN_ID = "pekko-contrib-mongodb-persistence-things-snapshots";
private static final int WAIT_TIMEOUT = 3;
private final Function<ThingId, String> domainIdToPersistenceId;
private final BiFunction<BsonDocument, Long, S> snapshotToDomainObject;
private final ActorRef snapshotPlugin;
/**
* Constructor.
*
* @param actorSystem the actor system to be used to find the persistence extension
* @param snapshotToDomainObject a {@link java.util.function.BiFunction} providing the snapshot and its sequence number and expecting a
* domain object
* @param domainIdToPersistenceId a {@link java.util.function.Function} providing the domain ID and expecting the matching persistence
* ID
*/
public ThingsSnapshotTestHelper(final ActorSystem actorSystem,
final BiFunction<BsonDocument, Long, S> snapshotToDomainObject,
final Function<ThingId, String> domainIdToPersistenceId) {
this.snapshotToDomainObject = requireNonNull(snapshotToDomainObject);
this.domainIdToPersistenceId = requireNonNull(domainIdToPersistenceId);
snapshotPlugin =
Persistence.get(actorSystem).snapshotStoreFor(SNAPSHOT_PLUGIN_ID, ConfigFactory.empty());
}
/**
* Gets the maximum snapshot, if any exists.
*
* @param domainId the domain ID of the snapshot
* @return an Optional containing the maximum snapshot, if any exists; an empty Optional otherwise
*/
public Optional<S> getMaxSnapshot(final ThingId domainId) {
requireNonNull(domainId);
final String persistenceId = domainIdToPersistenceId.apply(domainId);
final SelectedSnapshot maxSnapshotData = getMaxSnapshotData(persistenceId);
return Optional.ofNullable(maxSnapshotData).map(this::convertSnapshotDataToDomainObject);
}
/**
* Gets all snapshots in ascending order.
*
* @param domainId the domain ID of the snapshots
* @return the snapshots in ascending orders
*/
public List<S> getAllSnapshotsAscending(final ThingId domainId) {
requireNonNull(domainId);
final String persistenceId = domainIdToPersistenceId.apply(domainId);
final SelectedSnapshot maxSnapshotData = getMaxSnapshotData(persistenceId);
return Optional.ofNullable(maxSnapshotData)
.map(this::getAllSnapshotDataAscending)
.map(allSnapshotData ->
allSnapshotData.stream()
.map(this::convertSnapshotDataToDomainObject)
.collect(Collectors.toList()))
.map(Collections::unmodifiableList)
.orElse(Collections.emptyList());
}
private S convertSnapshotDataToDomainObject(final SelectedSnapshot snapshotData) {
final BsonDocument dbObject = (BsonDocument) snapshotData.snapshot();
return snapshotToDomainObject.apply(dbObject, snapshotData.metadata().sequenceNr());
}
private List<SelectedSnapshot> getAllSnapshotDataAscending(final SelectedSnapshot maxSnapshotData) {
// this method is a bit complicated, but I there currently is no easier way to get all snaphots
final List<SelectedSnapshot> allSnapshotData = new ArrayList<>();
allSnapshotData.add(maxSnapshotData);
SelectedSnapshot lastSnapshotData = maxSnapshotData;
while (lastSnapshotData != null) {
final SelectedSnapshot previousSnapshotData =
getMaxSnapshotDataBySequenceNumber(lastSnapshotData.metadata().persistenceId(),
lastSnapshotData.metadata().sequenceNr() - 1);
if (previousSnapshotData != null) {
allSnapshotData.add(previousSnapshotData);
}
lastSnapshotData = previousSnapshotData;
}
Collections.reverse(allSnapshotData);
return allSnapshotData;
}
private SelectedSnapshot getMaxSnapshotDataBySequenceNumber(final String persistenceId,
final long maxSequenceNumber) {
final SnapshotSelectionCriteria criteria = SnapshotSelectionCriteria.create(maxSequenceNumber, Long.MAX_VALUE);
final SnapshotProtocol.LoadSnapshot loadSnapshot = new SnapshotProtocol.LoadSnapshot(persistenceId, criteria,
maxSequenceNumber);
return convertScalaOpt(
waitForFuture(
Patterns.ask(snapshotPlugin, loadSnapshot, Duration.ofSeconds(WAIT_TIMEOUT))
.thenApply(obj -> (SnapshotProtocol.LoadSnapshotResult) obj)
.thenApply(SnapshotProtocol.LoadSnapshotResult::snapshot)
.toCompletableFuture()
), null);
}
private SelectedSnapshot getMaxSnapshotData(final String persistenceId) {
return getMaxSnapshotDataBySequenceNumber(persistenceId, Long.MAX_VALUE);
}
private <T> T convertScalaOpt(final Option<T> opt, final T defaultValue) {
return opt.isDefined() ? opt.get() : defaultValue;
}
private <T> T waitForFuture(
final java.util.concurrent.Future<T> resultFuture) {
try {
return resultFuture.get(WAIT_TIMEOUT, TimeUnit.SECONDS);
} catch (final Exception e) {
throw new IllegalStateException(e);
}
}
}