Skip to content

Commit

Permalink
Merge pull request #1057 from bosch-io/bugfix/policy-errors-in-ws
Browse files Browse the repository at this point in the history
fixed that error responses in WS contained wrong topic path
  • Loading branch information
thjaeckle committed May 10, 2021
2 parents ea5f00d + c5ab4bc commit 740abd2
Show file tree
Hide file tree
Showing 10 changed files with 70 additions and 71 deletions.
Expand Up @@ -30,21 +30,6 @@
import javax.annotation.Nullable;
import javax.annotation.concurrent.NotThreadSafe;

import org.eclipse.ditto.gateway.service.security.HttpHeader;
import org.eclipse.ditto.gateway.service.streaming.Connect;
import org.eclipse.ditto.gateway.service.streaming.IncomingSignal;
import org.eclipse.ditto.gateway.service.streaming.StreamControlMessage;
import org.eclipse.ditto.gateway.service.streaming.StreamingAck;
import org.eclipse.ditto.gateway.service.streaming.actors.SessionedJsonifiable;
import org.eclipse.ditto.gateway.service.streaming.actors.StreamingActor;
import org.eclipse.ditto.gateway.service.streaming.actors.SupervisedStream;
import org.eclipse.ditto.gateway.service.util.config.streaming.StreamingConfig;
import org.eclipse.ditto.gateway.service.util.config.streaming.WebsocketConfig;
import org.eclipse.ditto.json.JsonArray;
import org.eclipse.ditto.json.JsonFactory;
import org.eclipse.ditto.json.JsonField;
import org.eclipse.ditto.json.JsonObject;
import org.eclipse.ditto.json.JsonValue;
import org.eclipse.ditto.base.model.acks.AcknowledgementLabel;
import org.eclipse.ditto.base.model.auth.AuthorizationContext;
import org.eclipse.ditto.base.model.entity.id.EntityId;
Expand All @@ -58,20 +43,24 @@
import org.eclipse.ditto.base.model.headers.DittoHeadersBuilder;
import org.eclipse.ditto.base.model.json.JsonSchemaVersion;
import org.eclipse.ditto.base.model.json.Jsonifiable;
import org.eclipse.ditto.jwt.model.ImmutableJsonWebToken;
import org.eclipse.ditto.jwt.model.JsonWebToken;
import org.eclipse.ditto.policies.model.PolicyException;
import org.eclipse.ditto.policies.model.PolicyId;
import org.eclipse.ditto.things.model.ThingId;
import org.eclipse.ditto.thingsearch.model.ThingSearchException;
import org.eclipse.ditto.protocol.Adaptable;
import org.eclipse.ditto.protocol.HeaderTranslator;
import org.eclipse.ditto.protocol.JsonifiableAdaptable;
import org.eclipse.ditto.protocol.adapter.ProtocolAdapter;
import org.eclipse.ditto.protocol.ProtocolFactory;
import org.eclipse.ditto.base.model.signals.Signal;
import org.eclipse.ditto.base.model.signals.acks.Acknowledgement;
import org.eclipse.ditto.base.model.signals.commands.exceptions.GatewayInternalErrorException;
import org.eclipse.ditto.base.model.signals.commands.exceptions.GatewayWebsocketSessionClosedException;
import org.eclipse.ditto.base.model.signals.commands.exceptions.GatewayWebsocketSessionExpiredException;
import org.eclipse.ditto.gateway.service.endpoints.routes.AbstractRoute;
import org.eclipse.ditto.gateway.service.endpoints.utils.EventSniffer;
import org.eclipse.ditto.gateway.service.endpoints.utils.GatewaySignalEnrichmentProvider;
import org.eclipse.ditto.gateway.service.security.HttpHeader;
import org.eclipse.ditto.gateway.service.streaming.Connect;
import org.eclipse.ditto.gateway.service.streaming.IncomingSignal;
import org.eclipse.ditto.gateway.service.streaming.StreamControlMessage;
import org.eclipse.ditto.gateway.service.streaming.StreamingAck;
import org.eclipse.ditto.gateway.service.streaming.actors.SessionedJsonifiable;
import org.eclipse.ditto.gateway.service.streaming.actors.StreamingActor;
import org.eclipse.ditto.gateway.service.streaming.actors.SupervisedStream;
import org.eclipse.ditto.gateway.service.util.config.streaming.StreamingConfig;
import org.eclipse.ditto.gateway.service.util.config.streaming.WebsocketConfig;
import org.eclipse.ditto.internal.models.signalenrichment.SignalEnrichmentFacade;
import org.eclipse.ditto.internal.utils.akka.controlflow.Filter;
import org.eclipse.ditto.internal.utils.akka.controlflow.LimitRateByRejection;
Expand All @@ -80,13 +69,23 @@
import org.eclipse.ditto.internal.utils.metrics.DittoMetrics;
import org.eclipse.ditto.internal.utils.metrics.instruments.counter.Counter;
import org.eclipse.ditto.internal.utils.pubsub.StreamingType;
import org.eclipse.ditto.base.model.signals.acks.Acknowledgement;
import org.eclipse.ditto.base.model.signals.Signal;
import org.eclipse.ditto.base.model.signals.commands.exceptions.GatewayInternalErrorException;
import org.eclipse.ditto.base.model.signals.commands.exceptions.GatewayWebsocketSessionClosedException;
import org.eclipse.ditto.base.model.signals.commands.exceptions.GatewayWebsocketSessionExpiredException;
import org.eclipse.ditto.json.JsonArray;
import org.eclipse.ditto.json.JsonFactory;
import org.eclipse.ditto.json.JsonField;
import org.eclipse.ditto.json.JsonObject;
import org.eclipse.ditto.json.JsonValue;
import org.eclipse.ditto.jwt.model.ImmutableJsonWebToken;
import org.eclipse.ditto.jwt.model.JsonWebToken;
import org.eclipse.ditto.policies.model.PolicyException;
import org.eclipse.ditto.policies.model.signals.commands.PolicyErrorResponse;
import org.eclipse.ditto.protocol.Adaptable;
import org.eclipse.ditto.protocol.HeaderTranslator;
import org.eclipse.ditto.protocol.JsonifiableAdaptable;
import org.eclipse.ditto.protocol.ProtocolFactory;
import org.eclipse.ditto.protocol.adapter.ProtocolAdapter;
import org.eclipse.ditto.things.model.ThingId;
import org.eclipse.ditto.things.model.signals.commands.ThingErrorResponse;
import org.eclipse.ditto.thingsearch.model.ThingSearchException;
import org.eclipse.ditto.thingsearch.model.signals.commands.SearchErrorResponse;
import org.slf4j.Logger;

