Skip to content

Commit

Permalink
refactor: make pipeline build clearer.
Browse files Browse the repository at this point in the history
Signed-off-by: Andrew Neudegg <andrew.neudegg@finbourne.com>
  • Loading branch information
AndrewNeudegg committed Jan 2, 2021
1 parent 354c1ff commit 9bd3ccd
Show file tree
Hide file tree
Showing 12 changed files with 245 additions and 75 deletions.
36 changes: 20 additions & 16 deletions cmd/delta/main.go
Expand Up @@ -18,28 +18,30 @@ func configureLogger(verbose bool) {

packageName := "/delta/"

log.SetReportCaller(true)
log.StandardLogger().SetFormatter(&logrus.TextFormatter{
CallerPrettyfier: func(f *runtime.Frame) (string, string) {
// s := strings.Split(f.Function, ".")
// funcName := s[len(s)-1]
relativeFPath := strings.SplitAfterN(f.File, packageName, 2)[1]
return "", fmt.Sprintf(" %s:%d", relativeFPath, f.Line)
// return funcName, fmt.Sprintf(" %s:%d", relativeFPath, f.Line)
},

DisableColors: false,
FullTimestamp: true,
})

// Output to stdout instead of the default stderr
// Can be any io.Writer, see below for File example
log.SetOutput(os.Stdout)
// Only log the warning severity or above.
if verbose {
log.SetReportCaller(true)
log.StandardLogger().SetFormatter(&logrus.TextFormatter{
CallerPrettyfier: func(f *runtime.Frame) (string, string) {
// s := strings.Split(f.Function, ".")
// funcName := s[len(s)-1]
relativeFPath := strings.SplitAfterN(f.File, packageName, 2)[1]
return "", fmt.Sprintf(" %s:%d", relativeFPath, f.Line)
// return funcName, fmt.Sprintf(" %s:%d", relativeFPath, f.Line)
},

DisableColors: false,
FullTimestamp: true,
})
log.SetLevel(log.DebugLevel)
} else {
log.StandardLogger().SetFormatter(&logrus.TextFormatter{
DisableColors: false,
FullTimestamp: true,
})
log.SetLevel(log.InfoLevel)
}
}
Expand All @@ -59,9 +61,11 @@ For more information and up-to-date documentation take a look at http://github.c
}

rootCmd.PersistentFlags().BoolVarP(&verboseMode, "verbose", "v", false, "verbose output")

rootCmd.AddCommand(serve.Cmd())

err := rootCmd.Execute()
log.Error(err, "application exiting")
if err != nil {
log.Error(err)
}
log.Warn("application exiting")
}
5 changes: 5 additions & 0 deletions pkg/distributor/http/http_simple.go
Expand Up @@ -18,6 +18,11 @@ type DirectDistributor struct {
Addr string // Addr to send events to (http://localhost:8080).
}

// ID returns a human readable identifier for this thing.
func (d DirectDistributor) ID() string {
return "distributor/http/direct"
}

