/
multiplexed_chunk_reader.go
92 lines (80 loc) · 2.3 KB
/
multiplexed_chunk_reader.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
package buffer
import (
"sync"
)
// readResult bundles the two return values of a single Read() call on
// the underlying ChunkReader, so that they can be sent over a channel
// to consumers that are waiting for the data to become available.
type readResult struct {
	// data is the chunk returned by the underlying Read() call.
	data []byte
	// err is the error returned by the underlying Read() call.
	err error
}
// multiplexedChunkReader is a ChunkReader that hands out every chunk
// read from a single underlying ChunkReader to multiple consumers.
type multiplexedChunkReader struct {
	// r is the underlying ChunkReader. It is set to nil after the
	// last consumer has called Close().
	r ChunkReader
	// lock is held by Read() and Close() while they inspect or
	// modify the fields of this structure.
	lock sync.Mutex
	// pendingConsumers is the number of consumers that still have
	// to call Read() or Close() before the next chunk is read from
	// the underlying ChunkReader.
	pendingConsumers int
	// waitingConsumers contains one channel for every consumer that
	// is blocked in Read(), waiting for the final consumer of the
	// current round to read the next chunk and share the result.
	waitingConsumers []chan readResult
}
// newMultiplexedChunkReader wraps a ChunkReader so that the data on
// the stream can be observed by several consumers at once. The
// returned ChunkReader is shared by the caller and additionalConsumers
// other consumers, meaning 1 + additionalConsumers parties in total.
//
// A call to Read() on the returned stream blocks until every other
// consumer has either called Read() as well, or has left by calling
// Close().
//
// Buffer.CloneStream() is built on top of this multiplexer, which in
// turn makes it possible to implement advanced buffer replication
// strategies.
func newMultiplexedChunkReader(r ChunkReader, additionalConsumers int) ChunkReader {
	m := &multiplexedChunkReader{r: r}
	// The caller itself counts as a consumer, in addition to the
	// requested number of additional ones.
	m.pendingConsumers = additionalConsumers + 1
	return m
}
// readAndShareWithOthers reads the next chunk from the underlying
// ChunkReader and delivers the outcome to every consumer that is
// currently waiting in Read(). Afterwards it starts a new round: all
// consumers that were waiting become pending again, plus the calling
// consumer if currentConsumerContinues is 1 (0 if it is leaving).
//
// Both callers in this file invoke this method while holding r.lock.
// The sends do not block, as Read() creates each waiter's channel with
// room for one result.
func (r *multiplexedChunkReader) readAndShareWithOthers(currentConsumerContinues int) ([]byte, error) {
	data, err := r.r.Read()

	shared := readResult{data: data, err: err}
	waiters := r.waitingConsumers
	for i := range waiters {
		waiters[i] <- shared
	}

	// Begin the next round, reusing the slice's storage for future
	// waiters.
	r.pendingConsumers = currentConsumerContinues + len(waiters)
	r.waitingConsumers = waiters[:0]
	return data, err
}
// Read returns the next chunk of data on the stream, together with the
// error reported by the underlying ChunkReader.
//
// Every consumer of the stream observes the same sequence of results.
// The call blocks until all other consumers have also called Read() or
// have left by calling Close(). The last consumer to arrive performs
// the actual read on the underlying ChunkReader and shares the result
// with the ones that were waiting.
//
// Read panics if it is called while no consumers are pending, which
// indicates that the stream is used by more consumers than it was
// created for, or after it has been closed by all of them.
func (r *multiplexedChunkReader) Read() ([]byte, error) {
	c, data, err := r.readOrWait()
	if c == nil {
		// We were the last consumer to arrive and read the
		// chunk ourselves.
		return data, err
	}

	// At least one more consumer needs to call Read() or Close().
	// Wait for it to share its results. This must happen without
	// holding the lock, as the other consumer needs to acquire it.
	result := <-c
	return result.data, result.err
}

// readOrWait performs the part of Read() that needs to run with the
// lock held. If the caller is the last pending consumer, it reads from
// the underlying ChunkReader and returns the result with a nil
// channel. Otherwise it registers the caller as a waiting consumer and
// returns the channel on which the result will be delivered.
//
// The lock is released through defer, so that a panic raised here or
// by the underlying ChunkReader does not leave the lock held, which
// would block all other consumers forever.
func (r *multiplexedChunkReader) readOrWait() (<-chan readResult, []byte, error) {
	r.lock.Lock()
	defer r.lock.Unlock()

	if r.pendingConsumers <= 0 {
		panic("Multiplexed chunk reader has no pending consumers")
	}
	r.pendingConsumers--
	if r.pendingConsumers == 0 {
		// Last consumer of the stream to call Read(). Call
		// Read() on the underlying ChunkReader and share the
		// data with the rest.
		data, err := r.readAndShareWithOthers(1)
		return nil, data, err
	}

	// The channel has room for one result, so that the consumer
	// sharing its data never blocks on us.
	c := make(chan readResult, 1)
	r.waitingConsumers = append(r.waitingConsumers, c)
	return c, nil, nil
}
// Close removes the calling consumer from the stream. Once the last
// remaining consumer has called Close(), the underlying ChunkReader is
// closed as well.
//
// Close panics if it is called while no consumers are pending.
func (r *multiplexedChunkReader) Close() {
	r.lock.Lock()
	defer r.lock.Unlock()

	if r.pendingConsumers <= 0 {
		panic("Multiplexed chunk reader has no pending consumers")
	}
	r.pendingConsumers--

	switch {
	case r.pendingConsumers > 0:
		// Other consumers still have to call Read() or Close()
		// in this round. Nothing more to do for now.
	case len(r.waitingConsumers) > 0:
		// Everybody else is blocked in Read(), waiting for us.
		// Perform the read for them, without taking part in
		// the next round ourselves.
		r.readAndShareWithOthers(0)
	default:
		// Nobody is waiting and nobody is pending: we are the
		// final consumer to leave, so release the underlying
		// stream.
		r.r.Close()
		r.r = nil
	}
}