/
start.go
152 lines (134 loc) · 3.04 KB
/
start.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
package main
import (
"errors"
"log"
"os"
"time"
"github.com/bettercallshao/kwt/pkg/cmd"
"github.com/bettercallshao/kwt/pkg/exch"
"github.com/bettercallshao/kwt/pkg/menu"
"github.com/bettercallshao/kwt/pkg/msg"
"github.com/go-resty/resty/v2"
"github.com/gorilla/websocket"
)
// getInfo fetches the definition of a channel from the master over HTTP.
//
// It issues a GET request to the URL produced by join(master, "channel",
// channel) and lets resty decode a successful response body into a
// msg.ChannelInfo (a failure body is decoded into a msg.Error).
//
// A transport-level failure is returned unchanged. A response whose status is
// not a success is reported as an error carrying the HTTP status line and,
// when present, the response body. On any error the returned ChannelInfo is
// the zero value.
func getInfo(master string, channel string) (msg.ChannelInfo, error) {
	info := msg.ChannelInfo{}
	herr := msg.Error{}
	resp, err := resty.New().R().
		SetResult(&info).
		SetError(&herr).
		Get(join(master, "channel", channel))
	if err != nil {
		return msg.ChannelInfo{}, err
	}
	if !resp.IsSuccess() {
		// Previously this returned errors.New(""), which left the caller with
		// an empty, undiagnosable error message.
		detail := resp.Status()
		if body := resp.String(); body != "" {
			detail += ": " + body
		}
		return msg.ChannelInfo{}, errors.New("get channel info: unexpected status " + detail)
	}
	return info, nil
}
// start attaches this process to a master as the back end of a channel and
// serves incoming commands until it is cancelled.
//
// It performs the following steps:
//
//  1. Dials the websocket URL built from wsURL(master), "ws", "back" and
//     channel, with msg.MenuName/menuName attached through addParam.
//  2. Fetches the channel definition with getInfo; its Token is used to
//     authenticate incoming commands and to tag outgoing output.
//  3. Starts exch.Exchange in a goroutine, handing it the source and sink
//     channels together with JSON read/write closures over the websocket.
//  4. Starts a processor goroutine that takes msg.Command values from sink,
//     renders the action with menu.Render, runs it with cmd.Run and forwards
//     every produced payload on source as a msg.Output.
//
// An error is returned if dialing or getInfo fails. Otherwise start blocks
// until a value arrives on the cancelMain channel and then returns nil; the
// websocket connection is closed on return.
func start(master string, channel string, menuName string) error {
	var conn *websocket.Conn
	var info msg.ChannelInfo
	var err error

	// Construct url
	url := addParam(
		join(wsURL(master), "ws", "back", channel),
		msg.MenuName,
		menuName)
	log.Printf("connecting to master %s ...\n", url)

	// Open ws connection
	conn, _, err = websocket.DefaultDialer.Dial(url, nil)
	if err != nil {
		return err
	}
	defer conn.Close()

	// Get channel info
	// NOTE(review): the short sleep presumably gives the master a moment to
	// register the freshly opened channel before it is queried - confirm.
	time.Sleep(time.Millisecond)
	info, err = getInfo(master, channel)
	if err != nil {
		return err
	}
	log.Printf("channel definition:\n%s\n", jsonParagraph(info))

	// Set up the cancel channels: one releases start itself, the other stops
	// the processor goroutine.
	cancelMain := cancelChan()
	cancelProc := cancelChan()

	// Start exchange
	source := make(chan interface{})
	sink := make(chan interface{})
	read := func() (interface{}, error) {
		data := msg.Command{}
		err := conn.ReadJSON(&data)
		if err != nil {
			log.Println("ws read failed")
		}
		return data, err
	}
	write := func(data interface{}) error {
		err := conn.WriteJSON(data)
		if err != nil {
			log.Println("ws write failed")
		}
		return err
	}
	// coInit is handed to exch.Exchange; it is called with true/false and, on
	// false, signals both the processor and start to shut down.
	coInit := func(start bool) error {
		if start {
			log.Println("starting exchange loop ...")
		} else {
			log.Println("finished exchange loop")
			log.Println("signaling processor termination ...")
			cancelProc <- os.Interrupt
			cancelMain <- os.Interrupt
		}
		return nil
	}
	go exch.Exchange(
		source,
		sink,
		read,
		write,
		nil,
		coInit,
	)

	// Start processor loop
	go func() {
		log.Println("starting processor loop ...")
		for {
			var raw interface{}
			select {
			case raw = <-sink:
			case <-cancelProc:
				log.Println("finished processor loop")
				return
			}

			command, ok := raw.(msg.Command)
			if !ok {
				// %v, not %s: raw is an arbitrary value, not necessarily a string.
				log.Printf("invalid command: %v\n", raw)
				continue
			}
			log.Printf("command received:\n%s\n", jsonParagraph(command))

			// Only commands carrying this channel's token are executed.
			if command.Token != info.Token {
				log.Printf("token mismatch: %s\n", command.Token)
				continue
			}

			input, err := menu.Render(command.Action)
			if err != nil {
				log.Println("action failed to render")
				continue
			}

			cmdSink := make(chan cmd.Payload)
			log.Printf("running command: %s\n", input)
			go cmd.Run(input, cmdSink)
			for payload := range cmdSink {
				log.Printf("sending payload:\n%s\n", jsonParagraph(payload))
				select {
				case source <- msg.Output{
					Token:   info.Token,
					Payload: payload,
				}:
				case <-cancelProc:
					// Nobody may be receiving from source any more. Stop
					// instead of blocking forever on the send, and keep
					// draining cmdSink so the cmd.Run goroutine is not left
					// stuck on its own send.
					go func() {
						for range cmdSink {
						}
					}()
					log.Println("finished processor loop")
					return
				}
			}
		}
	}()

	// Trap until user cancel
	<-cancelMain
	return nil
}