Skip to content

Commit

Permalink
chore(logging): Add entry's timestamp when rejected with `too far beh…
Browse files Browse the repository at this point in the history
…ind` (#12933)

Signed-off-by: Kaviraj <kavirajkanagaraj@gmail.com>
  • Loading branch information
kavirajk authored May 13, 2024
1 parent 5ada92b commit b05c4f7
Show file tree
Hide file tree
Showing 4 changed files with 13 additions and 6 deletions.
10 changes: 7 additions & 3 deletions pkg/chunkenc/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,10 @@ var (
)

type errTooFarBehind struct {
// original timestmap of the entry itself.
entryTs time.Time

// cutoff is the oldest acceptable timstamp of the `stream` that entry belongs to.
cutoff time.Time
}

Expand All @@ -32,12 +36,12 @@ func IsErrTooFarBehind(err error) bool {
return ok
}

func ErrTooFarBehind(cutoff time.Time) error {
return &errTooFarBehind{cutoff: cutoff}
func ErrTooFarBehind(entryTs, cutoff time.Time) error {
return &errTooFarBehind{entryTs: entryTs, cutoff: cutoff}
}

func (m *errTooFarBehind) Error() string {
return "entry too far behind, oldest acceptable timestamp is: " + m.cutoff.Format(time.RFC3339)
return fmt.Sprintf("entry too far behind, entry timestamp is: %s, oldest acceptable timestamp is: %s", m.entryTs.Format(time.RFC3339), m.cutoff.Format(time.RFC3339))
}

func IsOutOfOrderErr(err error) bool {
Expand Down
4 changes: 3 additions & 1 deletion pkg/chunkenc/interface_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,9 @@ func TestParseEncoding(t *testing.T) {
}

func TestIsOutOfOrderErr(t *testing.T) {
for _, err := range []error{ErrOutOfOrder, ErrTooFarBehind(time.Now())} {
now := time.Now()

for _, err := range []error{ErrOutOfOrder, ErrTooFarBehind(now, now)} {
require.Equal(t, true, IsOutOfOrderErr(err))
}
}
2 changes: 1 addition & 1 deletion pkg/ingester/stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -394,7 +394,7 @@ func (s *stream) validateEntries(entries []logproto.Entry, isReplay, rateLimitWh
// The validity window for unordered writes is the highest timestamp present minus 1/2 * max-chunk-age.
cutoff := highestTs.Add(-s.cfg.MaxChunkAge / 2)
if !isReplay && s.unorderedWrites && !highestTs.IsZero() && cutoff.After(entries[i].Timestamp) {
failedEntriesWithError = append(failedEntriesWithError, entryWithError{&entries[i], chunkenc.ErrTooFarBehind(cutoff)})
failedEntriesWithError = append(failedEntriesWithError, entryWithError{&entries[i], chunkenc.ErrTooFarBehind(entries[i].Timestamp, cutoff)})
s.writeFailures.Log(s.tenant, fmt.Errorf("%w for stream %s", failedEntriesWithError[len(failedEntriesWithError)-1].e, s.labels))
outOfOrderSamples++
outOfOrderBytes += lineBytes
Expand Down
3 changes: 2 additions & 1 deletion pkg/ingester/stream_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,8 +84,9 @@ func TestMaxReturnedStreamsErrors(t *testing.T) {
var expected bytes.Buffer
for i := 0; i < tc.expectErrs; i++ {
fmt.Fprintf(&expected,
"entry with timestamp %s ignored, reason: 'entry too far behind, oldest acceptable timestamp is: %s',\n",
"entry with timestamp %s ignored, reason: 'entry too far behind, entry timestamp is: %s, oldest acceptable timestamp is: %s',\n",
time.Unix(int64(i), 0).String(),
newLines[i].Timestamp.Format(time.RFC3339),
time.Unix(int64(numLogs), 0).Format(time.RFC3339),
)
}
Expand Down

0 comments on commit b05c4f7

Please sign in to comment.