Skip to content

Commit

Permalink
fix date and long parsing exception when doing wrong history API calls
Browse files Browse the repository at this point in the history
  • Loading branch information
thjaeckle committed Mar 4, 2024
1 parent 74450b4 commit 41d088a
Showing 1 changed file with 121 additions and 103 deletions.
Expand Up @@ -19,6 +19,7 @@
import java.text.MessageFormat;
import java.time.Duration;
import java.time.Instant;
import java.time.format.DateTimeParseException;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
Expand All @@ -44,7 +45,9 @@
import org.apache.pekko.actor.ActorSelection;
import org.apache.pekko.actor.ActorSystem;
import org.apache.pekko.http.javadsl.marshalling.sse.EventStreamMarshalling;
import org.apache.pekko.http.javadsl.model.ContentTypes;
import org.apache.pekko.http.javadsl.model.HttpHeader;
import org.apache.pekko.http.javadsl.model.HttpResponse;
import org.apache.pekko.http.javadsl.model.MediaTypes;
import org.apache.pekko.http.javadsl.model.StatusCodes;
import org.apache.pekko.http.javadsl.model.headers.Accept;
Expand All @@ -59,6 +62,7 @@
import org.apache.pekko.stream.KillSwitches;
import org.apache.pekko.stream.javadsl.Keep;
import org.apache.pekko.stream.javadsl.Source;
import org.eclipse.ditto.base.model.exceptions.DittoJsonException;
import org.eclipse.ditto.base.model.exceptions.DittoRuntimeException;
import org.eclipse.ditto.base.model.exceptions.SignalEnrichmentFailedException;
import org.eclipse.ditto.base.model.headers.DittoHeaders;
Expand Down Expand Up @@ -353,109 +357,123 @@ private Route createSseRoute(final RequestContext ctx, final CompletionStage<Dit
@Nullable final ThingFieldSelector fields = getFieldSelector(parameters.get(PARAM_FIELDS));
@Nullable final ThingFieldSelector extraFields = getFieldSelector(parameters.get(PARAM_EXTRA_FIELDS));

@Nullable final Long fromHistoricalRevision = Optional.ofNullable(
parameters.get(PARAM_FROM_HISTORICAL_REVISION))
.map(Long::parseLong)
.orElse(null);
@Nullable final Long toHistoricalRevision = Optional.ofNullable(
parameters.get(PARAM_TO_HISTORICAL_REVISION))
.map(Long::parseLong)
.orElse(null);

@Nullable final Instant fromHistoricalTimestamp = Optional.ofNullable(
parameters.get(PARAM_FROM_HISTORICAL_TIMESTAMP))
.map(Instant::parse)
.orElse(null);
@Nullable final Instant toHistoricalTimestamp = Optional.ofNullable(
parameters.get(PARAM_TO_HISTORICAL_TIMESTAMP))
.map(Instant::parse)
.orElse(null);

final CompletionStage<SignalEnrichmentFacade> facadeStage = signalEnrichmentProvider == null
? CompletableFuture.completedFuture(null)
: signalEnrichmentProvider.getFacade(ctx.getRequest());


final var sseSourceStage = facadeStage.thenCompose(facade -> dittoHeadersStage.thenCompose(
dittoHeaders -> sseAuthorizationEnforcer.checkAuthorization(ctx, dittoHeaders).thenApply(unused -> {
if (filterString != null) {
// will throw an InvalidRqlExpressionException if the RQL expression was not valid:
queryFilterCriteriaFactory.filterCriteria(filterString, dittoHeaders);
}

final String connectionCorrelationId = dittoHeaders.getCorrelationId()
.orElseThrow(() -> new IllegalStateException(
"Expected correlation-id in SSE DittoHeaders: " + dittoHeaders));
final var authorizationContext = dittoHeaders.getAuthorizationContext();
final Object startStreaming;
if (null != fromHistoricalRevision) {
FeatureToggle
.checkHistoricalApiAccessFeatureEnabled(SubscribeForPersistedEvents.TYPE, dittoHeaders);
startStreaming = SubscribeForPersistedEvents.of(targetThingIds.get(0),
fieldPointer,
fromHistoricalRevision,
null != toHistoricalRevision ? toHistoricalRevision : Long.MAX_VALUE,
filterString,
dittoHeaders);
} else if (null != fromHistoricalTimestamp) {
FeatureToggle
.checkHistoricalApiAccessFeatureEnabled(SubscribeForPersistedEvents.TYPE, dittoHeaders);
startStreaming = SubscribeForPersistedEvents.of(targetThingIds.get(0),
fieldPointer,
fromHistoricalTimestamp,
toHistoricalTimestamp,
filterString,
dittoHeaders);
} else {
startStreaming =
StartStreaming.getBuilder(StreamingType.EVENTS, connectionCorrelationId,
authorizationContext)
.withNamespaces(namespaces)
.withFilter(filterString)
.withExtraFields(extraFields)
.build();
}

final Source<SessionedJsonifiable, SupervisedStream.WithQueue> publisherSource =
SupervisedStream.sourceQueue(10);

return publisherSource.viaMat(KillSwitches.single(), Keep.both())
.mapMaterializedValue(pair -> {
final SupervisedStream.WithQueue withQueue = pair.first();
final KillSwitch killSwitch = pair.second();

final var jsonSchemaVersion = dittoHeaders.getSchemaVersion()
.orElse(JsonSchemaVersion.LATEST);
sseConnectionSupervisor.supervise(withQueue.getSupervisedStream(),
connectionCorrelationId, dittoHeaders);
final var connect = new Connect(withQueue.getSourceQueue(), connectionCorrelationId,
STREAMING_TYPE_SSE, jsonSchemaVersion, null, Set.of(),
authorizationContext, namespaces, null);
Patterns.ask(streamingActor, connect, LOCAL_ASK_TIMEOUT)
.thenApply(ActorRef.class::cast)
.thenAccept(streamingSessionActor ->
streamingSessionActor.tell(startStreaming, ActorRef.noSender()))
.exceptionally(e -> {
killSwitch.abort(e);
return null;
});
return NotUsed.getInstance();
})
.mapAsync(streamingConfig.getParallelism(), jsonifiable ->
postprocess(jsonifiable, facade, targetThingIds, namespaces, fieldPointer, fields))
.mapConcat(jsonValues -> jsonValues)
.map(jsonValue -> {
THINGS_SSE_COUNTER.increment();
return ServerSentEvent.create(jsonValue.toString());
})
.log("SSE " + PATH_THINGS)
// sniffer shouldn't sniff heartbeats
.viaMat(eventSniffer.toAsyncFlow(ctx.getRequest()), Keep.none())
.keepAlive(Duration.ofSeconds(1), ServerSentEvent::heartbeat);
})
));

return completeOKWithFuture(sseSourceStage, EventStreamMarshalling.toEventStream());
try {
@Nullable final Long fromHistoricalRevision = Optional.ofNullable(
parameters.get(PARAM_FROM_HISTORICAL_REVISION))
.map(Long::parseLong)
.orElse(null);
@Nullable final Long toHistoricalRevision = Optional.ofNullable(
parameters.get(PARAM_TO_HISTORICAL_REVISION))
.map(Long::parseLong)
.orElse(null);

@Nullable final Instant fromHistoricalTimestamp = Optional.ofNullable(
parameters.get(PARAM_FROM_HISTORICAL_TIMESTAMP))
.map(Instant::parse)
.orElse(null);
@Nullable final Instant toHistoricalTimestamp = Optional.ofNullable(
parameters.get(PARAM_TO_HISTORICAL_TIMESTAMP))
.map(Instant::parse)
.orElse(null);

final CompletionStage<SignalEnrichmentFacade> facadeStage = signalEnrichmentProvider == null
? CompletableFuture.completedFuture(null)
: signalEnrichmentProvider.getFacade(ctx.getRequest());


final var sseSourceStage = facadeStage.thenCompose(facade -> dittoHeadersStage.thenCompose(
dittoHeaders -> sseAuthorizationEnforcer.checkAuthorization(ctx, dittoHeaders).thenApply(unused -> {
if (filterString != null) {
// will throw an InvalidRqlExpressionException if the RQL expression was not valid:
queryFilterCriteriaFactory.filterCriteria(filterString, dittoHeaders);
}

final String connectionCorrelationId = dittoHeaders.getCorrelationId()
.orElseThrow(() -> new IllegalStateException(
"Expected correlation-id in SSE DittoHeaders: " + dittoHeaders));
final var authorizationContext = dittoHeaders.getAuthorizationContext();
final Object startStreaming;
if (null != fromHistoricalRevision) {
FeatureToggle
.checkHistoricalApiAccessFeatureEnabled(SubscribeForPersistedEvents.TYPE, dittoHeaders);
startStreaming = SubscribeForPersistedEvents.of(targetThingIds.get(0),
fieldPointer,
fromHistoricalRevision,
null != toHistoricalRevision ? toHistoricalRevision : Long.MAX_VALUE,
filterString,
dittoHeaders);
} else if (null != fromHistoricalTimestamp) {
FeatureToggle
.checkHistoricalApiAccessFeatureEnabled(SubscribeForPersistedEvents.TYPE, dittoHeaders);
startStreaming = SubscribeForPersistedEvents.of(targetThingIds.get(0),
fieldPointer,
fromHistoricalTimestamp,
toHistoricalTimestamp,
filterString,
dittoHeaders);
} else {
startStreaming =
StartStreaming.getBuilder(StreamingType.EVENTS, connectionCorrelationId,
authorizationContext)
.withNamespaces(namespaces)
.withFilter(filterString)
.withExtraFields(extraFields)
.build();
}

final Source<SessionedJsonifiable, SupervisedStream.WithQueue> publisherSource =
SupervisedStream.sourceQueue(10);

return publisherSource.viaMat(KillSwitches.single(), Keep.both())
.mapMaterializedValue(pair -> {
final SupervisedStream.WithQueue withQueue = pair.first();
final KillSwitch killSwitch = pair.second();

final var jsonSchemaVersion = dittoHeaders.getSchemaVersion()
.orElse(JsonSchemaVersion.LATEST);
sseConnectionSupervisor.supervise(withQueue.getSupervisedStream(),
connectionCorrelationId, dittoHeaders);
final var connect = new Connect(withQueue.getSourceQueue(), connectionCorrelationId,
STREAMING_TYPE_SSE, jsonSchemaVersion, null, Set.of(),
authorizationContext, namespaces, null);
Patterns.ask(streamingActor, connect, LOCAL_ASK_TIMEOUT)
.thenApply(ActorRef.class::cast)
.thenAccept(streamingSessionActor ->
streamingSessionActor.tell(startStreaming, ActorRef.noSender()))
.exceptionally(e -> {
killSwitch.abort(e);
return null;
});
return NotUsed.getInstance();
})
.mapAsync(streamingConfig.getParallelism(), jsonifiable ->
postprocess(jsonifiable, facade, targetThingIds, namespaces, fieldPointer, fields))
.mapConcat(jsonValues -> jsonValues)
.map(jsonValue -> {
THINGS_SSE_COUNTER.increment();
return ServerSentEvent.create(jsonValue.toString());
})
.log("SSE " + PATH_THINGS)
// sniffer shouldn't sniff heartbeats
.viaMat(eventSniffer.toAsyncFlow(ctx.getRequest()), Keep.none())
.keepAlive(Duration.ofSeconds(1), ServerSentEvent::heartbeat);
})
));

return completeOKWithFuture(sseSourceStage, EventStreamMarshalling.toEventStream());
} catch (final DateTimeParseException | NumberFormatException runtimeException) {
return completeWithFuture(
dittoHeadersStage.thenApply(dittoHeaders -> new DittoJsonException(runtimeException, dittoHeaders))
.thenCompose(dittoJsonEx ->
CompletableFuture.completedFuture(
HttpResponse.create()
.withStatus(StatusCodes.BAD_REQUEST)
.withEntity(ContentTypes.APPLICATION_JSON,
dittoJsonEx.toJsonString())
)
)
);
}
}

private Route createMessagesSseRoute(final RequestContext ctx,
Expand Down

0 comments on commit 41d088a

Please sign in to comment.