Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

minor fixes and improvements for boltdb shipper #2530

Merged
merged 1 commit into from
Aug 24, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 14 additions & 1 deletion pkg/storage/stores/shipper/uploads/table.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"os"
"path"
"path/filepath"
"strings"
"sync"
"time"

Expand All @@ -26,6 +27,9 @@ const (
// retain dbs for specified duration after they are modified to avoid keeping them locally forever.
// this period should be big enough than shardDBsByDuration to avoid any conflicts
dbRetainPeriod = time.Hour

// a snapshot file is created during uploads with name of the db + snapshotFileSuffix
snapshotFileSuffix = ".temp"
)

type BoltDBIndexClient interface {
Expand Down Expand Up @@ -219,7 +223,7 @@ func (lt *Table) Upload(ctx context.Context) error {
func (lt *Table) uploadDB(ctx context.Context, name string, db *bbolt.DB) error {
level.Debug(util.Logger).Log("msg", fmt.Sprintf("uploading db %s from table %s", name, lt.name))

filePath := path.Join(lt.path, fmt.Sprintf("%s.%s", name, "temp"))
filePath := path.Join(lt.path, fmt.Sprintf("%s%s", name, snapshotFileSuffix))
f, err := os.Create(filePath)
if err != nil {
return err
Expand Down Expand Up @@ -319,6 +323,15 @@ func loadBoltDBsFromDir(dir string) (map[string]*bbolt.DB, error) {
continue
}

if strings.HasSuffix(fileInfo.Name(), snapshotFileSuffix) {
// If an ingester is killed abruptly in the middle of an upload operation it could leave out a temp file which holds the snapshot of db for uploading.
// Cleaning up those temp files to avoid problems.
if err := os.Remove(filepath.Join(dir, fileInfo.Name())); err != nil {
level.Error(util.Logger).Log("msg", "failed to remove temp file", "name", fileInfo.Name(), "err", err)
}
continue
}

db, err := local.OpenBoltdbFile(filepath.Join(dir, fileInfo.Name()))
if err != nil {
return nil, err
Expand Down
33 changes: 14 additions & 19 deletions pkg/storage/stores/shipper/uploads/table_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,10 +71,7 @@ func (tm *TableManager) loop() {
for {
select {
case <-syncTicker.C:
err := tm.uploadTables(context.Background())
if err != nil {
level.Error(pkg_util.Logger).Log("msg", "error uploading local boltdb files to the storage", "err", err)
}
tm.uploadTables(context.Background())
case <-tm.ctx.Done():
return
}
Expand All @@ -87,10 +84,7 @@ func (tm *TableManager) Stop() {
tm.cancel()
tm.wg.Wait()

err := tm.uploadTables(context.Background())
if err != nil {
level.Error(pkg_util.Logger).Log("msg", "error uploading local boltdb files to the storage before stopping", "err", err)
}
tm.uploadTables(context.Background())
}

func (tm *TableManager) QueryPages(ctx context.Context, queries []chunk.IndexQuery, callback chunk_util.Callback) error {
Expand Down Expand Up @@ -157,24 +151,20 @@ func (tm *TableManager) getOrCreateTable(tableName string) (*Table, error) {
return table, nil
}

func (tm *TableManager) uploadTables(ctx context.Context) (err error) {
func (tm *TableManager) uploadTables(ctx context.Context) {
tm.tablesMtx.RLock()
defer tm.tablesMtx.RUnlock()

level.Info(pkg_util.Logger).Log("msg", "uploading tables")

defer func() {
status := statusSuccess
if err != nil {
status = statusFailure
}
tm.metrics.tablesUploadOperationTotal.WithLabelValues(status).Inc()
}()

status := statusSuccess
for _, table := range tm.tables {
err := table.Upload(ctx)
if err != nil {
return err
// continue uploading other tables while skipping cleanup for a failed one.
status = statusFailure
level.Error(pkg_util.Logger).Log("msg", "failed to upload dbs", "table", table.name, "err", err)
continue
}

// cleanup unwanted dbs from the table
Expand All @@ -185,7 +175,7 @@ func (tm *TableManager) uploadTables(ctx context.Context) (err error) {
}
}

return
tm.metrics.tablesUploadOperationTotal.WithLabelValues(status).Inc()
}

func (tm *TableManager) loadTables() (map[string]*Table, error) {
Expand Down Expand Up @@ -236,6 +226,11 @@ func (tm *TableManager) loadTables() (map[string]*Table, error) {
}

if table == nil {
// if table is nil it means it has no files in it so remove the folder for that table.
err := os.Remove(filepath.Join(tm.cfg.IndexDir, fileInfo.Name()))
if err != nil {
level.Error(pkg_util.Logger).Log("msg", "failed to remove empty table folder", "table", fileInfo.Name(), "err", err)
}
continue
}

Expand Down
39 changes: 39 additions & 0 deletions pkg/storage/stores/shipper/uploads/table_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -266,3 +266,42 @@ func TestTable_Cleanup(t *testing.T) {
require.NoError(t, err)
}
}

func Test_LoadBoltDBsFromDir(t *testing.T) {
indexPath, err := ioutil.TempDir("", "load-dbs-from-dir")
require.NoError(t, err)

defer func() {
require.NoError(t, os.RemoveAll(indexPath))
}()

// setup some dbs with a snapshot file.
tablePath := testutil.SetupDBTablesAtPath(t, "test-table", indexPath, map[string]testutil.DBRecords{
"db1": {
Start: 0,
NumRecords: 10,
},
"db1" + snapshotFileSuffix: { // a snapshot file which should be ignored.
Start: 0,
NumRecords: 10,
},
"db2": {
Start: 10,
NumRecords: 10,
},
})

// try loading the dbs
dbs, err := loadBoltDBsFromDir(tablePath)
require.NoError(t, err)

// check that we have just 2 dbs
require.Len(t, dbs, 2)
require.NotNil(t, dbs["db1"])
require.NotNil(t, dbs["db2"])

// close all the open dbs
for _, boltdb := range dbs {
require.NoError(t, boltdb.Close())
}
}