Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.apache.camel.Suspendable;
import org.apache.camel.component.platform.http.spi.PlatformHttpConsumer;
import org.apache.camel.component.platform.http.spi.PlatformHttpConsumerAware;
import org.apache.camel.spi.RouteIdAware;
import org.apache.camel.support.DefaultConsumer;
import org.apache.camel.support.service.ServiceHelper;

Expand Down Expand Up @@ -68,6 +69,14 @@ public void registerAfterConfigured(AfterPropertiesConfigured listener) {
this.afterConfiguredListener = listener;
}

@Override
public void setRouteId(String routeId) {
super.setRouteId(routeId);
if (platformHttpConsumer instanceof RouteIdAware ria) {
ria.setRouteId(routeId);
}
}

@Override
protected void doInit() throws Exception {
platformHttpConsumer = getEndpoint().createPlatformHttpConsumer(getProcessor());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,13 +36,15 @@
import org.apache.camel.CamelContextAware;
import org.apache.camel.Endpoint;
import org.apache.camel.Exchange;
import org.apache.camel.NamedNode;
import org.apache.camel.NonManagedService;
import org.apache.camel.Route;
import org.apache.camel.component.platform.http.PlatformHttpComponent;
import org.apache.camel.component.platform.http.spi.PlatformHttpConsumerAware;
import org.apache.camel.spi.PackageScanResourceResolver;
import org.apache.camel.spi.ProducerCache;
import org.apache.camel.spi.Resource;
import org.apache.camel.spi.SyntheticBacklogTracer;
import org.apache.camel.support.ExchangeHelper;
import org.apache.camel.support.PluginHelper;
import org.apache.camel.support.cache.DefaultProducerCache;
Expand Down Expand Up @@ -208,7 +210,19 @@ public boolean process(
exchange.setRouteStop(true);
} else if ("mock".equalsIgnoreCase(missingOperation)) {
// no route then try to load mock data as the answer
loadMockData(operation, verb, path, exchange);
SyntheticBacklogTracer syntheticTracer
= camelContext.getCamelContextExtension().getContextPlugin(SyntheticBacklogTracer.class);
NamedNode mockNode = new MockOperationNode(verb, path, operation.getOperationId());
if (syntheticTracer != null && (syntheticTracer.isEnabled() || syntheticTracer.isStandby())) {
syntheticTracer.traceFirstNode(mockNode, exchange);
}
try {
loadMockData(operation, verb, path, exchange);
} finally {
if (syntheticTracer != null && (syntheticTracer.isEnabled() || syntheticTracer.isStandby())) {
syntheticTracer.traceLastNode(mockNode, exchange);
}
}
}
if (requestError == null) {
var responseError = binding.doClientResponseValidation(exchange);
Expand Down Expand Up @@ -441,4 +455,65 @@ protected void doStop() throws Exception {
}
}

/**
* Minimal {@link NamedNode} to represent a mocked OpenAPI operation for tracing purposes.
*/
private static final class MockOperationNode implements NamedNode {

private final String id;
private final String label;

private MockOperationNode(String verb, String path, String operationId) {
this.id = operationId != null ? operationId : verb + ":" + path;
this.label = "mock:" + verb + ":" + path;
}

@Override
public String getId() {
return id;
}

@Override
public String getNodePrefixId() {
return null;
}

@Override
public String getShortName() {
return "mock";
}

@Override
public String getLabel() {
return label;
}

@Override
public String getDescriptionText() {
return null;
}

@Override
public NamedNode getParent() {
return null;
}

@Override
public int getLineNumber() {
return -1;
}

@Override
public void setLineNumber(int lineNumber) {
}

@Override
public String getLocation() {
return null;
}

@Override
public void setLocation(String location) {
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.camel.spi;

import org.apache.camel.Exchange;
import org.apache.camel.NamedNode;

/**
* Extended {@link BacklogTracer} API for components that process exchanges inline and bypass the normal route pipeline
* (e.g. mock mode in the rest-openapi consumer). Such components cannot rely on the automatic tracing that
* {@code CamelInternalProcessor} applies to every node in the route graph, so they must emit synthetic first/last trace
* events manually to participate in message-history capture.
* <p>
* Callers should obtain a {@link BacklogTracer} from the context extension and check whether it also implements this
* interface before invoking the synthetic tracing methods:
*
* <pre>{@code
* BacklogTracer bt = camelContext.getCamelContextExtension().getContextPlugin(BacklogTracer.class);
* if (bt instanceof SyntheticBacklogTracer st && (st.isEnabled() || st.isStandby())) {
* st.traceFirstNode(node, exchange);
* try {
* // ... inline processing ...
* } finally {
* st.traceLastNode(node, exchange);
* }
* }
* }</pre>
*
* @since 4.21
*/
public interface SyntheticBacklogTracer extends BacklogTracer {

/**
* Emits a synthetic <em>first</em> trace event ({@code first=true, last=false}) for the given node and exchange.
* <p>
* Call this before the inline processing begins. It pairs with {@link #traceLastNode} to bracket the operation,
* mirroring what {@code BacklogTracerRouteAdvice} does automatically for normal route nodes.
*
* @param node the synthetic node representing the inline operation
* @param exchange the current exchange
* @since 4.21
*/
void traceFirstNode(NamedNode node, Exchange exchange);

/**
* Emits a synthetic <em>last</em> trace event ({@code first=false, last=true}) for the given node and exchange.
* <p>
* Call this after the inline processing completes (typically in a {@code finally} block). The {@code last=true}
* flag triggers message-history completion in the tracer, making the exchange visible in the history view.
*
* @param node the synthetic node representing the inline operation
* @param exchange the current exchange
* @since 4.21
*/
void traceLastNode(NamedNode node, Exchange exchange);
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,11 +27,15 @@

import org.apache.camel.CamelContext;
import org.apache.camel.Exchange;
import org.apache.camel.ExchangePropertyKey;
import org.apache.camel.NamedNode;
import org.apache.camel.Predicate;
import org.apache.camel.Route;
import org.apache.camel.spi.BacklogTracerEventMessage;
import org.apache.camel.spi.Language;
import org.apache.camel.support.CamelContextHelper;
import org.apache.camel.support.LoggerHelper;
import org.apache.camel.support.MessageHelper;
import org.apache.camel.support.PatternHelper;
import org.apache.camel.support.service.ServiceSupport;
import org.apache.camel.util.StringHelper;
Expand All @@ -45,7 +49,7 @@
* This tracer allows to store message tracers per node in the Camel routes. The tracers is stored in a backlog queue
* (FIFO based) which allows to pull the traced messages on demand.
*/
public class BacklogTracer extends ServiceSupport implements org.apache.camel.spi.BacklogTracer {
public class BacklogTracer extends ServiceSupport implements org.apache.camel.spi.SyntheticBacklogTracer {

// limit the tracer to a thousand messages in total
public static final int MAX_BACKLOG_SIZE = 1000;
Expand Down Expand Up @@ -98,6 +102,7 @@ public static BacklogTracer createTracer(CamelContext context) {
* @param exchange the exchange
* @return <tt>true</tt> to trace, <tt>false</tt> to skip tracing
*/
@Override
public boolean shouldTrace(NamedNode definition, Exchange exchange) {
// special in standby mode we allow using tracer to capture latest tracing data for
// enriched message history
Expand Down Expand Up @@ -138,7 +143,47 @@ private boolean shouldTracePattern(NamedNode definition) {
return false;
}

public void traceEvent(DefaultBacklogTracerEventMessage event) {
@Override
public void traceFirstNode(NamedNode node, Exchange exchange) {
traceNode(node, exchange, true, false);
}

@Override
public void traceLastNode(NamedNode node, Exchange exchange) {
traceNode(node, exchange, false, true);
}

private void traceNode(NamedNode node, Exchange exchange, boolean first, boolean last) {
if (!shouldTrace(node, exchange)) {
return;
}
long timestamp = System.currentTimeMillis();
String toNode = node.getId();
String toNodeParentId = node.getParentId();
String toNodeShortName = node.getShortName();
String toNodeLabel = StringHelper.limitLength(node.getLabel(), 50);
String exchangeId = exchange.getExchangeId();
String correlationExchangeId = exchange.getProperty(ExchangePropertyKey.CORRELATION_ID, String.class);
int level = node.getLevel();
String fromRouteId = exchange.getFromRouteId();
String source = LoggerHelper.getLineNumberLoggerName(node);
JsonObject data = MessageHelper.dumpAsJSonObject(exchange.getIn(), isIncludeExchangeProperties(),
isIncludeExchangeVariables(), true, true, isBodyIncludeStreams(), isBodyIncludeFiles(), getBodyMaxChars());
DefaultBacklogTracerEventMessage event = new DefaultBacklogTracerEventMessage(
camelContext, first, last, incrementTraceCounter(), timestamp, source, fromRouteId, fromRouteId, toNode,
toNodeParentId, null, null, toNodeShortName, toNodeLabel, level,
exchangeId, correlationExchangeId, false, false, data);
if ((first || last) && fromRouteId != null) {
Route route = camelContext.getRoute(fromRouteId);
if (route != null && route.getConsumer() != null) {
event.setEndpointUri(route.getConsumer().getEndpoint().getEndpointUri());
}
}
traceEvent(event);
}

@Override
public void traceEvent(BacklogTracerEventMessage event) {
// special in standby mode we allow using tracer to capture latest tracing data for
// enriched message history
boolean history = (enabled || standby) && camelContext.isMessageHistory();
Expand Down Expand Up @@ -495,6 +540,7 @@ public void clear() {
provisionalHistoryQueue.clear();
}

@Override
public long incrementTraceCounter() {
return traceCounter.incrementAndGet();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -218,6 +218,15 @@ public void notify(CamelEvent event) throws Exception {
// a HashSet is fine for inputs as we only have a limited number of those
Set<String> uris = new HashSet<>();
uris.add(endpoint.getEndpointUri());
// some components (e.g. rest-openapi) delegate to an underlying consumer (e.g. platform-http) whose
// endpoint URI differs from the route's logical endpoint URI; include the consumer's URI so that
// ExchangeCreatedEvent hits recorded under the consumer URI are matched when looking up statistics
if (rse.getRoute().getConsumer() != null) {
String consumerUri = rse.getRoute().getConsumer().getEndpoint().getEndpointUri();
if (!endpoint.getEndpointUri().equals(consumerUri)) {
uris.add(consumerUri);
}
}
inputs.put(routeId, uris);
// use a LRUCache for outputs as we could potential have unlimited uris if dynamic routing is in use
// and therefore need to have the limit in use
Expand All @@ -235,13 +244,32 @@ public void notify(CamelEvent event) throws Exception {
if (key != null) {
inputUtilization.remove(key);
}
if (rse.getRoute().getConsumer() != null) {
String consumerUri = rse.getRoute().getConsumer().getEndpoint().getEndpointUri();
if (!uri.equals(consumerUri)) {
String consumerKey = asUtilizationKey(routeId, consumerUri);
if (consumerKey != null) {
inputUtilization.remove(consumerKey);
}
}
}
}
} else if (extended && event instanceof ExchangeCreatedEvent ece) {
// we only capture details in extended mode
Endpoint endpoint = ece.getExchange().getFromEndpoint();
if (endpoint != null) {
String routeId = ece.getExchange().getFromRouteId();
String uri = endpoint.getEndpointUri();
// some components (e.g. rest-openapi) delegate to an underlying consumer (e.g. platform-http)
// whose exchange may not carry a fromRouteId; fall back to scanning inputs by URI
if (routeId == null) {
for (Map.Entry<String, Set<String>> entry : inputs.entrySet()) {
if (entry.getValue().contains(uri)) {
routeId = entry.getKey();
break;
}
}
}
String key = asUtilizationKey(routeId, uri);
if (key != null) {
inputUtilization.onHit(key);
Expand Down
Loading