fix(share/eds): add custom inverted index for badger as storage backend (#2053)

This PR closes #2045; more specifically, it closes #2057. It introduces
a custom inverted index plugin for the dagstore that is better suited to
our use of badger as a storage backend, as well as to our use case (we
only need to index one shard per multihash).

The issue with the previous implementation was that many keys were
updated very often. This is probably due to padding, but more
investigation is needed to determine why so many CIDs appear in a large
number of shards. Because we use badger as a storage backend, each of
these updates duplicated the value in the value log, appending an extra
shard key every time. This resulted in explosive data usage on
blockspacerace. At the time of writing, without this fix, the size of
the store balloons past 40 GB (even more depending on how lucky you are
with garbage collection), even though the underlying data is ~4 GB,
almost all of it header storage.
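
To make the failure mode concrete, the upstream-style update path looks roughly like this (a simplified sketch, not the exact dagstore code; it assumes the same imports as `inverted_index.go` below, and `appendShardKey` is a hypothetical name):

```go
// appendShardKey sketches the upstream-style update: read the existing list
// of shard keys for a multihash, append the new one, and rewrite the whole
// value. Under badger, every such Put appends a fresh copy of the (growing)
// value to the value log, which is what bloated the store.
func appendShardKey(ctx context.Context, dstore ds.Datastore, key ds.Key, sk shard.Key) error {
    var keys []shard.Key
    if existing, err := dstore.Get(ctx, key); err == nil {
        if err := json.Unmarshal(existing, &keys); err != nil {
            return err
        }
    }
    keys = append(keys, sk)
    bz, err := json.Marshal(keys)
    if err != nil {
        return err
    }
    return dstore.Put(ctx, key, bz) // the full value is rewritten on every update
}
```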

GC was effective at cleaning up the value log only up to a point: once
the log files were cleaned enough that they no longer passed the GC
ratio of stale to live data (20% is the default from go-ds-badger), the
old update logs would simply accumulate and never be cleaned further.
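
For reference, a minimal sketch of where that ratio lives, assuming the `github.com/ipfs/go-ds-badger` package (`GcDiscardRatio` is the knob in question; `openBadgerDS` is a hypothetical helper):

```go
import badgerds "github.com/ipfs/go-ds-badger"

// openBadgerDS is a hypothetical helper: DefaultOptions ships with
// GcDiscardRatio = 0.2, meaning a value-log file is only rewritten once at
// least 20% of its data is stale; files below that threshold are kept as-is.
func openBadgerDS(path string) (*badgerds.Datastore, error) {
    opts := badgerds.DefaultOptions
    return badgerds.NewDatastore(path, &opts)
}
```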

In this graph, the lines including the fix finished syncing much earlier
(due to an unrelated issue), but you can see that with the fix, data
usage stays low and grows monotonically. Without the fix, you can see
the explosion of data usage and the effects of garbage collection. You
can also see that the size of the /blocks/ storage is small in
comparison to /data/, which gives us more time to refine the approach
in #2038.

![image](https://user-images.githubusercontent.com/16523232/231383399-450d6cc5-4263-4cad-a1cd-c4a7bf043488.png)

Upon investigating which CIDs are stored in multiple shards, we see
that the overwhelming majority point to only a single shard. (The
value is an array of shard keys.)

![image](https://user-images.githubusercontent.com/16523232/231383998-d299a889-b558-4f1c-8a77-e72f97c8910d.png)
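
Schematically, the difference in stored values looks like this (key and value rendering is illustrative, not the exact on-disk encoding):

```go
// upstream inverted index: the value is a JSON array that grows with each shard
//   /inverted/index/<multihash> -> ["shard1", "shard2", "shard3", ...]
//
// simpleInvertedIndex (this PR): the first shard wins, the value is written once
//   /inverted/index/<multihash> -> "shard1"
```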

By removing the first two buckets, we can see there is still a
significant number of CIDs shared between multiple shards. Many of
these CIDs probably represent blocks that make up padding, but
further investigation is needed to see why there are so many.

![image](https://user-images.githubusercontent.com/16523232/231384457-d820e840-d474-4c4b-a323-8cecd7136ec3.png)

---------

Co-authored-by: rene <41963722+renaynay@users.noreply.github.com>
distractedm1nd and renaynay committed Apr 13, 2023
1 parent e264069 commit 24ea082
Showing 3 changed files with 144 additions and 1 deletion.
88 changes: 88 additions & 0 deletions share/eds/inverted_index.go
@@ -0,0 +1,88 @@
package eds

import (
    "context"
    "encoding/json"
    "fmt"

    "github.com/filecoin-project/dagstore/index"
    "github.com/filecoin-project/dagstore/shard"
    ds "github.com/ipfs/go-datastore"
    "github.com/ipfs/go-datastore/namespace"
    "github.com/multiformats/go-multihash"
)

// simpleInvertedIndex is an inverted index that only stores a single shard key per multihash. Its
// implementation is modified from the default upstream implementation in dagstore/index.
type simpleInvertedIndex struct {
    ds ds.Batching
}

// newSimpleInvertedIndex returns a new inverted index that only stores a single shard key per
// multihash. This is because we use badger as a storage backend, so updates are expensive, and we
// don't care which shard is used to serve a cid.
func newSimpleInvertedIndex(dts ds.Batching) *simpleInvertedIndex {
    return &simpleInvertedIndex{
        ds: namespace.Wrap(dts, ds.NewKey("/inverted/index")),
    }
}

func (s *simpleInvertedIndex) AddMultihashesForShard(
    ctx context.Context,
    mhIter index.MultihashIterator,
    sk shard.Key,
) error {
    // in the original implementation, a mutex is used here to prevent unnecessary updates to the
    // key. The amount of extra data produced by this is negligible, and the performance benefits
    // from removing the lock are significant (indexing is a hot path during sync).
    batch, err := s.ds.Batch(ctx)
    if err != nil {
        return fmt.Errorf("failed to create ds batch: %w", err)
    }

    if err := mhIter.ForEach(func(mh multihash.Multihash) error {
        key := ds.NewKey(string(mh))
        ok, err := s.ds.Has(ctx, key)
        if err != nil {
            return fmt.Errorf("failed to check if value for multihash exists %s, err: %w", mh, err)
        }

        if !ok {
            bz, err := json.Marshal(sk)
            if err != nil {
                return fmt.Errorf("failed to marshal shard key to bytes: %w", err)
            }
            if err := batch.Put(ctx, key, bz); err != nil {
                return fmt.Errorf("failed to put mh=%s, err=%w", mh, err)
            }
        }

        return nil
    }); err != nil {
        return fmt.Errorf("failed to add index entry: %w", err)
    }

    if err := batch.Commit(ctx); err != nil {
        return fmt.Errorf("failed to commit batch: %w", err)
    }

    if err := s.ds.Sync(ctx, ds.Key{}); err != nil {
        return fmt.Errorf("failed to sync puts: %w", err)
    }
    return nil
}

func (s *simpleInvertedIndex) GetShardsForMultihash(ctx context.Context, mh multihash.Multihash) ([]shard.Key, error) {
    key := ds.NewKey(string(mh))
    sbz, err := s.ds.Get(ctx, key)
    if err != nil {
        return nil, fmt.Errorf("failed to lookup index for mh %s, err: %w", mh, err)
    }

    var shardKey shard.Key
    if err := json.Unmarshal(sbz, &shardKey); err != nil {
        return nil, fmt.Errorf("failed to unmarshal shard key for mh=%s, err=%w", mh, err)
    }

    return []shard.Key{shardKey}, nil
}
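
The two methods above are what the dagstore expects of an inverted index; a compile-time assertion of the kind one might add (assuming the interface is `index.Inverted` from `github.com/filecoin-project/dagstore/index`):

```go
// simpleInvertedIndex must satisfy dagstore's inverted-index interface so it
// can be passed to dagstore.NewDAGStore (see the store.go change below).
var _ index.Inverted = (*simpleInvertedIndex)(nil)
```
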
55 changes: 55 additions & 0 deletions share/eds/inverted_index_test.go
@@ -0,0 +1,55 @@
package eds

import (
    "context"
    "testing"

    "github.com/filecoin-project/dagstore/shard"
    "github.com/ipfs/go-datastore"
    ds_sync "github.com/ipfs/go-datastore/sync"
    "github.com/multiformats/go-multihash"
    "github.com/stretchr/testify/require"
)

type mockIterator struct {
    mhs []multihash.Multihash
}

func (m *mockIterator) ForEach(f func(mh multihash.Multihash) error) error {
    for _, mh := range m.mhs {
        if err := f(mh); err != nil {
            return err
        }
    }
    return nil
}

// TestMultihashesForShard ensures that the inverted index correctly stores a single shard key per duplicate multihash
func TestMultihashesForShard(t *testing.T) {
    ctx, cancel := context.WithCancel(context.Background())
    t.Cleanup(cancel)

    mhs := []multihash.Multihash{
        multihash.Multihash("mh1"),
        multihash.Multihash("mh2"),
        multihash.Multihash("mh3"),
    }

    mi := &mockIterator{mhs: mhs}
    ds := ds_sync.MutexWrap(datastore.NewMapDatastore())
    invertedIndex := newSimpleInvertedIndex(ds)

    // 1. Add all 3 multihashes to shard1
    err := invertedIndex.AddMultihashesForShard(ctx, mi, shard.KeyFromString("shard1"))
    require.NoError(t, err)
    shardKeys, err := invertedIndex.GetShardsForMultihash(ctx, mhs[0])
    require.NoError(t, err)
    require.Equal(t, []shard.Key{shard.KeyFromString("shard1")}, shardKeys)

    // 2. Add mh1 to shard2, and ensure that mh1 still points to shard1
    err = invertedIndex.AddMultihashesForShard(ctx, &mockIterator{mhs: mhs[:1]}, shard.KeyFromString("shard2"))
    require.NoError(t, err)
    shardKeys, err = invertedIndex.GetShardsForMultihash(ctx, mhs[0])
    require.NoError(t, err)
    require.Equal(t, []shard.Key{shard.KeyFromString("shard1")}, shardKeys)
}
2 changes: 1 addition & 1 deletion share/eds/store.go
@@ -79,7 +79,7 @@ func NewStore(basepath string, ds datastore.Batching) (*Store, error) {
         return nil, fmt.Errorf("failed to create index repository: %w", err)
     }

-    invertedRepo := index.NewInverted(ds)
+    invertedRepo := newSimpleInvertedIndex(ds)
     dagStore, err := dagstore.NewDAGStore(
         dagstore.Config{
             TransientsDir: basepath + transientsPath,
