Skip to content

Commit

Permalink
fixed that for commands requesting "search-persisted" the wrong write…
Browse files Browse the repository at this point in the history
… concern was used

* collections were mixed up
* also fixed some logging in connectivity which missed the correlation-id

Signed-off-by: Thomas Jaeckle <thomas.jaeckle@bosch.io>
  • Loading branch information
thjaeckle committed May 12, 2021
1 parent 8e275e8 commit 2e3a4fb
Show file tree
Hide file tree
Showing 4 changed files with 32 additions and 29 deletions.
Expand Up @@ -27,6 +27,11 @@
import org.eclipse.ditto.base.model.common.HttpStatus;
import org.eclipse.ditto.base.model.exceptions.DittoRuntimeException;
import org.eclipse.ditto.base.model.headers.DittoHeaders;
import org.eclipse.ditto.base.model.signals.acks.Acknowledgements;
import org.eclipse.ditto.base.model.signals.commands.CommandResponse;
import org.eclipse.ditto.connectivity.api.ExternalMessage;
import org.eclipse.ditto.connectivity.api.ExternalMessageBuilder;
import org.eclipse.ditto.connectivity.api.ExternalMessageFactory;
import org.eclipse.ditto.connectivity.model.Connection;
import org.eclipse.ditto.connectivity.model.ConnectionId;
import org.eclipse.ditto.connectivity.model.ConnectionType;
Expand All @@ -39,17 +44,12 @@
import org.eclipse.ditto.connectivity.service.messaging.monitoring.ConnectionMonitor;
import org.eclipse.ditto.connectivity.service.messaging.monitoring.DefaultConnectionMonitorRegistry;
import org.eclipse.ditto.internal.models.acks.config.AcknowledgementConfig;
import org.eclipse.ditto.connectivity.api.ExternalMessage;
import org.eclipse.ditto.connectivity.api.ExternalMessageBuilder;
import org.eclipse.ditto.connectivity.api.ExternalMessageFactory;
import org.eclipse.ditto.internal.utils.akka.logging.ThreadSafeDittoLoggingAdapter;
import org.eclipse.ditto.internal.utils.config.DefaultScopedConfig;
import org.eclipse.ditto.internal.utils.config.InstanceIdentifierSupplier;
import org.eclipse.ditto.internal.utils.metrics.DittoMetrics;
import org.eclipse.ditto.internal.utils.metrics.instruments.timer.StartedTimer;
import org.eclipse.ditto.internal.utils.tracing.TracingTags;
import org.eclipse.ditto.base.model.signals.acks.Acknowledgements;
import org.eclipse.ditto.base.model.signals.commands.CommandResponse;

