-
Notifications
You must be signed in to change notification settings - Fork 88
/
byte_stream_server.go
134 lines (117 loc) · 3.92 KB
/
byte_stream_server.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
package grpcservers
import (
"context"
"io"
remoteexecution "github.com/bazelbuild/remote-apis/build/bazel/remote/execution/v2"
"github.com/buildbarn/bb-storage/pkg/blobstore"
"github.com/buildbarn/bb-storage/pkg/blobstore/buffer"
"github.com/buildbarn/bb-storage/pkg/digest"
"google.golang.org/genproto/googleapis/bytestream"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
)
// byteStreamServer is the ByteStream service implementation returned by
// NewByteStreamServer. Its Read and Write methods move blob data
// between gRPC streams and the wrapped BlobAccess.
type byteStreamServer struct {
// blobAccess is the storage backend: Read fetches blobs from it
// using Get, and Write stores blobs into it using Put.
blobAccess blobstore.BlobAccess
// readChunkSize is handed to ToChunkReader by Read when a blob is
// split up into ReadResponse messages.
readChunkSize int
}
// NewByteStreamServer returns a gRPC ByteStream service that serves
// blob reads from, and stores blob writes into, the provided
// BlobAccess. Bazel uses this service to access the Content
// Addressable Storage (CAS). The readChunkSize argument is passed to
// the chunk reader that Read uses to split blobs into response
// messages.
func NewByteStreamServer(blobAccess blobstore.BlobAccess, readChunkSize int) bytestream.ByteStreamServer {
	server := new(byteStreamServer)
	server.blobAccess = blobAccess
	server.readChunkSize = readChunkSize
	return server
}
// Read streams the blob named by in.ResourceName to the client,
// starting at in.ReadOffset, as a sequence of ReadResponse messages.
//
// Requests with a non-zero ReadLimit and requests for any compressor
// other than IDENTITY are rejected with codes.Unimplemented. Errors
// from parsing the resource name, reading the blob, or sending on the
// stream are returned to the caller unmodified.
func (s *byteStreamServer) Read(in *bytestream.ReadRequest, out bytestream.ByteStream_ReadServer) error {
	// Partial downloads are not supported; only an unset limit is accepted.
	if in.ReadLimit != 0 {
		return status.Error(codes.Unimplemented, "This service does not support downloading partial files")
	}

	blobDigest, compressor, err := digest.NewDigestFromByteStreamReadPath(in.ResourceName)
	if err != nil {
		return err
	}
	if compressor != remoteexecution.Compressor_IDENTITY {
		return status.Error(codes.Unimplemented, "This service does not support downloading compressed files")
	}

	blob := s.blobAccess.Get(out.Context(), blobDigest)
	chunks := blob.ToChunkReader(in.ReadOffset, s.readChunkSize)
	defer chunks.Close()

	// Forward chunks until the reader signals the end of the blob.
	for {
		chunk, readErr := chunks.Read()
		switch {
		case readErr == io.EOF:
			return nil
		case readErr != nil:
			return readErr
		}
		response := &bytestream.ReadResponse{Data: chunk}
		if sendErr := out.Send(response); sendErr != nil {
			return sendErr
		}
	}
}
// byteStreamWriteServerChunkReader exposes the payloads of the
// WriteRequest messages arriving on a ByteStream Write stream through
// Read and Close methods, so that Write can wrap it in a buffer using
// buffer.NewCASBufferFromChunkReader.
type byteStreamWriteServerChunkReader struct {
// stream is the client stream from which Read receives requests.
stream bytestream.ByteStream_WriteServer
// writeOffset is the total number of payload bytes accepted so
// far. setRequest requires the next request's WriteOffset to be
// equal to it.
writeOffset int64
// data is the payload of the most recently accepted request that
// Read has not handed out yet.
data []byte
// finishedWrite is the FinishWrite flag of the most recently
// accepted request.
finishedWrite bool
}
// setRequest validates an incoming WriteRequest and, if accepted,
// records its payload as the next chunk to hand out.
//
// A request is rejected with codes.InvalidArgument when a previously
// accepted request already had FinishWrite set, or when its
// WriteOffset differs from the number of bytes accepted so far. On
// rejection the reader's state is left untouched.
func (r *byteStreamWriteServerChunkReader) setRequest(request *bytestream.WriteRequest) error {
	switch {
	case r.finishedWrite:
		return status.Error(codes.InvalidArgument, "Client closed stream twice")
	case request.WriteOffset != r.writeOffset:
		return status.Errorf(codes.InvalidArgument, "Attempted to write at offset %d, while %d was expected", request.WriteOffset, r.writeOffset)
	}

	payload := request.Data
	r.data = payload
	r.writeOffset += int64(len(payload))
	r.finishedWrite = request.FinishWrite
	return nil
}
// Read returns the next chunk of payload data written by the client.
//
// When no payload is pending, a new request is received from the
// stream and validated through setRequest. If the stream ends before
// any request with FinishWrite set was accepted, the resulting io.EOF
// is replaced by a codes.InvalidArgument error; all other receive and
// validation errors are returned as is.
func (r *byteStreamWriteServerChunkReader) Read() ([]byte, error) {
	// Only pull a new request off the stream once the pending
	// payload has been consumed.
	if len(r.data) == 0 {
		request, recvErr := r.stream.Recv()
		if recvErr == io.EOF && !r.finishedWrite {
			return nil, status.Error(codes.InvalidArgument, "Client closed stream without finishing write")
		}
		if recvErr != nil {
			return nil, recvErr
		}
		if setErr := r.setRequest(request); setErr != nil {
			return nil, setErr
		}
	}

	// Hand the pending payload to the caller and mark it as consumed.
	var chunk []byte
	chunk, r.data = r.data, nil
	return chunk, nil
}
// Close is a no-op: the reader performs no cleanup of its own.
func (r *byteStreamWriteServerChunkReader) Close() {
}
// Write receives a blob from the client stream and stores it through
// the BlobAccess.
//
// The first request on the stream supplies the resource name, from
// which the digest and compressor are parsed; any compressor other
// than IDENTITY is rejected with codes.Unimplemented. That first
// request and all later ones are then fed through a
// byteStreamWriteServerChunkReader into BlobAccess.Put. On success the
// stream is closed with a WriteResponse whose CommittedSize is the
// size stated by the digest.
func (s *byteStreamServer) Write(stream bytestream.ByteStream_WriteServer) error {
	firstRequest, err := stream.Recv()
	if err != nil {
		return err
	}

	blobDigest, compressor, err := digest.NewDigestFromByteStreamWritePath(firstRequest.ResourceName)
	if err != nil {
		return err
	}
	if compressor != remoteexecution.Compressor_IDENTITY {
		return status.Error(codes.Unimplemented, "This service does not support uploading compressed files")
	}

	// The first request may already carry payload data, so it has to
	// pass through the chunk reader like every later request.
	chunks := &byteStreamWriteServerChunkReader{stream: stream}
	if err := chunks.setRequest(firstRequest); err != nil {
		return err
	}

	ctx := stream.Context()
	contents := buffer.NewCASBufferFromChunkReader(blobDigest, chunks, buffer.UserProvided)
	if err := s.blobAccess.Put(ctx, blobDigest, contents); err != nil {
		return err
	}

	response := &bytestream.WriteResponse{
		CommittedSize: blobDigest.GetSizeBytes(),
	}
	return stream.SendAndClose(response)
}
// QueryWriteStatus is not supported by this service. It ignores its
// arguments and always returns a nil response together with a
// codes.Unimplemented error.
func (s *byteStreamServer) QueryWriteStatus(ctx context.Context, in *bytestream.QueryWriteStatusRequest) (*bytestream.QueryWriteStatusResponse, error) {
	unsupported := status.Error(codes.Unimplemented, "This service does not support querying write status")
	return nil, unsupported
}