forked from sensu/sensu-go
/
boltdb_manager.go
158 lines (133 loc) · 3.81 KB
/
boltdb_manager.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
package asset
import (
"context"
"encoding/json"
"os"
"path/filepath"
bolt "go.etcd.io/bbolt"
"github.com/sensu/sensu-go/types"
)
// assetBucketName is the name of the BoltDB bucket in which asset records are
// stored.
var assetBucketName = []byte("assets")
// NewBoltDBGetter builds a Getter that records installed assets in db and
// expands them under localStorage. Any of fetcher, verifier, or expander may
// be nil, in which case the corresponding built-in default is substituted.
func NewBoltDBGetter(db *bolt.DB,
	localStorage string,
	fetcher Fetcher,
	verifier Verifier,
	expander Expander) Getter {
	manager := &boltDBAssetManager{
		localStorage: localStorage,
		db:           db,
		fetcher:      fetcher,
		expander:     expander,
		verifier:     verifier,
	}

	// Fall back to the package defaults for every collaborator the caller
	// left unset.
	if manager.fetcher == nil {
		manager.fetcher = defaultFetcher
	}
	if manager.expander == nil {
		manager.expander = defaultExpander
	}
	if manager.verifier == nil {
		manager.verifier = defaultVerifier
	}

	return manager
}
// boltDBAssetManager is responsible for installing assets and storing their
// metadata, backed by an instance of BoltDB on the local filesystem. BoltDB
// provides the serialization guarantee that the asset contract specifies.
// We rely on long-lived BoltDB transactions during Get to provide this
// mechanism for blocking.
type boltDBAssetManager struct {
// localStorage is the directory under which Get expands assets; each asset
// is expanded into a path formed by joining localStorage with its Sha512.
localStorage string
// db holds the records of installed assets, keyed by asset Sha512.
db *bolt.DB
// fetcher retrieves the asset from its URL during installation.
fetcher Fetcher
// expander unpacks the fetched asset into its destination path.
expander Expander
// verifier checks the fetched asset against the expected Sha512.
verifier Verifier
}
// Get returns the RuntimeAsset for asset, installing the asset first when no
// usable record of it exists in BoltDB. Records are keyed by the asset's
// SHA-512.
//
// A read-only transaction is tried first. If it yields a record that
// deserializes cleanly, that record is returned. A missing record, or one that
// fails to deserialize, is treated as "not installed": deserialization failure
// is assumed to indicate corruption and triggers a re-install.
//
// Installation happens inside a read-write transaction: the asset is fetched
// from asset.URL, verified against asset.Sha512, expanded under the manager's
// local storage directory, and finally recorded in BoltDB. Any failure along
// the way is returned as the error and nothing is recorded.
//
// On success the returned RuntimeAsset is non-nil and has its SHA512 field set
// to asset.Sha512.
func (b *boltDBAssetManager) Get(ctx context.Context, asset *types.Asset) (*RuntimeAsset, error) {
	key := []byte(asset.Sha512)

	// decode turns a stored record into a RuntimeAsset. It decodes into a
	// scratch variable so that a failed or partial decode can never leak a
	// half-populated value to the caller; nil means "no usable record".
	decode := func(value []byte) *RuntimeAsset {
		if value == nil {
			return nil
		}
		var stored *RuntimeAsset
		if err := json.Unmarshal(value, &stored); err != nil {
			return nil
		}
		return stored
	}

	var localAsset *RuntimeAsset

	// Fast path: look the asset up without taking the write lock.
	if err := b.db.View(func(tx *bolt.Tx) error {
		// If the key exists, the bucket should already exist.
		bucket := tx.Bucket(assetBucketName)
		if bucket == nil {
			return nil
		}
		// The value is only valid for the life of the transaction, so it is
		// decoded here rather than after View returns.
		localAsset = decode(bucket.Get(key))
		return nil
	}); err != nil {
		return nil, err
	}

	// Check to see if the view was successful.
	if localAsset != nil {
		localAsset.SHA512 = asset.Sha512
		return localAsset, nil
	}

	// Slow path: install inside a read-write transaction. BoltDB runs only one
	// read-write transaction at a time, so concurrent installers are
	// serialized here.
	if err := b.db.Update(func(tx *bolt.Tx) error {
		bucket, err := tx.CreateBucketIfNotExists(assetBucketName)
		if err != nil {
			return err
		}

		// Though we've already attempted to do this, it's possible that a
		// previous call completed installation of the asset while this
		// transaction was waiting its turn. Re-check before doing the work
		// again.
		if stored := decode(bucket.Get(key)); stored != nil {
			localAsset = stored
			return nil
		}

		// install the asset
		tmpFile, err := b.fetcher.Fetch(ctx, asset.URL)
		if err != nil {
			return err
		}
		defer func() {
			// Close before removing: deleting a file that is still open
			// fails on some platforms. Cleanup is best-effort, so both
			// errors are deliberately ignored.
			_ = tmpFile.Close()
			_ = os.Remove(tmpFile.Name())
		}()

		// verify
		if err := b.verifier.Verify(tmpFile, asset.Sha512); err != nil {
			return err
		}

		// expand
		assetPath := filepath.Join(b.localStorage, asset.Sha512)
		if err := b.expander.Expand(tmpFile, assetPath); err != nil {
			return err
		}

		installed := &RuntimeAsset{
			Path: assetPath,
		}
		assetJSON, err := json.Marshal(installed)
		if err != nil {
			// Returning the error rolls the transaction back and lets the
			// caller decide what to do, instead of crashing the process.
			return err
		}
		if err := bucket.Put(key, assetJSON); err != nil {
			return err
		}

		// Only publish the result once it has been recorded.
		localAsset = installed
		return nil
	}); err != nil {
		return nil, err
	}

	localAsset.SHA512 = asset.Sha512
	return localAsset, nil
}