From 354c1ffec84d0d52e1e1f20298cbd2d452dd160e Mon Sep 17 00:00:00 2001 From: Andrew Neudegg Date: Sat, 2 Jan 2021 01:03:05 +0000 Subject: [PATCH] feat: added meta nodes with example chaos. Signed-off-by: Andrew Neudegg --- cmd/delta/main.go | 24 ++- cmd/sink/subcmd/httpsink/sink.go | 3 +- go.mod | 1 + pkg/distributor/distributor.go | 2 +- pkg/distributor/stdout/distributor.go | 8 +- pkg/events/event.go | 4 +- pkg/meta/builder/builder.go | 2 +- pkg/meta/chaos/meta.go | 182 +++++++++++------- pkg/meta/meta.go | 1 + pkg/pipeline/pipeline.go | 34 +++- pkg/pipeline/relays.go | 7 +- pkg/probes/probes.go | 3 +- pkg/relay/crypto/symetric_simple.go | 5 + pkg/relay/memory/relay.go | 5 + pkg/relay/relay.go | 1 + pkg/relay/tcp/relay.go | 5 + pkg/source/http/simple_http.go | 66 ++++++- pkg/source/source.go | 1 + pkg/{pipeline/util.go => utils/chans.go} | 12 +- .../util_test.go => utils/chans_test.go} | 15 +- recipies/no_relay_simple_stdout_chaos.yaml | 13 ++ 21 files changed, 284 insertions(+), 110 deletions(-) rename pkg/{pipeline/util.go => utils/chans.go} (65%) rename pkg/{pipeline/util_test.go => utils/chans_test.go} (86%) create mode 100644 recipies/no_relay_simple_stdout_chaos.yaml diff --git a/cmd/delta/main.go b/cmd/delta/main.go index 1e8070e..d467401 100644 --- a/cmd/delta/main.go +++ b/cmd/delta/main.go @@ -2,8 +2,12 @@ package main import ( + "fmt" "os" + "runtime" + "strings" + "github.com/sirupsen/logrus" log "github.com/sirupsen/logrus" "github.com/spf13/cobra" @@ -11,16 +15,29 @@ import ( ) func configureLogger(verbose bool) { - log.SetFormatter(&log.TextFormatter{ + + packageName := "/delta/" + + log.SetReportCaller(true) + log.StandardLogger().SetFormatter(&logrus.TextFormatter{ + CallerPrettyfier: func(f *runtime.Frame) (string, string) { + // s := strings.Split(f.Function, ".") + // funcName := s[len(s)-1] + relativeFPath := strings.SplitAfterN(f.File, packageName, 2)[1] + return "", fmt.Sprintf(" %s:%d", relativeFPath, f.Line) + // return funcName, fmt.Sprintf(" %s:%d", relativeFPath, f.Line) + }, + DisableColors: false, FullTimestamp: true, }) + // Output to stdout instead of the default stderr // Can be any io.Writer, see below for File example log.SetOutput(os.Stdout) - // Only log the warning severity or above. if verbose { + log.SetLevel(log.DebugLevel) } else { log.SetLevel(log.InfoLevel) @@ -45,5 +62,6 @@ For more information and up-to-date documentation take a look at http://github.c rootCmd.AddCommand(serve.Cmd()) - rootCmd.Execute() + err := rootCmd.Execute() + log.Error(err, "application exiting") } diff --git a/cmd/sink/subcmd/httpsink/sink.go b/cmd/sink/subcmd/httpsink/sink.go index 5a19a2b..6a8daa6 100644 --- a/cmd/sink/subcmd/httpsink/sink.go +++ b/cmd/sink/subcmd/httpsink/sink.go @@ -7,6 +7,7 @@ import ( "github.com/andrewneudegg/delta/pkg/events" "github.com/andrewneudegg/delta/pkg/source/http" "github.com/andrewneudegg/delta/pkg/utils" + "github.com/pkg/errors" "github.com/spf13/cobra" @@ -79,7 +80,7 @@ func httpSink(opts *httpSinkOpts) { go func() { err := server.Do(context.TODO(), mq) if err != nil { - log.Error(err) + log.Error(errors.Wrap(err, "failed to do http sink server")) os.Exit(1) } }() diff --git a/go.mod b/go.mod index 33b2742..08496df 100644 --- a/go.mod +++ b/go.mod @@ -11,5 +11,6 @@ require ( github.com/sirupsen/logrus v1.7.0 github.com/spf13/cobra v0.0.7 github.com/stretchr/testify v1.6.1 + google.golang.org/protobuf v1.23.0 gopkg.in/yaml.v2 v2.4.0 ) diff --git a/pkg/distributor/distributor.go b/pkg/distributor/distributor.go index 810a204..b582fae 100644 --- a/pkg/distributor/distributor.go +++ b/pkg/distributor/distributor.go @@ -11,5 +11,5 @@ import ( // and the method that those services should receive the events. type D interface { Do(context.Context, <-chan events.Event) error // Do will emit events that are placed into the channel. + ID() string // ID returns a human readable identifier for this distributor. } - diff --git a/pkg/distributor/stdout/distributor.go b/pkg/distributor/stdout/distributor.go index 35ea07b..f65f3cf 100644 --- a/pkg/distributor/stdout/distributor.go +++ b/pkg/distributor/stdout/distributor.go @@ -7,6 +7,7 @@ import ( "github.com/andrewneudegg/delta/pkg/distributor" "github.com/andrewneudegg/delta/pkg/events" + "github.com/pkg/errors" log "github.com/sirupsen/logrus" ) @@ -15,6 +16,11 @@ type Distributor struct { distributor.D } +// ID returns a human readable identifier for this thing. +func (d Distributor) ID() string { + return "distributor/stdout" +} + // Do will write all events to stdout. func (d Distributor) Do(ctx context.Context, outbound <-chan events.Event) error { for ctx.Err() == nil { @@ -22,7 +28,7 @@ func (d Distributor) Do(ctx context.Context, outbound <-chan events.Event) error case e := <-outbound: jsonBytes, err := json.Marshal(e) if err != nil { - log.Error(err) + log.Error(errors.Wrap(err, "failed to marshal json for distribution")) } log.Info(string(jsonBytes)) e.Complete() diff --git a/pkg/events/event.go b/pkg/events/event.go index 8164024..8783a4e 100644 --- a/pkg/events/event.go +++ b/pkg/events/event.go @@ -32,8 +32,8 @@ type EventMsg struct { URI string `json:"uri"` Content []byte `json:"content"` - FailFunc *func(error) - CompleteFunc *func() + FailFunc *func(error) `json:"-"` + CompleteFunc *func() `json:"-"` } // ToJSON will convert an EventMsg to JSON. diff --git a/pkg/meta/builder/builder.go b/pkg/meta/builder/builder.go index eef927a..911bd3a 100644 --- a/pkg/meta/builder/builder.go +++ b/pkg/meta/builder/builder.go @@ -13,7 +13,7 @@ func Get(distributorName string, metaConfiguration interface{}) (meta.M, error) case "meta/chaos/simple": m := chaos.Simple{} err := mapstructure.Decode(metaConfiguration, &m) - return m, err + return &m, err } return nil, nil diff --git a/pkg/meta/chaos/meta.go b/pkg/meta/chaos/meta.go index e86104e..2b8f132 100644 --- a/pkg/meta/chaos/meta.go +++ b/pkg/meta/chaos/meta.go @@ -2,17 +2,35 @@ package chaos import ( "context" + "fmt" "math/rand" + "sync" "github.com/andrewneudegg/delta/pkg/distributor" "github.com/andrewneudegg/delta/pkg/events" "github.com/andrewneudegg/delta/pkg/relay" "github.com/andrewneudegg/delta/pkg/source" + "github.com/andrewneudegg/delta/pkg/utils" + "github.com/pkg/errors" + log "github.com/sirupsen/logrus" +) + +var ( + ID = "meta/chaos/simple" ) // Simple intercepts and randomly fails events. type Simple struct { - FailChance float32 // 0.5 + FailChance float32 `mapstructure:"failChance"` // 0.5 + + sources []source.S + relays []relay.R + distributors []distributor.D +} + +// ID is a human readable string identifying this thing. +func (m Simple) ID() string { + return ID } // ----------- Substructs ----------- @@ -21,8 +39,13 @@ type s struct { m *Simple } +// ID is a human readable string identifying this thing. +func (s s) ID() string { + return ID +} + func (s s) Do(ctx context.Context, ch chan<- events.Event) error { - return s.m.DoS(ctx, ch) + return s.m.doS(ctx, ch) } type r struct { @@ -30,8 +53,13 @@ type r struct { m *Simple } +// ID is a human readable string identifying this thing. +func (r r) ID() string { + return ID +} + func (r r) Do(ctx context.Context, outbound <-chan events.Event, inbound chan<- events.Event) error { - return r.m.DoR(ctx, outbound, inbound) + return r.m.doR(ctx, outbound, inbound) } type d struct { @@ -39,101 +67,115 @@ type d struct { m *Simple } +// ID is a human readable string identifying this thing. +func (d d) ID() string { + return ID +} + func (d d) Do(ctx context.Context, ch <-chan events.Event) error { - return d.m.DoD(ctx, ch) + return d.m.doD(ctx, ch) } // ----------- Substructs ----------- // S source intermediary. -func (m Simple) S([]source.S) (source.S, error) { return s{m: &m}, nil } +func (m *Simple) S(sources []source.S) (source.S, error) { + m.sources = sources + return s{m: m}, nil +} // R relay intermediary. -func (m Simple) R([]relay.R) (relay.R, error) { return r{m: &m}, nil } +func (m *Simple) R(relays []relay.R) (relay.R, error) { + m.relays = relays + return r{m: m}, nil +} // D distributor intermediary. -func (m Simple) D([]distributor.D) (distributor.D, error) { return d{m: &m}, nil } +func (m *Simple) D(distributors []distributor.D) (distributor.D, error) { + m.distributors = distributors + return d{m: m}, nil +} + +// ----------- /Substructs ----------- func (m Simple) isChance(f float32) bool { // rand.Float64() == 0.1, 0.5, 0.8 // if f == 0.1 (10% chance) then rand has to be above 0.9. // if f == 0.90 (90% chance) then rand has to be above 0.1. - return rand.Float32() > (1 - f) + return rand.Float32() > f } // DoS will do S with some modification. -func (m Simple) DoS(context.Context, chan<- events.Event) error { - return nil +func (m *Simple) doS(ctx context.Context, ch chan<- events.Event) error { + wg := sync.WaitGroup{} + wg.Add(1) - // nCh := make(chan events.Event) - // go func() { - // for { - // select { - // case e := <-ch: - // if m.isChance(m.FailChance) { - // e.Fail(fmt.Errorf("event was unlucky")) - // continue - // } + // We don't do much here, we just pass on the do. + for _, v := range m.sources { + go v.Do(ctx, ch) + } + wg.Wait() - // // if its lucky then continue... - // nCh <- e - // case _ = <-ctx.Done(): - // return - // } - // } - // }() - - // return s.Do(ctx, nCh) + return ctx.Err() } // DoR will do R with some modification. -func (m Simple) DoR(ctx context.Context, outbound <-chan events.Event, inbound chan<- events.Event) error { - return nil - - // nCh := make(chan events.Event) +func (m *Simple) doR(ctx context.Context, outbound <-chan events.Event, inbound chan<- events.Event) error { + wg := sync.WaitGroup{} + wg.Add(1) - // go func() { - // for { - // select { - // case e := <-chOut: - // if m.isChance(m.FailChance) { - // e.Fail(fmt.Errorf("event was unlucky")) - // continue - // } + // We don't do much here, we just pass on the do. + for _, v := range m.relays { + go v.Do(ctx, outbound, inbound) + } - // // if its lucky then continue... - // nCh <- e - // case _ = <-ctx.Done(): - // return - // } - // } - // }() + wg.Wait() - // return r.Do(ctx, nCh, chIn) + return ctx.Err() } // DoD will do D with some modification. -func (m Simple) DoD(ctx context.Context, ch <-chan events.Event) error { - return nil +func (m *Simple) doD(ctx context.Context, ch <-chan events.Event) error { + c := utils.Channels{} + wg := sync.WaitGroup{} + wg.Add(1) + + chs := make([]chan events.Event, len(m.distributors)) + intermediateCh := make(chan events.Event) + for i := 0; i < len(m.distributors); i++ { + chs[i] = make(chan events.Event) + go func(i int) { + log.Infof("starting proxy distributor '%s'", m.distributors[i].ID()) + err := m.distributors[i].Do(ctx, chs[i]) + if err != nil { + log.Error(errors.Wrapf(err, "an error occurred starting, '%s' chaos proxy distributor", m.distributors[i].ID())) + } + log.Warnf("proxy distributor '%s' has exited", m.distributors[i].ID()) + }(i) + + } + + go c.FanOut(ctx, intermediateCh, chs) + + go func() { + for ctx.Err() == nil { + select { + case e := <-ch: + if m.isChance(m.FailChance) { + // log.Debugf("event '%s' was lucky!", e.GetMessageID()) + intermediateCh <- e + continue + } else { + // log.Debugf("event '%s' was unlucky and will be dropped, event drop probability is '%f'", e.GetMessageID(), m.FailChance) + e.Fail(fmt.Errorf("chaos happened to this event")) + } + case _ = <-ctx.Done(): + break + } + } + }() + + wg.Wait() - // nCh := make(chan events.Event) - - // go func() { - // for { - // select { - // case e := <-ch: - // if m.isChance(m.FailChance) { - // e.Fail(fmt.Errorf("event was unlucky")) - // continue - // } - - // // if its lucky then continue... - // nCh <- e - // case _ = <-ctx.Done(): - // return - // } - // } - // }() - - // return d.Do(ctx, nCh) + return nil } diff --git a/pkg/meta/meta.go b/pkg/meta/meta.go index 633c8c9..e90408b 100644 --- a/pkg/meta/meta.go +++ b/pkg/meta/meta.go @@ -11,4 +11,5 @@ type M interface { S([]source.S) (source.S, error) // S, source intermediary. R([]relay.R) (relay.R, error) // R, relay intermediary. D([]distributor.D) (distributor.D, error) // D, distributor intermediary. + ID() string // ID returns a human readable identifier for this Meta. } diff --git a/pkg/pipeline/pipeline.go b/pkg/pipeline/pipeline.go index f7281cd..5d059fa 100644 --- a/pkg/pipeline/pipeline.go +++ b/pkg/pipeline/pipeline.go @@ -10,6 +10,8 @@ import ( "github.com/andrewneudegg/delta/pkg/events" "github.com/andrewneudegg/delta/pkg/relay" "github.com/andrewneudegg/delta/pkg/source" + "github.com/andrewneudegg/delta/pkg/utils" + "github.com/pkg/errors" log "github.com/sirupsen/logrus" ) @@ -29,6 +31,8 @@ func (p Pipeline) Await() { // BuildPipeline will construct the pipeline at the core of delta. func BuildPipeline(c configuration.Container) (Pipeline, error) { + chUtils := utils.Channels{} + p := Pipeline{ sources: make([]source.S, 0), distributors: make([]distributor.D, 0), @@ -58,16 +62,21 @@ func BuildPipeline(c configuration.Container) (Pipeline, error) { thisSourceChan := make(chan events.Event) go func(s source.S, ch chan events.Event) { + log.Infof("launching source '%s'", s.ID()) err := s.Do(context.TODO(), ch) - log.Error(err) + if err != nil { + log.Error(errors.Wrap(err, "failed to do source")) + } + log.Warnf("source '%s' has exited", s.ID()) }(s, thisSourceChan) sourceChannels = append(sourceChannels, thisSourceChan) } + log.Infof("found '%d' sources", len(p.sources)) // TODO: Use prometheus middleware here instead... inCh := Inject(make(chan events.Event), NoopEventMiddleware("inbound")) go func() { - fanIn(context.TODO(), sourceChannels, inCh) + chUtils.FanIn(context.TODO(), sourceChannels, inCh) }() // -- -- @@ -81,9 +90,19 @@ func BuildPipeline(c configuration.Container) (Pipeline, error) { } thisRelayOutputChan := make(chan events.Event) - go r.Do(context.TODO(), *previousSourceOutput, thisRelayOutputChan) + go func () { + log.Infof("launching relay '%s'", r.ID()) + err := r.Do(context.TODO(), *previousSourceOutput, thisRelayOutputChan) + if err != nil { + log.Error(errors.Wrap(err, "failed to do relay")) + } + log.Warnf("relay '%s' has exited", r.ID()) + }() + previousSourceOutput = &thisRelayOutputChan } + log.Infof("found '%d' relays", len(p.relays)) + distributorChannels := []chan events.Event{} for _, d := range p.distributors { @@ -94,15 +113,20 @@ func BuildPipeline(c configuration.Container) (Pipeline, error) { distributorInputChannel := make(chan events.Event) go func(d distributor.D, ch chan events.Event) { + log.Infof("launching distributor '%s'", d.ID()) err := d.Do(context.TODO(), ch) - log.Error(err) + if err != nil { + log.Error(errors.Wrap(err, "failed to do distributor")) + } + log.Warnf("distributor '%s' has exited", d.ID()) }(d, distributorInputChannel) distributorChannels = append(distributorChannels, distributorInputChannel) } + log.Infof("found '%d' distributors", len(p.distributors)) go func() { // TODO: Use prometheus middleware here instead... - fanOut(context.TODO(), Inject(*previousSourceOutput, NoopEventMiddleware("outbound")), distributorChannels) + chUtils.FanOut(context.TODO(), Inject(*previousSourceOutput, NoopEventMiddleware("outbound")), distributorChannels) }() return p, nil diff --git a/pkg/pipeline/relays.go b/pkg/pipeline/relays.go index 2905a28..066f756 100644 --- a/pkg/pipeline/relays.go +++ b/pkg/pipeline/relays.go @@ -58,9 +58,10 @@ func (p *Pipeline) buildRelays(nc []configuration.NodeConfig) error { return err } - if len(resultantrelays) == 0 { - return fmt.Errorf("no relays were found") - } + // This is permitted (S->D without R). + // if len(resultantrelays) == 0 { + // return fmt.Errorf("no relays were found") + // } p.relays = resultantrelays diff --git a/pkg/probes/probes.go b/pkg/probes/probes.go index 130a259..4f9f7af 100644 --- a/pkg/probes/probes.go +++ b/pkg/probes/probes.go @@ -3,6 +3,7 @@ package probes import ( "net/http" + "github.com/pkg/errors" log "github.com/sirupsen/logrus" ) @@ -59,6 +60,6 @@ func (p *ProbeServer) StartProbeServer() { err := http.ListenAndServe(p.ListenAddr, nil) if err != nil { - log.Error(err) + log.Error(errors.Wrap(err, "failed to listen and serve probes")) } } diff --git a/pkg/relay/crypto/symetric_simple.go b/pkg/relay/crypto/symetric_simple.go index 1220ca2..9846c73 100644 --- a/pkg/relay/crypto/symetric_simple.go +++ b/pkg/relay/crypto/symetric_simple.go @@ -40,6 +40,11 @@ type SimpleSymmetricCryptoRelay struct { direction encryptionDirection // specifies if this is encryption or decryption. } +// ID returns a human readable identifier for this thing. +func (r SimpleSymmetricCryptoRelay) ID() string { + return "relay/SimpleSymmetricCryptoRelay" +} + func (r SimpleSymmetricCryptoRelay) actionMap(m map[string][]string) (map[string][]string, error) { resultantMap := make(map[string][]string) diff --git a/pkg/relay/memory/relay.go b/pkg/relay/memory/relay.go index 36e7b80..1960514 100644 --- a/pkg/relay/memory/relay.go +++ b/pkg/relay/memory/relay.go @@ -12,6 +12,11 @@ type Relay struct { relay.R } +// ID returns a human readable identifier for this thing. +func (r Relay) ID() string { + return "relay/memory" +} + // Do will pass messages through an intermediary that may perform operations on the data. func (r Relay) Do(ctx context.Context, outbound <-chan events.Event, inbound chan<- events.Event) error { // Pass all messages from the outbound queue to the inbound queue. diff --git a/pkg/relay/relay.go b/pkg/relay/relay.go index 091fa80..c70b6c4 100644 --- a/pkg/relay/relay.go +++ b/pkg/relay/relay.go @@ -13,4 +13,5 @@ type R interface { // outbound: will receive messages from the current applications. // inbound: will receive messages from the relay source. Do(ctx context.Context, outbound <-chan events.Event, inbound chan<- events.Event) error + ID() string // ID returns a human readable identifier for this relay. } diff --git a/pkg/relay/tcp/relay.go b/pkg/relay/tcp/relay.go index 639ec6c..eec5fb2 100644 --- a/pkg/relay/tcp/relay.go +++ b/pkg/relay/tcp/relay.go @@ -25,6 +25,11 @@ type Relay struct { Opts Opts } +// ID returns a human readable identifier for this thing. +func (r Relay) ID() string { + return "relay/tcp" +} + // Listen for incoming events. func (r Relay) Listen(ctx context.Context, ch chan<- events.Event) error { handleConnection := func(conn net.Conn) { diff --git a/pkg/source/http/simple_http.go b/pkg/source/http/simple_http.go index fd634b7..d9758de 100644 --- a/pkg/source/http/simple_http.go +++ b/pkg/source/http/simple_http.go @@ -6,17 +6,29 @@ import ( "fmt" "io/ioutil" "net/http" + "sync" "github.com/andrewneudegg/delta/pkg/events" "github.com/andrewneudegg/delta/pkg/source" + "github.com/pkg/errors" log "github.com/sirupsen/logrus" "github.com/google/uuid" ) // httpSinkServerResponse is the response that the server will send. + +const ( + // SuccessStatus is the string success message. + SuccessStatus = "success" + // FailureStatus is the string failed message. + FailureStatus = "success" +) + type httpSinkServerResponse struct { - ID string `json:"id"` // ID is the response ID for this accepted event. + ID string `json:"id"` // ID is the response ID for this accepted event. + Reason string `json:"reason"` // Reason is why the response happened as it did. + Status string `json:"status"` // Status states what happened to this event. } // SimpleHTTPSink is a http server. @@ -29,6 +41,11 @@ type SimpleHTTPSink struct { server *http.Server } +// ID returns a human readable identifier for this thing. +func (r SimpleHTTPSink) ID() string { + return "source/SimpleHTTPSink" +} + // init this sink. func (s *SimpleHTTPSink) init(ch chan<- events.Event) error { s.inboundCh = ch @@ -48,12 +65,14 @@ func (s *SimpleHTTPSink) ServeHTTP(rw http.ResponseWriter, r *http.Request) { log.Debugf("received '%s' at '%s%s'.", uniqueID, r.Host, r.RequestURI) responseBytes, err := json.Marshal(httpSinkServerResponse{ - ID: uniqueID, + ID: uniqueID, + Reason: "none", + Status: SuccessStatus, }) if err != nil { rw.WriteHeader(http.StatusInternalServerError) - log.Error(err) + log.Error(errors.Wrap(err, "failed to unmarshal http message")) return } @@ -66,19 +85,46 @@ func (s *SimpleHTTPSink) ServeHTTP(rw http.ResponseWriter, r *http.Request) { if err != nil { rw.WriteHeader(http.StatusInternalServerError) - log.Error(err) + log.Error(errors.Wrap(err, "failed to read http body")) return } + wg := sync.WaitGroup{} + wg.Add(1) + + fail := func(err error) { + responseBytes, _ := json.Marshal(httpSinkServerResponse{ + ID: uniqueID, + Reason: err.Error(), + Status: FailureStatus, + }) + + if err != nil { + rw.WriteHeader(http.StatusInternalServerError) + } else { + rw.WriteHeader(http.StatusBadRequest) + } + + rw.Write(responseBytes) + wg.Done() + } + + complete := func() { + rw.WriteHeader(http.StatusAccepted) + rw.Write(responseBytes) + wg.Done() + } + s.inboundCh <- events.EventMsg{ - ID: uniqueID, - Headers: r.Header, - URI: r.RequestURI, - Content: body, + ID: uniqueID, + Headers: r.Header, + URI: r.RequestURI, + Content: body, + FailFunc: &fail, + CompleteFunc: &complete, } - rw.WriteHeader(http.StatusAccepted) - rw.Write(responseBytes) + wg.Wait() return } diff --git a/pkg/source/source.go b/pkg/source/source.go index d4eb5e9..5ac912f 100644 --- a/pkg/source/source.go +++ b/pkg/source/source.go @@ -9,4 +9,5 @@ import ( // S is a generic source of events. type S interface { Do(context.Context, chan<- events.Event) error // Do will begin the loop for this source. + ID() string // ID returns a human readable identifier for this source. } diff --git a/pkg/pipeline/util.go b/pkg/utils/chans.go similarity index 65% rename from pkg/pipeline/util.go rename to pkg/utils/chans.go index aadb884..9845a68 100644 --- a/pkg/pipeline/util.go +++ b/pkg/utils/chans.go @@ -1,4 +1,4 @@ -package pipeline +package utils import ( "context" @@ -7,8 +7,10 @@ import ( "github.com/andrewneudegg/delta/pkg/events" ) -// fanIn will merge multiple input channels to a singular output channel. -func fanIn(ctx context.Context, chs []chan events.Event, combined chan events.Event) error { +type Channels struct{} + +// FanIn will merge multiple input channels to a singular output channel. +func (c *Channels) FanIn(ctx context.Context, chs []chan events.Event, combined chan events.Event) error { wg := sync.WaitGroup{} merge := func(ch <-chan events.Event) { @@ -32,8 +34,8 @@ func fanIn(ctx context.Context, chs []chan events.Event, combined chan events.Ev return ctx.Err() } -// fanOut will split a single input channel into multiple output channels. -func fanOut(ctx context.Context, ch chan events.Event, outputs []chan events.Event) error { +// FanOut will split a single input channel into multiple output channels. +func (c *Channels) FanOut(ctx context.Context, ch chan events.Event, outputs []chan events.Event) error { write := func(ch chan events.Event, e events.Event) { ch <- e diff --git a/pkg/pipeline/util_test.go b/pkg/utils/chans_test.go similarity index 86% rename from pkg/pipeline/util_test.go rename to pkg/utils/chans_test.go index 1cf15f6..44125bb 100644 --- a/pkg/pipeline/util_test.go +++ b/pkg/utils/chans_test.go @@ -1,4 +1,4 @@ -package pipeline +package utils import ( "context" @@ -11,6 +11,7 @@ import ( ) func TestFanInSmoke(t *testing.T) { + c := Channels{} resultCh := make(chan events.Event) results := []events.Event{} @@ -28,7 +29,7 @@ func TestFanInSmoke(t *testing.T) { fans = append(fans, make(chan events.Event)) } - go fanIn(context.TODO(), fans, resultCh) + go c.FanIn(context.TODO(), fans, resultCh) for i := 0; i < numFans; i++ { count := fmt.Sprintf("%d", i) @@ -36,7 +37,7 @@ func TestFanInSmoke(t *testing.T) { fans[i] <- events.EventMsg{ ID: count, Headers: map[string][]string{ - count: []string{count}, + count: {count}, }, URI: fmt.Sprintf("/%s", count), Content: []byte(count), @@ -49,7 +50,7 @@ func TestFanInSmoke(t *testing.T) { } func TestFanOutSmoke(t *testing.T) { - + c := Channels{} inputCh := make(chan events.Event) // -- Test Setup @@ -70,11 +71,11 @@ func TestFanOutSmoke(t *testing.T) { fans = append(fans, make(chan events.Event)) } - go fanIn(context.TODO(), fans, resultCh) + go c.FanIn(context.TODO(), fans, resultCh) // -- Test -- - go fanOut(context.TODO(), inputCh, fans) + go c.FanOut(context.TODO(), inputCh, fans) for i := 0; i < numFans; i++ { count := fmt.Sprintf("%d", i) @@ -82,7 +83,7 @@ func TestFanOutSmoke(t *testing.T) { inputCh <- events.EventMsg{ ID: count, Headers: map[string][]string{ - count: []string{count}, + count: {count}, }, URI: fmt.Sprintf("/%s", count), Content: []byte(count), diff --git a/recipies/no_relay_simple_stdout_chaos.yaml b/recipies/no_relay_simple_stdout_chaos.yaml new file mode 100644 index 0000000..632db13 --- /dev/null +++ b/recipies/no_relay_simple_stdout_chaos.yaml @@ -0,0 +1,13 @@ +# This recipie passes event data from the sink to standard out. +applicationSettings: {} +sourceConfigurations: + - name: http/simple + config: + ListenAddr: :8080 + MaxBodySize: 512 +distributorConfigurations: + - name: meta/chaos/simple + config: + failChance: 0.5 + subConfigs: + - name: stdout