From 737c7172c76024db3450c5c81525803c5707b53d Mon Sep 17 00:00:00 2001 From: Claus Ibsen Date: Mon, 18 May 2026 13:52:04 +0200 Subject: [PATCH] CAMEL-23544: Fix null fromRouteId and endpoint tracking for delegating consumers DefaultPlatformHttpConsumer creates its nested VertxPlatformHttpConsumer in doInit(), which runs before RouteService.doSetup() calls setRouteId() on the outer consumer. The nested consumer never received the route ID, causing exchanges to carry a null fromRouteId and TOTAL counts to show 0 in endpoint statistics. Fix: override setRouteId() in DefaultPlatformHttpConsumer to propagate the route ID to the nested consumer whenever it is set. DefaultRuntimeEndpointRegistry now also tracks the consumer's endpoint URI in the inputs map when it differs from the route's logical endpoint URI (e.g. rest-openapi -> platform-http). Both RouteAddedEvent (adds consumer URI to inputs) and RouteRemovedEvent (removes from inputUtilization) are updated. The ExchangeCreatedEvent handler retains a defensive null-routeId fallback scan for any other delegating component. Introduce SyntheticBacklogTracer (extends BacklogTracer) as a new SPI for components that process exchanges inline and bypass the route pipeline (e.g. rest-openapi mock mode). It exposes traceFirstNode/traceLastNode so those components can participate in message-history capture without relying on CamelInternalProcessor. BacklogTracer impl now implements this interface. DefaultRestOpenapiProcessorStrategy uses it to emit synthetic trace events around mock responses. Co-Authored-By: Claude Sonnet 4.6 --- .../http/DefaultPlatformHttpConsumer.java | 9 +++ .../DefaultRestOpenapiProcessorStrategy.java | 77 ++++++++++++++++++- .../camel/spi/SyntheticBacklogTracer.java | 70 +++++++++++++++++ .../camel/impl/debugger/BacklogTracer.java | 50 +++++++++++- .../DefaultRuntimeEndpointRegistry.java | 28 +++++++ 5 files changed, 231 insertions(+), 3 deletions(-) create mode 100644 core/camel-api/src/main/java/org/apache/camel/spi/SyntheticBacklogTracer.java diff --git a/components/camel-platform-http/src/main/java/org/apache/camel/component/platform/http/DefaultPlatformHttpConsumer.java b/components/camel-platform-http/src/main/java/org/apache/camel/component/platform/http/DefaultPlatformHttpConsumer.java index 6c2298561b8ea..fd760be971bc0 100644 --- a/components/camel-platform-http/src/main/java/org/apache/camel/component/platform/http/DefaultPlatformHttpConsumer.java +++ b/components/camel-platform-http/src/main/java/org/apache/camel/component/platform/http/DefaultPlatformHttpConsumer.java @@ -22,6 +22,7 @@ import org.apache.camel.Suspendable; import org.apache.camel.component.platform.http.spi.PlatformHttpConsumer; import org.apache.camel.component.platform.http.spi.PlatformHttpConsumerAware; +import org.apache.camel.spi.RouteIdAware; import org.apache.camel.support.DefaultConsumer; import org.apache.camel.support.service.ServiceHelper; @@ -68,6 +69,14 @@ public void registerAfterConfigured(AfterPropertiesConfigured listener) { this.afterConfiguredListener = listener; } + @Override + public void setRouteId(String routeId) { + super.setRouteId(routeId); + if (platformHttpConsumer instanceof RouteIdAware ria) { + ria.setRouteId(routeId); + } + } + @Override protected void doInit() throws Exception { platformHttpConsumer = getEndpoint().createPlatformHttpConsumer(getProcessor()); diff --git a/components/camel-rest-openapi/src/main/java/org/apache/camel/component/rest/openapi/DefaultRestOpenapiProcessorStrategy.java b/components/camel-rest-openapi/src/main/java/org/apache/camel/component/rest/openapi/DefaultRestOpenapiProcessorStrategy.java index 086e6d84fdc45..ebf6c45b4d5f0 100644 --- a/components/camel-rest-openapi/src/main/java/org/apache/camel/component/rest/openapi/DefaultRestOpenapiProcessorStrategy.java +++ b/components/camel-rest-openapi/src/main/java/org/apache/camel/component/rest/openapi/DefaultRestOpenapiProcessorStrategy.java @@ -36,6 +36,7 @@ import org.apache.camel.CamelContextAware; import org.apache.camel.Endpoint; import org.apache.camel.Exchange; +import org.apache.camel.NamedNode; import org.apache.camel.NonManagedService; import org.apache.camel.Route; import org.apache.camel.component.platform.http.PlatformHttpComponent; @@ -43,6 +44,7 @@ import org.apache.camel.spi.PackageScanResourceResolver; import org.apache.camel.spi.ProducerCache; import org.apache.camel.spi.Resource; +import org.apache.camel.spi.SyntheticBacklogTracer; import org.apache.camel.support.ExchangeHelper; import org.apache.camel.support.PluginHelper; import org.apache.camel.support.cache.DefaultProducerCache; @@ -208,7 +210,19 @@ public boolean process( exchange.setRouteStop(true); } else if ("mock".equalsIgnoreCase(missingOperation)) { // no route then try to load mock data as the answer - loadMockData(operation, verb, path, exchange); + SyntheticBacklogTracer syntheticTracer + = camelContext.getCamelContextExtension().getContextPlugin(SyntheticBacklogTracer.class); + NamedNode mockNode = new MockOperationNode(verb, path, operation.getOperationId()); + if (syntheticTracer != null && (syntheticTracer.isEnabled() || syntheticTracer.isStandby())) { + syntheticTracer.traceFirstNode(mockNode, exchange); + } + try { + loadMockData(operation, verb, path, exchange); + } finally { + if (syntheticTracer != null && (syntheticTracer.isEnabled() || syntheticTracer.isStandby())) { + syntheticTracer.traceLastNode(mockNode, exchange); + } + } } if (requestError == null) { var responseError = binding.doClientResponseValidation(exchange); @@ -441,4 +455,65 @@ protected void doStop() throws Exception { } } + /** + * Minimal {@link NamedNode} to represent a mocked OpenAPI operation for tracing purposes. + */ + private static final class MockOperationNode implements NamedNode { + + private final String id; + private final String label; + + private MockOperationNode(String verb, String path, String operationId) { + this.id = operationId != null ? operationId : verb + ":" + path; + this.label = "mock:" + verb + ":" + path; + } + + @Override + public String getId() { + return id; + } + + @Override + public String getNodePrefixId() { + return null; + } + + @Override + public String getShortName() { + return "mock"; + } + + @Override + public String getLabel() { + return label; + } + + @Override + public String getDescriptionText() { + return null; + } + + @Override + public NamedNode getParent() { + return null; + } + + @Override + public int getLineNumber() { + return -1; + } + + @Override + public void setLineNumber(int lineNumber) { + } + + @Override + public String getLocation() { + return null; + } + + @Override + public void setLocation(String location) { + } + } } diff --git a/core/camel-api/src/main/java/org/apache/camel/spi/SyntheticBacklogTracer.java b/core/camel-api/src/main/java/org/apache/camel/spi/SyntheticBacklogTracer.java new file mode 100644 index 0000000000000..9f422d63a3d91 --- /dev/null +++ b/core/camel-api/src/main/java/org/apache/camel/spi/SyntheticBacklogTracer.java @@ -0,0 +1,70 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.spi; + +import org.apache.camel.Exchange; +import org.apache.camel.NamedNode; + +/** + * Extended {@link BacklogTracer} API for components that process exchanges inline and bypass the normal route pipeline + * (e.g. mock mode in the rest-openapi consumer). Such components cannot rely on the automatic tracing that + * {@code CamelInternalProcessor} applies to every node in the route graph, so they must emit synthetic first/last trace + * events manually to participate in message-history capture. + *

+ * Callers should obtain a {@link BacklogTracer} from the context extension and check whether it also implements this + * interface before invoking the synthetic tracing methods: + * + *

{@code
+ * BacklogTracer bt = camelContext.getCamelContextExtension().getContextPlugin(BacklogTracer.class);
+ * if (bt instanceof SyntheticBacklogTracer st && (st.isEnabled() || st.isStandby())) {
+ *     st.traceFirstNode(node, exchange);
+ *     try {
+ *         // ... inline processing ...
+ *     } finally {
+ *         st.traceLastNode(node, exchange);
+ *     }
+ * }
+ * }
+ * + * @since 4.21 + */ +public interface SyntheticBacklogTracer extends BacklogTracer { + + /** + * Emits a synthetic first trace event ({@code first=true, last=false}) for the given node and exchange. + *

+ * Call this before the inline processing begins. It pairs with {@link #traceLastNode} to bracket the operation, + * mirroring what {@code BacklogTracerRouteAdvice} does automatically for normal route nodes. + * + * @param node the synthetic node representing the inline operation + * @param exchange the current exchange + * @since 4.21 + */ + void traceFirstNode(NamedNode node, Exchange exchange); + + /** + * Emits a synthetic last trace event ({@code first=false, last=true}) for the given node and exchange. + *

+ * Call this after the inline processing completes (typically in a {@code finally} block). The {@code last=true} + * flag triggers message-history completion in the tracer, making the exchange visible in the history view. + * + * @param node the synthetic node representing the inline operation + * @param exchange the current exchange + * @since 4.21 + */ + void traceLastNode(NamedNode node, Exchange exchange); +} diff --git a/core/camel-base-engine/src/main/java/org/apache/camel/impl/debugger/BacklogTracer.java b/core/camel-base-engine/src/main/java/org/apache/camel/impl/debugger/BacklogTracer.java index 3f75319df11e2..0c9ab52c09b25 100644 --- a/core/camel-base-engine/src/main/java/org/apache/camel/impl/debugger/BacklogTracer.java +++ b/core/camel-base-engine/src/main/java/org/apache/camel/impl/debugger/BacklogTracer.java @@ -27,11 +27,15 @@ import org.apache.camel.CamelContext; import org.apache.camel.Exchange; +import org.apache.camel.ExchangePropertyKey; import org.apache.camel.NamedNode; import org.apache.camel.Predicate; +import org.apache.camel.Route; import org.apache.camel.spi.BacklogTracerEventMessage; import org.apache.camel.spi.Language; import org.apache.camel.support.CamelContextHelper; +import org.apache.camel.support.LoggerHelper; +import org.apache.camel.support.MessageHelper; import org.apache.camel.support.PatternHelper; import org.apache.camel.support.service.ServiceSupport; import org.apache.camel.util.StringHelper; @@ -45,7 +49,7 @@ * This tracer allows to store message tracers per node in the Camel routes. The tracers is stored in a backlog queue * (FIFO based) which allows to pull the traced messages on demand. */ -public class BacklogTracer extends ServiceSupport implements org.apache.camel.spi.BacklogTracer { +public class BacklogTracer extends ServiceSupport implements org.apache.camel.spi.SyntheticBacklogTracer { // limit the tracer to a thousand messages in total public static final int MAX_BACKLOG_SIZE = 1000; @@ -98,6 +102,7 @@ public static BacklogTracer createTracer(CamelContext context) { * @param exchange the exchange * @return true to trace, false to skip tracing */ + @Override public boolean shouldTrace(NamedNode definition, Exchange exchange) { // special in standby mode we allow using tracer to capture latest tracing data for // enriched message history @@ -138,7 +143,47 @@ private boolean shouldTracePattern(NamedNode definition) { return false; } - public void traceEvent(DefaultBacklogTracerEventMessage event) { + @Override + public void traceFirstNode(NamedNode node, Exchange exchange) { + traceNode(node, exchange, true, false); + } + + @Override + public void traceLastNode(NamedNode node, Exchange exchange) { + traceNode(node, exchange, false, true); + } + + private void traceNode(NamedNode node, Exchange exchange, boolean first, boolean last) { + if (!shouldTrace(node, exchange)) { + return; + } + long timestamp = System.currentTimeMillis(); + String toNode = node.getId(); + String toNodeParentId = node.getParentId(); + String toNodeShortName = node.getShortName(); + String toNodeLabel = StringHelper.limitLength(node.getLabel(), 50); + String exchangeId = exchange.getExchangeId(); + String correlationExchangeId = exchange.getProperty(ExchangePropertyKey.CORRELATION_ID, String.class); + int level = node.getLevel(); + String fromRouteId = exchange.getFromRouteId(); + String source = LoggerHelper.getLineNumberLoggerName(node); + JsonObject data = MessageHelper.dumpAsJSonObject(exchange.getIn(), isIncludeExchangeProperties(), + isIncludeExchangeVariables(), true, true, isBodyIncludeStreams(), isBodyIncludeFiles(), getBodyMaxChars()); + DefaultBacklogTracerEventMessage event = new DefaultBacklogTracerEventMessage( + camelContext, first, last, incrementTraceCounter(), timestamp, source, fromRouteId, fromRouteId, toNode, + toNodeParentId, null, null, toNodeShortName, toNodeLabel, level, + exchangeId, correlationExchangeId, false, false, data); + if ((first || last) && fromRouteId != null) { + Route route = camelContext.getRoute(fromRouteId); + if (route != null && route.getConsumer() != null) { + event.setEndpointUri(route.getConsumer().getEndpoint().getEndpointUri()); + } + } + traceEvent(event); + } + + @Override + public void traceEvent(BacklogTracerEventMessage event) { // special in standby mode we allow using tracer to capture latest tracing data for // enriched message history boolean history = (enabled || standby) && camelContext.isMessageHistory(); @@ -495,6 +540,7 @@ public void clear() { provisionalHistoryQueue.clear(); } + @Override public long incrementTraceCounter() { return traceCounter.incrementAndGet(); } diff --git a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/DefaultRuntimeEndpointRegistry.java b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/DefaultRuntimeEndpointRegistry.java index 691992ba4e4b2..c3256233a47dd 100644 --- a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/DefaultRuntimeEndpointRegistry.java +++ b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/DefaultRuntimeEndpointRegistry.java @@ -218,6 +218,15 @@ public void notify(CamelEvent event) throws Exception { // a HashSet is fine for inputs as we only have a limited number of those Set uris = new HashSet<>(); uris.add(endpoint.getEndpointUri()); + // some components (e.g. rest-openapi) delegate to an underlying consumer (e.g. platform-http) whose + // endpoint URI differs from the route's logical endpoint URI; include the consumer's URI so that + // ExchangeCreatedEvent hits recorded under the consumer URI are matched when looking up statistics + if (rse.getRoute().getConsumer() != null) { + String consumerUri = rse.getRoute().getConsumer().getEndpoint().getEndpointUri(); + if (!endpoint.getEndpointUri().equals(consumerUri)) { + uris.add(consumerUri); + } + } inputs.put(routeId, uris); // use a LRUCache for outputs as we could potential have unlimited uris if dynamic routing is in use // and therefore need to have the limit in use @@ -235,6 +244,15 @@ public void notify(CamelEvent event) throws Exception { if (key != null) { inputUtilization.remove(key); } + if (rse.getRoute().getConsumer() != null) { + String consumerUri = rse.getRoute().getConsumer().getEndpoint().getEndpointUri(); + if (!uri.equals(consumerUri)) { + String consumerKey = asUtilizationKey(routeId, consumerUri); + if (consumerKey != null) { + inputUtilization.remove(consumerKey); + } + } + } } } else if (extended && event instanceof ExchangeCreatedEvent ece) { // we only capture details in extended mode @@ -242,6 +260,16 @@ public void notify(CamelEvent event) throws Exception { if (endpoint != null) { String routeId = ece.getExchange().getFromRouteId(); String uri = endpoint.getEndpointUri(); + // some components (e.g. rest-openapi) delegate to an underlying consumer (e.g. platform-http) + // whose exchange may not carry a fromRouteId; fall back to scanning inputs by URI + if (routeId == null) { + for (Map.Entry> entry : inputs.entrySet()) { + if (entry.getValue().contains(uri)) { + routeId = entry.getKey(); + break; + } + } + } String key = asUtilizationKey(routeId, uri); if (key != null) { inputUtilization.onHit(key);