forked from stellar/go
/
server.go
103 lines (84 loc) · 2.14 KB
/
server.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
package sse
import (
"net/http"
"time"
"github.com/r3labs/sse"
"github.com/stellar/go/services/bifrost/common"
"github.com/stellar/go/support/log"
)
// init performs the one-time setup of the Server: it creates the logger and
// the underlying SSE server, and sets the last-seen event ID to -1.
// The methods in this file run it through s.initOnce before touching any of
// these fields.
func (s *Server) init() {
	s.log = common.CreateLogger("SSEServer")
	s.lastID = -1
	s.eventsServer = sse.New()
}
// BroadcastEvent records an event for the given address in s.Storage.
//
// Nothing is pushed to SSE clients from here; the event is only persisted.
// A storage failure is logged together with the rejected record and is
// otherwise dropped, because the method has no error return.
func (s *Server) BroadcastEvent(address string, event AddressEvent, data []byte) {
	s.initOnce.Do(s.init)

	record := Event{Address: address, Event: event, Data: string(data)}
	if err := s.Storage.AddEvent(record); err != nil {
		fields := log.F{"err": err, "event": record}
		s.log.WithFields(fields).Error("Error broadcasting event")
	}
}
// StartPublishing starts publishing events from the shared storage.
//
// It first queries s.Storage with the current s.lastID to obtain the starting
// cursor, returning the storage error if that fails. On success it launches a
// background goroutine that repeatedly fetches the events after the cursor,
// publishes each of them via publishEvent and then advances the cursor.
//
// The goroutine loops forever; there is no way to stop it from this method.
// Each successful call starts another polling goroutine.
func (s *Server) StartPublishing() error {
	s.initOnce.Do(s.init)

	// Commit the cursor only after the storage call succeeded, so a failed
	// call leaves s.lastID untouched and a retry starts from the same place.
	lastID, _, err := s.Storage.GetEventsSinceID(s.lastID)
	if err != nil {
		return err
	}
	s.lastID = lastID

	go func() {
		// Start publishing
		for {
			lastID, events, err := s.Storage.GetEventsSinceID(s.lastID)
			if err != nil {
				s.log.WithField("err", err).Error("Error GetEventsSinceID")
				// Back off for a second before retrying.
				time.Sleep(time.Second)
				continue
			}

			if len(events) == 0 {
				// Nothing new yet; poll again in a second.
				time.Sleep(time.Second)
				continue
			}

			for _, event := range events {
				s.publishEvent(event.Address, event.Event, []byte(event.Data))
			}

			// Advance the cursor only after the whole batch was published.
			s.lastID = lastID
		}
	}()

	return nil
}
// publishEvent sends a single event to the SSE stream named after address,
// creating that stream first if it does not exist yet.
//
// The event name is used both as the SSE event type and as the SSE ID.
// data is the payload; when it is empty the placeholder "{}" is sent instead.
// A trailing newline is always appended to the payload.
func (s *Server) publishEvent(address string, event AddressEvent, data []byte) {
	s.initOnce.Do(s.init)

	// Create SSE stream if not exists
	if !s.eventsServer.StreamExists(address) {
		s.eventsServer.CreateStream(address)
	}

	// github.com/r3labs/sse does not send new lines - TODO create PR
	//
	// Check the length rather than comparing with nil: a payload converted
	// from an empty string (as StartPublishing does with event.Data) is an
	// empty but non-nil slice and must still get the "{}" placeholder.
	if len(data) == 0 {
		data = []byte("{}\n")
	} else {
		data = append(data, '\n')
	}

	s.eventsServer.Publish(address, &sse.Event{
		ID:    []byte(event),
		Event: []byte(event),
		Data:  data,
	})
}
// CreateStream asks the underlying SSE server to create the stream named
// after address, initializing the Server first if needed.
func (s *Server) CreateStream(address string) {
	s.initOnce.Do(s.init)

	srv := s.eventsServer
	srv.CreateStream(address)
}
// StreamExists reports whether the underlying SSE server already has a
// stream named after address, initializing the Server first if needed.
func (s *Server) StreamExists(address string) bool {
	s.initOnce.Do(s.init)

	exists := s.eventsServer.StreamExists(address)
	return exists
}
// HTTPHandler serves an HTTP request by delegating it to the underlying SSE
// server's handler, initializing the Server first if needed.
func (s *Server) HTTPHandler(w http.ResponseWriter, r *http.Request) {
	s.initOnce.Do(s.init)

	serve := s.eventsServer.HTTPHandler
	serve(w, r)
}