/
TracingHttpServiceFilter.java
180 lines (161 loc) · 7.7 KB
/
TracingHttpServiceFilter.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
/*
* Copyright © 2018-2019 Apple Inc. and the ServiceTalk project authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.servicetalk.opentracing.http;
import io.servicetalk.concurrent.api.Publisher;
import io.servicetalk.concurrent.api.Single;
import io.servicetalk.http.api.HttpExecutionStrategy;
import io.servicetalk.http.api.HttpHeaders;
import io.servicetalk.http.api.HttpRequestMetaData;
import io.servicetalk.http.api.HttpResponseMetaData;
import io.servicetalk.http.api.HttpServiceContext;
import io.servicetalk.http.api.StreamingHttpRequest;
import io.servicetalk.http.api.StreamingHttpResponse;
import io.servicetalk.http.api.StreamingHttpResponseFactory;
import io.servicetalk.http.api.StreamingHttpService;
import io.servicetalk.http.api.StreamingHttpServiceFilter;
import io.servicetalk.http.api.StreamingHttpServiceFilterFactory;
import io.opentracing.Scope;
import io.opentracing.Span;
import io.opentracing.SpanContext;
import io.opentracing.Tracer;
import io.opentracing.propagation.Format;
import io.opentracing.propagation.TextMap;
import java.util.function.UnaryOperator;
import javax.annotation.Nullable;
import static io.opentracing.tag.Tags.HTTP_METHOD;
import static io.opentracing.tag.Tags.HTTP_URL;
import static io.opentracing.tag.Tags.SPAN_KIND;
import static io.opentracing.tag.Tags.SPAN_KIND_SERVER;
import static io.servicetalk.http.api.HttpExecutionStrategies.offloadNone;
/**
* A {@link StreamingHttpService} that supports open tracing.
* <p>
* Append this filter before others that are expected to see {@link Scope} for this request/response. Filters
* appended after this filter that use operators with the <strong>after*</strong> prefix on
* {@link StreamingHttpService#handle(HttpServiceContext, StreamingHttpRequest, StreamingHttpResponseFactory)
* response meta data} or the {@link StreamingHttpResponse#transformMessageBody(UnaryOperator) response message body}
* (e.g. {@link Publisher#afterFinally(Runnable)}) will execute after this filter invokes {@link Scope#close()} and
* therefore will not see the {@link Span} for the current request/response.
*/
public class TracingHttpServiceFilter extends AbstractTracingHttpFilter implements StreamingHttpServiceFilterFactory {

    /**
     * Creates a filter factory with trace key format validation enabled.
     * <p>
     * Equivalent to {@link #TracingHttpServiceFilter(Tracer, String, boolean)} with {@code true} as last argument.
     *
     * @param tracer The {@link Tracer} used to build and activate spans.
     * @param componentName The component name used during building new spans.
     */
    public TracingHttpServiceFilter(Tracer tracer, String componentName) {
        this(tracer, componentName, true);
    }

    /**
     * Creates a filter factory, letting the caller choose whether trace ids are validated.
     *
     * @param tracer The {@link Tracer} used to build and activate spans.
     * @param componentName The component name used during building new spans.
     * @param validateTraceKeyFormat {@code true} to validate the contents of the trace ids.
     */
    public TracingHttpServiceFilter(Tracer tracer, String componentName, boolean validateTraceKeyFormat) {
        super(tracer, componentName, validateTraceKeyFormat);
    }

    /**
     * Creates a filter factory that reads/writes trace info directly from/to {@link HttpHeaders}.
     *
     * @param tracer The {@link Tracer} used to build and activate spans.
     * @param componentName The component name used during building new spans.
     * @param format the {@link Format} to use to inject/extract trace info to/from {@link HttpHeaders}.
     */
    public TracingHttpServiceFilter(final Tracer tracer, final String componentName, final Format<HttpHeaders> format) {
        super(tracer, componentName, format);
    }

    /**
     * Creates a filter factory that reads/writes trace info through a {@link TextMap} based format.
     *
     * @param tracer The {@link Tracer} used to build and activate spans.
     * @param format the {@link Format} to use to inject/extract trace info to/from {@link TextMap}.
     * @param componentName The component name used during building new spans.
     */
    public TracingHttpServiceFilter(final Tracer tracer, final Format<TextMap> format, final String componentName) {
        super(tracer, format, componentName);
    }

