-
Notifications
You must be signed in to change notification settings - Fork 18
/
cache.go
148 lines (126 loc) · 3.92 KB
/
cache.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
package rpc
import (
"context"
"github.com/cirruslabs/cirrus-ci-agent/api"
"github.com/cirruslabs/cirrus-cli/internal/executor/cache"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
"io"
)
// sendBufSize is the size, in bytes (1 MiB), of the buffer DownloadCache uses
// to read a cache blob; each chunk sent to the client carries at most this
// many bytes.
const sendBufSize = 1024 * 1024
// UploadCache handles a client-streaming cache upload.
//
// The stream must carry exactly one key entry, which has to arrive before any
// data chunk, followed by the chunks that make up the cache blob. The task
// identification inside the key entry is checked with
// r.build.GetTaskFromIdentification against r.clientSecret before a put
// operation is opened for the cache key.
//
// Once the client closes its side of the stream the put operation is
// finalized and an UploadCacheResponse holding the total number of bytes
// written is sent back.
//
// Errors:
//   - FailedPrecondition if more than one key entry is received, or if the
//     stream ends without a key entry;
//   - PermissionDenied if a chunk arrives before a key entry;
//   - Internal if opening, writing to or finalizing the put operation fails;
//   - errors from the stream and from task identification are returned as is.
func (r *RPC) UploadCache(stream api.CirrusCIService_UploadCacheServer) error {
	var putOp *cache.PutOperation
	var bytesSaved int64

	for {
		cacheEntry, err := stream.Recv()
		if err == io.EOF {
			// The client finished sending; proceed to finalization.
			break
		}
		if err != nil {
			r.logger.Warnf("error stream errored out while uploading cache: %v", err)
			return err
		}

		switch x := cacheEntry.Value.(type) {
		case *api.CacheEntry_Key:
			if putOp != nil {
				r.logger.Warnf("received multiple cache entries in a single method call")
				return status.Error(codes.FailedPrecondition, "received multiple cache entries in a single method call")
			}

			// Only the validation result matters here, the task itself is not used.
			if _, err := r.build.GetTaskFromIdentification(x.Key.TaskIdentification, r.clientSecret); err != nil {
				return err
			}

			putOp, err = r.build.Cache.Put(x.Key.CacheKey)
			if err != nil {
				r.logger.Debugf("error while initializing cache put operation: %v", err)
				return status.Error(codes.Internal, "failed to initialize cache put operation")
			}

			r.logger.Debugf("receiving cache with key %s", x.Key.CacheKey)
		case *api.CacheEntry_Chunk:
			// A put operation only exists after a key entry passed validation,
			// so a chunk without one comes from an unauthenticated stream.
			if putOp == nil {
				return status.Error(codes.PermissionDenied, "not authenticated")
			}

			n, err := putOp.Write(x.Chunk.Data)
			if err != nil {
				r.logger.Debugf("error while processing cache chunk: %v", err)
				return status.Error(codes.Internal, "failed to process cache chunk")
			}
			bytesSaved += int64(n)
		}
	}

	if putOp == nil {
		r.logger.Warnf("attempted to upload cache without actually sending anything")
		return status.Error(codes.FailedPrecondition, "attempted to upload cache without actually sending anything")
	}

	if err := putOp.Finalize(); err != nil {
		// Include the underlying error, otherwise the cause is lost entirely:
		// the client only ever sees the generic Internal status below.
		r.logger.Debugf("error while finalizing cache put operation: %v", err)
		return status.Error(codes.Internal, "failed to finalize cache put operation")
	}

	response := api.UploadCacheResponse{
		BytesSaved: bytesSaved,
	}
	if err := stream.SendAndClose(&response); err != nil {
		r.logger.Warnf("error while closing cache upload stream: %v", err)
		return err
	}

	return nil
}
// DownloadCache streams the cache blob stored under req.CacheKey to the
// client in chunks of at most sendBufSize bytes.
//
// The request's task identification is checked with
// r.build.GetTaskFromIdentification against r.clientSecret before the blob is
// opened.
//
// Errors:
//   - NotFound if the blob cannot be obtained from the cache;
//   - Internal if reading the blob fails;
//   - errors from task identification and from sending on the stream are
//     returned as is (io.EOF from Send ends the transfer without an error).
func (r *RPC) DownloadCache(req *api.DownloadCacheRequest, stream api.CirrusCIService_DownloadCacheServer) error {
	if _, err := r.build.GetTaskFromIdentification(req.TaskIdentification, r.clientSecret); err != nil {
		return err
	}

	file, err := r.build.Cache.Get(req.CacheKey)
	if err != nil {
		r.logger.Debugf("error while getting cache blob with key %s: %v", req.CacheKey, err)
		return status.Errorf(codes.NotFound, "cache blob with the specified key not found")
	}
	defer file.Close()

	r.logger.Debugf("sending cache with key %s", req.CacheKey)

	// NOTE(review): buf is reused for every chunk, which relies on Send having
	// serialized the message by the time it returns — confirm against the gRPC
	// version in use.
	buf := make([]byte, sendBufSize)

	for {
		n, readErr := file.Read(buf)

		// A reader may return n > 0 together with an error (io.EOF included),
		// so the bytes that were read are sent before the error is looked at.
		if n > 0 {
			chunk := api.DataChunk{
				Data: buf[:n],
			}

			sendErr := stream.Send(&chunk)
			if sendErr == io.EOF {
				return nil
			}
			if sendErr != nil {
				r.logger.Warnf("error while sending cache chunk of size %d: %v", n, sendErr)
				return sendErr
			}
		}

		if readErr == io.EOF {
			return nil
		}
		if readErr != nil {
			// The client only gets a generic message, keep the cause in the log.
			r.logger.Debugf("error while reading cache blob with key %s: %v", req.CacheKey, readErr)
			return status.Errorf(codes.Internal, "failed to read cache blob")
		}
	}
}
// CacheInfo reports metadata about the cache blob stored under req.CacheKey:
// the key itself, the blob size in bytes and the blob's modification time (as
// a Unix timestamp), which is reported as the creation timestamp.
//
// The request's task identification is checked with
// r.build.GetTaskFromIdentification against r.clientSecret before the blob is
// opened.
//
// Errors:
//   - NotFound if the blob cannot be obtained from the cache;
//   - Internal if the blob's file information cannot be retrieved;
//   - errors from task identification are returned as is.
func (r *RPC) CacheInfo(ctx context.Context, req *api.CacheInfoRequest) (*api.CacheInfoResponse, error) {
	if _, err := r.build.GetTaskFromIdentification(req.TaskIdentification, r.clientSecret); err != nil {
		return nil, err
	}

	r.logger.Debugf("sending info about cache key %s", req.CacheKey)

	file, err := r.build.Cache.Get(req.CacheKey)
	if err != nil {
		r.logger.Debugf("error while getting cache blob with key %s: %v", req.CacheKey, err)
		return nil, status.Errorf(codes.NotFound, "cache blob with the specified key not found")
	}
	defer file.Close()

	fileInfo, err := file.Stat()
	if err != nil {
		// Report a proper gRPC status instead of the raw error: the details
		// stay in the log and are not sent to the client.
		r.logger.Debugf("error while getting info about cache blob with key %s: %v", req.CacheKey, err)
		return nil, status.Error(codes.Internal, "failed to get cache blob info")
	}

	response := api.CacheInfoResponse{
		Info: &api.CacheInfo{
			Key:               req.CacheKey,
			SizeInBytes:       fileInfo.Size(),
			CreationTimestamp: fileInfo.ModTime().Unix(),
		},
	}

	return &response, nil
}