Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fixed Downsampling process; Fixed runutil.CloseAndCaptureErr #1070

Merged
merged 2 commits into from
Apr 23, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
6 changes: 6 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,12 @@ We use *breaking* word for marking changes that are not backward compatible (rel

## Unreleased


### Fixed

- [#1070](https://github.com/improbable-eng/thanos/pull/1070) Downsampling works back again. Deferred closer errors are now properly captured.


## [v0.4.0-rc.0](https://github.com/improbable-eng/thanos/releases/tag/v0.4.0-rc.0) - 2019.04.18

:warning: **IMPORTANT** :warning: This is the last release that supports gossip. From Thanos v0.5.0, gossip will be completely removed.
Expand Down
12 changes: 6 additions & 6 deletions pkg/block/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -340,7 +340,7 @@ func GatherIndexIssueStats(logger log.Logger, fn string, minTime int64, maxTime
if err != nil {
return stats, errors.Wrap(err, "open index file")
}
defer runutil.CloseWithErrCapture(logger, &err, r, "gather index issue file reader")
defer runutil.CloseWithErrCapture(&err, r, "gather index issue file reader")

p, err := r.Postings(index.AllPostingsKey())
if err != nil {
Expand Down Expand Up @@ -460,33 +460,33 @@ func Repair(logger log.Logger, dir string, id ulid.ULID, source metadata.SourceT
if err != nil {
return resid, errors.Wrap(err, "open block")
}
defer runutil.CloseWithErrCapture(logger, &err, b, "repair block reader")
defer runutil.CloseWithErrCapture(&err, b, "repair block reader")

indexr, err := b.Index()
if err != nil {
return resid, errors.Wrap(err, "open index")
}
defer runutil.CloseWithErrCapture(logger, &err, indexr, "repair index reader")
defer runutil.CloseWithErrCapture(&err, indexr, "repair index reader")

chunkr, err := b.Chunks()
if err != nil {
return resid, errors.Wrap(err, "open chunks")
}
defer runutil.CloseWithErrCapture(logger, &err, chunkr, "repair chunk reader")
defer runutil.CloseWithErrCapture(&err, chunkr, "repair chunk reader")

resdir := filepath.Join(dir, resid.String())

chunkw, err := chunks.NewWriter(filepath.Join(resdir, ChunksDirname))
if err != nil {
return resid, errors.Wrap(err, "open chunk writer")
}
defer runutil.CloseWithErrCapture(logger, &err, chunkw, "repair chunk writer")
defer runutil.CloseWithErrCapture(&err, chunkw, "repair chunk writer")

indexw, err := index.NewWriter(filepath.Join(resdir, IndexFilename))
if err != nil {
return resid, errors.Wrap(err, "open index writer")
}
defer runutil.CloseWithErrCapture(logger, &err, indexw, "repair index writer")
defer runutil.CloseWithErrCapture(&err, indexw, "repair index writer")

// TODO(fabxc): adapt so we properly handle the version once we update to an upstream
// that has multiple.
Expand Down
7 changes: 4 additions & 3 deletions pkg/compact/downsample/downsample.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,13 +43,13 @@ func Downsample(
if err != nil {
return id, errors.Wrap(err, "open index reader")
}
defer runutil.CloseWithErrCapture(logger, &err, indexr, "downsample index reader")
defer runutil.CloseWithErrCapture(&err, indexr, "downsample index reader")

chunkr, err := b.Chunks()
if err != nil {
return id, errors.Wrap(err, "open chunk reader")
}
defer runutil.CloseWithErrCapture(logger, &err, chunkr, "downsample chunk reader")
defer runutil.CloseWithErrCapture(&err, chunkr, "downsample chunk reader")

// Generate new block id.
uid := ulid.MustNew(ulid.Now(), rand.New(rand.NewSource(time.Now().UnixNano())))
Expand Down Expand Up @@ -81,12 +81,13 @@ func Downsample(
if err != nil {
return id, errors.Wrap(err, "get streamed block writer")
}
defer runutil.CloseWithErrCapture(logger, &err, streamedBlockWriter, "close stream block writer")
defer runutil.CloseWithErrCapture(&err, streamedBlockWriter, "close stream block writer")

postings, err := indexr.Postings(index.AllPostingsKey())
if err != nil {
return id, errors.Wrap(err, "get all postings list")
}

var (
aggrChunks []*AggrChunk
all []sample
Expand Down
3 changes: 3 additions & 0 deletions pkg/compact/downsample/downsample_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -163,6 +163,9 @@ func testDownsample(t *testing.T, data []*downsampleTestSet, meta *metadata.Meta
id, err := Downsample(log.NewNopLogger(), meta, mb, dir, resolution)
testutil.Ok(t, err)

_, err = metadata.Read(filepath.Join(dir, id.String()))
testutil.Ok(t, err)

exp := map[uint64]map[AggrType][]sample{}
got := map[uint64]map[AggrType][]sample{}

Expand Down
52 changes: 25 additions & 27 deletions pkg/compact/downsample/streamed_block_writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -167,26 +167,20 @@ func (w *streamedBlockWriter) Close() error {
if w.finalized {
return nil
}

var merr tsdb.MultiError
w.finalized = true

// Finalise data block only if there wasn't any internal errors.
if !w.ignoreFinalize {
merr.Add(w.finalize())
}
merr := tsdb.MultiError{}

for _, cl := range w.closers {
merr.Add(cl.Close())
if w.ignoreFinalize {
// Close open file descriptors anyway.
for _, cl := range w.closers {
merr.Add(cl.Close())
}
return merr.Err()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

forgot to wrap an error

}

return errors.Wrap(merr.Err(), "close closers")
}
// Finalize saves prepared index and metadata to corresponding files.

// finalize saves prepared index and meta data to corresponding files.
// It is called on Close. Even if an error happened outside of StreamWriter, it will finalize the block anyway,
// so it's a caller's responsibility to remove the block's directory.
func (w *streamedBlockWriter) finalize() error {
if err := w.writeLabelSets(); err != nil {
return errors.Wrap(err, "write label sets")
}
Expand All @@ -195,7 +189,15 @@ func (w *streamedBlockWriter) finalize() error {
return errors.Wrap(err, "write mem postings")
}

if err := w.writeIndexCache(); err != nil {
for _, cl := range w.closers {
merr.Add(cl.Close())
}

if err := block.WriteIndexCache(
w.logger,
filepath.Join(w.blockDir, block.IndexFilename),
filepath.Join(w.blockDir, block.IndexCacheFilename),
); err != nil {
return errors.Wrap(err, "write index cache")
}

Expand All @@ -207,8 +209,14 @@ func (w *streamedBlockWriter) finalize() error {
return errors.Wrap(err, "sync blockDir")
}

if err := merr.Err(); err != nil {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it's better to put it before 196 line to avoid wasting time/resources on writing index cache

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You are right. Do you have time for small PR on this? (:

return errors.Wrap(err, "finalize")
}

// No error, claim success.

level.Info(w.logger).Log(
"msg", "write downsampled block",
"msg", "finalized downsampled block",
"mint", w.meta.MinTime,
"maxt", w.meta.MaxTime,
"ulid", w.meta.ULID,
Expand All @@ -224,7 +232,7 @@ func (w *streamedBlockWriter) syncDir() (err error) {
return errors.Wrap(err, "open temporary block blockDir")
}

defer runutil.CloseWithErrCapture(w.logger, &err, df, "close temporary block blockDir")
defer runutil.CloseWithErrCapture(&err, df, "close temporary block blockDir")

if err := fileutil.Fsync(df); err != nil {
return errors.Wrap(err, "sync temporary blockDir")
Expand Down Expand Up @@ -257,16 +265,6 @@ func (w *streamedBlockWriter) writeMemPostings() error {
return nil
}

func (w *streamedBlockWriter) writeIndexCache() error {
indexFile := filepath.Join(w.blockDir, block.IndexFilename)
indexCacheFile := filepath.Join(w.blockDir, block.IndexCacheFilename)
if err := block.WriteIndexCache(w.logger, indexFile, indexCacheFile); err != nil {
return errors.Wrap(err, "write index cache")
}

return nil
}

// writeMetaFile writes meta file.
func (w *streamedBlockWriter) writeMetaFile() error {
w.meta.Version = metadata.MetaVersion1
Expand Down
30 changes: 9 additions & 21 deletions pkg/runutil/runutil.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@
// For capturing error, use CloseWithErrCapture:
//
// var err error
// defer runutil.CloseWithErrCapture(logger, &err, closer, "log format message")
// defer runutil.CloseWithErrCapture(&err, closer, "log format message")
//
// // ...
//
Expand All @@ -49,6 +49,7 @@ import (
"github.com/go-kit/kit/log"
"github.com/go-kit/kit/log/level"
"github.com/pkg/errors"
"github.com/prometheus/tsdb"
)

// Repeat executes f every interval seconds until stopc is closed.
Expand Down Expand Up @@ -107,26 +108,13 @@ func CloseWithLogOnErr(logger log.Logger, closer io.Closer, format string, a ...
level.Warn(logger).Log("msg", "detected close error", "err", errors.Wrap(err, fmt.Sprintf(format, a...)))
}

// CloseWithErrCapture runs function and on error tries to return error by argument.
// If error is already there we assume that error has higher priority and we just log the function error.
func CloseWithErrCapture(logger log.Logger, err *error, closer io.Closer, format string, a ...interface{}) {
closeErr := closer.Close()
if closeErr == nil {
return
}

if *err == nil {
err = &closeErr
return
}
// CloseWithErrCapture runs function and on error return error by argument including the given error (usually
// from caller function).
func CloseWithErrCapture(err *error, closer io.Closer, format string, a ...interface{}) {
merr := tsdb.MultiError{}

// There is already an error, let's log this one.
if logger == nil {
logger = log.NewLogfmtLogger(os.Stderr)
}
merr.Add(*err)
merr.Add(errors.Wrapf(closer.Close(), format, a...))

level.Warn(logger).Log(
"msg", "detected best effort close error that was preempted from the more important one",
"err", errors.Wrap(closeErr, fmt.Sprintf(format, a...)),
)
*err = merr.Err()
}
70 changes: 70 additions & 0 deletions pkg/runutil/runutil_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
package runutil

import (
"github.com/pkg/errors"
"io"
"testing"
)

type testCloser struct {
err error
}

func (c testCloser) Close() error {
return c.err
}

func TestCloseWithErrCapture(t *testing.T) {
for _, tcase := range []struct{
err error
closer io.Closer

expectedErrStr string
}{
{
err: nil,
closer: testCloser{err:nil},
expectedErrStr: "",
},
{
err: errors.New("test"),
closer: testCloser{err:nil},
expectedErrStr: "test",
},
{
err: nil,
closer: testCloser{err:errors.New("test")},
expectedErrStr: "close: test",
},
{
err: errors.New("test"),
closer: testCloser{err:errors.New("test")},
expectedErrStr: "2 errors: test; close: test",
},
}{
if ok := t.Run("", func(t *testing.T) {
ret := tcase.err
CloseWithErrCapture(&ret, tcase.closer, "close")

if tcase.expectedErrStr == "" {
if ret != nil {
t.Error("Expected error to be nil")
t.Fail()
}
} else {
if ret == nil {
t.Error("Expected error to be not nil")
t.Fail()
}

if tcase.expectedErrStr != ret.Error() {
t.Errorf("%s != %s", tcase.expectedErrStr, ret.Error())
t.Fail()
}
}

}); !ok {
return
}
}
}
4 changes: 2 additions & 2 deletions pkg/testutil/prometheus.go
Original file line number Diff line number Diff line change
Expand Up @@ -231,7 +231,7 @@ func (p *Prometheus) SetConfig(s string) (err error) {
if err != nil {
return err
}
defer runutil.CloseWithErrCapture(nil, &err, f, "prometheus config")
defer runutil.CloseWithErrCapture(&err, f, "prometheus config")

_, err = f.Write([]byte(s))
return err
Expand Down Expand Up @@ -302,7 +302,7 @@ func createBlock(
if err != nil {
return id, errors.Wrap(err, "create head block")
}
defer runutil.CloseWithErrCapture(log.NewNopLogger(), &err, h, "TSDB Head")
defer runutil.CloseWithErrCapture(&err, h, "TSDB Head")

var g errgroup.Group
var timeStepSize = (maxt - mint) / int64(numSamples+1)
Expand Down