This repository has been archived by the owner on May 27, 2024. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 0
/
miraihttp.go
186 lines (174 loc) · 4.75 KB
/
miraihttp.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
package miraihttp
import (
"encoding/json"
"errors"
"fmt"
"github.com/CuteReimu/goutil"
"github.com/gorilla/websocket"
"github.com/tidwall/gjson"
"log/slog"
"runtime/debug"
"strconv"
"sync"
"sync/atomic"
"time"
)
// WsChannel 连接通道
type WsChannel string
const (
// WsChannelMessage 推送消息
WsChannelMessage = "model"
// WsChannelEvent 推送事件
WsChannelEvent = "event"
// WsChannelAll 推送消息及事件
WsChannelAll = "all"
)
// Connect 连接mirai-api-http
//
// concurrentEvent 参数如果是true,表示采用并发方式处理事件和消息,由调用者自行解决并发问题。
// 如果是false表示用单线程处理事件和消息,调用者无需关心并发问题。
func Connect(host string, port int, channel WsChannel, verifyKey string, qq int64, concurrentEvent bool) (*Bot, error) {
addr := fmt.Sprintf("ws://%s:%d/%s?verifyKey=%s&qq=%d", host, port, channel, verifyKey, qq)
log := slog.With("addr", addr)
log.Info("Dialing")
c, _, err := websocket.DefaultDialer.Dial(addr, nil)
if err != nil {
log.Error("Connect failed")
return nil, err
}
log.Info("Connected successfully")
b := &Bot{QQ: qq, c: c, handler: make(map[string][]listenHandler)}
if !concurrentEvent {
b.eventChan = goutil.NewBlockingQueue[func()]()
go func() {
for {
b.eventChan.Take()()
}
}()
}
go func() {
for {
t, message, err := c.ReadMessage()
if t != websocket.TextMessage {
continue
}
if err != nil {
log.Error("read error", "error", err)
return
}
if !gjson.ValidBytes(message) {
log.Error("invalid json message: " + string(message))
continue
}
syncId := gjson.GetBytes(message, "syncId").String()
data := gjson.GetBytes(message, "data")
if data.Type != gjson.JSON {
log.Error("invalid json message: " + string(message))
continue
}
if len(syncId) > 0 && syncId[0] != '-' {
log.Debug("recv", "data", data, "syncId", syncId)
if ch, ok := b.syncIdMap.LoadAndDelete(syncId); ok {
ch0 := ch.(chan gjson.Result)
ch0 <- data
close(ch0)
}
continue
}
messageType := data.Get("type").String()
b.handlerLock.RLock()
if h, ok := b.handler[messageType]; ok {
b.handlerLock.RUnlock()
if p := decoder[messageType]; p == nil {
log.Error("cannot find message decoder: " + messageType)
} else if m := p(data); m != nil {
log.Debug("recv", "content", m)
fun := func() {
defer func() {
if r := recover(); r != nil {
log.Error("panic recovered", "error", r, "stack", string(debug.Stack()))
}
}()
for _, f := range h {
if !f(m) {
break
}
}
}
if b.eventChan == nil {
go fun()
} else {
b.eventChan.Put(fun)
}
}
} else {
b.handlerLock.RUnlock()
}
}
}()
return b, nil
}
type Bot struct {
QQ int64
c *websocket.Conn
syncId atomic.Int64
handlerLock sync.RWMutex
handler map[string][]listenHandler
syncIdMap sync.Map
eventChan *goutil.BlockingQueue[func()]
}
// request 发送请求
func (b *Bot) request(command, subCommand string, m any) (gjson.Result, error) {
msg := &requestMessage{
SyncId: b.syncId.Add(1),
Command: command,
SubCommand: subCommand,
Content: m,
}
log := slog.With("command", command, "subCommand", subCommand)
syncId := strconv.FormatInt(msg.SyncId, 10)
buf, err := json.Marshal(msg)
if err != nil {
log.Error("json marshal failed", "error", err)
return gjson.Result{}, err
}
ch := make(chan gjson.Result, 1)
b.syncIdMap.Store(syncId, ch)
err = b.c.WriteMessage(websocket.TextMessage, buf)
if err != nil {
log.Error("send error", "error", err)
return gjson.Result{}, err
}
log.Debug("send", "content", m, "syncId", syncId, "cmd", command, "subCmd", subCommand)
timeoutTimer := time.AfterFunc(5*time.Second, func() {
if ch, ok := b.syncIdMap.LoadAndDelete(syncId); ok {
close(ch.(chan gjson.Result))
}
})
result, ok := <-ch
if !ok {
log.Error("request timeout")
return gjson.Result{}, errors.New("request timeout")
}
timeoutTimer.Stop()
code := result.Get("code").Int()
if code != 0 {
e := fmt.Sprint("Non-zero code: ", code, ", error message: ", result.Get("msg"))
log.Error(e)
return gjson.Result{}, errors.New(e)
}
return result, nil
}
type requestMessage struct {
SyncId int64 `json:"syncId"`
Command string `json:"command"`
SubCommand string `json:"subCommand,omitempty"`
Content any `json:"content,omitempty"`
}
var decoder = make(map[string]func(data gjson.Result) any)
type listenHandler func(message any) bool
func listen[M any](b *Bot, key string, l func(message M) bool) {
b.handlerLock.Lock()
defer b.handlerLock.Unlock()
b.handler[key] = append(b.handler[key], func(m any) bool { return l(m.(M)) })
}