-
Notifications
You must be signed in to change notification settings - Fork 214
/
AbstractPersistenceStreamingActor.java
151 lines (130 loc) · 6.1 KB
/
AbstractPersistenceStreamingActor.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
/*
* Copyright (c) 2017 Contributors to the Eclipse Foundation
*
* See the NOTICE file(s) distributed with this work for additional
* information regarding copyright ownership.
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License 2.0 which is available at
* http://www.eclipse.org/legal/epl-2.0
*
* SPDX-License-Identifier: EPL-2.0
*/
package org.eclipse.ditto.internal.utils.persistence.mongo;
import static java.util.Objects.requireNonNull;
import java.time.Duration;
import java.util.List;
import java.util.function.Function;
import org.eclipse.ditto.internal.models.streaming.BatchedEntityIdWithRevisions;
import org.eclipse.ditto.internal.models.streaming.EntityIdWithRevision;
import org.eclipse.ditto.internal.models.streaming.SudoStreamPids;
import org.eclipse.ditto.internal.utils.pekko.streaming.AbstractStreamingActor;
import org.eclipse.ditto.internal.utils.config.DefaultScopedConfig;
import org.eclipse.ditto.internal.utils.persistence.mongo.config.DefaultMongoDbConfig;
import org.eclipse.ditto.internal.utils.persistence.mongo.config.MongoDbConfig;
import org.eclipse.ditto.internal.utils.persistence.mongo.streaming.MongoReadJournal;
import org.eclipse.ditto.internal.utils.persistence.mongo.streaming.PidWithSeqNr;
import org.eclipse.ditto.utils.jsr305.annotations.AllValuesAreNonnullByDefault;
import org.apache.pekko.NotUsed;
import org.apache.pekko.stream.javadsl.Source;
/**
 * Abstract base for actors that answer a {@link SudoStreamPids} command by streaming the persistence IDs read from
 * a {@link MongoReadJournal}, each one mapped to an element of type {@code T}.
 * <p>
 * The actor owns a {@link DittoMongoClient} that is created in the constructor and closed in {@link #postStop()}.
 *
 * @param <T> type of the elements.
 */
