-
Notifications
You must be signed in to change notification settings - Fork 49
/
client.go
1146 lines (996 loc) · 42 KB
/
client.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
// Package client contains a high-level remote execution client library.
package client
import (
"context"
"crypto/tls"
"crypto/x509"
"fmt"
"io"
"net/http"
"os"
"os/user"
"strings"
"sync"
"time"
"errors"
"github.com/bazelbuild/remote-apis-sdks/go/pkg/actas"
"github.com/bazelbuild/remote-apis-sdks/go/pkg/balancer"
"github.com/bazelbuild/remote-apis-sdks/go/pkg/chunker"
"github.com/bazelbuild/remote-apis-sdks/go/pkg/digest"
"github.com/bazelbuild/remote-apis-sdks/go/pkg/retry"
"github.com/bazelbuild/remote-apis-sdks/go/pkg/uploadinfo"
"golang.org/x/oauth2"
"golang.org/x/sync/semaphore"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/credentials/oauth"
"google.golang.org/grpc/status"
// Redundant imports are required for the google3 mirror. Aliases should not be changed.
configpb "github.com/bazelbuild/remote-apis-sdks/go/pkg/balancer/proto"
regrpc "github.com/bazelbuild/remote-apis/build/bazel/remote/execution/v2"
repb "github.com/bazelbuild/remote-apis/build/bazel/remote/execution/v2"
log "github.com/golang/glog"
bsgrpc "google.golang.org/genproto/googleapis/bytestream"
bspb "google.golang.org/genproto/googleapis/bytestream"
opgrpc "google.golang.org/genproto/googleapis/longrunning"
oppb "google.golang.org/genproto/googleapis/longrunning"
emptypb "google.golang.org/protobuf/types/known/emptypb"
)
const (
	// scopes is the OAuth2 scope requested for every credential type created below.
	scopes = "https://www.googleapis.com/auth/cloud-platform"
	// HomeDirMacro is replaced by the current user's home dir in the CredFile dial parameter.
	HomeDirMacro = "${HOME}"
)
// ErrEmptySegment indicates an attempt to construct a resource name with an empty segment.
var ErrEmptySegment = errors.New("empty segment in resoure name")
// AuthType indicates the type of authentication being used.
type AuthType int

