/
controls.go
executable file
·217 lines (187 loc) · 5.58 KB
/
controls.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
package xfer
import (
"fmt"
"net/rpc"
"strconv"
"sync"
)
// ErrInvalidMessage is the error returned when the on-wire message is unexpected.
var ErrInvalidMessage = fmt.Errorf("Invalid Message")
// Request is the UI -> App -> Probe message type for control RPCs
type Request struct {
AppID string // filled in by the probe on receiving this request
NodeID string
Control string
ControlArgs map[string]string
}
// Response is the Probe -> App -> UI message type for the control RPCs.
type Response struct {
Value interface{} `json:"value,omitempty"`
Error string `json:"error,omitempty"`
// Pipe specific fields
Pipe string `json:"pipe,omitempty"`
RawTTY bool `json:"raw_tty,omitempty"`
ResizeTTYControl string `json:"resize_tty_control,omitempty"`
// Remove specific fields
RemovedNode string `json:"removedNode,omitempty"` // Set if node was removed
}
// Message is the unions of Request, Response and arbitrary Value.
type Message struct {
Request *rpc.Request
Response *rpc.Response
Value interface{}
}
// ControlHandler is interface used in the app and the probe to represent
// a control RPC.
type ControlHandler interface {
Handle(req Request, res *Response) error
}
// ControlHandlerFunc is a adapter (ala golang's http RequestHandlerFunc)
// for ControlHandler
type ControlHandlerFunc func(Request) Response
// Handle is an adapter method to make ControlHandlers exposable via golang rpc
func (c ControlHandlerFunc) Handle(req Request, res *Response) error {
*res = c(req)
return nil
}
// ResizeTTYControlWrapper extracts the arguments needed by the resize tty control handler
func ResizeTTYControlWrapper(next func(pipeID string, height, width uint) Response) ControlHandlerFunc {
return func(req Request) Response {
var (
height, width uint64
err error
)
pipeID, ok := req.ControlArgs["pipeID"]
if !ok {
return ResponseErrorf("Missing argument: pipeID")
}
heightS, ok := req.ControlArgs["height"]
if !ok {
return ResponseErrorf("Missing argument: height")
}
widthS, ok := req.ControlArgs["width"]
if !ok {
return ResponseErrorf("Missing argument: width")
}
height, err = strconv.ParseUint(heightS, 10, 32)
if err != nil {
return ResponseErrorf("Bad parameter: height (%q): %v", heightS, err)
}
width, err = strconv.ParseUint(widthS, 10, 32)
if err != nil {
return ResponseErrorf("Bad parameter: width (%q): %v", widthS, err)
}
return next(pipeID, uint(height), uint(width))
}
}
// ResponseErrorf creates a new Response with the given formatted error string.
func ResponseErrorf(format string, a ...interface{}) Response {
return Response{
Error: fmt.Sprintf(format, a...),
}
}
// ResponseError creates a new Response with the given error.
func ResponseError(err error) Response {
if err != nil {
return Response{
Error: err.Error(),
}
}
return Response{}
}
// JSONWebsocketCodec is golang rpc compatible Server and Client Codec
// that transmits and receives RPC messages over a websocker, as JSON.
type JSONWebsocketCodec struct {
sync.Mutex
conn Websocket
err chan error
}
// NewJSONWebsocketCodec makes a new JSONWebsocketCodec
func NewJSONWebsocketCodec(conn Websocket) *JSONWebsocketCodec {
return &JSONWebsocketCodec{
conn: conn,
err: make(chan error, 1),
}
}
// WaitForReadError blocks until any read on this codec returns an error.
// This is useful to know when the server has disconnected from the client.
func (j *JSONWebsocketCodec) WaitForReadError() error {
return <-j.err
}
// WriteRequest implements rpc.ClientCodec
func (j *JSONWebsocketCodec) WriteRequest(r *rpc.Request, v interface{}) error {
j.Lock()
defer j.Unlock()
if err := j.conn.WriteJSON(Message{Request: r}); err != nil {
return err
}
return j.conn.WriteJSON(Message{Value: v})
}
// WriteResponse implements rpc.ServerCodec
func (j *JSONWebsocketCodec) WriteResponse(r *rpc.Response, v interface{}) error {
j.Lock()
defer j.Unlock()
if err := j.conn.WriteJSON(Message{Response: r}); err != nil {
return err
}
return j.conn.WriteJSON(Message{Value: v})
}
func (j *JSONWebsocketCodec) readMessage(v interface{}) (*Message, error) {
m := Message{Value: v}
if err := j.conn.ReadJSON(&m); err != nil {
j.err <- err
close(j.err)
return nil, err
}
return &m, nil
}
// ReadResponseHeader implements rpc.ClientCodec
func (j *JSONWebsocketCodec) ReadResponseHeader(r *rpc.Response) error {
m, err := j.readMessage(nil)
if err != nil {
return err
}
if m.Response == nil {
return ErrInvalidMessage
}
*r = *m.Response
return nil
}
// ReadResponseBody implements rpc.ClientCodec
func (j *JSONWebsocketCodec) ReadResponseBody(v interface{}) error {
_, err := j.readMessage(v)
if err != nil {
return err
}
if v == nil {
return ErrInvalidMessage
}
return nil
}
// Close implements rpc.ClientCodec and rpc.ServerCodec
func (j *JSONWebsocketCodec) Close() error {
return j.conn.Close()
}
// ReadRequestHeader implements rpc.ServerCodec
func (j *JSONWebsocketCodec) ReadRequestHeader(r *rpc.Request) error {
m, err := j.readMessage(nil)
if err != nil {
return err
}
if m.Request == nil {
return ErrInvalidMessage
}
*r = *m.Request
return nil
}
// ReadRequestBody implements rpc.ServerCodec
func (j *JSONWebsocketCodec) ReadRequestBody(v interface{}) error {
_, err := j.readMessage(v)
if err != nil {
return err
}
if v == nil {
return ErrInvalidMessage
}
return nil
}