Skip to content

Commit

Permalink
Add an experimental flag to block samples with timestamp too far in t…
Browse files Browse the repository at this point in the history
…he future (thanos-io#6195)

* Add an experimental flag to block samples with timestamp too far in the future

Signed-off-by: Yi Jin <yi.jin@databricks.com>

* fix bug

Signed-off-by: Yi Jin <yi.jin@databricks.com>

* address comments

Signed-off-by: Yi Jin <yi.jin@databricks.com>

* fix docs CI errors

Signed-off-by: Yi Jin <yi.jin@databricks.com>

* resolve merge conflicts

Signed-off-by: Yi Jin <yi.jin@databricks.com>

* resolve merge conflicts

Signed-off-by: Yi Jin <yi.jin@databricks.com>

* retrigger checks

Signed-off-by: Yi Jin <yi.jin@databricks.com>

---------

Signed-off-by: Yi Jin <yi.jin@databricks.com>
  • Loading branch information
jnyi authored and HC Zhu committed Jun 27, 2023
1 parent bbcc23d commit 04b8e22
Show file tree
Hide file tree
Showing 3 changed files with 39 additions and 2 deletions.
4 changes: 2 additions & 2 deletions cmd/thanos/receive.go
Original file line number Diff line number Diff line change
Expand Up @@ -880,8 +880,8 @@ func (rc *receiveConfig) registerFlag(cmd extkingpin.FlagClause) {

rc.tsdbTooFarInFutureTimeWindow = extkingpin.ModelDuration(cmd.Flag("tsdb.too-far-in-future.time-window",
"[EXPERIMENTAL] Configures the allowed time window for ingesting samples too far in the future. Disabled (0s) by default"+
"Please note enable this flag will reject samples in the future of receive local NTP time + configured duration.",
).Default("0s").Hidden())
"Please note enable this flag will reject samples in the future of receive local NTP time + configured duration due to clock skew in remote write clients.",
).Default("0s"))

rc.tsdbOutOfOrderTimeWindow = extkingpin.ModelDuration(cmd.Flag("tsdb.out-of-order.time-window",
"[EXPERIMENTAL] Configures the allowed time window for ingestion of out-of-order samples. Disabled (0s) by default"+
Expand Down
8 changes: 8 additions & 0 deletions docs/components/receive.md
Original file line number Diff line number Diff line change
Expand Up @@ -396,6 +396,14 @@ Flags:
refer to the Tenant lifecycle management
section in the Receive documentation:
https://thanos.io/tip/components/receive.md/#tenant-lifecycle-management
--tsdb.too-far-in-future.time-window=0s
[EXPERIMENTAL] Configures the allowed time
window for ingesting samples too far in the
future. Disabled (0s) by defaultPlease note
enable this flag will reject samples in the
future of receive local NTP time + configured
duration due to clock skew in remote write
clients.
--tsdb.wal-compression Compress the tsdb WAL.
--version Show application version.
Expand Down
29 changes: 29 additions & 0 deletions pkg/receive/writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"github.com/pkg/errors"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/model/exemplar"
"github.com/prometheus/prometheus/model/labels"
"github.com/prometheus/prometheus/storage"
"github.com/prometheus/prometheus/tsdb"

Expand All @@ -28,6 +29,26 @@ type TenantStorage interface {
TenantAppendable(string) (Appendable, error)
}

// Wraps storage.Appender to add validation and logging.
type ReceiveAppender struct {
tLogger log.Logger
tooFarInFuture int64 // Unit: nanoseconds
storage.Appender
}

func (ra *ReceiveAppender) Append(ref storage.SeriesRef, lset labels.Labels, t int64, v float64) (storage.SeriesRef, error) {
if ra.tooFarInFuture > 0 {
tooFar := model.Now().Add(time.Duration(ra.tooFarInFuture))
if tooFar.Before(model.Time(t)) {
level.Warn(ra.tLogger).Log("msg", "block metric too far in the future", "lset", lset,
"timestamp", t, "bound", tooFar)
// now + tooFarInFutureTimeWindow < sample timestamp
return 0, storage.ErrOutOfBounds
}
}
return ra.Appender.Append(ref, lset, t, v)
}

type WriterOptions struct {
Intern bool
TooFarInFutureTimeWindow int64 // Unit: nanoseconds
Expand Down Expand Up @@ -81,11 +102,19 @@ func (r *Writer) Write(ctx context.Context, tenantID string, wreq *prompb.WriteR
return errors.Wrap(err, "get appender")
}
getRef := app.(storage.GetRef)
<<<<<<< HEAD
tooFarInFuture := model.Now().Add(time.Duration(r.opts.TooFarInFutureTimeWindow))
=======
>>>>>>> 5d5d39a3 (Add an experimental flag to block samples with timestamp too far in the future (#6195))
var (
ref storage.SeriesRef
errs writeErrors
)
app = &ReceiveAppender{
tLogger: tLogger,
tooFarInFuture: r.opts.TooFarInFutureTimeWindow,
Appender: app,
}
for _, t := range wreq.Timeseries {
// Check if time series labels are valid. If not, skip the time series
// and report the error.
Expand Down

0 comments on commit 04b8e22

Please sign in to comment.