const (
	// UnknownAuth refers to unknown authentication type. It is the zero value.
	UnknownAuth AuthType = iota
	// NoAuth refers to no authentication when connecting to the RBE service.
	NoAuth
	// ExternalTokenAuth is used to connect to the RBE service.
	ExternalTokenAuth
	// CredsFileAuth refers to a JSON credentials file used to connect to the RBE service.
	CredsFileAuth
	// ApplicationDefaultCredsAuth refers to Google Application default credentials that is
	// used to connect to the RBE service.
	ApplicationDefaultCredsAuth
	// GCECredsAuth refers to GCE machine credentials that is
	// used to connect to the RBE service.
	GCECredsAuth
)
// String returns a human readable form of authentication used to connect to RBE.
// Unrecognized values (including UnknownAuth) map to "unknown authentication type".
func (a AuthType) String() string {
	names := map[AuthType]string{
		NoAuth:                      "no authentication",
		ExternalTokenAuth:           "external authentication token (gcert?)",
		CredsFileAuth:               "credentials file",
		ApplicationDefaultCredsAuth: "application default credentials",
		GCECredsAuth:                "gce credentials",
	}
	if name, ok := names[a]; ok {
		return name
	}
	return "unknown authentication type"
}
// InitError is used to wrap the error returned when initializing a new
// client to also indicate the type of authentication used.
type InitError struct {
	// Err refers to the underlying client initialization error.
	Err error
	// AuthUsed stores the type of authentication used to connect to RBE.
	AuthUsed AuthType
}
// Error returns a string error that includes information about the
// type of auth used to connect to RBE.
func (ce *InitError) Error() string {
	underlying := ce.Err.Error()
	return fmt.Sprintf("%s, authentication type (identity) used=%q", underlying, ce.AuthUsed)
}
// GrpcClientConn allows accepting pre-created connections to be used when creating clients.
// It is only intended to be used by methods in this package.
// It is not intended for SDK users to depend on.
// It might be removed in future update.
type GrpcClientConn interface {
	grpc.ClientConnInterface
	io.Closer
}
// Client is a client to several services, including remote execution and services used in
// conjunction with remote execution. A Client must be constructed by calling Dial() or NewClient()
// rather than attempting to assemble it directly.
//
// Unless specified otherwise, and provided the fields are not modified, a Client is safe for
// concurrent use.
type Client struct {
	// InstanceName is the instance name for the targeted remote execution instance; e.g. for Google
	// RBE: "projects/<foo>/instances/default_instance".
	// It should NOT be used to construct resource names, but rather only for reusing the instance name as is.
	// Use the ResourceName method to create correctly formatted resource names.
	InstanceName string
	// Per-service gRPC stubs; the action cache, byte stream, and CAS stubs are
	// created on the CAS connection, execution and operations on the main one
	// (see NewClientFromConnection).
	actionCache regrpc.ActionCacheClient
	byteStream  bsgrpc.ByteStreamClient
	cas         regrpc.ContentAddressableStorageClient
	execution   regrpc.ExecutionClient
	operations  opgrpc.OperationsClient
	// Retrier is the Retrier that is used for RPCs made by this client.
	//
	// These fields are logically "protected" and are intended for use by extensions of Client.
	Retrier *Retrier
	// connection is the main gRPC connection; casConnection may be the same
	// object when no separate CAS service was dialed.
	connection    GrpcClientConn
	casConnection GrpcClientConn
	// StartupCapabilities denotes whether to load ServerCapabilities on startup.
	StartupCapabilities StartupCapabilities
	// LegacyExecRootRelativeOutputs denotes whether outputs are relative to the exec root.
	LegacyExecRootRelativeOutputs LegacyExecRootRelativeOutputs
	// ChunkMaxSize is maximum chunk size to use for CAS uploads/downloads.
	ChunkMaxSize ChunkMaxSize
	// CompressedBytestreamThreshold is the threshold in bytes for which blobs are read and written
	// compressed. Use 0 for all writes being compressed, and a negative number for all operations being
	// uncompressed.
	CompressedBytestreamThreshold CompressedBytestreamThreshold
	// UploadCompressionPredicate is a function called to decide whether a blob should be compressed for upload.
	UploadCompressionPredicate UploadCompressionPredicate
	// MaxBatchDigests is maximum amount of digests to batch in upload and download operations.
	MaxBatchDigests MaxBatchDigests
	// MaxQueryBatchDigests is maximum amount of digests to batch in CAS query operations.
	MaxQueryBatchDigests MaxQueryBatchDigests
	// MaxBatchSize is maximum size in bytes of a batch request for batch operations.
	MaxBatchSize MaxBatchSize
	// DirMode is mode used to create directories.
	DirMode os.FileMode
	// ExecutableMode is mode used to create executable files.
	ExecutableMode os.FileMode
	// RegularMode is mode used to create non-executable files.
	RegularMode os.FileMode
	// UtilizeLocality is to specify whether client downloads files utilizing disk access locality.
	UtilizeLocality UtilizeLocality
	// UnifiedUploads specifies whether the client uploads files in the background.
	UnifiedUploads UnifiedUploads
	// UnifiedUploadBufferSize specifies when the unified upload daemon flushes the pending requests.
	UnifiedUploadBufferSize UnifiedUploadBufferSize
	// UnifiedUploadTickDuration specifies how often the unified upload daemon flushes the pending requests.
	UnifiedUploadTickDuration UnifiedUploadTickDuration
	// UnifiedDownloads specifies whether the client downloads files in the background.
	UnifiedDownloads UnifiedDownloads
	// UnifiedDownloadBufferSize specifies when the unified download daemon flushes the pending requests.
	UnifiedDownloadBufferSize UnifiedDownloadBufferSize
	// UnifiedDownloadTickDuration specifies how often the unified download daemon flushes the pending requests.
	UnifiedDownloadTickDuration UnifiedDownloadTickDuration
	// TreeSymlinkOpts controls how symlinks are handled when constructing a tree.
	TreeSymlinkOpts *TreeSymlinkOpts
	// serverCaps caches the remote ServerCapabilities — presumably populated by
	// CheckCapabilities (called from NewClientFromConnection); verify in that code.
	serverCaps *repb.ServerCapabilities
	// useBatchOps mirrors the UseBatchOps option.
	useBatchOps UseBatchOps
	// casConcurrency bounds concurrent CAS requests; the two semaphores below
	// enforce it separately for uploads and downloads (see CASConcurrency.Apply).
	casConcurrency int64
	casUploaders   *semaphore.Weighted
	// Bookkeeping for the unified upload/download daemons: request channels and
	// in-flight upload state keyed by digest.
	casUploadRequests   chan *uploadRequest
	casUploads          map[digest.Digest]*uploadState
	casDownloaders      *semaphore.Weighted
	casDownloadRequests chan *downloadRequest
	// rpcTimeouts holds per-RPC deadlines; defaults to DefaultRPCTimeouts.
	rpcTimeouts RPCTimeouts
	// creds are per-RPC credentials installed via the PerRPCCreds option.
	creds credentials.PerRPCCredentials
	// uploadOnce/downloadOnce guard one-time startup of the unified upload and
	// download daemons — NOTE(review): assumed from the sync.Once usage; confirm
	// in RunBackgroundTasks.
	uploadOnce   sync.Once
	downloadOnce sync.Once
	// useBatchCompression mirrors the UseBatchCompression option.
	useBatchCompression UseBatchCompression
}
const (
	// DefaultMaxBatchSize is the maximum size of a batch to upload with BatchWriteBlobs. We set it to slightly
	// below 4 MB, because that is the limit of a message size in gRPC.
	DefaultMaxBatchSize = 4*1024*1024 - 1024
	// DefaultMaxBatchDigests is a suggested approximate limit based on current RBE implementation.
	// Above that BatchUpdateBlobs calls start to exceed a typical minute timeout.
	DefaultMaxBatchDigests = 4000
	// DefaultMaxQueryBatchDigests is a suggested limit for the number of items for in batch for a missing blobs query.
	DefaultMaxQueryBatchDigests = 10_000
	// DefaultDirMode is mode used to create directories.
	DefaultDirMode = 0777
	// DefaultExecutableMode is mode used to create executable files.
	DefaultExecutableMode = 0777
	// DefaultRegularMode is mode used to create non-executable files.
	DefaultRegularMode = 0644
)
// Connection is meant to be used with generated methods that accept
// grpc.ClientConnInterface. It returns the main (execution) connection.
func (c *Client) Connection() grpc.ClientConnInterface {
	return c.connection
}

// CASConnection is meant to be used with generated methods that accept
// grpc.ClientConnInterface. It returns the CAS connection, which may be the
// same object as Connection() when no separate CAS service was dialed.
func (c *Client) CASConnection() grpc.ClientConnInterface {
	return c.casConnection
}
// Close closes the underlying gRPC connection(s).
func (c *Client) Close() error {
	// Disable the unified daemons first so their channels are closed and
	// background operations stop before the connections go away.
	UnifiedUploads(false).Apply(c)
	UnifiedDownloads(false).Apply(c)
	if err := c.connection.Close(); err != nil {
		return err
	}
	if c.casConnection == c.connection {
		// A single shared connection; nothing further to close.
		return nil
	}
	return c.casConnection.Close()
}
// Opt is an option that can be passed to Dial in order to configure the behaviour of the client.
type Opt interface {
	Apply(*Client)
}

// ChunkMaxSize is maximum chunk size to use in Bytestream wrappers.
type ChunkMaxSize int