Expand Down Expand Up @@ -794,12 +793,7 @@ private static ThingErrorResponse buildThingErrorResponse(final DittoRuntimeExce
}

private static PolicyErrorResponse buildPolicyErrorResponse(final DittoRuntimeException dittoRuntimeException) {
final DittoHeaders dittoHeaders = dittoRuntimeException.getDittoHeaders();
final String nullableEntityId = dittoHeaders.get(DittoHeaderDefinition.ENTITY_ID.getKey());
return Optional.ofNullable(nullableEntityId)
.map(entityId -> entityId.substring(entityId.indexOf(':')))
.map(entityId -> PolicyErrorResponse.of(PolicyId.of(entityId), dittoRuntimeException, dittoHeaders))
.orElseGet(() -> PolicyErrorResponse.of(dittoRuntimeException, dittoHeaders));
return PolicyErrorResponse.of(dittoRuntimeException);
}

private static SearchErrorResponse buildSearchErrorResponse(final DittoRuntimeException dittoRuntimeException) {
Expand Down
Expand Up @@ -15,25 +15,27 @@
import static org.eclipse.ditto.base.model.common.ConditionChecker.checkNotNull;

import java.util.Objects;
import java.util.Optional;
import java.util.function.Predicate;

import javax.annotation.Nullable;
import javax.annotation.concurrent.Immutable;

import org.eclipse.ditto.json.JsonFactory;
import org.eclipse.ditto.json.JsonField;
import org.eclipse.ditto.json.JsonObject;
import org.eclipse.ditto.json.JsonObjectBuilder;
import org.eclipse.ditto.json.JsonPointer;
import org.eclipse.ditto.base.model.exceptions.DittoJsonException;
import org.eclipse.ditto.base.model.exceptions.DittoRuntimeException;
import org.eclipse.ditto.base.model.headers.DittoHeaderDefinition;
import org.eclipse.ditto.base.model.headers.DittoHeaders;
import org.eclipse.ditto.base.model.json.JsonParsableCommandResponse;
import org.eclipse.ditto.base.model.json.JsonSchemaVersion;
import org.eclipse.ditto.policies.model.PolicyId;
import org.eclipse.ditto.base.model.signals.GlobalErrorRegistry;
import org.eclipse.ditto.base.model.signals.commands.AbstractErrorResponse;
import org.eclipse.ditto.base.model.signals.commands.CommandResponse;
import org.eclipse.ditto.json.JsonFactory;
import org.eclipse.ditto.json.JsonField;
import org.eclipse.ditto.json.JsonObject;
import org.eclipse.ditto.json.JsonObjectBuilder;
import org.eclipse.ditto.json.JsonPointer;
import org.eclipse.ditto.policies.model.PolicyId;

/**
* Response to a {@link PolicyCommand} which wraps the exception thrown while processing the command.
Expand Down Expand Up @@ -71,7 +73,13 @@ private PolicyErrorResponse(final PolicyId policyId, final DittoRuntimeException
* @throws NullPointerException if one of the arguments is {@code null}.
*/
public static PolicyErrorResponse of(final DittoRuntimeException dittoRuntimeException) {
return of(FALLBACK_POLICY_ID, dittoRuntimeException, dittoRuntimeException.getDittoHeaders());
final DittoHeaders dittoHeaders = dittoRuntimeException.getDittoHeaders();
final String nullableEntityId = dittoHeaders.get(DittoHeaderDefinition.ENTITY_ID.getKey());
final PolicyId policyId = Optional.ofNullable(nullableEntityId)
.map(entityId -> entityId.substring(entityId.indexOf(":") + 1)) // starts with "policy:" - cut that off!
.map(PolicyId::of)
.orElse(FALLBACK_POLICY_ID);
return of(policyId, dittoRuntimeException, dittoHeaders);
}

/**
Expand Down
Expand Up @@ -198,7 +198,7 @@ public int hashCode() {
public String toString() {
return getClass().getSimpleName() + " ["
+ "namespace=" + namespace +
", id=" + name +
", name=" + name +
", group=" + group +
", channel=" + channel +
", criterion=" + criterion +
Expand Down
Expand Up @@ -21,10 +21,10 @@
import org.eclipse.ditto.json.JsonObject;
import org.eclipse.ditto.json.JsonValue;
import org.eclipse.ditto.policies.model.PolicyId;
import org.eclipse.ditto.things.model.Thing;
import org.eclipse.ditto.things.model.ThingId;
import org.eclipse.ditto.protocol.Adaptable;
import org.eclipse.ditto.protocol.JsonifiableMapper;
import org.eclipse.ditto.things.model.Thing;
import org.eclipse.ditto.things.model.ThingId;
import org.eclipse.ditto.things.model.signals.commands.exceptions.ThingIdNotExplicitlySettableException;
import org.eclipse.ditto.things.model.signals.commands.modify.CreateThing;
import org.eclipse.ditto.things.model.signals.commands.modify.DeleteAttribute;
Expand Down Expand Up @@ -175,7 +175,9 @@ private static Thing thingToCreateOrModifyFrom(final Adaptable adaptable) {

if (thingIdOptional.isPresent()) {
if (!thingIdOptional.get().equals(thingIdFromTopic)) {
throw ThingIdNotExplicitlySettableException.forDittoProtocol().build();
throw ThingIdNotExplicitlySettableException.forDittoProtocol()
.dittoHeaders(adaptable.getDittoHeaders())
.build();
}
} else {
return thing.toBuilder()
Expand Down
Expand Up @@ -12,20 +12,20 @@
*/
package org.eclipse.ditto.protocol.adapter.policies;

import org.eclipse.ditto.json.JsonPointer;
import org.eclipse.ditto.base.model.common.HttpStatus;
import org.eclipse.ditto.base.model.exceptions.DittoRuntimeException;
import org.eclipse.ditto.base.model.json.FieldType;
import org.eclipse.ditto.base.model.signals.ErrorRegistry;
import org.eclipse.ditto.base.model.signals.GlobalErrorRegistry;
import org.eclipse.ditto.json.JsonPointer;
import org.eclipse.ditto.policies.model.PolicyIdInvalidException;
import org.eclipse.ditto.policies.model.signals.commands.PolicyErrorResponse;
import org.eclipse.ditto.protocol.Adaptable;
import org.eclipse.ditto.protocol.adapter.DittoProtocolAdapter;
import org.eclipse.ditto.protocol.Payload;
import org.eclipse.ditto.protocol.adapter.ProtocolAdapterTest;
import org.eclipse.ditto.protocol.TestConstants;
import org.eclipse.ditto.protocol.TopicPath;
import org.eclipse.ditto.base.model.signals.ErrorRegistry;
import org.eclipse.ditto.base.model.signals.GlobalErrorRegistry;
import org.eclipse.ditto.policies.model.signals.commands.PolicyErrorResponse;
import org.eclipse.ditto.protocol.adapter.DittoProtocolAdapter;
import org.eclipse.ditto.protocol.adapter.ProtocolAdapterTest;
import org.junit.Before;
import org.junit.Test;

Expand Down
Expand Up @@ -21,21 +21,21 @@
import javax.annotation.Nullable;
import javax.annotation.concurrent.Immutable;

import org.eclipse.ditto.json.JsonFactory;
import org.eclipse.ditto.json.JsonField;
import org.eclipse.ditto.json.JsonObject;
import org.eclipse.ditto.json.JsonObjectBuilder;
import org.eclipse.ditto.json.JsonPointer;
import org.eclipse.ditto.base.model.exceptions.DittoJsonException;
import org.eclipse.ditto.base.model.exceptions.DittoRuntimeException;
import org.eclipse.ditto.base.model.headers.DittoHeaderDefinition;
import org.eclipse.ditto.base.model.headers.DittoHeaders;
import org.eclipse.ditto.base.model.json.JsonParsableCommandResponse;
import org.eclipse.ditto.base.model.json.JsonSchemaVersion;
import org.eclipse.ditto.things.model.ThingId;
import org.eclipse.ditto.base.model.signals.GlobalErrorRegistry;
import org.eclipse.ditto.base.model.signals.commands.AbstractErrorResponse;
import org.eclipse.ditto.base.model.signals.commands.CommandResponse;
import org.eclipse.ditto.json.JsonFactory;
import org.eclipse.ditto.json.JsonField;
import org.eclipse.ditto.json.JsonObject;
import org.eclipse.ditto.json.JsonObjectBuilder;
import org.eclipse.ditto.json.JsonPointer;
import org.eclipse.ditto.things.model.ThingId;

/**
* Response to a {@link ThingCommand} which wraps the exception thrown while processing the command.
Expand Down Expand Up @@ -75,7 +75,7 @@ public static ThingErrorResponse of(final DittoRuntimeException dittoRuntimeExce
final DittoHeaders dittoHeaders = dittoRuntimeException.getDittoHeaders();
final String nullableEntityId = dittoHeaders.get(DittoHeaderDefinition.ENTITY_ID.getKey());
final ThingId thingId = Optional.ofNullable(nullableEntityId)
.map(entityId -> entityId.substring(entityId.indexOf(":")))
.map(entityId -> entityId.substring(entityId.indexOf(":") + 1)) // starts with "thing:" - cut that off!
.map(ThingId::of)
.orElse(FALLBACK_THING_ID);
return of(thingId, dittoRuntimeException, dittoHeaders);
Expand Down
Expand Up @@ -168,7 +168,6 @@ public Flow<Map<ThingId, Metadata>, Source<AbstractWriteModel, NotUsed>, NotUsed
computeWriteModel(metadataRef, responseMap.get(metadataRef.getThingId()))
.async(MongoSearchUpdaterFlow.DISPATCHER_NAME, parallelism)
)
.withAttributes(Attributes.inputBuffer(parallelism, parallelism))
);
})
.withAttributes(Attributes.inputBuffer(1, 1));
Expand All @@ -181,7 +180,6 @@ private Source<Map<ThingId, SudoRetrieveThingResponse>, NotUsed> sudoRetrieveThi
return Source.fromIterator(changeMap.entrySet()::iterator)
.flatMapMerge(parallelism, entry -> sudoRetrieveThing(entry)
.async(MongoSearchUpdaterFlow.DISPATCHER_NAME, parallelism))
.withAttributes(Attributes.inputBuffer(parallelism, parallelism))
.<Map<ThingId, SudoRetrieveThingResponse>>fold(new HashMap<>(), (map, response) -> {
map.put(getThingId(response), response);
return map;
Expand Down
Expand Up @@ -32,7 +32,6 @@
import akka.NotUsed;
import akka.japi.Pair;
import akka.japi.pf.PFBuilder;
import akka.stream.Attributes;
import akka.stream.FanInShape2;
import akka.stream.FlowShape;
import akka.stream.Graph;
Expand Down Expand Up @@ -102,9 +101,7 @@ public Flow<Source<AbstractWriteModel, NotUsed>, WriteResultAndErrors, NotUsed>
final Flow<List<AbstractWriteModel>, WriteResultAndErrors, NotUsed> writeFlow =
Flow.<List<AbstractWriteModel>>create()
.flatMapMerge(parallelism, writeModels ->
executeBulkWrite(shouldAcknowledge, writeModels).async(DISPATCHER_NAME, parallelism))
// never initiate more than "parallelism" writes against the persistence
.withAttributes(Attributes.inputBuffer(parallelism, parallelism));
executeBulkWrite(shouldAcknowledge, writeModels).async(DISPATCHER_NAME, parallelism));

return Flow.fromGraph(assembleFlows(batchFlow, writeFlow, createStartTimerFlow(), createStopTimerFlow()));
}
Expand Down
4 changes: 2 additions & 2 deletions thingsearch/service/src/main/resources/things-search.conf
Expand Up @@ -120,7 +120,7 @@ ditto {
// retrieval of things and policy-enforcers
retrieval {
// upper bound of parallel SudoRetrieveThing commands (by extension, parallel loads of policy enforcer cache)
parallelism = 25
parallelism = 16
parallelism = ${?THINGS_SEARCH_UPDATER_STREAM_RETRIEVAL_PARALLELISM}

// back-offs in case of failure
Expand All @@ -134,7 +134,7 @@ ditto {
// writing into the persistence
persistence {
// how many bulk writes to request in parallel; must be a power of 2
parallelism = 1
parallelism = 2
parallelism = ${?THINGS_SEARCH_UPDATER_STREAM_PERSISTENCE_PARALLELISM}

// how many write operations to perform in one bulk
Expand Down
4 changes: 2 additions & 2 deletions thingsearch/service/src/test/resources/actors-test.conf
Expand Up @@ -119,7 +119,7 @@ ditto {
// retrieval of things and policy-enforcers
retrieval {
// upper bound of parallel SudoRetrieveThing commands (by extension, parallel loads of policy enforcer cache)
parallelism = 25
parallelism = 16
parallelism = ${?THINGS_SEARCH_UPDATER_STREAM_RETRIEVAL_PARALLELISM}

// back-offs in case of failure
Expand All @@ -133,7 +133,7 @@ ditto {
// writing into the persistence
persistence {
// how many bulk writes to request in parallel; must be a power of 2
parallelism = 1
parallelism = 2
parallelism = ${?THINGS_SEARCH_UPDATER_STREAM_PERSISTENCE_PARALLELISM}

// how many write operations to perform in one bulk
Expand Down

0 comments on commit 740abd2

Please sign in to comment.