-
Notifications
You must be signed in to change notification settings - Fork 1
/
stream.go
126 lines (111 loc) · 2.61 KB
/
stream.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
// Copyright 2022 Ed Huang<i@huangdx.net>
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package tipubsub
import (
"encoding/json"
"time"
"github.com/c4pt0r/log"
)
type Message struct {
ID int64 `json:"id,string"`
Ts int64 `json:"ts,string"`
Data string `json:"data"`
}
func (m Message) String() string {
b, _ := json.Marshal(m)
return string(b)
}
type Stream struct {
name string
mq chan *Message
store Store
maxBatchSize int
}
var (
pullTimeout = time.Millisecond * 100
)
func (s *Stream) Name() string {
return s.name
}
func NewStream(cfg *Config, s Store, name string) (*Stream, error) {
return &Stream{
store: s,
name: name,
mq: make(chan *Message, cfg.MaxBatchSize),
maxBatchSize: cfg.MaxBatchSize,
}, nil
}
func (s *Stream) Open() error {
err := s.store.CreateStream(s.name)
if err != nil {
return err
}
log.Info("pub: open stream:", s.name)
go s.pubWorker()
return nil
}
func (s *Stream) Publish(m *Message) {
if m.Ts == 0 {
m.Ts = time.Now().UnixNano()
}
s.mq <- m
}
func (s *Stream) MinMaxID() (int64, int64, error) {
return s.store.MinMaxID(s.name)
}
func (s *Stream) getBatches(maxItems int, maxTimeout time.Duration) chan []*Message {
// Create a channel to receive batches
batches := make(chan []*Message)
go func() {
defer close(batches)
for keepGoing := true; keepGoing; {
var batch []*Message
expire := time.After(maxTimeout)
for {
select {
case value, ok := <-s.mq:
if !ok {
keepGoing = false
goto done
}
batch = append(batch, value)
// if batch is full, return batch
if len(batch) == maxItems {
goto done
}
// if channel is empty, block for maxTimeout
case <-expire:
goto done
}
}
done:
if len(batch) > 0 {
batches <- batch
}
}
}()
return batches
}
func (s *Stream) pubWorker() {
log.Info("pub: Starting pub worker...")
batches := s.getBatches(s.maxBatchSize, pullTimeout)
for batch := range batches {
// Put batch to store
err := s.store.PutMessages(s.name, batch)
if err != nil {
// TODO: Retry?
log.Error(err)
}
}
}