// Apply sets the client's maximal chunk size s.
func (s ChunkMaxSize) Apply(c *Client) {
	c.ChunkMaxSize = s
}
// CompressedBytestreamThreshold is the threshold for compressing blobs when writing/reading.
// See comment in related field on the Client struct.
type CompressedBytestreamThreshold int64

// Apply sets the client's compressed bytestream threshold to s.
// (Previous comment was a copy-paste of ChunkMaxSize's.)
func (s CompressedBytestreamThreshold) Apply(c *Client) {
	c.CompressedBytestreamThreshold = s
}

// An UploadCompressionPredicate determines whether to compress a blob on upload.
// Note that the CompressedBytestreamThreshold takes priority over this (i.e. if the blob to be uploaded
// is smaller than the threshold, this will not be called).
type UploadCompressionPredicate func(*uploadinfo.Entry) bool

// Apply sets the client's compression predicate.
func (cc UploadCompressionPredicate) Apply(c *Client) {
	c.UploadCompressionPredicate = cc
}
// UtilizeLocality is to specify whether client downloads files utilizing disk access locality.
type UtilizeLocality bool

// Apply sets the client's UtilizeLocality.
func (s UtilizeLocality) Apply(c *Client) {
	c.UtilizeLocality = s
}

// UnifiedUploads is to specify whether client uploads files in the background, unifying operations between different actions.
type UnifiedUploads bool

// Apply sets the client's UnifiedUploads.
func (s UnifiedUploads) Apply(c *Client) {
	c.UnifiedUploads = s
}
// UnifiedUploadBufferSize is to tune when the daemon for UnifiedUploads flushes the pending requests.
type UnifiedUploadBufferSize int

// DefaultUnifiedUploadBufferSize is the default UnifiedUploadBufferSize.
const DefaultUnifiedUploadBufferSize = 10000

// Apply sets the client's UnifiedUploadBufferSize.
// (Previous comment incorrectly said UnifiedDownloadBufferSize.)
func (s UnifiedUploadBufferSize) Apply(c *Client) {
	c.UnifiedUploadBufferSize = s
}

// UnifiedUploadTickDuration is to tune how often the daemon for UnifiedUploads flushes the pending requests.
type UnifiedUploadTickDuration time.Duration

// DefaultUnifiedUploadTickDuration is the default UnifiedUploadTickDuration.
const DefaultUnifiedUploadTickDuration = UnifiedUploadTickDuration(50 * time.Millisecond)

// Apply sets the client's UnifiedUploadTickDuration.
func (s UnifiedUploadTickDuration) Apply(c *Client) {
	c.UnifiedUploadTickDuration = s
}
// UnifiedDownloads is to specify whether client downloads files in the background, unifying operations between different actions.
// (Previous comment incorrectly said "uploads".)
type UnifiedDownloads bool

// Apply sets the client's UnifiedDownloads.
// Note: it is unsafe to change this property when connections are ongoing.
func (s UnifiedDownloads) Apply(c *Client) {
	c.UnifiedDownloads = s
}

// UnifiedDownloadBufferSize is to tune when the daemon for UnifiedDownloads flushes the pending requests.
type UnifiedDownloadBufferSize int

// DefaultUnifiedDownloadBufferSize is the default UnifiedDownloadBufferSize.
const DefaultUnifiedDownloadBufferSize = 10000

// Apply sets the client's UnifiedDownloadBufferSize.
func (s UnifiedDownloadBufferSize) Apply(c *Client) {
	c.UnifiedDownloadBufferSize = s
}

// UnifiedDownloadTickDuration is to tune how often the daemon for UnifiedDownloads flushes the pending requests.
type UnifiedDownloadTickDuration time.Duration

// DefaultUnifiedDownloadTickDuration is the default UnifiedDownloadTickDuration.
const DefaultUnifiedDownloadTickDuration = UnifiedDownloadTickDuration(50 * time.Millisecond)

// Apply sets the client's UnifiedDownloadTickDuration.
func (s UnifiedDownloadTickDuration) Apply(c *Client) {
	c.UnifiedDownloadTickDuration = s
}
// Apply sets the client's TreeSymlinkOpts.
func (o *TreeSymlinkOpts) Apply(c *Client) {
	c.TreeSymlinkOpts = o
}
// MaxBatchDigests is maximum amount of digests to batch in upload and download operations.
type MaxBatchDigests int

// Apply sets the client's maximal batch digests to s.
func (s MaxBatchDigests) Apply(c *Client) {
	c.MaxBatchDigests = s
}

// MaxQueryBatchDigests is maximum amount of digests to batch in query operations.
type MaxQueryBatchDigests int

// Apply sets the client's maximal query batch digests to s.
func (s MaxQueryBatchDigests) Apply(c *Client) {
	c.MaxQueryBatchDigests = s
}

// MaxBatchSize is maximum size in bytes of a batch request for batch operations.
type MaxBatchSize int64

// Apply sets the client's maximum batch size to s.
func (s MaxBatchSize) Apply(c *Client) {
	c.MaxBatchSize = s
}
// DirMode is mode used to create directories.
type DirMode os.FileMode

// Apply sets the client's DirMode to m.
func (m DirMode) Apply(c *Client) {
	c.DirMode = os.FileMode(m)
}

// ExecutableMode is mode used to create executable files.
type ExecutableMode os.FileMode

// Apply sets the client's ExecutableMode to m.
func (m ExecutableMode) Apply(c *Client) {
	c.ExecutableMode = os.FileMode(m)
}

// RegularMode is mode used to create non-executable files.
type RegularMode os.FileMode

// Apply sets the client's RegularMode to m.
func (m RegularMode) Apply(c *Client) {
	c.RegularMode = os.FileMode(m)
}
// UseBatchOps can be set to true to use batch CAS operations when uploading multiple blobs, or
// false to always use individual ByteStream requests.
type UseBatchOps bool

