Skip to content

Commit

Permalink
Fix exists(features/*); simplify logging of DittoRuntimeException dur…
Browse files Browse the repository at this point in the history
…ing search update; change role name of SearchService; terminate timers when ThingUpdater shuts down on command.

Signed-off-by: Yufei Cai <yufei.cai@bosch.io>
  • Loading branch information
yufei-cai committed Apr 13, 2022
1 parent 10398da commit 5d08fd4
Show file tree
Hide file tree
Showing 6 changed files with 42 additions and 15 deletions.
Expand Up @@ -13,10 +13,10 @@
package org.eclipse.ditto.thingsearch.service.persistence.read.expression.visitors;

import static org.eclipse.ditto.thingsearch.service.persistence.PersistenceConstants.DESIRED_PROPERTIES;
import static org.eclipse.ditto.thingsearch.service.persistence.PersistenceConstants.DOT;
import static org.eclipse.ditto.thingsearch.service.persistence.PersistenceConstants.FIELD_ATTRIBUTES_PATH;
import static org.eclipse.ditto.thingsearch.service.persistence.PersistenceConstants.FIELD_DESIRED_PROPERTIES;
import static org.eclipse.ditto.thingsearch.service.persistence.PersistenceConstants.FIELD_FEATURES_PATH;
import static org.eclipse.ditto.thingsearch.service.persistence.PersistenceConstants.FIELD_FEATURE_ID;
import static org.eclipse.ditto.thingsearch.service.persistence.PersistenceConstants.FIELD_F_ARRAY;
import static org.eclipse.ditto.thingsearch.service.persistence.PersistenceConstants.FIELD_PROPERTIES;
import static org.eclipse.ditto.thingsearch.service.persistence.PersistenceConstants.FIELD_THING;
Expand All @@ -27,8 +27,8 @@

import javax.annotation.Nullable;

import org.bson.Document;
import org.bson.conversions.Bson;
import org.eclipse.ditto.json.JsonKey;
import org.eclipse.ditto.json.JsonPointer;
import org.eclipse.ditto.rql.query.expression.ExistsFieldExpression;
import org.eclipse.ditto.rql.query.expression.visitors.ExistsFieldExpressionVisitor;
Expand Down Expand Up @@ -73,7 +73,7 @@ public Bson visitAttribute(final String key) {
public Bson visitFeature(final String featureId) {
if (FEATURE_ID_WILDCARD.equals(featureId)) {
// any feature exists
return Filters.gt(FIELD_F_ARRAY, 0);
return Filters.exists(toDottedPath(FIELD_F_ARRAY, List.of(JsonKey.of(FIELD_FEATURE_ID))));
} else {
return matchKey(FIELD_FEATURES_PATH + featureId);
}
Expand Down
Expand Up @@ -20,6 +20,7 @@
import java.util.List;
import java.util.Optional;

import org.eclipse.ditto.base.model.exceptions.DittoRuntimeException;
import org.eclipse.ditto.internal.utils.akka.logging.DittoLoggerFactory;
import org.eclipse.ditto.internal.utils.akka.logging.ThreadSafeDittoLogger;
import org.eclipse.ditto.internal.utils.metrics.DittoMetrics;
Expand Down Expand Up @@ -72,8 +73,7 @@ public static Pair<Status, List<String>> checkBulkWriteResult(final WriteResultA
switch (consistencyError.status()) {
case CONSISTENCY_ERROR:
// write result is not consistent; there is a bug with Ditto or with its environment
acknowledgeFailures(getAllMetadata(writeResultAndErrors)
);
acknowledgeFailures(getAllMetadata(writeResultAndErrors));

return Pair.create(consistencyError.status(), List.of(consistencyError.message));
case INCORRECT_PATCH:
Expand Down Expand Up @@ -205,12 +205,16 @@ private static String logResult(final String status, final WriteResultAndErrors
final Optional<Throwable> unexpectedError = writeResultAndErrors.getUnexpectedError();
if (unexpectedError.isPresent()) {
final Throwable error = unexpectedError.get();
final StringWriter stackTraceWriter = new StringWriter();
stackTraceWriter.append(String.format("%s: UnexpectedError[stacktrace=", status));
error.printStackTrace(new PrintWriter(stackTraceWriter));
return stackTraceWriter.append("] - correlation: ")
.append(writeResultAndErrors.getBulkWriteCorrelationId())
.toString();
if (error instanceof DittoRuntimeException dittoRuntimeException) {
return dittoRuntimeException.toJsonString();
} else {
final StringWriter stackTraceWriter = new StringWriter();
stackTraceWriter.append(String.format("%s: UnexpectedError[stacktrace=", status));
error.printStackTrace(new PrintWriter(stackTraceWriter));
return stackTraceWriter.append("] - correlation: ")
.append(writeResultAndErrors.getBulkWriteCorrelationId())
.toString();
}
} else if (containsNoErrors) {
final BulkWriteResult bulkWriteResult = writeResultAndErrors.getBulkWriteResult();

Expand Down
Expand Up @@ -31,7 +31,7 @@ public class SearchService extends DittoService<SearchConfig> {
/**
* Name of things-search service.
*/
public static final String SERVICE_NAME = "things-search";
public static final String SERVICE_NAME = "search";

private static final Logger LOGGER = LoggerFactory.getLogger(SearchService.class);

Expand Down
Expand Up @@ -26,6 +26,7 @@
import javax.annotation.Nullable;

import org.bson.BsonDocument;
import org.eclipse.ditto.base.api.common.ShutdownReasonType;
import org.eclipse.ditto.base.model.acks.AcknowledgementRequest;
import org.eclipse.ditto.base.model.acks.DittoAcknowledgementLabel;
import org.eclipse.ditto.base.service.actors.ShutdownBehaviour;
Expand All @@ -39,6 +40,7 @@
import org.eclipse.ditto.internal.utils.metrics.instruments.counter.Counter;
import org.eclipse.ditto.internal.utils.metrics.instruments.timer.StartedTimer;
import org.eclipse.ditto.internal.utils.tracing.DittoTracing;
import org.eclipse.ditto.json.JsonValue;
import org.eclipse.ditto.policies.api.PolicyReferenceTag;
import org.eclipse.ditto.things.model.ThingId;
import org.eclipse.ditto.things.model.signals.events.ThingDeleted;
Expand Down Expand Up @@ -250,8 +252,10 @@ private FSM.State<State, Data> subscribeAck(final DistributedPubSubMediator.Subs
return stay();
}

private FSM.State<State, Data> shutdownNow(final Object trigger, final Data data) {
log.info("Shutting down now due to <{}> during <{}>", trigger, stateName());
private FSM.State<State, Data> shutdownNow(final org.eclipse.ditto.base.api.common.Shutdown shutdown,
final Data data) {
log.info("Shutting down now due to <{}> during <{}>", shutdown, stateName());
data.metadata().sendWeakAck(getDescription(shutdown));
return stop();
}

Expand Down Expand Up @@ -537,4 +541,16 @@ private static Data getInitialData(final ThingId thingId) {
final var deletedMetadata = Metadata.ofDeleted(thingId);
return new Data(deletedMetadata, ThingDeleteModel.of(deletedMetadata));
}

private static JsonValue getDescription(final org.eclipse.ditto.base.api.common.Shutdown shutdown) {
final var type = shutdown.getReason().getType();
if (type instanceof ShutdownReasonType.Known knownType) {
return JsonValue.of(switch (knownType) {
case PURGE_NAMESPACE -> "The namespace is being purged.";
case PURGE_ENTITIES -> "The entities are being purged.";
});
} else {
return JsonValue.of(type.toString());
}
}
}
2 changes: 1 addition & 1 deletion thingsearch/service/src/main/resources/things-search.conf
@@ -1,5 +1,5 @@
ditto {
service-name = "things-search"
service-name = "search"
mapping-strategy.implementation = "org.eclipse.ditto.thingsearch.api.ThingSearchMappingStrategies"

persistence.operations.delay-after-persistence-actor-shutdown = 5s
Expand Down
Expand Up @@ -146,6 +146,13 @@ public void nullAndEmptyValuesExist() {
assertThat(results).isEqualTo(expected);
}

@Test
public void existsByAnyFeature() {
final Criteria crit = cf.existsCriteria(ef.existsByFeatureId("*"));
final Collection<ThingId> result = findForCriteria(crit);
assertThat(result).isNotEmpty();
}

private void insertThings() {
final Attributes attributes1 = createAttributes(THING1_KNOWN_ATTR, THING1_KNOWN_ATTR_VALUE).toBuilder()
.setAll(createEmptyAttributes())
Expand Down

0 comments on commit 5d08fd4

Please sign in to comment.