diff --git a/services/thingsearch/persistence/src/main/java/org/eclipse/ditto/services/thingsearch/persistence/read/MongoThingsSearchPersistence.java b/services/thingsearch/persistence/src/main/java/org/eclipse/ditto/services/thingsearch/persistence/read/MongoThingsSearchPersistence.java index e5b7e11337..c4f921c324 100644 --- a/services/thingsearch/persistence/src/main/java/org/eclipse/ditto/services/thingsearch/persistence/read/MongoThingsSearchPersistence.java +++ b/services/thingsearch/persistence/src/main/java/org/eclipse/ditto/services/thingsearch/persistence/read/MongoThingsSearchPersistence.java @@ -23,9 +23,7 @@ import java.util.concurrent.CompletionStage; import java.util.concurrent.TimeUnit; -import org.bson.BsonBoolean; import org.bson.BsonDocument; -import org.bson.BsonInt32; import org.bson.BsonNull; import org.bson.Document; import org.bson.conversions.Bson; @@ -48,7 +46,6 @@ import com.mongodb.client.model.CountOptions; import com.mongodb.reactivestreams.client.AggregatePublisher; import com.mongodb.reactivestreams.client.MongoCollection; -import com.mongodb.reactivestreams.client.Success; import akka.NotUsed; import akka.actor.ActorSystem; @@ -56,7 +53,6 @@ import akka.event.LoggingAdapter; import akka.japi.pf.PFBuilder; import akka.stream.ActorMaterializer; -import akka.stream.javadsl.Sink; import akka.stream.javadsl.Source; import scala.PartialFunction; @@ -76,7 +72,6 @@ public class MongoThingsSearchPersistence implements ThingsSearchPersistence { private final ActorMaterializer materializer; private final IndexInitializer indexInitializer; private final Duration maxQueryTime; - private final MongoClientWrapper clientWrapper; /** * Initializes the things search persistence with a passed in {@code persistence}. @@ -85,7 +80,6 @@ public class MongoThingsSearchPersistence implements ThingsSearchPersistence { * @param actorSystem the Akka ActorSystem. */ public MongoThingsSearchPersistence(final MongoClientWrapper clientWrapper, final ActorSystem actorSystem) { - this.clientWrapper = clientWrapper; collection = clientWrapper.getDatabase().getCollection(PersistenceConstants.THINGS_COLLECTION_NAME); log = Logging.getLogger(actorSystem, getClass()); materializer = ActorMaterializer.create(actorSystem); @@ -95,17 +89,7 @@ public MongoThingsSearchPersistence(final MongoClientWrapper clientWrapper, fina @Override public CompletionStage initializeIndices() { - // do not fail if index key too long - return failIndexKeyTooLong(false) - .mapAsync(1, success -> - indexInitializer.initialize(PersistenceConstants.THINGS_COLLECTION_NAME, Indices.Things.all()) - .thenApply(nullValue -> true)) - .runWith(Sink.ignore(), materializer) - .exceptionally(t -> { - log.error(t, "Index-Initialization failed."); - return null; - }) - .thenApply(notNull -> null); + return indexInitializer.initialize(PersistenceConstants.THINGS_COLLECTION_NAME, Indices.Things.all()); } @Override @@ -253,19 +237,4 @@ private PartialFunction handleMongoExecutionTimeExceededEx ) .build(); } - - /** - * Instruct MongoDB to fail index creation or not when a document's index entry exceeds size limit. - * - * @param shouldFail whether MongoDB should fail - * @return Akka stream source delivering a single element upon success. - */ - private Source failIndexKeyTooLong(final boolean shouldFail) { - return Source.fromPublisher(clientWrapper.getMongoClient() - .getDatabase("admin") - .runCommand(new BsonDocument() - .append("setParameter", new BsonInt32(1)) - .append("failIndexKeyTooLong", BsonBoolean.valueOf(shouldFail)))) - .map(notUsed -> Success.SUCCESS); - } } diff --git a/services/utils/persistence/src/main/java/org/eclipse/ditto/services/utils/persistence/mongo/indices/IndexInitializer.java b/services/utils/persistence/src/main/java/org/eclipse/ditto/services/utils/persistence/mongo/indices/IndexInitializer.java index ae5ef26174..1cb400e6e2 100644 --- a/services/utils/persistence/src/main/java/org/eclipse/ditto/services/utils/persistence/mongo/indices/IndexInitializer.java +++ b/services/utils/persistence/src/main/java/org/eclipse/ditto/services/utils/persistence/mongo/indices/IndexInitializer.java @@ -80,9 +80,13 @@ public CompletionStage initialize(final String collectionName, final List< LOGGER.info("Starting index-initialization with defined indices: {}", indices); return createNonExistingIndices(collectionName, indices) .thenCompose(done -> dropUndefinedIndices(collectionName, indices)) - .thenApply(unused -> { + .thenApply(unused -> { LOGGER.info("Index-Initialization was successful."); return null; + }) + .exceptionally(t -> { + LOGGER.error("Index-Initialization failed.", t); + return null; }); }