Skip to content

Commit

Permalink
replace DittoHeadersSizeChecker with extension point DittoHeadersVali…
Browse files Browse the repository at this point in the history
…dator

Signed-off-by: Johannes Schneider <johannes.schneider@bosch.io>
  • Loading branch information
jokraehe committed Aug 24, 2022
1 parent dc8160d commit 25689b3
Show file tree
Hide file tree
Showing 24 changed files with 301 additions and 286 deletions.
1 change: 1 addition & 0 deletions base/model/pom.xml
Expand Up @@ -134,6 +134,7 @@
<!--<exclude></exclude>-->
<exclude>org.eclipse.ditto.base.model.signals.commands.exceptions</exclude>
<exclude>org.eclipse.ditto.base.model.headers.DittoHeaderDefinition#POLICY_ENFORCER_INVALIDATED_PREEMPTIVELY</exclude>
<exclude>org.eclipse.ditto.base.model.headers.DittoHeadersSizeChecker</exclude>
</excludes>
</parameter>
</configuration>
Expand Down

This file was deleted.

This file was deleted.

Expand Up @@ -32,18 +32,13 @@ public final class DefaultLimitsConfig implements LimitsConfig, WithConfigPath {
* Path of the limits configuration settings.
*/
public static final String CONFIG_PATH = "limits";

private final long headersMaxSize;
private final int authSubjectsMaxSize;
private final long thingsMaxSize;
private final long policiesMaxSize;
private final long messagesMaxSize;
private final int thingsSearchDefaultPageSize;
private final int thingsSearchMaxPageSize;

private DefaultLimitsConfig(final ConfigWithFallback config) {
headersMaxSize = config.getNonNegativeBytesOrThrow(LimitsConfigValue.HEADERS_MAX_SIZE);
authSubjectsMaxSize = config.getPositiveIntOrThrow(LimitsConfigValue.AUTH_SUBJECTS_MAX_SIZE);
thingsMaxSize = config.getNonNegativeBytesOrThrow(LimitsConfigValue.THINGS_MAX_SIZE);
policiesMaxSize = config.getNonNegativeBytesOrThrow(LimitsConfigValue.POLICIES_MAX_SIZE);
messagesMaxSize = config.getNonNegativeBytesOrThrow(LimitsConfigValue.MESSAGES_MAX_SIZE);
Expand All @@ -62,16 +57,6 @@ public static DefaultLimitsConfig of(final Config config) {
return new DefaultLimitsConfig(ConfigWithFallback.newInstance(config, CONFIG_PATH, LimitsConfigValue.values()));
}

@Override
public long getHeadersMaxSize() {
return headersMaxSize;
}

@Override
public int getAuthSubjectsMaxCount() {
return authSubjectsMaxSize;
}

@Override
public long getThingsMaxSize() {
return thingsMaxSize;
Expand Down Expand Up @@ -114,9 +99,7 @@ public boolean equals(@Nullable final Object o) {
return false;
}
final DefaultLimitsConfig that = (DefaultLimitsConfig) o;
return headersMaxSize == that.headersMaxSize &&
authSubjectsMaxSize == that.authSubjectsMaxSize &&
thingsMaxSize == that.thingsMaxSize &&
return thingsMaxSize == that.thingsMaxSize &&
policiesMaxSize == that.policiesMaxSize &&
messagesMaxSize == that.messagesMaxSize &&
thingsSearchDefaultPageSize == that.thingsSearchDefaultPageSize &&
Expand All @@ -125,15 +108,13 @@ public boolean equals(@Nullable final Object o) {

@Override
public int hashCode() {
return Objects.hash(headersMaxSize, authSubjectsMaxSize, thingsMaxSize, policiesMaxSize, messagesMaxSize,
thingsSearchDefaultPageSize, thingsSearchMaxPageSize);
return Objects.hash(thingsMaxSize, policiesMaxSize, messagesMaxSize, thingsSearchDefaultPageSize,
thingsSearchMaxPageSize);
}

@Override
public String toString() {
return getClass().getSimpleName() + " [" +
"headersMaxSize=" + headersMaxSize +
", authSubjectsMaxSize=" + authSubjectsMaxSize +
", thingsMaxSize=" + thingsMaxSize +
", policiesMaxSize=" + policiesMaxSize +
", messagesMaxSize=" + messagesMaxSize +
Expand Down
Expand Up @@ -22,20 +22,6 @@
@Immutable
public interface LimitsConfig {

/**
* Returns the maximum possible size of Ditto headers in bytes.
*
* @return the max size in bytes.
*/
long getHeadersMaxSize();

/**
* Returns the maximum number of authorization subjects in Ditto headers.
*
* @return the max count.
*/
int getAuthSubjectsMaxCount();

/**
* Returns the maximum possible size of "Thing" entities in bytes.
*
Expand Down Expand Up @@ -77,16 +63,6 @@ public interface LimitsConfig {
*/
enum LimitsConfigValue implements KnownConfigValue {

/**
* The maximum possible size of Ditto headers in bytes.
*/
HEADERS_MAX_SIZE("headers.max-size", 5_000L),

/**
* The maximum number of authorization subjects in Ditto headers.
*/
AUTH_SUBJECTS_MAX_SIZE("headers.auth-subjects", 100),

/**
* The maximum possible size of "Thing" entities in bytes.
*/
Expand Down
Expand Up @@ -110,11 +110,13 @@
import org.eclipse.ditto.connectivity.service.messaging.tunnel.SshTunnelState;
import org.eclipse.ditto.connectivity.service.messaging.validation.ConnectionValidator;
import org.eclipse.ditto.connectivity.service.util.ConnectivityMdcEntryKey;
import org.eclipse.ditto.edge.service.headers.DittoHeadersValidator;
import org.eclipse.ditto.internal.models.signal.correlation.MatchingValidationResult;
import org.eclipse.ditto.internal.utils.akka.logging.DittoLoggerFactory;
import org.eclipse.ditto.internal.utils.akka.logging.ThreadSafeDittoLoggingAdapter;
import org.eclipse.ditto.internal.utils.cluster.AkkaJacksonCborSerializable;
import org.eclipse.ditto.internal.utils.config.InstanceIdentifierSupplier;
import org.eclipse.ditto.internal.utils.config.ScopedConfig;
import org.eclipse.ditto.internal.utils.metrics.DittoMetrics;
import org.eclipse.ditto.internal.utils.metrics.instruments.gauge.Gauge;
import org.eclipse.ditto.internal.utils.protocol.ProtocolAdapterProvider;
Expand All @@ -125,8 +127,6 @@
import org.eclipse.ditto.thingsearch.model.signals.commands.ThingSearchCommand;
import org.eclipse.ditto.thingsearch.model.signals.commands.WithSubscriptionId;

import com.typesafe.config.Config;

import akka.Done;
import akka.NotUsed;
import akka.actor.AbstractFSMWithStash;
Expand All @@ -147,6 +147,7 @@
import akka.pattern.Patterns;
import akka.stream.Materializer;
import akka.stream.javadsl.Sink;
import com.typesafe.config.Config;

/**
* Base class for ClientActors which implement the connection handling for various connectivity protocols.
Expand Down Expand Up @@ -196,6 +197,7 @@ public abstract class BaseClientActor extends AbstractFSMWithStash<BaseClientSta
protected final ConnectivityCounterRegistry connectionCounterRegistry;
protected final ConnectionLoggerRegistry connectionLoggerRegistry;
protected final ChildActorNanny childActorNanny;
private final DittoHeadersValidator dittoHeadersValidator;
private final boolean dryRun;

private Sink<Object, NotUsed> inboundMappingSink;
Expand Down Expand Up @@ -230,6 +232,7 @@ protected BaseClientActor(final Connection connection,

commandForwarderActorSelection = getLocalActorOfSamePath(commandForwarderActor);
childActorNanny = ChildActorNanny.newInstance(getContext(), logger);
dittoHeadersValidator = DittoHeadersValidator.get(system, ScopedConfig.dittoExtension(config));

final UserIndicatedErrors userIndicatedErrors = UserIndicatedErrors.of(config);
connectivityStatusResolver = ConnectivityStatusResolver.of(userIndicatedErrors);
Expand Down Expand Up @@ -1766,6 +1769,7 @@ private Sink<Object, NotUsed> getInboundDispatchingSink(final ActorRef outboundM
getSelf(),
getContext(),
connectivityConfig,
dittoHeadersValidator,
getResponseValidationFailureConsumer());
}

Expand Down
Expand Up @@ -25,6 +25,7 @@
import org.eclipse.ditto.base.model.headers.DittoHeaders;
import org.eclipse.ditto.base.model.headers.WithDittoHeaders;
import org.eclipse.ditto.base.model.signals.commands.ErrorResponse;
import org.eclipse.ditto.edge.service.headers.DittoHeadersValidator;
import org.eclipse.ditto.policies.model.PolicyException;
import org.eclipse.ditto.policies.model.PolicyId;
import org.eclipse.ditto.policies.model.signals.commands.PolicyErrorResponse;
Expand All @@ -38,14 +39,14 @@
final class DittoRuntimeExceptionToErrorResponseFunction
implements BiFunction<DittoRuntimeException, TopicPath, ErrorResponse<?>> {

private final long headersMaxSize;
private final DittoHeadersValidator dittoHeadersValidator;

private DittoRuntimeExceptionToErrorResponseFunction(final long headersMaxSize) {
this.headersMaxSize = headersMaxSize;
private DittoRuntimeExceptionToErrorResponseFunction(final DittoHeadersValidator dittoHeadersValidator) {
this.dittoHeadersValidator = dittoHeadersValidator;
}

static DittoRuntimeExceptionToErrorResponseFunction of(final long headersMaxSize) {
return new DittoRuntimeExceptionToErrorResponseFunction(headersMaxSize);
static DittoRuntimeExceptionToErrorResponseFunction of(final DittoHeadersValidator dittoHeadersValidator) {
return new DittoRuntimeExceptionToErrorResponseFunction(dittoHeadersValidator);
}

@Override
Expand Down Expand Up @@ -78,23 +79,27 @@ private PolicyErrorResponse getPolicyErrorResponse(final DittoRuntimeException e

return getEntityId(exception, topicPath)
.flatMap(constructEntityIdSafely(PolicyId::of))
.map(policyId -> PolicyErrorResponse.of(policyId, exception, truncateHeaders(exception)))
.orElseGet(() -> PolicyErrorResponse.of(exception, truncateHeaders(exception.getDittoHeaders()
.toBuilder()
.removeHeader(DittoHeaderDefinition.ENTITY_ID.getKey())
.build())));
.map(policyId -> PolicyErrorResponse.of(policyId, exception,
dittoHeadersValidator.truncate(exception.getDittoHeaders())))
.orElseGet(() -> PolicyErrorResponse.of(exception,
dittoHeadersValidator.truncate(exception.getDittoHeaders()
.toBuilder()
.removeHeader(DittoHeaderDefinition.ENTITY_ID.getKey())
.build())));
}

private ThingErrorResponse getThingErrorResponse(final DittoRuntimeException exception,
@Nullable final TopicPath topicPath) {

return getEntityId(exception, topicPath)
.flatMap(constructEntityIdSafely(ThingId::of))
.map(thingId -> ThingErrorResponse.of(thingId, exception, truncateHeaders(exception)))
.orElseGet(() -> ThingErrorResponse.of(exception, truncateHeaders(exception.getDittoHeaders()
.toBuilder()
.removeHeader(DittoHeaderDefinition.ENTITY_ID.getKey())
.build())));
.map(thingId -> ThingErrorResponse.of(thingId, exception,
dittoHeadersValidator.truncate(exception.getDittoHeaders())))
.orElseGet(() -> ThingErrorResponse.of(exception,
dittoHeadersValidator.truncate(exception.getDittoHeaders()
.toBuilder()
.removeHeader(DittoHeaderDefinition.ENTITY_ID.getKey())
.build())));
}

private static <T> Function<EntityId, Optional<T>> constructEntityIdSafely(
Expand Down Expand Up @@ -149,19 +154,4 @@ private static Optional<EntityId> getEntityIdFromDittoHeaders(final DittoHeaders
return result;
}

private DittoHeaders truncateHeaders(final WithDittoHeaders withDittoHeaders) {

/*
* Truncate headers to send in an error response.
* This is necessary because the consumer actor and the publisher actor may not reside in the same connectivity
* instance due to cluster routing.
*/
final var dittoHeaders = withDittoHeaders.getDittoHeaders();
return truncateHeaders(dittoHeaders);
}

private DittoHeaders truncateHeaders(final DittoHeaders dittoHeaders) {
return dittoHeaders.truncate(headersMaxSize);
}

}

0 comments on commit 25689b3

Please sign in to comment.