Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 10 additions & 10 deletions pkg/ruler/compat.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ type Pusher interface {
Push(context.Context, *cortexpb.WriteRequest) (*cortexpb.WriteResponse, error)
}

type pusherAppender struct {
type PusherAppender struct {
failedWrites prometheus.Counter
totalWrites prometheus.Counter

Expand All @@ -39,7 +39,7 @@ type pusherAppender struct {
evaluationDelay time.Duration
}

func (a *pusherAppender) Append(_ uint64, l labels.Labels, t int64, v float64) (uint64, error) {
func (a *PusherAppender) Append(_ uint64, l labels.Labels, t int64, v float64) (uint64, error) {
a.labels = append(a.labels, l)

// Adapt staleness markers for ruler evaluation delay. As the upstream code
Expand All @@ -61,11 +61,11 @@ func (a *pusherAppender) Append(_ uint64, l labels.Labels, t int64, v float64) (
return 0, nil
}

func (a *pusherAppender) AppendExemplar(_ uint64, _ labels.Labels, _ exemplar.Exemplar) (uint64, error) {
func (a *PusherAppender) AppendExemplar(_ uint64, _ labels.Labels, _ exemplar.Exemplar) (uint64, error) {
return 0, errors.New("exemplars are unsupported")
}

func (a *pusherAppender) Commit() error {
func (a *PusherAppender) Commit() error {
a.totalWrites.Inc()

// Since a.pusher is distributor, client.ReuseSlice will be called in a.pusher.Push.
Expand All @@ -84,7 +84,7 @@ func (a *pusherAppender) Commit() error {
return err
}

func (a *pusherAppender) Rollback() error {
func (a *PusherAppender) Rollback() error {
a.labels = nil
a.samples = nil
return nil
Expand Down Expand Up @@ -112,7 +112,7 @@ func NewPusherAppendable(pusher Pusher, userID string, limits RulesLimits, total

// Appender returns a storage.Appender
func (t *PusherAppendable) Appender(ctx context.Context) storage.Appender {
return &pusherAppender{
return &PusherAppender{
failedWrites: t.failedWrites,
totalWrites: t.totalWrites,

Expand All @@ -131,9 +131,9 @@ type RulesLimits interface {
RulerMaxRulesPerRuleGroup(userID string) int
}

// engineQueryFunc returns a new query function using the rules.EngineQueryFunc function
// EngineQueryFunc returns a new query function using the rules.EngineQueryFunc function
// and passing an altered timestamp.
func engineQueryFunc(engine *promql.Engine, q storage.Queryable, overrides RulesLimits, userID string) rules.QueryFunc {
func EngineQueryFunc(engine *promql.Engine, q storage.Queryable, overrides RulesLimits, userID string) rules.QueryFunc {
return func(ctx context.Context, qs string, t time.Time) (promql.Vector, error) {
orig := rules.EngineQueryFunc(engine, q)
// Delay the evaluation of all rules by a set interval to give a buffer
Expand All @@ -143,7 +143,7 @@ func engineQueryFunc(engine *promql.Engine, q storage.Queryable, overrides Rules
}
}

func metricsQueryFunc(qf rules.QueryFunc, queries, failedQueries prometheus.Counter) rules.QueryFunc {
func MetricsQueryFunc(qf rules.QueryFunc, queries, failedQueries prometheus.Counter) rules.QueryFunc {
return func(ctx context.Context, qs string, t time.Time) (promql.Vector, error) {
queries.Inc()

Expand Down Expand Up @@ -204,7 +204,7 @@ func DefaultTenantManagerFactory(cfg Config, p Pusher, q storage.Queryable, engi
return rules.NewManager(&rules.ManagerOptions{
Appendable: NewPusherAppendable(p, userID, overrides, totalWrites, failedWrites),
Queryable: q,
QueryFunc: metricsQueryFunc(engineQueryFunc(engine, q, overrides, userID), totalQueries, failedQueries),
QueryFunc: MetricsQueryFunc(EngineQueryFunc(engine, q, overrides, userID), totalQueries, failedQueries),
Context: user.InjectOrgID(ctx, userID),
ExternalURL: cfg.ExternalURL.URL,
NotifyFunc: SendAlerts(notifier, cfg.ExternalURL.URL.String()),
Expand Down
2 changes: 1 addition & 1 deletion pkg/ruler/compat_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -231,7 +231,7 @@ func TestMetricsQueryFuncErrors(t *testing.T) {
return promql.Vector{}, tc.returnedError
}

qf := metricsQueryFunc(mockFunc, queries, failures)
qf := MetricsQueryFunc(mockFunc, queries, failures)

_, err := qf(context.Background(), "test", time.Now())
require.Equal(t, tc.returnedError, err)
Expand Down