From a1e6e0b1770a87a7c735d981539adb58e8bfbc77 Mon Sep 17 00:00:00 2001 From: Jens Reimann Date: Fri, 4 Dec 2020 10:28:47 +0100 Subject: [PATCH] [#903]: Make schema optional, allow alternate content type This change allows to make the schema optional and also allows to configure the data content type. Signed-off-by: Jens Reimann --- .../endpoints/routes/AbstractRoute.java | 2 +- .../routes/cloudevents/CloudEventsRoute.java | 46 +++++++++- .../gateway/endpoints/EndpointTestBase.java | 4 + .../endpoints/routes/RootRouteTest.java | 2 +- .../gateway/starter/GatewayRootActor.java | 3 +- .../util/config/DittoGatewayConfig.java | 9 ++ .../gateway/util/config/GatewayConfig.java | 7 ++ .../config/endpoints/CloudEventsConfig.java | 76 ++++++++++++++++ .../endpoints/DefaultCloudEventsConfig.java | 89 +++++++++++++++++++ 9 files changed, 231 insertions(+), 7 deletions(-) create mode 100644 services/gateway/util/src/main/java/org/eclipse/ditto/services/gateway/util/config/endpoints/CloudEventsConfig.java create mode 100644 services/gateway/util/src/main/java/org/eclipse/ditto/services/gateway/util/config/endpoints/DefaultCloudEventsConfig.java diff --git a/services/gateway/endpoints/src/main/java/org/eclipse/ditto/services/gateway/endpoints/routes/AbstractRoute.java b/services/gateway/endpoints/src/main/java/org/eclipse/ditto/services/gateway/endpoints/routes/AbstractRoute.java index d41129542fd..1339d9ece0a 100755 --- a/services/gateway/endpoints/src/main/java/org/eclipse/ditto/services/gateway/endpoints/routes/AbstractRoute.java +++ b/services/gateway/endpoints/src/main/java/org/eclipse/ditto/services/gateway/endpoints/routes/AbstractRoute.java @@ -91,13 +91,13 @@ public abstract class AbstractRoute extends AllDirectives { protected final ActorRef proxyActor; protected final ActorSystem actorSystem; - protected final Set mediaTypeJsonWithFallbacks; private final HttpConfig httpConfig; private final CommandConfig commandConfig; private final HeaderTranslator headerTranslator; private final HttpRequestActorPropsFactory httpRequestActorPropsFactory; private final Attributes supervisionStrategy; + private final Set mediaTypeJsonWithFallbacks; /** * Constructs the abstract route builder. diff --git a/services/gateway/endpoints/src/main/java/org/eclipse/ditto/services/gateway/endpoints/routes/cloudevents/CloudEventsRoute.java b/services/gateway/endpoints/src/main/java/org/eclipse/ditto/services/gateway/endpoints/routes/cloudevents/CloudEventsRoute.java index e496c0ede22..a38e2581975 100755 --- a/services/gateway/endpoints/src/main/java/org/eclipse/ditto/services/gateway/endpoints/routes/cloudevents/CloudEventsRoute.java +++ b/services/gateway/endpoints/src/main/java/org/eclipse/ditto/services/gateway/endpoints/routes/cloudevents/CloudEventsRoute.java @@ -36,6 +36,7 @@ import org.eclipse.ditto.protocoladapter.ProtocolFactory; import org.eclipse.ditto.services.gateway.endpoints.actors.AbstractHttpRequestActor; import org.eclipse.ditto.services.gateway.endpoints.routes.AbstractRoute; +import org.eclipse.ditto.services.gateway.util.config.endpoints.CloudEventsConfig; import org.eclipse.ditto.services.gateway.util.config.endpoints.CommandConfig; import org.eclipse.ditto.services.gateway.util.config.endpoints.HttpConfig; import org.eclipse.ditto.services.utils.akka.logging.DittoLoggerFactory; @@ -50,6 +51,9 @@ import akka.http.javadsl.model.HttpResponse; import akka.http.javadsl.server.RequestContext; import akka.http.javadsl.server.Route; +import akka.http.scaladsl.model.ContentType; +import akka.http.scaladsl.model.MediaType; +import akka.http.scaladsl.model.MediaTypes; import akka.stream.javadsl.Sink; import akka.util.ByteString; import io.cloudevents.CloudEvent; @@ -73,6 +77,8 @@ public final class CloudEventsRoute extends AbstractRoute { private static final DittoProtocolAdapter PROTOCOL_ADAPTER = DittoProtocolAdapter.newInstance(); private static final String DATA_SCHEMA_SCHEME = "ditto"; + private final CloudEventsConfig cloudEventsConfig; + /** * Constructs the cloud events route builder. * @@ -81,6 +87,7 @@ public final class CloudEventsRoute extends AbstractRoute { * @param httpConfig the configuration settings of the Gateway service's HTTP endpoint. * @param commandConfig the configuration settings for incoming commands (via HTTP requests) in the gateway. * @param headerTranslator translates headers from external sources or to external sources. + * @param cloudEventsConfig the configuration settings for cloud events. * @throws NullPointerException if any argument is {@code null}. */ public CloudEventsRoute( @@ -88,9 +95,11 @@ public CloudEventsRoute( final ActorSystem actorSystem, final HttpConfig httpConfig, final CommandConfig commandConfig, - final HeaderTranslator headerTranslator + final HeaderTranslator headerTranslator, + final CloudEventsConfig cloudEventsConfig ) { super(proxyActor, actorSystem, httpConfig, commandConfig, headerTranslator); + this.cloudEventsConfig = cloudEventsConfig; } @@ -251,7 +260,7 @@ private Optional> jsonToDittoSignal(@Nullable final CloudEventData dat final JsonifiableAdaptable jsonifiableAdaptable = ProtocolFactory.jsonifiableAdaptableFromJson(jsonObject); final Signal signal = PROTOCOL_ADAPTER.fromAdaptable(jsonifiableAdaptable); final Signal signalWithAdjustedHeaders = signal.setDittoHeaders( - signal.getDittoHeaders().toBuilder().putHeaders(adjustedHeaders).build()); + signal.getDittoHeaders().toBuilder().putHeaders(adjustedHeaders).build()); return Optional.of(signalWithAdjustedHeaders); } @@ -259,7 +268,7 @@ private void ensureDataContentType(@Nullable final String dataContentType, final RequestContext ctx, final DittoHeaders dittoHeaders) { - if (dataContentType == null || !mediaTypeJsonWithFallbacks.contains(dataContentType)) { + if (!isCorrectDataType(dataContentType)) { if (LOGGER.isInfoEnabled()) { LOGGER.withCorrelationId(dittoHeaders) .info("CloudEvent request rejected: unsupported data-content-type: <{}> request: <{}>", @@ -267,12 +276,36 @@ private void ensureDataContentType(@Nullable final String dataContentType, } throw UnsupportedMediaTypeException .withDetailedInformationBuilder(dataContentType != null ? dataContentType : "none", - mediaTypeJsonWithFallbacks) + cloudEventsConfig.getDataTypes()) .dittoHeaders(dittoHeaders) .build(); } } + /** + * Test if the data type is acceptable. + *

+ * A missing, empty or malformed data type is not acceptable. + * + * @param dataContentType The content type to check. + * @return {@code true} if the content type is acceptable, {@code false} otherwise. + */ + private boolean isCorrectDataType(@Nullable final String dataContentType) { + if (dataContentType == null) { + // no content type + return false; + } + + final var result = MediaType.parse(dataContentType); + final MediaType type = result.getOrElse(() -> null); + if (type == null) { + // failed to parse content type + return false; + } + + return this.cloudEventsConfig.getDataTypes().contains(String.format("%s/%s", type.mainType(), type.subType())); + } + /** * Ensure that the data schema starts with {@code ditto:}. * @@ -284,6 +317,11 @@ private void ensureDataSchema(@Nullable final URI dataSchema, final RequestContext ctx, final DittoHeaders dittoHeaders) { + if (dataSchema == null && cloudEventsConfig.isEmptySchemaAllowed()) { + // early return, no schema, but no requirement to have one + return; + } + if (dataSchema == null || !dataSchema.getScheme().equals(DATA_SCHEMA_SCHEME)) { if (LOGGER.isInfoEnabled()) { LOGGER.withCorrelationId(dittoHeaders) diff --git a/services/gateway/endpoints/src/test/java/org/eclipse/ditto/services/gateway/endpoints/EndpointTestBase.java b/services/gateway/endpoints/src/test/java/org/eclipse/ditto/services/gateway/endpoints/EndpointTestBase.java index 66408aa1349..2c07b8e4a5b 100755 --- a/services/gateway/endpoints/src/test/java/org/eclipse/ditto/services/gateway/endpoints/EndpointTestBase.java +++ b/services/gateway/endpoints/src/test/java/org/eclipse/ditto/services/gateway/endpoints/EndpointTestBase.java @@ -38,8 +38,10 @@ import org.eclipse.ditto.services.gateway.security.authentication.jwt.JwtAuthorizationSubjectsProviderFactory; import org.eclipse.ditto.services.gateway.security.utils.DefaultHttpClientFacade; import org.eclipse.ditto.services.gateway.security.utils.HttpClientFacade; +import org.eclipse.ditto.services.gateway.util.config.endpoints.CloudEventsConfig; import org.eclipse.ditto.services.gateway.util.config.endpoints.CommandConfig; import org.eclipse.ditto.services.gateway.util.config.endpoints.DefaultClaimMessageConfig; +import org.eclipse.ditto.services.gateway.util.config.endpoints.DefaultCloudEventsConfig; import org.eclipse.ditto.services.gateway.util.config.endpoints.DefaultCommandConfig; import org.eclipse.ditto.services.gateway.util.config.endpoints.DefaultMessageConfig; import org.eclipse.ditto.services.gateway.util.config.endpoints.DefaultPublicHealthConfig; @@ -98,6 +100,7 @@ public abstract class EndpointTestBase extends JUnitRouteTest { protected static StreamingConfig streamingConfig; protected static PublicHealthConfig publicHealthConfig; protected static ProtocolConfig protocolConfig; + protected static CloudEventsConfig cloudEventsConfig; protected static JwtAuthenticationFactory jwtAuthenticationFactory; protected static HttpClientFacade httpClientFacade; protected static JwtAuthorizationSubjectsProviderFactory authorizationSubjectsProviderFactory; @@ -117,6 +120,7 @@ public static void initTestFixture() { streamingConfig = DefaultStreamingConfig.of(gatewayScopedConfig); publicHealthConfig = DefaultPublicHealthConfig.of(gatewayScopedConfig); protocolConfig = DefaultProtocolConfig.of(dittoScopedConfig); + cloudEventsConfig = DefaultCloudEventsConfig.of(gatewayScopedConfig); httpClientFacade = DefaultHttpClientFacade.getInstance(ActorSystem.create(EndpointTestBase.class.getSimpleName()), DefaultHttpProxyConfig.ofProxy(DefaultScopedConfig.empty("/"))); authorizationSubjectsProviderFactory = DittoJwtAuthorizationSubjectsProvider::of; diff --git a/services/gateway/endpoints/src/test/java/org/eclipse/ditto/services/gateway/endpoints/routes/RootRouteTest.java b/services/gateway/endpoints/src/test/java/org/eclipse/ditto/services/gateway/endpoints/routes/RootRouteTest.java index bb2db357fd3..05f4aa64f75 100755 --- a/services/gateway/endpoints/src/test/java/org/eclipse/ditto/services/gateway/endpoints/routes/RootRouteTest.java +++ b/services/gateway/endpoints/src/test/java/org/eclipse/ditto/services/gateway/endpoints/routes/RootRouteTest.java @@ -168,7 +168,7 @@ public void setUp() { .thingSearchRoute( new ThingSearchRoute(proxyActor, actorSystem, httpConfig, commandConfig, headerTranslator)) .whoamiRoute(new WhoamiRoute(proxyActor, actorSystem, httpConfig, commandConfig, headerTranslator)) - .cloudEventsRoute(new CloudEventsRoute(proxyActor, actorSystem, httpConfig, commandConfig, headerTranslator)) + .cloudEventsRoute(new CloudEventsRoute(proxyActor, actorSystem, httpConfig, commandConfig, headerTranslator, cloudEventsConfig)) .websocketRoute(WebSocketRoute.getInstance(proxyActor, streamingConfig, materializer)) .supportedSchemaVersions(httpConfig.getSupportedSchemaVersions()) .protocolAdapterProvider(protocolAdapterProvider) diff --git a/services/gateway/starter/src/main/java/org/eclipse/ditto/services/gateway/starter/GatewayRootActor.java b/services/gateway/starter/src/main/java/org/eclipse/ditto/services/gateway/starter/GatewayRootActor.java index 96f944fcdaf..14de6444a4d 100755 --- a/services/gateway/starter/src/main/java/org/eclipse/ditto/services/gateway/starter/GatewayRootActor.java +++ b/services/gateway/starter/src/main/java/org/eclipse/ditto/services/gateway/starter/GatewayRootActor.java @@ -283,7 +283,8 @@ private static Route createRoute(final ActorSystem actorSystem, .thingSearchRoute( new ThingSearchRoute(proxyActor, actorSystem, httpConfig, commandConfig, headerTranslator)) .whoamiRoute(new WhoamiRoute(proxyActor, actorSystem, httpConfig, commandConfig, headerTranslator)) - .cloudEventsRoute(new CloudEventsRoute(proxyActor, actorSystem, httpConfig, commandConfig, headerTranslator)) + .cloudEventsRoute(new CloudEventsRoute(proxyActor, actorSystem, httpConfig, commandConfig, + headerTranslator, gatewayConfig.getCloudEventsConfig())) .websocketRoute(WebSocketRoute.getInstance(streamingActor, streamingConfig, materializer) .withSignalEnrichmentProvider(signalEnrichmentProvider) .withHeaderTranslator(headerTranslator)) diff --git a/services/gateway/util/src/main/java/org/eclipse/ditto/services/gateway/util/config/DittoGatewayConfig.java b/services/gateway/util/src/main/java/org/eclipse/ditto/services/gateway/util/config/DittoGatewayConfig.java index 2eb3ca60733..422ba82a5b2 100644 --- a/services/gateway/util/src/main/java/org/eclipse/ditto/services/gateway/util/config/DittoGatewayConfig.java +++ b/services/gateway/util/src/main/java/org/eclipse/ditto/services/gateway/util/config/DittoGatewayConfig.java @@ -16,8 +16,10 @@ import org.eclipse.ditto.services.base.config.DittoServiceConfig; import org.eclipse.ditto.services.base.config.limits.LimitsConfig; +import org.eclipse.ditto.services.gateway.util.config.endpoints.CloudEventsConfig; import org.eclipse.ditto.services.gateway.util.config.endpoints.CommandConfig; import org.eclipse.ditto.services.gateway.util.config.endpoints.DefaultClaimMessageConfig; +import org.eclipse.ditto.services.gateway.util.config.endpoints.DefaultCloudEventsConfig; import org.eclipse.ditto.services.gateway.util.config.endpoints.DefaultCommandConfig; import org.eclipse.ditto.services.gateway.util.config.endpoints.DefaultMessageConfig; import org.eclipse.ditto.services.gateway.util.config.endpoints.DefaultPublicHealthConfig; @@ -59,6 +61,7 @@ public final class DittoGatewayConfig implements GatewayConfig, WithConfigPath { private final AuthenticationConfig authenticationConfig; private final StreamingConfig streamingConfig; private final PublicHealthConfig publicHealthConfig; + private final DefaultCloudEventsConfig cloudEventsConfig; private DittoGatewayConfig(final ScopedConfig dittoScopedConfig) { @@ -73,6 +76,7 @@ private DittoGatewayConfig(final ScopedConfig dittoScopedConfig) { authenticationConfig = DefaultAuthenticationConfig.of(dittoServiceConfig); streamingConfig = DefaultStreamingConfig.of(dittoServiceConfig); publicHealthConfig = DefaultPublicHealthConfig.of(dittoServiceConfig); + cloudEventsConfig = DefaultCloudEventsConfig.of(dittoServiceConfig); } /** @@ -152,6 +156,11 @@ public ProtocolConfig getProtocolConfig() { return protocolConfig; } + @Override + public CloudEventsConfig getCloudEventsConfig() { + return cloudEventsConfig; + } + /** * @return always {@value #CONFIG_PATH}. */ diff --git a/services/gateway/util/src/main/java/org/eclipse/ditto/services/gateway/util/config/GatewayConfig.java b/services/gateway/util/src/main/java/org/eclipse/ditto/services/gateway/util/config/GatewayConfig.java index 95075f72b18..71cc3ca594f 100644 --- a/services/gateway/util/src/main/java/org/eclipse/ditto/services/gateway/util/config/GatewayConfig.java +++ b/services/gateway/util/src/main/java/org/eclipse/ditto/services/gateway/util/config/GatewayConfig.java @@ -13,6 +13,7 @@ package org.eclipse.ditto.services.gateway.util.config; import org.eclipse.ditto.services.base.config.ServiceSpecificConfig; +import org.eclipse.ditto.services.gateway.util.config.endpoints.CloudEventsConfig; import org.eclipse.ditto.services.gateway.util.config.endpoints.CommandConfig; import org.eclipse.ditto.services.gateway.util.config.endpoints.HttpConfig; import org.eclipse.ditto.services.gateway.util.config.endpoints.MessageConfig; @@ -88,4 +89,10 @@ public interface GatewayConfig extends ServiceSpecificConfig, WithProtocolConfig */ PublicHealthConfig getPublicHealthConfig(); + /** + * Returns the configuration for the cloud events endpoint. + * + * @return the config. + */ + CloudEventsConfig getCloudEventsConfig(); } diff --git a/services/gateway/util/src/main/java/org/eclipse/ditto/services/gateway/util/config/endpoints/CloudEventsConfig.java b/services/gateway/util/src/main/java/org/eclipse/ditto/services/gateway/util/config/endpoints/CloudEventsConfig.java new file mode 100644 index 00000000000..1f8cac1f048 --- /dev/null +++ b/services/gateway/util/src/main/java/org/eclipse/ditto/services/gateway/util/config/endpoints/CloudEventsConfig.java @@ -0,0 +1,76 @@ +/* + * Copyright (c) 2020 Contributors to the Eclipse Foundation + * + * See the NOTICE file(s) distributed with this work for additional + * information regarding copyright ownership. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License 2.0 which is available at + * http://www.eclipse.org/legal/epl-2.0 + * + * SPDX-License-Identifier: EPL-2.0 + */ +package org.eclipse.ditto.services.gateway.util.config.endpoints; + +import java.util.Set; + +import javax.annotation.concurrent.Immutable; + +import org.eclipse.ditto.services.utils.config.KnownConfigValue; + +/** + * Provides configuration settings for the cloud events endpoint of the Ditto Gateway service. + */ +@Immutable +public interface CloudEventsConfig { + + /** + * Returns if an empty data schema is allowed. + * + * @return {@code true} if an empty data schema is allowed {@code false} otherwise. + */ + boolean isEmptySchemaAllowed(); + + /** + * Returns the allowed data types. + * + * @return The set of allowed data types. + */ + Set getDataTypes(); + + /** + * An enumeration of the known config path expressions and their associated default values for + * {@code CloudEventsConfig}. + */ + enum CloudEventsConfigValue implements KnownConfigValue { + + /** + * Flag if an empty data schema is allowed. + */ + EMPTY_SCHEMA_ALLOWED("empty-schema-allowed", true), + + /** + * Set of allowed data types + */ + DATA_TYPES("data-types", Set.of("application/json", "application/vnd.eclipse.ditto+json")); + + private final String path; + private final Object defaultValue; + + private CloudEventsConfigValue(final String thePath, final Object theDefaultValue) { + path = thePath; + defaultValue = theDefaultValue; + } + + @Override + public Object getDefaultValue() { + return defaultValue; + } + + @Override + public String getConfigPath() { + return path; + } + + } +} diff --git a/services/gateway/util/src/main/java/org/eclipse/ditto/services/gateway/util/config/endpoints/DefaultCloudEventsConfig.java b/services/gateway/util/src/main/java/org/eclipse/ditto/services/gateway/util/config/endpoints/DefaultCloudEventsConfig.java new file mode 100644 index 00000000000..9132c7208d8 --- /dev/null +++ b/services/gateway/util/src/main/java/org/eclipse/ditto/services/gateway/util/config/endpoints/DefaultCloudEventsConfig.java @@ -0,0 +1,89 @@ +/* + * Copyright (c) 2020 Contributors to the Eclipse Foundation + * + * See the NOTICE file(s) distributed with this work for additional + * information regarding copyright ownership. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License 2.0 which is available at + * http://www.eclipse.org/legal/epl-2.0 + * + * SPDX-License-Identifier: EPL-2.0 + */ +package org.eclipse.ditto.services.gateway.util.config.endpoints; + +import java.util.Objects; +import java.util.Set; + +import javax.annotation.concurrent.Immutable; + +import org.eclipse.ditto.services.utils.config.ConfigWithFallback; +import org.eclipse.ditto.services.utils.config.ScopedConfig; + +import com.typesafe.config.Config; + +/** + * This class is the default implementation of the cloud events endpoint config. + */ +@Immutable +public class DefaultCloudEventsConfig implements CloudEventsConfig { + + private static final String CONFIG_PATH = "cloud-events"; + + private final boolean emptySchemaAllowed; + + private final Set dataTypes; + + private DefaultCloudEventsConfig(final ScopedConfig scopedConfig) { + emptySchemaAllowed = scopedConfig.getBoolean(CloudEventsConfig.CloudEventsConfigValue.EMPTY_SCHEMA_ALLOWED.getConfigPath()); + dataTypes = Set.copyOf(scopedConfig.getStringList(CloudEventsConfigValue.DATA_TYPES.getConfigPath())); + } + + /** + * Returns an instance of {@code DefaultCloudEventsConfig} based on the settings of the specified Config. + * + * @param config is supposed to provide the settings of the public health config at {@value #CONFIG_PATH}. + * @return the instance. + * @throws org.eclipse.ditto.services.utils.config.DittoConfigError if {@code config} is invalid. + */ + public static DefaultCloudEventsConfig of(final Config config) { + return new DefaultCloudEventsConfig( + ConfigWithFallback.newInstance(config, CONFIG_PATH, CloudEventsConfig.CloudEventsConfigValue.values())); + } + + @Override + public boolean isEmptySchemaAllowed() { + return emptySchemaAllowed; + } + + @Override + public Set getDataTypes() { + return dataTypes; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + DefaultCloudEventsConfig that = (DefaultCloudEventsConfig) o; + return emptySchemaAllowed == that.emptySchemaAllowed && dataTypes.equals(that.dataTypes); + } + + @Override + public int hashCode() { + return Objects.hash(emptySchemaAllowed, dataTypes); + } + + @Override + public String toString() { + return getClass().getSimpleName() + " [" + + "emptySchemaAllowed=" + emptySchemaAllowed + + "dataTypes=" + dataTypes + + "]"; + } + +}