Skip to content

Commit

Permalink
feat: performance measuring.
Browse files Browse the repository at this point in the history
Signed-off-by: Andrew Neudegg <andrew.neudegg@finbourne.com>
  • Loading branch information
AndrewNeudegg committed Jan 9, 2021
1 parent 60c323c commit 0055238
Show file tree
Hide file tree
Showing 16 changed files with 285 additions and 36 deletions.
3 changes: 0 additions & 3 deletions pkg/pipelines/fipfo/fipfo.go
Expand Up @@ -39,12 +39,10 @@ func (f Pipeline) buildInput(n definitions.PipelineNode) (resourceDefinitions.In
}
sub = append(sub, snR)
}

meta, err := resources.BuildMetaResource(n.ID, n.Config, resourceMapping.MetaMapping())
if err != nil {
return nil, errors.Wrapf(err, "failed to build meta resource for '%s'", n.ID)
}

return meta.I(sub)
}

Expand All @@ -66,7 +64,6 @@ func (f Pipeline) inputs(ctx context.Context) (chan events.Collection, error) {

go func(i resourceDefinitions.Input, ch chan events.Collection) {
log.Infof("launching input '%s'", i.ID())

err := i.DoInput(context.TODO(), ch)
if err != nil {
log.Error(errors.Wrap(err, "failed to do input"))
Expand Down
7 changes: 5 additions & 2 deletions pkg/pipelines/fipfo/fipfo_build_test.go
Expand Up @@ -23,7 +23,10 @@ pipeline:
input:
- id: examples/noop
config: {}
nodes: []
nodes:
- id: examples/noop
config: {}
nodes: []
process:
- id: examples/noop
config: {}
Expand Down Expand Up @@ -69,5 +72,5 @@ func TestBuildRun(t *testing.T) {
}
}()

time.Sleep(time.Second * 1)
time.Sleep(time.Second * 5)
}
29 changes: 17 additions & 12 deletions pkg/resources/builder/mapping.go
Expand Up @@ -5,40 +5,45 @@ import (
"github.com/andrewneudegg/delta/pkg/resources/examples/noop"
console1 "github.com/andrewneudegg/delta/pkg/resources/utilities/console/v1"
generators1 "github.com/andrewneudegg/delta/pkg/resources/utilities/generators/v1"
performance1 "github.com/andrewneudegg/delta/pkg/resources/utilities/performance/v1"
)

// InputMapping defines which resources are what.
func InputMapping() map[string]definitions.Input {
return map[string]definitions.Input{
noop.Input{}.ID(): noop.Input{},
generators1.Input{}.ID(): generators1.Input{},
console1.Input{}.ID(): console1.Input{},
noop.Input{}.ID(): noop.Input{},
generators1.Input{}.ID(): generators1.Input{},
console1.Input{}.ID(): console1.Input{},
performance1.Input{}.ID(): performance1.Input{},
}
}

// ProcessMapping defines which resources are what.
func ProcessMapping() map[string]definitions.Process {
return map[string]definitions.Process{
noop.Process{}.ID(): noop.Process{},
generators1.Process{}.ID(): generators1.Process{},
console1.Process{}.ID(): console1.Process{},
noop.Process{}.ID(): noop.Process{},
generators1.Process{}.ID(): generators1.Process{},
console1.Process{}.ID(): console1.Process{},
performance1.Process{}.ID(): performance1.Process{},
}
}

// OutputMapping defines which resources are what.
func OutputMapping() map[string]definitions.Output {
return map[string]definitions.Output{
noop.Output{}.ID(): noop.Output{},
generators1.Output{}.ID(): generators1.Output{},
console1.Output{}.ID(): console1.Output{},
noop.Output{}.ID(): noop.Output{},
generators1.Output{}.ID(): generators1.Output{},
console1.Output{}.ID(): console1.Output{},
performance1.Output{}.ID(): performance1.Output{},
}
}

// MetaMapping defines which resources are what.
func MetaMapping() map[string]definitions.Meta {
return map[string]definitions.Meta{
noop.Meta{}.ID(): noop.Meta{},
generators1.Meta{}.ID(): generators1.Meta{},
console1.Meta{}.ID(): console1.Meta{},
noop.Meta{}.ID(): noop.Meta{},
generators1.Meta{}.ID(): generators1.Meta{},
console1.Meta{}.ID(): console1.Meta{},
performance1.Meta{}.ID(): performance1.Meta{},
}
}
6 changes: 6 additions & 0 deletions pkg/resources/examples/noop/common.go
@@ -0,0 +1,6 @@
package noop

const (
// ID for this collection of resources.
ID = "examples/noop"
)
2 changes: 1 addition & 1 deletion pkg/resources/examples/noop/input.go
Expand Up @@ -13,7 +13,7 @@ type Input struct {

// ID defines what this thing is.
func (i Input) ID() string {
return "examples/noop"
return ID
}

// Type defines what type of resource this is.
Expand Down
2 changes: 1 addition & 1 deletion pkg/resources/examples/noop/meta.go
Expand Up @@ -10,7 +10,7 @@ type Meta struct {

// ID defines what this thing is.
func (m Meta) ID() string {
return "examples/noop"
return ID
}

// Type defines what type of resource this is.
Expand Down
2 changes: 1 addition & 1 deletion pkg/resources/examples/noop/output.go
Expand Up @@ -13,7 +13,7 @@ type Output struct {

// ID defines what this thing is.
func (o Output) ID() string {
return "examples/noop"
return ID
}

// Type defines what type of resource this is.
Expand Down
2 changes: 1 addition & 1 deletion pkg/resources/examples/noop/process.go
Expand Up @@ -13,7 +13,7 @@ type Process struct {

// ID defines what this thing is.
func (p Process) ID() string {
return "examples/noop"
return ID
}

// Type defines what type of resource this is.
Expand Down
2 changes: 1 addition & 1 deletion pkg/resources/utilities/console/v1/output.go
Expand Up @@ -29,7 +29,7 @@ func (o Output) DoOutput(ctx context.Context, ch <-chan events.Collection) error
for {
select {
case eCol := <-ch:
log.Infof("received '%d' events", len(eCol))
log.Debugf("received '%d' events", len(eCol))
case <-ctx.Done():
return nil
}
Expand Down
22 changes: 8 additions & 14 deletions pkg/resources/utilities/generators/v1/input.go
Expand Up @@ -9,7 +9,7 @@ import (
log "github.com/sirupsen/logrus"
)

// Input is simple noop.
// Input is simple event generator.
type Input struct {
Interval string `mapstructure:"interval"` // Interval specifies the delays between even
NumberEvents int `mapstructure:"numberEvents"` // NumberEvents is how many events will be in each `events.Collection`.
Expand All @@ -28,17 +28,11 @@ func (i Input) Type() definitions.ResourceType {

// DoInput will accept collections of events, passing them into the channel.
func (i Input) DoInput(ctx context.Context, ch chan<- events.Collection) error {
go func() {
err := RunGenerator(ctx, Configuration{
Interval: i.Interval,
NumberEvents: i.NumberEvents,
NumberCollections: i.NumberCollections,
}, ch)
if err != nil {
log.Error(err)
}
}()

<-ctx.Done()
return nil
log.Infof("starting DoInput for '%s'", i.ID())

return RunGenerator(ctx, Configuration{
Interval: i.Interval,
NumberEvents: i.NumberEvents,
NumberCollections: i.NumberCollections,
}, ch)
}
6 changes: 6 additions & 0 deletions pkg/resources/utilities/performance/v1/common.go
@@ -0,0 +1,6 @@
package performance1

const (
// ID for this collection of resources.
ID = "utilities/performance/v1"
)
65 changes: 65 additions & 0 deletions pkg/resources/utilities/performance/v1/input.go
@@ -0,0 +1,65 @@
package performance1

import (
"context"
"time"

"github.com/andrewneudegg/delta/pkg/events"
"github.com/andrewneudegg/delta/pkg/resources/definitions"
"github.com/pkg/errors"

log "github.com/sirupsen/logrus"
)

// Input is simple wrapper around another input resource.
type Input struct {
i definitions.Input
sampleWindow time.Duration
}

// ID defines what this thing is.
func (i Input) ID() string {
return ID
}

// Type defines what type of resource this is.
func (i Input) Type() definitions.ResourceType {
return definitions.InputType
}

// DoInput will accept collections of events, passing them into the channel.
func (i Input) DoInput(ctx context.Context, ch chan<- events.Collection) error {
log.Infof("starting DoInput proxy for '%s'", i.i.ID())
if i.i == nil {
return errors.Errorf("'%s' cannot be used as an input resource directly", ID)
}

proxyCh := make(chan events.Collection)
count := 0
lastTime := time.Now()

go func(chIn chan events.Collection, chOut chan<- events.Collection) {
for {
select {
case eCol := <-chIn:
count++
chOut <- eCol
}
}
}(proxyCh, ch)

go func() {
for {
time.Sleep(i.sampleWindow)

tDiff := time.Now().Sub(lastTime)
metricFrame := float64(count) / tDiff.Seconds()
log.Warnf("'%s' at '%f' tx/s (%d transactions / %f seconds)", i.i.ID(), metricFrame, count, tDiff.Seconds())

lastTime = time.Now()
count = 0
}
}()

return i.i.DoInput(ctx, proxyCh)
}
84 changes: 84 additions & 0 deletions pkg/resources/utilities/performance/v1/meta.go
@@ -0,0 +1,84 @@
package performance1

import (
"fmt"
"time"

"github.com/andrewneudegg/delta/pkg/resources/definitions"
"github.com/pkg/errors"

log "github.com/sirupsen/logrus"
)

// Meta is simple noop.
type Meta struct {
SampleWindow string `mapstructure:"sampleWindow"` // SampleWindow is the time that metrics are collected before reporting (i.e. 60s)
}

// ID defines what this thing is.
func (m Meta) ID() string {
return ID
}

// Type defines what type of resource this is.
func (m Meta) Type() definitions.ResourceType {
return definitions.MetaType
}

// I accepts inputs returning an input representing the given collection.
func (m Meta) I(i []definitions.Input) (definitions.Input, error) {
if len(i) != 1 {
return nil, fmt.Errorf("'%s' can only accept one input, got '%d'", ID, len(i))
}

dur, err := time.ParseDuration(m.SampleWindow)
if err != nil {
return nil, errors.Wrapf(err, "could not parse '%s' as duration", m.SampleWindow)
}

log.Debugf("returning meta input for '%s'", i[0].ID())

return Input{
i: i[0],
sampleWindow: dur,
}, nil
}

// P accepts process returning an process representing the given collection.
func (m Meta) P(p []definitions.Process) (definitions.Process, error) {
if len(p) != 1 {
return nil, fmt.Errorf("'%s' can only accept one process, got '%d'", ID, len(p))
}

dur, err := time.ParseDuration(m.SampleWindow)
if err != nil {
return nil, errors.Wrapf(err, "could not parse '%s' as duration", m.SampleWindow)
}

return Process{
p: p[0],
sampleWindow: dur,
}, nil
}

// O accepts outputs returning an output representing the given collection.
func (m Meta) O(o []definitions.Output) (definitions.Output, error) {
if len(o) != 1 {
return nil, fmt.Errorf("'%s' can only accept one output, got '%d'", ID, len(o))
}

dur, err := time.ParseDuration(m.SampleWindow)
if err != nil {
return nil, errors.Wrapf(err, "could not parse '%s' as duration", m.SampleWindow)
}

return Output{
o: o[0],
sampleWindow: dur,
}, nil
}

// M accepts meta resources returning a meta input for the given collection.
func (m Meta) M(me []definitions.Meta) (definitions.Meta, error) {
return Meta{}, fmt.Errorf("'%s' is not designed to return a meta resource", ID)
}
36 changes: 36 additions & 0 deletions pkg/resources/utilities/performance/v1/output.go
@@ -0,0 +1,36 @@
package performance1

import (
"context"
"time"

"github.com/andrewneudegg/delta/pkg/events"
"github.com/andrewneudegg/delta/pkg/resources/definitions"
"github.com/pkg/errors"
)

// Output is simple noop.
type Output struct {
o definitions.Output
sampleWindow time.Duration
}

// ID defines what this thing is.
func (o Output) ID() string {
return ID
}

// Type defines what type of resource this is.
func (o Output) Type() definitions.ResourceType {
return definitions.OutputType
}

// DoOutput will perform its function on each collection placed into the channel.
func (o Output) DoOutput(ctx context.Context, ch <-chan events.Collection) error {
if o.o == nil {
return errors.Errorf("'%s' cannot be used as an output resource directly", ID)
}

<-ctx.Done()
return nil
}

0 comments on commit 0055238

Please sign in to comment.