-
Notifications
You must be signed in to change notification settings - Fork 1.1k
/
Copy pathclient.go
240 lines (216 loc) · 6.1 KB
/
client.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
// Copyright 2016 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// Package bytestream provides a client for any service that exposes a ByteStream API.
//
// Note: This package is a work-in-progress. Backwards-incompatible changes should be expected.
package bytestream
// This file contains the client implementation of Bytestream declared at:
// https://github.com/googleapis/googleapis/blob/master/google/bytestream/bytestream.proto
import (
"context"
"fmt"
"math/rand"
"time"
"google.golang.org/grpc"
pb "google.golang.org/genproto/googleapis/bytestream"
)
const (
// MaxBufSize is the maximum buffer size (in bytes) received in a read chunk or sent in a write chunk.
MaxBufSize = 2 * 1024 * 1024
backoffBase = 10 * time.Millisecond
backoffMax = 1 * time.Second
maxTries = 5
)
// Client is the go wrapper around a ByteStreamClient and provides an interface to it.
type Client struct {
client pb.ByteStreamClient
options []grpc.CallOption
}
// NewClient creates a new bytestream.Client.
func NewClient(cc *grpc.ClientConn, options ...grpc.CallOption) *Client {
return &Client{
client: pb.NewByteStreamClient(cc),
options: options,
}
}
// Reader reads from a byte stream.
type Reader struct {
ctx context.Context
c *Client
readClient pb.ByteStream_ReadClient
resourceName string
err error
buf []byte
}
// ResourceName gets the resource name this Reader is reading.
func (r *Reader) ResourceName() string {
return r.resourceName
}
// Read implements io.Reader.
// Read buffers received bytes that do not fit in p.
func (r *Reader) Read(p []byte) (int, error) {
if r.err != nil {
return 0, r.err
}
var backoffDelay time.Duration
for tries := 0; len(r.buf) == 0 && tries < maxTries; tries++ {
// No data in buffer.
resp, err := r.readClient.Recv()
if err != nil {
r.err = err
return 0, err
}
r.buf = resp.Data
if len(r.buf) != 0 {
break
}
// back off
if backoffDelay < backoffBase {
backoffDelay = backoffBase
} else {
backoffDelay = time.Duration(float64(backoffDelay) * 1.3 * (1 - 0.4*rand.Float64()))
}
if backoffDelay > backoffMax {
backoffDelay = backoffMax
}
select {
case <-time.After(backoffDelay):
case <-r.ctx.Done():
if err := r.ctx.Err(); err != nil {
r.err = err
}
return 0, r.err
}
}
// Copy from buffer.
n := copy(p, r.buf)
r.buf = r.buf[n:]
return n, nil
}
// Close implements io.Closer.
func (r *Reader) Close() error {
if r.readClient == nil {
return nil
}
err := r.readClient.CloseSend()
r.readClient = nil
return err
}
// NewReader creates a new Reader to read a resource.
func (c *Client) NewReader(ctx context.Context, resourceName string) (*Reader, error) {
return c.NewReaderAt(ctx, resourceName, 0)
}
// NewReaderAt creates a new Reader to read a resource from the given offset.
func (c *Client) NewReaderAt(ctx context.Context, resourceName string, offset int64) (*Reader, error) {
// readClient is set up for Read(). ReadAt() will copy needed fields into its reentrantReader.
readClient, err := c.client.Read(ctx, &pb.ReadRequest{
ResourceName: resourceName,
ReadOffset: offset,
}, c.options...)
if err != nil {
return nil, err
}
return &Reader{
ctx: ctx,
c: c,
resourceName: resourceName,
readClient: readClient,
}, nil
}
// Writer writes to a byte stream.
type Writer struct {
ctx context.Context
writeClient pb.ByteStream_WriteClient
resourceName string
offset int64
err error
}
// ResourceName gets the resource name this Writer is writing.
func (w *Writer) ResourceName() string {
return w.resourceName
}
// Write implements io.Writer.
func (w *Writer) Write(p []byte) (int, error) {
if w.err != nil {
return 0, w.err
}
n := 0
for n < len(p) {
bufSize := len(p) - n
if bufSize > MaxBufSize {
bufSize = MaxBufSize
}
r := pb.WriteRequest{
WriteOffset: w.offset,
FinishWrite: false,
Data: p[n : n+bufSize],
}
// Bytestream only requires the resourceName to be sent in the first WriteRequest.
if w.offset == 0 {
r.ResourceName = w.resourceName
}
err := w.writeClient.Send(&r)
if err != nil {
w.err = err
return n, err
}
w.offset += int64(bufSize)
n += bufSize
}
return n, nil
}
// Close implements io.Closer. It is the caller's responsibility to call Close() when writing is done.
func (w *Writer) Close() error {
err := w.writeClient.Send(&pb.WriteRequest{
ResourceName: w.resourceName,
WriteOffset: w.offset,
FinishWrite: true,
Data: nil,
})
if err != nil {
w.err = err
return fmt.Errorf("Send(WriteRequest< FinishWrite >) failed: %v", err)
}
resp, err := w.writeClient.CloseAndRecv()
if err != nil {
w.err = err
return fmt.Errorf("CloseAndRecv: %v", err)
}
if resp == nil {
err = fmt.Errorf("expected a response on close, got %v", resp)
} else if resp.CommittedSize != w.offset {
err = fmt.Errorf("server only wrote %d bytes, want %d", resp.CommittedSize, w.offset)
}
w.err = err
return err
}
// NewWriter creates a new Writer to write a resource.
//
// resourceName specifies the name of the resource.
// The resource will be available after Close has been called.
//
// It is the caller's responsibility to call Close when writing is done.
//
// TODO: There is currently no way to resume a write. Maybe NewWriter should begin with a call to QueryWriteStatus.
func (c *Client) NewWriter(ctx context.Context, resourceName string) (*Writer, error) {
wc, err := c.client.Write(ctx, c.options...)
if err != nil {
return nil, err
}
return &Writer{
ctx: ctx,
writeClient: wc,
resourceName: resourceName,
}, nil
}