// Do will make a http post at the given Addr.
func (d DirectDistributor) Do(ctx context.Context, ch <-chan events.Event) error {

Expand Down
4 changes: 3 additions & 1 deletion pkg/distributor/http/http_simple_test.go
Expand Up @@ -24,7 +24,9 @@ func TestNaiveSmoke(t *testing.T) {
go server.Do(context.TODO(), inboundEventsCh)
go func(ch chan events.Event) {
for {
inboundEvents = append(inboundEvents, <-ch)
e := <-ch
e.Complete()
inboundEvents = append(inboundEvents, e)
}
}(inboundEventsCh)

Expand Down
27 changes: 25 additions & 2 deletions pkg/events/event.go
Expand Up @@ -2,6 +2,10 @@ package events

import (
"encoding/json"
"fmt"
"math/rand"

"github.com/google/uuid"
)

// Distributor will queue an event for distribution, taking care of the messy details.
Expand Down Expand Up @@ -70,14 +74,18 @@ func (e EventMsg) GetContent() []byte {
// Complete indicates that the given event has successfully been compelted.
func (e EventMsg) Complete() {
if e.CompleteFunc != nil {
(*e.CompleteFunc)()
cF := *e.CompleteFunc
e.CompleteFunc = nil
cF()
}
}

// Fail indicates that the given event has failed.
func (e EventMsg) Fail(err error) {
if e.FailFunc != nil {
(*e.FailFunc)(err)
eF := *e.FailFunc
e.FailFunc = nil
eF(err)
}
}

Expand All @@ -93,6 +101,21 @@ func FromJSONb(payload []byte) (EventMsg, error) {
return e, err
}

// JunkEvent generates a junk event.
func JunkEvent() EventMsg {
r := rand.Int()
sR := fmt.Sprintf("%d", r)

return EventMsg{
ID: uuid.New().String(),
Headers: map[string][]string{},
URI: fmt.Sprintf("/%s/%s", sR, "junk"),
Content: []byte(sR),
FailFunc: nil,
CompleteFunc: nil,
}
}

// // SetMessageID will set the message id, mostly used for testing.
// func (e *EventMsg) SetMessageID(id string) {
// e.ID = id
Expand Down
132 changes: 76 additions & 56 deletions pkg/pipeline/pipeline.go
Expand Up @@ -2,7 +2,6 @@ package pipeline

import (
"context"
"fmt"
"sync"

"github.com/andrewneudegg/delta/pkg/configuration"
Expand All @@ -29,38 +28,15 @@ func (p Pipeline) Await() {
wg.Wait()
}

// BuildPipeline will construct the pipeline at the core of delta.
func BuildPipeline(c configuration.Container) (Pipeline, error) {
chUtils := utils.Channels{}

p := Pipeline{
sources: make([]source.S, 0),
distributors: make([]distributor.D, 0),
relays: make([]relay.R, 0),
}

if err := p.buildSources(c.SourceConfigs); err != nil {
return Pipeline{}, err
}

if err := p.buildRelays(c.RelayConfigs); err != nil {
return Pipeline{}, err
}

if err := p.buildDistributors(c.DistributorConfigs); err != nil {
return Pipeline{}, err
}

// Now we have constructed each of the nodes we must connect them.
// If we want to insert middleware, i.e. telemetry, this is
// probably the place to do it.
// hookupSources will build a channel for each source so that
// they may be audited at another point in time.
func (p *Pipeline) hookupSources(ctx context.Context) (chan events.Event, error) {
sourceChannels := []chan events.Event{}
for _, s := range p.sources {
if s == nil {
return Pipeline{}, fmt.Errorf("source was unexpectedly nil")
}
outputCh := make(chan events.Event)

for _, s := range p.sources {
thisSourceChan := make(chan events.Event)

go func(s source.S, ch chan events.Event) {
log.Infof("launching source '%s'", s.ID())
err := s.Do(context.TODO(), ch)
Expand All @@ -73,44 +49,38 @@ func BuildPipeline(c configuration.Container) (Pipeline, error) {
}
log.Infof("found '%d' sources", len(p.sources))

// TODO: Use prometheus middleware here instead...
inCh := Inject(make(chan events.Event), NoopEventMiddleware("inbound"))
go func() {
chUtils.FanIn(context.TODO(), sourceChannels, inCh)
}()
c := utils.Channels{}
go c.FanIn(ctx, sourceChannels, outputCh)

// -- --
return outputCh, nil
}

// hookupRelays will daisy chain the relays together
// from first to last, returning the final output chan.
func (p *Pipeline) hookupRelays(ctx context.Context, input chan events.Event) (chan events.Event, error) {
var previousSourceOutput *chan events.Event
previousSourceOutput = &inCh
for _, r := range p.relays {

if r == nil {
return Pipeline{}, fmt.Errorf("relay was unexpectedly nil")
}
previousSourceOutput = &input

for _, r := range p.relays {
thisRelayOutputChan := make(chan events.Event)
go func () {
go func(inCh <-chan events.Event, outCh chan<- events.Event) {
log.Infof("launching relay '%s'", r.ID())
err := r.Do(context.TODO(), *previousSourceOutput, thisRelayOutputChan)
err := r.Do(context.TODO(), inCh, outCh)
if err != nil {
log.Error(errors.Wrap(err, "failed to do relay"))
}
log.Warnf("relay '%s' has exited", r.ID())
}()

}(*previousSourceOutput, thisRelayOutputChan)
previousSourceOutput = &thisRelayOutputChan
}
log.Infof("found '%d' relays", len(p.relays))

return *previousSourceOutput, nil
}

distributorChannels := []chan events.Event{}
func (p *Pipeline) hookupDistributors(ctx context.Context, input chan events.Event) error {
distributorChannels := make([]chan events.Event, len(p.distributors))
for _, d := range p.distributors {

if d == nil {
return Pipeline{}, fmt.Errorf("distributor was unexpectedly nil")
}

distributorInputChannel := make(chan events.Event)
go func(d distributor.D, ch chan events.Event) {
log.Infof("launching distributor '%s'", d.ID())
Expand All @@ -124,10 +94,60 @@ func BuildPipeline(c configuration.Container) (Pipeline, error) {
}
log.Infof("found '%d' distributors", len(p.distributors))

go func() {
// TODO: Use prometheus middleware here instead...
chUtils.FanOut(context.TODO(), Inject(*previousSourceOutput, NoopEventMiddleware("outbound")), distributorChannels)
}()
c := utils.Channels{}
go c.FanOut(ctx, input, distributorChannels)

return nil
}

// Hookup will build the channels required for data flow and press go.
func (p *Pipeline) Hookup(ctx context.Context) error {
sourceCh, err := p.hookupSources(ctx)
if err != nil {
return err
}

finalRelayCh, err := p.hookupRelays(ctx, sourceCh)
if err != nil {
return err
}

return p.hookupDistributors(ctx, finalRelayCh)
}

// BuildPipeline will construct the pipeline at the core of delta.
func BuildPipeline(c configuration.Container) (Pipeline, error) {
p := Pipeline{
sources: make([]source.S, 0),
distributors: make([]distributor.D, 0),
relays: make([]relay.R, 0),
}

if err := p.buildSources(c.SourceConfigs); err != nil {
return Pipeline{}, errors.Wrap(err, "could not build sources")
}

if err := p.buildRelays(c.RelayConfigs); err != nil {
return Pipeline{}, errors.Wrap(err, "could not build relays")
}

if err := p.buildDistributors(c.DistributorConfigs); err != nil {
return Pipeline{}, errors.Wrap(err, "could not build distributors")
}

if err := p.Hookup(context.TODO()); err != nil {
return Pipeline{}, errors.Wrap(err, "could not hookup sources, relays & distributors")
}

// Now we have constructed each of the nodes we must connect them.
// If we want to insert middleware, i.e. telemetry, this is
// probably the place to do it.

// TODO: Use prometheus middleware here instead...
// inCh := Inject(make(chan events.Event), NoopEventMiddleware("inbound"))
// go func() {
// chUtils.FanIn(context.TODO(), sourceChannels, inCh)
// }()

return p, nil
}
3 changes: 3 additions & 0 deletions pkg/relay/memory/relay.go
Expand Up @@ -5,6 +5,7 @@ import (

"github.com/andrewneudegg/delta/pkg/events"
"github.com/andrewneudegg/delta/pkg/relay"
log "github.com/sirupsen/logrus"
)

// Relay is an in memory, reference implementation of the relay.
Expand All @@ -19,10 +20,12 @@ func (r Relay) ID() string {

// Do will pass messages through an intermediary that may perform operations on the data.
func (r Relay) Do(ctx context.Context, outbound <-chan events.Event, inbound chan<- events.Event) error {
log.Info("starting in-memory relay")
// Pass all messages from the outbound queue to the inbound queue.
for ctx.Err() == nil {
select {
case e := <-outbound:
log.Debugf("in-memory relay received '%s'", e.GetMessageID())
inbound <- e
case _ = <-ctx.Done():
break
Expand Down
5 changes: 5 additions & 0 deletions pkg/source/builder/builder.go
Expand Up @@ -5,6 +5,7 @@ import (

"github.com/andrewneudegg/delta/pkg/source"
"github.com/andrewneudegg/delta/pkg/source/http"
"github.com/andrewneudegg/delta/pkg/source/simulator"
)

// Get will return the given source with its data values initialised.
Expand All @@ -14,6 +15,10 @@ func Get(sourceName string, sourceConfiguration interface{}) (source.S, error) {
source := http.SimpleHTTPSink{}
err := mapstructure.Decode(sourceConfiguration, &source)
return source, err
case "source/simulator/simple":
source := simulator.Source{}
err := mapstructure.Decode(sourceConfiguration, &source)
return source, err
}

return nil, nil
Expand Down
1 change: 1 addition & 0 deletions pkg/source/http/simple_http.go
Expand Up @@ -146,5 +146,6 @@ func (s SimpleHTTPSink) Do(ctx context.Context, ch chan<- events.Event) error {
}()

// do the serving.
log.Infof("starting sink server at '%s'", s.ListenAddr)
return s.server.ListenAndServe()
}

0 comments on commit 9bd3ccd

Please sign in to comment.