forked from stellar/go
/
main.go
114 lines (95 loc) · 2.75 KB
/
main.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
package sse
import (
"context"
"encoding/json"
"fmt"
"net/http"
"sync"
)
// Event is the packet of data that gets sent over the wire to a connected
// client.
type Event struct {
Data interface{}
Error error
ID string
Event string
Retry int
}
// SseEvent returns the SSE compatible form of the Event... itself.
func (e Event) SseEvent() Event {
return e
}
// Eventable represents an object that can be converted to an SSE compatible
// event.
type Eventable interface {
// SseEvent returns the SSE compatible form of the implementer
SseEvent() Event
}
// WritePreamble prepares this http connection for streaming using Server Sent
// Events. It sends the initial http response with the appropriate headers to
// do so.
func WritePreamble(ctx context.Context, w http.ResponseWriter) bool {
_, flushable := w.(http.Flusher)
if !flushable {
//TODO: render a problem struct instead of simple string
http.Error(w, "Streaming Not Supported", http.StatusBadRequest)
return false
}
w.Header().Set("Content-Type", "text/event-stream; charset=utf-8")
w.Header().Set("Cache-Control", "no-cache")
w.Header().Set("Connection", "keep-alive")
w.Header().Set("Access-Control-Allow-Origin", "*")
w.WriteHeader(200)
WriteEvent(ctx, w, helloEvent)
return true
}
// WriteEvent does the actual work of formatting an SSE compliant message
// sending it over the provided ResponseWriter and flushing.
func WriteEvent(ctx context.Context, w http.ResponseWriter, e Event) {
if e.Error != nil {
fmt.Fprint(w, "event: error\n")
fmt.Fprintf(w, "data: %s\n\n", e.Error.Error())
w.(http.Flusher).Flush()
return
}
// TODO: add tests to ensure retry get's properly rendered
if e.Retry != 0 {
fmt.Fprintf(w, "retry: %d\n", e.Retry)
}
if e.ID != "" {
fmt.Fprintf(w, "id: %s\n", e.ID)
}
if e.Event != "" {
fmt.Fprintf(w, "event: %s\n", e.Event)
}
fmt.Fprintf(w, "data: %s\n\n", getJSON(e.Data))
w.(http.Flusher).Flush()
}
// Upon successful completion of a query (i.e. the client didn't disconnect
// and we didn't error) we send a "Goodbye" event. This is a dummy event
// so that we can set a low retry value so that the client will immediately
// recoonnect and request more data. This helpes to give the feel of a infinite
// stream of data, even though we're actually responding in PAGE_SIZE chunks.
var goodbyeEvent = Event{
Data: "byebye",
Event: "close",
Retry: 10,
}
// Upon initial stream creation, we send this event to inform the client
// that they may retry an errored connection after 1 second.
var helloEvent = Event{
Data: "hello",
Event: "open",
Retry: 1000,
}
var (
lock sync.Mutex
nextTick = make(chan struct{})
)
func getJSON(val interface{}) string {
js, err := json.Marshal(val)
if err != nil {
panic(err)
}
return string(js)
}