-
Notifications
You must be signed in to change notification settings - Fork 194
/
data.go
113 lines (99 loc) · 2.16 KB
/
data.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
package track
import (
"context"
"sync"
"time"
"go.uber.org/zap"
. "m7s.live/engine/v4/common"
"m7s.live/engine/v4/log"
"m7s.live/engine/v4/util"
)
type Data[T any] struct {
Base[T, *DataFrame[T]]
sync.Locker `json:"-" yaml:"-"` // 写入锁,可选,单一协程写入可以不加锁
}
func (dt *Data[T]) Init(n int) {
dt.Base.Init(n, NewDataFrame[T])
}
func (dt *Data[T]) Push(data T) {
if dt.Locker != nil {
dt.Lock()
defer dt.Unlock()
}
curValue := dt.Value
if log.Trace {
dt.Trace("push data", zap.Uint32("sequence", curValue.Sequence))
}
curValue.Data = data
dt.Step()
}
func (d *Data[T]) Play(ctx context.Context, onData func(*DataFrame[T]) error) (err error) {
d.Debug("play data track")
reader := DataReader[T]{}
for err = reader.StartRead(d.Ring); err == nil; err = reader.ReadNext() {
if log.Trace {
d.Trace("read data", zap.Uint32("sequence", reader.Value.Sequence))
}
if err = onData(reader.Value); err == nil {
err = ctx.Err()
}
if err != nil {
reader.Value.ReaderLeave()
return
}
}
return
}
func (d *Data[T]) Attach(s IStream) {
d.SetStuff(s)
if err := s.AddTrack(d).Await(); err != nil {
d.Error("attach data track failed", zap.Error(err))
} else {
d.Info("data track attached")
}
}
func (d *Data[T]) LastWriteTime() time.Time {
return d.LastValue.WriteTime
}
func NewDataTrack[T any](name string) (dt *Data[T]) {
dt = &Data[T]{}
dt.Init(10)
dt.SetStuff(name)
return
}
type RecycleData[T util.Recyclable] struct {
Data[T]
}
func (dt *RecycleData[T]) Push(data T) {
if dt.Locker != nil {
dt.Lock()
defer dt.Unlock()
}
curValue := dt.Value
if log.Trace {
dt.Trace("push data", zap.Uint32("sequence", curValue.Sequence))
}
curValue.Data = data
dt.Step()
if !dt.Value.WriteTime.IsZero() {
dt.Value.Data.Recycle()
}
}
func NewRecycleDataTrack[T util.Recyclable](name string) (dt *RecycleData[T]) {
dt = &RecycleData[T]{}
dt.Init(10)
dt.SetStuff(name)
return
}
type BytesData struct {
RecycleData[*util.ListItem[util.Buffer]]
Pool util.BytesPool
}
func NewBytesDataTrack(name string) (dt *BytesData) {
dt = &BytesData{
Pool: make(util.BytesPool, 17),
}
dt.Init(10)
dt.SetStuff(name)
return
}