/
jcall.go
347 lines (316 loc) · 9.21 KB
/
jcall.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
// Copyright (C) 2017 Michael J. Fromberger. All Rights Reserved.
// Program jcall issues RPC calls to a JSON-RPC server.
//
// Usage:
//
// jcall [options] <address> {<method> <params>}...
package main
import (
"bytes"
"context"
"encoding/json"
"errors"
"flag"
"fmt"
"log"
"net"
"os"
"os/signal"
"path/filepath"
"strings"
"time"
"github.com/creachadair/jrpc2"
"github.com/creachadair/jrpc2/channel"
"github.com/creachadair/jrpc2/jhttp"
"github.com/creachadair/wschannel"
)
var (
dialTimeout = flag.Duration("dial", 5*time.Second, "Timeout on dialing the server (0 for no timeout)")
callTimeout = flag.Duration("timeout", 0, "Timeout on each call (0 for no timeout)")
doNotify = flag.Bool("notify", false, "Send a notification")
chanFraming = flag.String("f", envOrDefault("JCALL_FRAMING", "line"), "Channel framing")
doBatch = flag.Bool("batch", false, "Issue calls as a batch rather than sequentially")
doErrors = flag.Bool("e", false, "Print error values to stdout")
doIndent = flag.Bool("i", false, "Indent JSON output")
doMulti = flag.Bool("m", false, "Issue the same call repeatedly with different arguments")
doTiming = flag.Bool("T", false, "Print call timing stats")
doWaitExit = flag.Bool("W", false, "Wait for interrupt at exit")
withLogging = flag.Bool("v", false, "Enable verbose logging")
)
func init() {
flag.Usage = func() {
fmt.Fprintf(os.Stderr, `Usage: %[1]s [options] <address> {<method> <params>}...
%[1]s [options] -m <address> <method> <params>...
Connect to the specified address and transmit the specified JSON-RPC method
calls in sequence (or as a batch, if -batch is set). The resulting response
values are printed to stdout.
Supported address formats include:
host:port -- TCP connection to the given host and port
some/path -- Unix-domain socket at the given path
http://host:port/path -- HTTP connection to the given URL
ws://host:port/path -- Websocket connection to the given URL
Without -m, each pair of arguments names a method and its parameters to call.
With -m, the first argument names a method to be repeatedly called with each of
the remaining arguments as its parameter.
The -f flag sets the framing discipline to use. The client must agree with the
server in order for communication to work. The options are:
header:<t> -- header-framed, content-type <t>
strict:<t> -- strict header-framed, content-type <t>
line -- byte-terminated, records end in LF (Unicode 10)
lsp -- header-framed, content-type application/vscode-jsonrpc (like LSP)
raw -- unframed, each message is a complete JSON value
See also: https://godoc.org/github.com/creachadair/jrpc2/channel.
The default framing is read from the JCALL_FRAMING environment variable, if set.
The -f flag overrides the environment.
Options:
`, filepath.Base(os.Args[0]))
flag.PrintDefaults()
}
}
func main() {
flag.Parse()
// There must be at least one request, and more are permitted. Each method
// must have an argument, though it may be empty.
if *doMulti {
if flag.NArg() < 3 {
log.Fatal("Arguments are <address> <method> <params>...")
}
} else if flag.NArg() < 3 || flag.NArg()%2 == 0 {
log.Fatal("Arguments are <address> {<method> <params>}...")
}
// Set up the context for the call, including a timeouts if specified.
ctx := context.Background()
if *callTimeout > 0 {
var cancel context.CancelFunc
ctx, cancel = context.WithTimeout(ctx, *callTimeout)
defer cancel()
}
// Establish a client channel. If we are using HTTP we do not need to dial a
// connection; the HTTP client will handle that.
start := time.Now()
var cc channel.Channel
if isHTTP(flag.Arg(0)) {
cc = jhttp.NewChannel(flag.Arg(0), nil)
} else if isWebsocket(flag.Arg(0)) {
ch, err := wschannel.Dial(flag.Arg(0), nil)
if err != nil {
log.Fatalf("Dial %q: %v", flag.Arg(0), err)
}
defer ch.Close()
cc = ch
} else if nc := newFraming(*chanFraming); nc == nil {
log.Fatalf("Unknown channel framing %q", *chanFraming)
} else {
ntype, _ := jrpc2.Network(flag.Arg(0))
conn, err := net.DialTimeout(ntype, flag.Arg(0), *dialTimeout)
if err != nil {
log.Fatalf("Dial %q: %v", flag.Arg(0), err)
}
defer conn.Close()
cc = nc(conn, conn)
}
tdial := time.Now()
done := make(chan os.Signal, 1)
if *doWaitExit {
signal.Notify(done, os.Interrupt)
} else {
close(done)
}
cli := newClient(cc)
pdur, err := issueCalls(ctx, cli, flag.Args()[1:])
// defer failure on error till after we print aggregate timing stats
tcall := time.Now()
if e, ok := err.(*jrpc2.Error); ok && *doErrors {
etxt, _ := json.Marshal(e)
fmt.Println(string(etxt))
} else if err != nil {
log.Printf("Call failed: %v", err)
}
cdur := tcall.Sub(tdial) - pdur
tprintf("%v elapsed: %v dial, %v call, %v print [%s]",
tcall.Sub(start), tdial.Sub(start), cdur, pdur, callStatus(err))
if err != nil {
os.Exit(1)
}
if *doWaitExit {
log.Print("<waiting at exit>")
}
<-done
}
func newClient(conn channel.Channel) *jrpc2.Client {
opts := &jrpc2.ClientOptions{
OnNotify: func(req *jrpc2.Request) {
var p json.RawMessage
req.UnmarshalParams(&p)
fmt.Printf(`{"method":%q,"params":%s}`+"\n", req.Method(), string(p))
},
}
if *withLogging {
opts.Logger = jrpc2.StdLogger(nil)
}
return jrpc2.NewClient(conn, opts)
}
func printResults(rsps []*jrpc2.Response) (time.Duration, error) {
var err error
set := func(e error) {
if err == nil {
err = e
}
}
var dur time.Duration
for i, rsp := range rsps {
if rerr := rsp.Error(); rerr != nil {
if *doErrors {
etxt, _ := json.Marshal(rerr)
fmt.Println(formatJSON(etxt))
} else {
log.Printf("Error (%d): %v", i+1, rerr)
}
set(errors.New("batch contained errors"))
continue
}
pstart := time.Now()
var result json.RawMessage
if perr := rsp.UnmarshalResult(&result); perr != nil {
log.Printf("Decoding (%d): %v", i+1, perr)
set(perr)
continue
}
fmt.Println(formatJSON(result))
dur += time.Since(pstart)
}
return dur, err
}
func issueCalls(ctx context.Context, cli *jrpc2.Client, args []string) (time.Duration, error) {
specs := newSpecs(args)
if *doBatch {
rsps, err := cli.Batch(ctx, specs)
if err != nil {
return 0, err
}
return printResults(rsps)
}
return issueSequential(ctx, cli, specs)
}
func tprintf(msg string, args ...interface{}) {
if !*doTiming {
return
}
fmt.Fprintf(os.Stderr, msg, args...)
if !strings.HasSuffix(msg, "\n") {
fmt.Fprintln(os.Stderr)
}
}
func issueSequential(ctx context.Context, cli *jrpc2.Client, specs []jrpc2.Spec) (time.Duration, error) {
var dur time.Duration
for _, spec := range specs {
cstart := time.Now()
if spec.Notify {
err := cli.Notify(ctx, spec.Method, spec.Params)
tprintf("[notify %s]: %v call [%s]", spec.Method, time.Since(cstart), callStatus(err))
if err != nil {
return dur, err
}
continue
}
rsp, err := cli.Call(ctx, spec.Method, spec.Params)
if err != nil {
return dur, err
}
cdur := time.Since(cstart)
pstart := time.Now()
var result json.RawMessage
if perr := rsp.UnmarshalResult(&result); perr != nil {
return dur, err
}
fmt.Println(formatJSON(result))
pdur := time.Since(pstart)
dur += pdur
tprintf("[call %s]: %v call, %v print [%s]\n", spec.Method, cdur, pdur, callStatus(err))
}
return dur, nil
}
func newSpecs(args []string) []jrpc2.Spec {
if *doMulti {
specs := make([]jrpc2.Spec, 0, len(args)-1)
method := args[0]
for _, arg := range args[1:] {
specs = append(specs, jrpc2.Spec{
Method: method,
Params: param(arg),
Notify: *doNotify,
})
}
return specs
}
specs := make([]jrpc2.Spec, 0, len(args)/2)
for i := 0; i < len(args); i += 2 {
specs = append(specs, jrpc2.Spec{
Method: args[i],
Params: param(args[i+1]),
Notify: *doNotify,
})
}
return specs
}
func param(s string) interface{} {
if s == "" {
return nil
}
return json.RawMessage(s)
}
func formatJSON(data []byte) string {
if *doIndent {
var buf bytes.Buffer
json.Indent(&buf, data, "", " ")
return buf.String()
}
return string(data)
}
func isHTTP(addr string) bool {
return strings.HasPrefix(addr, "http:") || strings.HasPrefix(addr, "https:")
}
func isWebsocket(addr string) bool {
return strings.HasPrefix(addr, "ws:") || strings.HasPrefix(addr, "wss:")
}
func callStatus(err error) string {
switch err.(type) {
case nil:
return "OK"
case *jrpc2.Error:
return "server error"
default:
return "failed"
}
}
func envOrDefault(env, dflt string) string {
if s, ok := os.LookupEnv(env); ok {
return s
}
return dflt
}
// newFraming returns a channel.Framing described by the specified name, or nil
// if the name is unknown. The framing types currently understood are:
//
// header:t -- corresponds to channel.Header(t)
// strict:t -- corresponds to channel.StrictHeader(t)
// line -- corresponds to channel.Line
// lsp -- corresponds to channel.LSP
// raw -- corresponds to channel.RawJSON
func newFraming(name string) channel.Framing {
if t := strings.TrimPrefix(name, "header:"); t != name {
return channel.Header(t)
}
if t := strings.TrimPrefix(name, "strict:"); t != name {
return channel.StrictHeader(t)
}
switch name {
case "line":
return channel.Line
case "lsp":
return channel.LSP
case "raw":
return channel.RawJSON
}
return nil
}