Skip to content

Commit

Permalink
feat: added telemetry middleware for routing.
Browse files Browse the repository at this point in the history
Signed-off-by: Andrew Neudegg <andrew.neudegg@finbourne.com>
  • Loading branch information
AndrewNeudegg committed Dec 31, 2020
1 parent 871d448 commit db1fa3f
Show file tree
Hide file tree
Showing 2 changed files with 53 additions and 8 deletions.
47 changes: 47 additions & 0 deletions pkg/pipeline/flow_middleware.go
@@ -0,0 +1,47 @@
package pipeline

import (
"fmt"

"github.com/andrewneudegg/delta/pkg/events"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
)

// EventMiddlewareFunc is a function that will be called on each event.
type EventMiddlewareFunc func(events.Event)

// Inject wraps the given channel with another channel that actions the middleware on each item in a separate goroutine.
func Inject(ch chan events.Event, middleware EventMiddlewareFunc) chan events.Event {
newOutPipe := make(chan events.Event)

go func() {
for {
select {
case e := <-ch:
go middleware(e)
newOutPipe <- e
}
}
}()

return newOutPipe
}

// NoopEventMiddleware does not do anything, but matches the prometheus definition.
func NoopEventMiddleware(counterName string) func(events.Event) {
return func(e events.Event) {}
}

// PrometheusTelemetryEventMiddleware will increment a counter with the given name.
func PrometheusTelemetryEventMiddleware(counterName string) func(events.Event) {

counter := promauto.NewCounter(prometheus.CounterOpts{
Name: counterName,
Help: fmt.Sprintf("autogenerated counter for '%s'", counterName),
})

return func(e events.Event) {
counter.Add(1)
}
}
14 changes: 6 additions & 8 deletions pkg/pipeline/pipeline.go
Expand Up @@ -16,9 +16,6 @@ import (

// Pipeline is the representation of data flow through this application.
type Pipeline struct {
inCh chan events.Event // inCh merges all input channels into a singular channel.
outCh chan events.Event // outCh replicates events to all output channels.

sources []source.S
relays []relay.R
distributors []distributor.D
Expand All @@ -27,8 +24,6 @@ type Pipeline struct {
// BuildPipeline will construct the pipeline at the core of delta.
func BuildPipeline(c configuration.Container) (Pipeline, error) {
p := Pipeline{
inCh: make(chan events.Event),
outCh: make(chan events.Event),
sources: make([]source.S, 0),
distributors: make([]distributor.D, 0),
}
Expand Down Expand Up @@ -70,14 +65,16 @@ func BuildPipeline(c configuration.Container) (Pipeline, error) {
sourceChannels = append(sourceChannels, thisSourceChan)
}

// TODO: Use prometheus middleware here instead...
inCh := Inject(make(chan events.Event), NoopEventMiddleware("inbound"))
go func() {
fanIn(context.TODO(), sourceChannels, p.inCh)
fanIn(context.TODO(), sourceChannels, inCh)
}()

// -- --

var previousSourceOutput *chan events.Event
previousSourceOutput = &p.inCh
previousSourceOutput = &inCh
for _, r := range p.relays {
thisRelayOutputChan := make(chan events.Event)
go r.Do(context.TODO(), *previousSourceOutput, thisRelayOutputChan)
Expand All @@ -95,7 +92,8 @@ func BuildPipeline(c configuration.Container) (Pipeline, error) {
}

go func() {
fanOut(context.TODO(), *previousSourceOutput, distributorChannels)
// TODO: Use prometheus middleware here instead...
fanOut(context.TODO(), Inject(*previousSourceOutput, NoopEventMiddleware("outbound")), distributorChannels)
}()

return p, nil
Expand Down

0 comments on commit db1fa3f

Please sign in to comment.