Skip to content

Commit

Permalink
chore: add pre callbacks (#68)
Browse files Browse the repository at this point in the history
  • Loading branch information
henomis committed May 19, 2023
1 parent 7e151ce commit 72cd999
Show file tree
Hide file tree
Showing 2 changed files with 59 additions and 26 deletions.
18 changes: 8 additions & 10 deletions examples/pipeline/callbacks/main.go
Expand Up @@ -31,26 +31,24 @@ func main() {
},
)

cbTranslate := pipeline.PipelineCallback(func(output types.M) (types.M, error) {
iterator++
return output, nil
translatePreCallback := pipeline.PipelineCallback(func(input types.M) (types.M, error) {
input["language"] = languages[iterator]
input["sentence"] = sentence
return input, nil
})

cbExpand := pipeline.PipelineCallback(func(output types.M) (types.M, error) {

expandPostCallback := pipeline.PipelineCallback(func(output types.M) (types.M, error) {
iterator++
if iterator >= len(languages) {
pipeline.SetNextTubeExit(output)
} else {
pipeline.SetNextTube(output, 0)
output["language"] = languages[iterator]
output["sentence"] = sentence
}

return output, nil
})

pipeLine := pipeline.New(translate, expand).WithCallbacks(cbTranslate, cbExpand)
pipeLine := pipeline.New(translate, expand).WithPreCallbacks(translatePreCallback, nil).WithPostCallbacks(nil, expandPostCallback)

pipeLine.Run(context.Background(), types.M{"sentence": sentence, "language": languages[iterator]})
pipeLine.Run(context.Background(), nil)

}
67 changes: 51 additions & 16 deletions pipeline/pipeline.go
Expand Up @@ -31,44 +31,73 @@ type Pipe interface {
Run(ctx context.Context, input types.M) (types.M, error)
}

type PipelineCallback func(input types.M) (types.M, error)
type PipelineCallback func(values types.M) (types.M, error)

type pipeline struct {
pipes []Pipe
callbacks []PipelineCallback
pipes map[int]Pipe
preCallbacks map[int]PipelineCallback
postCallbacks map[int]PipelineCallback
}

func New(pipes ...Pipe) *pipeline {

pipesMap := make(map[int]Pipe)
for i, pipe := range pipes {
pipesMap[i] = pipe
}

return &pipeline{
pipes: pipes,
pipes: pipesMap,
}
}

func (p *pipeline) WithCallbacks(callbacks ...PipelineCallback) pipeline {
p.callbacks = callbacks
return *p
func (p *pipeline) WithPreCallbacks(callbacks ...PipelineCallback) *pipeline {

p.preCallbacks = make(map[int]PipelineCallback)
for i, callback := range callbacks {
p.preCallbacks[i] = callback
}

return p
}

func (p *pipeline) WithPostCallbacks(callbacks ...PipelineCallback) *pipeline {

p.postCallbacks = make(map[int]PipelineCallback)
for i, callback := range callbacks {
p.postCallbacks[i] = callback
}

return p
}

// Run chains the steps of the pipeline and returns the output of the last step.
func (p pipeline) Run(ctx context.Context, input types.M) (types.M, error) {
var err error
var output types.M
currentTube := -1
currentTube := 0

if input == nil {
input = types.M{}
}

output := input

for {

if currentTube == -1 {
currentTube = 0
output = input
if p.thereIsAValidPreCallbackForTube(currentTube) {
output, err = p.preCallbacks[currentTube](output)
if err != nil {
return nil, err
}
}

output, err = p.pipes[currentTube].Run(ctx, output)
if err != nil {
return nil, err
}

if p.thereIsAValidCallbackForTube(currentTube) {
output, err = p.callbacks[currentTube](output)
if p.thereIsAValidPostCallbackForTube(currentTube) {
output, err = p.postCallbacks[currentTube](output)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -105,8 +134,14 @@ func SetNextTubeExit(output types.M) types.M {
return output
}

func (p *pipeline) thereIsAValidCallbackForTube(currentTube int) bool {
return len(p.callbacks) == len(p.pipes) && p.callbacks[currentTube] != nil
func (p *pipeline) thereIsAValidPreCallbackForTube(currentTube int) bool {
cb, ok := p.preCallbacks[currentTube]
return cb != nil && ok
}

func (p *pipeline) thereIsAValidPostCallbackForTube(currentTube int) bool {
cb, ok := p.postCallbacks[currentTube]
return cb != nil && ok
}

func (p *pipeline) getNextTube(output types.M) *int {
Expand Down

0 comments on commit 72cd999

Please sign in to comment.