/
retry_reader.go
192 lines (166 loc) · 7.45 KB
/
retry_reader.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
//go:build go1.18
// +build go1.18
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License. See License.txt in the project root for license information.
package blob
import (
	"context"
	"errors"
	"io"
	"net"
	"strings"
	"sync"

	"github.com/Azure/azure-sdk-for-go/sdk/azcore"
)
// httpGetter is a function type that refers to a method that performs an HTTP GET operation.
// It receives the request parameters in i and returns the response body to read from.
// RetryReader.Read calls it whenever it has no response stream to read from.
type httpGetter func(ctx context.Context, i httpGetterInfo) (io.ReadCloser, error)
// httpGetterInfo is passed to an httpGetter function, carrying the parameters
// that should be used to make an HTTP GET request.
type httpGetterInfo struct {
// Range is the byte range to request. RetryReader.Read advances Range.Offset (and
// reduces Range.Count, when it is not CountToEnd) after every successful read, so a
// re-issued GET asks only for the bytes that have not been delivered yet.
Range HTTPRange
// ETag specifies the resource's etag that should be used when creating
// the HTTP GET request's If-Match header
ETag *azcore.ETag
}
// RetryReaderOptions configures the retry reader's behavior.
// Zero-value fields will have their specified default values applied during use.
// This allows for modification of a subset of fields.
type RetryReaderOptions struct {
// MaxRetries specifies the maximum number of times a failed read will be retried
// before producing an error.
// Values less than one are replaced by the default value of three.
MaxRetries int32
// OnFailedRead, when non-nil, is called after any failed read from the response body. Expected usage is
// diagnostic logging. failureCount is the one-based number of failures seen during the current Read call,
// lastError is the error that caused the failure, rnge is the range that still remains to be read, and
// willRetry reports whether another attempt will be made.
// It is not called when the getter itself fails to produce a new response.
OnFailedRead func(failureCount int32, lastError error, rnge HTTPRange, willRetry bool)
// EarlyCloseAsError can be set to true to prevent retries after "read on closed response body". By default,
// retryReader has the following special behaviour: closing the response body before it is all read is treated as a
// retryable error. This is to allow callers to force a retry by closing the body from another goroutine (e.g. if the
// read is too slow, caller may want to force a retry in the hope that the retry will be quicker). If
// EarlyCloseAsError is true, then retryReader's special behaviour is suppressed, and "read on closed body" is instead
// treated as a fatal (non-retryable) error.
// Note that setting EarlyCloseAsError only guarantees that Closing will produce a fatal error if the Close happens
// from the same "thread" (goroutine) as Read. Concurrent Close calls from other goroutines may instead produce network errors
// which will be retried.
// The default value is false.
EarlyCloseAsError bool
// doInjectError, doInjectErrorRound and injectedError form an error-injection hook for testing.
// When doInjectError is true, the error of the read attempt whose zero-based index within a Read call
// equals doInjectErrorRound is replaced by injectedError, or by a temporary *net.DNSError when
// injectedError is nil.
doInjectError bool
doInjectErrorRound int32
injectedError error
}
// RetryReader attempts to read from a response and, if a retryable network error is
// returned during reading, retries according to its RetryReaderOptions: it calls the
// stored getter with the range that is still outstanding to obtain a new response, and
// continues the overall read from that new response.
// RetryReader implements the io.ReadCloser interface.
type RetryReader struct {
// ctx is the context handed to getter each time a new response is requested.
ctx context.Context
// info holds the range still to be read (plus the ETag); Read updates it after every successful read.
info httpGetterInfo
// retryReaderOptions is the configuration, with the MaxRetries default already applied by newRetryReader.
retryReaderOptions RetryReaderOptions
// getter produces a replacement response stream when the current one is missing or has failed.
getter httpGetter
// countWasBounded records whether the range originally supplied had a Count other than CountToEnd.
// Read uses it to report io.EOF once a bounded range has been fully consumed.
countWasBounded bool
// we support Close-ing during Reads (from other goroutines), so we protect the shared state, which is response
responseMu *sync.Mutex
// response is the stream currently being read; it is nil when a new one has to be fetched.
response io.ReadCloser
}
// newRetryReader builds a RetryReader that starts out reading from initialResponse and
// calls getter, with info describing the range still outstanding, whenever it needs a
// replacement stream. A MaxRetries below one in o is replaced by the default of three;
// o is received by value, so the caller's copy is not modified.
func newRetryReader(ctx context.Context, initialResponse io.ReadCloser, info httpGetterInfo, getter httpGetter, o RetryReaderOptions) *RetryReader {
	if o.MaxRetries <= 0 {
		o.MaxRetries = 3
	}

	reader := new(RetryReader)
	reader.ctx = ctx
	reader.info = info
	reader.getter = getter
	reader.retryReaderOptions = o
	reader.response = initialResponse
	reader.responseMu = new(sync.Mutex)
	// Capture this now: Read decrements info.Range.Count as data arrives, so afterwards
	// the original request can no longer be told apart from a "read to end" request.
	reader.countWasBounded = info.Range.Count != CountToEnd
	return reader
}
// setResponse replaces the current response stream with r while holding responseMu.
// Passing nil marks the reader as having no usable stream.
func (s *RetryReader) setResponse(r io.ReadCloser) {
	s.responseMu.Lock()
	s.response = r
	s.responseMu.Unlock()
}
// Read implements io.Reader. It reads from the current response body and, when that
// read fails with a retryable error, closes the body, asks the getter for a new one
// covering the bytes not yet delivered, and tries again.
//
// An error is retryable when it is, or wraps, a net.Error or io.ErrUnexpectedEOF, or
// when it reports a read on a closed response body and EarlyCloseAsError is false.
// At most MaxRetries retries are made per call; the attempt counter restarts at zero
// on every call to Read.
//
// It returns the number of bytes placed in p together with nil or io.EOF on success,
// the getter's error if a replacement response could not be obtained, or the last read
// error when that error is not retryable or the retries are exhausted.
func (s *RetryReader) Read(p []byte) (n int, err error) {
	for try := int32(0); ; try++ {
		if s.countWasBounded && s.info.Range.Count == CountToEnd {
			// The caller asked for a bounded range and the remaining count has reached
			// zero, so everything requested has already been delivered.
			return 0, io.EOF
		}

		s.responseMu.Lock()
		resp := s.response
		s.responseMu.Unlock()
		if resp == nil {
			// No stream to read from (none was supplied, or an earlier read failed):
			// request one for the range that is still outstanding.
			newResponse, getErr := s.getter(s.ctx, s.info)
			if getErr != nil {
				return 0, getErr
			}
			// Successful GET; this is the network stream we'll read from.
			s.setResponse(newResponse)
			resp = newResponse
		}

		// This returns a non-nil error if the body is closed from another goroutine
		// while the read is in progress.
		n, err = resp.Read(p)

		// Injection mechanism for testing.
		if s.retryReaderOptions.doInjectError && try == s.retryReaderOptions.doInjectErrorRound {
			if s.retryReaderOptions.injectedError != nil {
				err = s.retryReaderOptions.injectedError
			} else {
				err = &net.DNSError{IsTemporary: true}
			}
		}

		// io.EOF is deliberately compared with ==: the io.Reader contract requires
		// readers to return io.EOF itself rather than an error wrapping it.
		if err == nil || err == io.EOF {
			// Advance the window so that a future GET resumes after the bytes just read.
			s.info.Range.Offset += int64(n)
			if s.info.Range.Count != CountToEnd {
				s.info.Range.Count -= int64(n)
			}
			return n, err
		}

		// The stream is no longer good. Closing is best effort: the read error is the
		// one that matters, so a failure to close is intentionally ignored.
		_ = s.Close()
		s.setResponse(nil)

		// Check the retry count and error kind, and decide whether to retry.
		// errors.As / errors.Is are used so that wrapped errors are classified too.
		retriesExhausted := try >= s.retryReaderOptions.MaxRetries
		var netErr net.Error
		isNetError := errors.As(err, &netErr)
		isUnexpectedEOF := errors.Is(err, io.ErrUnexpectedEOF)
		willRetry := (isNetError || isUnexpectedEOF || s.wasRetryableEarlyClose(err)) && !retriesExhausted

		// Notify, for logging purposes, of any failures.
		if s.retryReaderOptions.OnFailedRead != nil {
			failureCount := try + 1 // because try is zero-based
			s.retryReaderOptions.OnFailedRead(failureCount, err, s.info.Range, willRetry)
		}

		if !willRetry {
			// Not retryable, or retries exhausted, so just return.
			return n, err
		}
		// Otherwise loop around and try to get and read from a new stream.
	}
}
// wasRetryableEarlyClose reports whether err is the "read on closed response body"
// error and the options allow it to be retried.
//
// By default an early Close issued from another goroutine is a supported way to force a
// retry. That is safe because an early close ultimately ends up calling net.Conn.Close,
// which is documented as "Any blocked Read or Write operations will be unblocked and
// return errors" - exactly the behaviour wanted here.
//
// NOTE: when the caller forces an early Close from a goroutine other than the one in
// Read, one of two errors may surface depending on timing: the one checked for here, or a
// net.Error caused by the connection being closed. Only the former needs checking in this
// routine, because the main Read retry loop is already handling net.Error.
func (s *RetryReader) wasRetryableEarlyClose(err error) bool {
	// When EarlyCloseAsError is set the user wants every early close to be fatal, and the
	// short-circuit below answers false without inspecting err at all.
	// http.errReadOnClosedResBody is private, so matching its text is the best available check.
	return !s.retryReaderOptions.EarlyCloseAsError &&
		strings.HasSuffix(err.Error(), ReadOnClosedBodyMessage)
}
// ReadOnClosedBodyMessage is the error text that the retry reader looks for, as a suffix
// of the error string, to recognise a read attempted on an already closed response body.
const ReadOnClosedBodyMessage = "read on closed response body"
// Close implements io.Closer. While holding responseMu it closes the current response
// body and returns the result of that call; it returns nil when no body is held.
// The response field itself is left unchanged.
func (s *RetryReader) Close() error {
	s.responseMu.Lock()
	defer s.responseMu.Unlock()

	if s.response == nil {
		return nil
	}
	return s.response.Close()
}