
deliveryhero/pipeline


pipeline


Pipeline is a Go library that helps you build pipelines without worrying about channel management and concurrency. It contains common fan-in and fan-out operations, as well as utility funcs for batch processing and scaling.

If you have another common use case you would like to see covered by this package, please open a feature request.

Cookbook

Functions

func Apply

func Apply[A, B, C any](a Processor[A, []B], b Processor[B, C]) Processor[A, []C]

Apply connects two processors, applying the second one to each item of the first one's output.

transform := pipeline.NewProcessor(func(_ context.Context, s string) ([]string, error) {
    return strings.Split(s, ","), nil
}, nil)

double := pipeline.NewProcessor(func(_ context.Context, s string) (string, error) {
    return s + s, nil
}, nil)

addLeadingZero := pipeline.NewProcessor(func(_ context.Context, s string) (string, error) {
    return "0" + s, nil
}, nil)

apply := pipeline.Apply(
    transform,
    pipeline.Sequence(
        double,
        addLeadingZero,
        double,
    ),
)

input := "1,2,3,4,5"

for out := range pipeline.Process(context.Background(), apply, pipeline.Emit(input)) {
    for j := range out {
        fmt.Printf("process: %s\n", out[j])
    }
}

Output:

process: 011011
process: 022022
process: 033033
process: 044044
process: 055055

func Buffer

func Buffer[Item any](size int, in <-chan Item) <-chan Item

Buffer creates a buffered channel that closes after the input channel is closed and the buffer has been fully drained.
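The following minimal sketch makes the closing behavior concrete using only the standard library. The lowercase `buffer` function is hypothetical and is not the library's implementation. It relays items into a channel created with capacity `size` and closes that channel only after `in` is closed.

```go
package main

import "fmt"

// buffer is a hypothetical, stdlib-only sketch of the behavior described for
// Buffer; it is not the library's implementation.
func buffer[Item any](size int, in <-chan Item) <-chan Item {
	out := make(chan Item, size)
	go func() {
		// out is closed only after in is closed; items already sitting in
		// out's buffer can still be received until it is fully drained.
		defer close(out)
		for item := range in {
			out <- item
		}
	}()
	return out
}

func main() {
	in := make(chan int)
	go func() {
		defer close(in)
		for i := 1; i <= 3; i++ {
			in <- i
		}
	}()
	for v := range buffer[int](2, in) {
		fmt.Println(v)
	}
}
```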

func Cancel

func Cancel[Item any](ctx context.Context, cancel func(Item, error), in <-chan Item) <-chan Item

Cancel passes each Item from in directly to the out channel until the context is canceled. After the context is canceled, every remaining Item from in is passed to the cancel func instead, together with ctx.Err().

// Create a context that lasts for 1 second
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
defer cancel()

// Create a basic pipeline that emits one int every 250ms
p := pipeline.Delay(ctx, time.Second/4,
    pipeline.Emit(1, 2, 3, 4, 5),
)

// If the context is canceled, pass the ints to the cancel func for teardown
p = pipeline.Cancel(ctx, func(i int, err error) {
    fmt.Printf("%+v could not be processed, %s\n", i, err)
}, p)

// Otherwise, process the inputs
for out := range p {
    fmt.Printf("process: %+v\n", out)
}

Output:

process: 1
process: 2
process: 3
process: 4
5 could not be processed, context deadline exceeded

func Collect

func Collect[Item any](ctx context.Context, maxSize int, maxDuration time.Duration, in <-chan Item) <-chan []Item

Collect reads Items from its in channel and sends them as []Item batches on its out channel. It collects up to maxSize inputs from in over at most maxDuration before sending them as a single []Item. If maxSize is reached before maxDuration elapses, a batch of exactly maxSize items is sent. If maxDuration elapses first, a smaller batch (fewer than maxSize items) is sent. When the context is canceled, everything in the buffer is flushed to the out channel.

func Delay

func Delay[Item any](ctx context.Context, duration time.Duration, in <-chan Item) <-chan Item

Delay delays reading each input by duration. If the context is canceled, the delay will not be applied.

func Drain

func Drain[Item any](in <-chan Item)

Drain empties the input and blocks until the channel is closed.

func Emit

func Emit[Item any](is ...Item) <-chan Item

Emit fans is ...Item out to a <-chan Item, which is closed once every item has been sent.
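As a point of reference, an Emit-style source can be sketched in a few lines of standard Go. The lowercase `emit` is hypothetical, not the library's function. It sends each argument in order from a goroutine and then closes the channel, so a `range` loop over the result terminates.

```go
package main

import "fmt"

// emit is a hypothetical, stdlib-only sketch of Emit: it sends each argument
// on an unbuffered channel from a goroutine and closes the channel afterwards.
func emit[Item any](is ...Item) <-chan Item {
	out := make(chan Item)
	go func() {
		defer close(out)
		for _, i := range is {
			out <- i
		}
	}()
	return out
}

func main() {
	for v := range emit("a", "b", "c") {
		fmt.Println(v)
	}
}
```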

func Emitter

func Emitter[Item any](ctx context.Context, next func() Item) <-chan Item

Emitter continuously emits new items generated by the next func until the context is canceled.

func Merge

func Merge[Item any](ins ...<-chan Item) <-chan Item

Merge fans multiple channels into a single channel. Items from different inputs may be interleaved in any order.

one := pipeline.Emit(1)
two := pipeline.Emit(2, 2)
three := pipeline.Emit(3, 3, 3)

for i := range pipeline.Merge(one, two, three) {
    fmt.Printf("output: %d\n", i)
}

fmt.Println("done")

Output:

output: 1
output: 3
output: 2
output: 2
output: 3
output: 3
done
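Merge is a classic fan-in. A minimal sketch shows why the output order interleaves arbitrarily. It uses one forwarding goroutine per input and a `sync.WaitGroup` to close the output once all inputs are done. The lowercase `merge` is hypothetical and not the library's implementation.

```go
package main

import (
	"fmt"
	"sync"
)

// merge is a hypothetical, stdlib-only sketch of a fan-in like Merge: one
// goroutine per input forwards items to out, and out is closed once every
// input has been closed and drained.
func merge[Item any](ins ...<-chan Item) <-chan Item {
	out := make(chan Item)
	var wg sync.WaitGroup
	wg.Add(len(ins))
	for _, in := range ins {
		go func(in <-chan Item) {
			defer wg.Done()
			for item := range in {
				out <- item
			}
		}(in)
	}
	go func() {
		wg.Wait()
		close(out)
	}()
	return out
}

func main() {
	a, b := make(chan int, 1), make(chan int, 2)
	a <- 1
	b <- 2
	b <- 2
	close(a)
	close(b)
	for v := range merge[int](a, b) {
		fmt.Println(v) // order may vary between runs
	}
}
```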

func Process

func Process[Input, Output any](ctx context.Context, processor Processor[Input, Output], in <-chan Input) <-chan Output

Process takes each input from the in <-chan Input and calls Processor.Process on it. When Processor.Process returns an Output, it will be sent to the output <-chan Output. If Processor.Process returns an error, Processor.Cancel will be called with the corresponding input and error message. Finally, if the Context is canceled, all inputs remaining in the in <-chan Input will go directly to Processor.Cancel.

// Create a context that times out after 5 seconds
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()

// Create a pipeline that emits 1-6 at a rate of one int per second
p := pipeline.Delay(ctx, time.Second, pipeline.Emit(1, 2, 3, 4, 5, 6))

// Multiply each number by 10
p = pipeline.Process(ctx, pipeline.NewProcessor(func(ctx context.Context, in int) (int, error) {
    return in * 10, nil
}, func(i int, err error) {
    fmt.Printf("error: could not multiply %v, %s\n", i, err)
}), p)

// Finally, let's print the results and see what happened
for result := range p {
    fmt.Printf("result: %d\n", result)
}

Output:

result: 10
result: 20
result: 30
result: 40
result: 50
error: could not multiply 6, context deadline exceeded

func ProcessBatch

func ProcessBatch[Input, Output any](ctx context.Context, maxSize int, maxDuration time.Duration, processor Processor[[]Input, []Output], in <-chan Input) <-chan Output

ProcessBatch collects up to maxSize elements over maxDuration and processes them together as a slice of Inputs. It passes an []Input to the Processor.Process method and expects an []Output back. Batches that fail or are canceled are passed as []Input to the Processor.Cancel method. If the receiver is backed up, ProcessBatch can hold up to 2x maxSize.

// Create a context that times out after 5 seconds
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()

// Create a pipeline that emits 1-6 at a rate of one int per second
p := pipeline.Delay(ctx, time.Second, pipeline.Emit(1, 2, 3, 4, 5, 6))

// Multiply every 2 adjacent numbers together
p = pipeline.ProcessBatch(ctx, 2, time.Minute, pipeline.NewProcessor(func(ctx context.Context, is []int) ([]int, error) {
    o := 1
    for _, i := range is {
        o *= i
    }
    return []int{o}, nil
}, func(is []int, err error) {
    fmt.Printf("error: could not multiply %v, %s\n", is, err)
}), p)

// Finally, let's print the results and see what happened
for result := range p {
    fmt.Printf("result: %d\n", result)
}

Output:

result: 2
result: 12
error: could not multiply [5 6], context deadline exceeded

func ProcessBatchConcurrently

func ProcessBatchConcurrently[Input, Output any](ctx context.Context, concurrently, maxSize int, maxDuration time.Duration, processor Processor[[]Input, []Output], in <-chan Input) <-chan Output

ProcessBatchConcurrently fans the in channel out to multiple batch Processors running concurrently, then fans the out channels of the batch Processors back into a single out channel.

// Create a context that times out after 5 seconds
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()

// Create a pipeline that emits 1-9
p := pipeline.Emit(1, 2, 3, 4, 5, 6, 7, 8, 9)

// Add a 1 second delay to each number
p = pipeline.Delay(ctx, time.Second, p)

// Group two inputs at a time
p = pipeline.ProcessBatchConcurrently(ctx, 2, 2, time.Minute, pipeline.NewProcessor(func(ctx context.Context, ins []int) ([]int, error) {
    return ins, nil
}, func(i []int, err error) {
    fmt.Printf("error: could not process %v, %s\n", i, err)
}), p)

// Finally, let's print the results and see what happened
for result := range p {
    fmt.Printf("result: %d\n", result)
}

Output:

result: 1
result: 2
result: 3
result: 5
error: could not process [7 8], context deadline exceeded
error: could not process [4 6], context deadline exceeded
error: could not process [9], context deadline exceeded

func ProcessConcurrently

func ProcessConcurrently[Input, Output any](ctx context.Context, concurrently int, p Processor[Input, Output], in <-chan Input) <-chan Output

ProcessConcurrently fans the in channel out to multiple Processors running concurrently, then fans the out channels of the Processors back into a single out channel.

// Create a context that times out after 5 seconds
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()

// Create a pipeline that emits 1-7
p := pipeline.Emit(1, 2, 3, 4, 5, 6, 7)

// Add a two second delay to each number
p = pipeline.Delay(ctx, 2*time.Second, p)

// Add two concurrent processors that pass input numbers to the output
p = pipeline.ProcessConcurrently(ctx, 2, pipeline.NewProcessor(func(ctx context.Context, in int) (int, error) {
    return in, nil
}, func(i int, err error) {
    fmt.Printf("error: could not process %v, %s\n", i, err)
}), p)

// Finally, let's print the results and see what happened
for result := range p {
    fmt.Printf("result: %d\n", result)
}

Output:

result: 2
result: 1
result: 4
result: 3
error: could not process 6, process was canceled
error: could not process 5, process was canceled
error: could not process 7, context deadline exceeded

func Split

func Split[Item any](in <-chan []Item) <-chan Item

Split takes the []Item batches produced by Collect and splits them back out into individual items.
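Split is the inverse of Collect. The following minimal stdlib sketch shows the idea. The lowercase `split` is hypothetical and not the library's implementation. It forwards every element of every batch in order and closes its output when the input closes.

```go
package main

import "fmt"

// split is a hypothetical, stdlib-only sketch of Split: it forwards every
// element of every batch, in order, and closes out when in is closed.
func split[Item any](in <-chan []Item) <-chan Item {
	out := make(chan Item)
	go func() {
		defer close(out)
		for batch := range in {
			for _, item := range batch {
				out <- item
			}
		}
	}()
	return out
}

func main() {
	batches := make(chan []int, 2)
	batches <- []int{1, 2}
	batches <- []int{3}
	close(batches)
	for v := range split[int](batches) {
		fmt.Println(v)
	}
}
```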

Examples

PipelineShutsDownOnError

The following example shows how you can shut down a pipeline gracefully when one of its steps returns an error.

// Create a context that can be canceled
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

// Create a pipeline that emits 1-10
p := pipeline.Emit(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)

// A step that shuts down the pipeline when it receives any number other than 1
p = pipeline.Process(ctx, pipeline.NewProcessor(func(ctx context.Context, i int) (int, error) {
    // Shut down the pipeline by canceling the context
    if i != 1 {
        cancel()
        return i, fmt.Errorf("%d caused the shutdown", i)
    }
    return i, nil
}, func(i int, err error) {
    // The cancel func is called when an error is returned by the process func or the context is canceled
    fmt.Printf("could not process %d: %s\n", i, err)
}), p)

// Finally, let's print the results and see what happened
for result := range p {
    fmt.Printf("result: %d\n", result)
}

fmt.Println("exiting the pipeline after all data is processed")

Output:

could not process 2: 2 caused the shutdown
result: 1
could not process 3: context canceled
could not process 4: context canceled
could not process 5: context canceled
could not process 6: context canceled
could not process 7: context canceled
could not process 8: context canceled
could not process 9: context canceled
could not process 10: context canceled
exiting the pipeline after all data is processed

PipelineShutsDownWhenContainerIsKilled

This example demonstrates a pipeline that runs until the OS or container it is running in kills it.

// Gracefully shut down the pipeline when the system is shutting down
// by canceling the context when an os.Interrupt or SIGTERM signal is received
// (os.Kill / SIGKILL cannot be caught, so it would never reach this handler)
ctx, cancel := signal.NotifyContext(context.Background(), os.Interrupt, syscall.SIGTERM)
defer cancel()

// Create a pipeline that keeps emitting numbers sequentially until the context is canceled
var count int
p := pipeline.Emitter(ctx, func() int {
    count++
    return count
})

// Filter out only even numbers
p = pipeline.Process(ctx, pipeline.NewProcessor(func(ctx context.Context, i int) (int, error) {
    if i%2 == 0 {
        return i, nil
    }
    return i, fmt.Errorf("'%d' is an odd number", i)
}, func(i int, err error) {
    fmt.Printf("error processing '%v': %s\n", i, err)
}), p)

// Wait 100 microseconds and then simulate the os.Interrupt (SIGINT) signal
go func() {
    time.Sleep(time.Millisecond / 10)
    fmt.Print("\n--- os kills the app ---\n\n")
    syscall.Kill(syscall.Getpid(), syscall.SIGINT)
}()

// Finally, let's print the results and see what happened
for result := range p {
    fmt.Printf("result: %d\n", result)
}

fmt.Println("exiting after the input channel is closed")

Output:

error processing '1': '1' is an odd number
result: 2

--- os kills the app ---

error processing '3': '3' is an odd number
error processing '4': context canceled
exiting after the input channel is closed

PipelineShutsDownWhenInputChannelIsClosed

The following example demonstrates a pipeline that naturally finishes its run when the input channel is closed

// Create a context that can be canceled
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

// Create a pipeline that emits 1-10 and then closes its output channel
p := pipeline.Emit(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)

// Multiply every number by 2
p = pipeline.Process(ctx, pipeline.NewProcessor(func(ctx context.Context, i int) (int, error) {
    return i * 2, nil
}, func(i int, err error) {
    fmt.Printf("could not multiply %d: %s\n", i, err)
}), p)

// Finally, let's print the results and see what happened
for result := range p {
    fmt.Printf("result: %d\n", result)
}

fmt.Println("exiting after the input channel is closed")

Output:

result: 2
result: 4
result: 6
result: 8
result: 10
result: 12
result: 14
result: 16
result: 18
result: 20
exiting after the input channel is closed