StoragePacker V2 #6176

Open · wants to merge 60 commits into main

Commits (60)
3a843b9  storagepacker V2 (vishalnayak, May 7, 2018)
c62dc6b  Added packer walk (vishalnayak, May 7, 2018)
bb2ae17  test for walk func (vishalnayak, May 7, 2018)
55d4ce7  tests for bits needed (vishalnayak, May 7, 2018)
d2aeb5e  add comment (vishalnayak, May 7, 2018)
ff7593a  Merge branch 'master-oss' into storagepacker_v2 (vishalnayak, May 7, 2018)
01c42b1  Merge branch 'master-oss' into storagepacker_v2 (vishalnayak, Jul 10, 2018)
d879717  Merge branch 'master-oss' into storagepacker_v2 (jefferai, Jan 30, 2019)
56ec335  Merge branch 'master-oss' into storagepacker_v2 (jefferai, Jan 30, 2019)
b26a916  Update max size (jefferai, Jan 30, 2019)
e783980  First cut at migrating things over (jefferai, Jan 30, 2019)
abed3b3  Readd old version as legacy (jefferai, Jan 30, 2019)
3475755  Merge branch 'master-oss' into storagepacker_v2 (jefferai, Jan 31, 2019)
4595b3b  More interim work (jefferai, Jan 31, 2019)
6effe9e  Use ItemMap instead of Items (jefferai, Jan 31, 2019)
60a0f13  Fix some bugs and tests (jefferai, Jan 31, 2019)
49276fc  Switch locksutil to blake (jefferai, Jan 31, 2019)
3a31a58  Merge branch 'master-oss' into storagepacker_v2 (jefferai, Jan 31, 2019)
50db521  Add queue funcs and rename something back (jefferai, Feb 1, 2019)
7c26b99  Add entity upgrade code, still need group (jefferai, Feb 1, 2019)
f0b83e3  Only do upgrade if primary (jefferai, Feb 1, 2019)
0ca0228  Add upgrade test and fix bugs (jefferai, Feb 1, 2019)
a139a57  Add a step to reseal and reload (jefferai, Feb 2, 2019)
cbf10a5  Merge branch 'master-oss' into storagepacker_v2 (jefferai, Feb 2, 2019)
fe78cfa  Port some changes over (jefferai, Feb 3, 2019)
1133067  Fix location of never become active (jefferai, Feb 3, 2019)
31f2987  Change initial test to use constructor swapping (jefferai, Feb 3, 2019)
187ba69  Add some testhelpers and fix some bugs (jefferai, Feb 4, 2019)
2cb709d  Merge remote-tracking branch 'oss/master' into storagepacker_v2 (jefferai, Feb 5, 2019)
ad6cf66  Move things over from ent branch (jefferai, Feb 5, 2019)
d9af12c  Remove extra debuggging statements (jefferai, Feb 5, 2019)
9c9a8c4  Merge branch 'master-oss' into storagepacker_v2 (jefferai, Feb 6, 2019)
d754c6e  Rename StoragePacker to v2 (jefferai, Feb 6, 2019)
ba91e9f  Remove some debug output (jefferai, Feb 6, 2019)
61f9e12  Merge branch 'master-oss' into storagepacker_v2 (jefferai, Feb 7, 2019)
05ace97  Add ErrValueTooLarge to physical (jefferai, Feb 7, 2019)
a5d4326  Interim, to move machines (jefferai, Feb 7, 2019)
92d8bbd  Checking in initial sharding impl (jefferai, Feb 8, 2019)
14380d7  Merge branch 'master-oss' into storagepacker_v2 (jefferai, Feb 12, 2019)
2712ac0  Merge branch 'storagepacker_v2' into storagepacker_sharding (jefferai, Feb 12, 2019)
27e0c60  Fix compile (jefferai, Feb 13, 2019)
5c53c95  Merge branch 'storagepacker_v2' into storagepacker_sharding (jefferai, Feb 13, 2019)
c72e6ae  Sharding works. Lots of debug still in (jefferai, Feb 14, 2019)
3b869ed  Re add some test output code (jefferai, Feb 14, 2019)
8bd03b7  Relevel some debugging (jefferai, Feb 14, 2019)
aa2ccf6  Remove some debugging (jefferai, Feb 14, 2019)
311b86b  Merge branch 'master-oss' into storagepacker_v2 (jefferai, Feb 15, 2019)
0be0070  Merge branch 'storagepacker_v2' into storagepacker_sharding (jefferai, Feb 15, 2019)
636bfb8  Merge branch 'master-oss' into storagepacker_v2 (jefferai, Mar 12, 2019)
d03aada  remove unneeded code and move a cleanup code line outside the loop (#… (vishalnayak, Mar 25, 2019)
856add8  Fix tests (vishalnayak, Mar 26, 2019)
42318cc  Merge branch 'master-oss' into storagepacker_v2 (vishalnayak, Mar 26, 2019)
453dd0f  Merge branch 'master-oss' into storagepacker_v2 (vishalnayak, Mar 28, 2019)
a689aa3  Merge branch 'master-oss' into storagepacker_v2 (jefferai, Apr 2, 2019)
7bb3528  Update generated proto (jefferai, Apr 2, 2019)
7f9f931  Merge branch 'master-oss' into storagepacker_v2 (vishalnayak, May 8, 2019)
3fa22d4  fix tests (vishalnayak, May 9, 2019)
f4d8668  Merge branch 'master-oss' into storagepacker_v2 (vishalnayak, May 9, 2019)
0e08c11  StoragePackerV2: Item storage from proto.Any to []byte (#6715) (vishalnayak, May 10, 2019)
5f5ac7e  Merge branch 'master' into storagepacker_v2 (jefferai, May 13, 2019)
334 changes: 16 additions & 318 deletions helper/storagepacker/storagepacker.go
@@ -2,326 +2,24 @@ package storagepacker

import (
	"context"
	"crypto/md5"
	"fmt"
	"strconv"
	"strings"

	"github.com/golang/protobuf/proto"
	"github.com/hashicorp/errwrap"
	log "github.com/hashicorp/go-hclog"
	"github.com/hashicorp/vault/sdk/helper/compressutil"
	"github.com/hashicorp/vault/sdk/helper/locksutil"
	"github.com/hashicorp/vault/sdk/logical"
)

const (
	bucketCount = 256
	// StoragePackerBucketsPrefix is the default storage key prefix under which
	// bucket data will be stored.
	StoragePackerBucketsPrefix = "packer/buckets/"
)

// StoragePacker packs items into a fixed number of buckets by hashing each
// item's identifier and indexing on the result. Currently this supports only
// 256 buckets and hence relies on the first byte of the hash value for
// indexing.
type StoragePacker struct {
	view         logical.Storage
	logger       log.Logger
	storageLocks []*locksutil.LockEntry
	viewPrefix   string
}

// View returns the storage view configured to be used by the packer
func (s *StoragePacker) View() logical.Storage {
	return s.view
}

// GetBucket returns a bucket for a given key
func (s *StoragePacker) GetBucket(key string) (*Bucket, error) {
	if key == "" {
		return nil, fmt.Errorf("missing bucket key")
	}

	lock := locksutil.LockForKey(s.storageLocks, key)
	lock.RLock()
	defer lock.RUnlock()

	// Read from storage
	storageEntry, err := s.view.Get(context.Background(), key)
	if err != nil {
		return nil, errwrap.Wrapf("failed to read packed storage entry: {{err}}", err)
	}
	if storageEntry == nil {
		return nil, nil
	}

	uncompressedData, notCompressed, err := compressutil.Decompress(storageEntry.Value)
	if err != nil {
		return nil, errwrap.Wrapf("failed to decompress packed storage entry: {{err}}", err)
	}
	if notCompressed {
		uncompressedData = storageEntry.Value
	}

	var bucket Bucket
	err = proto.Unmarshal(uncompressedData, &bucket)
	if err != nil {
		return nil, errwrap.Wrapf("failed to decode packed storage entry: {{err}}", err)
	}

	return &bucket, nil
}

// upsert either inserts a new item into the bucket or updates an existing one
// if an item with a matching key is already present.
func (s *Bucket) upsert(item *Item) error {
	if s == nil {
		return fmt.Errorf("nil storage bucket")
	}

	if item == nil {
		return fmt.Errorf("nil item")
	}

	if item.ID == "" {
		return fmt.Errorf("missing item ID")
	}

	// Look for an item with a matching key; don't modify the collection while
	// iterating
	foundIdx := -1
	for itemIdx, bucketItems := range s.Items {
		if bucketItems.ID == item.ID {
			foundIdx = itemIdx
			break
		}
	}

	// If there is no match, append the item, otherwise update it
	if foundIdx == -1 {
		s.Items = append(s.Items, item)
	} else {
		s.Items[foundIdx] = item
	}

	return nil
}

// BucketKey returns the storage key of the bucket where the given item will be
// stored.
func (s *StoragePacker) BucketKey(itemID string) string {
	hf := md5.New()
	input := []byte(itemID)
	n, err := hf.Write(input)
	// Make linter happy
	if err != nil || n != len(input) {
		return ""
	}
	index := uint8(hf.Sum(nil)[0])
	return s.viewPrefix + strconv.Itoa(int(index))
}

// DeleteItem removes the item from the respective bucket
func (s *StoragePacker) DeleteItem(_ context.Context, itemID string) error {
	if itemID == "" {
		return fmt.Errorf("empty item ID")
	}

	bucketKey := s.BucketKey(itemID)

	// Read from storage
	storageEntry, err := s.view.Get(context.Background(), bucketKey)
	if err != nil {
		return errwrap.Wrapf("failed to read packed storage value: {{err}}", err)
	}
	if storageEntry == nil {
		return nil
	}

	uncompressedData, notCompressed, err := compressutil.Decompress(storageEntry.Value)
	if err != nil {
		return errwrap.Wrapf("failed to decompress packed storage value: {{err}}", err)
	}
	if notCompressed {
		uncompressedData = storageEntry.Value
	}

	var bucket Bucket
	err = proto.Unmarshal(uncompressedData, &bucket)
	if err != nil {
		return errwrap.Wrapf("failed decoding packed storage entry: {{err}}", err)
	}

	// Look for a matching storage entry
	foundIdx := -1
	for itemIdx, item := range bucket.Items {
		if item.ID == itemID {
			foundIdx = itemIdx
			break
		}
	}

	// If there is a match, remove it from the collection and persist the
	// resulting collection
	if foundIdx != -1 {
		bucket.Items = append(bucket.Items[:foundIdx], bucket.Items[foundIdx+1:]...)

		// Persist bucket entry only if there is an update
		err = s.PutBucket(&bucket)
		if err != nil {
			return err
		}
	}

	return nil
}

func (s *StoragePacker) PutBucket(bucket *Bucket) error {
	if bucket == nil {
		return fmt.Errorf("nil bucket entry")
	}

	if bucket.Key == "" {
		return fmt.Errorf("missing key")
	}

	if !strings.HasPrefix(bucket.Key, s.viewPrefix) {
		return fmt.Errorf("incorrect prefix; bucket entry key should have %q prefix", s.viewPrefix)
	}

	marshaledBucket, err := proto.Marshal(bucket)
	if err != nil {
		return errwrap.Wrapf("failed to marshal bucket: {{err}}", err)
	}

	compressedBucket, err := compressutil.Compress(marshaledBucket, &compressutil.CompressionConfig{
		Type: compressutil.CompressionTypeSnappy,
	})
	if err != nil {
		return errwrap.Wrapf("failed to compress packed bucket: {{err}}", err)
	}

	// Store the compressed value
	err = s.view.Put(context.Background(), &logical.StorageEntry{
		Key:   bucket.Key,
		Value: compressedBucket,
	})
	if err != nil {
		return errwrap.Wrapf("failed to persist packed storage entry: {{err}}", err)
	}

	return nil
}

// GetItem fetches the storage entry for a given key from its corresponding
// bucket.
func (s *StoragePacker) GetItem(itemID string) (*Item, error) {
	if itemID == "" {
		return nil, fmt.Errorf("empty item ID")
	}

	bucketKey := s.BucketKey(itemID)

	// Fetch the bucket entry
	bucket, err := s.GetBucket(bucketKey)
	if err != nil {
		return nil, errwrap.Wrapf("failed to read packed storage item: {{err}}", err)
	}
	if bucket == nil {
		return nil, nil
	}

	// Look for a matching storage entry in the bucket items
	for _, item := range bucket.Items {
		if item.ID == itemID {
			return item, nil
		}
	}

	return nil, nil
}

// PutItem stores the given item in its respective bucket
func (s *StoragePacker) PutItem(_ context.Context, item *Item) error {
	if item == nil {
		return fmt.Errorf("nil item")
	}

	if item.ID == "" {
		return fmt.Errorf("missing ID in item")
	}

	var err error
	bucketKey := s.BucketKey(item.ID)

	bucket := &Bucket{
		Key: bucketKey,
	}

	// In this case we persist the storage entry regardless of whether the
	// storageEntry read below is nil. Hence, acquire the write lock directly,
	// even just to read the entry.
	lock := locksutil.LockForKey(s.storageLocks, bucketKey)
	lock.Lock()
	defer lock.Unlock()

	// Check if there is an existing bucket for the given key
	storageEntry, err := s.view.Get(context.Background(), bucketKey)
	if err != nil {
		return errwrap.Wrapf("failed to read packed storage bucket entry: {{err}}", err)
	}

	if storageEntry == nil {
		// If the bucket entry does not exist, this will be the only item in
		// the bucket that is going to be persisted.
		bucket.Items = []*Item{
			item,
		}
	} else {
		uncompressedData, notCompressed, err := compressutil.Decompress(storageEntry.Value)
		if err != nil {
			return errwrap.Wrapf("failed to decompress packed storage entry: {{err}}", err)
		}
		if notCompressed {
			uncompressedData = storageEntry.Value
		}

		err = proto.Unmarshal(uncompressedData, bucket)
		if err != nil {
			return errwrap.Wrapf("failed to decode packed storage entry: {{err}}", err)
		}

		err = bucket.upsert(item)
		if err != nil {
			return errwrap.Wrapf("failed to update entry in packed storage entry: {{err}}", err)
		}
	}

	return s.PutBucket(bucket)
}

// NewStoragePacker creates a new storage packer for a given view
func NewStoragePacker(view logical.Storage, logger log.Logger, viewPrefix string) (*StoragePacker, error) {
	if view == nil {
		return nil, fmt.Errorf("nil view")
	}

	if viewPrefix == "" {
		viewPrefix = StoragePackerBucketsPrefix
	}

	if !strings.HasSuffix(viewPrefix, "/") {
		viewPrefix = viewPrefix + "/"
	}

	// Create a new packer object for the given view
	packer := &StoragePacker{
		view:         view,
		viewPrefix:   viewPrefix,
		logger:       logger,
		storageLocks: locksutil.CreateLocks(),
	}

	return packer, nil
}

type StoragePackerFactory func(context.Context, *Config) (StoragePacker, error)

type StoragePacker interface {
	BucketsView() *logical.StorageView
	BucketKey(string) string
	GetCacheKey(string) string
	BucketKeys(context.Context) ([]string, error)
	GetBucket(context.Context, string, bool) (*LockedBucket, error)
	DecodeBucket(*logical.StorageEntry) (*LockedBucket, error)
	PutBucket(context.Context, *LockedBucket) error
	DeleteBucket(context.Context, string) error
	DeleteItem(context.Context, string) error
	GetItem(context.Context, string) (*Item, error)
	PutItem(context.Context, *Item) error
	SetQueueMode(enabled bool)
	FlushQueue(context.Context) error
}
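
For context, the legacy packer that this diff strips down derives a bucket key from the first byte of an MD5 digest of the item ID, which is what limits it to 256 buckets. A minimal standalone sketch of that derivation (the item ID in main is made up for illustration):

package main

import (
	"crypto/md5"
	"fmt"
	"strconv"
)

// bucketKey mirrors the legacy StoragePacker.BucketKey logic shown above:
// hash the item ID with MD5 and use the first byte of the digest as a
// bucket index in [0, 255].
func bucketKey(viewPrefix, itemID string) string {
	sum := md5.Sum([]byte(itemID))
	return viewPrefix + strconv.Itoa(int(sum[0]))
}

func main() {
	// "packer/buckets/" is the legacy default prefix
	// (StoragePackerBucketsPrefix); the item ID is invented.
	fmt.Println(bucketKey("packer/buckets/", "entity-0b3a7f"))
}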
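
The new interface also declares a write queue (SetQueueMode and FlushQueue) whose implementation is not part of this hunk. Below is a hedged sketch of how a caller inside this package might batch writes through it, assuming queue mode defers bucket persists until the flush; the batchPut helper itself is hypothetical and not part of the PR:

// batchPut is a hypothetical helper; only the StoragePacker methods it calls
// come from the interface above.
func batchPut(ctx context.Context, packer StoragePacker, items []*Item) error {
	// Assumption: with queue mode on, PutItem updates buckets in memory and
	// defers storage writes, so items hashing to the same bucket coalesce
	// into a single write at flush time.
	packer.SetQueueMode(true)
	defer packer.SetQueueMode(false)

	for _, item := range items {
		if err := packer.PutItem(ctx, item); err != nil {
			return err
		}
	}

	// Persist all queued bucket updates.
	return packer.FlushQueue(ctx)
}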