Skip to content

Commit

Permalink
Add an experimental flag to block samples with timestamp too far in t…
Browse files Browse the repository at this point in the history
…he future (thanos-io#6195)

* Add an experimental flag to block samples with timestamp too far in the future

Signed-off-by: Yi Jin <yi.jin@databricks.com>

* fix bug

Signed-off-by: Yi Jin <yi.jin@databricks.com>

* address comments

Signed-off-by: Yi Jin <yi.jin@databricks.com>

* fix docs CI errors

Signed-off-by: Yi Jin <yi.jin@databricks.com>

* resolve merge conflicts

Signed-off-by: Yi Jin <yi.jin@databricks.com>

* resolve merge conflicts

Signed-off-by: Yi Jin <yi.jin@databricks.com>

* retrigger checks

Signed-off-by: Yi Jin <yi.jin@databricks.com>

---------

Signed-off-by: Yi Jin <yi.jin@databricks.com>
  • Loading branch information
jnyi authored and HC Zhu committed Jun 27, 2023
1 parent 00aac20 commit c413855
Show file tree
Hide file tree
Showing 3 changed files with 45 additions and 0 deletions.
5 changes: 5 additions & 0 deletions cmd/thanos/receive.go
Original file line number Diff line number Diff line change
Expand Up @@ -880,8 +880,13 @@ func (rc *receiveConfig) registerFlag(cmd extkingpin.FlagClause) {

rc.tsdbTooFarInFutureTimeWindow = extkingpin.ModelDuration(cmd.Flag("tsdb.too-far-in-future.time-window",
"[EXPERIMENTAL] Configures the allowed time window for ingesting samples too far in the future. Disabled (0s) by default"+
<<<<<<< HEAD
"Please note enable this flag will reject samples in the future of receive local NTP time + configured duration.",
).Default("0s").Hidden())
=======
"Please note enable this flag will reject samples in the future of receive local NTP time + configured duration due to clock skew in remote write clients.",
).Default("0s"))
>>>>>>> 5d5d39a3 (Add an experimental flag to block samples with timestamp too far in the future (#6195))

rc.tsdbOutOfOrderTimeWindow = extkingpin.ModelDuration(cmd.Flag("tsdb.out-of-order.time-window",
"[EXPERIMENTAL] Configures the allowed time window for ingestion of out-of-order samples. Disabled (0s) by default"+
Expand Down
8 changes: 8 additions & 0 deletions docs/components/receive.md
Original file line number Diff line number Diff line change
Expand Up @@ -396,6 +396,14 @@ Flags:
refer to the Tenant lifecycle management
section in the Receive documentation:
https://thanos.io/tip/components/receive.md/#tenant-lifecycle-management
--tsdb.too-far-in-future.time-window=0s
[EXPERIMENTAL] Configures the allowed time
window for ingesting samples too far in the
future. Disabled (0s) by defaultPlease note
enable this flag will reject samples in the
future of receive local NTP time + configured
duration due to clock skew in remote write
clients.
--tsdb.wal-compression Compress the tsdb WAL.
--version Show application version.
Expand Down
32 changes: 32 additions & 0 deletions pkg/receive/writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"github.com/pkg/errors"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/model/exemplar"
"github.com/prometheus/prometheus/model/labels"
"github.com/prometheus/prometheus/storage"
"github.com/prometheus/prometheus/tsdb"

Expand All @@ -28,6 +29,29 @@ type TenantStorage interface {
TenantAppendable(string) (Appendable, error)
}

<<<<<<< HEAD
=======
// Wraps storage.Appender to add validation and logging.
type ReceiveAppender struct {
tLogger log.Logger
tooFarInFuture int64 // Unit: nanoseconds
storage.Appender
}

func (ra *ReceiveAppender) Append(ref storage.SeriesRef, lset labels.Labels, t int64, v float64) (storage.SeriesRef, error) {
if ra.tooFarInFuture > 0 {
tooFar := model.Now().Add(time.Duration(ra.tooFarInFuture))
if tooFar.Before(model.Time(t)) {
level.Warn(ra.tLogger).Log("msg", "block metric too far in the future", "lset", lset,
"timestamp", t, "bound", tooFar)
// now + tooFarInFutureTimeWindow < sample timestamp
return 0, storage.ErrOutOfBounds
}
}
return ra.Appender.Append(ref, lset, t, v)
}

>>>>>>> 5d5d39a3 (Add an experimental flag to block samples with timestamp too far in the future (#6195))
type WriterOptions struct {
Intern bool
TooFarInFutureTimeWindow int64 // Unit: nanoseconds
Expand Down Expand Up @@ -81,11 +105,19 @@ func (r *Writer) Write(ctx context.Context, tenantID string, wreq *prompb.WriteR
return errors.Wrap(err, "get appender")
}
getRef := app.(storage.GetRef)
<<<<<<< HEAD
tooFarInFuture := model.Now().Add(time.Duration(r.opts.TooFarInFutureTimeWindow))
=======
>>>>>>> 5d5d39a3 (Add an experimental flag to block samples with timestamp too far in the future (#6195))
var (
ref storage.SeriesRef
errs writeErrors
)
app = &ReceiveAppender{
tLogger: tLogger,
tooFarInFuture: r.opts.TooFarInFutureTimeWindow,
Appender: app,
}
for _, t := range wreq.Timeseries {
// Check if time series labels are valid. If not, skip the time series
// and report the error.
Expand Down

0 comments on commit c413855

Please sign in to comment.