-
Notifications
You must be signed in to change notification settings - Fork 214
/
MongoHealthChecker.java
executable file
·152 lines (131 loc) · 5.79 KB
/
MongoHealthChecker.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
/*
* Copyright (c) 2017 Contributors to the Eclipse Foundation
*
* See the NOTICE file(s) distributed with this work for additional
* information regarding copyright ownership.
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License 2.0 which is available at
* http://www.eclipse.org/legal/epl-2.0
*
* SPDX-License-Identifier: EPL-2.0
*/
package org.eclipse.ditto.internal.utils.persistence.mongo;
import static com.mongodb.client.model.Filters.eq;
import java.util.Collections;
import java.util.Objects;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.CompletionStage;
import org.bson.Document;
import org.eclipse.ditto.internal.utils.config.DefaultScopedConfig;
import org.eclipse.ditto.internal.utils.health.AbstractHealthCheckingActor;
import org.eclipse.ditto.internal.utils.health.StatusInfo;
import org.eclipse.ditto.internal.utils.health.mongo.CurrentMongoStatus;
import org.eclipse.ditto.internal.utils.persistence.mongo.config.DefaultMongoDbConfig;
import com.mongodb.ReadConcern;
import com.mongodb.ReadPreference;
import com.mongodb.WriteConcern;
import com.mongodb.client.result.DeleteResult;
import com.mongodb.reactivestreams.client.MongoCollection;
import org.apache.pekko.actor.ActorRef;
import org.apache.pekko.actor.Props;
import org.apache.pekko.japi.pf.ReceiveBuilder;
import org.apache.pekko.stream.Materializer;
import org.apache.pekko.stream.javadsl.Sink;
import org.apache.pekko.stream.javadsl.Source;
/**
* Actor for handling calls to the mongodb.
*/
public final class MongoHealthChecker extends AbstractHealthCheckingActor {
private static final String TEST_COLLECTION_NAME = "test";
private static final String ID_FIELD = "_id";
private static final int HEALTH_CHECK_MAX_POOL_SIZE = 2;
private final DittoMongoClient mongoClient;
private final MongoCollection<Document> collection;
private final Materializer materializer;
private MongoHealthChecker() {
final DefaultMongoDbConfig mongoDbConfig = DefaultMongoDbConfig.of(
DefaultScopedConfig.dittoScoped(getContext().getSystem().settings().config()));
mongoClient = MongoClientWrapper.getBuilder(mongoDbConfig)
.connectionPoolMinSize(0)
.connectionPoolMaxSize(HEALTH_CHECK_MAX_POOL_SIZE)
.build();
/*
* It's important to have the read preferences to primary preferred because the replication is to slow to retrieve
* the inserted document from a secondary directly after inserting it on the primary.
*/
collection = mongoClient.getCollection(TEST_COLLECTION_NAME)
.withReadPreference(ReadPreference.primary())
.withReadConcern(ReadConcern.LOCAL)
.withWriteConcern(WriteConcern.ACKNOWLEDGED);
materializer = Materializer.createMaterializer(this::getContext);
}
/**
* Close the Mongo client associated with this health checker, if any. Subsequent health checks fail for sure.
*/
@Override
public void postStop() {
if (mongoClient != null) {
mongoClient.close();
}
}
/**
* Creates Pekko configuration object Props for this MongoClientActor.
*
* @return the Pekko configuration Props object
* @throws NullPointerException if {@code mongoDbConfig} is {@code null}.
*/
public static Props props() {
return Props.create(MongoHealthChecker.class);
}
@Override
protected Receive matchCustomMessages() {
return ReceiveBuilder.create()
.match(CurrentMongoStatus.class, this::applyMongoStatus)
.build();
}
@Override
protected void triggerHealthRetrieval() {
generateStatusResponse().thenAccept(errorOpt -> {
final CurrentMongoStatus mongoStatus;
if (errorOpt.isPresent()) {
final Throwable error = errorOpt.get();
mongoStatus = new CurrentMongoStatus(false,
error.getClass().getCanonicalName() + ": " + error.getMessage());
log.error(error, error.getMessage());
} else {
mongoStatus = new CurrentMongoStatus(true);
}
getSelf().tell(mongoStatus, ActorRef.noSender());
});
}
private CompletionStage<Optional<Throwable>> generateStatusResponse() {
final String id = UUID.randomUUID().toString();
return Source.fromPublisher(collection.insertOne(new Document(ID_FIELD, id)))
.flatMapConcat(s ->
Source.fromPublisher(collection.find(eq(ID_FIELD, id))).flatMapConcat(r ->
Source.fromPublisher(collection.deleteOne(eq(ID_FIELD, id)))
.map(DeleteResult::getDeletedCount)
)
)
.runWith(Sink.seq(), materializer)
.handle((result, error) -> {
if (error != null) {
return Optional.of(error);
} else if (!Objects.equals(result, Collections.singletonList(1L))) {
final String message = "Expect 1 document inserted and deleted. Found: " + result;
log.error(message);
return Optional.of(new IllegalStateException(message));
} else {
return Optional.empty();
}
});
}
private void applyMongoStatus(final CurrentMongoStatus status) {
final StatusInfo persistenceStatus = StatusInfo.fromStatus(
status.isAlive() ? StatusInfo.Status.UP : StatusInfo.Status.DOWN,
status.getDescription().orElse(null));
updateHealth(persistenceStatus);
}
}