// fetch.ts
import {
trace,
SpanOptions,
SpanKind,
propagation,
context as api_context,
Attributes,
Exception,
Context,
SpanStatusCode,
} from '@opentelemetry/api'
import { SemanticAttributes } from '@opentelemetry/semantic-conventions'
import { Initialiser, getActiveConfig, setConfig } from '../config.js'
import { wrap } from '../wrap.js'
import { instrumentEnv } from './env.js'
import { exportSpans, proxyExecutionContext } from './common.js'
import { ResolvedTraceConfig } from '../types.js'
/** Predicate that receives an outgoing request and answers with a boolean. */
export type IncludeTraceContextFn = (request: Request) => boolean
/** Per-fetcher options consulted by `instrumentFetcher`. */
export interface FetcherConfig {
/**
 * Controls whether trace-propagation headers are injected into outgoing
 * requests: either a plain flag or a per-request predicate.
 */
includeTraceContext?: boolean | IncludeTraceContextFn
}
// `ExportedHandlerFetchHandler` is an ambient type that is not declared in this
// file (presumably provided by the Cloudflare Workers type definitions).
type FetchHandler = ExportedHandlerFetchHandler
// Argument tuple of a fetch handler; destructured as [request, env, ctx] below.
type FetchHandlerArgs = Parameters<FetchHandler>
// NOTE(review): not referenced anywhere in the visible part of this file.
const traceIdSymbol = Symbol('traceId')
/**
 * Rebuilds a URL from its protocol, host, path and query string only.
 *
 * Components that are not copied over (user name, password and the `#fragment`)
 * are therefore absent from the result.
 *
 * @param url - Absolute URL to clean up.
 * @returns The URL reassembled as `protocol//host/path?query`.
 * @throws TypeError when `url` cannot be parsed by the `URL` constructor.
 */
export function sanitiseURL(url: string): string {
  const { protocol, host, pathname, search } = new URL(url)
  return [protocol, '//', host, pathname, search].join('')
}
/**
 * Flattens the `cf` options of an outgoing request into span attributes.
 *
 * Every own enumerable key becomes `cf.<key>`. String and number values are
 * copied as-is; anything else is serialised with `JSON.stringify`.
 *
 * @param cf - The `cf` properties attached to the outgoing request.
 * @returns Attribute map keyed by `cf.<property name>`.
 */
const gatherOutgoingCfAttributes = (cf: RequestInitCfProperties): Attributes => {
  const attrs: Record<string, string | number> = {}
  for (const key of Object.keys(cf)) {
    const value = cf[key]
    attrs[`cf.${key}`] =
      typeof value === 'string' || typeof value === 'number' ? value : JSON.stringify(value)
  }
  return attrs
}
/**
 * Builds span attributes describing an HTTP request.
 *
 * The method and the sanitised URL are always present. Header-derived
 * attributes are only added when the corresponding header exists, so the
 * returned map never carries `null` values.
 *
 * @param request - Request to describe.
 * @returns Attribute map for the request.
 */
export function gatherRequestAttributes(request: Request): Attributes {
  const attrs: Record<string, string | number> = {}
  const headers = request.headers
  // attrs[SemanticAttributes.HTTP_CLIENT_IP] = '1.1.1.1'
  attrs[SemanticAttributes.HTTP_METHOD] = request.method
  attrs[SemanticAttributes.HTTP_URL] = sanitiseURL(request.url)

  // [attribute key, request header] pairs; attribute keys are unchanged from before.
  const headerAttributes: ReadonlyArray<readonly [string, string]> = [
    [SemanticAttributes.HTTP_USER_AGENT, 'user-agent'],
    [SemanticAttributes.HTTP_REQUEST_CONTENT_LENGTH, 'content-length'],
    ['http.request_content-type', 'content-type'],
    // The standard header is `Accept`; reading `accepts` never matched anything.
    ['http.accepts', 'accept'],
  ]
  for (const [attribute, header] of headerAttributes) {
    const value = headers.get(header)
    if (value !== null) {
      attrs[attribute] = value
    }
  }
  return attrs
}
/**
 * Builds span attributes describing an HTTP response: its status code plus the
 * raw `content-length` and `content-type` header values.
 *
 * @param response - Response to describe.
 * @returns Attribute map for the response.
 */
