Skip to content

Commit

Permalink
feat: groundwork for comleting events and routing hierarchy.
Browse files Browse the repository at this point in the history
Signed-off-by: Andrew Neudegg <andrew.neudegg@finbourne.com>
  • Loading branch information
AndrewNeudegg committed Jan 1, 2021
1 parent c87595a commit 271f3f1
Show file tree
Hide file tree
Showing 13 changed files with 237 additions and 5 deletions.
2 changes: 1 addition & 1 deletion go.mod
Expand Up @@ -3,8 +3,8 @@ module github.com/andrewneudegg/delta
go 1.15

require (
github.com/mitchellh/mapstructure v1.4.0
github.com/google/uuid v1.1.3
github.com/mitchellh/mapstructure v1.4.0
github.com/pkg/errors v0.9.1
github.com/prometheus/client_golang v1.9.0
github.com/prometheus/common v0.15.0
Expand Down
5 changes: 3 additions & 2 deletions pkg/configuration/structs.go
Expand Up @@ -21,6 +21,7 @@ type AppSettings struct{}

// NodeConfig defines the configuration for any given pluggable structure.
type NodeConfig struct {
Name string `mapstructure:"name"`
Config map[string]interface{} `mapstructure:"sourceConfig"`
Name string `yaml:"name"`
Config map[string]interface{} `yaml:"config"`
NodeConfigs []*NodeConfig `yaml:"nodeConfigs"`
}
9 changes: 8 additions & 1 deletion pkg/distributor/http/http_simple.go
Expand Up @@ -9,6 +9,7 @@ import (

"github.com/andrewneudegg/delta/pkg/distributor"
"github.com/andrewneudegg/delta/pkg/events"
"github.com/pkg/errors"
)

// DirectDistributor will pelt events at a single predefined address.
Expand All @@ -28,6 +29,7 @@ func (d DirectDistributor) Do(ctx context.Context, ch <-chan events.Event) error
bytes.NewBuffer(e.GetContent()))

if err != nil {
e.Fail(errors.Wrap(err, "http request build failed"))
return err
}

Expand All @@ -38,7 +40,12 @@ func (d DirectDistributor) Do(ctx context.Context, ch <-chan events.Event) error
Timeout: time.Second * 15,
}
_, err = client.Do(req)
return err
if err != nil {
e.Fail(errors.Wrap(err, "http request send failed"))
}

e.Complete()
return nil
}

// backoffRetry will help when things get bumpy...
Expand Down
1 change: 1 addition & 0 deletions pkg/distributor/stdout/distributor.go
Expand Up @@ -25,6 +25,7 @@ func (d Distributor) Do(ctx context.Context, outbound <-chan events.Event) error
log.Error(err)
}
log.Info(string(jsonBytes))
e.Complete()
case _ = <-ctx.Done():
break
}
Expand Down
1 change: 1 addition & 0 deletions pkg/e2e/e2e_test.go
Expand Up @@ -61,6 +61,7 @@ distributorConfigurations:
}
configContainer, err := c.Load()
_, err = pipeline.BuildPipeline(configContainer)
time.Sleep(time.Second)
assert.Nil(t, err)

