Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Log error message for invalid checksum #1713

Merged
merged 1 commit into from
Apr 24, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion pkg/chunkenc/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ var (
ErrOutOfOrder = errors.New("entry out of order")
ErrInvalidSize = errors.New("invalid size")
ErrInvalidFlag = errors.New("invalid flag")
ErrInvalidChecksum = errors.New("invalid checksum")
ErrInvalidChecksum = errors.New("invalid chunk checksum")
)

// Encoding is the identifier for a chunk encoding.
Expand Down
1 change: 1 addition & 0 deletions pkg/chunkenc/lazy_chunk.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
// LazyChunk loads the chunk when it is accessed.
type LazyChunk struct {
// Chunk is the wrapped chunk. The storage package reads its Metric to build
// stream labels and checks whether its Data is non-nil after a fetch.
Chunk chunk.Chunk
// IsValid marks the chunk as usable. fetchLazyChunks sets it to true once
// the chunk's Data is non-nil, and buildHeapIterator skips every chunk for
// which it is false.
IsValid bool
// Fetcher is the fetcher the chunk is loaded through; fetchLazyChunks groups
// chunks by this pointer and calls FetchChunks once per group.
Fetcher *chunk.Fetcher
}

Expand Down
5 changes: 4 additions & 1 deletion pkg/chunkenc/memchunk.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@ import (
"io"
"time"

"github.com/cortexproject/cortex/pkg/util"
"github.com/go-kit/kit/log/level"
"github.com/pkg/errors"

"github.com/grafana/loki/pkg/iter"
Expand Down Expand Up @@ -226,7 +228,8 @@ func NewByteChunk(b []byte, blockSize, targetSize int) (*MemChunk, error) {
// Verify checksums.
expCRC := binary.BigEndian.Uint32(b[blk.offset+l:])
if expCRC != crc32.Checksum(blk.b, castagnoliTable) {
return bc, ErrInvalidChecksum
level.Error(util.Logger).Log("msg", "Checksum does not match for a block in chunk, this block will be skipped", "err", ErrInvalidChecksum)
cyriltovena marked this conversation as resolved.
Show resolved Hide resolved
continue
}

bc.blocks = append(bc.blocks, blk)
Expand Down
35 changes: 31 additions & 4 deletions pkg/storage/iterator.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,10 @@ import (
"github.com/cortexproject/cortex/pkg/util"
"github.com/cortexproject/cortex/pkg/util/spanlogger"
"github.com/go-kit/kit/log/level"
"github.com/pkg/errors"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/pkg/labels"
"github.com/prometheus/prometheus/promql"

"github.com/grafana/loki/pkg/chunkenc"
"github.com/grafana/loki/pkg/iter"
Expand Down Expand Up @@ -199,7 +201,7 @@ func (it *batchChunkIterator) nextBatch() (iter.EntryIterator, error) {
// │ # 47 │
// └──────────────┘
// ┌──────────────────────────┐
// │ # 48
// │ # 48 |
// └──────────────────────────┘
// ┌──────────────┐
// │ # 49 │
Expand Down Expand Up @@ -328,10 +330,12 @@ func buildHeapIterator(ctx context.Context, chks [][]*chunkenc.LazyChunk, filter

// __name__ is only used for upstream compatibility and is hardcoded within loki. Strip it from the return label set.
labels := dropLabels(chks[0][0].Chunk.Metric, labels.MetricName).String()

for i := range chks {
iterators := make([]iter.EntryIterator, 0, len(chks[i]))
for j := range chks[i] {
if !chks[i][j].IsValid {
continue
}
iterator, err := chks[i][j].Iterator(ctx, from, through, direction, filter)
if err != nil {
return nil, err
Expand Down Expand Up @@ -388,7 +392,6 @@ func fetchLazyChunks(ctx context.Context, chunks []*chunkenc.LazyChunk) error {
errChan := make(chan error)
for fetcher, chunks := range chksByFetcher {
go func(fetcher *chunk.Fetcher, chunks []*chunkenc.LazyChunk) {

keys := make([]string, 0, len(chunks))
chks := make([]chunk.Chunk, 0, len(chunks))
index := make(map[string]*chunkenc.LazyChunk, len(chunks))
Expand All @@ -403,8 +406,14 @@ func fetchLazyChunks(ctx context.Context, chunks []*chunkenc.LazyChunk) error {
}
chks, err := fetcher.FetchChunks(ctx, chks, keys)
if err != nil {
if isInvalidChunkError(err) {
level.Error(util.Logger).Log("msg", "checksum of chunks does not match", "err", chunk.ErrInvalidChecksum)
errChan <- nil
return
}
errChan <- err
return

}
// assign fetched chunk by key as FetchChunks doesn't guarantee the order.
for _, chk := range chks {
Expand All @@ -421,7 +430,25 @@ func fetchLazyChunks(ctx context.Context, chunks []*chunkenc.LazyChunk) error {
lastErr = err
}
}
return lastErr

if lastErr != nil {
return lastErr
}

for _, c := range chunks {
if c.Chunk.Data != nil {
c.IsValid = true
}
}
return nil
}

// isInvalidChunkError reports whether err, once unwrapped with errors.Cause,
// is a promql.ErrStorage whose inner error is the invalid-checksum sentinel of
// either the cortex chunk package or the loki chunkenc package. Any other
// error, including nil, yields false.
func isInvalidChunkError(err error) bool {
	storageErr, isStorage := errors.Cause(err).(promql.ErrStorage)
	if !isStorage {
		return false
	}
	switch storageErr.Err {
	case chunk.ErrInvalidChecksum, chunkenc.ErrInvalidChecksum:
		return true
	default:
		return false
	}
}

func loadFirstChunks(ctx context.Context, chks map[model.Fingerprint][][]*chunkenc.LazyChunk) error {
Expand Down
161 changes: 161 additions & 0 deletions pkg/storage/iterator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,12 @@ import (
"testing"
"time"

"github.com/cortexproject/cortex/pkg/chunk"
"github.com/pkg/errors"
"github.com/prometheus/prometheus/pkg/labels"
"github.com/prometheus/prometheus/promql"
"github.com/stretchr/testify/require"
"github.com/weaveworks/common/user"

"github.com/grafana/loki/pkg/chunkenc"
"github.com/grafana/loki/pkg/iter"
Expand Down Expand Up @@ -639,6 +643,130 @@ func TestPartitionOverlappingchunks(t *testing.T) {
}
}

// TestBuildHeapIterator feeds batches of lazy chunks to buildHeapIterator and
// checks the streams read back from the resulting iterator. One of the cases
// includes a chunk whose IsValid flag is false and expects its entries to be
// missing from the output.
func TestBuildHeapIterator(t *testing.T) {
var (
firstChunk = newLazyChunk(logproto.Stream{
Labels: "{foo=\"bar\"}",
Entries: []logproto.Entry{
{
Timestamp: from,
Line: "1",
},
{
Timestamp: from.Add(time.Millisecond),
Line: "2",
},
{
Timestamp: from.Add(2 * time.Millisecond),
Line: "3",
},
},
})
// secondChunk is built with newLazyInvalidChunk, so its IsValid flag is false.
secondChunk = newLazyInvalidChunk(logproto.Stream{
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
secondChunk = newLazyInvalidChunk(logproto.Stream{
invalid = newLazyInvalidChunk(logproto.Stream{

Labels: "{foo=\"bar\"}",
Entries: []logproto.Entry{
{
Timestamp: from.Add(3 * time.Millisecond),
Line: "4",
},
{
Timestamp: from.Add(4 * time.Millisecond),
Line: "5",
},
},
})
thirdChunk = newLazyChunk(logproto.Stream{
Labels: "{foo=\"bar\"}",
Entries: []logproto.Entry{
{
Timestamp: from.Add(5 * time.Millisecond),
Line: "6",
},
},
})
)

for i, tc := range []struct {
input [][]*chunkenc.LazyChunk
expected []*logproto.Stream
}{
// Two valid chunks in separate batches: all four entries are expected.
{
[][]*chunkenc.LazyChunk{
{firstChunk},
{thirdChunk},
},
[]*logproto.Stream{
{
Labels: "{foo=\"bar\"}",
Entries: []logproto.Entry{
{
Timestamp: from,
Line: "1",
},
{
Timestamp: from.Add(time.Millisecond),
Line: "2",
},
{
Timestamp: from.Add(2 * time.Millisecond),
Line: "3",
},
{
Timestamp: from.Add(5 * time.Millisecond),
Line: "6",
},
},
},
},
},
// The invalid chunk sits in its own batch; its entries ("4" and "5") are
// absent from the expected output, which matches the first case.
{
[][]*chunkenc.LazyChunk{
{secondChunk},
{firstChunk, thirdChunk},
},
[]*logproto.Stream{
{
Labels: "{foo=\"bar\"}",
Entries: []logproto.Entry{
{
Timestamp: from,
Line: "1",
},
{
Timestamp: from.Add(time.Millisecond),
Line: "2",
},
{
Timestamp: from.Add(2 * time.Millisecond),
Line: "3",
},
{
Timestamp: from.Add(5 * time.Millisecond),
Line: "6",
},
},
},
},
},
} {
t.Run(fmt.Sprintf("%d", i), func(t *testing.T) {
// NOTE(review): this is an assignment, not a declaration, so ctx is
// declared outside this function — presumably a package-level test
// variable; confirm, and consider a local variable instead.
ctx = user.InjectOrgID(context.Background(), "test-user")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

See I think this test is sufficient. Remove any changes from the other tests in pkg/storage/store_test.go and other changes for that tests, let's keep only this one. And rename the chunk variable to invalidChunk.

it, err := buildHeapIterator(ctx, tc.input, nil, logproto.FORWARD, from, from.Add(6*time.Millisecond))
if err != nil {
t.Errorf("buildHeapIterator error = %v", err)
return
}
req := newQuery("{foo=\"bar\"}", from, from.Add(6*time.Millisecond), logproto.FORWARD)
streams, _, err := iter.ReadBatch(it, req.Limit)
// The Close error is deliberately ignored; only the read error is checked.
_ = it.Close()
if err != nil {
t.Fatalf("error reading batch %s", err)
}
assertStream(t, tc.expected, streams.Streams)
})
}
}

func TestDropLabels(t *testing.T) {

for i, tc := range []struct {
Expand Down Expand Up @@ -680,3 +808,36 @@ func TestDropLabels(t *testing.T) {
})
}
}

// Test_IsInvalidChunkError checks that isInvalidChunkError returns true only
// for a promql.ErrStorage wrapping one of the invalid-checksum sentinels
// (cortex chunk or loki chunkenc), and false for other storage errors and nil.
// Each case runs as a named subtest so a failure identifies the offending case.
func Test_IsInvalidChunkError(t *testing.T) {
	tests := []struct {
		name           string
		err            error
		expectedResult bool
	}{
		{
			"invalid chunk checksum error from cortex",
			promql.ErrStorage{Err: chunk.ErrInvalidChecksum},
			true,
		},
		{
			"invalid chunk checksum error from loki",
			promql.ErrStorage{Err: chunkenc.ErrInvalidChecksum},
			true,
		},
		{
			"cache error",
			promql.ErrStorage{Err: errors.New("error fetching from cache")},
			false,
		},
		{
			"no error from cortex or loki",
			nil,
			false,
		},
	}
	for _, tc := range tests {
		// Re-bind so the closure is safe on toolchains older than Go 1.22.
		tc := tc
		t.Run(tc.name, func(t *testing.T) {
			require.Equal(t, tc.expectedResult, isInvalidChunkError(tc.err))
		})
	}
}
2 changes: 2 additions & 0 deletions pkg/storage/store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -342,12 +342,14 @@ func Test_store_LazyQuery(t *testing.T) {
MaxChunkBatchSize: 10,
},
}

ctx = user.InjectOrgID(context.Background(), "test-user")
it, err := s.LazyQuery(ctx, logql.SelectParams{QueryRequest: tt.req})
if err != nil {
t.Errorf("store.LazyQuery() error = %v", err)
return
}

streams, _, err := iter.ReadBatch(it, tt.req.Limit)
_ = it.Close()
if err != nil {
Expand Down
10 changes: 10 additions & 0 deletions pkg/storage/util_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,15 @@ func assertStream(t *testing.T, expected, actual []*logproto.Stream) {
// newLazyChunk wraps stream in a LazyChunk that is flagged as valid and has
// no fetcher attached.
func newLazyChunk(stream logproto.Stream) *chunkenc.LazyChunk {
	lazy := new(chunkenc.LazyChunk) // Fetcher stays nil (zero value)
	lazy.Chunk = newChunk(stream)
	lazy.IsValid = true
	return lazy
}

// newLazyInvalidChunk wraps stream in a LazyChunk whose IsValid flag is false
// and which has no fetcher attached.
func newLazyInvalidChunk(stream logproto.Stream) *chunkenc.LazyChunk {
	lazy := new(chunkenc.LazyChunk) // IsValid is false and Fetcher is nil (zero values)
	lazy.Chunk = newChunk(stream)
	return lazy
}
Expand Down Expand Up @@ -119,6 +128,7 @@ func newMockChunkStore(streams []*logproto.Stream) *mockChunkStore {
}
return &mockChunkStore{chunks: chunks, client: &mockChunkStoreClient{chunks: chunks}}
}

func (m *mockChunkStore) Put(ctx context.Context, chunks []chunk.Chunk) error { return nil }
func (m *mockChunkStore) PutOne(ctx context.Context, from, through model.Time, chunk chunk.Chunk) error {
return nil
Expand Down