export function gatherResponseAttributes(response: Response): Attributes {
  const { status, headers } = response
  const attrs: Record<string, string | number> = {
    [SemanticAttributes.HTTP_STATUS_CODE]: status,
    // Header lookups are stored as returned by `Headers.get`, even when the header is absent.
    [SemanticAttributes.HTTP_RESPONSE_CONTENT_LENGTH]: headers.get('content-length')!,
    'http.response_content-type': headers.get('content-type')!,
  }
  return attrs
}
/**
 * Copies selected fields of the incoming request's `cf` object into span
 * attributes. When `request.cf` is missing every value is `undefined`.
 *
 * @param request - Incoming request whose `cf` metadata is read.
 * @returns Attribute map with network/TLS details taken from `request.cf`.
 */
export function gatherIncomingCfAttributes(request: Request): Attributes {
  const cf = request.cf
  const attrs: Record<string, string | number> = {
    // NOTE(review): `httpProtocol` presumably holds the protocol version rather
    // than a URL scheme — confirm that HTTP_SCHEME is the intended key.
    [SemanticAttributes.HTTP_SCHEME]: cf?.httpProtocol as string,
    'net.colo': cf?.colo as string,
    'net.country': cf?.country as string,
    'net.request_priority': cf?.requestPriority as string,
    'net.tls_cipher': cf?.tlsCipher as string,
    'net.tls_version': cf?.tlsVersion as string,
    'net.asn': cf?.asn as number,
    'net.tcp_rtt': cf?.clientTcpRtt as number,
  }
  return attrs
}
/**
 * Extracts a parent context from request headers using the globally
 * registered propagator, starting from the currently active context.
 *
 * @param headers - Headers that may carry propagated trace information.
 * @returns The context produced by `propagation.extract`.
 */
export function getParentContextFromHeaders(headers: Headers): Context {
  // Adapter that lets the propagator read from a `Headers` instance.
  const headerGetter = {
    get(carrier: Headers, key: string) {
      // Missing (null) and empty values are both reported as undefined.
      return carrier.get(key) || undefined
    },
    keys(carrier: Headers) {
      return Array.from(carrier.keys())
    },
  }
  return propagation.extract(api_context.active(), headers, headerGetter)
}
/**
 * Runs `fn` inside an active span named `waitUntil`.
 *
 * The span is ended whether `fn` resolves or rejects; a rejection still
 * propagates to the caller through the returned promise.
 *
 * @param fn - Asynchronous work to trace.
 * @returns A promise that settles once `fn` has settled and the span has ended.
 */
export function waitUntilTrace(fn: () => Promise<any>): Promise<void> {
  const tracer = trace.getTracer('waitUntil')
  return tracer.startActiveSpan('waitUntil', async (span) => {
    try {
      await fn()
    } finally {
      // Previously skipped when fn() rejected, leaving the span open forever.
      span.end()
    }
  })
}
// True until the first call to executeFetchHandler in this module instance;
// its value at call time is reported under SemanticAttributes.FAAS_COLDSTART.
let cold_start = true

/**
 * Invokes a fetch handler inside a SERVER span named `fetchHandler`.
 *
 * The span's parent context is extracted from the incoming request headers and
 * its initial attributes combine the trigger/cold-start markers with the
 * request and `cf` attributes. Response attributes are added once the handler
 * resolves.
 *
 * Span status: OK for responses with status below 500, ERROR for 5xx responses
 * and for thrown errors (which are also recorded on the span and rethrown).
 * The span is always ended exactly once.
 *
 * @param fetchFn - The handler to execute.
 * @param args - The `[request, env, ctx]` tuple passed to the handler.
 * @returns The handler's response.
 */
export function executeFetchHandler(fetchFn: FetchHandler, [request, env, ctx]: FetchHandlerArgs): Promise<Response> {
  const spanContext = getParentContextFromHeaders(request.headers)
  const tracer = trace.getTracer('fetchHandler')
  const attributes = {
    [SemanticAttributes.FAAS_TRIGGER]: 'http',
    [SemanticAttributes.FAAS_COLDSTART]: cold_start,
  }
  cold_start = false
  Object.assign(attributes, gatherRequestAttributes(request))
  Object.assign(attributes, gatherIncomingCfAttributes(request))
  const options: SpanOptions = {
    attributes,
    kind: SpanKind.SERVER,
  }

  return tracer.startActiveSpan('fetchHandler', options, spanContext, async (span) => {
    try {
      const response: Response = await fetchFn(request, env, ctx)
      // 5xx responses used to leave the status unset; mark them as errors.
      const code = response.status < 500 ? SpanStatusCode.OK : SpanStatusCode.ERROR
      span.setStatus({ code })
      span.setAttributes(gatherResponseAttributes(response))
      return response
    } catch (error) {
      span.recordException(error as Exception)
      span.setStatus({ code: SpanStatusCode.ERROR })
      throw error
    } finally {
      span.end()
    }
  })
}
/**
 * Wraps a fetch handler so that each invocation is traced.
 *
 * For every call the proxy resolves the config via `initialiser`, instruments
 * the env bindings, proxies the execution context, and then runs
 * `executeFetchHandler` within a context carrying that config. Whether the
 * handler succeeds or throws, span export is scheduled on the original
 * execution context via `waitUntil`.
 *
 * @param fetchFn - The handler to wrap.
 * @param initialiser - Produces the config from the env and request.
 * @returns The result of `wrap(fetchFn, ...)` with the tracing proxy handler.
 */
