-
Notifications
You must be signed in to change notification settings - Fork 2
/
lhr_buffer.go
211 lines (178 loc) · 4.87 KB
/
lhr_buffer.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
package live
import (
"io"
"sync"
elog "github.com/eluv-io/log-go"
)
// RWBuffer is a bounded queue of byte packets shared between a writer and a
// reader. All fields are accessed under m by the methods of this type.
type RWBuffer struct {
	// ch is the ring of queued packets; Write stores at ch[rear] and Read
	// takes from ch[front].
	ch [][]byte
	// front is the index of the next packet Read will dequeue.
	front int
	// rear is the index of the most recently enqueued packet (-1 before the
	// first Write on a buffer built by NewRWBuffer).
	rear int
	// capacity is the maximum number of packets the queue can hold.
	capacity int
	// sz is the total number of bytes written but not yet copied out by Read.
	sz int
	// count is the number of packets currently held in ch.
	count int
	// inReadBuf is a packet that was dequeued but only partially copied out
	// by Read; nil when there is no partial read in progress.
	inReadBuf []byte
	// inReadIndex is the offset in inReadBuf of the first byte not yet read.
	inReadIndex int
	// m guards every field above; cond is built on m and is broadcast on each
	// Write, each dequeuing Read and each CloseSide.
	m    *sync.Mutex
	cond *sync.Cond
	// closed is a bit set of RWBufferCloseState flags.
	closed RWBufferCloseState
}
// RWBufferCloseState records which sides of a RWBuffer have been closed.
// The values are bit flags and are combined and tested with | and &.
type RWBufferCloseState int

const (
	// RWBufferOpen means neither side is closed.
	RWBufferOpen RWBufferCloseState = 0
	// RWBufferReadClosed marks the read side as closed.
	RWBufferReadClosed RWBufferCloseState = 1 << 0
	// RWBufferWriteClosed marks the write side as closed.
	RWBufferWriteClosed RWBufferCloseState = 1 << 1
	// RWBufferClosed marks both sides as closed.
	RWBufferClosed RWBufferCloseState = RWBufferReadClosed | RWBufferWriteClosed
)
// blog is the package-level logger used by the RWBuffer methods, obtained
// from elog under the path "/eluvio/avpipe/live/rwb".
var blog = elog.Get("/eluvio/avpipe/live/rwb")
/*
 * NewRWBuffer creates a RWBuffer that can queue up to capacity packets and is open
 * for both reading and writing.
 * The writer can write to the buffer until CloseSide(RWBufferWriteClosed) (or Close())
 * is called.
 * The reader can read from the buffer until CloseSide(RWBufferReadClosed) is called or
 * EOF is issued.
 * An EOF is issued to the reader when the writer has closed the buffer and there is no
 * data left in the buffer.
 *
 * Returns nil when capacity is not positive: a zero-capacity queue could never accept a
 * packet, and the ring index arithmetic (modulo capacity) would divide by zero.
 */
func NewRWBuffer(capacity int) io.ReadWriteCloser {
	if capacity <= 0 {
		return nil
	}

	m := &sync.Mutex{}
	return &RWBuffer{
		ch:       make([][]byte, capacity),
		sz:       0,
		count:    0,
		front:    0,
		rear:     -1, // the first Write advances rear to slot 0
		capacity: capacity,
		closed:   RWBufferOpen,
		m:        m,
		cond:     sync.NewCond(m),
	}
}
/*
 * Write makes a copy of buf and enqueues the copy as one packet.
 * As a further improvement it could avoid the copy by taking ownership of buf (RM).
 *
 * If the queue is full, Write blocks until a slot is free or either side is closed.
 * Returns len(buf) on success, or (0, io.ErrClosedPipe) if the write side or the read
 * side is closed, including when that happens while Write is blocked.
 */
func (rwb *RWBuffer) Write(buf []byte) (n int, err error) {
	// Copy outside the lock so the critical section stays short.
	b := make([]byte, len(buf))
	copy(b, buf)

	rwb.m.Lock()
	defer rwb.m.Unlock()

	// Cond.Wait must be re-checked in a loop: a Broadcast from another Write or
	// from CloseSide wakes this goroutine without a slot being free, and
	// continuing would overwrite an unread packet or enqueue after close.
	warned := false
	for {
		if rwb.closed&RWBufferWriteClosed != 0 {
			blog.Debug("Write RWBuffer WRITE closed")
			return 0, io.ErrClosedPipe
		}
		if rwb.closed&RWBufferReadClosed != 0 {
			blog.Debug("Write RWBuffer READ closed")
			return 0, io.ErrClosedPipe
		}
		if rwb.count < rwb.capacity {
			break
		}
		if !warned {
			// Warn once per call, not once per wake-up.
			blog.Warn("RWBuffer buffer queue is full", "capacity", rwb.capacity)
			warned = true
		}
		rwb.cond.Wait()
	}

	rwb.sz += len(b)
	rwb.count++
	rwb.rear = (rwb.rear + 1) % rwb.capacity
	rwb.ch[rwb.rear] = b

	// Wake a reader that may be waiting for data.
	rwb.cond.Broadcast()

	return len(b), nil
}
// min returns the smaller of a and b.
func min(a, b int) int {
	if b < a {
		return b
	}
	return a
}
/*
 * Read copies at most one packet into buf.
 * avpipe may ask for less data than a packet holds, which causes a partial read.
 * RWBuffer keeps track of a partial read and returns the rest of that packet in the
 * following Read() call(s) before touching the queue again.
 *
 * Read blocks while the queue is empty and both sides are open.
 * Returns:
 *   - (n, nil) with the number of bytes copied into buf,
 *   - (0, io.EOF) when the write side is closed and no data is left,
 *   - (0, io.ErrClosedPipe) when the read side is closed, including when it is closed
 *     while Read is blocked,
 *   - (0, nil) when len(buf) == 0 and there is nothing else to report.
 */
