Skip to content

Commit

Permalink
Review: Use typed EntityId; remove unnecessary enumeration of collect…
Browse files Browse the repository at this point in the history
…ions.

Signed-off-by: Yufei Cai <yufei.cai@bosch-si.com>
  • Loading branch information
yufei-cai committed Nov 11, 2019
1 parent ebc3463 commit 3cef7b0
Show file tree
Hide file tree
Showing 5 changed files with 47 additions and 127 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import java.util.List;

import org.bson.Document;
import org.eclipse.ditto.model.base.entity.id.EntityId;
import org.eclipse.ditto.services.utils.persistence.mongo.ops.MongoOpsUtil;
import org.eclipse.ditto.services.utils.persistence.operations.EntityPersistenceOperations;

Expand Down Expand Up @@ -55,15 +56,15 @@ public static MongoEntitiesPersistenceOperations of(final MongoDatabase db,
}

@Override
public Source<List<Throwable>, NotUsed> purgeEntity(final CharSequence entityId) {
public Source<List<Throwable>, NotUsed> purgeEntity(final EntityId entityId) {
requireNonNull(entityId);

final Collection<MongoPersistenceOperationsSelection> selections = selectEntity(entityId.toString());
final Collection<MongoPersistenceOperationsSelection> selections = selectEntity(entityId);

return purgeAllSelections(selections);
}

private Collection<MongoPersistenceOperationsSelection> selectEntity(final String entityId) {
private Collection<MongoPersistenceOperationsSelection> selectEntity(final EntityId entityId) {
return selectionProvider.selectEntity(entityId);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.bson.BsonRegularExpression;
import org.bson.BsonString;
import org.bson.Document;
import org.eclipse.ditto.model.base.entity.id.EntityId;

import akka.contrib.persistence.mongodb.JournallingFieldNames$;

Expand Down Expand Up @@ -61,13 +62,9 @@ public static MongoPersistenceOperationsSelectionProvider of(final MongoEventSou
* in the EventSource.
* @throws NullPointerException if {@code entityId} is {@code null}.
*/
public Collection<MongoPersistenceOperationsSelection> selectEntity(final String entityId) {
public Collection<MongoPersistenceOperationsSelection> selectEntity(final EntityId entityId) {
checkNotNull(entityId, "entity ID");

if (settings.isSupportsNamespaces()) {
validateAndExtractNamespace(entityId);
}

return Collections.unmodifiableList(Arrays.asList(
selectEntityByPid(settings.getMetadataCollectionName(), entityId),
selectEntityByPid(settings.getJournalCollectionName(), entityId),
Expand Down Expand Up @@ -105,20 +102,11 @@ private Document filterByPidPrefix(final CharSequence namespace) {
return new Document(PID, new BsonRegularExpression(pidRegex));
}

private static void validateAndExtractNamespace(final String entityId) {
final int separatorIndex = entityId.indexOf(':');
if (-1 == separatorIndex) {
throw new IllegalArgumentException(
MessageFormat.format("Entity ID <{0}> does not have namespace!", entityId));
}

}

private MongoPersistenceOperationsSelection selectEntityByPid(final String collection, final String entityId) {
private MongoPersistenceOperationsSelection selectEntityByPid(final String collection, final EntityId entityId) {
return MongoPersistenceOperationsSelection.of(collection, filterByPid(entityId));
}

private Document filterByPid(final String entityId) {
private Document filterByPid(final EntityId entityId) {
final String pid = String.format("%s%s", settings.getPersistenceIdPrefix(), entityId);
return new Document(PID, new BsonString(pid));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,18 +16,14 @@
import java.time.Instant;
import java.time.temporal.ChronoUnit;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.SortedSet;
import java.util.TreeSet;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

import javax.annotation.Nullable;

import org.bson.Document;
import org.bson.conversions.Bson;
import org.bson.types.ObjectId;
Expand All @@ -37,15 +33,13 @@
import org.eclipse.ditto.services.utils.persistence.mongo.config.DefaultMongoDbConfig;
import org.eclipse.ditto.services.utils.persistence.mongo.config.MongoDbConfig;
import org.eclipse.ditto.utils.jsr305.annotations.AllValuesAreNonnullByDefault;
import org.reactivestreams.Publisher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.mongodb.QueryOperators;
import com.mongodb.client.model.Aggregates;
import com.mongodb.client.model.Filters;
import com.mongodb.client.model.Sorts;
import com.mongodb.reactivestreams.client.ListCollectionsPublisher;
import com.mongodb.reactivestreams.client.MongoCollection;
import com.mongodb.reactivestreams.client.MongoDatabase;
import com.typesafe.config.Config;
Expand Down Expand Up @@ -79,9 +73,6 @@
public class MongoReadJournal {
// not a final class to test with Mockito

// pattern that matches nothing
private static final Pattern MATCH_NOTHING = Pattern.compile(".\\A");

private static final String AKKA_PERSISTENCE_JOURNAL_AUTO_START =
"akka.persistence.journal.auto-start-journals";
private static final String AKKA_PERSISTENCE_SNAPS_AUTO_START =
Expand Down Expand Up @@ -110,15 +101,15 @@ public class MongoReadJournal {
private static final String COLLECTION_NAME_FIELD = "name";
private static final Duration MAX_BACK_OFF_DURATION = Duration.ofSeconds(128L);

private final Pattern journalCollectionPrefix;
private final Pattern snapsCollectionPrefix;
private final String journalCollection;
private final String snapsCollection;
private final DittoMongoClient mongoClient;
private final Logger log;

private MongoReadJournal(final Pattern journalCollectionPrefix, final Pattern snapsCollectionPrefix,
private MongoReadJournal(final String journalCollection, final String snapsCollection,
final DittoMongoClient mongoClient) {
this.journalCollectionPrefix = journalCollectionPrefix;
this.snapsCollectionPrefix = snapsCollectionPrefix;
this.journalCollection = journalCollection;
this.snapsCollection = snapsCollection;
this.mongoClient = mongoClient;
log = LoggerFactory.getLogger(MongoReadJournal.class);
}
Expand Down Expand Up @@ -146,11 +137,11 @@ public static MongoReadJournal newInstance(final ActorSystem system) {
public static MongoReadJournal newInstance(final Config config, final DittoMongoClient mongoClient) {
final String autoStartJournalKey = extractAutoStartConfigKey(config, AKKA_PERSISTENCE_JOURNAL_AUTO_START);
final String autoStartSnapsKey = extractAutoStartConfigKey(config, AKKA_PERSISTENCE_SNAPS_AUTO_START);
final Pattern journalCollectionPrefix =
getOverrideCollectionNamePattern(config.getConfig(autoStartJournalKey), JOURNAL_COLLECTION_NAME_KEY);
final Pattern snapsCollectionPrefix =
getOverrideCollectionNamePattern(config.getConfig(autoStartSnapsKey), SNAPS_COLLECTION_NAME_KEY);
return new MongoReadJournal(journalCollectionPrefix, snapsCollectionPrefix, mongoClient);
final String journalCollection =
getOverrideCollectionName(config.getConfig(autoStartJournalKey), JOURNAL_COLLECTION_NAME_KEY);
final String snapshotCollection =
getOverrideCollectionName(config.getConfig(autoStartSnapsKey), SNAPS_COLLECTION_NAME_KEY);
return new MongoReadJournal(journalCollection, snapshotCollection, mongoClient);
}

/**
Expand All @@ -165,9 +156,9 @@ public Source<PidWithSeqNr, NotUsed> getPidWithSeqNrsByInterval(final Instant st
final MongoDatabase db = mongoClient.getDefaultDatabase();
final Document idFilter = createIdFilter(start, end);

log.debug("Looking for journal collection with pattern <{}>.", journalCollectionPrefix);
log.debug("Looking for journal <{}> and snapshot store <{}>.", journalCollection, snapsCollection);

return listJournalsAndSnapshotStores()
return getJournalAndSnapshotStore()
.flatMapConcat(journalAndSnaps -> listPidWithSeqNr(journalAndSnaps, db, idFilter));
}

Expand All @@ -186,7 +177,7 @@ public Source<String, NotUsed> getJournalPids(final int batchSize, final Duratio
final ActorMaterializer mat) {

final int maxRestarts = computeMaxRestarts(maxIdleTime);
return listJournals().withAttributes(Attributes.inputBuffer(1, 1))
return getJournal().withAttributes(Attributes.inputBuffer(1, 1))
.flatMapConcat(journal ->
listPidsInJournal(journal, "", batchSize, mat, MAX_BACK_OFF_DURATION, maxRestarts)
)
Expand All @@ -205,7 +196,7 @@ public Source<String, NotUsed> getJournalPids(final int batchSize, final Duratio
public Source<String, NotUsed> getJournalPidsAbove(final String lowerBoundPid, final int batchSize,
final Duration maxIdleTime, final ActorMaterializer mat) {

return listJournals().withAttributes(Attributes.inputBuffer(1, 1))
return getJournal().withAttributes(Attributes.inputBuffer(1, 1))
.flatMapConcat(journal ->
listPidsInJournal(journal, lowerBoundPid, batchSize, mat, Duration.ZERO, 0)
)
Expand Down Expand Up @@ -274,22 +265,15 @@ private int computeMaxRestarts(final Duration maxDuration) {

private Source<PidWithSeqNr, NotUsed> listPidWithSeqNr(final JournalAndSnaps journalAndSnaps,
final MongoDatabase database, final Document idFilter) {
final Source<PidWithSeqNr, NotUsed> journalPids;
final Source<PidWithSeqNr, NotUsed> snapsPids;

if (journalAndSnaps.journal == null) {
journalPids = Source.empty();
} else {
journalPids = find(database, journalAndSnaps.journal, idFilter, JOURNAL_PROJECT_DOCUMENT)
.map(doc -> new PidWithSeqNr(doc.getString(PROCESSOR_ID), doc.getLong(TO)));
}
final Source<PidWithSeqNr, NotUsed> journalPids =
find(database, journalAndSnaps.journal, idFilter, JOURNAL_PROJECT_DOCUMENT)
.map(doc -> new PidWithSeqNr(doc.getString(PROCESSOR_ID), doc.getLong(TO)));

if (journalAndSnaps.snaps == null) {
snapsPids = Source.empty();
} else {
snapsPids = find(database, journalAndSnaps.snaps, idFilter, SNAPS_PROJECT_DOCUMENT)
.map(doc -> new PidWithSeqNr(doc.getString(PROCESSOR_ID), doc.getLong(SN)));
}
final Source<PidWithSeqNr, ?> snapsPids =
Source.lazily(() -> find(database, journalAndSnaps.snaps, idFilter, SNAPS_PROJECT_DOCUMENT)
.map(doc -> new PidWithSeqNr(doc.getString(PROCESSOR_ID), doc.getLong(SN)))
);

return journalPids.concat(snapsPids);
}
Expand All @@ -302,31 +286,12 @@ private Source<Document, NotUsed> find(final MongoDatabase db, final String coll
);
}

private Source<JournalAndSnaps, NotUsed> listJournalsAndSnapshotStores() {
final MongoDatabase database = mongoClient.getDefaultDatabase();
return resolveCollectionNames(journalCollectionPrefix, snapsCollectionPrefix, database, log)
.map(this::toJournalAndSnaps);
private Source<JournalAndSnaps, NotUsed> getJournalAndSnapshotStore() {
return Source.single(new JournalAndSnaps(journalCollection, snapsCollection));
}

private Source<MongoCollection<Document>, NotUsed> listJournals() {
final MongoDatabase database = mongoClient.getDefaultDatabase();
return resolveCollectionNames(journalCollectionPrefix, MATCH_NOTHING, database, log)
.map(database::getCollection);
}

private JournalAndSnaps toJournalAndSnaps(final String collectionName) {
final Matcher matcher1 = journalCollectionPrefix.matcher(collectionName);
if (matcher1.matches()) {
return new JournalAndSnaps(collectionName, null);
} else {
final Matcher matcher2 = snapsCollectionPrefix.matcher(collectionName);
if (matcher2.matches()) {
return new JournalAndSnaps(null, collectionName);
} else {
throw new IllegalArgumentException(String.format(
"Collection is neither journal nor snapshot-store: <%s>", collectionName));
}
}
private Source<MongoCollection<Document>, NotUsed> getJournal() {
return Source.single(mongoClient.getDefaultDatabase().getCollection(journalCollection));
}

private Document createIdFilter(final Instant start, final Instant end) {
Expand Down Expand Up @@ -390,7 +355,7 @@ private static String extractAutoStartConfigKey(final Config config, final Strin
}

/**
* Resolve event journal collection prefix (e.g. "things_journal") from the auto-start journal configuration.
* Resolve event journal collection name (e.g. "things_journal") from the auto-start journal configuration.
* <p>
* It assumes that in the auto-start journal configuration,
* {@code overrides.journal-collection} is defined and equal to the name of the event journal
Expand All @@ -403,55 +368,17 @@ private static String extractAutoStartConfigKey(final Config config, final Strin
* @throws com.typesafe.config.ConfigException.Missing if a relevant config value is missing.
* @throws com.typesafe.config.ConfigException.WrongType if a relevant config value has not the expected type.
*/
private static Pattern getOverrideCollectionNamePattern(final Config journalOrSnapsConfig, final String key) {
final String collectionPrefix = journalOrSnapsConfig.getString(key);
return Pattern.compile("^" + collectionPrefix);
}

/**
* Resolves all journal and snapshot-store collection names matching the passed prefixes.
*
* @param journalCollectionPrefix the prefix of the journal collections to resolve.
* @param snapsCollectionPrefix the prefix of the journal collections to resolve.
* @param database the MongoDB database to use for resolving collection names.
* @return a source of resolved journal collection names which matched the prefix.
*/
private static Source<String, NotUsed> resolveCollectionNames(final Pattern journalCollectionPrefix,
final Pattern snapsCollectionPrefix, final MongoDatabase database, final Logger log) {

// starts with "journalCollectionPrefix":
final ListCollectionsPublisher<Document> documentListCollectionsPublisher = database.listCollections();
final Bson filter = Filters.or(Filters.regex(COLLECTION_NAME_FIELD, journalCollectionPrefix),
Filters.regex(COLLECTION_NAME_FIELD, snapsCollectionPrefix));
final Publisher<Document> publisher = documentListCollectionsPublisher.filter(filter);
return Source.fromPublisher(publisher)
.map(document -> document.getString(COLLECTION_NAME_FIELD))
// Double check in case the Mongo API persistence layer in use does not support listCollections with filtering
.filter(collectionName -> journalCollectionPrefix.matcher(collectionName).matches() ||
snapsCollectionPrefix.matcher(collectionName).matches())
.map(collectionName -> {
log.debug("Collection <{}> with patterns <{}> or <{}> found.", collectionName,
journalCollectionPrefix, snapsCollectionPrefix);
return collectionName;
})
// Each "get current PIDs" query collects all collection names in memory in order to list them in
// a fixed order.
.<SortedSet<String>>fold(new TreeSet<>(), (collectionNames, collectionName) -> {
collectionNames.add(collectionName);
return collectionNames;
})
.mapConcat(collectionNames -> collectionNames);
private static String getOverrideCollectionName(final Config journalOrSnapsConfig, final String key) {
return journalOrSnapsConfig.getString(key);
}

private static final class JournalAndSnaps {

@Nullable
private final String journal;

@Nullable
private final String snaps;

private JournalAndSnaps(@Nullable final String journal, @Nullable final String snaps) {
private JournalAndSnaps(final String journal, final String snaps) {
this.journal = journal;
this.snaps = snaps;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ public interface EntityPersistenceOperations {
* @param entityId the ID of the entity to delete.
* @return source of any errors during the purge.
*/
Source<List<Throwable>, NotUsed> purgeEntity(CharSequence entityId);
Source<List<Throwable>, NotUsed> purgeEntity(EntityId entityId);

/**
* Purge all entities contained in the given {@code entityIds}.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,9 @@

import org.bson.BsonString;
import org.bson.Document;
import org.eclipse.ditto.model.base.entity.id.DefaultEntityId;
import org.eclipse.ditto.model.base.entity.id.DefaultNamespacedEntityId;
import org.eclipse.ditto.model.base.entity.id.NamespacedEntityIdInvalidException;
import org.junit.Test;

/**
Expand Down Expand Up @@ -61,8 +64,8 @@ public void selectEntityWithoutNamespaceWhenNamespacesEnabledFails() {
final MongoPersistenceOperationsSelectionProvider underTest =
MongoPersistenceOperationsSelectionProvider.of(settings);

assertThatExceptionOfType(IllegalArgumentException.class)
.isThrownBy(() -> underTest.selectEntity(ENTITY_NAME));
assertThatExceptionOfType(NamespacedEntityIdInvalidException.class)
.isThrownBy(() -> underTest.selectEntity(DefaultNamespacedEntityId.of(ENTITY_NAME)));
}

@Test
Expand All @@ -72,10 +75,11 @@ public void selectEntityWhenNamespacesDisabled() {
final MongoPersistenceOperationsSelectionProvider underTest =
MongoPersistenceOperationsSelectionProvider.of(settings);

final Collection<MongoPersistenceOperationsSelection> selections = underTest.selectEntity(ENTITY_NAME);
final Collection<MongoPersistenceOperationsSelection> selections =
underTest.selectEntity(DefaultEntityId.of(ENTITY_NAME));

final String pid = PERSISTENCE_ID_PREFIX + ENTITY_NAME;
final Document pidFilter = new Document(KEY_PID, new BsonString(pid));
final Document pidFilter = new Document().append(KEY_PID, new BsonString(pid));
final MongoPersistenceOperationsSelection expectedMetadataSelection =
MongoPersistenceOperationsSelection.of(METADATA_COLLECTION_NAME, pidFilter);
final MongoPersistenceOperationsSelection expectedJournalSelection =
Expand Down

0 comments on commit 3cef7b0

Please sign in to comment.