// ------
Expand Down
20 changes: 20 additions & 0 deletions pkg/events/event.go
Expand Up @@ -16,6 +16,9 @@ type Event interface {
GetURI() string // GetURI returns a HTTP like route.
GetContent() []byte // GetContent returns the []byte content of the message.

Fail(error)
Complete()

// SetMessageID(string) // SetMessageID will set the message id, mostly used for testing.
// SetHeaders(map[string][]string) // SetHeaders will set the headers, mostly used for testing.
// SetURI(string) // SetURI will set the URI, mostly used for testing.
Expand All @@ -28,6 +31,9 @@ type EventMsg struct {
Headers map[string][]string `json:"headers"`
URI string `json:"uri"`
Content []byte `json:"content"`

FailFunc *func(error)
CompleteFunc *func()
}

// ToJSON will convert an EventMsg to JSON.
Expand Down Expand Up @@ -61,6 +67,20 @@ func (e EventMsg) GetContent() []byte {
return e.Content
}

// Complete indicates that the given event has successfully been compelted.
func (e EventMsg) Complete() {
if e.CompleteFunc != nil {
(*e.CompleteFunc)()
}
}

// Fail indicates that the given event has failed.
func (e EventMsg) Fail(err error) {
if e.FailFunc != nil {
(*e.FailFunc)(err)
}
}

// FromJSON will load an EventMsg from JSON string.
func FromJSON(payload string) (EventMsg, error) {
return FromJSONb([]byte(payload))
Expand Down
3 changes: 3 additions & 0 deletions pkg/meta/README.md
@@ -0,0 +1,3 @@
# Meta

Meta is for when you need to modify the behaviour of a source, relay or distributor.
29 changes: 29 additions & 0 deletions pkg/meta/builder/builder.go
@@ -0,0 +1,29 @@
package builder

import (
"github.com/andrewneudegg/delta/pkg/meta"
"github.com/andrewneudegg/delta/pkg/meta/chaos"
"github.com/andrewneudegg/delta/pkg/meta/example"

"github.com/mitchellh/mapstructure"
)

// Get will return the given source with its data values initialised.
func Get(distributorName string, metaConfiguration interface{}) (meta.M, error) {
switch distributorName {
case "meta/example":
m := example.Example{}
err := mapstructure.Decode(metaConfiguration, &m)
return m, err
case "meta/example2":
m := example.Example{}
err := mapstructure.Decode(metaConfiguration, &m)
return m, err
case "meta/chaos/simple":
m := chaos.ChaosSimple{}
err := mapstructure.Decode(metaConfiguration, &m)
return m, err
}

return nil, nil
}
95 changes: 95 additions & 0 deletions pkg/meta/chaos/meta.go
@@ -0,0 +1,95 @@
package chaos

import (
"context"
"fmt"
"math/rand"

"github.com/andrewneudegg/delta/pkg/distributor"
"github.com/andrewneudegg/delta/pkg/events"
"github.com/andrewneudegg/delta/pkg/relay"
"github.com/andrewneudegg/delta/pkg/source"
)

// ChaosSimple intercepts and randomly fails events.
type ChaosSimple struct {
FailChance float32 // 0.5
}

func (m ChaosSimple) isChance(f float32) bool {
// rand.Float64() == 0.1, 0.5, 0.8
// if f == 0.1 (10% chance) then rand has to be above 0.9.
// if f == 0.90 (90% chance) then rand has to be above 0.1.
return rand.Float32() > (1 - f)
}

// DoS will do S with some modification.
func (m ChaosSimple) DoS(ctx context.Context, ch chan events.Event, s source.S) error {
nCh := make(chan events.Event)
go func() {
for {
select {
case e := <-ch:
if m.isChance(m.FailChance) {
e.Fail(fmt.Errorf("event was unlucky"))
continue
}

// if its lucky then continue...
nCh <- e
case _ = <-ctx.Done():
return
}
}
}()

return s.Do(ctx, nCh)
}

// DoR will do R with some modification.
func (m ChaosSimple) DoR(ctx context.Context, chOut chan events.Event, chIn chan events.Event, r relay.R) error {
nCh := make(chan events.Event)

go func() {
for {
select {
case e := <-chOut:
if m.isChance(m.FailChance) {
e.Fail(fmt.Errorf("event was unlucky"))
continue
}

// if its lucky then continue...
nCh <- e
case _ = <-ctx.Done():
return
}
}
}()

return r.Do(ctx, nCh, chIn)
}

// DoD will do D with some modification.
func (m ChaosSimple) DoD(ctx context.Context, ch chan events.Event, d distributor.D) error {
nCh := make(chan events.Event)

go func() {
for {
select {
case e := <-ch:
if m.isChance(m.FailChance) {
e.Fail(fmt.Errorf("event was unlucky"))
continue
}

// if its lucky then continue...
nCh <- e
case _ = <-ctx.Done():
return
}
}
}()

return d.Do(ctx, nCh)
}
31 changes: 31 additions & 0 deletions pkg/meta/example/meta.go
@@ -0,0 +1,31 @@
package example

import (
"context"

"github.com/andrewneudegg/delta/pkg/distributor"
"github.com/andrewneudegg/delta/pkg/events"
"github.com/andrewneudegg/delta/pkg/meta"
"github.com/andrewneudegg/delta/pkg/relay"
"github.com/andrewneudegg/delta/pkg/source"
)

// Example doesn't really do anything here.
type Example struct {
meta.M
}

// DoS will do S with some modification.
func (m Example) DoS(ctx context.Context, ch chan events.Event, s source.S) error {
return s.Do(ctx, ch)
}

// DoR will do R with some modification.
func (m Example) DoR(ctx context.Context, chOut chan events.Event, chIn chan events.Event, r relay.R) error {
return r.Do(ctx, chOut, chIn)
}

// DoD will do D with some modification.
func (m Example) DoD(ctx context.Context, ch chan events.Event, d distributor.D) error {
return d.Do(ctx, ch)
}
17 changes: 17 additions & 0 deletions pkg/meta/meta.go
@@ -0,0 +1,17 @@
package meta

import (
"context"

"github.com/andrewneudegg/delta/pkg/distributor"
"github.com/andrewneudegg/delta/pkg/events"
"github.com/andrewneudegg/delta/pkg/relay"
"github.com/andrewneudegg/delta/pkg/source"
)

// M defines a block that can augment other functions.
type M interface {
DoS(context.Context, chan events.Event, source.S) error
DoR(context.Context, chan events.Event, chan events.Event, relay.R) error
DoD(context.Context, chan events.Event, distributor.D) error
}
28 changes: 28 additions & 0 deletions pkg/pipeline/pipeline_test.go
Expand Up @@ -63,3 +63,31 @@ distributorConfigurations:
_, err = BuildPipeline(configContainer)
assert.Nil(t, err)
}

// func TestMetaPipelineYaml(t *testing.T) {
// config := []byte(`
// applicationSettings: {}
// sourceConfigurations:
// - name: http/simple
// config:
// ListenAddr: :8080
// MaxBodySize: 512
// relayConfigs:
// - name: meta/chaos/simple
// config:
// - name: memory
// config: {}
// distributorConfigurations:
// - name: http/direct
// config:
// Addr: http://localhost:5080
// `)

// c := configuration.RawConfig{
// ConfigData: config,
// }
// configContainer, err := c.Load()
// assert.Nil(t, err)
// _, err = BuildPipeline(configContainer)
// assert.Nil(t, err)
// }
1 change: 0 additions & 1 deletion recipies/simple_encryption.yaml
Expand Up @@ -15,7 +15,6 @@ relayConfigs:
config: {}

# Decrypt data before distributing it.
# TODO: Changing the password should result in failure!
- name: crypto/symmetric-simple
config:
Mode: decrypt
Expand Down

0 comments on commit 271f3f1

Please sign in to comment.