Skip to content

Commit

Permalink
feat: added meta nodes with example chaos.
Browse files Browse the repository at this point in the history
Signed-off-by: Andrew Neudegg <andrew.neudegg@finbourne.com>
  • Loading branch information
AndrewNeudegg committed Jan 2, 2021
1 parent 600ff9a commit 354c1ff
Show file tree
Hide file tree
Showing 21 changed files with 284 additions and 110 deletions.
24 changes: 21 additions & 3 deletions cmd/delta/main.go
Expand Up @@ -2,25 +2,42 @@
package main

import (
"fmt"
"os"
"runtime"
"strings"

"github.com/sirupsen/logrus"
log "github.com/sirupsen/logrus"
"github.com/spf13/cobra"

"github.com/andrewneudegg/delta/cmd/delta/subcmd/serve"
)

func configureLogger(verbose bool) {
log.SetFormatter(&log.TextFormatter{

packageName := "/delta/"

log.SetReportCaller(true)
log.StandardLogger().SetFormatter(&logrus.TextFormatter{
CallerPrettyfier: func(f *runtime.Frame) (string, string) {
// s := strings.Split(f.Function, ".")
// funcName := s[len(s)-1]
relativeFPath := strings.SplitAfterN(f.File, packageName, 2)[1]
return "", fmt.Sprintf(" %s:%d", relativeFPath, f.Line)
// return funcName, fmt.Sprintf(" %s:%d", relativeFPath, f.Line)
},

DisableColors: false,
FullTimestamp: true,
})

// Output to stdout instead of the default stderr
// Can be any io.Writer, see below for File example
log.SetOutput(os.Stdout)

// Only log the warning severity or above.
if verbose {

log.SetLevel(log.DebugLevel)
} else {
log.SetLevel(log.InfoLevel)
Expand All @@ -45,5 +62,6 @@ For more information and up-to-date documentation take a look at http://github.c

rootCmd.AddCommand(serve.Cmd())

rootCmd.Execute()
err := rootCmd.Execute()
log.Error(err, "application exiting")
}
3 changes: 2 additions & 1 deletion cmd/sink/subcmd/httpsink/sink.go
Expand Up @@ -7,6 +7,7 @@ import (
"github.com/andrewneudegg/delta/pkg/events"
"github.com/andrewneudegg/delta/pkg/source/http"
"github.com/andrewneudegg/delta/pkg/utils"
"github.com/pkg/errors"

"github.com/spf13/cobra"

Expand Down Expand Up @@ -79,7 +80,7 @@ func httpSink(opts *httpSinkOpts) {
go func() {
err := server.Do(context.TODO(), mq)
if err != nil {
log.Error(err)
log.Error(errors.Wrap(err, "failed to do http sink server"))
os.Exit(1)
}
}()
Expand Down
1 change: 1 addition & 0 deletions go.mod
Expand Up @@ -11,5 +11,6 @@ require (
github.com/sirupsen/logrus v1.7.0
github.com/spf13/cobra v0.0.7
github.com/stretchr/testify v1.6.1
google.golang.org/protobuf v1.23.0
gopkg.in/yaml.v2 v2.4.0
)
2 changes: 1 addition & 1 deletion pkg/distributor/distributor.go
Expand Up @@ -11,5 +11,5 @@ import (
// and the method that those services should receive the events.
type D interface {
Do(context.Context, <-chan events.Event) error // Do will emit events that are placed into the channel.
ID() string // ID returns a human readable identifier for this distributor.
}

8 changes: 7 additions & 1 deletion pkg/distributor/stdout/distributor.go
Expand Up @@ -7,6 +7,7 @@ import (

"github.com/andrewneudegg/delta/pkg/distributor"
"github.com/andrewneudegg/delta/pkg/events"
"github.com/pkg/errors"
log "github.com/sirupsen/logrus"
)

Expand All @@ -15,14 +16,19 @@ type Distributor struct {
distributor.D
}

// ID returns a human readable identifier for this thing.
func (d Distributor) ID() string {
return "distributor/stdout"
}

// Do will write all events to stdout.
func (d Distributor) Do(ctx context.Context, outbound <-chan events.Event) error {
for ctx.Err() == nil {
select {
case e := <-outbound:
jsonBytes, err := json.Marshal(e)
if err != nil {
log.Error(err)
log.Error(errors.Wrap(err, "failed to marshal json for distribution"))
}
log.Info(string(jsonBytes))
e.Complete()
Expand Down
4 changes: 2 additions & 2 deletions pkg/events/event.go
Expand Up @@ -32,8 +32,8 @@ type EventMsg struct {
URI string `json:"uri"`
Content []byte `json:"content"`

FailFunc *func(error)
CompleteFunc *func()
FailFunc *func(error) `json:"-"`
CompleteFunc *func() `json:"-"`
}

// ToJSON will convert an EventMsg to JSON.
Expand Down
2 changes: 1 addition & 1 deletion pkg/meta/builder/builder.go
Expand Up @@ -13,7 +13,7 @@ func Get(distributorName string, metaConfiguration interface{}) (meta.M, error)
case "meta/chaos/simple":
m := chaos.Simple{}
err := mapstructure.Decode(metaConfiguration, &m)
return m, err
return &m, err
}

return nil, nil
Expand Down
182 changes: 112 additions & 70 deletions pkg/meta/chaos/meta.go
Expand Up @@ -2,17 +2,35 @@ package chaos

import (
"context"
"fmt"
"math/rand"
"sync"

"github.com/andrewneudegg/delta/pkg/distributor"
"github.com/andrewneudegg/delta/pkg/events"
"github.com/andrewneudegg/delta/pkg/relay"
"github.com/andrewneudegg/delta/pkg/source"
"github.com/andrewneudegg/delta/pkg/utils"
"github.com/pkg/errors"
log "github.com/sirupsen/logrus"
)

var (
ID = "meta/chaos/simple"
)

// Simple intercepts and randomly fails events.
type Simple struct {
FailChance float32 // 0.5
FailChance float32 `mapstructure:"failChance"` // 0.5

sources []source.S
relays []relay.R
distributors []distributor.D
}

// ID is a human readable string identifying this thing.
func (m Simple) ID() string {
return ID
}

// ----------- Substructs -----------
Expand All @@ -21,119 +39,143 @@ type s struct {
m *Simple
}

// ID is a human readable string identifying this thing.
func (s s) ID() string {
return ID
}

func (s s) Do(ctx context.Context, ch chan<- events.Event) error {
return s.m.DoS(ctx, ch)
return s.m.doS(ctx, ch)
}

type r struct {
relay.R
m *Simple
}

// ID is a human readable string identifying this thing.
func (r r) ID() string {
return ID
}

func (r r) Do(ctx context.Context, outbound <-chan events.Event, inbound chan<- events.Event) error {
return r.m.DoR(ctx, outbound, inbound)
return r.m.doR(ctx, outbound, inbound)
}

type d struct {
distributor.D
m *Simple
}

// ID is a human readable string identifying this thing.
func (d d) ID() string {
return ID
}

func (d d) Do(ctx context.Context, ch <-chan events.Event) error {
return d.m.DoD(ctx, ch)
return d.m.doD(ctx, ch)
}

// ----------- Substructs -----------

// S source intermediary.
func (m Simple) S([]source.S) (source.S, error) { return s{m: &m}, nil }
func (m *Simple) S(sources []source.S) (source.S, error) {
m.sources = sources
return s{m: m}, nil
}

// R relay intermediary.
func (m Simple) R([]relay.R) (relay.R, error) { return r{m: &m}, nil }
func (m *Simple) R(relays []relay.R) (relay.R, error) {
m.relays = relays
return r{m: m}, nil
}

// D distributor intermediary.
func (m Simple) D([]distributor.D) (distributor.D, error) { return d{m: &m}, nil }
func (m *Simple) D(distributors []distributor.D) (distributor.D, error) {
m.distributors = distributors
return d{m: m}, nil
}

// ----------- /Substructs -----------

func (m Simple) isChance(f float32) bool {
// rand.Float64() == 0.1, 0.5, 0.8
// if f == 0.1 (10% chance) then rand has to be above 0.9.
// if f == 0.90 (90% chance) then rand has to be above 0.1.
return rand.Float32() > (1 - f)
return rand.Float32() > f
}

// DoS will do S with some modification.
func (m Simple) DoS(context.Context, chan<- events.Event) error {
return nil
func (m *Simple) doS(ctx context.Context, ch chan<- events.Event) error {
wg := sync.WaitGroup{}
wg.Add(1)

// nCh := make(chan events.Event)
// go func() {
// for {
// select {
// case e := <-ch:
// if m.isChance(m.FailChance) {
// e.Fail(fmt.Errorf("event was unlucky"))
// continue
// }
// We don't do much here, we just pass on the do.
for _, v := range m.sources {
go v.Do(ctx, ch)
}
wg.Wait()

// // if its lucky then continue...
// nCh <- e
// case _ = <-ctx.Done():
// return
// }
// }
// }()

// return s.Do(ctx, nCh)
return ctx.Err()
}

// DoR will do R with some modification.
func (m Simple) DoR(ctx context.Context, outbound <-chan events.Event, inbound chan<- events.Event) error {
return nil

// nCh := make(chan events.Event)
func (m *Simple) doR(ctx context.Context, outbound <-chan events.Event, inbound chan<- events.Event) error {
wg := sync.WaitGroup{}
wg.Add(1)

// go func() {
// for {
// select {
// case e := <-chOut:
// if m.isChance(m.FailChance) {
// e.Fail(fmt.Errorf("event was unlucky"))
// continue
// }
// We don't do much here, we just pass on the do.
for _, v := range m.relays {
go v.Do(ctx, outbound, inbound)
}

// // if its lucky then continue...
// nCh <- e
// case _ = <-ctx.Done():
// return
// }
// }
// }()
wg.Wait()

// return r.Do(ctx, nCh, chIn)
return ctx.Err()
}

// DoD will do D with some modification.
func (m Simple) DoD(ctx context.Context, ch <-chan events.Event) error {
return nil
func (m *Simple) doD(ctx context.Context, ch <-chan events.Event) error {
c := utils.Channels{}
wg := sync.WaitGroup{}
wg.Add(1)

chs := make([]chan events.Event, len(m.distributors))
intermediateCh := make(chan events.Event)
for i := 0; i < len(m.distributors); i++ {
chs[i] = make(chan events.Event)
go func(i int) {
log.Infof("starting proxy distributor '%s'", m.distributors[i].ID())
err := m.distributors[i].Do(ctx, chs[i])
if err != nil {
log.Error(errors.Wrapf(err, "an error occurred starting, '%s' chaos proxy distributor", m.distributors[i].ID()))
}
log.Warnf("proxy distributor '%s' has exited", m.distributors[i].ID())
}(i)

}

go c.FanOut(ctx, intermediateCh, chs)

go func() {
for ctx.Err() == nil {
select {
case e := <-ch:
if m.isChance(m.FailChance) {
// log.Debugf("event '%s' was lucky!", e.GetMessageID())
intermediateCh <- e
continue
} else {
// log.Debugf("event '%s' was unlucky and will be dropped, event drop probability is '%f'", e.GetMessageID(), m.FailChance)
e.Fail(fmt.Errorf("chaos happened to this event"))
}
case _ = <-ctx.Done():
break
}
}
}()

wg.Wait()

// nCh := make(chan events.Event)

// go func() {
// for {
// select {
// case e := <-ch:
// if m.isChance(m.FailChance) {
// e.Fail(fmt.Errorf("event was unlucky"))
// continue
// }

// // if its lucky then continue...
// nCh <- e
// case _ = <-ctx.Done():
// return
// }
// }
// }()

// return d.Do(ctx, nCh)
return nil
}
1 change: 1 addition & 0 deletions pkg/meta/meta.go
Expand Up @@ -11,4 +11,5 @@ type M interface {
S([]source.S) (source.S, error) // S, source intermediary.
R([]relay.R) (relay.R, error) // R, relay intermediary.
D([]distributor.D) (distributor.D, error) // D, distributor intermediary.
ID() string // ID returns a human readable identifier for this Meta.
}

0 comments on commit 354c1ff

Please sign in to comment.