@AllValuesAreNonnullByDefault
public abstract class AbstractPersistenceStreamingActor<T extends EntityIdWithRevision<?>>
        extends AbstractStreamingActor<SudoStreamPids, T> {

    private final Function<PidWithSeqNr, T> entityMapper;
    private final Function<EntityIdWithRevision<?>, PidWithSeqNr> entityUnmapper;
    private final DittoMongoClient mongoClient;
    private final MongoReadJournal readJournal;

    /**
     * Constructor.
     * <p>
     * Creates a Mongo client from the actor system's configuration and a read journal on top of it.
     *
     * @param entityMapper the mapper used to map
     * {@link org.eclipse.ditto.internal.utils.persistence.mongo.streaming.PidWithSeqNr} to {@code T}.
     * The resulting entity will be streamed to the recipient actor.
     * @param entityUnmapper the mapper used to map elements back to PidWithSeqNr for stream resumption.
     * @throws NullPointerException if any argument is {@code null}.
     */
    protected AbstractPersistenceStreamingActor(final Function<PidWithSeqNr, T> entityMapper,
            final Function<EntityIdWithRevision<?>, PidWithSeqNr> entityUnmapper) {

        // Fail fast on both mappers; a null unmapper would otherwise only surface when a stream is resumed.
        this.entityMapper = requireNonNull(entityMapper, "entityMapper");
        this.entityUnmapper = requireNonNull(entityUnmapper, "entityUnmapper");

        final var config = getContext().getSystem().settings().config();
        final MongoDbConfig mongoDbConfig =
                DefaultMongoDbConfig.of(DefaultScopedConfig.dittoScoped(config));
        mongoClient = MongoClientWrapper.newInstance(mongoDbConfig);
        readJournal = MongoReadJournal.newInstance(config, mongoClient, getContext().getSystem());
    }

    /**
     * Constructor for tests.
     * <p>
     * Uses the given read journal instead of creating one. A Mongo client is still created from the actor system's
     * configuration (and closed in {@link #postStop()}), exactly as in the non-test constructor.
     *
     * @param entityMapper the mapper used to map
     * {@link org.eclipse.ditto.internal.utils.persistence.mongo.streaming.PidWithSeqNr} to {@code T}.
     * The resulting entity will be streamed to the recipient actor.
     * @param entityUnmapper the mapper used to map elements back to PidWithSeqNr for stream resumption.
     * @param readJournal the ReadJournal to use instead of creating one in the non-test constructor.
     * @throws NullPointerException if any argument is {@code null}.
     */
    protected AbstractPersistenceStreamingActor(final Function<PidWithSeqNr, T> entityMapper,
            final Function<EntityIdWithRevision<?>, PidWithSeqNr> entityUnmapper,
            final MongoReadJournal readJournal) {

        this.entityMapper = requireNonNull(entityMapper, "entityMapper");
        this.entityUnmapper = requireNonNull(entityUnmapper, "entityUnmapper");

        final var config = getContext().getSystem().settings().config();
        final MongoDbConfig mongoDbConfig =
                DefaultMongoDbConfig.of(DefaultScopedConfig.dittoScoped(config));
        mongoClient = MongoClientWrapper.newInstance(mongoDbConfig);
        this.readJournal = requireNonNull(readJournal, "readJournal");
    }

    /**
     * Closes the Mongo client owned by this actor and then delegates to the superclass.
     *
     * @throws Exception if closing the client or the superclass hook fails.
     */
    @Override
    public void postStop() throws Exception {
        try {
            mongoClient.close();
        } finally {
            // Run the superclass hook even when closing the client throws.
            super.postStop();
        }
    }

    /**
     * Get the class of the elements.
     *
     * @return the class of the elements.
     */
    protected abstract Class<T> getElementClass();

    /**
     * @return {@link SudoStreamPids} as the command class handled by this actor.
     */
    @Override
    protected final Class<SudoStreamPids> getCommandClass() {
        return SudoStreamPids.class;
    }

    /**
     * @param command the streaming command.
     * @return the burst value carried by the command.
     */
    @Override
    protected int getBurst(final SudoStreamPids command) {
        return command.getBurst();
    }

    /**
     * @param command the streaming command.
     * @return the command's timeout in milliseconds as a {@link Duration}.
     */
    @Override
    protected Duration getInitialTimeout(final SudoStreamPids command) {
        return Duration.ofMillis(command.getTimeoutMillis());
    }

    /**
     * @param command the streaming command.
     * @return the command's timeout in milliseconds as a {@link Duration}; same value as the initial timeout.
     */
    @Override
    protected Duration getIdleTimeout(final SudoStreamPids command) {
        return Duration.ofMillis(command.getTimeoutMillis());
    }

    /**
     * Wraps a list of elements into a {@link BatchedEntityIdWithRevisions} tagged with the element class.
     *
     * @param elements the elements to batch.
     * @return the batch message.
     */
    @Override
    protected Object batchMessages(final List<T> elements) {
        return BatchedEntityIdWithRevisions.of(getElementClass(), elements);
    }

    /**
     * Creates the source of elements for a streaming command.
     * <p>
     * If the command has a non-empty lower bound, the lower bound is mapped back to a {@link PidWithSeqNr} and
     * only journal PIDs above its persistence ID are read. Otherwise all journal PIDs are read. Every PID is
     * wrapped in a {@link PidWithSeqNr} with sequence number {@code 0} and passed through the entity mapper.
     *
     * @param command the streaming command.
     * @return the source of mapped elements.
     */
    @Override
    protected final Source<T, NotUsed> createSource(final SudoStreamPids command) {
        log.info("Starting stream for <{}>", command);
        final var maxIdleTime = Duration.ofMillis(command.getTimeoutMillis());
        // The journal is read in batches of five times the requested burst.
        final int batchSize = command.getBurst() * 5;
        final Source<String, NotUsed> pidSource;
        if (command.hasNonEmptyLowerBound()) {
            // resume from lower bound
            final var pidWithSeqNr = entityUnmapper.apply(command.getLowerBound());
            pidSource =
                    readJournal.getJournalPidsAbove(pidWithSeqNr.getPersistenceId(), batchSize, materializer);
        } else {
            // no lower bound; read from event journals with restart-source
            pidSource = readJournal.getJournalPids(batchSize, maxIdleTime, materializer);
        }
        return pidSource.map(pid -> mapEntity(new PidWithSeqNr(pid, 0L))).log("pid-streaming", log);
    }

    /**
     * Applies the configured entity mapper to one PID/sequence-number pair.
     */
    private T mapEntity(final PidWithSeqNr pidWithSeqNr) {
        return entityMapper.apply(pidWithSeqNr);
    }

}