Merge pull request #4832 from dolthub/taylor/shallow-gc
Online shallow gc
tbantle22 committed Nov 23, 2022
2 parents 5be1532 + cab0997 commit 1e7287e
Showing 20 changed files with 317 additions and 36 deletions.
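In short: this commit adds an online "shallow" garbage-collection path, a dolt_gc stored procedure (alias dgc) that calls DoltDB.ShallowGC, and extends each NBS table persister with an Exists check and a time-bounded PruneTableFiles, so that table files unreferenced by the manifest can be dropped while the server is running. A few typo fixes ride along; hedged sketches of pieces referenced but not shown in these hunks follow the relevant files below.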
6 changes: 3 additions & 3 deletions go/libraries/doltcore/doltdb/doltdb.go
@@ -1149,8 +1149,8 @@ func (ddb *DoltDB) Rebase(ctx context.Context) error {
return datas.ChunkStoreFromDatabase(ddb.db).Rebase(ctx)
}

-// GC performs garbage collection on this ddb. Values passed in |uncommitedVals| will be temporarily saved during gc.
-func (ddb *DoltDB) GC(ctx context.Context, uncommitedVals ...hash.Hash) error {
+// GC performs garbage collection on this ddb. Values passed in |uncommittedVals| will be temporarily saved during gc.
+func (ddb *DoltDB) GC(ctx context.Context, uncommittedVals ...hash.Hash) error {
collector, ok := ddb.db.Database.(datas.GarbageCollector)
if !ok {
return fmt.Errorf("this database does not support garbage collection")
@@ -1165,7 +1165,7 @@ func (ddb *DoltDB) GC(ctx context.Context, uncommitedVals ...hash.Hash) error {
if err != nil {
return err
}
-newGen := hash.NewHashSet(uncommitedVals...)
+newGen := hash.NewHashSet(uncommittedVals...)
oldGen := make(hash.HashSet)
err = datasets.IterAll(ctx, func(keyStr string, h hash.Hash) error {
var isOldGen bool
2 changes: 1 addition & 1 deletion go/libraries/doltcore/env/environment.go
@@ -1129,7 +1129,7 @@ func (dEnv *DoltEnv) TempTableFilesDir() (string, error) {
return absPath, nil
}

-// GetGCKeepers returns the hashes of all the objects in the environment provided that should be perserved during GC.
+// GetGCKeepers returns the hashes of all the objects in the environment provided that should be preserved during GC.
// TODO: this should be unnecessary since we now store the working set in a noms dataset, remove it
func GetGCKeepers(ctx context.Context, env *DoltEnv) ([]hash.Hash, error) {
workingRoot, err := env.WorkingRoot(ctx)
2 changes: 1 addition & 1 deletion go/libraries/doltcore/remotesrv/grpc.go
@@ -388,7 +388,7 @@ func (rs *RemoteChunkStore) Rebase(ctx context.Context, req *remotesapi.RebaseRe
err = cs.Rebase(ctx)

if err != nil {
logger.Printf("error occurred during processing of Rebace rpc of %s details: %v", repoPath, err)
logger.Printf("error occurred during processing of Rebase rpc of %s details: %v", repoPath, err)
return nil, status.Errorf(codes.Internal, "failed to rebase: %v", err)
}

2 changes: 1 addition & 1 deletion go/libraries/doltcore/remotestorage/chunk_store.go
@@ -1136,7 +1136,7 @@ func (dcs *DoltChunkStore) downloadChunks(ctx context.Context, dlLocs dlLocation
return concurrentExec(work[0:largeCutoff+1], dcs.concurrency.ConcurrentLargeFetches)
})
eg.Go(func() error {
-return concurrentExec(work[largeCutoff+1:len(work)], dcs.concurrency.ConcurrentSmallFetches)
+return concurrentExec(work[largeCutoff+1:], dcs.concurrency.ConcurrentSmallFetches)
})

defer func() {
62 changes: 62 additions & 0 deletions go/libraries/doltcore/sqle/dprocedures/dolt_gc.go
@@ -0,0 +1,62 @@
// Copyright 2022 Dolthub, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package dprocedures

import (
"fmt"

"github.com/dolthub/go-mysql-server/sql"

"github.com/dolthub/dolt/go/libraries/doltcore/branch_control"
"github.com/dolthub/dolt/go/libraries/doltcore/sqle/dsess"
)

const (
cmdFailure = 0
cmdSuccess = 1
)

// doltGC is the stored procedure to run online garbage collection on a database.
func doltGC(ctx *sql.Context, args ...string) (sql.RowIter, error) {
res, err := doDoltGC(ctx, args)
if err != nil {
return nil, err
}
return rowToIter(int64(res)), nil
}

func doDoltGC(ctx *sql.Context, args []string) (int, error) {
dbName := ctx.GetCurrentDatabase()

if len(dbName) == 0 {
return cmdFailure, fmt.Errorf("Empty database name.")
}
if err := branch_control.CheckAccess(ctx, branch_control.Permissions_Write); err != nil {
return cmdFailure, err
}

dSess := dsess.DSessFromSess(ctx.Session)
ddb, ok := dSess.GetDoltDB(ctx, dbName)
if !ok {
return cmdFailure, fmt.Errorf("Could not load database %s", dbName)
}

err := ddb.ShallowGC(ctx)
if err != nil {
return cmdFailure, err
}

return cmdSuccess, nil
}
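ShallowGC itself is not among the shown hunks. Below is a rough sketch of what it presumably does, inferred from the TestDoltGC TODO further down (which mentions db.chunkStore().(nbs.TableFileStore) and PruneTableFiles); the method body and the store-level PruneTableFiles signature are assumptions, not code from this commit:

// Sketch (assumption): ShallowGC likely asks the chunk store, viewed as an
// nbs.TableFileStore, to drop table files no longer referenced by the manifest.
func (ddb *DoltDB) ShallowGC(ctx context.Context) error {
	cs := datas.ChunkStoreFromDatabase(ddb.db)
	tfs, ok := cs.(nbs.TableFileStore)
	if !ok {
		return fmt.Errorf("this database does not support shallow gc")
	}
	// Signature assumed; the persister-level variant later in this diff also
	// takes a manifest snapshot and a cutoff time.
	return tfs.PruneTableFiles(ctx)
}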
2 changes: 2 additions & 0 deletions go/libraries/doltcore/sqle/dprocedures/init.go
@@ -26,6 +26,7 @@ var DoltProcedures = []sql.ExternalStoredProcedureDetails{
{Name: "dolt_commit", Schema: stringSchema("hash"), Function: doltCommit},
{Name: "dolt_conflicts_resolve", Schema: int64Schema("status"), Function: doltConflictsResolve},
{Name: "dolt_fetch", Schema: int64Schema("success"), Function: doltFetch},
{Name: "dolt_gc", Schema: int64Schema("success"), Function: doltGC},
{Name: "dolt_merge", Schema: int64Schema("fast_forward", "conflicts"), Function: doltMerge},
{Name: "dolt_pull", Schema: int64Schema("fast_forward", "conflicts"), Function: doltPull},
{Name: "dolt_push", Schema: int64Schema("success"), Function: doltPush},
@@ -43,6 +44,7 @@ var DoltProcedures = []sql.ExternalStoredProcedureDetails{
{Name: "dclone", Schema: int64Schema("status"), Function: doltClone},
{Name: "dcommit", Schema: stringSchema("hash"), Function: doltCommit},
{Name: "dfetch", Schema: int64Schema("success"), Function: doltFetch},
{Name: "dgc", Schema: int64Schema("status"), Function: doltGC},
{Name: "dmerge", Schema: int64Schema("fast_forward", "conflicts"), Function: doltMerge},
{Name: "dpull", Schema: int64Schema("fast_forward", "conflicts"), Function: doltPull},
{Name: "dpush", Schema: int64Schema("success"), Function: doltPush},
11 changes: 10 additions & 1 deletion go/libraries/doltcore/sqle/enginetest/dolt_engine_test.go
@@ -46,7 +46,7 @@ var skipPrepared bool
// SkipPreparedsCount is used by the "ci-check-repo" CI workflow
// as a reminder to consider prepareds when adding a new
// enginetest suite.
-const SkipPreparedsCount = 84
+const SkipPreparedsCount = 85

const skipPreparedFlag = "DOLT_SKIP_PREPARED_ENGINETESTS"

@@ -973,6 +973,15 @@ func TestDoltReset(t *testing.T) {
}
}

func TestDoltGC(t *testing.T) {
// TODO: This does not work because `db.chunkStore().(nbs.TableFileStore)`
// returns not ok in PruneTableFiles
t.Skip()
for _, script := range DoltGC {
enginetest.TestScript(t, newDoltHarness(t), script)
}
}

func TestDoltBranch(t *testing.T) {
for _, script := range DoltBranchScripts {
enginetest.TestScript(t, newDoltHarness(t), script)
28 changes: 28 additions & 0 deletions go/libraries/doltcore/sqle/enginetest/dolt_queries.go
@@ -3914,6 +3914,34 @@ var DoltReset = []queries.ScriptTest{
},
}

func gcSetup() []string {
queries := []string{
"create table t (pk int primary key);",
"call dolt_commit('-Am', 'create table');",
}
for i := 0; i < 250; i++ {
queries = append(
queries,
fmt.Sprintf("INSERT INTO t VALUES (%d);", i),
fmt.Sprintf("CALL DOLT_COMMIT('-am', 'added pk %d')", i),
)
}
return queries
}
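(gcSetup generates deliberate churn: 250 single-row inserts, each followed by a Dolt commit, on top of the initial table creation, so a later GC call has plenty of now-unreferenced chunk data to consider.)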

var DoltGC = []queries.ScriptTest{
{
Name: "base case: shallow gc",
SetUpScript: gcSetup(),
Assertions: []queries.ScriptTestAssertion{
{
Query: "CALL DOLT_GC();",
Expected: []sql.Row{{0}},
},
},
},
}
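For context, here is a minimal client-side sketch of calling the new procedure over the MySQL wire protocol, which a running dolt sql-server speaks; the driver choice, DSN, and database name are illustrative, not part of this commit:

package main

import (
	"database/sql"
	"fmt"

	_ "github.com/go-sql-driver/mysql" // any MySQL-compatible driver should do
)

func main() {
	// Connect to a running `dolt sql-server`; the DSN is illustrative.
	db, err := sql.Open("mysql", "root@tcp(127.0.0.1:3306)/mydb")
	if err != nil {
		panic(err)
	}
	defer db.Close()

	// The procedure returns a single int status row; per dolt_gc.go above,
	// cmdSuccess (1) signals success.
	var status int64
	if err := db.QueryRow("CALL DOLT_GC();").Scan(&status); err != nil {
		panic(err)
	}
	fmt.Println("dolt_gc status:", status)
}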

var DiffSystemTableScriptTests = []queries.ScriptTest{
{
Name: "base case: added rows",
10 changes: 5 additions & 5 deletions go/store/datas/pull/pull_test.go
@@ -547,27 +547,27 @@ func TestClone(t *testing.T) {
src := &TestTableFileStore{
root: hash.Of(hashBytes[:]),
tableFiles: map[string]*TestTableFile{
"file1": &TestTableFile{
"file1": {
fileID: "file1",
numChunks: 1,
data: []byte("Call me Ishmael. Some years ago—never mind how long precisely—having little or no money in my purse, "),
},
"file2": &TestTableFile{
"file2": {
fileID: "file2",
numChunks: 2,
data: []byte("and nothing particular to interest me on shore, I thought I would sail about a little and see the watery "),
},
"file3": &TestTableFile{
"file3": {
fileID: "file3",
numChunks: 3,
data: []byte("part of the world. It is a way I have of driving off the spleen and regulating the "),
},
"file4": &TestTableFile{
"file4": {
fileID: "file4",
numChunks: 4,
data: []byte("circulation. Whenever I find myself growing grim about the mouth; whenever it is a damp, drizzly "),
},
"file5": &TestTableFile{
"file5": {
fileID: "file5",
numChunks: 5,
data: []byte("November in my soul; whenever I find myself involuntarily pausing before coffin warehouses, and bringing "),
33 changes: 33 additions & 0 deletions go/store/nbs/aws_chunk_source.go
@@ -27,6 +27,39 @@ import (
"time"
)

// tableExistsInChunkSource reports whether table file |name| with |chunkCount|
// chunks is present, either stored inline in DynamoDB or by reading the table
// index span from the end of the object in S3.
func tableExistsInChunkSource(ctx context.Context, ddb *ddbTableStore, s3 *s3ObjectReader, al awsLimits, name addr, chunkCount uint32, q MemoryQuotaProvider, stats *Stats) (bool, error) {
idxSz := int(indexSize(chunkCount) + footerSize)
offsetSz := int((chunkCount - (chunkCount / 2)) * offsetSize)
buf, err := q.AcquireQuotaBytes(ctx, uint64(idxSz+offsetSz))
if err != nil {
return false, err
}
p := buf[:idxSz]

if al.tableMayBeInDynamo(chunkCount) {
data, err := ddb.ReadTable(ctx, name, nil)
if err != nil {
return false, err
}
if data == nil {
return false, nil
}
if len(p) > len(data) {
return false, errors.New("not enough data for chunk count")
}
return true, nil
}

n, _, err := s3.ReadFromEnd(ctx, name, p, stats)
if err != nil {
return false, err
}
if len(p) != n {
return false, errors.New("failed to read all data")
}
return true, nil
}

func newAWSChunkSource(ctx context.Context, ddb *ddbTableStore, s3 *s3ObjectReader, al awsLimits, name addr, chunkCount uint32, q MemoryQuotaProvider, stats *Stats) (cs chunkSource, err error) {
var tra tableReaderAt
index, err := loadTableIndex(ctx, stats, chunkCount, q, func(p []byte) error {
15 changes: 14 additions & 1 deletion go/store/nbs/aws_table_persister.go
@@ -93,6 +93,19 @@ func (s3p awsTablePersister) Open(ctx context.Context, name addr, chunkCount uin
)
}

func (s3p awsTablePersister) Exists(ctx context.Context, name addr, chunkCount uint32, stats *Stats) (bool, error) {
return tableExistsInChunkSource(
ctx,
s3p.ddb,
&s3ObjectReader{s3: s3p.s3, bucket: s3p.bucket, readRl: s3p.rl, ns: s3p.ns},
s3p.limits,
name,
chunkCount,
s3p.q,
stats,
)
}

type s3UploadedPart struct {
idx int64
etag string
@@ -572,6 +585,6 @@ func (s3p awsTablePersister) uploadPart(ctx context.Context, data []byte, key, u
return
}

-func (s3p awsTablePersister) PruneTableFiles(ctx context.Context, contents manifestContents) error {
+func (s3p awsTablePersister) PruneTableFiles(ctx context.Context, contents manifestContents, t time.Time) error {
return chunks.ErrUnsupportedOperation
}
11 changes: 8 additions & 3 deletions go/store/nbs/bs_persister.go
@@ -17,6 +17,7 @@ package nbs
import (
"context"
"io"
"time"

"github.com/dolthub/dolt/go/store/blobstore"
"github.com/dolthub/dolt/go/store/chunks"
@@ -59,7 +60,11 @@ func (bsp *blobstorePersister) ConjoinAll(ctx context.Context, sources chunkSour

// Open a table named |name|, containing |chunkCount| chunks.
func (bsp *blobstorePersister) Open(ctx context.Context, name addr, chunkCount uint32, stats *Stats) (chunkSource, error) {
-return newBSChunkSource(ctx, bsp.bs, name, chunkCount, bsp.blockSize, bsp.q, stats)
+return newBSChunkSource(ctx, bsp.bs, name, chunkCount, bsp.q, stats)
}

func (bsp *blobstorePersister) Exists(ctx context.Context, name addr, chunkCount uint32, stats *Stats) (bool, error) {
return bsp.bs.Exists(ctx, name.String())
}

type bsTableReaderAt struct {
@@ -95,7 +100,7 @@ func (bsTRA *bsTableReaderAt) ReadAtWithStats(ctx context.Context, p []byte, off
return totalRead, nil
}

-func newBSChunkSource(ctx context.Context, bs blobstore.Blobstore, name addr, chunkCount uint32, blockSize uint64, q MemoryQuotaProvider, stats *Stats) (cs chunkSource, err error) {
+func newBSChunkSource(ctx context.Context, bs blobstore.Blobstore, name addr, chunkCount uint32, q MemoryQuotaProvider, stats *Stats) (cs chunkSource, err error) {

index, err := loadTableIndex(ctx, stats, chunkCount, q, func(p []byte) error {
rc, _, err := bs.Get(ctx, name.String(), blobstore.NewBlobRange(-int64(len(p)), 0))
@@ -123,6 +128,6 @@ func newBSChunkSource(ctx context.Context, bs blobstore.Blobstore, name addr, ch
return &chunkSourceAdapter{tr, name}, nil
}

-func (bsp *blobstorePersister) PruneTableFiles(ctx context.Context, contents manifestContents) error {
+func (bsp *blobstorePersister) PruneTableFiles(ctx context.Context, contents manifestContents, t time.Time) error {
return chunks.ErrUnsupportedOperation
}
18 changes: 17 additions & 1 deletion go/store/nbs/file_table_persister.go
@@ -54,6 +54,10 @@ func (ftp *fsTablePersister) Open(ctx context.Context, name addr, chunkCount uin
return newFileTableReader(ctx, ftp.dir, name, chunkCount, ftp.q, ftp.fc)
}

func (ftp *fsTablePersister) Exists(ctx context.Context, name addr, chunkCount uint32, stats *Stats) (bool, error) {
return tableFileExists(ctx, ftp.dir, name)
}

func (ftp *fsTablePersister) Persist(ctx context.Context, mt *memTable, haver chunkReader, stats *Stats) (chunkSource, error) {
t1 := time.Now()
defer stats.PersistLatency.SampleTimeSince(t1)
@@ -185,7 +189,7 @@ func (ftp *fsTablePersister) ConjoinAll(ctx context.Context, sources chunkSource
return ftp.Open(ctx, name, plan.chunkCount, stats)
}

-func (ftp *fsTablePersister) PruneTableFiles(ctx context.Context, contents manifestContents) error {
+func (ftp *fsTablePersister) PruneTableFiles(ctx context.Context, contents manifestContents, mtime time.Time) error {
ss := contents.getSpecSet()

fileInfos, err := os.ReadDir(ftp.dir)
@@ -229,6 +233,18 @@ func (ftp *fsTablePersister) PruneTableFiles(ctx context.Context, contents manif
continue // file is referenced in the manifest
}

i, err := info.Info()
if err != nil {
ea.add(filePath, err)
continue // cannot stat the file; skip it rather than dereference a nil FileInfo
}

modTime := i.ModTime()
if modTime.After(mtime) {
continue // file has been updated more recently than the manifest
}

err = file.Remove(filePath)
if err != nil {
ea.add(filePath, err)
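All three persisters now expose the same two entry points. Below is a hedged sketch of the contract they presumably satisfy; the interface definition itself is not among the shown hunks, and everything here other than the two signatures evidenced above is an assumption:

// Sketch (assumption): the slice of the nbs table-persister contract touched
// by this commit. Other members are elided.
type tablePersister interface {
	// Exists reports whether table file |name| with |chunkCount| chunks is
	// present in the backing store.
	Exists(ctx context.Context, name addr, chunkCount uint32, stats *Stats) (bool, error)

	// PruneTableFiles removes table files that are unreferenced by |contents|
	// and were not modified after |mtime|.
	PruneTableFiles(ctx context.Context, contents manifestContents, mtime time.Time) error
}

Note that only the file-backed persister actually prunes; the AWS and blobstore persisters return chunks.ErrUnsupportedOperation, which lines up with the TestDoltGC skip above.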
11 changes: 11 additions & 0 deletions go/store/nbs/file_table_reader.go
@@ -41,6 +41,17 @@ const (
fileBlockSize = 1 << 12
)

func tableFileExists(ctx context.Context, dir string, h addr) (bool, error) {
path := filepath.Join(dir, h.String())
_, err := os.Stat(path)

if os.IsNotExist(err) {
return false, nil
}

return err == nil, err
}

func newFileTableReader(ctx context.Context, dir string, h addr, chunkCount uint32, q MemoryQuotaProvider, fc *fdCache) (cs chunkSource, err error) {
path := filepath.Join(dir, h.String())

