-
Notifications
You must be signed in to change notification settings - Fork 0
/
command.go
113 lines (96 loc) · 2.51 KB
/
command.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
package command
import (
"encoding/json"
"fmt"
"io"
"github.com/ericluj/emq/internal/common"
"github.com/ericluj/emq/internal/protocol"
)
// Command names. Each value is used as the Name of a Command built by the
// constructors in this file.
const (
// PING is the name used by PingCmd.
PING = "PING"
// SUB is the name used by SubscribeCmd.
SUB = "SUB"
// PUB is the name used by PublishCmd.
PUB = "PUB"
// NOP is the name used by NopCmd.
NOP = "NOP"
// REQ is the name used by RequeueCmd.
REQ = "REQ"
// IDENTIFY is the name used by IDENTIFYCmd.
IDENTIFY = "IDENTIFY"
// REGISTER is the name used by RegisterCmd.
REGISTER = "REGISTER"
// UNREGISTER is the name used by UnRegisterCmd.
UNREGISTER = "UNREGISTER"
)
// Command is a single protocol command: a name, optional parameters and an
// optional body. Write defines how it is serialized.
type Command struct {
// Name is the command name; Write emits it first.
Name []byte
// Params are emitted after Name, each one preceded by common.SeparatorBytes.
Params [][]byte
// Body, when non-nil, is sent after the command line as a
// common.FrameTypeMessage frame via protocol.SendFrameData.
Body []byte
}
// PingCmd builds a PING command with no parameters and no body.
func PingCmd() *Command {
	cmd := new(Command)
	cmd.Name = []byte(PING)
	return cmd
}
// IDENTIFYCmd builds an IDENTIFY command whose body is the JSON encoding of
// data. It returns the json.Marshal error unchanged if encoding fails.
func IDENTIFYCmd(data map[string]interface{}) (*Command, error) {
	payload, err := json.Marshal(data)
	if err != nil {
		return nil, err
	}

	cmd := &Command{
		Name: []byte(IDENTIFY),
		Body: payload,
	}
	return cmd, nil
}
// RegisterCmd builds a REGISTER command for topic. The channel is appended as
// a second parameter only when it is non-empty.
func RegisterCmd(topic, channel string) *Command {
	args := [][]byte{[]byte(topic)}
	if channel != "" {
		args = append(args, []byte(channel))
	}

	return &Command{
		Name:   []byte(REGISTER),
		Params: args,
	}
}
// UnRegisterCmd builds an UNREGISTER command for topic. The channel is
// appended as a second parameter only when it is non-empty.
func UnRegisterCmd(topic, channel string) *Command {
	args := [][]byte{[]byte(topic)}
	if channel != "" {
		args = append(args, []byte(channel))
	}

	return &Command{
		Name:   []byte(UNREGISTER),
		Params: args,
	}
}
// SubscribeCmd builds a SUB command carrying topic and channel as its two
// parameters. Both are always included, even when empty.
func SubscribeCmd(topic string, channel string) *Command {
	cmd := &Command{Name: []byte(SUB)}
	cmd.Params = [][]byte{
		[]byte(topic),
		[]byte(channel),
	}
	return cmd
}
// PublishCmd builds a PUB command with topic as its only parameter and body
// as the message payload. The body slice is stored as-is, not copied.
func PublishCmd(topic string, body []byte) *Command {
	cmd := &Command{Name: []byte(PUB), Body: body}
	cmd.Params = [][]byte{[]byte(topic)}
	return cmd
}
// NopCmd builds a NOP command with no parameters and no body.
func NopCmd() *Command {
	return &Command{Name: []byte(NOP)}
}
// RequeueCmd builds a REQ command with id as its only parameter. The id slice
// is stored as-is, not copied.
func RequeueCmd(id []byte) *Command {
	return &Command{
		Name:   []byte(REQ),
		Params: [][]byte{id},
	}
}
// Write serializes cmd to w (direction: emqcli to emqd).
//
// The output is the command name, then each parameter preceded by
// common.SeparatorBytes, then common.NewLineBytes. If Body is non-nil it is
// then sent as a common.FrameTypeMessage frame via protocol.SendFrameData.
//
// Write stops at the first failed write and returns that error wrapped with
// the stage that failed. The underlying error stays reachable through
// errors.Is and errors.As. Output already written to w is not rolled back.
func (cmd *Command) Write(w io.Writer) error {
	// Command name.
	if _, err := w.Write(cmd.Name); err != nil {
		return fmt.Errorf("cmdName error: %w", err)
	}

	// Parameters, each preceded by the separator.
	for _, param := range cmd.Params {
		if _, err := w.Write(common.SeparatorBytes); err != nil {
			return fmt.Errorf("SeparatorBytes error: %w", err)
		}
		if _, err := w.Write(param); err != nil {
			return fmt.Errorf("param error: %w", err)
		}
	}

	// Newline terminates the command line, before any message body.
	if _, err := w.Write(common.NewLineBytes); err != nil {
		return fmt.Errorf("NewLineBytes error: %w", err)
	}

	// Message body, only when one was supplied.
	if cmd.Body != nil {
		if err := protocol.SendFrameData(w, common.FrameTypeMessage, cmd.Body); err != nil {
			return fmt.Errorf("SendFrameData error: %w", err)
		}
	}
	return nil
}