Skip to content

Commit

Permalink
fix providing Ditto Adaptable information in the "_context" of an SSE…
Browse files Browse the repository at this point in the history
… event

Signed-off-by: Thomas Jäckle <thomas.jaeckle@beyonnex.io>
  • Loading branch information
thjaeckle committed Aug 11, 2023
1 parent dcfafe1 commit e99df3a
Show file tree
Hide file tree
Showing 4 changed files with 35 additions and 17 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@
import org.eclipse.ditto.base.model.exceptions.DittoRuntimeException;
import org.eclipse.ditto.base.model.exceptions.SignalEnrichmentFailedException;
import org.eclipse.ditto.base.model.headers.DittoHeaders;
import org.eclipse.ditto.base.model.headers.WithDittoHeaders;
import org.eclipse.ditto.base.model.headers.translator.HeaderTranslator;
import org.eclipse.ditto.base.model.json.JsonSchemaVersion;
import org.eclipse.ditto.base.model.signals.FeatureToggle;
import org.eclipse.ditto.base.model.signals.commands.streaming.SubscribeForPersistedEvents;
Expand Down Expand Up @@ -75,6 +75,10 @@
import org.eclipse.ditto.messages.model.Message;
import org.eclipse.ditto.messages.model.signals.commands.MessageCommand;
import org.eclipse.ditto.placeholders.TimePlaceholder;
import org.eclipse.ditto.protocol.Adaptable;
import org.eclipse.ditto.protocol.JsonifiableAdaptable;
import org.eclipse.ditto.protocol.Payload;
import org.eclipse.ditto.protocol.adapter.DittoProtocolAdapter;
import org.eclipse.ditto.protocol.placeholders.ResourcePlaceholder;
import org.eclipse.ditto.protocol.placeholders.TopicPathPlaceholder;
import org.eclipse.ditto.rql.parser.RqlPredicateParser;
Expand Down Expand Up @@ -154,6 +158,8 @@ public final class ThingsSseRouteBuilder extends RouteDirectives implements SseR
private final StreamingConfig streamingConfig;
private final QueryFilterCriteriaFactory queryFilterCriteriaFactory;
private final ActorRef pubSubMediator;
private final HeaderTranslator headerTranslator;
private final DittoProtocolAdapter dittoProtocolAdapter;
private SseConnectionSupervisor sseConnectionSupervisor;
private SseEventSniffer eventSniffer;
private StreamingAuthorizationEnforcer sseAuthorizationEnforcer;
Expand All @@ -164,12 +170,16 @@ private ThingsSseRouteBuilder(final ActorSystem actorSystem,
final ActorRef streamingActor,
final StreamingConfig streamingConfig,
final QueryFilterCriteriaFactory queryFilterCriteriaFactory,
final ActorRef pubSubMediator) {
final ActorRef pubSubMediator,
final HeaderTranslator headerTranslator) {

this.streamingActor = streamingActor;
this.streamingConfig = streamingConfig;
this.queryFilterCriteriaFactory = queryFilterCriteriaFactory;
this.pubSubMediator = pubSubMediator;
this.headerTranslator = headerTranslator;

dittoProtocolAdapter = DittoProtocolAdapter.of(headerTranslator);

final Config config = actorSystem.settings().config();
final var dittoExtensionsConfig = ScopedConfig.dittoExtension(config);
Expand All @@ -183,16 +193,19 @@ private ThingsSseRouteBuilder(final ActorSystem actorSystem,
/**
* Returns an instance of this class.
*
* @param actorSystem the actor system.
* @param streamingActor is used for actual event streaming.
* @param streamingConfig the streaming configuration.
* @param pubSubMediator akka pub-sub mediator for error reporting by the search source.
* @param headerTranslator the header translator used for translating to external headers.
* @return the instance.
* @throws NullPointerException if {@code streamingActor} is {@code null}.
*/
public static ThingsSseRouteBuilder getInstance(final ActorSystem actorSystem,
final ActorRef streamingActor,
final StreamingConfig streamingConfig,
final ActorRef pubSubMediator) {
final ActorRef pubSubMediator,
final HeaderTranslator headerTranslator) {

checkNotNull(streamingActor, "streamingActor");
final var queryFilterCriteriaFactory =
Expand All @@ -201,7 +214,7 @@ public static ThingsSseRouteBuilder getInstance(final ActorSystem actorSystem,
TimePlaceholder.getInstance());

return new ThingsSseRouteBuilder(actorSystem, streamingActor, streamingConfig, queryFilterCriteriaFactory,
pubSubMediator);
pubSubMediator, headerTranslator);
}

@Override
Expand Down Expand Up @@ -685,7 +698,7 @@ private static boolean targetThingIdMatches(final ThingEvent<?> event,
return targetThingIds.isEmpty() || targetThingIds.contains(event.getEntityId());
}

private static Collection<JsonValue> toNonemptyValue(final Thing thing, final ThingEvent<?> event,
private Collection<JsonValue> toNonemptyValue(final Thing thing, final ThingEvent<?> event,
final JsonPointer fieldPointer,
@Nullable final JsonFieldSelector fields) {
final var jsonSchemaVersion = event.getDittoHeaders()
Expand Down Expand Up @@ -763,21 +776,24 @@ private static Counter getCounterFor(final String path) {
* {@code withDittoHeaders}.
*
* @param objectBuilder the JsonObject build to add the {@code _context} to.
* @param withDittoHeaders the object to extract the {@code DittoHeaders} from.
* @param thingEvent the event to add context from.
* @return the built JsonObject including the {@code _context}.
*/
private static JsonObject addContext(final JsonObjectBuilder objectBuilder,
final WithDittoHeaders withDittoHeaders) {
private JsonObject addContext(final JsonObjectBuilder objectBuilder,
final ThingEvent<?> thingEvent) {

final Adaptable adaptable = dittoProtocolAdapter.toAdaptable(thingEvent);
objectBuilder.set(CONTEXT, JsonObject.newBuilder()
.set("headers", dittoHeadersToJson(withDittoHeaders.getDittoHeaders()))
.set(JsonifiableAdaptable.JsonFields.TOPIC, adaptable.getTopicPath().getPath())
.set(Payload.JsonFields.PATH, adaptable.getPayload().getPath().toString())
.set(JsonifiableAdaptable.JsonFields.HEADERS, dittoHeadersToJson(thingEvent.getDittoHeaders()))
.build()
);
return objectBuilder.build();
}

private static JsonObject dittoHeadersToJson(final DittoHeaders dittoHeaders) {
return dittoHeaders.entrySet()
private JsonObject dittoHeadersToJson(final DittoHeaders dittoHeaders) {
return headerTranslator.toExternalHeaders(dittoHeaders).entrySet()
.stream()
.map(entry -> JsonFactory.newField(JsonKey.of(entry.getKey()), JsonFactory.newValue(entry.getValue())))
.collect(JsonCollectors.fieldsToObject());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -255,10 +255,10 @@ private static Route createRoute(final ActorSystem actorSystem,
.devopsRoute(new DevOpsRoute(routeBaseProperties, devopsAuthenticationDirective))
.policiesRoute(new PoliciesRoute(routeBaseProperties,
OAuthTokenIntegrationSubjectIdFactory.of(authConfig.getOAuthConfig())))
.sseThingsRoute(
ThingsSseRouteBuilder.getInstance(actorSystem, streamingActor, streamingConfig, pubSubMediator)
.withProxyActor(proxyActor)
.withSignalEnrichmentProvider(signalEnrichmentProvider))
.sseThingsRoute(ThingsSseRouteBuilder
.getInstance(actorSystem, streamingActor, streamingConfig, pubSubMediator, headerTranslator)
.withProxyActor(proxyActor)
.withSignalEnrichmentProvider(signalEnrichmentProvider))
.thingsRoute(new ThingsRoute(routeBaseProperties,
gatewayConfig.getMessageConfig(),
gatewayConfig.getClaimMessageConfig()))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,7 @@ public void setUp() {
.sseThingsRoute(ThingsSseRouteBuilder.getInstance(routeBaseProperties.getActorSystem(),
routeBaseProperties.getProxyActor(),
streamingConfig,
routeBaseProperties.getProxyActor()))
routeBaseProperties.getProxyActor(), httpHeaderTranslator))
.thingsRoute(new ThingsRoute(routeBaseProperties, messageConfig, claimMessageConfig))
.thingSearchRoute(new ThingSearchRoute(routeBaseProperties))
.whoamiRoute(new WhoamiRoute(routeBaseProperties))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import org.eclipse.ditto.base.model.auth.AuthorizationModelFactory;
import org.eclipse.ditto.base.model.auth.DittoAuthorizationContextType;
import org.eclipse.ditto.base.model.headers.DittoHeaders;
import org.eclipse.ditto.base.model.headers.translator.HeaderTranslator;
import org.eclipse.ditto.gateway.api.GatewayServiceUnavailableException;
import org.eclipse.ditto.gateway.service.endpoints.EndpointTestBase;
import org.eclipse.ditto.gateway.service.streaming.actors.SessionedJsonifiable;
Expand Down Expand Up @@ -101,7 +102,8 @@ public void setUp() {
() -> CompletableFuture.completedFuture(dittoHeaders);

final var sseRouteBuilder =
ThingsSseRouteBuilder.getInstance(actorSystem, streamingActor.ref(), streamingConfig, proxyActor.ref());
ThingsSseRouteBuilder.getInstance(actorSystem, streamingActor.ref(), streamingConfig, proxyActor.ref(),
HeaderTranslator.empty());
sseRouteBuilder.withProxyActor(proxyActor.ref());
final Route sseRoute = extractRequestContext(ctx -> sseRouteBuilder.build(ctx, dittoHeadersSupplier));
underTest = testRoute(sseRoute);
Expand Down

0 comments on commit e99df3a

Please sign in to comment.