// Apply sets the UseBatchOps flag on a client.
func (u UseBatchOps) Apply(c *Client) {
	c.useBatchOps = u
}

// UseBatchCompression is currently set to true when the server has
// SupportedBatchUpdateCompressors capability and supports ZSTD compression.
type UseBatchCompression bool

// Apply sets the batchCompression flag on a client.
func (u UseBatchCompression) Apply(c *Client) {
	c.useBatchCompression = u
}
// CASConcurrency is the number of simultaneous requests that will be issued for CAS upload and
// download operations.
type CASConcurrency int

// DefaultCASConcurrency is the default maximum number of concurrent upload and download operations.
const DefaultCASConcurrency = 500

// DefaultMaxConcurrentRequests specifies the default maximum number of concurrent requests on a single connection
// that the GRPC balancer can perform.
const DefaultMaxConcurrentRequests = 25

// DefaultMaxConcurrentStreams specifies the default threshold value at which the GRPC balancer should create
// new sub-connections.
const DefaultMaxConcurrentStreams = 25

// Apply sets the CASConcurrency flag on a client.
// It replaces both the upload and download semaphores with fresh ones sized to cy.
func (cy CASConcurrency) Apply(c *Client) {
	c.casConcurrency = int64(cy)
	c.casUploaders = semaphore.NewWeighted(c.casConcurrency)
	c.casDownloaders = semaphore.NewWeighted(c.casConcurrency)
}
// StartupCapabilities controls whether the client should attempt to fetch the remote
// server capabilities on New. If set to true, some configuration such as MaxBatchSize
// is set according to the remote server capabilities instead of using the provided values.
type StartupCapabilities bool

// Apply sets the StartupCapabilities flag on a client.
func (s StartupCapabilities) Apply(c *Client) {
	c.StartupCapabilities = s
}

// LegacyExecRootRelativeOutputs controls whether the client uses legacy behavior of
// treating output paths as relative to the exec root instead of the working directory.
type LegacyExecRootRelativeOutputs bool

// Apply sets the LegacyExecRootRelativeOutputs flag on a client.
func (l LegacyExecRootRelativeOutputs) Apply(c *Client) {
	c.LegacyExecRootRelativeOutputs = l
}
// PerRPCCreds sets per-call options that will be set on all RPCs to the underlying connection.
type PerRPCCreds struct {
	Creds credentials.PerRPCCredentials
}

