-
Notifications
You must be signed in to change notification settings - Fork 1.1k
/
Copy pathserver.go
238 lines (213 loc) · 9.64 KB
/
server.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
// Copyright 2016 Google LLC.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package internal
// This file contains the server implementation of Bytestream declared at:
// https://github.com/googleapis/googleapis/blob/master/google/bytestream/bytestream.proto
//
// Bytestream uses bidirectional streaming (http://grpc.io/docs/guides/concepts.html#bidirectional-streaming-rpc).
import (
"context"
"fmt"
"io"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
pb "google.golang.org/genproto/googleapis/bytestream"
)
// ReadHandler reads from the Bytestream.
// Note: error returns must return an instance of grpc.rpcError unless otherwise handled in grpc-go/rpc_util.go.
// http://google.golang.org/grpc provides Errorf(code, fmt, ...) to create instances of grpc.rpcError.
// Note: Cancelling the context will abort the stream ("drop the connection"). Consider returning a non-nil error instead.
type ReadHandler interface {
// GetReader provides an io.ReaderAt, which will not be retained by the Server after the pb.ReadRequest.
GetReader(ctx context.Context, name string) (io.ReaderAt, error)
// Close does not have to do anything, but is here for if the io.ReaderAt wants to call Close().
Close(ctx context.Context, name string) error
}
// WriteHandler handles writes from the Bytestream. For example:
// Note: error returns must return an instance of grpc.rpcError unless otherwise handled in grpc-go/rpc_util.go.
// grpc-go/rpc_util.go provides the helper func Errorf(code, fmt, ...) to create instances of grpc.rpcError.
// Note: Cancelling the context will abort the stream ("drop the connection"). Consider returning a non-nil error instead.
type WriteHandler interface {
// GetWriter provides an io.Writer that is ready to write at initOffset.
// The io.Writer will not be retained by the Server after the pb.WriteRequest.
GetWriter(ctx context.Context, name string, initOffset int64) (io.Writer, error)
// Close does not have to do anything, but is related to Server.AllowOverwrite. Or if the io.Writer simply wants a Close() call.
// Close is called when the server receives a pb.WriteRequest with finish_write = true.
// If Server.AllowOverwrite == true then Close() followed by GetWriter() for the same name indicates the name is being overwritten, even if the initOffset is different.
Close(ctx context.Context, name string) error
}
// Internal service that implements pb.ByteStreamServer. Because the methods Write() and Read() are exported for grpc to link against,
// grpcService is deliberately not exported so go code cannot call grpcService.Write() or grpcService.Read().
type grpcService struct {
parent *Server
}
// Server wraps the RPCs in pb. Use bytestream.NewServer() to create a Server.
type Server struct {
status map[string]*pb.QueryWriteStatusResponse
readHandler ReadHandler
writeHandler WriteHandler
rpc *grpcService
// AllowOverwrite controls Server behavior when a WriteRequest with finish_write = true is followed by another WriteRequest.
AllowOverwrite bool
// Bytestream allows a WriteRequest to omit the resource name, in which case it will be appended to the last WriteRequest.
LastWrittenResource string
}
// NewServer creates a new bytestream.Server using gRPC.
// gsrv is the *grpc.Server this bytestream.Server will listen on.
// readHandler handles any incoming pb.ReadRequest or nil which means all pb.ReadRequests will be rejected.
// writeHandler handles any incoming pb.WriteRequest or nil which means all pb.WriteRequests will be rejected.
// readHandler and writeHandler cannot both be nil.
func NewServer(gsrv *grpc.Server, readHandler ReadHandler, writeHandler WriteHandler) (*Server, error) {
if readHandler == nil && writeHandler == nil {
return nil, fmt.Errorf("readHandler and writeHandler cannot both be nil")
}
server := &Server{
status: make(map[string]*pb.QueryWriteStatusResponse),
readHandler: readHandler,
writeHandler: writeHandler,
rpc: &grpcService{},
}
server.rpc.parent = server
// Register a server.
pb.RegisterByteStreamServer(gsrv, server.rpc)
return server, nil
}
// Write handles the pb.ByteStream_WriteServer and sends a pb.WriteResponse
// Implements bytestream.proto "rpc Write(stream WriteRequest) returns (WriteResponse)".
func (rpc *grpcService) Write(stream pb.ByteStream_WriteServer) error {
for {
writeReq, err := stream.Recv()
if err == io.EOF {
// io.EOF errors are a non-error for the Write() caller.
return nil
} else if err != nil {
return grpc.Errorf(codes.Unknown, "stream.Recv() failed: %v", err)
}
if rpc.parent.writeHandler == nil {
return grpc.Errorf(codes.Unimplemented, "instance of NewServer(writeHandler = nil) rejects all writes")
}
status, ok := rpc.parent.status[writeReq.ResourceName]
if !ok {
// writeReq.ResourceName is a new resource name.
if writeReq.ResourceName == "" {
return grpc.Errorf(codes.InvalidArgument, "WriteRequest: empty or missing resource_name")
}
status = &pb.QueryWriteStatusResponse{
CommittedSize: writeReq.WriteOffset,
}
rpc.parent.status[writeReq.ResourceName] = status
} else {
// writeReq.ResourceName has already been seen by this server.
if status.Complete {
if !rpc.parent.AllowOverwrite {
return grpc.Errorf(codes.InvalidArgument, "%q finish_write = true already, got %d byte WriteRequest and Server.AllowOverwrite = false",
writeReq.ResourceName, len(writeReq.Data))
}
// Truncate the resource stream.
status.Complete = false
status.CommittedSize = writeReq.WriteOffset
}
}
if writeReq.WriteOffset != status.CommittedSize {
return grpc.Errorf(codes.FailedPrecondition, "%q write_offset=%d differs from server internal committed_size=%d",
writeReq.ResourceName, writeReq.WriteOffset, status.CommittedSize)
}
// WriteRequest with empty data is ok.
if len(writeReq.Data) != 0 {
writer, err := rpc.parent.writeHandler.GetWriter(stream.Context(), writeReq.ResourceName, status.CommittedSize)
if err != nil {
return grpc.Errorf(codes.Internal, "GetWriter(%q): %v", writeReq.ResourceName, err)
}
wroteLen, err := writer.Write(writeReq.Data)
if err != nil {
return grpc.Errorf(codes.Internal, "Write(%q): %v", writeReq.ResourceName, err)
}
status.CommittedSize += int64(wroteLen)
}
if writeReq.FinishWrite {
r := &pb.WriteResponse{CommittedSize: status.CommittedSize}
// Note: SendAndClose does NOT close the server stream.
if err = stream.SendAndClose(r); err != nil {
return grpc.Errorf(codes.Internal, "stream.SendAndClose(%q, WriteResponse{ %d }): %v", writeReq.ResourceName, status.CommittedSize, err)
}
status.Complete = true
if status.CommittedSize == 0 {
return grpc.Errorf(codes.FailedPrecondition, "writeHandler.Close(%q): 0 bytes written", writeReq.ResourceName)
}
if err = rpc.parent.writeHandler.Close(stream.Context(), writeReq.ResourceName); err != nil {
return grpc.Errorf(codes.Internal, "writeHandler.Close(%q): %v", writeReq.ResourceName, err)
}
}
}
}
// QueryWriteStatus implements bytestream.proto "rpc QueryWriteStatus(QueryWriteStatusRequest) returns (QueryWriteStatusResponse)".
// QueryWriteStatus returns the CommittedSize known to the server.
func (rpc *grpcService) QueryWriteStatus(ctx context.Context, request *pb.QueryWriteStatusRequest) (*pb.QueryWriteStatusResponse, error) {
s, ok := rpc.parent.status[request.ResourceName]
if !ok {
return nil, grpc.Errorf(codes.NotFound, "resource_name not found: QueryWriteStatusRequest %v", request)
}
return s, nil
}
func (rpc *grpcService) readFrom(request *pb.ReadRequest, reader io.ReaderAt, stream pb.ByteStream_ReadServer) error {
limit := int(request.ReadLimit)
if limit < 0 {
return grpc.Errorf(codes.InvalidArgument, "Read(): read_limit=%d is invalid", limit)
}
offset := request.ReadOffset
if offset < 0 {
return grpc.Errorf(codes.InvalidArgument, "Read(): offset=%d is invalid", offset)
}
var buf []byte
if limit > 0 {
buf = make([]byte, limit)
} else {
buf = make([]byte, 1024*1024) // 1M buffer is reasonable.
}
bytesSent := 0
for limit == 0 || bytesSent < limit {
n, err := reader.ReadAt(buf, offset)
if n > 0 {
if err := stream.Send(&pb.ReadResponse{Data: buf[:n]}); err != nil {
return grpc.Errorf(grpc.Code(err), "Send(resourceName=%q offset=%d): %v", request.ResourceName, offset, grpc.ErrorDesc(err))
}
} else if err == nil {
return grpc.Errorf(codes.Internal, "nil error on empty read: io.ReaderAt contract violated")
}
offset += int64(n)
bytesSent += n
if err == io.EOF {
break
}
if err != nil {
return grpc.Errorf(codes.Unknown, "ReadAt(resourceName=%q offset=%d): %v", request.ResourceName, offset, err)
}
}
return nil
}
// Read handles a pb.ReadRequest sending bytes to the pb.ByteStream_ReadServer
// Implements bytestream.proto "rpc Read(ReadRequest) returns (stream ReadResponse)"
func (rpc *grpcService) Read(request *pb.ReadRequest, stream pb.ByteStream_ReadServer) error {
if rpc.parent.readHandler == nil {
return grpc.Errorf(codes.Unimplemented, "instance of NewServer(readHandler = nil) rejects all reads")
}
if request == nil {
return grpc.Errorf(codes.Internal, "Read(ReadRequest == nil)")
}
if request.ResourceName == "" {
return grpc.Errorf(codes.InvalidArgument, "ReadRequest: empty or missing resource_name")
}
reader, err := rpc.parent.readHandler.GetReader(stream.Context(), request.ResourceName)
if err != nil {
return err
}
if err = rpc.readFrom(request, reader, stream); err != nil {
rpc.parent.readHandler.Close(stream.Context(), request.ResourceName)
return err
}
if err = rpc.parent.readHandler.Close(stream.Context(), request.ResourceName); err != nil {
return err
}
return nil
}