-
Notifications
You must be signed in to change notification settings - Fork 2
/
sse.go
131 lines (119 loc) · 2.55 KB
/
sse.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
package xhttp
import (
"encoding/json"
"errors"
"fmt"
"net/http"
"time"
"github.com/eyasliu/cs"
)
// SSEMsgType SSE 的消息时间模式
type SSEMsgType byte
const (
SSEEvent = 0 // 使用事件, cmd 和 data 分开
SSEMessage = 1 // 使用消息, cmd 和 data 都放到sse 的 data 域
)
// SSEConn SSE 的连接实例
type SSEConn struct {
w http.ResponseWriter
flusher http.Flusher
msgType SSEMsgType
hbTime time.Duration
isClose bool
notifyErr chan error
}
func newSSEConn(w http.ResponseWriter, msgType SSEMsgType, heartbeatTime time.Duration) (*SSEConn, error) {
s := &SSEConn{
w: w,
msgType: msgType,
hbTime: heartbeatTime,
notifyErr: make(chan error),
}
flusher, ok := s.w.(http.Flusher)
if !ok {
return nil, errors.New("Streaming unsupported")
}
s.flusher = flusher
err := s.init()
return s, err
}
func (s *SSEConn) init() error {
s.w.Header().Set("Content-Type", "text/event-stream")
s.w.Header().Set("Cache-Control", "no-cache")
s.w.Header().Set("Connection", "keep-alive")
// s.w.Header().Del("Content-Length")
// retry
_, err := fmt.Fprint(s.w, "retry: 10000\n\n")
// _, err := writer(s.w, "retry: 10000\n\n")
if err != nil {
return err
}
s.flusher.Flush()
// heartbeat
go func(s *SSEConn) {
for {
time.Sleep(s.hbTime / 2)
if s.isClose {
break
}
_, err := fmt.Fprint(s.w, ": heartbeat\n\n")
if err != nil {
s.destroy(err)
break
}
s.flusher.Flush()
}
}(s)
return nil
}
// Send 给 sse 连接推送数据
func (s *SSEConn) Send(v ...*cs.Response) error {
if s.w == nil {
err := errors.New("connection is already closed")
s.destroy(err)
return err
}
for _, resp := range v {
msg := ""
if s.msgType == SSEEvent {
if resp.Seqno != "" {
msg += "id: " + resp.Seqno + "\n"
}
msg += "event: " + resp.Cmd + "\n"
if resp.Data != nil {
dataBt, err := json.Marshal(resp.Data)
if err != nil {
return err
}
msg += "data: " + string(dataBt) + "\n"
}
} else if s.msgType == SSEMessage {
resp1 := &responseData{
Cmd: resp.Cmd,
Seqno: resp.Seqno,
Code: resp.Code,
Msg: resp.Msg,
Data: resp.Data,
}
dataBt, err := json.Marshal(resp1)
if err != nil {
return err
}
msg += "data: " + string(dataBt) + "\n"
} else {
return errors.New("unsupport sse message type")
}
msg += "\n\n"
_, err := fmt.Fprint(s.w, msg)
s.flusher.Flush()
if err != nil {
s.destroy(err)
return err
}
}
return nil
}
func (s *SSEConn) destroy(err error) {
s.notifyErr <- err
s.isClose = true
}