Skip to content

Commit

Permalink
feat(http): fix context tracing
Browse files Browse the repository at this point in the history
  • Loading branch information
shigma committed Feb 12, 2024
1 parent 67850ce commit 1e74c3c
Show file tree
Hide file tree
Showing 6 changed files with 140 additions and 120 deletions.
2 changes: 1 addition & 1 deletion package.json
Expand Up @@ -34,7 +34,7 @@
"tsx": "^4.7.0",
"typescript": "^5.3.2",
"yakumo": "^1.0.0-alpha.10",
"yakumo-esbuild": "^1.0.0-alpha.2",
"yakumo-esbuild": "^1.0.0-beta.3",
"yakumo-publish-sync": "^1.0.0-alpha.1",
"yakumo-tsc": "^1.0.0-alpha.2"
}
Expand Down
5 changes: 2 additions & 3 deletions packages/http/src/adapter/browser.ts
Expand Up @@ -3,9 +3,8 @@
import { LookupAddress } from 'dns'
import { HTTP } from '../index.js'

const ws = typeof WebSocket !== 'undefined' ? WebSocket : null

export { ws as WebSocket }
const { WebSocket } = globalThis
export { WebSocket }

const v4 = /^(?:25[0-5]|2[0-4]\d|1\d\d|[1-9]\d|\d)(?:\.(?:25[0-5]|2[0-4]\d|1\d\d|[1-9]\d|\d)){3}$/

Expand Down
238 changes: 125 additions & 113 deletions packages/http/src/index.ts
Expand Up @@ -32,17 +32,6 @@ class HTTPError extends Error {
}
}

export interface HTTP {
<T>(url: string | URL, config?: HTTP.RequestConfig): Promise<HTTP.Response<T>>
<T>(method: HTTP.Method, url: string | URL, config?: HTTP.RequestConfig): Promise<HTTP.Response<T>>
config: HTTP.Config
get: HTTP.Request1
delete: HTTP.Request1
patch: HTTP.Request2
post: HTTP.Request2
put: HTTP.Request2
}

