-
Notifications
You must be signed in to change notification settings - Fork 0
/
oak.ts
89 lines (75 loc) · 3 KB
/
oak.ts
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
import * as oak from 'jsr:@oak/oak@16.0.0'
import * as contracts from '../src/contracts.ts'
import * as adapter_base from './mod.ts'
import {ApiController} from '../server.ts'
// The oak router context accepted by this adapter's handlers.
// NOTE(review): the three type arguments are presumably oak's route string, route params, and application state — confirm against the oak `RouterContext` docs.
type OakRouterContext = oak.RouterContext<string, Record<string | number, string>, Record<string, any>>
// Signature of the async route handler that `ServerAdapter.adapt` returns.
type OakRouterFunction = (ctx: OakRouterContext) => Promise<void>
/**
 * Wraps an oak `ServerSentEventTarget` so event contracts can be pushed to a connected client.
 *
 * Contracts handed to `send` are serialized with `JSON.stringify` and dispatched as messages on
 * the target. The `status` promise resolves once the target emits its `close` event.
 */
class ServerSentEventsAdapter<Events> extends adapter_base.ServerSentEventsAdapter<Events> {
  readonly #status_resolved: PromiseWithResolvers<void>
  readonly #target: oak.ServerSentEventTarget

  /**
   * @param target The oak server sent events target that messages are dispatched on
   */
  constructor(target: oak.ServerSentEventTarget) {
    super()
    this.#target = target
    this.#status_resolved = Promise.withResolvers()
    target.addEventListener('close', () => {
      this.#status_resolved.resolve()
    })
  }

  /**
   * A promise that resolves when the underlying target emits `close`.
   * Nothing in this class rejects it.
   */
  get status(): Promise<void> {
    return this.#status_resolved.promise
  }

  /**
   * Serialize an event contract to JSON and dispatch it to the client.
   *
   * @param contract The event contract to deliver
   * @throws Error when the target reports that the message was not dispatched
   */
  send(contract: contracts.EventContract): void {
    const success = this.#target.dispatchMessage(JSON.stringify(contract))
    if (!success) {
      // Interpolating the target object itself only ever rendered "[object Object]";
      // report whether the connection is already closed instead, which is actionable.
      throw new Error(
        `Failed to dispatch message over server sent events adapter (target closed: ${this.#target.closed})`
      )
    }
  }
}
/**
 * Oak flavoured server adapter: translates oak router contexts into rpc requests
 * and server sent events connections handled by the base adapter.
 */
class ServerAdapter extends adapter_base.ServerAdapter {
  /**
   * Start a server sent events stream on the given context and register it as a realtime client.
   *
   * @param ctx The oak router context of the incoming request
   */
  async handle_server_sent_events_request(ctx: OakRouterContext) {
    // TODO error out if x-rpc-connection-id is already present?
    const event_target = await ctx.sendEvents()
    this.add_realtime_client(new ServerSentEventsAdapter<any>(event_target))
  }

  /**
   * Read the request contract from the JSON body, run it against the controller for its
   * namespace, and write the resulting response contract to the response body.
   *
   * @param ctx The oak router context of the incoming request
   */
  async handle_rpc_request(ctx: OakRouterContext) {
    const incoming: contracts.RequestContract = await ctx.request.body.json()
    // The connection id header is forwarded as-is; `headers.get` yields null when it is absent.
    const controller = this.load_controller(
      incoming.namespace,
      ctx.request.headers.get('x-rpc-connection-id')
    )
    const outgoing = await this.handle_request(controller, incoming)
    ctx.response.body = outgoing
  }

  /**
   * Build an oak route handler backed by a single adapter instance, which is shared by
   * every request the returned handler serves.
   *
   * @param rpc_class The rpc class containing the server rpc implementation
   * @param context The object handed to the adapter alongside the rpc class
   * @returns A handler that serves server sent events when the `accept` header is exactly
   *          `text/event-stream`, and rpc requests otherwise
   */
  static adapt<C, E>(rpc_class: typeof ApiController<C, E, any>, context: C): OakRouterFunction {
    const server_adapter = new ServerAdapter(rpc_class, context)

    return async (ctx: OakRouterContext) => {
      const wants_event_stream = ctx.request.headers.get('accept') === 'text/event-stream'
      if (wants_event_stream) {
        await server_adapter.handle_server_sent_events_request(ctx)
        return
      }
      await server_adapter.handle_rpc_request(ctx)
    }
  }
}
/**
 * Adapt your rpc class to the [Oak Framework](https://jsr.io/@oak/oak) backend.
 *
 * @param rpc_class The rpc class containing your server rpc implementation
 * @param context An object containing whatever data you want available in all rpc methods
 * @returns An oak route handler. Requests whose `accept` header is exactly `text/event-stream`
 *          are handled as server sent events connections; all other requests are handled as rpc calls.
 *
 * @example
 * ```ts
 * import * as rpc from 'jsr:@andykais/ts-rpc/adapters/oak.ts'
 * import * as oak from 'jsr:@oak/oak'
 *
 * const app = new oak.Application()
 * const router = new oak.Router()
 * const context: Context = { db: new Database() }
 * router.put('/rpc/:signature', rpc.adapt(Api, context))
 * app.use(router.routes())
 * const abort_controller = new AbortController()
 * app.listen({ port: 8001, signal: abort_controller.signal })
 * ```
 *
 */
export const adapt = ServerAdapter.adapt
export * from '../server.ts'