// Apply saves the per-RPC creds in the Client.
func (p *PerRPCCreds) Apply(c *Client) {
	c.creds = p.Creds
}
// getImpersonatedRPCCreds returns per-RPC credentials that impersonate actAs,
// exchanging the base credential cred for tokens via the actas token source.
func getImpersonatedRPCCreds(ctx context.Context, actAs string, cred credentials.PerRPCCredentials) credentials.PerRPCCredentials {
	// Wrap in a ReuseTokenSource to cache valid tokens in memory (i.e., non-nil, with a non-expired
	// access token).
	ts := oauth2.ReuseTokenSource(
		nil, actas.NewTokenSource(ctx, cred, http.DefaultClient, actAs, []string{scopes}))
	return oauth.TokenSource{
		TokenSource: ts,
	}
}
// getRPCCreds returns per-RPC credentials for the cloud-platform scope,
// together with the AuthType that was selected. Application-default
// credentials take precedence over compute-engine credentials; if neither is
// requested, credFile is loaded as a service-account JSON key.
func getRPCCreds(ctx context.Context, credFile string, useApplicationDefault bool, useComputeEngine bool) (credentials.PerRPCCredentials, AuthType, error) {
	switch {
	case useApplicationDefault:
		creds, err := oauth.NewApplicationDefault(ctx, scopes)
		return creds, ApplicationDefaultCredsAuth, err
	case useComputeEngine:
		return oauth.NewComputeEngine(), GCECredsAuth, nil
	}
	creds, err := oauth.NewServiceAccountFromFile(credFile, scopes)
	if err != nil {
		return nil, CredsFileAuth, fmt.Errorf("couldn't create RPC creds from %s: %v", credFile, err)
	}
	return creds, CredsFileAuth, nil
}
// DialParams contains all the parameters that Dial needs.
type DialParams struct {
	// Service contains the address of remote execution service.
	Service string
	// CASService contains the address of the CAS service, if it is separate from
	// the remote execution service.
	CASService string
	// UseApplicationDefault indicates that the default credentials should be used.
	UseApplicationDefault bool
	// UseComputeEngine indicates that the default CE credentials should be used.
	UseComputeEngine bool
	// UseExternalAuthToken indicates whether an externally specified auth token should be used.
	// If set to true, ExternalPerRPCCreds should also be non-nil.
	UseExternalAuthToken bool
	// ExternalPerRPCCreds refers to the per RPC credentials that should be used for each RPC.
	ExternalPerRPCCreds *PerRPCCreds
	// CredFile is the JSON file that contains the credentials for RPCs.
	// Occurrences of HomeDirMacro are expanded to the current user's home directory.
	CredFile string
	// ActAsAccount is the service account to act as when making RPC calls.
	ActAsAccount string
	// NoSecurity is true if there is no security: no credentials are configured
	// (NoAuth is implied) and grpc.WithInsecure() is passed in. Should only be
	// used in test code.
	NoSecurity bool
	// NoAuth is true if TLS is enabled (NoSecurity is false) but the client does
	// not need to authenticate with the server.
	NoAuth bool
	// TransportCredsOnly is true if it's the caller's responsibility to set per-RPC credentials
	// on individual calls. This overrides ActAsAccount, UseApplicationDefault, and UseComputeEngine.
	// This is not the same as NoSecurity, as transport credentials will still be set.
	TransportCredsOnly bool
	// TLSCACertFile is the PEM file that contains TLS root certificates.
	TLSCACertFile string
	// TLSServerName overrides the server name sent in TLS, if set to a non-empty string.
	TLSServerName string
	// DialOpts defines the set of gRPC DialOptions to apply, in addition to any used internally.
	DialOpts []grpc.DialOption
	// MaxConcurrentRequests specifies the maximum number of concurrent RPCs on a single connection.
	// Zero means DefaultMaxConcurrentRequests.
	MaxConcurrentRequests uint32
	// MaxConcurrentStreams specifies the maximum number of concurrent stream RPCs on a single connection.
	// Zero means DefaultMaxConcurrentStreams.
	MaxConcurrentStreams uint32
	// TLSClientAuthCert specifies the public key in PEM format for using mTLS auth to connect to the RBE service.
	//
	// If this is specified, TLSClientAuthKey must also be specified.
	TLSClientAuthCert string
	// TLSClientAuthKey specifies the private key for using mTLS auth to connect to the RBE service.
	//
	// If this is specified, TLSClientAuthCert must also be specified.
	TLSClientAuthKey string
	// RoundRobinBalancer enables the simplified gRPC balancer instead of the default one.
	RoundRobinBalancer bool
	// RoundRobinPoolSize specifies the pool size for the round-robin load balancer.
	// NOTE(review): passed straight to balancer.NewRRConnPool by NewClient; presumably
	// must be positive when RoundRobinBalancer is set — confirm against that package.
	RoundRobinPoolSize int
}
// createGRPCInterceptor builds the GCP balancer interceptor configured with the
// channel-pool limits from p and a catch-all method affinity binding.
func createGRPCInterceptor(p DialParams) *balancer.GCPInterceptor {
	apiConfig := &configpb.ApiConfig{
		ChannelPool: &configpb.ChannelPoolConfig{
			MaxSize:                          p.MaxConcurrentRequests,
			MaxConcurrentStreamsLowWatermark: p.MaxConcurrentStreams,
		},
		Method: []*configpb.MethodConfig{
			{
				// ".*" applies the affinity config to every RPC method.
				Name: []string{".*"},
				Affinity: &configpb.AffinityConfig{
					Command:     configpb.AffinityConfig_BIND,
					AffinityKey: "bind-affinity",
				},
			},
		},
	}
	return balancer.NewGCPInterceptor(apiConfig)
}
// createTLSConfig constructs the tls.Config used for dialing, honoring the
// optional CA bundle (TLSCACertFile), server-name override (TLSServerName),
// and mTLS client key pair (TLSClientAuthCert/TLSClientAuthKey) in params.
func createTLSConfig(params DialParams) (*tls.Config, error) {
	var certPool *x509.CertPool
	if params.TLSCACertFile != "" {
		ca, err := os.ReadFile(params.TLSCACertFile)
		if err != nil {
			return nil, fmt.Errorf("failed to read %s: %w", params.TLSCACertFile, err)
		}
		certPool = x509.NewCertPool()
		if !certPool.AppendCertsFromPEM(ca) {
			return nil, fmt.Errorf("failed to load TLS CA certificates from %s", params.TLSCACertFile)
		}
	}

	var clientCerts []tls.Certificate
	haveCert := params.TLSClientAuthCert != ""
	haveKey := params.TLSClientAuthKey != ""
	if haveCert || haveKey {
		// mTLS requires both halves of the key pair.
		if !haveCert || !haveKey {
			return nil, fmt.Errorf("TLSClientAuthCert and TLSClientAuthKey must both be empty or both be set, got TLSClientAuthCert='%v' and TLSClientAuthKey='%v'", params.TLSClientAuthCert, params.TLSClientAuthKey)
		}
		cert, err := tls.LoadX509KeyPair(params.TLSClientAuthCert, params.TLSClientAuthKey)
		if err != nil {
			return nil, fmt.Errorf("failed to read mTLS cert pair ('%v', '%v'): %v", params.TLSClientAuthCert, params.TLSClientAuthKey, err)
		}
		clientCerts = []tls.Certificate{cert}
	}

	return &tls.Config{
		ServerName:   params.TLSServerName,
		RootCAs:      certPool,
		Certificates: clientCerts,
	}, nil
}
// OptsFromParams prepares a set of grpc dial options based on the provided dial params.
// It returns the options, the AuthType that was selected, and an error if TLS or
// credential setup failed. The auth branches are mutually exclusive and checked in
// order: NoSecurity, NoAuth, UseExternalAuthToken, then file/ADC/GCE credentials.
func OptsFromParams(ctx context.Context, params DialParams) ([]grpc.DialOption, AuthType, error) {
	var authUsed AuthType
	var opts []grpc.DialOption
	opts = append(opts, params.DialOpts...)
	// Fill in package defaults for unset concurrency knobs.
	if params.MaxConcurrentRequests == 0 {
		params.MaxConcurrentRequests = DefaultMaxConcurrentRequests
	}
	if params.MaxConcurrentStreams == 0 {
		params.MaxConcurrentStreams = DefaultMaxConcurrentStreams
	}
	if params.NoSecurity {
		// Plaintext connection: neither transport nor per-RPC credentials.
		authUsed = NoAuth
		opts = append(opts, grpc.WithInsecure())
	} else if params.NoAuth {
		// TLS transport security, but no per-RPC credentials.
		authUsed = NoAuth
		// Set the ServerName and RootCAs fields, if needed.
		tlsConfig, err := createTLSConfig(params)
		if err != nil {
			return nil, authUsed, fmt.Errorf("could not create TLS config: %v", err)
		}
		opts = append(opts, grpc.WithTransportCredentials(credentials.NewTLS(tlsConfig)))
	} else if params.UseExternalAuthToken {
		authUsed = ExternalTokenAuth
		if params.ExternalPerRPCCreds == nil {
			return nil, authUsed, fmt.Errorf("ExternalPerRPCCreds unspecified when using external auth token mechanism")
		}
		opts = append(opts, grpc.WithPerRPCCredentials(params.ExternalPerRPCCreds.Creds))
		// Set the ServerName and RootCAs fields, if needed.
		tlsConfig, err := createTLSConfig(params)
		if err != nil {
			return nil, authUsed, fmt.Errorf("could not create TLS config: %v", err)
		}
		opts = append(opts, grpc.WithTransportCredentials(credentials.NewTLS(tlsConfig)))
	} else {
		// Credential-file / application-default / GCE path.
		credFile := params.CredFile
		if strings.Contains(credFile, HomeDirMacro) {
			authUsed = CredsFileAuth
			usr, err := user.Current()
			if err != nil {
				return nil, authUsed, fmt.Errorf("could not fetch home directory because of error determining current user: %v", err)
			}
			credFile = strings.Replace(credFile, HomeDirMacro, usr.HomeDir, -1 /* no limit */)
		}
		if !params.TransportCredsOnly {
			var (
				rpcCreds credentials.PerRPCCredentials
				err      error
			)
			rpcCreds, authUsed, err = getRPCCreds(ctx, credFile, params.UseApplicationDefault, params.UseComputeEngine)
			if err != nil {
				return nil, authUsed, fmt.Errorf("couldn't create RPC creds for %s: %v", scopes, err)
			}
			if params.ActAsAccount != "" {
				rpcCreds = getImpersonatedRPCCreds(ctx, params.ActAsAccount, rpcCreds)
			}
			opts = append(opts, grpc.WithPerRPCCredentials(rpcCreds))
		}
		tlsConfig, err := createTLSConfig(params)
		if err != nil {
			return nil, authUsed, fmt.Errorf("could not create TLS config: %v", err)
		}
		opts = append(opts, grpc.WithTransportCredentials(credentials.NewTLS(tlsConfig)))
	}
	// Install the GCP balancer: disable server-pushed service config and route
	// all RPCs through the interceptor's unary/stream wrappers.
	grpcInt := createGRPCInterceptor(params)
	opts = append(opts, grpc.WithDisableServiceConfig())
	opts = append(opts, grpc.WithDefaultServiceConfig(fmt.Sprintf(`{"loadBalancingConfig": [{"%s":{}}]}`, balancer.Name)))
	opts = append(opts, grpc.WithUnaryInterceptor(grpcInt.GCPUnaryClientInterceptor))
	opts = append(opts, grpc.WithStreamInterceptor(grpcInt.GCPStreamClientInterceptor))
	return opts, authUsed, nil
}
// NewClient connects to a remote execution service and returns a client suitable for higher-level
// functionality. It dials params.Service (and params.CASService when it is set and
// different), then delegates to NewClientFromConnection. Errors are wrapped in
// *InitError where the AuthType has been determined.
func NewClient(ctx context.Context, instanceName string, params DialParams, opts ...Opt) (*Client, error) {
	if instanceName == "" {
		log.Warning("Instance name was not specified.")
	}
	if params.Service == "" {
		return nil, &InitError{Err: fmt.Errorf("service needs to be specified")}
	}
	log.Infof("Connecting to remote execution instance %s", instanceName)
	log.Infof("Connecting to remote execution service %s", params.Service)
	dialOpts, authUsed, err := OptsFromParams(ctx, params)
	if err != nil {
		return nil, fmt.Errorf("failed to prepare gRPC dial options: %v", err)
	}
	var conn, casConn GrpcClientConn
	if params.RoundRobinBalancer {
		dial := func(ctx context.Context) (*grpc.ClientConn, error) {
			return grpc.DialContext(ctx, params.Service, dialOpts...)
		}
		conn, err = balancer.NewRRConnPool(ctx, params.RoundRobinPoolSize, dial)
	} else {
		conn, err = grpc.Dial(params.Service, dialOpts...)
	}
	// This check covers both dial paths above.
	if err != nil {
		return nil, fmt.Errorf("couldn't dial gRPC %q: %v", params.Service, err)
	}
	// Default to sharing the main connection for CAS unless a distinct CAS
	// service address was provided.
	casConn = conn
	if params.CASService != "" && params.CASService != params.Service {
		log.Infof("Connecting to CAS service %s", params.CASService)
		if params.RoundRobinBalancer {
			dial := func(ctx context.Context) (*grpc.ClientConn, error) {
				return grpc.DialContext(ctx, params.CASService, dialOpts...)
			}
			casConn, err = balancer.NewRRConnPool(ctx, params.RoundRobinPoolSize, dial)
		} else {
			casConn, err = grpc.Dial(params.CASService, dialOpts...)
		}
	}
	// Deliberately outside the if: checks the error from the optional CAS dial.
	if err != nil {
		return nil, &InitError{Err: statusWrap(err), AuthUsed: authUsed}
	}
	client, err := NewClientFromConnection(ctx, instanceName, conn, casConn, opts...)
	if err != nil {
		return nil, &InitError{Err: err, AuthUsed: authUsed}
	}
	return client, nil
}
// NewClientFromConnection creates a client from gRPC connections to a remote execution service and a cas service.
// conn serves the Execution and Operations stubs; casConn serves the ActionCache,
// ByteStream, and CAS stubs (the two may be the same connection). Defaults are
// applied first, then the provided opts in order, so opts override defaults.
func NewClientFromConnection(ctx context.Context, instanceName string, conn, casConn GrpcClientConn, opts ...Opt) (*Client, error) {
	if conn == nil {
		return nil, fmt.Errorf("connection to remote execution service may not be nil")
	}
	if casConn == nil {
		return nil, fmt.Errorf("connection to CAS service may not be nil")
	}
	client := &Client{
		InstanceName:                  instanceName,
		actionCache:                   regrpc.NewActionCacheClient(casConn),
		byteStream:                    bsgrpc.NewByteStreamClient(casConn),
		cas:                           regrpc.NewContentAddressableStorageClient(casConn),
		execution:                     regrpc.NewExecutionClient(conn),
		operations:                    opgrpc.NewOperationsClient(conn),
		rpcTimeouts:                   DefaultRPCTimeouts,
		connection:                    conn,
		casConnection:                 casConn,
		CompressedBytestreamThreshold: DefaultCompressedBytestreamThreshold,
		ChunkMaxSize:                  chunker.DefaultChunkSize,
		MaxBatchDigests:               DefaultMaxBatchDigests,
		MaxQueryBatchDigests:          DefaultMaxQueryBatchDigests,
		MaxBatchSize:                  DefaultMaxBatchSize,
		DirMode:                       DefaultDirMode,
		ExecutableMode:                DefaultExecutableMode,
		RegularMode:                   DefaultRegularMode,
		useBatchOps:                   true,
		StartupCapabilities:           true,
		LegacyExecRootRelativeOutputs: false,
		casConcurrency:                DefaultCASConcurrency,
		casUploaders:                  semaphore.NewWeighted(DefaultCASConcurrency),
		casDownloaders:                semaphore.NewWeighted(DefaultCASConcurrency),
		casUploads:                    make(map[digest.Digest]*uploadState),
		UnifiedUploadTickDuration:     DefaultUnifiedUploadTickDuration,
		UnifiedUploadBufferSize:       DefaultUnifiedUploadBufferSize,
		UnifiedDownloadTickDuration:   DefaultUnifiedDownloadTickDuration,
		UnifiedDownloadBufferSize:     DefaultUnifiedDownloadBufferSize,
		Retrier:                       RetryTransient(),
	}
	// Apply caller options after defaults so they take precedence.
	for _, o := range opts {
		o.Apply(client)
	}
	if client.StartupCapabilities {
		if err := client.CheckCapabilities(ctx); err != nil {
			return nil, statusWrap(err)
		}
	}
	// Validated after opts are applied, since CASConcurrency is itself an Opt.
	if client.casConcurrency < 1 {
		return nil, fmt.Errorf("CASConcurrency should be at least 1")
	}
	client.RunBackgroundTasks(ctx)
	return client, nil
}
// RunBackgroundTasks starts background goroutines for the client.
//
// Each processor is started at most once per client (guarded by sync.Once),
// so it is safe to call this method repeatedly. The request channels are
// created before the goroutines launch so senders never see a nil channel.
func (c *Client) RunBackgroundTasks(ctx context.Context) {
	if c.UnifiedUploads {
		startUploads := func() {
			c.casUploadRequests = make(chan *uploadRequest, c.UnifiedUploadBufferSize)
			go c.uploadProcessor(ctx)
		}
		c.uploadOnce.Do(startUploads)
	}
	if c.UnifiedDownloads {
		startDownloads := func() {
			c.casDownloadRequests = make(chan *downloadRequest, c.UnifiedDownloadBufferSize)
			go c.downloadProcessor(ctx)
		}
		c.downloadOnce.Do(startDownloads)
	}
}
// RPCTimeouts is a Opt that sets the per-RPC deadline.
// The keys are RPC names. The "default" key, if present, is the default
// timeout. 0 values are valid and indicate no timeout.
type RPCTimeouts map[string]time.Duration
// Apply replaces the client's entire timeout map with d. Note that any
// previously configured per-RPC timeouts (including the defaults installed
// at client construction) are discarded, not merged: an RPC name absent from
// d falls back to d's "default" entry, or to no timeout at all.
func (d RPCTimeouts) Apply(c *Client) {
	c.rpcTimeouts = map[string]time.Duration(d)
}
// DefaultRPCTimeouts contains the default timeout of various RPC calls to RBE.
// Entries not listed here fall back to the "default" key; a value of 0 means
// no per-RPC deadline is applied (see CallWithTimeout).
var DefaultRPCTimeouts = map[string]time.Duration{
	"default":          20 * time.Second,
	"GetCapabilities":  5 * time.Second,
	// Batch and tree RPCs move more data and get a longer deadline.
	"BatchUpdateBlobs": time.Minute,
	"BatchReadBlobs":   time.Minute,
	"GetTree":          time.Minute,
	// Note: due to an implementation detail, WaitExecution will use the same
	// per-RPC timeout as Execute. It is extremely ill-advised to set the Execute
	// timeout at above 0; most users should use the Action Timeout instead.
	"Execute":       0,
	"WaitExecution": 0,
}
// ResourceName constructs a correctly formatted resource name as defined in the spec.
// No keyword validation is performed since the semantics of the path are defined by the server.
// See: https://github.com/bazelbuild/remote-apis/blob/cb8058798964f0adf6dbab2f4c2176ae2d653447/build/bazel/remote/execution/v2/remote_execution.proto#L223
func (c *Client) ResourceName(segments ...string) (string, error) {
	// Reject empty path components up front; the spec requires every
	// segment to be non-empty.
	for _, seg := range segments {
		if seg == "" {
			return "", ErrEmptySegment
		}
	}
	// The instance name, when configured, is the leading component.
	parts := segments
	if c.InstanceName != "" {
		parts = append(make([]string, 0, len(segments)+1), c.InstanceName)
		parts = append(parts, segments...)
	}
	return strings.Join(parts, "/"), nil
}
// RPCOpts returns the default RPC options that should be used for calls made with this client.
//
// This method is logically "protected" and is intended for use by extensions of Client.
func (c *Client) RPCOpts() []grpc.CallOption {
	// Allow receiving large messages from the server (100 MiB).
	result := []grpc.CallOption{grpc.MaxCallRecvMsgSize(100 * 1024 * 1024)}
	// Attach per-RPC credentials only when they were configured.
	if c.creds != nil {
		result = append(result, grpc.PerRPCCredentials(c.creds))
	}
	return result
}
// CallWithTimeout executes the given function f with a context that times out after an RPC timeout.
//
// The timeout is looked up by rpcName in the client's timeout map, falling
// back to the "default" entry; a zero timeout (or no entry at all) runs f
// with the caller's context unchanged.
//
// This method is logically "protected" and is intended for use by extensions of Client.
func (c *Client) CallWithTimeout(ctx context.Context, rpcName string, f func(ctx context.Context) error) error {
	timeout, found := c.rpcTimeouts[rpcName]
	if !found {
		// A missing "default" key yields the zero value, i.e. no timeout.
		timeout = c.rpcTimeouts["default"]
	}
	if timeout == 0 {
		return f(ctx)
	}
	timedCtx, cancel := context.WithTimeout(ctx, timeout)
	defer cancel()
	err := f(timedCtx)
	// A deadline expiry or cancellation on the child context takes
	// precedence over whatever error f returned.
	if ctxErr := timedCtx.Err(); ctxErr != nil {
		return ctxErr
	}
	return err
}
// Retrier applied to all client requests.
type Retrier struct {
	// Backoff determines the delay schedule between retry attempts.
	Backoff retry.BackoffPolicy
	// ShouldRetry decides whether a given error warrants another attempt.
	ShouldRetry retry.ShouldRetry
}
// Apply sets the client's retrier function to r.
func (r *Retrier) Apply(c *Client) {
	c.Retrier = r
}
// Do executes f() with retries.
// It can be called with a nil receiver; in that case no retries are done (just a passthrough call
// to f()).
func (r *Retrier) Do(ctx context.Context, f func() error) error {
	if r != nil {
		return retry.WithPolicy(ctx, r.ShouldRetry, r.Backoff, f)
	}
	// Nil retrier: a single attempt with no policy.
	return f()
}
// RetryTransient is a default retry policy for transient status codes.
// It retries transient errors only, up to 6 attempts with exponential
// backoff between 225ms and 2s.
func RetryTransient() *Retrier {
	backoff := retry.ExponentialBackoff(225*time.Millisecond, 2*time.Second, retry.Attempts(6))
	return &Retrier{
		Backoff:     backoff,
		ShouldRetry: retry.TransientOnly,
	}
}
// GetActionResult wraps the underlying call with specific client options.
// The call is retried per the client's Retrier and bounded by the
// "GetActionResult" RPC timeout.
func (c *Client) GetActionResult(ctx context.Context, req *repb.GetActionResultRequest) (res *repb.ActionResult, err error) {
	callOpts := c.RPCOpts()
	attempt := func() error {
		return c.CallWithTimeout(ctx, "GetActionResult", func(ctx context.Context) error {
			var callErr error
			res, callErr = c.actionCache.GetActionResult(ctx, req, callOpts...)
			return callErr
		})
	}
	if err = c.Retrier.Do(ctx, attempt); err != nil {
		return nil, statusWrap(err)
	}
	return res, nil
}
// UpdateActionResult wraps the underlying call with specific client options.
// The call is retried per the client's Retrier and bounded by the
// "UpdateActionResult" RPC timeout.
func (c *Client) UpdateActionResult(ctx context.Context, req *repb.UpdateActionResultRequest) (res *repb.ActionResult, err error) {
	callOpts := c.RPCOpts()
	attempt := func() error {
		return c.CallWithTimeout(ctx, "UpdateActionResult", func(ctx context.Context) error {
			var callErr error
			res, callErr = c.actionCache.UpdateActionResult(ctx, req, callOpts...)
			return callErr
		})
	}
	if err = c.Retrier.Do(ctx, attempt); err != nil {
		return nil, statusWrap(err)
	}
	return res, nil
}
// Read wraps the underlying call with specific client options.
// The wrapper is here for completeness to provide access to the low-level
// RPCs. Prefer using higher-level functions such as ReadBlob(ToFile) instead,
// as they include retries/timeouts handling.
func (c *Client) Read(ctx context.Context, req *bspb.ReadRequest) (res bsgrpc.ByteStream_ReadClient, err error) {
	// No retry/timeout here: the caller owns the returned stream's lifecycle.
	callOpts := c.RPCOpts()
	return c.byteStream.Read(ctx, req, callOpts...)
}
// Write wraps the underlying call with specific client options.
// The wrapper is here for completeness to provide access to the low-level
// RPCs. Prefer using higher-level functions such as WriteBlob(s) instead,
// as they include retries/timeouts handling.
func (c *Client) Write(ctx context.Context) (res bsgrpc.ByteStream_WriteClient, err error) {
	// No retry/timeout here: the caller owns the returned stream's lifecycle.
	callOpts := c.RPCOpts()
	return c.byteStream.Write(ctx, callOpts...)
}
// QueryWriteStatus wraps the underlying call with specific client options.
// The call is retried per the client's Retrier and bounded by the
// "QueryWriteStatus" RPC timeout.
func (c *Client) QueryWriteStatus(ctx context.Context, req *bspb.QueryWriteStatusRequest) (res *bspb.QueryWriteStatusResponse, err error) {
	callOpts := c.RPCOpts()
	attempt := func() error {
		return c.CallWithTimeout(ctx, "QueryWriteStatus", func(ctx context.Context) error {
			var callErr error
			res, callErr = c.byteStream.QueryWriteStatus(ctx, req, callOpts...)
			return callErr
		})
	}
	if err = c.Retrier.Do(ctx, attempt); err != nil {
		return nil, statusWrap(err)
	}
	return res, nil
}
// FindMissingBlobs wraps the underlying call with specific client options.
func (c *Client) FindMissingBlobs(ctx context.Context, req *repb.FindMissingBlobsRequest) (res *repb.FindMissingBlobsResponse, err error) {
opts := c.RPCOpts()
err = c.Retrier.Do(ctx, func() (e error) {
return c.CallWithTimeout(ctx, "FindMissingBlobs", func(ctx context.Context) (e error) {
res, e = c.cas.FindMissingBlobs(ctx, req, opts...)
return e
})
})
if err != nil {
return nil, statusWrap(err)