Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix up gorilla mux handling and test locally. #137

Merged
merged 2 commits into from Nov 11, 2016
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
80 changes: 32 additions & 48 deletions cmd/cortex/main.go
Expand Up @@ -10,6 +10,7 @@ import (
"syscall"
"time"

"github.com/gorilla/mux"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/common/log"
"github.com/prometheus/common/route"
Expand Down Expand Up @@ -128,13 +129,15 @@ func main() {
r := ring.New(consul, cfg.distributorConfig.HeartbeatTimeout)
defer r.Stop()

router := mux.NewRouter()
switch cfg.mode {
case modeDistributor:
cfg.distributorConfig.Ring = r
cfg.distributorConfig.ClientFactory = func(address string) (*distributor.IngesterClient, error) {
return distributor.NewIngesterClient(address, cfg.remoteTimeout)
}
setupDistributor(cfg.distributorConfig, chunkStore, cfg.logSuccess)
setupDistributor(cfg.distributorConfig, chunkStore, router.PathPrefix("/api/prom").Subrouter())

case modeIngester:
cfg.ingesterConfig.Ring = r
registration, err := ring.RegisterIngester(consul, cfg.listenPort, cfg.numTokens)
Expand All @@ -143,7 +146,7 @@ func main() {
// network errors.
log.Fatalf("Could not register ingester: %v", err)
}
ing := setupIngester(chunkStore, cfg.ingesterConfig, cfg.logSuccess)
ing := setupIngester(chunkStore, cfg.ingesterConfig, router)

// Deferring a func to make ordering obvious
defer func() {
Expand All @@ -157,8 +160,16 @@ func main() {
log.Fatalf("Mode %s not supported!", cfg.mode)
}

http.Handle("/metrics", prometheus.Handler())
go http.ListenAndServe(fmt.Sprintf(":%d", cfg.listenPort), nil)
router.Handle("/metrics", prometheus.Handler())
instrumented := middleware.Merge(
middleware.Log{
LogSuccess: cfg.logSuccess,
},
middleware.Instrument{
Duration: requestDuration,
},
).Wrap(router)
go http.ListenAndServe(fmt.Sprintf(":%d", cfg.listenPort), instrumented)

term := make(chan os.Signal)
signal.Notify(term, os.Interrupt, syscall.SIGTERM)
Expand Down Expand Up @@ -198,19 +209,18 @@ func setupChunkStore(cfg cfg) (chunk.Store, error) {
func setupDistributor(
cfg distributor.Config,
chunkStore chunk.Store,
logSuccess bool,
router *mux.Router,
) {
dist, err := distributor.New(cfg)
if err != nil {
log.Fatal(err)
}
prometheus.MustRegister(dist)

prefix := "/api/prom"
http.Handle(prefix+"/push", instrument(logSuccess, cortex.AppenderHandler(dist, handleDistributorError)))
router.Path("/push").Handler(cortex.AppenderHandler(dist, handleDistributorError))

// TODO: Move querier to separate binary.
setupQuerier(dist, chunkStore, prefix, logSuccess)
setupQuerier(dist, chunkStore, router)
}

func handleDistributorError(w http.ResponseWriter, err error) {
Expand All @@ -235,8 +245,7 @@ func handleDistributorError(w http.ResponseWriter, err error) {
func setupQuerier(
distributor *distributor.Distributor,
chunkStore chunk.Store,
prefix string,
logSuccess bool,
router *mux.Router,
) {
queryable := querier.Queryable{
Q: querier.MergeQuerier{
Expand All @@ -248,48 +257,35 @@ func setupQuerier(
},
},
}

engine := promql.NewEngine(queryable, nil)

api := v1.NewAPI(engine, querier.DummyStorage{Queryable: queryable})
router := route.New(func(r *http.Request) (context.Context, error) {
promRouter := route.New(func(r *http.Request) (context.Context, error) {
userID := r.Header.Get(userIDHeaderName)
if r.Method != "OPTIONS" && userID == "" {
// For now, getting the user ID from basic auth allows for easy testing
// with Grafana.
// TODO: Remove basic auth support.
userID, _, _ = r.BasicAuth()
if userID == "" {
return nil, fmt.Errorf("missing user ID")
}
}
return user.WithID(context.Background(), userID), nil
})
api.Register(router.WithPrefix(prefix + "/api/v1"))
http.Handle("/", router)

http.Handle(prefix+"/user_stats", instrument(logSuccess, cortex.DistributorUserStatsHandler(distributor.UserStats)))

http.Handle(prefix+"/graph", instrument(logSuccess, ui.GraphHandler()))
http.Handle(prefix+"/static/", instrument(logSuccess, ui.StaticAssetsHandler(prefix+"/static/")))
}).WithPrefix("/api/prom/api/v1")
api.Register(promRouter)
router.PathPrefix("/api/v1").Handler(promRouter)
router.Path("/user_stats").Handler(cortex.DistributorUserStatsHandler(distributor.UserStats))
router.Path("/graph").Handler(ui.GraphHandler())
router.PathPrefix("/static/").Handler(ui.StaticAssetsHandler("/api/prom/static/"))
}

func setupIngester(
chunkStore chunk.Store,
cfg ingester.Config,
logSuccess bool,
router *mux.Router,
) *ingester.Ingester {
ingester, err := ingester.New(cfg, chunkStore)
if err != nil {
log.Fatal(err)
}
prometheus.MustRegister(ingester)

http.Handle("/push", instrument(logSuccess, cortex.AppenderHandler(ingester, handleIngesterError)))
http.Handle("/query", instrument(logSuccess, cortex.QueryHandler(ingester)))
http.Handle("/label_values", instrument(logSuccess, cortex.LabelValuesHandler(ingester)))
http.Handle("/user_stats", instrument(logSuccess, cortex.IngesterUserStatsHandler(ingester.UserStats)))
http.Handle("/ready", instrument(logSuccess, cortex.IngesterReadinessHandler(ingester)))
router.Path("/push").Handler(cortex.AppenderHandler(ingester, handleIngesterError))
router.Path("/query").Handler(cortex.QueryHandler(ingester))
router.Path("/label_values").Handler(cortex.LabelValuesHandler(ingester))
router.Path("/user_stats").Handler(cortex.IngesterUserStatsHandler(ingester.UserStats))
router.Path("/ready").Handler(cortex.IngesterReadinessHandler(ingester))
return ingester
}

Expand All @@ -303,15 +299,3 @@ func handleIngesterError(w http.ResponseWriter, err error) {
http.Error(w, err.Error(), http.StatusInternalServerError)
}
}

// instrument instruments a handler.
func instrument(logSuccess bool, handler http.Handler) http.Handler {
return middleware.Merge(
middleware.Log{
LogSuccess: logSuccess,
},
middleware.Instrument{
Duration: requestDuration,
},
).Wrap(handler)
}