Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[store] one hashset for child ref checks #7654

Merged
merged 3 commits into from
Apr 1, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
5 changes: 3 additions & 2 deletions go/libraries/doltcore/remotestorage/chunk_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -808,8 +808,9 @@ func (dcs *DoltChunkStore) errorIfDangling(ctx context.Context, addrs hash.HashS
// subsequent Get and Has calls, but must not be persistent until a call
// to Flush(). Put may be called concurrently with other calls to Put(),
// Get(), GetMany(), Has() and HasMany().
func (dcs *DoltChunkStore) Put(ctx context.Context, c chunks.Chunk, getAddrs chunks.GetAddrsCb) error {
addrs, err := getAddrs(ctx, c)
func (dcs *DoltChunkStore) Put(ctx context.Context, c chunks.Chunk, getAddrs chunks.GetAddrsCurry) error {
addrs := hash.NewHashSet()
err := getAddrs(c)(ctx, addrs)
if err != nil {
return err
}
Expand Down
10 changes: 8 additions & 2 deletions go/store/chunks/chunk_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,13 @@ const (

var ErrNothingToCollect = errors.New("no changes since last gc")

type GetAddrsCb func(ctx context.Context, c Chunk) (hash.HashSet, error)
// GetAddrsCurry returns a function that will add a chunk's child
// references to a HashSet. The intermediary lets us build a single
// HashSet per memTable.
type GetAddrsCurry func(c Chunk) GetAddrsCb

// GetAddrsCb adds the refs for a pre-specified chunk to |addrs|
type GetAddrsCb func(ctx context.Context, addrs hash.HashSet) error

// ChunkStore is the core storage abstraction in noms. We can put data
// anyplace we have a ChunkStore implementation for.
Expand All @@ -82,7 +88,7 @@ type ChunkStore interface {
// to Flush(). Put may be called concurrently with other calls to Put(),
// Get(), GetMany(), Has() and HasMany(). Will return an error if the
// addrs returned by `getAddrs` are absent from the chunk store.
Put(ctx context.Context, c Chunk, getAddrs GetAddrsCb) error
Put(ctx context.Context, c Chunk, getAddrs GetAddrsCurry) error

// Returns the NomsBinFormat with which this ChunkSource is compatible.
Version() string
Expand Down
13 changes: 9 additions & 4 deletions go/store/chunks/chunk_store_common_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,10 @@ type ChunkStoreTestSuite struct {
Factory *memoryStoreFactory
}

func noopGetAddrs(ctx context.Context, c Chunk) (hash.HashSet, error) {
return nil, nil
func noopGetAddrs(c Chunk) GetAddrsCb {
return func(ctx context.Context, addrs hash.HashSet) error {
return nil
}
}

func (suite *ChunkStoreTestSuite) TestChunkStorePut() {
Expand All @@ -55,8 +57,11 @@ func (suite *ChunkStoreTestSuite) TestChunkStorePut() {
// Put chunk with dangling ref should error on Commit
data := []byte("bcd")
nc := NewChunk(data)
err = store.Put(ctx, nc, func(ctx context.Context, c Chunk) (hash.HashSet, error) {
return hash.NewHashSet(hash.Of([]byte("nonsense"))), nil
err = store.Put(ctx, nc, func(c Chunk) GetAddrsCb {
return func(ctx context.Context, addrs hash.HashSet) error {
addrs.Insert(hash.Of([]byte("nonsense")))
return nil
}
})
suite.NoError(err)
root, err := store.Root(ctx)
Expand Down
2 changes: 1 addition & 1 deletion go/store/chunks/cs_metrics_wrapper.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ func (csMW *CSMetricWrapper) HasMany(ctx context.Context, hashes hash.HashSet) (
// subsequent Get and Has calls, but must not be persistent until a call
// to Flush(). Put may be called concurrently with other calls to Put(),
// Get(), GetMany(), Has() and HasMany().
func (csMW *CSMetricWrapper) Put(ctx context.Context, c Chunk, getAddrs GetAddrsCb) error {
func (csMW *CSMetricWrapper) Put(ctx context.Context, c Chunk, getAddrs GetAddrsCurry) error {
atomic.AddInt32(&csMW.TotalChunkPuts, 1)
return csMW.cs.Put(ctx, c, getAddrs)
}
Expand Down
6 changes: 4 additions & 2 deletions go/store/chunks/memory_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -225,11 +225,13 @@ func (ms *MemoryStoreView) errorIfDangling(ctx context.Context, addrs hash.HashS
return nil
}

func (ms *MemoryStoreView) Put(ctx context.Context, c Chunk, getAddrs GetAddrsCb) error {
func (ms *MemoryStoreView) Put(ctx context.Context, c Chunk, getAddrs GetAddrsCurry) error {
if err := ctx.Err(); err != nil {
return err
}
addrs, err := getAddrs(ctx, c)

addrs := hash.NewHashSet()
err := getAddrs(c)(ctx, addrs)
if err != nil {
return err
}
Expand Down
2 changes: 1 addition & 1 deletion go/store/chunks/test_utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ func (s *TestStoreView) HasMany(ctx context.Context, hashes hash.HashSet) (hash.
return s.ChunkStore.HasMany(ctx, hashes)
}

func (s *TestStoreView) Put(ctx context.Context, c Chunk, getAddrs GetAddrsCb) error {
func (s *TestStoreView) Put(ctx context.Context, c Chunk, getAddrs GetAddrsCurry) error {
atomic.AddInt32(&s.writes, 1)
return s.ChunkStore.Put(ctx, c, getAddrs)
}
Expand Down
4 changes: 2 additions & 2 deletions go/store/nbs/benchmarks/block_store_benchmarks.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,8 +43,8 @@ func benchmarkNovelWrite(refreshStore storeOpenFn, src *dataSource, t assert.Tes
return true
}

func noopGetAddrs(ctx context.Context, c chunks.Chunk) (hash.HashSet, error) {
return nil, nil
func noopGetAddrs(c chunks.Chunk) chunks.GetAddrsCb {
return func(_ context.Context, _ hash.HashSet) error { return nil }
}

func writeToEmptyStore(store chunks.ChunkStore, src *dataSource, t assert.TestingT) {
Expand Down
2 changes: 1 addition & 1 deletion go/store/nbs/benchmarks/file_block_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ func (fb fileBlockStore) HasMany(ctx context.Context, hashes hash.HashSet) (pres
panic("not impl")
}

func (fb fileBlockStore) Put(ctx context.Context, c chunks.Chunk, getAddrs chunks.GetAddrsCb) error {
func (fb fileBlockStore) Put(ctx context.Context, c chunks.Chunk, getAddrs chunks.GetAddrsCurry) error {
_, err := io.Copy(fb.bw, bytes.NewReader(c.Data()))
return err
}
Expand Down
2 changes: 1 addition & 1 deletion go/store/nbs/benchmarks/null_block_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ func (nb nullBlockStore) HasMany(ctx context.Context, hashes hash.HashSet) (pres
panic("not impl")
}

func (nb nullBlockStore) Put(ctx context.Context, c chunks.Chunk, getAddrs chunks.GetAddrsCb) error {
func (nb nullBlockStore) Put(ctx context.Context, c chunks.Chunk, getAddrs chunks.GetAddrsCurry) error {
return nil
}

Expand Down
13 changes: 9 additions & 4 deletions go/store/nbs/block_store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,8 +116,10 @@ func (suite *BlockStoreSuite) TestChunkStoreNotDir() {
suite.Error(err)
}

func noopGetAddrs(ctx context.Context, c chunks.Chunk) (hash.HashSet, error) {
return nil, nil
func noopGetAddrs(c chunks.Chunk) chunks.GetAddrsCb {
return func(ctx context.Context, addrs hash.HashSet) error {
return nil
}
}

func (suite *BlockStoreSuite) TestChunkStorePut() {
Expand Down Expand Up @@ -159,8 +161,11 @@ func (suite *BlockStoreSuite) TestChunkStorePut() {

// Put chunk with dangling ref should error on Commit
nc := chunks.NewChunk([]byte("bcd"))
err = suite.store.Put(context.Background(), nc, func(ctx context.Context, c chunks.Chunk) (hash.HashSet, error) {
return hash.NewHashSet(hash.Of([]byte("lorem ipsum"))), nil
err = suite.store.Put(context.Background(), nc, func(c chunks.Chunk) chunks.GetAddrsCb {
return func(ctx context.Context, addrs hash.HashSet) error {
addrs.Insert(hash.Of([]byte("lorem ipsum")))
return nil
}
})
suite.NoError(err)
root, err := suite.store.Root(context.Background())
Expand Down
6 changes: 3 additions & 3 deletions go/store/nbs/generational_chunk_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -218,7 +218,7 @@ func (gcs *GenerationalNBS) hasMany(recs []hasRecord) (absent hash.HashSet, err
// subsequent Get and Has calls, but must not be persistent until a call
// to Flush(). Put may be called concurrently with other calls to Put(),
// Get(), GetMany(), Has() and HasMany().
func (gcs *GenerationalNBS) Put(ctx context.Context, c chunks.Chunk, getAddrs chunks.GetAddrsCb) error {
func (gcs *GenerationalNBS) Put(ctx context.Context, c chunks.Chunk, getAddrs chunks.GetAddrsCurry) error {
return gcs.newGen.putChunk(ctx, c, getAddrs, gcs.hasMany)
}

Expand Down Expand Up @@ -306,8 +306,8 @@ func (gcs *GenerationalNBS) copyToOldGen(ctx context.Context, hashes hash.HashSe
var putErr error
err = gcs.newGen.GetMany(ctx, notInOldGen, func(ctx context.Context, chunk *chunks.Chunk) {
if putErr == nil {
putErr = gcs.oldGen.Put(ctx, *chunk, func(ctx context.Context, c chunks.Chunk) (hash.HashSet, error) {
return nil, nil
putErr = gcs.oldGen.Put(ctx, *chunk, func(c chunks.Chunk) chunks.GetAddrsCb {
return func(ctx context.Context, addrs hash.HashSet) error { return nil }
})
}
})
Expand Down
2 changes: 1 addition & 1 deletion go/store/nbs/ghost_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,7 @@ func (g GhostBlockStore) hasMany(hashes hash.HashSet) (absent hash.HashSet, err
return absent, nil
}

func (g GhostBlockStore) Put(ctx context.Context, c chunks.Chunk, getAddrs chunks.GetAddrsCb) error {
func (g GhostBlockStore) Put(ctx context.Context, c chunks.Chunk, getAddrs chunks.GetAddrsCurry) error {
panic("GhostBlockStore does not support Put")
}

Expand Down
5 changes: 5 additions & 0 deletions go/store/nbs/mem_table.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@ type memTable struct {
chunks map[hash.Hash][]byte
order []hasRecord // Must maintain the invariant that these are sorted by rec.order
pendingRefs []hasRecord
getChildAddrs []chunks.GetAddrsCb
maxData, totalData uint64

snapper snappyEncoder
Expand Down Expand Up @@ -111,6 +112,10 @@ func (mt *memTable) addChunk(h hash.Hash, data []byte) addChunkResult {
return chunkAdded
}

func (mt *memTable) addGetChildRefs(getAddrs chunks.GetAddrsCb) {
mt.getChildAddrs = append(mt.getChildAddrs, getAddrs)
}

func (mt *memTable) addChildRefs(addrs hash.HashSet) {
for h := range addrs {
h := h
Expand Down
8 changes: 2 additions & 6 deletions go/store/nbs/root_tracker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,9 +60,7 @@ func TestChunkStoreVersion(t *testing.T) {

assert.Equal(constants.FormatLD1String, store.Version())
newChunk := chunks.NewChunk([]byte("new root"))
require.NoError(t, store.Put(context.Background(), newChunk, func(ctx context.Context, c chunks.Chunk) (hash.HashSet, error) {
return nil, nil
}))
require.NoError(t, store.Put(context.Background(), newChunk, noopGetAddrs))
newRoot := newChunk.Hash()

if assert.True(store.Commit(context.Background(), newRoot, hash.Hash{})) {
Expand Down Expand Up @@ -213,9 +211,7 @@ func TestChunkStoreCommitOptimisticLockFail(t *testing.T) {
require.NoError(t, err)

newChunk := chunks.NewChunk([]byte("new root 2"))
require.NoError(t, store.Put(context.Background(), newChunk, func(ctx context.Context, c chunks.Chunk) (hash.HashSet, error) {
return nil, nil
}))
require.NoError(t, store.Put(context.Background(), newChunk, noopGetAddrs))
newRoot2 := newChunk.Hash()
success, err := store.Commit(context.Background(), newRoot2, hash.Hash{})
require.NoError(t, err)
Expand Down
14 changes: 5 additions & 9 deletions go/store/nbs/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -717,18 +717,14 @@ func (nbs *NomsBlockStore) waitForGC(ctx context.Context) error {
return ctx.Err()
}

func (nbs *NomsBlockStore) Put(ctx context.Context, c chunks.Chunk, getAddrs chunks.GetAddrsCb) error {
func (nbs *NomsBlockStore) Put(ctx context.Context, c chunks.Chunk, getAddrs chunks.GetAddrsCurry) error {
return nbs.putChunk(ctx, c, getAddrs, nbs.hasMany)
}

func (nbs *NomsBlockStore) putChunk(ctx context.Context, c chunks.Chunk, getAddrs chunks.GetAddrsCb, checker refCheck) error {
func (nbs *NomsBlockStore) putChunk(ctx context.Context, c chunks.Chunk, getAddrs chunks.GetAddrsCurry, checker refCheck) error {
t1 := time.Now()
addrs, err := getAddrs(ctx, c)
if err != nil {
return err
}

success, err := nbs.addChunk(ctx, c, addrs, checker)
success, err := nbs.addChunk(ctx, c, getAddrs, checker)
if err != nil {
return err
} else if !success {
Expand Down Expand Up @@ -771,7 +767,7 @@ func (nbs *NomsBlockStore) addPendingRefsToHasCache() {
}
}

func (nbs *NomsBlockStore) addChunk(ctx context.Context, ch chunks.Chunk, addrs hash.HashSet, checker refCheck) (bool, error) {
func (nbs *NomsBlockStore) addChunk(ctx context.Context, ch chunks.Chunk, getAddrs chunks.GetAddrsCurry, checker refCheck) (bool, error) {
if err := ctx.Err(); err != nil {
return false, err
}
Expand Down Expand Up @@ -808,7 +804,7 @@ func (nbs *NomsBlockStore) addChunk(ctx context.Context, ch chunks.Chunk, addrs
}
}
if addChunkRes == chunkAdded {
nbs.mt.addChildRefs(addrs)
nbs.mt.addGetChildRefs(getAddrs(ch))
}
}

Expand Down
8 changes: 7 additions & 1 deletion go/store/nbs/store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -204,7 +204,13 @@ func TestNBSPruneTableFiles(t *testing.T) {

// add a chunk and flush to trigger a conjoin
c := chunks.NewChunk([]byte("it's a boy!"))
ok, err := st.addChunk(ctx, c, hash.NewHashSet(), st.hasMany)
addrs := hash.NewHashSet()
ok, err := st.addChunk(ctx, c, func(c chunks.Chunk) chunks.GetAddrsCb {
return func(ctx context.Context, _ hash.HashSet) error {
addrs.Insert(c.Hash())
return nil
}
}, st.hasMany)
require.NoError(t, err)
require.True(t, ok)
ok, err = st.Commit(ctx, st.upstream.root, st.upstream.root)
Expand Down
6 changes: 6 additions & 0 deletions go/store/nbs/table_set.go
Original file line number Diff line number Diff line change
Expand Up @@ -289,6 +289,12 @@ func (ts tableSet) Size() int {
// append adds a memTable to an existing tableSet, compacting |mt| and
// returning a new tableSet with newly compacted table added.
func (ts tableSet) append(ctx context.Context, mt *memTable, checker refCheck, hasCache *lru.TwoQueueCache[hash.Hash, struct{}], stats *Stats) (tableSet, error) {
addrs := hash.NewHashSet()
for _, getAddrs := range mt.getChildAddrs {
getAddrs(ctx, addrs)
}
mt.addChildRefs(addrs)

for i := range mt.pendingRefs {
if !mt.pendingRefs[i].has && hasCache.Contains(*mt.pendingRefs[i].a) {
mt.pendingRefs[i].has = true
Expand Down
15 changes: 8 additions & 7 deletions go/store/prolly/tree/node_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -149,13 +149,14 @@ func (ns nodeStore) Write(ctx context.Context, nd Node) (hash.Hash, error) {
c := chunks.NewChunk(nd.bytes())
assertTrue(c.Size() > 0, "cannot write empty chunk to ChunkStore")

getAddrs := func(ctx context.Context, ch chunks.Chunk) (addrs hash.HashSet, err error) {
addrs = hash.NewHashSet()
err = message.WalkAddresses(ctx, ch.Data(), func(ctx context.Context, a hash.Hash) error {
addrs.Insert(a)
return nil
})
return
getAddrs := func(ch chunks.Chunk) chunks.GetAddrsCb {
return func(ctx context.Context, addrs hash.HashSet) (err error) {
err = message.WalkAddresses(ctx, ch.Data(), func(ctx context.Context, a hash.Hash) error {
addrs.Insert(a)
return nil
})
return
}
}

if err := ns.store.Put(ctx, c, getAddrs); err != nil {
Expand Down
6 changes: 4 additions & 2 deletions go/store/spec/spec_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -475,8 +475,10 @@ func (t *testProtocol) NewDatabase(sp Spec) (datas.Database, error) {
return datas.NewDatabase(cs), nil
}

func noopGetAddrs(ctx context.Context, c chunks.Chunk) (hash.HashSet, error) {
return nil, nil
func noopGetAddrs(c chunks.Chunk) chunks.GetAddrsCb {
return func(ctx context.Context, addrs hash.HashSet) error {
return nil
}
}

func TestExternalProtocol(t *testing.T) {
Expand Down
9 changes: 5 additions & 4 deletions go/store/types/value_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,8 +95,7 @@ const (
gcState_Finalizing
)

func AddrsFromNomsValue(ctx context.Context, c chunks.Chunk, nbf *NomsBinFormat) (addrs hash.HashSet, err error) {
addrs = hash.NewHashSet()
func AddrsFromNomsValue(c chunks.Chunk, nbf *NomsBinFormat, addrs hash.HashSet) (err error) {
if NomsKind(c.Data()[0]) == SerialMessageKind {
err = SerialMessage(c.Data()).WalkAddrs(nbf, func(a hash.Hash) error {
addrs.Insert(a)
Expand All @@ -112,8 +111,10 @@ func AddrsFromNomsValue(ctx context.Context, c chunks.Chunk, nbf *NomsBinFormat)
return
}

func (lvs *ValueStore) getAddrs(ctx context.Context, c chunks.Chunk) (hash.HashSet, error) {
return AddrsFromNomsValue(ctx, c, lvs.nbf)
func (lvs *ValueStore) getAddrs(c chunks.Chunk) chunks.GetAddrsCb {
return func(ctx context.Context, addrs hash.HashSet) error {
return AddrsFromNomsValue(c, lvs.nbf, addrs)
}
}

const (
Expand Down
11 changes: 7 additions & 4 deletions go/store/valuefile/file_value_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,8 +102,10 @@ func (f *FileValueStore) WriteValue(ctx context.Context, v types.Value) (types.R
return types.Ref{}, err
}

err = f.Put(ctx, c, func(ctx context.Context, c chunks.Chunk) (hash.HashSet, error) {
return types.AddrsFromNomsValue(ctx, c, f.nbf)
err = f.Put(ctx, c, func(c chunks.Chunk) chunks.GetAddrsCb {
return func(ctx context.Context, addrs hash.HashSet) error {
return types.AddrsFromNomsValue(c, f.nbf, addrs)
}
})

if err != nil {
Expand Down Expand Up @@ -184,8 +186,9 @@ func (f *FileValueStore) errorIfDangling(ctx context.Context, addrs hash.HashSet
}

// Put puts a chunk into the store
func (f *FileValueStore) Put(ctx context.Context, c chunks.Chunk, getAddrs chunks.GetAddrsCb) error {
addrs, err := getAddrs(ctx, c)
func (f *FileValueStore) Put(ctx context.Context, c chunks.Chunk, getAddrs chunks.GetAddrsCurry) error {
addrs := hash.NewHashSet()
err := getAddrs(c)(ctx, addrs)
if err != nil {
return err
}
Expand Down
4 changes: 2 additions & 2 deletions go/store/valuefile/value_file.go
Original file line number Diff line number Diff line change
Expand Up @@ -303,8 +303,8 @@ func read(ctx context.Context, rd io.Reader) (hash.Hash, *FileValueStore, error)
return hash.Hash{}, nil, errors.New("data corrupted")
}

err = store.Put(ctx, ch, func(ctx context.Context, c chunks.Chunk) (hash.HashSet, error) {
return nil, nil
err = store.Put(ctx, ch, func(c chunks.Chunk) chunks.GetAddrsCb {
return func(_ context.Context, _ hash.HashSet) error { return nil }
})

if err != nil {
Expand Down