Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

compress boltdb files to gzip while uploading from shipper #2507

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 11 additions & 1 deletion pkg/storage/stores/shipper/downloads/table.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ import (
"github.com/cortexproject/cortex/pkg/util/spanlogger"
"github.com/go-kit/kit/log/level"
"go.etcd.io/bbolt"

"github.com/grafana/loki/pkg/chunkenc"
)

// timeout for downloading the initial files for a table, so that a download which never completes cannot hold on to resources indefinitely.
Expand Down Expand Up @@ -391,7 +393,15 @@ func (t *Table) getFileFromStorage(ctx context.Context, objectKey, destination s
return err
}

_, err = io.Copy(f, readCloser)
var objectReader io.Reader = readCloser
if strings.HasSuffix(objectKey, ".gz") {
decompressedReader := chunkenc.Gzip.GetReader(readCloser)
defer chunkenc.Gzip.PutReader(decompressedReader)

objectReader = decompressedReader
}

_, err = io.Copy(f, objectReader)
if err != nil {
return err
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/storage/stores/shipper/downloads/table_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ func TestTableManager_QueryPages(t *testing.T) {
var queries []chunk.IndexQuery
for name, dbs := range tables {
queries = append(queries, chunk.IndexQuery{TableName: name})
testutil.SetupDBTablesAtPath(t, name, objectStoragePath, dbs)
testutil.SetupDBTablesAtPath(t, name, objectStoragePath, dbs, true)
}

tableManager, _, stopFunc := buildTestTableManager(t, tempDir)
Expand Down
6 changes: 3 additions & 3 deletions pkg/storage/stores/shipper/downloads/table_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ func TestTable_Query(t *testing.T) {
},
}

testutil.SetupDBTablesAtPath(t, "test", objectStoragePath, testDBs)
testutil.SetupDBTablesAtPath(t, "test", objectStoragePath, testDBs, true)

table, _, stopFunc := buildTestTable(t, "test", tempDir)
defer func() {
Expand Down Expand Up @@ -127,7 +127,7 @@ func TestTable_Sync(t *testing.T) {
}

// setup the table in storage with some records
testutil.SetupDBTablesAtPath(t, tableName, objectStoragePath, testDBs)
testutil.SetupDBTablesAtPath(t, tableName, objectStoragePath, testDBs, false)

// create table instance
table, boltdbClient, stopFunc := buildTestTable(t, "test", tempDir)
Expand Down Expand Up @@ -217,7 +217,7 @@ func TestTable_doParallelDownload(t *testing.T) {
}
}

testutil.SetupDBTablesAtPath(t, fmt.Sprint(tc), objectStoragePath, testDBs)
testutil.SetupDBTablesAtPath(t, fmt.Sprint(tc), objectStoragePath, testDBs, true)

table, _, stopFunc := buildTestTable(t, fmt.Sprint(tc), tempDir)
defer func() {
Expand Down
29 changes: 28 additions & 1 deletion pkg/storage/stores/shipper/testutil/testutil.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,9 @@ package testutil

import (
"context"
"fmt"
"io"
"os"
"path/filepath"
"strconv"
"sync"
Expand All @@ -10,6 +13,7 @@ import (
"github.com/cortexproject/cortex/pkg/chunk"
"github.com/cortexproject/cortex/pkg/chunk/local"
chunk_util "github.com/cortexproject/cortex/pkg/chunk/util"
"github.com/klauspost/compress/gzip"
"github.com/stretchr/testify/require"
"go.etcd.io/bbolt"
)
Expand Down Expand Up @@ -131,7 +135,7 @@ type DBRecords struct {
Start, NumRecords int
}

func SetupDBTablesAtPath(t *testing.T, tableName, path string, dbs map[string]DBRecords) string {
func SetupDBTablesAtPath(t *testing.T, tableName, path string, dbs map[string]DBRecords, compressRandomFiles bool) string {
boltIndexClient, err := local.NewBoltDBIndexClient(local.BoltDBConfig{Directory: path})
require.NoError(t, err)

Expand All @@ -140,9 +144,32 @@ func SetupDBTablesAtPath(t *testing.T, tableName, path string, dbs map[string]DB
tablePath := filepath.Join(path, tableName)
require.NoError(t, chunk_util.EnsureDirectory(tablePath))

var i int
for name, dbRecords := range dbs {
AddRecordsToDB(t, filepath.Join(tablePath, name), boltIndexClient, dbRecords.Start, dbRecords.NumRecords)
if compressRandomFiles && i%2 == 0 {
compressFile(t, filepath.Join(tablePath, name))
}
i++
}

return tablePath
}

// compressFile gzips the file at filepath into a sibling file named
// "<filepath>.gz" and then deletes the uncompressed original.
// Any error fails the test immediately via require.
//
// NOTE: the parameter name shadows the path/filepath package inside this
// function; the package itself is not used here.
func compressFile(t *testing.T, filepath string) {
	t.Helper()

	uncompressedFile, err := os.Open(filepath)
	require.NoError(t, err)
	// Safety net for the failure paths: a failing require stops the test but
	// still runs deferred calls, so the handle is not leaked. On the success
	// path the file is already closed below and this second Close is a no-op
	// whose error is intentionally ignored.
	defer func() { _ = uncompressedFile.Close() }()

	compressedFile, err := os.Create(fmt.Sprintf("%s.gz", filepath))
	require.NoError(t, err)
	// Same safety net as above for the destination file.
	defer func() { _ = compressedFile.Close() }()

	compressedWriter := gzip.NewWriter(compressedFile)

	_, err = io.Copy(compressedWriter, uncompressedFile)
	require.NoError(t, err)

	// The gzip writer must be closed before the underlying file so that any
	// buffered data and the gzip footer reach the file.
	require.NoError(t, compressedWriter.Close())
	require.NoError(t, uncompressedFile.Close())
	require.NoError(t, compressedFile.Close())

	// Only the compressed copy should remain at the destination.
	require.NoError(t, os.Remove(filepath))
}
20 changes: 16 additions & 4 deletions pkg/storage/stores/shipper/uploads/table.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ import (
"github.com/cortexproject/cortex/pkg/util"
"github.com/go-kit/kit/log/level"
"go.etcd.io/bbolt"

"github.com/grafana/loki/pkg/chunkenc"
)

const (
Expand Down Expand Up @@ -235,9 +237,19 @@ func (lt *Table) uploadDB(ctx context.Context, name string, db *bbolt.DB) error
}
}()

err = db.View(func(tx *bbolt.Tx) error {
_, err := tx.WriteTo(f)
return err
err = db.View(func(tx *bbolt.Tx) (err error) {
compressedWriter := chunkenc.Gzip.GetWriter(f)
defer chunkenc.Gzip.PutWriter(compressedWriter)

defer func() {
cerr := compressedWriter.Close()
if err == nil {
err = cerr
}
}()

_, err = tx.WriteTo(compressedWriter)
return
})
if err != nil {
return err
Expand Down Expand Up @@ -304,7 +316,7 @@ func (lt *Table) buildObjectKey(dbName string) string {
objectKey = fmt.Sprintf("%s/%s", lt.name, lt.uploader)
}

return objectKey
return fmt.Sprintf("%s.gz", objectKey)
}

func loadBoltDBsFromDir(dir string) (map[string]*bbolt.DB, error) {
Expand Down
8 changes: 4 additions & 4 deletions pkg/storage/stores/shipper/uploads/table_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,14 @@ package uploads

import (
"context"
"github.com/cortexproject/cortex/pkg/chunk/local"
"io/ioutil"
"os"
"path/filepath"
"testing"
"time"

"github.com/cortexproject/cortex/pkg/chunk"
"github.com/cortexproject/cortex/pkg/chunk/local"
"github.com/grafana/loki/pkg/storage/stores/shipper/testutil"
"github.com/stretchr/testify/require"
)
Expand All @@ -27,7 +27,7 @@ func buildTestTableManager(t *testing.T, testDir string) (*TableManager, *local.
IndexDir: indexPath,
UploadInterval: time.Hour,
}
tm, err := NewTableManager(cfg, boltDBIndexClient, storageClient, nil)
tm, err := NewTableManager(cfg, boltDBIndexClient, storageClient, nil)
require.NoError(t, err)

return tm, boltDBIndexClient, func() {
Expand Down Expand Up @@ -64,7 +64,7 @@ func TestLoadTables(t *testing.T) {
Start: 20,
NumRecords: 10,
},
})
}, false)

// table2 with 2 dbs
testutil.SetupDBTablesAtPath(t, "table2", indexPath, map[string]testutil.DBRecords{
Expand All @@ -76,7 +76,7 @@ func TestLoadTables(t *testing.T) {
Start: 40,
NumRecords: 10,
},
})
}, false)

expectedTables := map[string]struct {
start, numRecords int
Expand Down
41 changes: 37 additions & 4 deletions pkg/storage/stores/shipper/uploads/table_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,17 +3,19 @@ package uploads
import (
"context"
"fmt"
"io"
"io/ioutil"
"os"
"path/filepath"
"testing"
"time"

"github.com/grafana/loki/pkg/storage/stores/shipper/testutil"

"github.com/cortexproject/cortex/pkg/chunk"
"github.com/cortexproject/cortex/pkg/chunk/local"
"github.com/klauspost/compress/gzip"
"github.com/stretchr/testify/require"

"github.com/grafana/loki/pkg/storage/stores/shipper/testutil"
)

const (
Expand Down Expand Up @@ -74,7 +76,7 @@ func TestLoadTable(t *testing.T) {
Start: 10,
NumRecords: 10,
},
})
}, false)