export function createFetchHandler(fetchFn: FetchHandler, initialiser: Initialiser) {
  const fetchHandler: ProxyHandler<FetchHandler> = {
    async apply(target, _thisArg, argArray: Parameters<FetchHandler>): Promise<Response> {
      const [request, rawEnv, rawCtx] = argArray
      const config = initialiser(rawEnv as Record<string, unknown>, request)
      const env = instrumentEnv(rawEnv as Record<string, unknown>)
      const { ctx, tracker } = proxyExecutionContext(rawCtx)
      const configuredContext = setConfig(config)
      const handlerArgs: FetchHandlerArgs = [request, env, ctx]

      try {
        return await api_context.with(configuredContext, executeFetchHandler, undefined, target, handlerArgs)
      } finally {
        // Uses the original (un-proxied) context to schedule the export.
        rawCtx.waitUntil(exportSpans(tracker))
      }
    },
  }
  return wrap(fetchFn, fetchHandler)
}
// Selects the fetcher-specific options out of the resolved trace config.
type getFetchConfig = (config: ResolvedTraceConfig) => FetcherConfig

/**
 * Wraps a `fetch` function so that every call runs inside a CLIENT span.
 *
 * The span is named after `attrs.name` when that is a string, otherwise
 * `fetch: <host>`. Request, outgoing `cf` and response attributes are recorded
 * on it. Trace-propagation headers are injected into the outgoing request when
 * `includeTraceContext` is `true`, or is a predicate that returns `true` for
 * this request.
 *
 * The span is ended whether the underlying fetch resolves or rejects; a
 * rejection propagates to the caller unchanged.
 *
 * @param fetchFn - The fetch implementation to instrument.
 * @param configFn - Picks the fetcher config from the active trace config.
 * @param attrs - Optional initial span attributes.
 * @returns The instrumented fetch function produced by `wrap`.
 */
export function instrumentFetcher(
  fetchFn: Fetcher['fetch'],
  configFn: getFetchConfig,
  attrs?: Attributes
): Fetcher['fetch'] {
  const handler: ProxyHandler<typeof fetch> = {
    apply: (target, thisArg, argArray): ReturnType<typeof fetch> => {
      const workerConfig = getActiveConfig()
      const config = configFn(workerConfig)
      // Normalise (input, init) into one Request so headers can be mutated below.
      const request = new Request(argArray[0], argArray[1])
      const tracer = trace.getTracer('fetcher')
      const options: SpanOptions = { kind: SpanKind.CLIENT, attributes: attrs }
      const host = new URL(request.url).host
      const explicitName = attrs?.['name']
      const spanName = typeof explicitName === 'string' ? explicitName : `fetch: ${host}`

      return tracer.startActiveSpan(spanName, options, async (span) => {
        try {
          // A predicate must be evaluated against the request; testing it for
          // truthiness alone would always enable propagation.
          const { includeTraceContext } = config
          const shouldPropagate =
            typeof includeTraceContext === 'function' ? includeTraceContext(request) : includeTraceContext
          if (shouldPropagate) {
            propagation.inject(api_context.active(), request.headers, {
              set: (h, k, v) => h.set(k, typeof v === 'string' ? v : String(v)),
            })
          }
          span.setAttributes(gatherRequestAttributes(request))
          if (request.cf) span.setAttributes(gatherOutgoingCfAttributes(request.cf))
          const response: Response = await Reflect.apply(target, thisArg, [request])
          span.setAttributes(gatherResponseAttributes(response))
          return response
        } finally {
          // Previously skipped when the fetch rejected, leaving the span open.
          span.end()
        }
      })
    },
  }
  return wrap(fetchFn, handler, true)
}
/**
 * Replaces `globalThis.fetch` with an instrumented version that takes its
 * options from the `fetch` section of the active trace config.
 */
export function instrumentGlobalFetch(): void {
  const originalFetch = globalThis.fetch
  globalThis.fetch = instrumentFetcher(originalFetch, (traceConfig) => traceConfig.fetch)
}