func (rwb *RWBuffer) Read(buf []byte) (n int, err error) {
	rwb.m.Lock()
	defer rwb.m.Unlock()

	if rwb.closed&RWBufferReadClosed != 0 {
		return 0, io.ErrClosedPipe
	}

	/*
	 * If there is a partially read packet, continue copying the rest of it into buf.
	 * (inReadBuf holds a packet that was already removed from the queue.)
	 */
	if rwb.inReadBuf != nil {
		rest := rwb.inReadBuf[rwb.inReadIndex:]
		nCopied := copy(buf, rest)
		if nCopied == len(rest) {
			rwb.inReadBuf = nil
			rwb.inReadIndex = 0
		} else {
			rwb.inReadIndex += nCopied
		}
		rwb.sz -= nCopied
		return nCopied, nil
	}

	// Wait until a packet is available, re-checking the close state after each wake-up.
	for {
		if rwb.closed&RWBufferReadClosed != 0 {
			blog.Debug("Read reading from RWBuffer closed")
			// Report the close instead of a (0, nil) result.
			return 0, io.ErrClosedPipe
		}
		if rwb.closed&RWBufferWriteClosed != 0 && rwb.count <= 0 {
			blog.Debug("Read writing to RWBuffer closed and no data")
			blog.Debug("Read RWBuffer EOF")
			return 0, io.EOF
		}
		if len(buf) == 0 {
			// Nothing can be copied; do not dequeue a packet.
			return 0, nil
		}
		if rwb.count > 0 {
			break
		}
		// Queue is empty, wait for a Write() or a close.
		rwb.cond.Wait()
	}

	// Dequeue the front packet and clear its slot so the ring does not keep the
	// packet alive after it has been handed out.
	b := rwb.ch[rwb.front]
	rwb.ch[rwb.front] = nil
	rwb.front = (rwb.front + 1) % rwb.capacity
	rwb.count--

	nCopied := copy(buf, b)
	if nCopied < len(b) {
		// buf was too small: remember the packet and where to resume.
		rwb.inReadBuf = b
		rwb.inReadIndex = nCopied
	}
	rwb.sz -= nCopied

	// A slot was freed; wake a writer that may be waiting on a full queue.
	rwb.cond.Broadcast()

	// An empty last packet after the writer closed is reported as EOF.
	if rwb.closed&RWBufferWriteClosed != 0 && rwb.count <= 0 && nCopied == 0 {
		blog.Debug("Read RWBuffer EOF")
		return 0, io.EOF
	}
	return nCopied, nil
}
// Size returns the current value of the buffer's byte counter (sz), read
// under the buffer's mutex.
func (rwb *RWBuffer) Size() int {
	rwb.m.Lock()
	total := rwb.sz
	rwb.m.Unlock()
	return total
}
// Len returns the number of packets currently in the queue (count), read
// under the buffer's mutex.
func (rwb *RWBuffer) Len() int {
	rwb.m.Lock()
	packets := rwb.count
	rwb.m.Unlock()
	return packets
}
// Close implements io.Closer. It closes only the write side of the buffer by
// calling CloseSide(RWBufferWriteClosed) and returns that call's result.
func (rwb *RWBuffer) Close() error {
	const side = RWBufferWriteClosed
	return rwb.CloseSide(side)
}
// CloseSide adds the given state flag(s) to the buffer's close state and
// broadcasts on the condition variable so that goroutines blocked in
// cond.Wait wake up. It always returns nil.
func (rwb *RWBuffer) CloseSide(state RWBufferCloseState) error {
	rwb.m.Lock()
	defer rwb.m.Unlock()

	// Flags accumulate: a side that was closed earlier stays closed.
	rwb.closed = rwb.closed | state
	blog.Debug("Close RWBuffer", "state", state)

	rwb.cond.Broadcast()
	return nil
}