// try loading the table.
table, err := LoadTable(tablePath, "test", nil, boltDBIndexClient)
Expand Down Expand Up @@ -176,13 +178,44 @@ func TestTable_Upload(t *testing.T) {
}

// compareTableWithStorage checks that every db held by table has a
// gzip-compressed counterpart under storageDir with the same contents.
//
// For each db, the object at table.buildObjectKey(name) is decompressed into a
// scratch directory, opened as a boltdb file and compared against the table's
// db using testutil.CompareDBs. Any error fails the test immediately.
func compareTableWithStorage(t *testing.T, table *Table, storageDir string) {
	t.Helper()

	// use a temp dir for decompressing the files before comparison.
	tempDir, err := ioutil.TempDir("", "compare-table")
	require.NoError(t, err)

	defer func() {
		require.NoError(t, os.RemoveAll(tempDir))
	}()

	for name, db := range table.dbs {
		objectKey := table.buildObjectKey(name)

		// open compressed file from storage
		compressedFile, err := os.Open(filepath.Join(storageDir, objectKey))
		require.NoError(t, err)

		// get a compressed reader
		compressedReader, err := gzip.NewReader(compressedFile)
		require.NoError(t, err)

		// create a temp file for writing decompressed file
		decompressedFilePath := filepath.Join(tempDir, filepath.Base(objectKey))
		decompressedFile, err := os.Create(decompressedFilePath)
		require.NoError(t, err)

		// do the decompression
		_, err = io.Copy(decompressedFile, compressedReader)
		require.NoError(t, err)

		// close the references; the gzip reader is closed first, before the
		// file it reads from.
		require.NoError(t, compressedReader.Close())
		require.NoError(t, compressedFile.Close())
		require.NoError(t, decompressedFile.Close())

		// the decompressed copy, not the gzipped object, is what must be opened
		// as a boltdb file.
		storageDB, err := local.OpenBoltdbFile(decompressedFilePath)
		require.NoError(t, err)

		testutil.CompareDBs(t, db, storageDB)
		require.NoError(t, storageDB.Close())
		require.NoError(t, os.Remove(decompressedFilePath))
	}
}

Expand Down