/
IndexInitializer.java
177 lines (148 loc) · 6.9 KB
/
IndexInitializer.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
/*
* Copyright (c) 2017 Bosch Software Innovations GmbH.
*
* All rights reserved. This program and the accompanying materials
* are made available under the terms of the Eclipse Public License v2.0
* which accompanies this distribution, and is available at
* https://www.eclipse.org/org/documents/epl-2.0/index.php
*
* Contributors:
* Bosch Software Innovations GmbH - initial contribution
*/
package org.eclipse.ditto.services.utils.persistence.mongo.indices;
import static java.util.Objects.requireNonNull;
import java.util.Collection;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.stream.Collectors;
import javax.annotation.concurrent.Immutable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.mongodb.reactivestreams.client.MongoDatabase;
import com.mongodb.reactivestreams.client.Success;
import akka.Done;
import akka.NotUsed;
import akka.stream.Materializer;
import akka.stream.javadsl.Sink;
import akka.stream.javadsl.Source;
/**
* Initializes indexes on a MongoDB collection.
*/
@Immutable
public final class IndexInitializer {
private static final Logger LOGGER = LoggerFactory.getLogger(IndexInitializer.class);
private final Materializer materializer;
private final IndexOperations indexOperations;
private IndexInitializer(final MongoDatabase db, final Materializer materializer) {
this.materializer = materializer;
this.indexOperations = IndexOperations.of(db);
}
/**
* Returns a new {@code MongoIndexInitializer}.
*
* @param db the mongo client to use for creating indexes.
* @param materializer the materializer for akka streams.
* @return the index initializer.
*/
public static IndexInitializer of(final MongoDatabase db,
final Materializer materializer) {
requireNonNull(db);
requireNonNull(materializer);
return new IndexInitializer(db, materializer);
}
/**
* Creates all given indexes on the specified collection and deletes all not defined indices (except of the default
* "_id_" index).
*
* @param collectionName the collection on which the indexes will be initialized.
* @param indices the indexes to be used for initialization.
* @return a completion stage that completes successfully if the indexes are initialized, otherwise it will raise an
* exception.
*/
public CompletionStage<Void> initialize(final String collectionName, final List<Index> indices) {
requireNonNull(collectionName);
requireNonNull(indices);
LOGGER.info("Starting index-initialization with defined indices: {}", indices);
return createNonExistingIndices(collectionName, indices)
.thenCompose(done -> dropUndefinedIndices(collectionName, indices))
.<Void>thenApply(unused -> {
LOGGER.info("Index-Initialization was successful.");
return null;
})
.<Void>exceptionally(t -> {
LOGGER.error("Index-Initialization failed.", t);
return null;
});
}
private CompletionStage<Done> createNonExistingIndices(final String collectionName,
final List<Index> indices) {
if (indices.isEmpty()) {
LOGGER.warn("No indices are defined, thus no indices are created.");
return CompletableFuture.completedFuture(Done.getInstance());
}
return indexOperations.getIndicesExceptDefaultIndex(collectionName)
.flatMapConcat(
existingIndices -> {
LOGGER.info("Create non-existing indices: Existing indices are: {}", existingIndices);
final List<Index> indicesToCreate = excludeIndices(indices, existingIndices);
LOGGER.info("Indices to create are: {}", indicesToCreate);
return createIndices(collectionName, indicesToCreate);
})
.runWith(Sink.ignore(), materializer);
}
private Source<Success, NotUsed> createIndices(final String collectionName, final List<Index> indices) {
if (indices.isEmpty()) {
return Source.empty();
}
return Source.from(indices)
.flatMapConcat(index -> createIndex(collectionName, index));
}
private Source<Success, NotUsed> createIndex(final String collectionName, final Index index) {
LOGGER.info("Creating index: {}", index);
return indexOperations.createIndex(collectionName, index);
}
private CompletionStage<Done> dropUndefinedIndices(final String collectionName, final List<Index> definedIndices) {
return getIndicesExceptDefaultIndex(collectionName)
.flatMapConcat(existingIndices -> {
LOGGER.info("Drop undefined indices - Existing indices are: {}", existingIndices);
final List<String> indicesToDrop = getUndefinedIndexNames(existingIndices, definedIndices);
LOGGER.info("Dropping undefined indices: {}", indicesToDrop);
return dropIndices(collectionName, indicesToDrop);
})
.runWith(Sink.ignore(), materializer);
}
private static List<String> getUndefinedIndexNames(final Collection<Index> allIndices,
final Collection<Index> definedIndices) {
return excludeIndices(allIndices, definedIndices).stream()
.map(Index::getName)
.collect(Collectors.toList());
}
private Source<Success, NotUsed> dropIndices(final String collectionName, final List<String> indices) {
if (indices.isEmpty()) {
return Source.empty();
}
return Source.from(indices)
.flatMapConcat(index -> dropIndex(collectionName, index));
}
private Source<Success, NotUsed> dropIndex(final String collectionName, final String indexName) {
LOGGER.info("Dropping index: {}", indexName);
return indexOperations.dropIndex(collectionName, indexName);
}
private Source<List<Index>, NotUsed> getIndicesExceptDefaultIndex(final String collectionName) {
return indexOperations.getIndicesExceptDefaultIndex(collectionName);
}
private static Set<String> extractIndexNames(final Collection<Index> indices) {
return indices.stream()
.map(Index::getName)
.collect(Collectors.toSet());
}
private static List<Index> excludeIndices(final Collection<Index> allIndices, final Collection<Index>
indicesToExclude) {
final Set<String> excludedIndexNames = extractIndexNames(indicesToExclude);
return allIndices.stream()
.filter(indexModel -> !excludedIndexNames.contains(indexModel.getName()))
.collect(Collectors.toList());
}
}