Skip to content

Commit

Permalink
added metrics for the data plane (#2005)
Browse files Browse the repository at this point in the history
* added metrics for the data plane

* updated docker-compose.dev.yml with optional prometheus image

* updated RedisQueue as a metrics collector

* updated postgres as a metrics collector

* revert error check in process_meta_event.go

* updated metrics collection for different http status codes

* updated postgres_collector.go

* fixed lint issues and updated some queries

* updated metrics to be configurable

* updated test config

* fixed failing CI tests

* gated metrics with feature flags

* fixed CI issues

* updated metrics tests

* refactored postgres_collector.go
  • Loading branch information
mekilis committed May 22, 2024
1 parent 54e906b commit 6bc5094
Show file tree
Hide file tree
Showing 26 changed files with 656 additions and 13 deletions.
2 changes: 1 addition & 1 deletion api/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -458,7 +458,7 @@ func (a *ApplicationHandler) BuildControlPlaneRoutes() *chi.Mux {
router.Handle("/metrics", promhttp.HandlerFor(metrics.Reg(), promhttp.HandlerOpts{}))
router.HandleFunc("/*", reactRootHandler)

metrics.RegisterQueueMetrics(a.A.Queue)
metrics.RegisterQueueMetrics(a.A.Queue, a.A.DB)
prometheus.MustRegister(metrics.RequestDuration())
a.Router = router

Expand Down
2 changes: 1 addition & 1 deletion cmd/agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ func startServerComponent(ctx context.Context, a *cli.App) error {
a.Logger.WithError(err).Fatal("failed to initialize realm chain")
}

flag := fflag.NewFFlag()
flag, err := fflag.NewFFlag()
if err != nil {
a.Logger.WithError(err).Fatal("failed to create fflag controller")
}
Expand Down
44 changes: 44 additions & 0 deletions cmd/hooks/hooks.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"errors"
"fmt"
fflag2 "github.com/frain-dev/convoy/internal/pkg/fflag"
"io"
"os"
"time"
Expand Down Expand Up @@ -55,6 +56,10 @@ func PreRun(app *cli.App, db *postgres.Postgres) func(cmd *cobra.Command, args [
if err = config.Override(cliConfig); err != nil {
return err
}
cfg, err = config.Get() // updated
if err != nil {
return err
}

app.TracerShutdown, err = tracer.Init(cfg.Tracer, cmd.Name())
if err != nil {
Expand Down Expand Up @@ -388,6 +393,45 @@ func buildCliConfiguration(cmd *cobra.Command) (*config.Configuration, error) {

}

flag, err := fflag2.NewFFlag()
if err != nil {
return nil, err
}
c.Metrics = config.MetricsConfiguration{
IsEnabled: false,
}
if flag.CanAccessFeature(fflag2.Prometheus, c) {
metricsBackend, err := cmd.Flags().GetString("metrics-backend")
if err != nil {
return nil, err
}
if !config.IsStringEmpty(metricsBackend) {
c.Metrics = config.MetricsConfiguration{
IsEnabled: false,
Backend: config.MetricsBackend(metricsBackend),
}
switch c.Metrics.Backend {
case config.PrometheusMetricsProvider:
sampleTime, err := cmd.Flags().GetUint64("metrics-prometheus-sample-time")
if err != nil {
return nil, err
}
if sampleTime < 1 {
return nil, errors.New("metrics-prometheus-sample-time must be non-zero")
}
c.Metrics = config.MetricsConfiguration{
IsEnabled: true,
Backend: config.MetricsBackend(metricsBackend),
Prometheus: config.PrometheusMetricsConfiguration{
SampleTime: sampleTime,
},
}
}
} else {
log.Warn("No metrics-backend specified")
}
}

return c, nil
}

Expand Down
6 changes: 5 additions & 1 deletion cmd/ingest/ingest.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,13 @@ import (
"github.com/frain-dev/convoy/database/postgres"
"github.com/frain-dev/convoy/internal/pkg/cli"
"github.com/frain-dev/convoy/internal/pkg/memorystore"
"github.com/frain-dev/convoy/internal/pkg/metrics"
"github.com/frain-dev/convoy/internal/pkg/pubsub"
"github.com/frain-dev/convoy/internal/pkg/server"
"github.com/frain-dev/convoy/pkg/log"
"github.com/frain-dev/convoy/util"
"github.com/go-chi/chi/v5"
"github.com/prometheus/client_golang/prometheus/promhttp"
"github.com/spf13/cobra"
)

Expand Down Expand Up @@ -73,7 +75,9 @@ func AddIngestCommand(a *cli.App) *cobra.Command {
go ingest.Run()

srv := server.NewServer(cfg.Server.HTTP.IngestPort, func() {})
srv.SetHandler(chi.NewMux())
mux := chi.NewMux()
mux.Handle("/metrics", promhttp.HandlerFor(metrics.Reg(), promhttp.HandlerOpts{Registry: metrics.Reg()}))
srv.SetHandler(mux)

a.Logger.Info("Starting Convoy Message Broker Ingester...")

Expand Down
6 changes: 6 additions & 0 deletions cmd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,8 @@ func main() {
var otelCollectorURL string
var otelAuthHeaderName string
var otelAuthHeaderValue string
var metricsBackend string
var prometheusMetricsSampleTime uint64

var configFile string

Expand Down Expand Up @@ -99,6 +101,10 @@ func main() {
c.Flags().StringVar(&otelAuthHeaderName, "otel-auth-header-name", "", "OTel backend auth header name")
c.Flags().StringVar(&otelAuthHeaderValue, "otel-auth-header-value", "", "OTel backend auth header value")

// metrics
c.Flags().StringVar(&metricsBackend, "metrics-backend", "prometheus", "Metrics backend e.g. prometheus. ('experimental' feature flag level required")
c.Flags().Uint64Var(&prometheusMetricsSampleTime, "metrics-prometheus-sample-time", 5, "Prometheus metrics sample time")

c.PersistentPreRunE(hooks.PreRun(app, db))
c.PersistentPostRunE(hooks.PostRun(app, db))

Expand Down
2 changes: 1 addition & 1 deletion cmd/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ func startConvoyServer(a *cli.App) error {
a.Logger.WithError(err).Fatal("failed to initialize realm chain")
}

flag := fflag.NewFFlag()
flag, err := fflag.NewFFlag()
if err != nil {
a.Logger.WithError(err).Fatal("failed to create fflag controller")
}
Expand Down
2 changes: 1 addition & 1 deletion cmd/worker/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -174,7 +174,7 @@ func AddWorkerCommand(a *cli.App) *cobra.Command {
lo.Infof("Starting Convoy workers...")
consumer.Start()

metrics.RegisterQueueMetrics(a.Queue)
metrics.RegisterQueueMetrics(a.Queue, a.DB)

router := chi.NewRouter()
router.Handle("/metrics", promhttp.HandlerFor(metrics.Reg(), promhttp.HandlerOpts{}))
Expand Down
33 changes: 33 additions & 0 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,13 @@ var DefaultConfiguration = Configuration{
},
},
EnableProfiling: false,
Metrics: MetricsConfiguration{
IsEnabled: false,
Backend: PrometheusMetricsProvider,
Prometheus: PrometheusMetricsConfiguration{
SampleTime: 5,
},
},
}

type DatabaseConfiguration struct {
Expand Down Expand Up @@ -266,6 +273,16 @@ type OnPremStorage struct {
Path string `json:"path" envconfig:"CONVOY_STORAGE_PREM_PATH"`
}

type MetricsConfiguration struct {
IsEnabled bool `json:"metrics_enabled" envconfig:"CONVOY_METRICS_ENABLED"`
Backend MetricsBackend `json:"metrics_backend" envconfig:"CONVOY_METRICS_BACKEND"`
Prometheus PrometheusMetricsConfiguration `json:"prometheus_metrics"`
}

type PrometheusMetricsConfiguration struct {
SampleTime uint64 `json:"sample_time"`
}

const (
envPrefix string = "convoy"
OSSEnvironment string = "oss"
Expand All @@ -284,6 +301,10 @@ const (
TypesenseSearchProvider SearchProvider = "typesense"
)

const (
PrometheusMetricsProvider MetricsBackend = "prometheus"
)

type (
AuthProvider string
QueueProvider string
Expand All @@ -294,6 +315,7 @@ type (
DatabaseProvider string
SearchProvider string
FeatureFlagProvider string
MetricsBackend string
)

func (s SignatureHeaderProvider) String() string {
Expand Down Expand Up @@ -344,6 +366,7 @@ type Configuration struct {
StoragePolicy StoragePolicyConfiguration `json:"storage_policy"`
ConsumerPoolSize int `json:"consumer_pool_size" envconfig:"CONVOY_CONSUMER_POOL_SIZE"`
EnableProfiling bool `json:"enable_profiling" envconfig:"CONVOY_ENABLE_PROFILING"`
Metrics MetricsConfiguration `json:"metrics" envconfig:"CONVOY_METRICS"`
}

// Get fetches the application configuration. LoadConfig must have been called
Expand Down Expand Up @@ -473,5 +496,15 @@ func validate(c *Configuration) error {
return err
}

if c.Metrics.IsEnabled {
backend := c.Metrics.Backend
switch backend {
case PrometheusMetricsProvider:
break
default:
c.Metrics.IsEnabled = false
}
}

return nil
}
21 changes: 21 additions & 0 deletions config/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -157,6 +157,13 @@ func TestLoadConfig(t *testing.T) {
InsecureSkipVerify: true,
},
},
Metrics: MetricsConfiguration{
IsEnabled: false,
Backend: "prometheus",
Prometheus: PrometheusMetricsConfiguration{
SampleTime: 5,
},
},
},
wantErr: false,
wantErrMsg: "",
Expand Down Expand Up @@ -223,6 +230,13 @@ func TestLoadConfig(t *testing.T) {
InsecureSkipVerify: true,
},
},
Metrics: MetricsConfiguration{
IsEnabled: false,
Backend: "prometheus",
Prometheus: PrometheusMetricsConfiguration{
SampleTime: 5,
},
},
},
wantErr: false,
wantErrMsg: "",
Expand Down Expand Up @@ -288,6 +302,13 @@ func TestLoadConfig(t *testing.T) {
InsecureSkipVerify: true,
},
},
Metrics: MetricsConfiguration{
IsEnabled: false,
Backend: "prometheus",
Prometheus: PrometheusMetricsConfiguration{
SampleTime: 5,
},
},
},
wantErr: false,
wantErrMsg: "",
Expand Down
8 changes: 7 additions & 1 deletion convoy.json.example
Original file line number Diff line number Diff line change
Expand Up @@ -95,5 +95,11 @@
"path": "<insert-on-prem-path>"
}
},
"consumer_pool_size": 200
"consumer_pool_size": 200,
"metrics": {
"metrics_backend": "prometheus",
"prometheus_metrics": {
"sample_time": 10
}
}
}
Loading

0 comments on commit 6bc5094

Please sign in to comment.