From db1fa3f2f94173fe9a35864e80ad32a0bd309d44 Mon Sep 17 00:00:00 2001 From: Andrew Neudegg Date: Thu, 31 Dec 2020 00:24:30 +0000 Subject: [PATCH] feat: added telemetry middleware for routing. Signed-off-by: Andrew Neudegg --- pkg/pipeline/flow_middleware.go | 47 +++++++++++++++++++++++++++++++++ pkg/pipeline/pipeline.go | 14 +++++----- 2 files changed, 53 insertions(+), 8 deletions(-) create mode 100644 pkg/pipeline/flow_middleware.go diff --git a/pkg/pipeline/flow_middleware.go b/pkg/pipeline/flow_middleware.go new file mode 100644 index 0000000..ad827d2 --- /dev/null +++ b/pkg/pipeline/flow_middleware.go @@ -0,0 +1,47 @@ +package pipeline + +import ( + "fmt" + + "github.com/andrewneudegg/delta/pkg/events" + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/promauto" +) + +// EventMiddlewareFunc is a function that will be called on each event. +type EventMiddlewareFunc func(events.Event) + +// Inject wraps the given channel with another channel that actions the middleware on each item in a separate goroutine. +func Inject(ch chan events.Event, middleware EventMiddlewareFunc) chan events.Event { + newOutPipe := make(chan events.Event) + + go func() { + for { + select { + case e := <-ch: + go middleware(e) + newOutPipe <- e + } + } + }() + + return newOutPipe +} + +// NoopEventMiddleware does not do anything, but matches the prometheus definition. +func NoopEventMiddleware(counterName string) func(events.Event) { + return func(e events.Event) {} +} + +// PrometheusTelemetryEventMiddleware will increment a counter with the given name. +func PrometheusTelemetryEventMiddleware(counterName string) func(events.Event) { + + counter := promauto.NewCounter(prometheus.CounterOpts{ + Name: counterName, + Help: fmt.Sprintf("autogenerated counter for '%s'", counterName), + }) + + return func(e events.Event) { + counter.Add(1) + } +} diff --git a/pkg/pipeline/pipeline.go b/pkg/pipeline/pipeline.go index 6c38f15..71310fc 100644 --- a/pkg/pipeline/pipeline.go +++ b/pkg/pipeline/pipeline.go @@ -16,9 +16,6 @@ import ( // Pipeline is the representation of data flow through this application. type Pipeline struct { - inCh chan events.Event // inCh merges all input channels into a singular channel. - outCh chan events.Event // outCh replicates events to all output channels. - sources []source.S relays []relay.R distributors []distributor.D @@ -27,8 +24,6 @@ type Pipeline struct { // BuildPipeline will construct the pipeline at the core of delta. func BuildPipeline(c configuration.Container) (Pipeline, error) { p := Pipeline{ - inCh: make(chan events.Event), - outCh: make(chan events.Event), sources: make([]source.S, 0), distributors: make([]distributor.D, 0), } @@ -70,14 +65,16 @@ func BuildPipeline(c configuration.Container) (Pipeline, error) { sourceChannels = append(sourceChannels, thisSourceChan) } + // TODO: Use prometheus middleware here instead... + inCh := Inject(make(chan events.Event), NoopEventMiddleware("inbound")) go func() { - fanIn(context.TODO(), sourceChannels, p.inCh) + fanIn(context.TODO(), sourceChannels, inCh) }() // -- -- var previousSourceOutput *chan events.Event - previousSourceOutput = &p.inCh + previousSourceOutput = &inCh for _, r := range p.relays { thisRelayOutputChan := make(chan events.Event) go r.Do(context.TODO(), *previousSourceOutput, thisRelayOutputChan) @@ -95,7 +92,8 @@ func BuildPipeline(c configuration.Container) (Pipeline, error) { } go func() { - fanOut(context.TODO(), *previousSourceOutput, distributorChannels) + // TODO: Use prometheus middleware here instead... + fanOut(context.TODO(), Inject(*previousSourceOutput, NoopEventMiddleware("outbound")), distributorChannels) }() return p, nil