/
MongoHealthChecker.java
executable file
·143 lines (123 loc) · 5.48 KB
/
MongoHealthChecker.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
/*
* Copyright (c) 2017-2018 Bosch Software Innovations GmbH.
*
* All rights reserved. This program and the accompanying materials
* are made available under the terms of the Eclipse Public License v2.0
* which accompanies this distribution, and is available at
* https://www.eclipse.org/org/documents/epl-2.0/index.php
*
* SPDX-License-Identifier: EPL-2.0
*/
package org.eclipse.ditto.services.utils.persistence.mongo;
import static com.mongodb.client.model.Filters.eq;
import java.util.Collections;
import java.util.Objects;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.CompletionStage;
import org.bson.Document;
import org.eclipse.ditto.services.utils.health.AbstractHealthCheckingActor;
import org.eclipse.ditto.services.utils.health.StatusInfo;
import org.eclipse.ditto.services.utils.health.mongo.RetrieveMongoStatusResponse;
import org.eclipse.ditto.services.utils.persistence.mongo.config.MongoDbConfig;
import com.mongodb.ReadPreference;
import com.mongodb.client.result.DeleteResult;
import com.mongodb.reactivestreams.client.MongoCollection;
import akka.actor.ActorRef;
import akka.actor.Props;
import akka.japi.pf.ReceiveBuilder;
import akka.stream.ActorMaterializer;
import akka.stream.javadsl.Sink;
import akka.stream.javadsl.Source;
/**
 * Actor which checks the health of the MongoDB persistence by running a small write/read/delete round trip
 * against the {@value #TEST_COLLECTION_NAME} collection and reporting the outcome as a {@link StatusInfo}.
 */
public final class MongoHealthChecker extends AbstractHealthCheckingActor {

    /** Name of the collection used for the round-trip probe. */
    private static final String TEST_COLLECTION_NAME = "test";

    /** Name of the document field holding the probe document's identifier. */
    private static final String ID_FIELD = "_id";

    /** Maximum connection pool size configured for the client created in {@link #props(MongoDbConfig)}. */
    private static final int HEALTH_CHECK_MAX_POOL_SIZE = 2;

    private final DittoMongoClient mongoClient;
    private final MongoCollection<Document> collection;
    private final ActorMaterializer materializer;

    /**
     * Constructs the health checker on top of the given client.
     *
     * @param theDittoMongoClient the client used to access the test collection; it is closed in
     * {@link #postStop()}.
     */
    private MongoHealthChecker(final DittoMongoClient theDittoMongoClient) {
        mongoClient = theDittoMongoClient;

        /*
         * It's important to have the read preference set to "primary preferred" because the replication is too
         * slow to retrieve the inserted document from a secondary directly after inserting it on the primary.
         */
        collection = mongoClient.getCollection(TEST_COLLECTION_NAME)
                .withReadPreference(ReadPreference.primaryPreferred());

        materializer = ActorMaterializer.create(getContext());
    }

    /**
     * Closes the Mongo client associated with this health checker. Subsequent health checks fail for sure.
     */
    @Override
    public void postStop() {
        // The field is final and already dereferenced in the constructor, hence it cannot be null here.
        mongoClient.close();
    }

    /**
     * Creates Akka configuration object Props for this MongoHealthChecker.
     * The actor gets its own Mongo client whose connection pool is limited to
     * {@code HEALTH_CHECK_MAX_POOL_SIZE} connections.
     *
     * @param mongoDbConfig the MongoDB configuration settings.
     * @return the Akka configuration Props object
     * @throws NullPointerException if {@code mongoDbConfig} is {@code null}.
     */
    public static Props props(final MongoDbConfig mongoDbConfig) {
        // Fail fast: the creator below runs lazily, so without this check a null argument would go
        // unnoticed until the actor is actually instantiated.
        Objects.requireNonNull(mongoDbConfig, "mongoDbConfig");

        return Props.create(MongoHealthChecker.class, () -> {
            final DittoMongoClient mongoDbClient = MongoClientWrapper.getBuilder(mongoDbConfig)
                    .connectionPoolMaxSize(HEALTH_CHECK_MAX_POOL_SIZE)
                    .build();

            return new MongoHealthChecker(mongoDbClient);
        });
    }

    /**
     * Handles the {@link RetrieveMongoStatusResponse} which this actor sends to itself once a probe finished.
     *
     * @return the receive which applies a Mongo status response to the health of this actor.
     */
    @Override
    protected Receive matchCustomMessages() {
        return ReceiveBuilder.create()
                .match(RetrieveMongoStatusResponse.class, this::applyMongoStatus)
                .build();
    }

    /**
     * Starts an asynchronous probe of the database and sends the outcome to this actor as a
     * {@link RetrieveMongoStatusResponse}. A failed probe is reported with the canonical class name and the
     * message of the error as description.
     */
    @Override
    protected void triggerHealthRetrieval() {
        // Resolve the self reference up front so that the asynchronous callback only works with a plain ActorRef.
        final ActorRef self = getSelf();

        generateStatusResponse().thenAccept(errorOpt -> {
            final RetrieveMongoStatusResponse response;

            if (errorOpt.isPresent()) {
                final Throwable error = errorOpt.get();
                response = new RetrieveMongoStatusResponse(false,
                        error.getClass().getCanonicalName() + ": " + error.getMessage());
            } else {
                response = new RetrieveMongoStatusResponse(true);
            }
            self.tell(response, ActorRef.noSender());
        });
    }

    /**
     * Inserts a document with a random UUID as ID into the test collection, looks it up again and deletes it.
     *
     * @return a stage completing with an empty Optional if exactly one delete with a deleted count of 1 was
     * observed; otherwise with the error of the stream or an {@link IllegalStateException} describing the
     * unexpected result. The stage itself does not complete exceptionally because of stream failures.
     */
    private CompletionStage<Optional<Throwable>> generateStatusResponse() {
        final String id = UUID.randomUUID().toString();

        return Source.fromPublisher(collection.insertOne(new Document(ID_FIELD, id)))
                .flatMapConcat(s ->
                        // one delete is issued per document found for the ID
                        Source.fromPublisher(collection.find(eq(ID_FIELD, id))).flatMapConcat(r ->
                                Source.fromPublisher(collection.deleteOne(eq(ID_FIELD, id)))
                                        .map(DeleteResult::getDeletedCount)
                        )
                )
                .runWith(Sink.seq(), materializer)
                .handle((result, error) -> {
                    if (error != null) {
                        return Optional.of(error);
                    } else if (!Objects.equals(result, Collections.singletonList(1L))) {
                        final String message = "Expect 1 document inserted and deleted. Found: " + result;
                        return Optional.of(new IllegalStateException(message));
                    } else {
                        return Optional.empty();
                    }
                });
    }

    /**
     * Converts the given response into a {@link StatusInfo} - {@code UP} if the response is alive, {@code DOWN}
     * otherwise, with the response's description if present - and passes it to {@code updateHealth}.
     *
     * @param statusResponse the outcome of the latest probe.
     */
    private void applyMongoStatus(final RetrieveMongoStatusResponse statusResponse) {
        final StatusInfo persistenceStatus = StatusInfo.fromStatus(
                statusResponse.isAlive() ? StatusInfo.Status.UP : StatusInfo.Status.DOWN,
                statusResponse.getDescription().orElse(null));
        updateHealth(persistenceStatus);
    }

}