-
Notifications
You must be signed in to change notification settings - Fork 215
/
SearchUpdateMapper.java
155 lines (136 loc) · 6.15 KB
/
SearchUpdateMapper.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
/*
* Copyright (c) 2021 Contributors to the Eclipse Foundation
*
* See the NOTICE file(s) distributed with this work for additional
* information regarding copyright ownership.
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License 2.0 which is available at
* http://www.eclipse.org/legal/epl-2.0
*
* SPDX-License-Identifier: EPL-2.0
*/
package org.eclipse.ditto.thingsearch.service.persistence.write.streaming;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.stream.Collectors;
import org.bson.BsonDocument;
import org.eclipse.ditto.internal.utils.akka.AkkaClassLoader;
import org.eclipse.ditto.internal.utils.config.DefaultScopedConfig;
import org.eclipse.ditto.internal.utils.metrics.instruments.timer.StartedTimer;
import org.eclipse.ditto.thingsearch.service.common.config.DittoSearchConfig;
import org.eclipse.ditto.thingsearch.service.common.config.SearchConfig;
import org.eclipse.ditto.thingsearch.service.persistence.write.model.AbstractWriteModel;
import org.slf4j.Logger;
import com.mongodb.client.model.WriteModel;
import akka.NotUsed;
import akka.actor.AbstractExtensionId;
import akka.actor.ActorSystem;
import akka.actor.ExtendedActorSystem;
import akka.actor.Extension;
import akka.japi.Pair;
import akka.stream.javadsl.Source;
/**
* Search Update Mapper to be loaded by reflection.
* Can be used as an extension point to use custom map search updates.
* Implementations MUST have a public constructor taking an actorSystem as argument.
*
* @since 2.1.0
*/
public abstract class SearchUpdateMapper implements Extension {
private static final ExtensionId EXTENSION_ID = new ExtensionId();
protected final ActorSystem actorSystem;
protected SearchUpdateMapper(final ActorSystem actorSystem) {
this.actorSystem = actorSystem;
}
/**
* Gets the write models of the search updates and processes them.
*
* @param writeModels the write models.
* @return Ditto write models together with their processed MongoDB write models.
*/
public abstract Source<List<Pair<AbstractWriteModel, WriteModel<BsonDocument>>>, NotUsed>
processWriteModels(final List<AbstractWriteModel> writeModels);
/**
* Load a {@code SearchUpdateListener} dynamically according to the search configuration.
*
* @param actorSystem The actor system in which to load the listener.
* @return The listener.
*/
public static SearchUpdateMapper get(final ActorSystem actorSystem) {
return EXTENSION_ID.get(actorSystem);
}
/**
* Convert a write model to an incremental update model.
*
* @param model the write model.
* @param logger the logger.
* @return a singleton list of write model together with its update document, or an empty list if there is no
* change.
*/
protected static CompletionStage<List<Pair<AbstractWriteModel, WriteModel<BsonDocument>>>>
toIncrementalMongo(final AbstractWriteModel model, final Logger logger) {
return model.toIncrementalMongo()
.thenApply(mongoWriteModelOpt -> {
if (mongoWriteModelOpt.isEmpty()) {
logger.debug("Write model is unchanged, skipping update: <{}>", model);
model.getMetadata().sendWeakAck(null);
model.getMetadata().sendBulkWriteCompleteToOrigin(null);
return List.<Pair<AbstractWriteModel, WriteModel<BsonDocument>>>of();
} else {
ConsistencyLag.startS5MongoBulkWrite(model.getMetadata());
final var result = mongoWriteModelOpt.orElseThrow();
logger.debug("MongoWriteModel={}", result);
return List.of(Pair.create(model, result));
}
})
.handle((result, error) -> {
if (result != null) {
return result;
} else {
logger.error("Failed to compute write model " + model, error);
try {
model.getMetadata().getTimers().forEach(StartedTimer::stop);
} catch (final Exception e) {
// tolerate stopping stopped timers
}
return List.of();
}
});
}
/**
* Convert a list of write models to incremental update models.
*
* @param models the list of write models.
* @param logger the logger.
* @return a list of write models together with their update documents.
*/
protected static CompletionStage<List<Pair<AbstractWriteModel, WriteModel<BsonDocument>>>> toIncrementalMongo(
final Collection<AbstractWriteModel> models, final Logger logger) {
final var writeModelFutures = models.stream()
.map(model -> toIncrementalMongo(model, logger))
.map(CompletionStage::toCompletableFuture)
.collect(Collectors.toList());
final var allFutures = CompletableFuture.allOf(writeModelFutures.toArray(CompletableFuture[]::new));
return allFutures.thenApply(aVoid ->
writeModelFutures.stream().flatMap(future -> future.join().stream()).collect(Collectors.toList())
);
}
/**
* ID of the actor system extension to validate the {@code SearchUpdateListener}.
*/
private static final class ExtensionId extends AbstractExtensionId<SearchUpdateMapper> {
@Override
public SearchUpdateMapper createExtension(final ExtendedActorSystem system) {
final SearchConfig searchConfig =
DittoSearchConfig.of(DefaultScopedConfig.dittoScoped(
system.settings().config()));
return AkkaClassLoader.instantiate(system, SearchUpdateMapper.class,
searchConfig.getSearchUpdateMapperImplementation(),
List.of(ActorSystem.class),
List.of(system));
}
}
}