export namespace HTTP {
export type Method =
| 'get' | 'GET'
Expand Down Expand Up @@ -114,15 +103,43 @@ export namespace HTTP {
export type Error = HTTPError
}

export class HTTP {
export interface HTTP {
[Context.current]: Context
<T>(url: string | URL, config?: HTTP.RequestConfig): Promise<HTTP.Response<T>>
<T>(method: HTTP.Method, url: string | URL, config?: HTTP.RequestConfig): Promise<HTTP.Response<T>>
config: HTTP.Config
get: HTTP.Request1
delete: HTTP.Request1
patch: HTTP.Request2
post: HTTP.Request2
put: HTTP.Request2
}

export class HTTP extends Function {
static Error = HTTPError
/** @deprecated use `HTTP.Error.is()` instead */
static isAxiosError = HTTPError.is

protected [Context.current]: Context
static {
for (const method of ['get', 'delete'] as const) {
defineProperty(HTTP.prototype, method, async function (this: HTTP, url: string, config?: HTTP.Config) {
const caller = this[Context.current]
const response = await this.call(caller, method, url, config)
return response.data
})
}

constructor(ctx: Context, config: HTTP.Config = {}) {
ctx.provide('http')
for (const method of ['patch', 'post', 'put'] as const) {
defineProperty(HTTP.prototype, method, async function (this: HTTP, url: string, data?: any, config?: HTTP.Config) {
const caller = this[Context.current]
const response = await this.call(caller, method, url, { data, ...config })
return response.data
})
}
}

constructor(ctx: Context, config: HTTP.Config = {}, isExtend?: boolean) {
super()

function resolveDispatcher(href?: string) {
if (!href) return
Expand All @@ -137,132 +154,129 @@ export class HTTP {
if (typeof args[1] === 'string' || args[1] instanceof URL) {
method = args.shift()
}
const config = this.http.resolveConfig(args[1])
const url = this.http.resolveURL(args[0], config)
const caller = isExtend ? ctx : this
const config = (http as HTTP).resolveConfig(caller, args[1])
const url = HTTP.resolveURL(caller, args[0], config)

const controller = new AbortController()
this.on('dispose', () => {
let timer: NodeJS.Timeout | number | undefined
const dispose = caller.on('dispose', () => {
clearTimeout(timer)
controller.abort('context disposed')
})
if (config.timeout) {
const timer = setTimeout(() => {
timer = setTimeout(() => {
controller.abort('timeout')
}, config.timeout)
this.on('dispose', () => clearTimeout(timer))
}

const raw = await fetch(url, {
method,
body: config.data,
headers: config.headers,
keepalive: config.keepAlive,
signal: controller.signal,
['dispatcher' as never]: resolveDispatcher(config?.proxyAgent),
}).catch((cause) => {
const error = new HTTP.Error(cause.message)
error.cause = cause
throw error
})

const response: HTTP.Response = {
data: null,
url: raw.url,
status: raw.status,
statusText: raw.statusText,
headers: raw.headers,
}
try {
const raw = await fetch(url, {
method,
body: config.data,
headers: config.headers,
keepalive: config.keepAlive,
signal: controller.signal,
['dispatcher' as never]: resolveDispatcher(config?.proxyAgent),
}).catch((cause) => {
const error = new HTTP.Error(`fetch ${url} failed`)
error.cause = cause
throw error
})

if (!raw.ok) {
const error = new HTTP.Error(raw.statusText)
error.response = response
try {
const response: HTTP.Response = {
data: null,
url: raw.url,
status: raw.status,
statusText: raw.statusText,
headers: raw.headers,
}

if (!raw.ok) {
const error = new HTTP.Error(raw.statusText)
error.response = response
try {
response.data = await this.http.decodeResponse(raw)
} catch {}
throw error
}

if (config.responseType === 'arraybuffer') {
response.data = await raw.arrayBuffer()
} else if (config.responseType === 'stream') {
response.data = raw.body
} else {
response.data = await this.http.decodeResponse(raw)
} catch {}
throw error
}
return response
} finally {
dispose()
}

if (config.responseType === 'arraybuffer') {
response.data = await raw.arrayBuffer()
} else if (config.responseType === 'stream') {
response.data = raw.body
} else {
response.data = await this.http.decodeResponse(raw)
}
return response
} as HTTP

http.config = config
defineProperty(http, Context.current, ctx)
Object.setPrototypeOf(http, Object.getPrototypeOf(this))

for (const method of ['get', 'delete'] as const) {
http[method] = async function <T>(this: HTTP, url: string, config?: HTTP.Config) {
const caller = this[Context.current]
const response = await caller.http<T>(url, {
method,
...config,
})
return response.data
}
}

for (const method of ['patch', 'post', 'put'] as const) {
http[method] = async function <T>(this: HTTP, url: string, data?: any, config?: HTTP.Config) {
const caller = this[Context.current]
const response = await caller.http<T>(url, {
method,
data,
...config,
})
return response.data
}
if (!isExtend) {
ctx.provide('http')
ctx.http = http
ctx.on('dispose', () => {
ctx.http = null as never
})
}

ctx.http = Context.associate(http, 'http')
ctx.on('dispose', () => {
ctx.http = null as never
})

return http
}

resolveConfig(init?: HTTP.RequestConfig): HTTP.RequestConfig {
let result = { headers: {}, ...this.config }
const merge = (init?: HTTP.RequestConfig) => {
result = {
...result,
...this.config,
headers: {
...result.headers,
...init?.headers,
},
}
}
static mergeConfig = (target: HTTP.Config, source?: HTTP.Config) => ({
...target,
...source,
headers: {
...target.headers,
...source?.headers,
},
})

extend(config: HTTP.Config = {}) {
return new HTTP(this[Context.current], HTTP.mergeConfig(this.config, config), true)
}

const caller = this[Context.current]
resolveConfig(caller: Context, init?: HTTP.RequestConfig): HTTP.RequestConfig {
let result = { headers: {}, ...this.config }
let intercept = caller[Context.intercept]

Check failure on line 247 in packages/http/src/index.ts

View workflow job for this annotation

GitHub Actions / build

Property 'intercept' does not exist on type 'typeof Context'.
while (intercept) {
merge(intercept.http)
result = HTTP.mergeConfig(result, intercept.http)
intercept = Object.getPrototypeOf(intercept)
}
merge(init)
result = HTTP.mergeConfig(result, init)
return result
}

resolveURL(url: string | URL, config: HTTP.RequestConfig) {
static resolveURL(caller: Context, url: string | URL, config: HTTP.RequestConfig) {
if (config.endpoint) {
this[Context.current].emit('internal/warning', 'endpoint is deprecated, please use baseURL instead')
url = trimSlash(config.endpoint) + url
// caller.emit('internal/warning', 'endpoint is deprecated, please use baseURL instead')
try {
new URL(url)
} catch {
url = trimSlash(config.endpoint) + url
}
}
try {
url = new URL(url, config.baseURL)
} catch (error) {
// prettify the error message
throw new TypeError(`Invalid URL: ${url}`)
}
url = new URL(url, config.baseURL)
for (const [key, value] of Object.entries(config.params ?? {})) {
url.searchParams.append(key, value)
}
return url
}

decodeResponse(response: Response) {
const type = response.headers.get('Content-Type')
if (type === 'application/json') {
const type = response.headers.get('content-type')
if (type?.startsWith('application/json')) {
return response.json()
} else if (type?.startsWith('text/')) {
return response.text()
Expand All @@ -273,18 +287,15 @@ export class HTTP {

async head(url: string, config?: HTTP.Config) {
const caller = this[Context.current]
const response = await caller.http(url, {
method: 'HEAD',
...config,
})
const response = await this.call(caller, 'HEAD', url, config)
return response.headers
}

/** @deprecated use `ctx.http()` instead */
async axios<T>(url: string, config?: HTTP.Config) {
axios<T>(url: string, config?: HTTP.Config): HTTP.Response<T> {
const caller = this[Context.current]
caller.emit('internal/warning', 'ctx.http.axios() is deprecated, use ctx.http() instead')
return caller.http<T>(url, config)
return this.call(caller, url, config)
}

resolveAgent(href?: string) {
Expand All @@ -296,14 +307,15 @@ export class HTTP {
}

async ws(this: HTTP, url: string | URL, init?: HTTP.Config) {
const config = this.resolveConfig(init)
url = this.resolveURL(url, config)
const caller = this[Context.current]
const config = this.resolveConfig(caller, init)
url = HTTP.resolveURL(caller, url, config)
const socket = new WebSocket(url, 'Server' in WebSocket ? {
agent: this.resolveAgent(config?.proxyAgent),
handshakeTimeout: config?.timeout,
headers: config?.headers,
} as ClientOptions as never : undefined)
this[Context.current].on('dispose', () => {
caller.on('dispose', () => {
socket.close(1001, 'context disposed')
})
return socket
Expand All @@ -318,12 +330,12 @@ export class HTTP {
const [, mime, base64] = capture
return { mime, data: base64ToArrayBuffer(base64) }
}
const { headers, data, url: responseUrl } = await caller.http<ArrayBuffer>(url, {
const { headers, data, url: responseUrl } = await this.call(caller, url, {
method: 'GET',
responseType: 'arraybuffer',
timeout: +options.timeout! || undefined,
})
const mime = headers.get('Content-Type') ?? undefined
const mime = headers.get('content-type') ?? undefined
const [, name] = responseUrl.match(/.+\/([^/?]*)(?=\?)?/)!
return { mime, name, data }
}
Expand Down
3 changes: 0 additions & 3 deletions packages/http/src/utils.ts
Expand Up @@ -40,17 +40,14 @@ function parseIPv4(ip: string) {
function parseIPv6(ip: string) {
const exp = ip.indexOf('::')
let num = 0n
// :: 左边有内容
if (exp !== -1 && exp !== 0) {
ip.slice(0, exp).split(':').forEach((piece, i) => {
num |= BigInt(`0x${piece}`) << BigInt((7 - i) * 16)
})
}
// :: 在最右边
if (exp === ip.length - 2) {
return num
}
// :: 右边的内容
const rest = exp === -1 ? ip : ip.slice(exp + 2)
const v4 = rest.includes('.')
const pieces = rest.split(':')
Expand Down
4 changes: 4 additions & 0 deletions packages/socks/src/index.ts
Expand Up @@ -7,6 +7,10 @@ import { SocksClient, SocksProxy } from 'socks'
import type { Agent, buildConnector, Client } from 'undici'
import { SocksProxyAgent } from 'socks-proxy-agent'

// @ts-ignore
// ensure the global dispatcher is initialized
fetch().catch(() => {})

function getUniqueSymbol(object: object, name: string) {
const symbol = Object.getOwnPropertySymbols(object).find(s => s.toString() === `Symbol(${name})`)
return object[symbol!]
Expand Down
8 changes: 8 additions & 0 deletions yakumo.yml
@@ -0,0 +1,8 @@
- name: yakumo
config:
pipeline:
build:
- tsc
- esbuild
- name: yakumo-esbuild
- name: yakumo-tsc

0 comments on commit 1e74c3c

Please sign in to comment.