-
Notifications
You must be signed in to change notification settings - Fork 1
/
datastream.go
82 lines (67 loc) · 1.27 KB
/
datastream.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
package ssp
import (
"sync/atomic"
"github.com/affo/ssp/values"
)
// Compile-time assertion that *infiniteStream satisfies the Transport interface.
var _ Transport = (*infiniteStream)(nil)
// DataStream is a source of values that are consumed one at a time.
type DataStream interface {
// Next returns the next value of the stream. The implementations in this
// file return nil once no further value will be produced.
Next() values.Value
}
// sliceStream is a DataStream that serves the elements of a fixed slice.
type sliceStream struct {
// i is the index of the next element of vs that Next will return.
i int
// vs holds the elements served by the stream, in order.
vs []values.Value
}
// NewIntValues converts each of the given ints with values.New and returns the
// results as a slice, preserving the input order.
func NewIntValues(ints ...int) []values.Value {
	wrapped := make([]values.Value, len(ints))
	for idx, n := range ints {
		wrapped[idx] = values.New(n)
	}
	return wrapped
}
// NewStreamFromElements returns a DataStream that yields elems in order and
// then nil. The slice is used as-is; it is not copied.
func NewStreamFromElements(elems ...values.Value) DataStream {
	stream := new(sliceStream)
	stream.vs = elems
	return stream
}
// Next returns the element at the current position and advances the cursor.
// It returns nil once every element has been served.
func (s *sliceStream) Next() values.Value {
	pos := s.i
	if pos >= len(s.vs) {
		return nil
	}
	elem := s.vs[pos]
	s.i = pos + 1
	return elem
}
// defaultBufferSize is the channel capacity used by NewInfiniteStream.
const defaultBufferSize = 1024
// infiniteStream is a channel-backed stream: values pushed with Collect are
// handed out by Next until a value of type values.Close is received.
type infiniteStream struct {
// s is the buffered channel carrying values from Collect to Next.
s chan values.Value
// bufferSize is the capacity s is created with.
bufferSize int
// closed is non-zero once the stream has been closed; it is read and
// written through sync/atomic.
closed int64
}
// NewInfiniteStream returns an open infiniteStream whose channel is buffered
// with defaultBufferSize slots.
func NewInfiniteStream() *infiniteStream {
	return &infiniteStream{
		s:          make(chan values.Value, defaultBufferSize),
		bufferSize: defaultBufferSize,
	}
}
// Collect pushes v onto the stream's channel so that a later Next can return it.
// The send blocks while the channel buffer is full, and it panics if the
// channel has already been closed (a send on a closed channel).
func (s *infiniteStream) Collect(v values.Value) {
s.s <- v
}
// isClosed reports whether the closed flag has been set. The flag is read
// atomically.
func (s *infiniteStream) isClosed() bool {
	flag := atomic.LoadInt64(&s.closed)
	return flag != 0
}
// close marks the stream as closed and closes the underlying channel.
//
// It is idempotent: only the call that flips the flag from 0 to 1 closes the
// channel, so a repeated call cannot panic by closing an already-closed
// channel.
func (s *infiniteStream) close() {
	if atomic.CompareAndSwapInt64(&s.closed, 0, 1) {
		close(s.s)
	}
}
// Next returns the next value pushed with Collect, blocking until one is
// available. It returns nil once the stream is closed: either the closed flag
// is already set, the channel has been closed, or the received value has type
// values.Close (in which case Next closes the stream itself).
func (s *infiniteStream) Next() values.Value {
	if s.isClosed() {
		return nil
	}
	v, ok := <-s.s
	if !ok {
		// The channel was closed while this call was waiting; the received
		// value is the zero value, so calling Type on it must be avoided.
		return nil
	}
	if v.Type() == values.Close {
		s.close()
		return nil
	}
	return v
}