/
service.go
118 lines (103 loc) · 3.68 KB
/
service.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
// Copyright 2019 The Cockroach Authors.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.
/*
Package blobs contains a gRPC service to be used for remote file access.
It is used for bulk file reads and writes to files on any CockroachDB node.
Each node will run a blob service, which serves the file access for files on
that node. Each node will also have a blob client, which uses the nodedialer
to connect to another node's blob service, and access its files. The blob client
is the point of entry to this service and it supports the `BlobClient` interface,
which provides the following operations:
- ReadFile
- WriteFile
- List
- Delete
- Stat
*/
package blobs
import (
"context"
"io"
"github.com/cockroachdb/cockroach/pkg/blobs/blobspb"
"github.com/cockroachdb/cockroach/pkg/util/grpcutil"
"github.com/cockroachdb/cockroach/pkg/util/ioctx"
"github.com/cockroachdb/errors"
"github.com/cockroachdb/errors/oserror"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
)
// Service implements the gRPC BlobService, which exchanges bulk files between
// different nodes. Every RPC handler defined on Service in this file delegates
// the underlying file operation to localStorage.
type Service struct {
localStorage *LocalStorage
}
// Compile-time check that *Service satisfies blobspb.BlobServer.
var _ blobspb.BlobServer = (*Service)(nil)
// NewBlobService instantiates a blob service server backed by the
// LocalStorage that NewLocalStorage creates from externalIODir.
//
// The error from NewLocalStorage is returned unchanged. The returned *Service
// is non-nil even when that error is non-nil, so callers must check the error
// before using the service.
func NewBlobService(externalIODir string) (*Service, error) {
	storage, err := NewLocalStorage(externalIODir)
	svc := &Service{localStorage: storage}
	return svc, err
}
// GetStream implements the gRPC service. It asks local storage for
// req.Filename (passing req.Offset through to ReadFile) and hands the
// resulting content to streamContent to be sent over stream.
//
// An error from ReadFile is returned as-is; otherwise the result of
// streamContent is returned.
func (s *Service) GetStream(req *blobspb.GetRequest, stream blobspb.Blob_GetStreamServer) error {
	// The second result of ReadFile is not needed here.
	file, _, err := s.localStorage.ReadFile(req.Filename, req.Offset)
	if err != nil {
		return err
	}

	ctx := stream.Context()
	// The value returned by Close is deliberately not propagated.
	defer file.Close(ctx)

	return streamContent(ctx, stream, file)
}
// PutStream implements the gRPC service. The destination filename is read
// from the "filename" value attached to the incoming stream context, and the
// data received on stream is copied into a writer obtained from local storage
// for that filename.
//
// It returns an error if the filename is missing or empty, if the writer
// cannot be created, if the copy fails (combined with the error from closing
// the writer), or if the final Close of the writer fails.
func (s *Service) PutStream(stream blobspb.Blob_PutStreamServer) error {
	streamCtx := stream.Context()

	filename, found := grpcutil.FastFirstValueFromIncomingContext(streamCtx, "filename")
	switch {
	case !found:
		return errors.New("could not fetch metadata or no filename in metadata")
	case filename == "":
		return errors.New("invalid filename in metadata")
	}

	src := newPutStreamReader(stream)
	defer src.Close(streamCtx)

	// The writer gets its own cancellable context, derived from the stream's.
	// The deferred cancel releases it on every return path.
	writerCtx, cancel := context.WithCancel(streamCtx)
	defer cancel()

	dst, err := s.localStorage.Writer(writerCtx, filename)
	if err != nil {
		return err
	}

	_, copyErr := io.Copy(dst, ioctx.ReaderCtxAdapter(streamCtx, src))
	if copyErr != nil {
		// The writer's context is cancelled *before* Close is called on this
		// path, whereas on success Close runs first.
		// NOTE(review): presumably this lets the writer notice the
		// cancellation and discard the partial upload; confirm against
		// LocalStorage.Writer.
		cancel()
		return errors.CombineErrors(copyErr, dst.Close())
	}

	return dst.Close()
}
// List implements the gRPC service. It forwards req.Pattern to local storage
// and wraps the returned matches in a GlobResponse.
//
// The response is non-nil even when the returned error is non-nil.
func (s *Service) List(
	ctx context.Context, req *blobspb.GlobRequest,
) (*blobspb.GlobResponse, error) {
	files, err := s.localStorage.List(req.Pattern)
	resp := &blobspb.GlobResponse{Files: files}
	return resp, err
}
// Delete implements the gRPC service. It asks local storage to delete
// req.Filename and returns an empty DeleteResponse together with whatever
// error that deletion reported.
func (s *Service) Delete(
	ctx context.Context, req *blobspb.DeleteRequest,
) (*blobspb.DeleteResponse, error) {
	err := s.localStorage.Delete(req.Filename)
	return &blobspb.DeleteResponse{}, err
}
// Stat implements the gRPC service. It returns local storage's Stat result
// for req.Filename.
//
// When the storage error satisfies oserror.IsNotExist, a gRPC status error
// with code NotFound (carrying the original error text) is returned instead,
// along with a nil BlobStat. Any other outcome is passed through unchanged.
func (s *Service) Stat(ctx context.Context, req *blobspb.StatRequest) (*blobspb.BlobStat, error) {
	stat, err := s.localStorage.Stat(req.Filename)
	if !oserror.IsNotExist(err) {
		return stat, err
	}
	// gRPC hides the underlying Go ErrNotExist error, so translate it into the
	// equivalent gRPC error, which the client side can handle gracefully.
	return nil, status.Error(codes.NotFound, err.Error())
}