Skip to content

Commit

Permalink
Enabled Placeholder resolving for extra fields declaration in connect…
Browse files Browse the repository at this point in the history
…ivity
  • Loading branch information
Yannic92 committed Mar 7, 2022
1 parent 20cf2b9 commit 9a2dd02
Showing 1 changed file with 21 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -76,10 +76,14 @@
import org.eclipse.ditto.internal.utils.akka.logging.DittoLoggerFactory;
import org.eclipse.ditto.internal.utils.akka.logging.ThreadSafeDittoLoggingAdapter;
import org.eclipse.ditto.internal.utils.pubsub.StreamingType;
import org.eclipse.ditto.json.JsonFactory;
import org.eclipse.ditto.json.JsonField;
import org.eclipse.ditto.json.JsonFieldSelector;
import org.eclipse.ditto.json.JsonObject;
import org.eclipse.ditto.json.JsonPointer;
import org.eclipse.ditto.json.JsonValue;
import org.eclipse.ditto.placeholders.ExpressionResolver;
import org.eclipse.ditto.placeholders.PipelineElement;
import org.eclipse.ditto.placeholders.PlaceholderFactory;
import org.eclipse.ditto.placeholders.PlaceholderResolver;
import org.eclipse.ditto.placeholders.TimePlaceholder;
Expand Down Expand Up @@ -368,8 +372,9 @@ private CompletionStage<Collection<OutboundSignalWithSender>> enrichAndFilterSig

final OutboundSignalWithSender outboundSignal = outboundSignalWithExtraFields.first();
final FilteredTopic filteredTopic = outboundSignalWithExtraFields.second();
final Optional<JsonFieldSelector> extraFieldsOptional =
Optional.ofNullable(filteredTopic).flatMap(FilteredTopic::getExtraFields);
final ExpressionResolver expressionResolver =
Resolvers.forSignal(outboundSignal.getSource(), connection.getId());
final Optional<JsonFieldSelector> extraFieldsOptional = getExtraFields(expressionResolver, filteredTopic);
if (extraFieldsOptional.isEmpty()) {
return CompletableFuture.completedFuture(Collections.singletonList(outboundSignal));
}
Expand Down Expand Up @@ -407,6 +412,20 @@ private CompletionStage<Collection<OutboundSignalWithSender>> enrichAndFilterSig
});
}

private static Optional<JsonFieldSelector> getExtraFields(final ExpressionResolver expressionResolver,
@Nullable final FilteredTopic filteredTopic) {

return Optional.ofNullable(filteredTopic)
.flatMap(FilteredTopic::getExtraFields)
.map(extraFields -> extraFields.getPointers().stream()
.map(JsonPointer::toString)
.map(expressionResolver::resolve)
.flatMap(PipelineElement::toStream)
.map(JsonPointer::of)
.collect(Collectors.toList()))
.map(JsonFactory::newFieldSelector);
}

private static Optional<EntityId> extractEntityId(Signal<?> signal) {
return Optional.of(signal)
.filter(WithEntityId.class::isInstance)
Expand Down

0 comments on commit 9a2dd02

Please sign in to comment.