From b05c4f728839f0de1b8386394c99d54c5fc03cc1 Mon Sep 17 00:00:00 2001 From: Kaviraj Kanagaraj Date: Mon, 13 May 2024 08:41:17 +0200 Subject: [PATCH] chore(logging): Add entry's timestamp when rejected with `too far behind` (#12933) Signed-off-by: Kaviraj --- pkg/chunkenc/interface.go | 10 +++++++--- pkg/chunkenc/interface_test.go | 4 +++- pkg/ingester/stream.go | 2 +- pkg/ingester/stream_test.go | 3 ++- 4 files changed, 13 insertions(+), 6 deletions(-) diff --git a/pkg/chunkenc/interface.go b/pkg/chunkenc/interface.go index b96d9f705d09..8d6f5e1e8dd6 100644 --- a/pkg/chunkenc/interface.go +++ b/pkg/chunkenc/interface.go @@ -24,6 +24,10 @@ var ( ) type errTooFarBehind struct { + // original timestmap of the entry itself. + entryTs time.Time + + // cutoff is the oldest acceptable timstamp of the `stream` that entry belongs to. cutoff time.Time } @@ -32,12 +36,12 @@ func IsErrTooFarBehind(err error) bool { return ok } -func ErrTooFarBehind(cutoff time.Time) error { - return &errTooFarBehind{cutoff: cutoff} +func ErrTooFarBehind(entryTs, cutoff time.Time) error { + return &errTooFarBehind{entryTs: entryTs, cutoff: cutoff} } func (m *errTooFarBehind) Error() string { - return "entry too far behind, oldest acceptable timestamp is: " + m.cutoff.Format(time.RFC3339) + return fmt.Sprintf("entry too far behind, entry timestamp is: %s, oldest acceptable timestamp is: %s", m.entryTs.Format(time.RFC3339), m.cutoff.Format(time.RFC3339)) } func IsOutOfOrderErr(err error) bool { diff --git a/pkg/chunkenc/interface_test.go b/pkg/chunkenc/interface_test.go index daea36cb38e7..ed81c4d3604e 100644 --- a/pkg/chunkenc/interface_test.go +++ b/pkg/chunkenc/interface_test.go @@ -31,7 +31,9 @@ func TestParseEncoding(t *testing.T) { } func TestIsOutOfOrderErr(t *testing.T) { - for _, err := range []error{ErrOutOfOrder, ErrTooFarBehind(time.Now())} { + now := time.Now() + + for _, err := range []error{ErrOutOfOrder, ErrTooFarBehind(now, now)} { require.Equal(t, true, IsOutOfOrderErr(err)) } } diff --git a/pkg/ingester/stream.go b/pkg/ingester/stream.go index d7a29b73e802..6bf75dfa1ac5 100644 --- a/pkg/ingester/stream.go +++ b/pkg/ingester/stream.go @@ -394,7 +394,7 @@ func (s *stream) validateEntries(entries []logproto.Entry, isReplay, rateLimitWh // The validity window for unordered writes is the highest timestamp present minus 1/2 * max-chunk-age. cutoff := highestTs.Add(-s.cfg.MaxChunkAge / 2) if !isReplay && s.unorderedWrites && !highestTs.IsZero() && cutoff.After(entries[i].Timestamp) { - failedEntriesWithError = append(failedEntriesWithError, entryWithError{&entries[i], chunkenc.ErrTooFarBehind(cutoff)}) + failedEntriesWithError = append(failedEntriesWithError, entryWithError{&entries[i], chunkenc.ErrTooFarBehind(entries[i].Timestamp, cutoff)}) s.writeFailures.Log(s.tenant, fmt.Errorf("%w for stream %s", failedEntriesWithError[len(failedEntriesWithError)-1].e, s.labels)) outOfOrderSamples++ outOfOrderBytes += lineBytes diff --git a/pkg/ingester/stream_test.go b/pkg/ingester/stream_test.go index 26eef4e3a793..af877bf88da9 100644 --- a/pkg/ingester/stream_test.go +++ b/pkg/ingester/stream_test.go @@ -84,8 +84,9 @@ func TestMaxReturnedStreamsErrors(t *testing.T) { var expected bytes.Buffer for i := 0; i < tc.expectErrs; i++ { fmt.Fprintf(&expected, - "entry with timestamp %s ignored, reason: 'entry too far behind, oldest acceptable timestamp is: %s',\n", + "entry with timestamp %s ignored, reason: 'entry too far behind, entry timestamp is: %s, oldest acceptable timestamp is: %s',\n", time.Unix(int64(i), 0).String(), + newLines[i].Timestamp.Format(time.RFC3339), time.Unix(int64(numLogs), 0).Format(time.RFC3339), ) }