-
Notifications
You must be signed in to change notification settings - Fork 1.8k
/
replayable_request.go
155 lines (130 loc) · 4.25 KB
/
replayable_request.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
/*
Copyright 2022 The Dapr Authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package v1
import (
"bytes"
"io"
"sync"
"github.com/dapr/kit/byteslicepool"
streamutils "github.com/dapr/kit/streams"
)
// Minimum capacity for the slices is 2KB
const minByteSliceCapacity = 2 << 10
// Contain pools of *bytes.Buffer and []byte objects.
// Used to reduce the number of allocations in replayableRequest for buffers and relieve pressure on the GC.
var (
bufPool = sync.Pool{New: newBuffer}
bsPool = byteslicepool.NewByteSlicePool(minByteSliceCapacity)
)
func newBuffer() any {
return new(bytes.Buffer)
}
// replayableRequest is implemented by InvokeMethodRequest and InvokeMethodResponse
type replayableRequest struct {
data io.Reader
replay *bytes.Buffer
lock sync.Mutex
currentTeeReader *streamutils.TeeReadCloser
currentData []byte
}
// WithRawData sets message data.
func (rr *replayableRequest) WithRawData(data io.Reader) {
rr.lock.Lock()
defer rr.lock.Unlock()
if rr.replay != nil {
// We are panicking here because we can't return errors
// This is just to catch issues during development however, and will never happen at runtime
panic("WithRawData cannot be invoked after replaying has been enabled")
}
rr.data = data
}
// SetReplay enables replaying for the data stream.
func (rr *replayableRequest) SetReplay(enabled bool) {
rr.lock.Lock()
defer rr.lock.Unlock()
if !enabled {
rr.closeReplay()
} else if rr.replay == nil {
rr.replay = bufPool.Get().(*bytes.Buffer)
rr.replay.Reset()
}
}
// CanReplay returns true if the data stream can be replayed.
func (rr *replayableRequest) CanReplay() bool {
return rr.replay != nil
}
// RawData returns the stream body.
func (rr *replayableRequest) RawData() (r io.Reader) {
rr.lock.Lock()
defer rr.lock.Unlock()
// If there's a previous TeeReadCloser, stop it so readers won't add more data into its replay buffer
if rr.currentTeeReader != nil {
_ = rr.currentTeeReader.Stop()
}
if rr.data == nil {
// If there's no data, and there's never been, just return a reader with no data
r = bytes.NewReader(nil)
} else if rr.replay != nil {
// If there's replaying enabled, we need to create a new TeeReadCloser
// We need to copy the data read insofar from the reply buffer because the buffer becomes invalid after new data is written into the it, then reset the buffer
l := rr.replay.Len()
// Get a new byte slice from the pool if we don't have one, and ensure it has enough capacity
if rr.currentData == nil {
rr.currentData = bsPool.Get(l)
}
rr.currentData = bsPool.Resize(rr.currentData, l)
// Copy the data from the replay buffer into the byte slice
copy(rr.currentData[0:l], rr.replay.Bytes())
rr.replay.Reset()
// Create a new TeeReadCloser that reads from the previously-read data and then the data not yet processed
// The TeeReadCloser also keeps all the data it reads into the replay buffer
mr := streamutils.NewMultiReaderCloser(
bytes.NewReader(rr.currentData[0:l]),
rr.data,
)
rr.currentTeeReader = streamutils.NewTeeReadCloser(mr, rr.replay)
r = rr.currentTeeReader
} else {
// No replay enabled
r = rr.data
}
return r
}
func (rr *replayableRequest) closeReplay() {
// Return the buffer and byte slice to the pools if we got one
if rr.replay != nil {
bufPool.Put(rr.replay)
rr.replay = nil
}
if rr.currentData != nil {
bsPool.Put(rr.currentData)
rr.currentData = nil
}
}
// Close the data stream and replay buffers.
// It's safe to call Close multiple times on the same object.
func (rr *replayableRequest) Close() (err error) {
rr.lock.Lock()
defer rr.lock.Unlock()
rr.closeReplay()
if rr.data != nil {
if rc, ok := rr.data.(io.Closer); ok {
err = rc.Close()
if err != nil {
return err
}
}
rr.data = nil
}
return nil
}