    /**
     * Decorates {@code service} so that each handled request is routed through this filter's tracing logic.
     *
     * @param service The {@link StreamingHttpService} to wrap.
     * @return a {@link StreamingHttpServiceFilter} that traces the request and then delegates to {@code service}.
     */
    @Override
    public final StreamingHttpServiceFilter create(final StreamingHttpService service) {
        return new StreamingHttpServiceFilter(service) {
            @Override
            public Single<StreamingHttpResponse> handle(final HttpServiceContext ctx,
                                                        final StreamingHttpRequest request,
                                                        final StreamingHttpResponseFactory responseFactory) {
                final StreamingHttpService next = delegate();
                return trackRequest(next, ctx, request, responseFactory);
            }
        };
    }

    /**
     * Reports the offloading this filter needs.
     *
     * @return {@code offloadNone()}, i.e. this filter does not ask for any offloading.
     */
    @Override
    public HttpExecutionStrategy requiredOffloads() {
        return offloadNone();
    }

    /**
     * Computes the operation name for the span created for a request.
     * <p>
     * The default implementation joins the component name, the request method name and the request target with
     * single spaces. Subclasses may override this to apply a different naming scheme.
     *
     * @param componentName The component name.
     * @param metaData The {@link HttpRequestMetaData}.
     * @return the operation name to build the span with.
     */
    protected String getOperationName(String componentName, HttpRequestMetaData metaData) {
        final String methodName = metaData.method().name();
        final String target = metaData.requestTarget();
        return componentName + ' ' + methodName + ' ' + target;
    }

    /**
     * Decides whether the span context of the current span is written into the response headers.
     * <p>
     * The default implementation injects only when no parent span context was extracted from the request.
     *
     * @param parentSpanContext The parent span extracted from the request, or {@code null} if there was none.
     * @return {@code true} if the current span context should be injected into the response.
     */
    protected boolean injectSpanContextIntoResponse(@Nullable SpanContext parentSpanContext) {
        return parentSpanContext == null;
    }

    /**
     * Starts tracing for {@code request}, invokes the delegate and hands the resulting response to the tracker.
     *
     * @param delegate The service to invoke.
     * @param ctx The context passed through to the delegate.
     * @param request The request being handled; also the source of the span data.
     * @param responseFactory The response factory passed through to the delegate.
     * @return the tracked response, or a failed {@link Single} if the delegate threw while being invoked.
     */
    private Single<StreamingHttpResponse> trackRequest(final StreamingHttpService delegate,
                                                       final HttpServiceContext ctx,
                                                       final StreamingHttpRequest request,
                                                       final StreamingHttpResponseFactory responseFactory) {
        final ScopeTracker scopeTracker = newTracker(request);
        final Single<StreamingHttpResponse> responseSingle;
        try {
            responseSingle = delegate.handle(ctx, request, responseFactory);
        } catch (Throwable cause) {
            // The delegate threw directly instead of returning a failed Single: notify the tracker and
            // surface the same failure asynchronously.
            scopeTracker.onError(cause);
            return Single.failed(cause);
        }
        return scopeTracker.track(responseSingle);
    }

    /**
     * Builds, starts and activates a server span for {@code request}.
     * <p>
     * The span is a child of whatever span context the extractor finds in the request headers and is tagged with
     * the span kind, the HTTP method name and the request path.
     *
     * @param request The request to create the span for.
     * @return a tracker holding the activated scope, the new span and the extracted parent context.
     */
    private ScopeTracker newTracker(final StreamingHttpRequest request) {
        final SpanContext parent = extractor.apply(request.headers());
        final String operationName = getOperationName(componentName, request);
        final Span serverSpan = tracer.buildSpan(operationName)
                .asChildOf(parent)
                .withTag(SPAN_KIND.getKey(), SPAN_KIND_SERVER)
                .withTag(HTTP_METHOD.getKey(), request.method().name())
                .withTag(HTTP_URL.getKey(), request.path())
                .start();
        return new ServiceScopeTracker(tracer.activateSpan(serverSpan), serverSpan, parent);
    }

    /**
     * A {@link ScopeTracker} that remembers the parent span context extracted from the request, so that the
     * decision to inject the current span context into the response can be made once response meta data is seen.
     */
    private final class ServiceScopeTracker extends ScopeTracker {

        @Nullable
        private final SpanContext parentSpanContext;

        ServiceScopeTracker(Scope scope, final Span span, @Nullable final SpanContext parentSpanContext) {
            super(scope, span);
            this.parentSpanContext = parentSpanContext;
        }

        @Override
        void onResponseMeta(final HttpResponseMetaData metaData) {
            super.onResponseMeta(metaData);
            if (!injectSpanContextIntoResponse(parentSpanContext)) {
                return;
            }
            // Propagate the current span's context back to the caller via the response headers.
            injector.accept(getSpan().context(), metaData.headers());
        }
    }
}