/
client.go
1634 lines (1447 loc) · 53 KB
/
client.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
// Copyright 2015 The LUCI Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// Package cipd implements client side of Chrome Infra Package Deployer.
//
// Binary package file format (in free form representation):
// <binary package> := <zipped data>
// <zipped data> := DeterministicZip(<all input files> + <manifest json>)
// <manifest json> := File{
// name: ".cipdpkg/manifest.json",
// data: JSON({
// "FormatVersion": "1",
// "PackageName": <name of the package>
// }),
// }
// DeterministicZip = zip archive with deterministic ordering of files and stripped timestamps
//
// Main package data (<zipped data> above) is deterministic, meaning its content
// depends only on inputs used to build it (byte to byte): contents and names of
// all files added to the package (plus 'executable' file mode bit) and
// a package name (and all other data in the manifest).
//
// Binary package data MUST NOT depend on a timestamp, hostname of machine that
// built it, revision of the source code it was built from, etc. All that
// information will be distributed as a separate metadata packet associated with
// the package when it gets uploaded to the server.
//
// TODO: expand more when there's server-side package data model (labels
// and stuff).
package cipd
import (
"bytes"
"context"
"encoding/json"
"fmt"
"io"
"io/ioutil"
"net/http"
"net/url"
"os"
"path/filepath"
"sort"
"sync"
"time"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
"github.com/TriggerMail/luci-go/common/clock"
"github.com/TriggerMail/luci-go/common/errors"
"github.com/TriggerMail/luci-go/common/logging"
"github.com/TriggerMail/luci-go/common/proto/google"
"github.com/TriggerMail/luci-go/common/retry"
"github.com/TriggerMail/luci-go/common/retry/transient"
"github.com/TriggerMail/luci-go/grpc/prpc"
api "github.com/TriggerMail/luci-go/cipd/api/cipd/v1"
"github.com/TriggerMail/luci-go/cipd/client/cipd/deployer"
"github.com/TriggerMail/luci-go/cipd/client/cipd/digests"
"github.com/TriggerMail/luci-go/cipd/client/cipd/ensure"
"github.com/TriggerMail/luci-go/cipd/client/cipd/fs"
"github.com/TriggerMail/luci-go/cipd/client/cipd/internal"
"github.com/TriggerMail/luci-go/cipd/client/cipd/pkg"
"github.com/TriggerMail/luci-go/cipd/client/cipd/platform"
"github.com/TriggerMail/luci-go/cipd/client/cipd/template"
"github.com/TriggerMail/luci-go/cipd/common"
"github.com/TriggerMail/luci-go/cipd/version"
)
// Default timeouts used while waiting for the backend to finish processing
// uploaded data.
const (
// CASFinalizationTimeout is how long to wait for CAS service to finalize
// the upload in RegisterInstance.
CASFinalizationTimeout = 5 * time.Minute
// SetRefTimeout is how long to wait for an instance to be processed when
// setting a ref in SetRefWhenReady.
SetRefTimeout = 3 * time.Minute
// TagAttachTimeout is how long to wait for an instance to be processed when
// attaching tags in AttachTagsWhenReady.
TagAttachTimeout = 3 * time.Minute
)
// Environment variable definitions.
//
// These name the variables consulted by ClientOptions.LoadFromEnv.
const (
// EnvCacheDir names the variable that supplies a default for
// ClientOptions.CacheDir. LoadFromEnv rejects values that are not absolute
// paths.
EnvCacheDir = "CIPD_CACHE_DIR"
// EnvHTTPUserAgentPrefix names the variable that supplies a prefix for the
// default user agent: LoadFromEnv sets ClientOptions.UserAgent to
// "<prefix>/<UserAgent>".
EnvHTTPUserAgentPrefix = "CIPD_HTTP_USER_AGENT_PREFIX"
)
// Sentinel errors returned by the client. All of them except
// ErrEnsurePackagesFailed are created with transient.Tag attached.
var (
// ErrFinalizationTimeout is returned if CAS service can not finalize upload
// fast enough.
ErrFinalizationTimeout = errors.New("timeout while waiting for CAS service to finalize the upload", transient.Tag)
// ErrBadUpload is returned when a package file is uploaded, but the server
// asks us to upload it again.
ErrBadUpload = errors.New("package file is uploaded, but servers asks us to upload it again", transient.Tag)
// ErrProcessingTimeout is returned by SetRefWhenReady or AttachTagsWhenReady
// if the instance processing on the backend takes longer than expected. Refs
// and tags can be attached only to processed instances.
ErrProcessingTimeout = errors.New("timeout while waiting for the instance to become ready", transient.Tag)
// ErrDownloadError is returned by FetchInstance on download errors.
ErrDownloadError = errors.New("failed to download the package file after multiple attempts", transient.Tag)
// ErrUploadError is returned by RegisterInstance on upload errors.
ErrUploadError = errors.New("failed to upload the package file after multiple attempts", transient.Tag)
// ErrEnsurePackagesFailed is returned by EnsurePackages if something is not
// right. Unlike the errors above it carries no transient tag.
ErrEnsurePackagesFailed = errors.New("failed to update packages, see the log")
)
var (
// ClientPackage is a package with the CIPD client. Used during self-update.
//
// It is a template: the "${platform}" placeholder is expanded with
// template.DefaultExpander before the package name is used.
ClientPackage = "infra/tools/cipd/${platform}"
// UserAgent is HTTP user agent string for CIPD client.
//
// The package init function appends " (<package>@<instance ID>)" to it when
// the startup version of the running binary is known.
UserAgent = "cipd 2.2.9"
)
// init decorates UserAgent with the identity of the running client binary.
//
// When the startup version is available and names an instance, the suffix
// " (<package name>@<instance ID>)" is appended. Otherwise UserAgent is left
// untouched.
func init() {
	info, err := version.GetStartupVersion()
	if err == nil && info.InstanceID != "" {
		UserAgent += fmt.Sprintf(" (%s@%s)", info.PackageName, info.InstanceID)
	}
}
// UploadSession describes an open CAS upload session.
type UploadSession struct {
// ID identifies the upload session in the backend.
ID string
// URL is where to upload the data to.
URL string
}
// DescribeInstanceOpts is passed to DescribeInstance.
//
// It selects which optional details are requested. Per the DescribeInstance
// doc, passing nil options turns the call into a simple presence check.
type DescribeInstanceOpts struct {
// DescribeRefs, if true, requests all refs pointing to the instance.
DescribeRefs bool
// DescribeTags, if true, requests all tags attached to the instance.
DescribeTags bool
}
// Client provides high-level CIPD client interface. Thread safe.
//
// Instances are created with NewClient.
type Client interface {
// BeginBatch makes the client enter into a "batch mode".
//
// In this mode various cleanup and cache updates, usually performed right
// away, are deferred until 'EndBatch' call.
//
// This is an optimization. Use it if you plan to call a bunch of Client
// methods in a short amount of time (parallel or sequentially).
//
// Batches can be nested.
BeginBatch(ctx context.Context)
// EndBatch ends a batch started with BeginBatch.
//
// EndBatch does various delayed maintenance tasks (like cache updates, trash
// cleanup and so on). These are best-effort operations, and thus this method
// doesn't return an error.
//
// Each EndBatch must be paired with an earlier BeginBatch.
//
// See also BeginBatch doc for more details.
EndBatch(ctx context.Context)
// FetchACL returns a list of PackageACL objects (parent paths first).
//
// Together they define the access control list for the given package prefix.
FetchACL(ctx context.Context, prefix string) ([]PackageACL, error)
// ModifyACL applies a set of PackageACLChanges to a package prefix ACL.
ModifyACL(ctx context.Context, prefix string, changes []PackageACLChange) error
// FetchRoles returns all roles the caller has in the given package prefix.
//
// Understands roles inheritance, e.g. if the caller is OWNER, the return
// value will list all roles implied by being an OWNER (e.g. READER, WRITER,
// ...).
FetchRoles(ctx context.Context, prefix string) ([]string, error)
// ResolveVersion converts an instance ID, a tag or a ref into a concrete Pin.
ResolveVersion(ctx context.Context, packageName, version string) (common.Pin, error)
// RegisterInstance makes the package instance available for clients.
//
// It uploads the instance to the storage, waits until the storage verifies
// its hash, and then registers the package in the repository, making it
// discoverable.
//
// 'timeout' specifies for how long to wait until the instance hash is
// verified by the storage backend. If 0, default CASFinalizationTimeout will
// be used.
RegisterInstance(ctx context.Context, instance pkg.Instance, timeout time.Duration) error
// DescribeInstance returns information about a package instance.
//
// May also be used as a simple instance presence check, if opts is nil. If
// the request succeeds, then the instance exists.
DescribeInstance(ctx context.Context, pin common.Pin, opts *DescribeInstanceOpts) (*InstanceDescription, error)
// DescribeClient returns information about a CIPD client binary matching the
// given client package pin.
DescribeClient(ctx context.Context, pin common.Pin) (*ClientDescription, error)
// SetRefWhenReady moves a ref to point to a package instance.
//
// Refs can be attached only to processed instances, see ErrProcessingTimeout
// and SetRefTimeout.
SetRefWhenReady(ctx context.Context, ref string, pin common.Pin) error
// AttachTagsWhenReady attaches tags to an instance.
//
// Tags can be attached only to processed instances, see ErrProcessingTimeout
// and TagAttachTimeout.
AttachTagsWhenReady(ctx context.Context, pin common.Pin, tags []string) error
// FetchPackageRefs returns information about all refs defined for a package.
//
// The returned list is sorted by modification timestamp (newest first).
FetchPackageRefs(ctx context.Context, packageName string) ([]RefInfo, error)
// FetchInstance downloads a package instance file from the repository.
//
// It verifies that the package hash matches pin.InstanceID.
//
// It returns a pkg.Source pointing to the raw package data. The caller
// must close it when done.
FetchInstance(ctx context.Context, pin common.Pin) (pkg.Source, error)
// FetchInstanceTo downloads a package instance file into the given writer.
//
// This is roughly the same as getting a reader with 'FetchInstance' and
// copying its data into the writer, except this call skips unnecessary temp
// files if the client is not using cache.
//
// It verifies that the package hash matches pin.InstanceID, but does it while
// writing to 'output', so expect to discard all data there if FetchInstanceTo
// returns an error.
FetchInstanceTo(ctx context.Context, pin common.Pin, output io.WriteSeeker) error
// FetchAndDeployInstance fetches the package instance and deploys it.
//
// Deploys to the given subdir under the site root (see ClientOptions.Root).
// It doesn't check whether the instance is already deployed.
FetchAndDeployInstance(ctx context.Context, subdir string, pin common.Pin) error
// ListPackages returns a list of packages and prefixes under the given
// prefix.
ListPackages(ctx context.Context, prefix string, recursive, includeHidden bool) ([]string, error)
// SearchInstances finds instances of some package with all given tags.
//
// Returns their concrete Pins. If the package doesn't exist at all, returns
// empty slice and nil error.
SearchInstances(ctx context.Context, packageName string, tags []string) (common.PinSlice, error)
// ListInstances enumerates instances of a package, most recent first.
//
// Returns an object that can be used to fetch the listing, page by page.
ListInstances(ctx context.Context, packageName string) (InstanceEnumerator, error)
// EnsurePackages installs, removes and updates packages in the site root.
//
// Given a description of what packages (and versions) should be installed it
// will do all necessary actions to bring the state of the site root to the
// desired one.
//
// Depending on the paranoia mode, will optionally verify that all installed
// packages are installed correctly and will attempt to fix ones that are not.
// See the enum for more info.
//
// If dryRun is true, will just check for changes and return them in Actions
// struct, but won't actually perform them.
//
// If the update was only partially applied, returns both Actions and error.
EnsurePackages(ctx context.Context, pkgs common.PinSliceBySubdir, paranoia ParanoidMode, dryRun bool) (ActionMap, error)
}
// ClientOptions is passed to NewClient factory function.
//
// NewClient takes it by value, so defaults it fills in are not visible to the
// caller's copy.
type ClientOptions struct {
// ServiceURL is root URL of the backend service.
//
// It must be a root URL: NewClient rejects an empty value and URLs with a
// path other than "" or "/".
//
// NOTE(review): a default ServiceURL const is presumably applied by callers
// before NewClient — confirm; NewClient itself applies no default.
ServiceURL string
// Root is a site root directory.
//
// It is a directory where packages will be installed to. It also hosts
// .cipd/* directory that tracks internal state of installed packages and
// keeps various cache files. 'Root' can be an empty string if the client is
// not going to be used to deploy or remove local packages.
Root string
// CacheDir is a directory for shared cache.
//
// If empty, instances are not cached and tags are cached inside the site
// root. If both Root and CacheDir are empty, tag cache is disabled.
CacheDir string
// Versions is optional database of (pkg, version) => instance ID resolutions.
//
// If set, it will be used for all version resolutions done by the client.
// The client won't be consulting (or updating) the tag cache and won't make
// 'ResolveVersion' backend RPCs.
//
// This is primarily used to implement $ResolvedVersions ensure file feature.
Versions ensure.VersionsFile
// AnonymousClient is http.Client that doesn't attach authentication headers.
//
// Will be used when talking to the Google Storage. We use signed URLs that do
// not require additional authentication.
//
// Default is http.DefaultClient.
AnonymousClient *http.Client
// AuthenticatedClient is http.Client that attaches authentication headers.
//
// Will be used when talking to the backend.
//
// Default is same as AnonymousClient (it will probably not work for most
// packages, since the backend won't authorize an anonymous access).
AuthenticatedClient *http.Client
// UserAgent is put into User-Agent HTTP header with each request.
//
// Default is the package-level UserAgent variable.
UserAgent string
// Mocks used by tests. When non-nil, NewClient uses them instead of
// constructing the real implementations.
casMock api.StorageClient
repoMock api.RepositoryClient
storageMock storage
}
// LoadFromEnv fills unset fields of opts with defaults taken from the
// environment.
//
// The supplied getEnv function is used to look up named environment variables
// and should return an empty string if a variable is not defined. Only fields
// that are still empty are considered, and getEnv is called only for those:
//
//   - CacheDir is taken from EnvCacheDir, which must hold an absolute path.
//   - UserAgent is set to "<prefix>/<UserAgent>" with the prefix taken from
//     EnvHTTPUserAgentPrefix.
//
// Returns an error if EnvCacheDir is set to a relative path.
func (opts *ClientOptions) LoadFromEnv(getEnv func(string) string) error {
	if opts.CacheDir == "" {
		switch dir := getEnv(EnvCacheDir); {
		case dir == "":
			// Not configured through the environment, keep the cache disabled.
		case !filepath.IsAbs(dir):
			return fmt.Errorf("bad %s: not an absolute path - %s", EnvCacheDir, dir)
		default:
			opts.CacheDir = dir
		}
	}

	if opts.UserAgent == "" {
		if prefix := getEnv(EnvHTTPUserAgentPrefix); prefix != "" {
			opts.UserAgent = prefix + "/" + UserAgent
		}
	}

	return nil
}
// NewClient initializes CIPD client object.
//
// opts.ServiceURL is required and must be a root URL (its path must be empty
// or "/"); it is normalized to "<scheme>://<host>". Unset AnonymousClient,
// AuthenticatedClient and UserAgent get their documented defaults. Test mocks
// present in opts replace the corresponding real implementations.
//
// Returns an error if the service URL is missing or malformed.
func NewClient(opts ClientOptions) (Client, error) {
	// Validate and normalize service URL.
	if opts.ServiceURL == "" {
		return nil, fmt.Errorf("ServiceURL is required")
	}
	serviceURL, err := url.Parse(opts.ServiceURL)
	switch {
	case err != nil:
		return nil, fmt.Errorf("not a valid URL %q - %s", opts.ServiceURL, err)
	case serviceURL.Path != "" && serviceURL.Path != "/":
		return nil, fmt.Errorf("expecting a root URL, not %q", opts.ServiceURL)
	}
	opts.ServiceURL = fmt.Sprintf("%s://%s", serviceURL.Scheme, serviceURL.Host)

	// Fill in the defaults. 'opts' is our private copy.
	if opts.AnonymousClient == nil {
		opts.AnonymousClient = http.DefaultClient
	}
	if opts.AuthenticatedClient == nil {
		opts.AuthenticatedClient = opts.AnonymousClient
	}
	if opts.UserAgent == "" {
		opts.UserAgent = UserAgent
	}

	// All pRPC calls share the same retry policy: exponential backoff starting
	// at one second, at most 10 retries.
	newRetryIterator := func() retry.Iterator {
		return &retry.ExponentialBackoff{
			Limited: retry.Limited{
				Delay:   time.Second,
				Retries: 10,
			},
		}
	}
	rpcClient := &prpc.Client{
		C:    opts.AuthenticatedClient,
		Host: serviceURL.Host,
		Options: &prpc.Options{
			UserAgent: opts.UserAgent,
			Insecure:  serviceURL.Scheme == "http", // for testing with local dev server
			Retry:     newRetryIterator,
		},
	}

	// Start from the mocks and replace the missing ones with real
	// implementations.
	impl := &clientImpl{
		ClientOptions: opts,
		cas:           opts.casMock,
		repo:          opts.repoMock,
		storage:       opts.storageMock,
	}
	if impl.cas == nil {
		impl.cas = api.NewStoragePRPCClient(rpcClient)
	}
	if impl.repo == nil {
		impl.repo = api.NewRepositoryPRPCClient(rpcClient)
	}
	if impl.storage == nil {
		impl.storage = &storageImpl{
			chunkSize: uploadChunkSize,
			userAgent: opts.UserAgent,
			client:    opts.AnonymousClient,
		}
	}
	impl.deployer = deployer.New(opts.Root)
	return impl, nil
}
// MaybeUpdateClient will update the client binary at clientExe (given as
// a native path) to targetVersion if it's out of date (based on its hash).
//
// This update is done from the "infra/tools/cipd/${platform}" package, see
// ClientPackage. The function will use the given ClientOptions to figure out
// how to establish a connection with the backend. Its Root and CacheDir values
// are ignored: Root becomes the directory of clientExe and CacheDir becomes
// ".cipd_client_cache" inside it.
//
// If given 'digests' is not nil, will make sure the hash of the downloaded
// client binary is in 'digests'.
//
// On success it also refreshes the version file that accompanies clientExe
// (best effort). Returns the pin of the client package targetVersion resolves
// to.
//
// Note that this function makes sense only in a context of a default CIPD CLI
// client. Other binaries that link to cipd package should not use it, they'll
// be "updated" to the CIPD client binary.
func MaybeUpdateClient(ctx context.Context, opts ClientOptions, targetVersion, clientExe string, digests *digests.ClientDigestsFile) (common.Pin, error) {
	if err := common.ValidateInstanceVersion(targetVersion); err != nil {
		return common.Pin{}, err
	}

	// The client lives next to its own private cache.
	opts.Root = filepath.Dir(clientExe)
	opts.CacheDir = filepath.Join(opts.Root, ".cipd_client_cache")

	c, err := NewClient(opts)
	if err != nil {
		return common.Pin{}, err
	}
	impl := c.(*clientImpl)

	fsys := fs.NewFileSystem(opts.Root, filepath.Join(opts.CacheDir, "trash"))
	defer fsys.CleanupTrash(ctx)

	pin, err := impl.maybeUpdateClient(ctx, fsys, targetVersion, clientExe, digests)
	if err != nil {
		return pin, err
	}
	impl.ensureClientVersionInfo(ctx, fsys, pin, clientExe)
	return pin, nil
}
// clientImpl is the Client implementation returned by NewClient.
//
// It embeds the (normalized) ClientOptions it was constructed with.
type clientImpl struct {
ClientOptions
// pRPC API clients.
cas api.StorageClient
repo api.RepositoryClient
// batchLock protects the guts of the BeginBatch/EndBatch implementation:
// batchNesting and batchPending.
batchLock sync.Mutex
// batchNesting is the number of BeginBatch calls not yet matched by EndBatch.
batchNesting int
// batchPending is the set of operations deferred until the outermost
// EndBatch call. It is nil when nothing is pending.
batchPending map[batchAwareOp]struct{}
// storage knows how to upload and download raw binaries using signed URLs.
storage storage
// deployer knows how to install packages to local file system. Thread safe.
deployer deployer.Deployer
// tagCache is a file-system based cache of resolved tags. It is initialized
// lazily by getTagCache and stays nil if the tag cache is disabled.
tagCache *internal.TagCache
tagCacheInit sync.Once
// instanceCache is a file-system based cache of instances. It is initialized
// lazily by getInstanceCache and stays nil if the instance cache is disabled.
instanceCache *internal.InstanceCache
instanceCacheInit sync.Once
}
// batchAwareOp identifies a maintenance operation that doBatchAwareOp either
// runs right away or, inside a batch, postpones until EndBatch.
type batchAwareOp int
const (
// batchAwareOpSaveTagCache saves the tag cache, see saveTagCache.
batchAwareOpSaveTagCache batchAwareOp = iota
// batchAwareOpCleanupTrash cleans up the deployer trash, see cleanupTrash.
batchAwareOpCleanupTrash
)
// batchAwareOps maps each batchAwareOp to the clientImpl method implementing
// it. The values are method expressions, so the receiver is passed as the
// first argument.
//
// See https://golang.org/ref/spec#Method_expressions
var batchAwareOps = map[batchAwareOp]func(*clientImpl, context.Context){
batchAwareOpSaveTagCache: (*clientImpl).saveTagCache,
batchAwareOpCleanupTrash: (*clientImpl).cleanupTrash,
}
// saveTagCache saves the tag cache, if it has been initialized.
//
// This is a best-effort operation: a failure is logged as a warning and
// otherwise ignored.
func (client *clientImpl) saveTagCache(ctx context.Context) {
	if client.tagCache == nil {
		return
	}
	err := client.tagCache.Save(ctx)
	if err != nil {
		logging.Warningf(ctx, "cipd: failed to save tag cache - %s", err)
	}
}
// cleanupTrash asks the deployer to clean up its trash.
//
// It is registered in batchAwareOps, so it normally runs via doBatchAwareOp.
func (client *clientImpl) cleanupTrash(ctx context.Context) {
client.deployer.CleanupTrash(ctx)
}
// getTagCache lazy-initializes tagCache and returns it.
//
// The cache lives in CacheDir if it is set, otherwise in the site service
// directory under Root. If neither is set, the tag cache is disabled and nil
// is returned.
func (client *clientImpl) getTagCache() *internal.TagCache {
	client.tagCacheInit.Do(func() {
		dir := client.CacheDir
		if dir == "" {
			if client.Root == "" {
				return // nowhere to put the cache, keep it disabled
			}
			dir = filepath.Join(client.Root, fs.SiteServiceDir)
		}

		serviceURL, err := url.Parse(client.ServiceURL)
		if err != nil {
			panic(err) // the URL has been validated in NewClient already
		}

		cacheFS := fs.NewFileSystem(dir, "")
		client.tagCache = internal.NewTagCache(cacheFS, serviceURL.Host)
	})
	return client.tagCache
}
// getInstanceCache lazy-initializes instanceCache and returns it.
//
// The cache lives in the "instances" subdirectory of CacheDir. If CacheDir is
// not set, the instance cache is disabled and nil is returned.
func (client *clientImpl) getInstanceCache(ctx context.Context) *internal.InstanceCache {
	client.instanceCacheInit.Do(func() {
		if client.CacheDir != "" {
			dir := filepath.Join(client.CacheDir, "instances")
			client.instanceCache = internal.NewInstanceCache(fs.NewFileSystem(dir, ""))
			logging.Infof(ctx, "cipd: using instance cache at %q", dir)
		}
	})
	return client.instanceCache
}
// BeginBatch enters the batch mode by bumping the nesting counter.
//
// While the counter is positive, doBatchAwareOp postpones operations until
// the matching EndBatch call.
func (client *clientImpl) BeginBatch(ctx context.Context) {
	client.batchLock.Lock()
	client.batchNesting++
	client.batchLock.Unlock()
}
// EndBatch leaves the batch mode entered by BeginBatch.
//
// When the outermost batch ends, all operations postponed by doBatchAwareOp
// are executed (in no particular order) and the pending set is reset. The
// batch lock is held for the whole duration.
//
// Panics if there is no matching BeginBatch call.
func (client *clientImpl) EndBatch(ctx context.Context) {
	client.batchLock.Lock()
	defer client.batchLock.Unlock()

	if client.batchNesting <= 0 {
		panic("EndBatch called without corresponding BeginBatch")
	}

	if client.batchNesting--; client.batchNesting != 0 {
		return // still inside an outer batch
	}

	// Execute all pending batch aware calls now.
	for pending := range client.batchPending {
		run := batchAwareOps[pending]
		run(client, ctx)
	}
	client.batchPending = nil
}
// doBatchAwareOp runs the given operation now or postpones it until EndBatch.
//
// Outside of a batch the operation executes immediately (with the batch lock
// held). Inside a batch it is only recorded in the pending set, so scheduling
// the same operation many times executes it once when the batch ends.
func (client *clientImpl) doBatchAwareOp(ctx context.Context, op batchAwareOp) {
	client.batchLock.Lock()
	defer client.batchLock.Unlock()

	// Not inside a batch, execute right now.
	if client.batchNesting == 0 {
		batchAwareOps[op](client, ctx)
		return
	}

	// Schedule to execute when 'EndBatch' is called.
	if client.batchPending == nil {
		client.batchPending = make(map[batchAwareOp]struct{}, 1)
	}
	client.batchPending[op] = struct{}{}
}
// FetchACL returns ACLs that apply to the given package prefix.
//
// It validates the prefix, fetches the inherited prefix metadata from the
// repository and converts it to PackageACL objects. RPC errors are passed
// through humanErr before being returned.
func (client *clientImpl) FetchACL(ctx context.Context, prefix string) ([]PackageACL, error) {
	_, err := common.ValidatePackagePrefix(prefix)
	if err != nil {
		return nil, err
	}

	req := &api.PrefixRequest{Prefix: prefix}
	inherited, err := client.repo.GetInheritedPrefixMetadata(ctx, req, expectedCodes)
	if err != nil {
		return nil, humanErr(err)
	}
	return prefixMetadataToACLs(inherited), nil
}
// ModifyACL applies the given changes to the ACL of a package prefix.
//
// It is a read-modify-write sequence: the existing prefix metadata is fetched
// (a NotFound reply means there is none yet, so empty metadata is used), the
// changes are applied locally, and the result is stored back only if
// something has actually changed. RPC errors are passed through humanErr
// before being returned.
func (client *clientImpl) ModifyACL(ctx context.Context, prefix string, changes []PackageACLChange) error {
	if _, err := common.ValidatePackagePrefix(prefix); err != nil {
		return err
	}

	// Fetch existing metadata, if any.
	req := &api.PrefixRequest{Prefix: prefix}
	meta, err := client.repo.GetPrefixMetadata(ctx, req, expectedCodes)
	switch grpc.Code(err) {
	case codes.OK, codes.NotFound:
		// Proceed: either have the metadata or need to construct a new one.
	default:
		return humanErr(err)
	}

	// Construct new empty metadata for codes.NotFound.
	if meta == nil {
		meta = &api.PrefixMetadata{Prefix: prefix}
	}

	// Apply mutations. Nothing to store if they fail or change nothing.
	dirty, err := mutateACLs(meta, changes)
	if err != nil || !dirty {
		return err
	}

	// Store the new metadata. This call will check meta.Fingerprint.
	_, err = client.repo.UpdatePrefixMetadata(ctx, meta, expectedCodes)
	return humanErr(err)
}
// FetchRoles returns names of roles the caller has in the given package
// prefix.
//
// It validates the prefix, asks the repository for the roles and returns the
// string form of each one, in the order the repository reported them. RPC
// errors are passed through humanErr before being returned.
func (client *clientImpl) FetchRoles(ctx context.Context, prefix string) ([]string, error) {
	_, err := common.ValidatePackagePrefix(prefix)
	if err != nil {
		return nil, err
	}

	req := &api.PrefixRequest{Prefix: prefix}
	resp, err := client.repo.GetRolesInPrefix(ctx, req, expectedCodes)
	if err != nil {
		return nil, humanErr(err)
	}

	roles := make([]string, 0, len(resp.Roles))
	for _, entry := range resp.Roles {
		roles = append(roles, entry.Role.String())
	}
	return roles, nil
}
// ListPackages lists packages and prefixes found under the given prefix.
//
// Prefixes are reported with a trailing "/" to tell them apart from packages.
// The combined listing is sorted lexicographically. RPC errors are passed
// through humanErr before being returned.
func (client *clientImpl) ListPackages(ctx context.Context, prefix string, recursive, includeHidden bool) ([]string, error) {
	_, err := common.ValidatePackagePrefix(prefix)
	if err != nil {
		return nil, err
	}

	req := &api.ListPrefixRequest{
		Prefix:        prefix,
		Recursive:     recursive,
		IncludeHidden: includeHidden,
	}
	resp, err := client.repo.ListPrefix(ctx, req, expectedCodes)
	if err != nil {
		return nil, humanErr(err)
	}

	// Extend the list of packages with the prefixes.
	listing := resp.Packages
	for i := range resp.Prefixes {
		listing = append(listing, resp.Prefixes[i]+"/")
	}
	sort.Strings(listing)
	return listing, nil
}
// ResolveVersion converts an instance ID, a tag or a ref into a concrete Pin.
//
// The resolution goes through the following steps, stopping at the first one
// that produces an answer:
//
//  1. If 'version' is already a valid instance ID, it is returned as is.
//  2. If the client is configured with pre-resolved Versions, they are the
//     only source of truth. A version missing there is an error.
//  3. If 'version' is a tag and the tag cache is enabled, the cache is
//     consulted. Cache read errors are logged and treated as a cache miss.
//  4. The backend is asked to resolve the version. A resolved tag is stored
//     in the tag cache (best effort, failures are only logged).
//
// Returns an error if the package name or the version are malformed, or if
// the version can't be resolved.
func (client *clientImpl) ResolveVersion(ctx context.Context, packageName, version string) (common.Pin, error) {
	if err := common.ValidatePackageName(packageName); err != nil {
		return common.Pin{}, err
	}

	// Is it instance ID already? Then it is already resolved.
	if common.ValidateInstanceID(version, common.AnyHash) == nil {
		return common.Pin{PackageName: packageName, InstanceID: version}, nil
	}
	if err := common.ValidateInstanceVersion(version); err != nil {
		return common.Pin{}, err
	}

	// Use the preresolved version if configured to do so. Do NOT fallback to
	// the backend calls. A missing version is an error.
	if client.Versions != nil {
		return client.Versions.ResolveVersion(packageName, version)
	}

	// Use a local cache when resolving tags to avoid round trips to the backend
	// when calling same 'cipd ensure' command again and again.
	var cache *internal.TagCache
	if common.ValidateInstanceTag(version) == nil {
		cache = client.getTagCache() // note: may be nil if the cache is disabled
	}
	if cache != nil {
		cached, err := cache.ResolveTag(ctx, packageName, version)
		if err != nil {
			logging.Warningf(ctx, "cipd: could not query tag cache - %s", err)
		}
		if cached.InstanceID != "" {
			logging.Debugf(ctx, "cipd: tag cache hit for %s:%s - %s", packageName, version, cached.InstanceID)
			return cached, nil
		}
	}

	// Either resolving a ref, or a tag cache miss? Hit the backend.
	resp, err := client.repo.ResolveVersion(ctx, &api.ResolveVersionRequest{
		Package: packageName,
		Version: version,
	}, expectedCodes)
	if err != nil {
		return common.Pin{}, humanErr(err)
	}
	pin := common.Pin{
		PackageName: packageName,
		InstanceID:  common.ObjectRefToInstanceID(resp.Instance),
	}

	// If was resolving a tag, store it in the cache.
	if cache != nil {
		if err := cache.AddTag(ctx, pin, version); err != nil {
			// Report the actual failure, otherwise the warning is not actionable.
			logging.Warningf(ctx, "cipd: could not add tag to the cache - %s", err)
		}
		client.doBatchAwareOp(ctx, batchAwareOpSaveTagCache)
	}

	return pin, nil
}
// ensureClientVersionInfo makes sure the version file that accompanies
// clientExe describes the given pin.
//
// The file is rewritten through fs.EnsureFile only if it can't be read or its
// content differs from the expected JSON. All failures are logged and
// otherwise ignored.
//
// It is called only with the specially constructed client, see
// MaybeUpdateClient function.
func (client *clientImpl) ensureClientVersionInfo(ctx context.Context, fs fs.FileSystem, pin common.Pin, clientExe string) {
	want, err := json.Marshal(version.Info{
		PackageName: pin.PackageName,
		InstanceID:  pin.InstanceID,
	})
	if err != nil {
		// Should never occur; only error could be if version.Info is not JSON
		// serializable.
		logging.WithError(err).Errorf(ctx, "Unable to generate version file content")
		return
	}

	verFile := version.GetVersionFile(clientExe)
	have, readErr := ioutil.ReadFile(verFile)
	if readErr == nil && bytes.Equal(want, have) {
		return // up to date
	}

	// There was an error reading the existing version file, or its content does
	// not match. Proceed with EnsureFile.
	writeErr := fs.EnsureFile(ctx, verFile, func(out *os.File) error {
		_, err := out.Write(want)
		return err
	})
	if writeErr != nil {
		logging.WithError(writeErr).Warningf(ctx, "Unable to update version info %q", verFile)
	}
}
// maybeUpdateClient is called only with the specially constructed client, see
// MaybeUpdateClient function.
func (client *clientImpl) maybeUpdateClient(ctx context.Context, fs fs.FileSystem,
targetVersion, clientExe string, digests *digests.ClientDigestsFile) (common.Pin, error) {
// currentHashMatches calculates the existing client binary hash and compares
// it to 'obj'.
currentHashMatches := func(obj *api.ObjectRef) (yep bool, err error) {
hash, err := common.NewHash(obj.HashAlgo)
if err != nil {
return false, err
}
file, err := os.Open(clientExe)
if err != nil {
return false, err
}
defer file.Close()
if _, err := io.Copy(hash, file); err != nil {
return false, err
}
return common.HexDigest(hash) == obj.HexDigest, nil
}
client.BeginBatch(ctx)
defer client.EndBatch(ctx)
// Resolve the client version to a pin, to be able to later grab URL to the
// binary by querying info for that pin.
var pin common.Pin
clientPackage, err := template.DefaultExpander().Expand(ClientPackage)
if err != nil {
return common.Pin{}, err // shouldn't be happening in reality
}
if pin, err = client.ResolveVersion(ctx, clientPackage, targetVersion); err != nil {
return common.Pin{}, err
}
// The name of the client binary inside the client CIPD package. Acts only as
// a key inside the extracted refs cache, nothing more, so can technically be
// arbitrary.
clientFileName := "cipd"
if platform.CurrentOS() == "windows" {
clientFileName = "cipd.exe"
}
// rememberClientRef populates the extracted refs cache.
rememberClientRef := func(pin common.Pin, ref *api.ObjectRef) {
if cache := client.getTagCache(); cache != nil {
cache.AddExtractedObjectRef(ctx, pin, clientFileName, ref)
client.doBatchAwareOp(ctx, batchAwareOpSaveTagCache)
}
}
// Look up the hash corresponding to the pin in the extracted refs cache. See
// rememberClientRef calls below for where it is stored initially. A cache
// miss is fine, we'll reach to the backend to get the hash. A warm cache
// allows skipping RPCs to the backend on a "happy path", when the client is
// already up-to-date.
var clientRef *api.ObjectRef
if cache := client.getTagCache(); cache != nil {
if clientRef, err = cache.ResolveExtractedObjectRef(ctx, pin, clientFileName); err != nil {
return common.Pin{}, err
}
}
// If not using the tags cache or it is cold, ask the backend for an expected
// client ref. Note that we do it even if we have 'digests' file available,
// to handle the case when 'digests' file is stale (which can happen when
// updating the client version file).
var info *ClientDescription
if clientRef == nil {
if info, err = client.DescribeClient(ctx, pin); err != nil {
return common.Pin{}, err
}
clientRef = info.Digest
}
// If using pinned client digests, make sure the hash reported by the backend
// is mentioned there. In most cases a mismatch means the pinned digests file
// is just stale. The mismatch can also happen if the backend is compromised
// or the client package was forcefully replaced (this should never really
// happen...).
if digests != nil {
plat := platform.CurrentPlatform()
switch pinnedRef := digests.ClientRef(plat); {
case pinnedRef == nil:
return common.Pin{}, fmt.Errorf("there's no supported hash for %q in CIPD *.digests file", plat)
case !digests.Contains(plat, clientRef):
return common.Pin{}, fmt.Errorf(
"the CIPD client hash reported by the backend (%s) is not in *.digests file, "+
"if you changed CIPD client version recently most likely the *.digests "+
"file is just stale and needs to be regenerated via 'cipd selfupdate-roll ...'",
clientRef.HexDigest)
default:
clientRef = pinnedRef // pick the best supported hash algo from *.digests
}
}
// Is the client binary already up-to-date (has the expected hash)?
switch yep, err := currentHashMatches(clientRef); {
case err != nil:
return common.Pin{}, err // can't read clientExe
case yep:
// If we had to fetch the expected hash, store it in the cache to avoid
// fetching it again. Don't do it if we read it from the cache initially (to
// skip unnecessary cache write).
if info != nil {
rememberClientRef(pin, clientRef)
}
return pin, nil
}
if targetVersion == pin.InstanceID {
logging.Infof(ctx, "cipd: updating client to %s", pin)
} else {
logging.Infof(ctx, "cipd: updating client to %s (%s)", pin, targetVersion)
}
// Grab the signed URL of the client binary if we haven't done so already.
if info == nil {
if info, err = client.DescribeClient(ctx, pin); err != nil {
return common.Pin{}, err
}
}
// Here we know for sure that the current binary has wrong hash (most likely
// it is outdated). Fetch the new binary, verifying its hash matches the one
// we expect.
err = client.installClient(
ctx, fs,
common.MustNewHash(clientRef.HashAlgo),
info.SignedUrl,
clientExe,
clientRef.HexDigest)
if err != nil {
// Either a download error or hash mismatch.
return common.Pin{}, errors.Annotate(err, "when updating the CIPD client to %q", targetVersion).Err()
}
// The new fetched binary is valid.
rememberClientRef(pin, clientRef)
return pin, nil
}
// RegisterInstance registers the given package instance with the backend,
// uploading its data to CAS first if the backend asks for it.
//
// A zero timeout is replaced with CASFinalizationTimeout; the value bounds how
// long to wait for the uploaded file to be finalized.
//
// Returns ErrBadUpload if the backend still asks for an upload after the data
// has been uploaded and finalized.
func (client *clientImpl) RegisterInstance(ctx context.Context, instance pkg.Instance, timeout time.Duration) error {
	if timeout == 0 {
		timeout = CASFinalizationTimeout
	}
	pin := instance.Pin()

	// register performs a single RegisterInstance RPC and logs its outcome. The
	// returned upload operation comes from the response and is only returned
	// when the backend reports the instance as not uploaded.
	register := func() (*api.UploadOperation, error) {
		logging.Infof(ctx, "cipd: registering %s", pin)
		req := &api.Instance{
			Package:  pin.PackageName,
			Instance: common.InstanceIDToObjectRef(pin.InstanceID),
		}
		resp, rpcErr := client.repo.RegisterInstance(ctx, req, expectedCodes)
		if rpcErr != nil {
			return nil, humanErr(rpcErr)
		}
		switch status := resp.Status; status {
		case api.RegistrationStatus_NOT_UPLOADED:
			return resp.UploadOp, nil
		case api.RegistrationStatus_REGISTERED:
			logging.Infof(ctx, "cipd: instance %s was successfully registered", pin)
		case api.RegistrationStatus_ALREADY_REGISTERED:
			logging.Infof(
				ctx, "cipd: instance %s is already registered by %s on %s",
				pin, resp.Instance.RegisteredBy,
				google.TimeFromProto(resp.Instance.RegisteredTs).Local())
		default:
			return nil, fmt.Errorf("unrecognized package registration status %s", status)
		}
		// Both "registered" outcomes end up here: nothing left to upload.
		return nil, nil
	}

	// First attempt. The backend may respond with a request to upload the file.
	pendingOp, firstErr := register()
	if firstErr != nil {
		return firstErr
	}
	if pendingOp == nil {
		return nil // the instance is registered, no upload is needed
	}

	// Push the instance data to CAS and wait until it is finalized.
	if uploadErr := client.storage.upload(ctx, pendingOp.UploadUrl, instance.Source()); uploadErr != nil {
		return uploadErr
	}
	if finalizeErr := client.finalizeUpload(ctx, pendingOp.OperationId, timeout); finalizeErr != nil {
		return finalizeErr
	}
	logging.Infof(ctx, "cipd: successfully uploaded and verified %s", pin)

	// Second attempt, now that the file is in CAS. Being asked to upload again
	// means the upload didn't take effect, so give up.
	retryOp, retryErr := register()
	if retryOp != nil {
		return ErrBadUpload
	}
	return retryErr
}
// finalizeUpload repeatedly calls FinishUpload RPC until server reports that
// the uploaded file has been verified.
func (client *clientImpl) finalizeUpload(ctx context.Context, opID string, timeout time.Duration) error {
ctx, cancel := clock.WithTimeout(ctx, timeout)
defer cancel()
sleep := time.Second
for {
select {
case <-ctx.Done():
return ErrFinalizationTimeout
default:
}
op, err := client.cas.FinishUpload(ctx, &api.FinishUploadRequest{
UploadOperationId: opID,
})
switch {
case err == context.DeadlineExceeded:
continue // this may be short RPC deadline, try again
case err != nil:
return humanErr(err)
case op.Status == api.UploadStatus_PUBLISHED:
return nil // verified!
case op.Status == api.UploadStatus_ERRORED:
return errors.New(op.ErrorMessage) // fatal verification error
case op.Status == api.UploadStatus_UPLOADING || op.Status == api.UploadStatus_VERIFYING: