Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feature/ruler (take 2) #2458

Merged
merged 42 commits into from
Aug 25, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
42 commits
Select commit Hold shift + click to select a range
ed55330
begins speccing out ruler
owen-d May 27, 2020
1f261a7
upstream conflicts
owen-d Jun 9, 2020
1157df5
implicit ast impls, parser for ruler
owen-d Jun 9, 2020
6beb78d
/api/prom ruler routes, ruler enabled in single binary
owen-d Jun 10, 2020
9f785db
registers ruler flags, doesnt double instantiate metrics
owen-d Jun 10, 2020
728436b
cleanup for old samples in ruler
owen-d Jun 23, 2020
22f764b
begins ruler tests
owen-d Jun 23, 2020
4e2f97b
ForStateAppenderQuerier tests
owen-d Jun 23, 2020
26fe293
memhistory stop
owen-d Jun 24, 2020
f753eb4
RestoreForState test
owen-d Jun 24, 2020
70cd160
upstream querier ifc
owen-d Jun 24, 2020
68eab85
introducing loki ruler metrics
owen-d Jun 24, 2020
9c83bae
removes rule granularity metric -- to be discussed in pr
owen-d Jun 24, 2020
5e90c13
validates ruler cfg
owen-d Jun 25, 2020
e575b15
renames gauge metrics to not use total
owen-d Jun 25, 2020
4f30817
removes unnecessary logs
owen-d Jun 25, 2020
30e9b52
logs synthetic restoreforstate
owen-d Jun 26, 2020
272043d
logs tenant in ruler
owen-d Jun 26, 2020
9d299e2
sets cortex to owen's unmerged fork
owen-d Jun 29, 2020
026abb9
begins porting rules pkg
owen-d Jul 7, 2020
2d9ce8b
memstore work
owen-d Jul 8, 2020
e825231
work on queryable based in memory series store
owen-d Jul 9, 2020
33570eb
removes unused pkgs, adds memstore test
owen-d Jul 23, 2020
82f2e79
MemStore must be started after construction
owen-d Jul 23, 2020
5f7e57e
MemstoreTenantManager
owen-d Jul 23, 2020
a254a9a
ruler loading
owen-d Jul 23, 2020
367e18a
ruler instantiation
owen-d Jul 29, 2020
7c9eca4
better metrics & logging in ruler
owen-d Jul 30, 2020
25ef5aa
grpc cortex compatibility in go.mod
owen-d Jul 30, 2020
94b80ca
cortex vendoring compat
owen-d Jul 30, 2020
72dbbd8
increments memory cache hits only if cached
owen-d Jul 30, 2020
04e6950
loki in memory metrics use prometheus default registerer
owen-d Jul 30, 2020
262e394
ruler only depends on ring
owen-d Jul 30, 2020
cabf036
managerfactory rename
owen-d Jul 31, 2020
f384d8e
revendors cortex
owen-d Jul 31, 2020
5f0ec00
Merge remote-tracking branch 'upstream/master' into feature/ruler-v3
owen-d Jul 31, 2020
d872100
ignore emacs stashing
owen-d Jul 31, 2020
91a71e2
adds comments
owen-d Aug 20, 2020
7bad731
ruler /loki/api/v1 prefix
owen-d Aug 25, 2020
20e8d2d
Merge remote-tracking branch 'upstream/master' into feature/ruler-v3
owen-d Aug 25, 2020
cff0efa
revendoring compat
owen-d Aug 25, 2020
2a8662a
comment
owen-d Aug 25, 2020
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -26,3 +26,6 @@ dist
coverage.txt
.DS_Store
.aws-sam

# emacs
.#*
188 changes: 13 additions & 175 deletions go.sum

Large diffs are not rendered by default.

24 changes: 10 additions & 14 deletions pkg/logql/ast.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,11 @@ type QueryParams interface {
GetShards() []string
}

// implicit holds default implementations
type implicit struct{}
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

note: these changes are unrelated, but I snuck them in 😈


func (implicit) logQLExpr() {}

