// httpbatchclient.ts
import {
isJsonRpcErrorResponse,
JsonRpcRequest,
JsonRpcSuccessResponse,
parseJsonRpcResponse,
} from "@cosmjs/json-rpc";
import { http } from "./http";
import { HttpEndpoint } from "./httpclient";
import { hasProtocol, RpcClient } from "./rpcclient";
/** Configuration for {@link HttpBatchClient}. */
export interface HttpBatchClientOptions {
/** Interval for dispatching batches (in milliseconds) */
dispatchInterval: number;
/** Max number of items sent in one request */
batchSizeLimit: number;
}
// Internal defaults, subject to change at any time. Callers that care about a
// specific value must set it explicitly; there is no need to expose these.
const defaultHttpBatchClientOptions: HttpBatchClientOptions = {
  batchSizeLimit: 20,
  dispatchInterval: 20,
};
/**
 * A JSON-RPC client that batches requests: calls to {@link execute} are
 * queued and sent together in one HTTP request, either when the dispatch
 * interval fires or as soon as the queue reaches `batchSizeLimit`.
 */
export class HttpBatchClient implements RpcClient {
  protected readonly url: string;
  protected readonly headers: Record<string, string> | undefined;
  protected readonly options: HttpBatchClientOptions;
  /**
   * Handle of the dispatch interval; undefined after disconnect().
   * ReturnType keeps this portable between Node (Timeout) and DOM (number) typings.
   */
  private timer?: ReturnType<typeof setInterval>;
  /** Queued requests together with the settle callbacks of their promises. */
  private readonly queue: Array<{
    request: JsonRpcRequest;
    resolve: (a: JsonRpcSuccessResponse) => void;
    reject: (a: Error) => void;
  }> = [];

  public constructor(endpoint: string | HttpEndpoint, options: Partial<HttpBatchClientOptions> = {}) {
    this.options = {
      batchSizeLimit: options.batchSizeLimit ?? defaultHttpBatchClientOptions.batchSizeLimit,
      dispatchInterval: options.dispatchInterval ?? defaultHttpBatchClientOptions.dispatchInterval,
    };
    if (typeof endpoint === "string") {
      // accept host.name:port and assume http protocol
      this.url = hasProtocol(endpoint) ? endpoint : "http://" + endpoint;
    } else {
      this.url = endpoint.url;
      this.headers = endpoint.headers;
    }
    // Validate before starting the timer so that a failed validation does not
    // leave a live interval behind the thrown error.
    this.validate();
    // Use the merged options, not the raw `options` argument: the caller may
    // have omitted dispatchInterval, and setInterval(cb, undefined) fires with
    // no delay instead of the documented default.
    this.timer = setInterval(() => this.tick(), this.options.dispatchInterval);
  }

  /** Stops the dispatch timer. Requests still queued are not flushed. */
  public disconnect(): void {
    if (this.timer !== undefined) clearInterval(this.timer);
    this.timer = undefined;
  }

  /**
   * Queues a request for the next batch. When the queue reaches
   * `batchSizeLimit` the batch is dispatched immediately; otherwise the
   * interval timer sends it.
   *
   * @returns the successful JSON-RPC response; rejects with an Error on
   *   JSON-RPC error responses or transport failures.
   */
  public async execute(request: JsonRpcRequest): Promise<JsonRpcSuccessResponse> {
    return new Promise((resolve, reject) => {
      this.queue.push({ request, resolve, reject });
      if (this.queue.length >= this.options.batchSizeLimit) {
        // this train is full, let's go
        this.tick();
      }
    });
  }

  /** Throws if the merged options are invalid. */
  private validate(): void {
    if (
      !this.options.batchSizeLimit ||
      !Number.isSafeInteger(this.options.batchSizeLimit) ||
      this.options.batchSizeLimit < 1
    ) {
      throw new Error("batchSizeLimit must be a safe integer >= 1");
    }
  }

  /**
   * Sends the next batch (up to `batchSizeLimit` entries) and settles the
   * corresponding promises by matching response ids to request ids.
   *
   * This is called in an interval where promise rejections cannot be handled.
   * So this is not async and HTTP errors need to be handled by the queued promises.
   */
  private tick(): void {
    // Splice the batch out first so concurrent execute()/timer calls cannot
    // pick up the same entries (avoid race conditions).
    const batch = this.queue.splice(0, this.options.batchSizeLimit);
    if (!batch.length) return;
    const requests = batch.map((s) => s.request);
    http("POST", this.url, this.headers, requests).then(
      (raw) => {
        // Requests with a single entry return as an object
        const arr = Array.isArray(raw) ? raw : [raw];
        arr.forEach((el) => {
          const entry = batch.find((s) => s.request.id === el.id);
          // NOTE(review): a response id we never sent is ignored; a request id
          // missing from the server's response leaves that promise pending.
          if (!entry) return;
          const { reject, resolve } = entry;
          const response = parseJsonRpcResponse(el);
          if (isJsonRpcErrorResponse(response)) {
            reject(new Error(JSON.stringify(response.error)));
          } else {
            resolve(response);
          }
        });
      },
      (error) => {
        // Transport failure: reject every promise of this batch. The previous
        // per-id lookup loop used `return` on a missing entry, which exited
        // the whole handler and could leave later promises pending forever.
        for (const { reject } of batch) {
          reject(error);
        }
      },
    );
  }
}