This repository has been archived by the owner on Oct 19, 2023. It is now read-only.
/
websocket.go
122 lines (110 loc) · 3 KB
/
websocket.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
package driver
import (
"encoding/json"
"net/http"
"sync"
"time"
"github.com/RomiChan/websocket"
"github.com/google/uuid"
"github.com/pkg/errors"
"github.com/tidwall/gjson"
"golang.org/x/sync/syncmap"
nilbot "github.com/Ink-33/NilBot"
"github.com/Ink-33/NilBot/internal/logger"
)
// WSClient defines the default websocket client.
type WSClient struct {
AccessToken string
conn *websocket.Conn
lock sync.Mutex
rayIDMap syncmap.Map
Timeout time.Duration
URL string
}
// NewWebSocketClient returns a websocket client.
func NewWebSocketClient(url, accessToken string, timeout time.Duration) *WSClient {
return &WSClient{URL: url, AccessToken: accessToken, Timeout: timeout}
}
// Connect websocket server.
func (c *WSClient) Connect() {
logger.Info("Try connecting to websocket server %v", c.URL)
header := http.Header{
"User-Agent": []string{"NilBot/dev"},
}
if c.AccessToken != "" {
header["Authorization"] = []string{"Bear" + " " + c.AccessToken}
}
TRY:
conn, resp, err := websocket.DefaultDialer.Dial(c.URL, header)
if err != nil {
logger.Warn("An error occured while trying connecting to websocket server %v : %v", c.URL, err.Error())
time.Sleep(3 * time.Second)
goto TRY
}
c.conn = conn
resp.Body.Close()
logger.Info("Connected to websocket server %v .", c.URL)
}
// Listen websocket events.
func (c *WSClient) Listen(handler ...nilbot.EventHandler) {
for {
mtype, payload, err := c.conn.ReadMessage()
if err != nil {
logger.Warn("Disconnected from websocket server %v", c.URL)
c.conn = nil
c.Connect()
}
if mtype == websocket.TextMessage {
resp := gjson.ParseBytes(payload)
if resp.Get("echo").Exists() {
if ch, ok := c.rayIDMap.LoadAndDelete(resp.Get("echo").String()); ok {
ch.(chan *nilbot.APIResponse) <- &nilbot.APIResponse{
Data: resp.Get("data"),
Echo: resp.Get("echo").Str,
Msg: resp.Get("msg").Str,
RetCode: resp.Get("retcode").Int(),
Status: resp.Get("status").Str,
Wording: resp.Get("wording").Str,
}
close(ch.(chan *nilbot.APIResponse))
}
} else {
for k := range handler {
go handler[k].Handle(payload, c)
}
}
}
}
}
// CallAPI sends websocket request.
func (c *WSClient) CallAPI(req *nilbot.APIRequest) (resp *nilbot.APIResponse, err error) {
if req == nil {
return nil, errors.New("request is nil")
}
if c.conn == nil {
return nil, errors.New("connection does not create or has been closed")
}
req.Echo = uuid.NewString()
ch := make(chan *nilbot.APIResponse, 1)
c.rayIDMap.Store(req.Echo, ch)
payload, err := json.Marshal(req)
if err != nil {
return nil, err
}
c.lock.Lock()
err = c.conn.WriteMessage(websocket.TextMessage, payload)
c.lock.Unlock()
if err != nil {
logger.Warn("An error occured while requesting API : %v\n", err.Error())
return nil, err
}
select {
case resp, ok := <-ch:
if !ok {
return nil, errors.New("channel closed")
}
return resp, nil
case <-time.After(c.Timeout):
return nil, errors.New("time out")
}
}