Skip to content

Commit

Permalink
Log error message for invalid checksum (#1713)
Browse files Browse the repository at this point in the history
  • Loading branch information
adityacs authored Apr 24, 2020
1 parent 2374d48 commit c0a28cb
Show file tree
Hide file tree
Showing 7 changed files with 210 additions and 6 deletions.
2 changes: 1 addition & 1 deletion pkg/chunkenc/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ var (
ErrOutOfOrder = errors.New("entry out of order")
ErrInvalidSize = errors.New("invalid size")
ErrInvalidFlag = errors.New("invalid flag")
ErrInvalidChecksum = errors.New("invalid checksum")
ErrInvalidChecksum = errors.New("invalid chunk checksum")
)

// Encoding is the identifier for a chunk encoding.
Expand Down
1 change: 1 addition & 0 deletions pkg/chunkenc/lazy_chunk.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
// LazyChunk loads the chunk when it is accessed.
type LazyChunk struct {
Chunk chunk.Chunk
IsValid bool
Fetcher *chunk.Fetcher
}

Expand Down
5 changes: 4 additions & 1 deletion pkg/chunkenc/memchunk.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@ import (
"io"
"time"

"github.com/cortexproject/cortex/pkg/util"
"github.com/go-kit/kit/log/level"
"github.com/pkg/errors"

"github.com/grafana/loki/pkg/iter"
Expand Down Expand Up @@ -226,7 +228,8 @@ func NewByteChunk(b []byte, blockSize, targetSize int) (*MemChunk, error) {
// Verify checksums.
expCRC := binary.BigEndian.Uint32(b[blk.offset+l:])
if expCRC != crc32.Checksum(blk.b, castagnoliTable) {
return bc, ErrInvalidChecksum
level.Error(util.Logger).Log("msg", "Checksum does not match for a block in chunk, this block will be skipped", "err", ErrInvalidChecksum)
continue
}

bc.blocks = append(bc.blocks, blk)
Expand Down
35 changes: 31 additions & 4 deletions pkg/storage/iterator.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,10 @@ import (
"github.com/cortexproject/cortex/pkg/util"
"github.com/cortexproject/cortex/pkg/util/spanlogger"
"github.com/go-kit/kit/log/level"
"github.com/pkg/errors"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/pkg/labels"
"github.com/prometheus/prometheus/promql"

"github.com/grafana/loki/pkg/chunkenc"
"github.com/grafana/loki/pkg/iter"
Expand Down Expand Up @@ -199,7 +201,7 @@ func (it *batchChunkIterator) nextBatch() (iter.EntryIterator, error) {
// │ # 47 │
// └──────────────┘
// ┌──────────────────────────┐
// │ # 48
// │ # 48 |
// └──────────────────────────┘
// ┌──────────────┐
// │ # 49 │
Expand Down Expand Up @@ -328,10 +330,12 @@ func buildHeapIterator(ctx context.Context, chks [][]*chunkenc.LazyChunk, filter

// __name__ is only used for upstream compatibility and is hardcoded within loki. Strip it from the return label set.
labels := dropLabels(chks[0][0].Chunk.Metric, labels.MetricName).String()

for i := range chks {
iterators := make([]iter.EntryIterator, 0, len(chks[i]))
for j := range chks[i] {
if !chks[i][j].IsValid {
continue
}
iterator, err := chks[i][j].Iterator(ctx, from, through, direction, filter)
if err != nil {
return nil, err
Expand Down Expand Up @@ -388,7 +392,6 @@ func fetchLazyChunks(ctx context.Context, chunks []*chunkenc.LazyChunk) error {
errChan := make(chan error)
for fetcher, chunks := range chksByFetcher {
go func(fetcher *chunk.Fetcher, chunks []*chunkenc.LazyChunk) {

keys := make([]string, 0, len(chunks))
chks := make([]chunk.Chunk, 0, len(chunks))
index := make(map[string]*chunkenc.LazyChunk, len(chunks))
Expand All @@ -403,8 +406,14 @@ func fetchLazyChunks(ctx context.Context, chunks []*chunkenc.LazyChunk) error {
}
chks, err := fetcher.FetchChunks(ctx, chks, keys)
if err != nil {
if isInvalidChunkError(err) {
level.Error(util.Logger).Log("msg", "checksum of chunks does not match", "err", chunk.ErrInvalidChecksum)
errChan <- nil
return
}
errChan <- err
return

}
// assign fetched chunk by key as FetchChunks doesn't guarantee the order.
for _, chk := range chks {
Expand All @@ -421,7 +430,25 @@ func fetchLazyChunks(ctx context.Context, chunks []*chunkenc.LazyChunk) error {
lastErr = err
}
}
return lastErr

if lastErr != nil {
return lastErr
}

for _, c := range chunks {
if c.Chunk.Data != nil {
c.IsValid = true
}
}
return nil
}

func isInvalidChunkError(err error) bool {
err = errors.Cause(err)
if err, ok := err.(promql.ErrStorage); ok {
return err.Err == chunk.ErrInvalidChecksum || err.Err == chunkenc.ErrInvalidChecksum
}
return false
}

func loadFirstChunks(ctx context.Context, chks map[model.Fingerprint][][]*chunkenc.LazyChunk) error {
Expand Down
161 changes: 161 additions & 0 deletions pkg/storage/iterator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,12 @@ import (
"testing"
"time"

"github.com/cortexproject/cortex/pkg/chunk"
"github.com/pkg/errors"
"github.com/prometheus/prometheus/pkg/labels"
"github.com/prometheus/prometheus/promql"
"github.com/stretchr/testify/require"
"github.com/weaveworks/common/user"

"github.com/grafana/loki/pkg/chunkenc"
"github.com/grafana/loki/pkg/iter"
Expand Down Expand Up @@ -639,6 +643,130 @@ func TestPartitionOverlappingchunks(t *testing.T) {
}
}

func TestBuildHeapIterator(t *testing.T) {
var (
firstChunk = newLazyChunk(logproto.Stream{
Labels: "{foo=\"bar\"}",
Entries: []logproto.Entry{
{
Timestamp: from,
Line: "1",
},
{
Timestamp: from.Add(time.Millisecond),
Line: "2",
},
{
Timestamp: from.Add(2 * time.Millisecond),
Line: "3",
},
},
})
secondChunk = newLazyInvalidChunk(logproto.Stream{
Labels: "{foo=\"bar\"}",
Entries: []logproto.Entry{
{
Timestamp: from.Add(3 * time.Millisecond),
Line: "4",
},
{
Timestamp: from.Add(4 * time.Millisecond),
Line: "5",
},
},
})
thirdChunk = newLazyChunk(logproto.Stream{
Labels: "{foo=\"bar\"}",
Entries: []logproto.Entry{
{
Timestamp: from.Add(5 * time.Millisecond),
Line: "6",
},
},
})
)

for i, tc := range []struct {
input [][]*chunkenc.LazyChunk
expected []*logproto.Stream
}{
{
[][]*chunkenc.LazyChunk{
{firstChunk},
{thirdChunk},
},
[]*logproto.Stream{
{
Labels: "{foo=\"bar\"}",
Entries: []logproto.Entry{
{
Timestamp: from,
Line: "1",
},
{
Timestamp: from.Add(time.Millisecond),
Line: "2",
},
{
Timestamp: from.Add(2 * time.Millisecond),
Line: "3",
},
{
Timestamp: from.Add(5 * time.Millisecond),
Line: "6",
},
},
},
},
},
{
[][]*chunkenc.LazyChunk{
{secondChunk},
{firstChunk, thirdChunk},
},
[]*logproto.Stream{
{
Labels: "{foo=\"bar\"}",
Entries: []logproto.Entry{
{
Timestamp: from,
Line: "1",
},
{
Timestamp: from.Add(time.Millisecond),
Line: "2",
},
{
Timestamp: from.Add(2 * time.Millisecond),
Line: "3",
},
{
Timestamp: from.Add(5 * time.Millisecond),
Line: "6",
},
},
},
},
},
} {
t.Run(fmt.Sprintf("%d", i), func(t *testing.T) {
ctx = user.InjectOrgID(context.Background(), "test-user")
it, err := buildHeapIterator(ctx, tc.input, nil, logproto.FORWARD, from, from.Add(6*time.Millisecond))
if err != nil {
t.Errorf("buildHeapIterator error = %v", err)
return
}
req := newQuery("{foo=\"bar\"}", from, from.Add(6*time.Millisecond), logproto.FORWARD)
streams, _, err := iter.ReadBatch(it, req.Limit)
_ = it.Close()
if err != nil {
t.Fatalf("error reading batch %s", err)
}
assertStream(t, tc.expected, streams.Streams)
})
}
}

func TestDropLabels(t *testing.T) {

for i, tc := range []struct {
Expand Down Expand Up @@ -680,3 +808,36 @@ func TestDropLabels(t *testing.T) {
})
}
}

func Test_IsInvalidChunkError(t *testing.T) {
tests := []struct {
name string
err error
expectedResult bool
}{
{
"invalid chunk cheksum error from cortex",
promql.ErrStorage{Err: chunk.ErrInvalidChecksum},
true,
},
{
"invalid chunk cheksum error from loki",
promql.ErrStorage{Err: chunkenc.ErrInvalidChecksum},
true,
},
{
"cache error",
promql.ErrStorage{Err: errors.New("error fetching from cache")},
false,
},
{
"no error from cortex or loki",
nil,
false,
},
}
for _, tc := range tests {
result := isInvalidChunkError(tc.err)
require.Equal(t, tc.expectedResult, result)
}
}
2 changes: 2 additions & 0 deletions pkg/storage/store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -342,12 +342,14 @@ func Test_store_LazyQuery(t *testing.T) {
MaxChunkBatchSize: 10,
},
}

ctx = user.InjectOrgID(context.Background(), "test-user")
it, err := s.LazyQuery(ctx, logql.SelectParams{QueryRequest: tt.req})
if err != nil {
t.Errorf("store.LazyQuery() error = %v", err)
return
}

streams, _, err := iter.ReadBatch(it, tt.req.Limit)
_ = it.Close()
if err != nil {
Expand Down
10 changes: 10 additions & 0 deletions pkg/storage/util_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,15 @@ func assertStream(t *testing.T, expected, actual []*logproto.Stream) {
func newLazyChunk(stream logproto.Stream) *chunkenc.LazyChunk {
return &chunkenc.LazyChunk{
Fetcher: nil,
IsValid: true,
Chunk: newChunk(stream),
}
}

func newLazyInvalidChunk(stream logproto.Stream) *chunkenc.LazyChunk {
return &chunkenc.LazyChunk{
Fetcher: nil,
IsValid: false,
Chunk: newChunk(stream),
}
}
Expand Down Expand Up @@ -119,6 +128,7 @@ func newMockChunkStore(streams []*logproto.Stream) *mockChunkStore {
}
return &mockChunkStore{chunks: chunks, client: &mockChunkStoreClient{chunks: chunks}}
}

func (m *mockChunkStore) Put(ctx context.Context, chunks []chunk.Chunk) error { return nil }
func (m *mockChunkStore) PutOne(ctx context.Context, from, through model.Time, chunk chunk.Chunk) error {
return nil
Expand Down

0 comments on commit c0a28cb

Please sign in to comment.