forked from stellar/go
-
Notifications
You must be signed in to change notification settings - Fork 0
/
main.go
134 lines (111 loc) · 3.19 KB
/
main.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
package sse
import (
	"context"
	"encoding/json"
	"fmt"
	"net/http"
	"strings"
	"sync"
)
// Event is the packet of data that gets sent over the wire to a connected
// client. WriteEvent renders it in the Server-Sent Events text format.
type Event struct {
// Data is the payload; WriteEvent JSON-encodes it into the "data:" field.
Data interface{}
// Error, when non-nil, makes WriteEvent emit an "err" event carrying the
// error message; all other fields are then ignored.
Error error
// ID is written as the "id:" field when non-empty.
ID string
// Event is written as the "event:" field (the event name) when non-empty.
Event string
// Retry is written as the "retry:" field when non-zero. helloEvent uses
// 1000 to request a one-second retry, i.e. the unit is milliseconds.
Retry int
}
// SseEvent returns the SSE compatible form of the Event, which is the Event
// itself. This method is what makes Event satisfy the Eventable interface.
func (e Event) SseEvent() Event {
return e
}
// Eventable represents an object that can be converted to an SSE compatible
// event. Event implements it by returning itself.
type Eventable interface {
// SseEvent returns the SSE compatible form of the implementer.
SseEvent() Event
}
// Pumped returns the channel that will be closed the next time the input pump
// sends (see Tick). Wait on it the same way as `ctx.Done()`, for example
// `<-sse.Pumped()`. Every tick installs a new channel, so fetch a fresh one
// after each wake-up rather than reusing the returned value.
func Pumped() <-chan struct{} {
	lock.Lock()
	current := nextTick
	lock.Unlock()
	return current
}
// Tick triggers any open SSE streams to tick. It swaps a fresh channel into
// `nextTick` while holding the lock and then, after releasing the lock,
// closes the previous channel so everything waiting on it unblocks.
func Tick() {
	fresh := make(chan struct{})

	lock.Lock()
	stale := nextTick
	nextTick = fresh
	lock.Unlock()

	close(stale)
}
// WritePreamble prepares this http connection for streaming using Server Sent
// Events by sending the initial http response with the appropriate headers.
//
// If w does not implement http.Flusher, a 400 "Streaming Not Supported"
// response is written and false is returned. Otherwise the SSE headers and a
// 200 status are written, the hello event is sent, and true is returned.
func WritePreamble(ctx context.Context, w http.ResponseWriter) bool {
	if _, ok := w.(http.Flusher); !ok {
		//TODO: render a problem struct instead of simple string
		http.Error(w, "Streaming Not Supported", http.StatusBadRequest)
		return false
	}

	// Response headers, applied in this order.
	headers := [...][2]string{
		{"Content-Type", "text/event-stream; charset=utf-8"},
		{"Cache-Control", "no-cache"},
		{"Connection", "keep-alive"},
		{"Access-Control-Allow-Origin", "*"},
	}
	for _, kv := range headers {
		w.Header().Set(kv[0], kv[1])
	}
	w.WriteHeader(http.StatusOK)

	// Greet the client; the hello event also carries the retry interval.
	WriteEvent(ctx, w, helloEvent)
	return true
}
// WriteEvent does the actual work of formatting an SSE compliant message,
// sending it over the provided ResponseWriter and flushing.
//
// When e.Error is non-nil an "err" event carrying the error message is
// written and every other field of e is ignored. Otherwise the optional
// retry, id and event fields are written when set, followed by e.Data encoded
// as JSON (getJSON panics if e.Data cannot be marshalled).
//
// A multi-line error message is emitted as one "data:" line per line of text,
// as the event stream format requires; embedded line breaks would otherwise
// terminate the field (or the whole event) early. The output for single-line
// messages is unchanged.
//
// Errors from writing to w are not reported. ctx is currently unused.
func WriteEvent(ctx context.Context, w http.ResponseWriter, e Event) {
	// Flush only when the writer supports it, so that calling WriteEvent with
	// a non-flushable ResponseWriter does not panic on the type assertion.
	flush := func() {
		if f, ok := w.(http.Flusher); ok {
			f.Flush()
		}
	}

	if e.Error != nil {
		fmt.Fprint(w, "event: err\n")
		// CR, LF and CRLF all end a line in an event stream; normalise them
		// to LF and give every line of the message its own "data:" prefix.
		msg := strings.NewReplacer("\r\n", "\n", "\r", "\n").Replace(e.Error.Error())
		for _, line := range strings.Split(msg, "\n") {
			fmt.Fprintf(w, "data: %s\n", line)
		}
		// The blank line dispatches the event.
		fmt.Fprint(w, "\n")
		flush()
		return
	}

	// TODO: add tests to ensure retry gets properly rendered
	if e.Retry != 0 {
		fmt.Fprintf(w, "retry: %d\n", e.Retry)
	}

	if e.ID != "" {
		fmt.Fprintf(w, "id: %s\n", e.ID)
	}

	if e.Event != "" {
		fmt.Fprintf(w, "event: %s\n", e.Event)
	}

	// JSON encoding escapes line breaks, so the payload is always one line.
	fmt.Fprintf(w, "data: %s\n\n", getJSON(e.Data))
	flush()
}
// Upon successful completion of a query (i.e. the client didn't disconnect
// and we didn't error) we send a "Goodbye" event. This is a dummy event
// that lets us set a low retry value so that the client will immediately
// reconnect and request more data. This helps to give the feel of an infinite
// stream of data, even though we're actually responding in PAGE_SIZE chunks.
var goodbyeEvent = Event{
Data: "byebye",
Event: "close",
Retry: 10,
}
// Upon initial stream creation, we send this event to inform the client
// that they may retry an errored connection after 1 second (Retry is 1000).
// WritePreamble sends it right after the response headers.
var helloEvent = Event{
Data: "hello",
Event: "open",
Retry: 1000,
}
var (
// lock guards every read and replacement of nextTick.
lock sync.Mutex
// nextTick is the current trigger channel handed out by Pumped. Tick
// replaces it with a new channel and closes the old one.
nextTick = make(chan struct{})
)
// getJSON returns val encoded as a JSON string. It panics with the
// marshalling error if val cannot be encoded.
func getJSON(val interface{}) string {
	encoded, err := json.Marshal(val)
	if err == nil {
		return string(encoded)
	}
	panic(err)
}