// SelectParams specifies parameters passed to data selections.
type SelectLogParams struct {
*logproto.QueryRequest
Expand Down Expand Up @@ -75,6 +80,7 @@ type LogSelectorExpr interface {

type matchersExpr struct {
matchers []*labels.Matcher
implicit
}

func newMatcherExpr(matchers []*labels.Matcher) LogSelectorExpr {
Expand Down Expand Up @@ -102,13 +108,11 @@ func (e *matchersExpr) Filter() (LineFilter, error) {
return nil, nil
}

// impl Expr
func (e *matchersExpr) logQLExpr() {}

type filterExpr struct {
left LogSelectorExpr
ty labels.MatchType
match string
implicit
}

// NewFilterExpr wraps an existing Expr with a next filter expression.
Expand Down Expand Up @@ -163,9 +167,6 @@ func (e *filterExpr) Filter() (LineFilter, error) {
return f, nil
}

// impl Expr
func (e *filterExpr) logQLExpr() {}

func mustNewMatcher(t labels.MatchType, n, v string) *labels.Matcher {
m, err := labels.NewMatcher(t, n, v)
if err != nil {
Expand Down Expand Up @@ -275,6 +276,7 @@ type SampleExpr interface {
type rangeAggregationExpr struct {
left *logRange
operation string
implicit
}

func newRangeAggregationExpr(left *logRange, operation string) SampleExpr {
Expand All @@ -288,9 +290,6 @@ func (e *rangeAggregationExpr) Selector() LogSelectorExpr {
return e.left.left
}

// impl Expr
func (e *rangeAggregationExpr) logQLExpr() {}

// impls Stringer
func (e *rangeAggregationExpr) String() string {
return formatOperation(e.operation, nil, e.left.String())
Expand Down Expand Up @@ -330,6 +329,7 @@ type vectorAggregationExpr struct {
grouping *grouping
params int
operation string
implicit
}

func mustNewVectorAggregationExpr(left SampleExpr, operation string, gr *grouping, params *string) SampleExpr {
Expand Down Expand Up @@ -368,9 +368,6 @@ func (e *vectorAggregationExpr) Extractor() (SampleExtractor, error) {
return e.left.Extractor()
}

// impl Expr
func (e *vectorAggregationExpr) logQLExpr() {}

func (e *vectorAggregationExpr) String() string {
var params []string
if e.params != 0 {
Expand Down Expand Up @@ -479,6 +476,7 @@ func reduceBinOp(op string, left, right *literalExpr) *literalExpr {

type literalExpr struct {
value float64
implicit
}

func mustNewLiteralExpr(s string, invert bool) *literalExpr {
Expand All @@ -496,8 +494,6 @@ func mustNewLiteralExpr(s string, invert bool) *literalExpr {
}
}

func (e *literalExpr) logQLExpr() {}

func (e *literalExpr) String() string {
return fmt.Sprintf("%f", e.value)
}
Expand Down
13 changes: 13 additions & 0 deletions pkg/loki/loki.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ import (
"github.com/cortexproject/cortex/pkg/querier/frontend"
"github.com/cortexproject/cortex/pkg/ring"
"github.com/cortexproject/cortex/pkg/ring/kv/memberlist"
cortex_ruler "github.com/cortexproject/cortex/pkg/ruler"
"github.com/cortexproject/cortex/pkg/ruler/rules"
"github.com/cortexproject/cortex/pkg/util"
"github.com/cortexproject/cortex/pkg/util/runtimeconfig"
"github.com/cortexproject/cortex/pkg/util/services"
Expand All @@ -35,6 +37,7 @@ import (
"github.com/grafana/loki/pkg/lokifrontend"
"github.com/grafana/loki/pkg/querier"
"github.com/grafana/loki/pkg/querier/queryrange"
"github.com/grafana/loki/pkg/ruler"
"github.com/grafana/loki/pkg/storage"
"github.com/grafana/loki/pkg/tracing"
serverutil "github.com/grafana/loki/pkg/util/server"
Expand All @@ -59,6 +62,7 @@ type Config struct {
TableManager chunk.TableManagerConfig `yaml:"table_manager,omitempty"`
Worker frontend.WorkerConfig `yaml:"frontend_worker,omitempty"`
Frontend lokifrontend.Config `yaml:"frontend,omitempty"`
Ruler ruler.Config `yaml:"ruler,omitempty"`
QueryRange queryrange.Config `yaml:"query_range,omitempty"`
RuntimeConfig runtimeconfig.ManagerConfig `yaml:"runtime_config,omitempty"`
MemberlistKV memberlist.KVConfig `yaml:"memberlist"`
Expand All @@ -85,6 +89,7 @@ func (c *Config) RegisterFlags(f *flag.FlagSet) {
c.LimitsConfig.RegisterFlags(f)
c.TableManager.RegisterFlags(f)
c.Frontend.RegisterFlags(f)
c.Ruler.RegisterFlags(f)
c.Worker.RegisterFlags(f)
c.QueryRange.RegisterFlags(f)
c.RuntimeConfig.RegisterFlags(f)
Expand Down Expand Up @@ -115,6 +120,9 @@ func (c *Config) Validate(log log.Logger) error {
if err := c.TableManager.Validate(); err != nil {
return errors.Wrap(err, "invalid tablemanager config")
}
if err := c.Ruler.Validate(); err != nil {
return errors.Wrap(err, "invalid ruler config")
}
return nil
}

Expand All @@ -135,6 +143,8 @@ type Loki struct {
store storage.Store
tableManager *chunk.TableManager
frontend *frontend.Frontend
ruler *cortex_ruler.Ruler
RulerStorage rules.RuleStore
stopper queryrange.Stopper
runtimeConfig *runtimeconfig.Manager
memberlistKV *memberlist.KVInitService
Expand Down Expand Up @@ -308,6 +318,8 @@ func (t *Loki) setupModuleManager() error {
mm.RegisterModule(Ingester, t.initIngester)
mm.RegisterModule(Querier, t.initQuerier)
mm.RegisterModule(QueryFrontend, t.initQueryFrontend)
mm.RegisterModule(RulerStorage, t.initRulerStorage, modules.UserInvisibleModule)
mm.RegisterModule(Ruler, t.initRuler)
mm.RegisterModule(TableManager, t.initTableManager)
mm.RegisterModule(Compactor, t.initCompactor)
mm.RegisterModule(All, nil)
Expand All @@ -321,6 +333,7 @@ func (t *Loki) setupModuleManager() error {
Ingester: {Store, Server, MemberlistKV},
Querier: {Store, Ring, Server},
QueryFrontend: {Server, Overrides},
Ruler: {Ring, Server, Store, RulerStorage},
TableManager: {Server},
Compactor: {Server},
All: {Querier, Ingester, Distributor, TableManager},
Expand Down
71 changes: 71 additions & 0 deletions pkg/loki/modules.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"github.com/cortexproject/cortex/pkg/ring"
"github.com/cortexproject/cortex/pkg/ring/kv/codec"
"github.com/cortexproject/cortex/pkg/ring/kv/memberlist"
cortex_ruler "github.com/cortexproject/cortex/pkg/ruler"
"github.com/cortexproject/cortex/pkg/util"
"github.com/cortexproject/cortex/pkg/util/runtimeconfig"
"github.com/cortexproject/cortex/pkg/util/services"
Expand All @@ -34,8 +35,10 @@ import (
"github.com/grafana/loki/pkg/distributor"
"github.com/grafana/loki/pkg/ingester"
"github.com/grafana/loki/pkg/logproto"
"github.com/grafana/loki/pkg/logql"
"github.com/grafana/loki/pkg/querier"
"github.com/grafana/loki/pkg/querier/queryrange"
"github.com/grafana/loki/pkg/ruler"
loki_storage "github.com/grafana/loki/pkg/storage"
"github.com/grafana/loki/pkg/storage/stores/shipper"
serverutil "github.com/grafana/loki/pkg/util/server"
Expand All @@ -54,6 +57,8 @@ const (
Ingester string = "ingester"
Querier string = "querier"
QueryFrontend string = "query-frontend"
RulerStorage string = "ruler-storage"
Ruler string = "ruler"
Store string = "store"
TableManager string = "table-manager"
MemberlistKV string = "memberlist-kv"
Expand Down Expand Up @@ -353,6 +358,72 @@ func (t *Loki) initQueryFrontend() (_ services.Service, err error) {
}), nil
}

func (t *Loki) initRulerStorage() (_ services.Service, err error) {
// if the ruler is not configured and we're in single binary then let's just log an error and continue.
// unfortunately there is no way to generate a "default" config and compare default against actual
// to determine if it's unconfigured. the following check, however, correctly tests this.
// Single binary integration tests will break if this ever drifts
if t.cfg.Target == All && t.cfg.Ruler.StoreConfig.IsDefaults() {
level.Info(util.Logger).Log("msg", "RulerStorage is not configured in single binary mode and will not be started.")
slim-bean marked this conversation as resolved.
Show resolved Hide resolved
return
}

t.RulerStorage, err = cortex_ruler.NewRuleStorage(t.cfg.Ruler.StoreConfig)

return
}

func (t *Loki) initRuler() (_ services.Service, err error) {
if t.RulerStorage == nil {
level.Info(util.Logger).Log("msg", "RulerStorage is nil. Not starting the ruler.")
return nil, nil
}

t.cfg.Ruler.Ring.ListenPort = t.cfg.Server.GRPCListenPort
t.cfg.Ruler.Ring.KVStore.MemberlistKV = t.memberlistKV.GetMemberlistKV
q, err := querier.New(t.cfg.Querier, t.cfg.IngesterClient, t.ring, t.store, t.overrides)
if err != nil {
return nil, err
}

engine := logql.NewEngine(t.cfg.Querier.Engine, q)

t.ruler, err = ruler.NewRuler(
t.cfg.Ruler,
engine,
prometheus.DefaultRegisterer,
util.Logger,
t.RulerStorage,
)

if err != nil {
return
}

// Expose HTTP endpoints.
if t.cfg.Ruler.EnableAPI {

t.server.HTTP.Handle("/ruler/ring", t.ruler)
cortex_ruler.RegisterRulerServer(t.server.GRPC, t.ruler)

// Ruler Legacy API Routes
t.server.HTTP.Path("/api/prom/rules").Methods("GET").Handler(t.httpAuthMiddleware.Wrap(http.HandlerFunc(t.ruler.ListRules)))
owen-d marked this conversation as resolved.
Show resolved Hide resolved
t.server.HTTP.Path("/api/prom/rules/{namespace}").Methods("GET").Handler(t.httpAuthMiddleware.Wrap(http.HandlerFunc(t.ruler.ListRules)))
t.server.HTTP.Path("/api/prom/rules/{namespace}/{groupName}").Methods("GET").Handler(t.httpAuthMiddleware.Wrap(http.HandlerFunc(t.ruler.GetRuleGroup)))
t.server.HTTP.Path("/api/prom/rules/{namespace}").Methods("POST").Handler(t.httpAuthMiddleware.Wrap(http.HandlerFunc(t.ruler.CreateRuleGroup)))
t.server.HTTP.Path("/api/prom/rules/{namespace}/{groupName}").Methods("DELETE").Handler(t.httpAuthMiddleware.Wrap(http.HandlerFunc(t.ruler.DeleteRuleGroup)))

// Ruler API Routes
t.server.HTTP.Path("/loki/api/v1/rules").Methods("GET").Handler(t.httpAuthMiddleware.Wrap(http.HandlerFunc(t.ruler.ListRules)))
t.server.HTTP.Path("/loki/api/v1/rules/{namespace}").Methods("GET").Handler(t.httpAuthMiddleware.Wrap(http.HandlerFunc(t.ruler.ListRules)))
t.server.HTTP.Path("/loki/api/v1/rules/{namespace}/{groupName}").Methods("GET").Handler(t.httpAuthMiddleware.Wrap(http.HandlerFunc(t.ruler.GetRuleGroup)))
t.server.HTTP.Path("/loki/api/v1/rules/{namespace}").Methods("POST").Handler(t.httpAuthMiddleware.Wrap(http.HandlerFunc(t.ruler.CreateRuleGroup)))
t.server.HTTP.Path("/loki/api/v1/rules/{namespace}/{groupName}").Methods("DELETE").Handler(t.httpAuthMiddleware.Wrap(http.HandlerFunc(t.ruler.DeleteRuleGroup)))
}

return t.ruler, nil
}

func (t *Loki) initMemberlistKV() (services.Service, error) {
t.cfg.MemberlistKV.MetricsRegisterer = prometheus.DefaultRegisterer
t.cfg.MemberlistKV.Codecs = []codec.Codec{
Expand Down
123 changes: 123 additions & 0 deletions pkg/ruler/manager/compat.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,123 @@
package manager

import (
"context"
"time"

"github.com/cortexproject/cortex/pkg/ruler"
"github.com/go-kit/kit/log"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/prometheus/notifier"
"github.com/prometheus/prometheus/pkg/labels"
"github.com/prometheus/prometheus/promql"
"github.com/prometheus/prometheus/promql/parser"
"github.com/prometheus/prometheus/rules"
"github.com/weaveworks/common/user"

"github.com/grafana/loki/pkg/logproto"
"github.com/grafana/loki/pkg/logql"
)

// engineQueryFunc returns a new query function using the rules.EngineQueryFunc function
// and passing an altered timestamp.
func engineQueryFunc(engine *logql.Engine, delay time.Duration) rules.QueryFunc {
return rules.QueryFunc(func(ctx context.Context, qs string, t time.Time) (promql.Vector, error) {
adjusted := t.Add(-delay)
params := logql.NewLiteralParams(
qs,
adjusted,
adjusted,
0,
0,
logproto.FORWARD,
0,
nil,
)
q := engine.Query(params)

res, err := q.Exec(ctx)
if err != nil {
return nil, err
}
switch v := res.Data.(type) {
case promql.Vector:
return v, nil
case promql.Scalar:
return promql.Vector{promql.Sample{
Point: promql.Point(v),
Metric: labels.Labels{},
}}, nil
default:
return nil, errors.New("rule result is not a vector or scalar")
}
})

}

func MemstoreTenantManager(
cfg ruler.Config,
engine *logql.Engine,
) ruler.ManagerFactory {
var metrics *Metrics

return func(
ctx context.Context,
userID string,
notifier *notifier.Manager,
logger log.Logger,
reg prometheus.Registerer,
) *rules.Manager {

// We'll ignore the passed registere and use the default registerer to avoid prefix issues and other weirdness.
// This closure prevents re-registering.
if metrics == nil {
metrics = NewMetrics(prometheus.DefaultRegisterer)
}
logger = log.With(logger, "user", userID)
queryFunc := engineQueryFunc(engine, cfg.EvaluationDelay)
memStore := NewMemStore(userID, queryFunc, metrics, 5*time.Minute, log.With(logger, "subcomponent", "MemStore"))

mgr := rules.NewManager(&rules.ManagerOptions{
Appendable: NoopAppender{},
Queryable: memStore,
QueryFunc: queryFunc,
Context: user.InjectOrgID(ctx, userID),
ExternalURL: cfg.ExternalURL.URL,
NotifyFunc: ruler.SendAlerts(notifier, cfg.ExternalURL.URL.String()),
Logger: logger,
Registerer: reg,
OutageTolerance: cfg.OutageTolerance,
ForGracePeriod: cfg.ForGracePeriod,
ResendDelay: cfg.ResendDelay,
GroupLoader: groupLoader{},
})

// initialize memStore, bound to the manager's alerting rules
memStore.Start(mgr)

return mgr
}
}

type groupLoader struct {
rules.FileLoader // embed the default and override the parse method for logql queries
}

func (groupLoader) Parse(query string) (parser.Expr, error) {
expr, err := logql.ParseExpr(query)
if err != nil {
return nil, err
}

return exprAdapter{expr}, nil
}

// Allows logql expressions to be treated as promql expressions by the prometheus rules pkg.
type exprAdapter struct {
logql.Expr
}

func (exprAdapter) PositionRange() parser.PositionRange { return parser.PositionRange{} }
func (exprAdapter) PromQLExpr() {}
func (exprAdapter) Type() parser.ValueType { return parser.ValueType("unimplemented") }
Loading