import akka.actor.AbstractActorWithTimers;
import akka.actor.ActorRef;
Expand Down Expand Up @@ -127,7 +127,8 @@ protected final void forwardToMappingActor(final ExternalMessage message, final
// empty failed responses indicate that SetCount was missing
final boolean shouldRedeliver = failedResponses.isEmpty() ||
someFailedResponseRequiresRedelivery(failedResponses);
log().debug("Rejecting [redeliver={}] due to failed responses <{}>",
log().withCorrelationId(failedResponses.get(0))
.debug("Rejecting [redeliver={}] due to failed responses <{}>",
shouldRedeliver, failedResponses);
timer.tag(TracingTags.ACK_SUCCESS, false)
.tag(TracingTags.ACK_REDELIVER, shouldRedeliver)
Expand Down
Expand Up @@ -40,6 +40,13 @@
import org.eclipse.ditto.base.model.acks.DittoAcknowledgementLabel;
import org.eclipse.ditto.base.model.common.CharsetDeterminer;
import org.eclipse.ditto.base.model.headers.DittoHeaders;
import org.eclipse.ditto.base.model.signals.Signal;
import org.eclipse.ditto.base.model.signals.acks.Acknowledgement;
import org.eclipse.ditto.base.model.signals.acks.Acknowledgements;
import org.eclipse.ditto.base.model.signals.commands.CommandResponse;
import org.eclipse.ditto.connectivity.api.ExternalMessage;
import org.eclipse.ditto.connectivity.api.OutboundSignal;
import org.eclipse.ditto.connectivity.api.placeholders.ConnectivityPlaceholders;
import org.eclipse.ditto.connectivity.model.Connection;
import org.eclipse.ditto.connectivity.model.ConnectivityModelFactory;
import org.eclipse.ditto.connectivity.model.ConnectivityStatus;
Expand All @@ -62,21 +69,14 @@
import org.eclipse.ditto.connectivity.service.messaging.monitoring.DefaultConnectionMonitorRegistry;
import org.eclipse.ditto.connectivity.service.messaging.monitoring.logs.ConnectionLogger;
import org.eclipse.ditto.connectivity.service.messaging.validation.ConnectionValidator;
import org.eclipse.ditto.protocol.adapter.ProtocolAdapter;
import org.eclipse.ditto.connectivity.service.util.ConnectivityMdcEntryKey;
import org.eclipse.ditto.connectivity.api.ExternalMessage;
import org.eclipse.ditto.connectivity.api.OutboundSignal;
import org.eclipse.ditto.connectivity.api.placeholders.ConnectivityPlaceholders;
import org.eclipse.ditto.internal.models.placeholders.ExpressionResolver;
import org.eclipse.ditto.internal.models.placeholders.PlaceholderFactory;
import org.eclipse.ditto.internal.utils.akka.logging.DittoLoggerFactory;
import org.eclipse.ditto.internal.utils.akka.logging.ThreadSafeDittoLoggingAdapter;
import org.eclipse.ditto.internal.utils.config.DefaultScopedConfig;
import org.eclipse.ditto.base.model.signals.acks.Acknowledgement;
import org.eclipse.ditto.base.model.signals.acks.Acknowledgements;
import org.eclipse.ditto.base.model.signals.Signal;
import org.eclipse.ditto.base.model.signals.commands.CommandResponse;
import org.eclipse.ditto.messages.model.signals.commands.MessageCommand;
import org.eclipse.ditto.protocol.adapter.ProtocolAdapter;
import org.eclipse.ditto.things.model.signals.commands.ThingCommand;
import org.eclipse.ditto.thingsearch.model.signals.events.SubscriptionEvent;

Expand Down Expand Up @@ -408,9 +408,10 @@ private SendingOrDropped publishToGenericTarget(final ExpressionResolver resolve
if (publishTargetOptional.isPresent()) {
final T publishTarget = publishTargetOptional.get();
final Signal<?> outboundSource = outbound.getSource();
logger.info("Publishing mapped message of type <{}> to PublishTarget <{}>", outboundSource.getType(),
final ThreadSafeDittoLoggingAdapter l = logger.withCorrelationId(outboundSource);
l.info("Publishing mapped message of type <{}> to PublishTarget <{}>", outboundSource.getType(),
publishTarget);
logger.debug("Publishing mapped message of type <{}> to PublishTarget <{}>: {}", outboundSource.getType(),
l.debug("Publishing mapped message of type <{}> to PublishTarget <{}>: {}", outboundSource.getType(),
publishTarget, sendingContext.getExternalMessage());
@Nullable final Target autoAckTarget = sendingContext.getAutoAckTarget().orElse(null);
final HeaderMapping headerMapping = genericTarget.getHeaderMapping();
Expand All @@ -424,7 +425,7 @@ private SendingOrDropped publishToGenericTarget(final ExpressionResolver resolve
);
// set the external message after header mapping for the result of header mapping to show up in log
result = new Sending(sendingContext.setExternalMessage(mappedMessage), responsesFuture,
connectionIdResolver, logger);
connectionIdResolver, l);
} else {
result = new Dropped(sendingContext, "Signal dropped, target address unresolved: {0}");
}
Expand Down
Expand Up @@ -75,7 +75,8 @@ private MongoSearchUpdaterFlow(final MongoCollection<Document> collection,
*/
public static MongoSearchUpdaterFlow of(final MongoDatabase database,
final PersistenceStreamConfig persistenceConfig) {
return new MongoSearchUpdaterFlow(database.getCollection(PersistenceConstants.THINGS_COLLECTION_NAME), persistenceConfig);
return new MongoSearchUpdaterFlow(database.getCollection(PersistenceConstants.THINGS_COLLECTION_NAME),
persistenceConfig);
}


Expand Down Expand Up @@ -117,9 +118,9 @@ private Source<WriteResultAndErrors, NotUsed> executeBulkWrite(final boolean sho

final MongoCollection<Document> theCollection;
if (shouldAcknowledge) {
theCollection = this.collection;
} else {
theCollection = this.collectionWithAcknowledgements;
} else {
theCollection = this.collection;
}

return Source.fromPublisher(theCollection.bulkWrite(writeModels, new BulkWriteOptions().ordered(false)))
Expand Down
Expand Up @@ -17,15 +17,14 @@
import java.util.concurrent.CompletionStage;
import java.util.function.Function;

import org.eclipse.ditto.internal.utils.namespaces.BlockedNamespaces;
import org.eclipse.ditto.things.model.ThingId;
import org.eclipse.ditto.base.service.config.supervision.ExponentialBackOffConfig;
import org.eclipse.ditto.thingsearch.service.common.config.PersistenceStreamConfig;
import org.eclipse.ditto.thingsearch.service.common.config.StreamCacheConfig;
import org.eclipse.ditto.thingsearch.service.common.config.StreamConfig;
import org.eclipse.ditto.thingsearch.service.common.config.StreamStageConfig;
import org.eclipse.ditto.thingsearch.service.common.config.UpdaterConfig;
import org.eclipse.ditto.thingsearch.service.persistence.write.model.AbstractWriteModel;
import org.eclipse.ditto.internal.utils.namespaces.BlockedNamespaces;

import com.mongodb.reactivestreams.client.MongoDatabase;

Expand All @@ -37,6 +36,7 @@
import akka.stream.Attributes;
import akka.stream.KillSwitch;
import akka.stream.KillSwitches;
import akka.stream.RestartSettings;
import akka.stream.javadsl.Flow;
import akka.stream.javadsl.Keep;
import akka.stream.javadsl.RestartSink;
Expand Down Expand Up @@ -139,10 +139,10 @@ private Source<Source<AbstractWriteModel, NotUsed>, NotUsed> createRestartSource
.map(writeModelSource -> writeModelSource.via(
blockNamespaceFlow(SearchUpdaterStream::namespaceOfWriteModel))));

final ExponentialBackOffConfig backOffConfig = retrievalConfig.getExponentialBackOffConfig();

return RestartSource.withBackoff(backOffConfig.getMin(), backOffConfig.getMax(),
backOffConfig.getRandomFactor(), () -> source);
final var backOffConfig = retrievalConfig.getExponentialBackOffConfig();
return RestartSource.withBackoff(
RestartSettings.create(backOffConfig.getMax(), backOffConfig.getMax(), backOffConfig.getRandomFactor()),
() -> source);
}

private Sink<Source<AbstractWriteModel, NotUsed>, NotUsed> createRestartSink(
Expand All @@ -163,9 +163,9 @@ private Sink<Source<AbstractWriteModel, NotUsed>, NotUsed> createRestartSink(
Attributes.logLevelError()))
.to(Sink.<String>ignore());

final ExponentialBackOffConfig backOffConfig = persistenceConfig.getExponentialBackOffConfig();

return RestartSink.withBackoff(backOffConfig.getMax(), backOffConfig.getMax(), backOffConfig.getRandomFactor(),
final var backOffConfig = persistenceConfig.getExponentialBackOffConfig();
return RestartSink.withBackoff(
RestartSettings.create(backOffConfig.getMax(), backOffConfig.getMax(), backOffConfig.getRandomFactor()),
() -> sink);
}

Expand Down

0 comments on commit 2e3a4fb

Please sign in to comment.