Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,10 @@
import java.net.MalformedURLException;
import java.net.URI;
import java.net.URL;
import java.util.Arrays;
import java.util.Collections;
import java.util.Set;
import java.util.stream.Collectors;

import org.apache.http.HttpHost;
import org.apache.http.HttpRequest;
Expand All @@ -34,6 +38,8 @@
import org.apache.skywalking.apm.agent.core.context.tag.Tags;
import org.apache.skywalking.apm.agent.core.context.trace.AbstractSpan;
import org.apache.skywalking.apm.agent.core.context.trace.SpanLayer;
import org.apache.skywalking.apm.agent.core.logging.api.ILog;
import org.apache.skywalking.apm.agent.core.logging.api.LogManager;
import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.EnhancedInstance;
import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.InstanceMethodsAroundInterceptor;
import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.MethodInterceptResult;
Expand All @@ -43,12 +49,19 @@

public class HttpClientExecuteInterceptor implements InstanceMethodsAroundInterceptor {
private static final String ERROR_URI = "/_blank";
private static final ILog LOGGER = LogManager.getLogger(HttpClientExecuteInterceptor.class);

/**
* Lazily-resolved set of ports that must not receive SkyWalking
* propagation headers. Built once from
* {@link HttpClientPluginConfig.Plugin.HttpClient#PROPAGATION_EXCLUDE_PORTS}.
*/
private volatile Set<Integer> excludePortsCache;

@Override
public void beforeMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class<?>[] argumentsTypes,
MethodInterceptResult result) throws Throwable {
if (allArguments[0] == null || allArguments[1] == null) {
// illegal args, can't trace. ignore.
if (skipIntercept(allArguments)) {
return;
}
final HttpHost httpHost = (HttpHost) allArguments[0];
Expand Down Expand Up @@ -82,7 +95,7 @@ public void beforeMethod(EnhancedInstance objInst, Method method, Object[] allAr
@Override
public Object afterMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class<?>[] argumentsTypes,
Object ret) throws Throwable {
if (allArguments[0] == null || allArguments[1] == null) {
if (skipIntercept(allArguments)) {
return ret;
}

Expand Down Expand Up @@ -111,10 +124,62 @@ public Object afterMethod(EnhancedInstance objInst, Method method, Object[] allA
@Override
public void handleMethodException(EnhancedInstance objInst, Method method, Object[] allArguments,
Class<?>[] argumentsTypes, Throwable t) {
if (skipIntercept(allArguments)) {
return;
}
AbstractSpan activeSpan = ContextManager.activeSpan();
activeSpan.log(t);
}

private boolean skipIntercept(Object[] allArguments) {
if (allArguments[0] == null || allArguments[1] == null) {
return true;
}
HttpHost httpHost = (HttpHost) allArguments[0];
return isExcludedPort(port(httpHost));
}

/**
* Returns {@code true} when {@code port} is listed in
* {@link HttpClientPluginConfig.Plugin.HttpClient#PROPAGATION_EXCLUDE_PORTS}.
*
* <p>The config value is parsed lazily and cached so that it is read after
* the agent has fully initialised its configuration subsystem.
*/
private boolean isExcludedPort(int port) {
if (port <= 0) {
return false;
}
if (excludePortsCache == null) {
synchronized (this) {
if (excludePortsCache == null) {
excludePortsCache = parseExcludePorts(
HttpClientPluginConfig.Plugin.HttpClient.PROPAGATION_EXCLUDE_PORTS);
}
}
}
return excludePortsCache.contains(port);
}

private static Set<Integer> parseExcludePorts(String raw) {
if (raw == null || raw.trim().isEmpty()) {
return Collections.emptySet();
}
return Arrays.stream(raw.split(","))
.map(String::trim)
.filter(s -> !s.isEmpty())
.map(s -> {
try {
return Integer.parseInt(s);
} catch (NumberFormatException e) {
LOGGER.warn("Ignoring invalid port in PROPAGATION_EXCLUDE_PORTS: {}", s);
return -1;
}
})
.filter(p -> p > 0)
.collect(Collectors.toSet());
}

private String getRequestURI(String uri) {
if (isUrl(uri)) {
String requestPath;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,203 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/

package org.apache.skywalking.apm.plugin.httpClient.v4;

import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import static org.mockito.Mockito.mock;

import java.util.List;

import org.apache.http.HttpHost;
import org.apache.http.HttpResponse;
import org.apache.http.ProtocolVersion;
import org.apache.http.RequestLine;
import org.apache.http.StatusLine;
import org.apache.http.client.methods.HttpGet;
import org.apache.skywalking.apm.agent.core.boot.ServiceManager;
import org.apache.skywalking.apm.agent.core.context.trace.TraceSegment;
import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.EnhancedInstance;
import org.apache.skywalking.apm.agent.test.tools.AgentServiceRule;
import org.apache.skywalking.apm.agent.test.tools.SegmentStorage;
import org.apache.skywalking.apm.agent.test.tools.SegmentStoragePoint;
import org.apache.skywalking.apm.agent.test.tools.TracingSegmentRunner;
import org.apache.skywalking.apm.plugin.httpclient.HttpClientPluginConfig;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Mock;
import org.mockito.junit.MockitoJUnit;
import org.mockito.junit.MockitoRule;

/**
* Verifies that requests to ports listed in
* {@link HttpClientPluginConfig.Plugin.HttpClient#PROPAGATION_EXCLUDE_PORTS}
* are silently skipped (no span created, no sw8 header injected).
*/
@RunWith(TracingSegmentRunner.class)
public class HttpClientPropagationExcludePortTest {

@SegmentStoragePoint
private SegmentStorage segmentStorage;

@Rule
public AgentServiceRule agentServiceRule = new AgentServiceRule();
@Rule
public MockitoRule rule = MockitoJUnit.rule();

@Mock
private HttpHost clickHouseHost;
@Mock
private HttpHost regularHost;
@Mock
private HttpGet request;
@Mock
private HttpResponse httpResponse;
@Mock
private StatusLine statusLine;
@Mock
private EnhancedInstance enhancedInstance;

private HttpClientExecuteInterceptor interceptor;

private Object[] clickHouseArgs;
private Object[] regularArgs;
private Class<?>[] argumentsType;

@Before
public void setUp() throws Exception {
ServiceManager.INSTANCE.boot();

HttpClientPluginConfig.Plugin.HttpClient.PROPAGATION_EXCLUDE_PORTS = "8123";
interceptor = new HttpClientExecuteInterceptor();

when(statusLine.getStatusCode()).thenReturn(200);
when(httpResponse.getStatusLine()).thenReturn(statusLine);

RequestLine requestLine = new RequestLine() {
@Override
public String getMethod() {
return "GET";
}

@Override
public ProtocolVersion getProtocolVersion() {
return new ProtocolVersion("http", 1, 1);
}

@Override
public String getUri() {
return "http://my-service:8080/api/ping";
}
};
when(request.getRequestLine()).thenReturn(requestLine);

when(clickHouseHost.getHostName()).thenReturn("clickhouse-server");
when(clickHouseHost.getSchemeName()).thenReturn("http");
when(clickHouseHost.getPort()).thenReturn(8123);

when(regularHost.getHostName()).thenReturn("my-service");
when(regularHost.getSchemeName()).thenReturn("http");
when(regularHost.getPort()).thenReturn(8080);

clickHouseArgs = new Object[]{clickHouseHost, request};
regularArgs = new Object[]{regularHost, request};
argumentsType = new Class[]{HttpHost.class, HttpGet.class};
}

@Test
public void requestToExcludedPort_noSpanAndNoHeaderInjected() throws Throwable {
interceptor.beforeMethod(enhancedInstance, null, clickHouseArgs, argumentsType, null);
interceptor.afterMethod(enhancedInstance, null, clickHouseArgs, argumentsType, httpResponse);

List<TraceSegment> segments = segmentStorage.getTraceSegments();
assertThat(segments.size(), is(0));
verify(request, never()).setHeader(anyString(), anyString());
}

@Test
public void requestToRegularPort_spanCreatedAndHeadersInjected() throws Throwable {
interceptor.beforeMethod(enhancedInstance, null, regularArgs, argumentsType, null);
interceptor.afterMethod(enhancedInstance, null, regularArgs, argumentsType, httpResponse);

List<TraceSegment> segments = segmentStorage.getTraceSegments();
assertThat(segments.size(), is(1));
verify(request, times(3)).setHeader(anyString(), anyString());
}

@Test
public void whenExcludePortsEmpty_allPortsAreTraced() throws Throwable {
HttpClientPluginConfig.Plugin.HttpClient.PROPAGATION_EXCLUDE_PORTS = "";

HttpClientExecuteInterceptor freshInterceptor = new HttpClientExecuteInterceptor();
freshInterceptor.beforeMethod(enhancedInstance, null, clickHouseArgs, argumentsType, null);
freshInterceptor.afterMethod(enhancedInstance, null, clickHouseArgs, argumentsType, httpResponse);

List<TraceSegment> segments = segmentStorage.getTraceSegments();
assertThat(segments.size(), is(1));
}

@Test
public void multipleExcludedPorts_allSkippedAndNonExcludedStillTraced() throws Throwable {
HttpClientPluginConfig.Plugin.HttpClient.PROPAGATION_EXCLUDE_PORTS = "8123,9200";

HttpClientExecuteInterceptor freshInterceptor = new HttpClientExecuteInterceptor();

// 8123 should be excluded
freshInterceptor.beforeMethod(enhancedInstance, null, clickHouseArgs, argumentsType, null);
freshInterceptor.afterMethod(enhancedInstance, null, clickHouseArgs, argumentsType, httpResponse);
assertThat(segmentStorage.getTraceSegments().size(), is(0));

// 9200 should also be excluded
HttpHost esHost = mock(HttpHost.class);
when(esHost.getHostName()).thenReturn("es-server");
when(esHost.getSchemeName()).thenReturn("http");
when(esHost.getPort()).thenReturn(9200);
Object[] esArgs = new Object[]{esHost, request};

freshInterceptor = new HttpClientExecuteInterceptor();
HttpClientPluginConfig.Plugin.HttpClient.PROPAGATION_EXCLUDE_PORTS = "8123,9200";
freshInterceptor.beforeMethod(enhancedInstance, null, esArgs, argumentsType, null);
freshInterceptor.afterMethod(enhancedInstance, null, esArgs, argumentsType, httpResponse);
assertThat(segmentStorage.getTraceSegments().size(), is(0));

// 8080 should still be traced
freshInterceptor = new HttpClientExecuteInterceptor();
HttpClientPluginConfig.Plugin.HttpClient.PROPAGATION_EXCLUDE_PORTS = "8123,9200";
freshInterceptor.beforeMethod(enhancedInstance, null, regularArgs, argumentsType, null);
freshInterceptor.afterMethod(enhancedInstance, null, regularArgs, argumentsType, httpResponse);
assertThat(segmentStorage.getTraceSegments().size(), is(1));
}

@Test
public void excludedPort_handleMethodExceptionSkipped() throws Throwable {
interceptor.beforeMethod(enhancedInstance, null, clickHouseArgs, argumentsType, null);
interceptor.handleMethodException(enhancedInstance, null, clickHouseArgs, argumentsType, new RuntimeException("test"));
interceptor.afterMethod(enhancedInstance, null, clickHouseArgs, argumentsType, httpResponse);

List<TraceSegment> segments = segmentStorage.getTraceSegments();
assertThat(segments.size(), is(0));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,22 @@ public static class HttpClient {
* This config item controls that whether the HttpClient plugin should collect the parameters of the request.
*/
public static boolean COLLECT_HTTP_PARAMS = false;

/**
* Comma-separated list of destination ports whose outbound HTTP requests
* will be completely skipped by the httpclient-4.x interceptor: no exit
* span is created and no SkyWalking propagation headers are injected.
*
* <p>Some HTTP-based database protocols (e.g. ClickHouse on port 8123)
* reject requests that contain unknown HTTP headers, returning HTTP 400.
* Adding such ports here prevents the agent from creating exit spans
* and from injecting the {@code sw8} tracing headers into those outbound
* requests.
*
* <p>Example – exclude ClickHouse and Elasticsearch ports:
* {@code plugin.httpclient.propagation_exclude_ports=8123,9200}
*/
public static String PROPAGATION_EXCLUDE_PORTS = "";
}

@PluginConfig(root = HttpClientPluginConfig.class)
Expand Down
4 changes: 3 additions & 1 deletion apm-sniffer/config/agent.config
Original file line number Diff line number Diff line change
Expand Up @@ -218,8 +218,10 @@ plugin.jdkthreading.threading_class_prefixes=${SW_PLUGIN_JDKTHREADING_THREADING_
plugin.tomcat.collect_http_params=${SW_PLUGIN_TOMCAT_COLLECT_HTTP_PARAMS:false}
# This config item controls that whether the SpringMVC plugin should collect the parameters of the request, when your Spring application is based on Tomcat, consider only setting either `plugin.tomcat.collect_http_params` or `plugin.springmvc.collect_http_params`. Also, activate implicitly in the profiled trace.
plugin.springmvc.collect_http_params=${SW_PLUGIN_SPRINGMVC_COLLECT_HTTP_PARAMS:false}
# This config item controls that whether the HttpClient plugin should collect the parameters of the request
# This config item controls that whether the HttpClient plugin should collect the parameters of the request
plugin.httpclient.collect_http_params=${SW_PLUGIN_HTTPCLIENT_COLLECT_HTTP_PARAMS:false}
# Comma-separated list of destination ports whose outbound HTTP requests will be completely skipped by the httpclient-4.x interceptor (no exit span created, no sw8 headers injected).
plugin.httpclient.propagation_exclude_ports=${SW_PLUGIN_HTTPCLIENT_PROPAGATION_EXCLUDE_PORTS:}
# When `COLLECT_HTTP_PARAMS` is enabled, how many characters to keep and send to the OAP backend, use negative values to keep and send the complete parameters, NB. this config item is added for the sake of performance.
plugin.http.http_params_length_threshold=${SW_PLUGIN_HTTP_HTTP_PARAMS_LENGTH_THRESHOLD:1024}
# When `include_http_headers` declares header names, this threshold controls the length limitation of all header values. use negative values to keep and send the complete headers. Note. this config item is added for the sake of performance.
Expand Down
2 changes: 2 additions & 0 deletions changes/changes-9.5.0.md
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ Release Notes.
enhanced by both SkyWalking and Spring AOP.
* Build: Centralized plugin version management in the root POM and remove redundant declarations.
* Support Spring Cloud Gateway 4.3.x.
* Add `PROPAGATION_EXCLUDE_PORTS` config to httpclient-4.x plugin to skip tracing and header injection for
specified ports (default: 8123), fixing ClickHouse HTTP 400 caused by injected sw8 headers.

All issues and pull requests are [here](https://github.com/apache/skywalking/milestone/236?closed=1)

Loading
Loading