/
ndjson.go
86 lines (71 loc) · 1.91 KB
/
ndjson.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
package stream
import (
"context"
"encoding/json"
"fmt"
"time"
"github.com/hashicorp/nomad/nomad/structs"
)
var (
	// JsonHeartbeat is an empty JSON object ("{}") that is sent as the
	// heartbeat frame. One shared instance is reused so that a new value does
	// not have to be allocated for every heartbeat; because the same pointer is
	// handed to every receiver, it should be treated as read-only.
	JsonHeartbeat = &structs.EventJson{Data: []byte("{}")}
)
// JsonStream sends JSON-encoded events and periodic heartbeat frames to a
// destination channel (see OutCh).
type JsonStream struct {
	// ctx is the caller-supplied context. Once it is done the heartbeat
	// goroutine exits and Send returns an error.
	ctx context.Context

	// outCh carries encoded events and heartbeat frames to the consumer.
	outCh chan *structs.EventJson

	// heartbeatTick fires at the heartbeat interval; every tick triggers a
	// heartbeat frame, which is sent to keep a connection open.
	heartbeatTick *time.Ticker
}
// NewJsonStream creates a JsonStream with its own buffered output channel
// (available through OutCh). When heartbeat is positive, the constructor also
// starts a goroutine that emits a heartbeat frame on every interval until ctx
// is done.
//
// A non-positive heartbeat disables heartbeating instead of panicking:
// time.NewTicker panics when given an interval <= 0, so in that case no ticker
// is created and no goroutine is started.
func NewJsonStream(ctx context.Context, heartbeat time.Duration) *JsonStream {
	s := &JsonStream{
		ctx:   ctx,
		outCh: make(chan *structs.EventJson, 10),
	}

	// Without a valid interval there is nothing to tick on; the stream still
	// works for Send, it just never emits heartbeats.
	if heartbeat <= 0 {
		return s
	}

	s.heartbeatTick = time.NewTicker(heartbeat)
	go s.heartbeat()

	return s
}
// OutCh returns the channel on which encoded events and heartbeat frames are
// delivered.
func (n *JsonStream) OutCh() chan *structs.EventJson {
	return n.outCh
}
// heartbeat loops until the stream's context is done, sending the shared
// JsonHeartbeat frame on outCh each time the ticker fires.
func (n *JsonStream) heartbeat() {
	// Stop the ticker when the loop exits so its resources are released
	// promptly instead of lingering after the stream has terminated.
	defer n.heartbeatTick.Stop()

	for {
		select {
		case <-n.ctx.Done():
			return
		case <-n.heartbeatTick.C:
			// Send a heartbeat frame, but do not block forever on a full
			// channel: give up as soon as the context is done.
			select {
			case n.outCh <- JsonHeartbeat:
			case <-n.ctx.Done():
				return
			}
		}
	}
}
// Send marshals v to JSON and queues the result on the output channel wrapped
// in a structs.EventJson.
//
// It returns the context's error if the stream's context is already done, a
// wrapped marshaling error if v cannot be encoded, or an error wrapping the
// context's error if the context is done while waiting for room on the
// channel.
func (n *JsonStream) Send(v interface{}) error {
	// Fail fast so nothing is marshaled for a stream that has already stopped.
	if err := n.ctx.Err(); err != nil {
		return err
	}

	buf, err := json.Marshal(v)
	if err != nil {
		return fmt.Errorf("error marshaling json for stream: %w", err)
	}

	select {
	case <-n.ctx.Done():
		// err is always nil at this point (Marshal succeeded), so wrap the
		// context's error; this keeps the message well-formed and lets callers
		// match it with errors.Is(err, context.Canceled).
		return fmt.Errorf("error stream is no longer running: %w", n.ctx.Err())
	case n.outCh <- &structs.EventJson{Data: buf}:
		return nil
	}
}