This repository has been archived by the owner on Aug 21, 2023. It is now read-only.
/
flusher.go
136 lines (107 loc) · 2.3 KB
/
flusher.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
package rtstats
import (
"context"
"sync"
"time"
"golang.org/x/xerrors"
nsbus "github.com/13k/night-stalker/internal/bus"
nscol "github.com/13k/night-stalker/internal/collections"
nslog "github.com/13k/night-stalker/internal/logger"
nspb "github.com/13k/night-stalker/internal/protobuf/protocol"
"github.com/13k/night-stalker/models"
)
type flusherOptions struct {
Log *nslog.Logger
Bus *nsbus.Bus
Interval time.Duration
Cap int
}
type flusher struct {
options *flusherOptions
log *nslog.Logger
bus *nsbus.Bus
buf nscol.LiveMatchStats
mtx *sync.Mutex
ctx context.Context
values chan *models.LiveMatchStats
errors chan error
}
func newFlusher(options *flusherOptions) *flusher {
return &flusher{
options: options,
log: options.Log.WithPackage("flusher"),
bus: options.Bus,
mtx: &sync.Mutex{},
}
}
func (f *flusher) Errors() <-chan error {
return f.errors
}
func (f *flusher) Start(ctx context.Context) {
f.ctx = ctx
f.values = make(chan *models.LiveMatchStats)
f.errors = make(chan error)
go f.loop()
}
func (f *flusher) stop(t *time.Ticker) {
t.Stop()
close(f.values)
f.values = nil
close(f.errors)
f.errors = nil
f.ctx = nil
f.log.Trace("stop")
}
func (f *flusher) loop() {
t := time.NewTicker(f.options.Interval)
defer f.stop(t)
f.log.Trace("start")
for {
select {
case <-f.ctx.Done():
return
case <-t.C:
f.Flush()
case v, ok := <-f.values:
if !ok {
return
}
f.add(v)
}
}
}
func (f *flusher) Add(stats *models.LiveMatchStats) {
f.values <- stats
}
func (f *flusher) add(stats *models.LiveMatchStats) {
f.mtx.Lock()
defer f.mtx.Unlock()
f.buf = append(f.buf, stats)
if len(f.buf) >= f.options.Cap {
f.safeFlush()
}
}
func (f *flusher) Flush() {
f.mtx.Lock()
defer f.mtx.Unlock()
if len(f.buf) == 0 {
return
}
f.safeFlush()
}
// TODO: it should reset flush timer
func (f *flusher) safeFlush() {
f.log.WithField("count", len(f.buf)).Trace("flush")
busMsg := nsbus.Message{
Topic: nsbus.TopicLiveMatchStatsAdd,
Payload: &nsbus.LiveMatchStatsChangeMessage{
Op: nspb.CollectionOp_COLLECTION_OP_ADD,
Stats: f.buf,
},
}
if err := f.bus.Pub(busMsg); err != nil {
f.errors <- xerrors.Errorf("error publishing live match stats change: %w", err)
return
}
f.buf = nil
}