forked from keybase/kbfs
/
disk_block_cache_remote.go
137 lines (119 loc) · 4.33 KB
/
disk_block_cache_remote.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
// Copyright 2017 Keybase Inc. All rights reserved.
// Use of this source code is governed by a BSD
// license that can be found in the LICENSE file.
package libkbfs
import (
"context"
"net"
"github.com/keybase/client/go/libkb"
"github.com/keybase/go-framed-msgpack-rpc/rpc"
"github.com/keybase/kbfs/kbfsblock"
"github.com/keybase/kbfs/kbfscrypto"
kbgitkbfs "github.com/keybase/kbfs/protocol/kbgitkbfs1"
"github.com/keybase/kbfs/tlf"
)
// diskBlockCacheRemoteConfig is the configuration that
// NewDiskBlockCacheRemote requires. The constructor only calls
// MakeLogger on it (presumably supplied by the embedded logMaker).
type diskBlockCacheRemoteConfig interface {
	logMaker
}
// DiskBlockCacheRemote implements a client to access a remote
// DiskBlockCacheService. It implements the DiskBlockCache interface.
type DiskBlockCacheRemote struct {
	// conn is the socket connection obtained in NewDiskBlockCacheRemote;
	// Shutdown closes it.
	conn net.Conn
	// client is the RPC client through which every cache operation is
	// forwarded to the remote service.
	client kbgitkbfs.DiskBlockCacheClient
	// log emits the trace messages written around the RPC calls.
	log traceLogger
}
// Compile-time check that *DiskBlockCacheRemote satisfies DiskBlockCache.
var _ DiskBlockCache = (*DiskBlockCacheRemote)(nil)
// NewDiskBlockCacheRemote creates a new remote disk cache client.
//
// It obtains a socket and transport from kbCtx.GetKBFSSocket, builds an
// RPC client on top of the transport, and returns a DiskBlockCacheRemote
// that owns the connection. If the socket cannot be obtained, the error
// is returned unchanged and no client is created.
func NewDiskBlockCacheRemote(kbCtx Context, config diskBlockCacheRemoteConfig) (
	*DiskBlockCacheRemote, error) {
	conn, xp, _, err := kbCtx.GetKBFSSocket(true)
	if err != nil {
		return nil, err
	}

	// Fields are initialized in the same order as the calls they wrap:
	// the RPC client is built first, then the logger.
	remote := &DiskBlockCacheRemote{
		conn: conn,
		client: kbgitkbfs.DiskBlockCacheClient{
			Cli: rpc.NewClient(xp, KBFSErrorUnwrapper{},
				libkb.LogTagsFromContext),
		},
		log: traceLogger{config.MakeLogger("DBR")},
	}
	return remote, nil
}
// Get implements the DiskBlockCache interface for DiskBlockCacheRemote.
//
// It issues a GetBlock RPC for the given TLF and block IDs, decodes the
// returned server half, and converts the returned prefetch status from
// its protocol form. On any failure it returns a nil buffer, a zero
// server half, NoPrefetch, and the error.
func (dbcr *DiskBlockCacheRemote) Get(ctx context.Context, tlfID tlf.ID,
	blockID kbfsblock.ID) (buf []byte,
	serverHalf kbfscrypto.BlockCryptKeyServerHalf,
	prefetchStatus PrefetchStatus, err error) {
	dbcr.log.LazyTrace(ctx, "DiskBlockCacheRemote: Get %s", blockID)
	// The closure reads the named result err, so it reports whatever
	// error the function ends up returning.
	defer func() {
		dbcr.log.LazyTrace(ctx, "DiskBlockCacheRemote: Get %s done (err=%+v)", blockID, err)
	}()

	arg := kbgitkbfs.GetBlockArg{
		TlfID:   tlfID.Bytes(),
		BlockID: blockID.Bytes(),
	}
	res, err := dbcr.client.GetBlock(ctx, arg)
	if err != nil {
		return nil, kbfscrypto.BlockCryptKeyServerHalf{}, NoPrefetch, err
	}

	// Plain assignment (not :=) so the named result is not shadowed.
	if err = serverHalf.UnmarshalBinary(res.ServerHalf); err != nil {
		return nil, kbfscrypto.BlockCryptKeyServerHalf{}, NoPrefetch, err
	}

	return res.Buf, serverHalf,
		PrefetchStatusFromProtocol(res.PrefetchStatus), nil
}
// Put implements the DiskBlockCache interface for DiskBlockCacheRemote.
//
// It forwards the block contents and server half to the remote cache
// with a PutBlock RPC and returns that call's error.
func (dbcr *DiskBlockCacheRemote) Put(ctx context.Context, tlfID tlf.ID,
	blockID kbfsblock.ID, buf []byte,
	serverHalf kbfscrypto.BlockCryptKeyServerHalf) (err error) {
	dbcr.log.LazyTrace(ctx, "DiskBlockCacheRemote: Put %s", blockID)
	// The closure reads the named result err, so the trace shows the
	// RPC outcome.
	defer func() {
		dbcr.log.LazyTrace(ctx, "DiskBlockCacheRemote: Put %s done (err=%+v)", blockID, err)
	}()

	arg := kbgitkbfs.PutBlockArg{
		TlfID:      tlfID.Bytes(),
		BlockID:    blockID.Bytes(),
		Buf:        buf,
		ServerHalf: serverHalf.Bytes(),
	}
	err = dbcr.client.PutBlock(ctx, arg)
	return err
}
// Delete implements the DiskBlockCache interface for DiskBlockCacheRemote.
//
// It converts every block ID to its byte form and sends them all in a
// single DeleteBlocks RPC. On success it returns the number of blocks
// and the total size removed, as reported by the remote side. If the
// RPC fails it returns zero counts and the error.
func (dbcr *DiskBlockCacheRemote) Delete(ctx context.Context,
	blockIDs []kbfsblock.ID) (numRemoved int, sizeRemoved int64, err error) {
	numBlocks := len(blockIDs)
	// numBlocks is an int, so it is formatted with %d; %s would render
	// as "%!s(int=N)" in the trace output.
	dbcr.log.LazyTrace(ctx, "DiskBlockCacheRemote: Delete %d block(s)",
		numBlocks)
	// The closure reads the named result err, so the trace shows the
	// RPC outcome.
	defer func() {
		dbcr.log.LazyTrace(ctx, "DiskBlockCacheRemote: Delete %d block(s) "+
			"done (err=%+v)", numBlocks, err)
	}()

	blocks := make([][]byte, 0, numBlocks)
	for _, b := range blockIDs {
		blocks = append(blocks, b.Bytes())
	}

	res, err := dbcr.client.DeleteBlocks(ctx, blocks)
	if err != nil {
		return 0, 0, err
	}
	return res.NumRemoved, res.SizeRemoved, nil
}
// UpdateMetadata implements the DiskBlockCache interface for
// DiskBlockCacheRemote.
//
// It sends the block ID and the protocol form of prefetchStatus to the
// remote cache with an UpdateBlockMetadata RPC and returns that call's
// error.
func (dbcr *DiskBlockCacheRemote) UpdateMetadata(ctx context.Context,
	blockID kbfsblock.ID, prefetchStatus PrefetchStatus) error {
	arg := kbgitkbfs.UpdateBlockMetadataArg{
		BlockID:        blockID.Bytes(),
		PrefetchStatus: prefetchStatus.ToProtocol(),
	}
	return dbcr.client.UpdateBlockMetadata(ctx, arg)
}
// Status implements the DiskBlockCache interface for DiskBlockCacheRemote.
// It is not implemented for the remote cache: every call panics, so
// callers must not invoke it on this type.
func (dbcr *DiskBlockCacheRemote) Status(ctx context.Context) map[string]DiskBlockCacheStatus {
	// We don't return a status because it isn't needed in the contexts
	// in which this block cache is used.
	panic("Status() not implemented in DiskBlockCacheRemote")
}
// Shutdown implements the DiskBlockCache interface for DiskBlockCacheRemote.
// It closes the underlying connection to the remote cache service.
func (dbcr *DiskBlockCacheRemote) Shutdown(ctx context.Context) {
	// Shutdown has no return value, so the Close error cannot be
	// reported to the caller and is deliberately discarded.
	_ = dbcr.conn.Close()
}