-
Notifications
You must be signed in to change notification settings - Fork 4
/
builder_options.go
102 lines (80 loc) · 2.66 KB
/
builder_options.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
package pipeline
import (
"time"
"github.com/arquivei/foundationkit/errors"
"github.com/arquivei/goduck"
"github.com/go-kit/kit/endpoint"
"github.com/rs/zerolog/log"
)
// pipelineBuilderOptions is the configuration of a pipeline.
//
// It groups the business endpoint, the message input (streams or a message
// pool), the decoder that turns messages into endpoint requests, the sink
// that receives endpoint responses, and the optional DLQ settings.
// checkPipelineBuilderOptions validates the combination of these fields.
type pipelineBuilderOptions struct {
// endpoint executes the main business logic. It is required.
endpoint endpoint.Endpoint
// engineType determines the type of engine to be used.
engineType string
// GoDuck configuration:
//
// inputStreams are the message sources.
inputStreams []goduck.Stream
// messagePool, if set, makes the pipeline use a job pool engine to fetch the messages.
messagePool goduck.MessagePool
// nPoolWorkers is the number of workers consuming from the job pool.
nPoolWorkers int
// Decoders convert the messages from the input stream to an endpoint request.
// Exactly one of the following decoders must be set. This affects which kind of engine will be used.
//
// batchDecoder decodes the stream in batches. Implies batchstreamengine.
batchDecoder goduck.EndpointBatchDecoder
// batchSize is the size of the batch when using batchstreamengine.
// Defaults to 1. Values below 1 are rejected when batchDecoder is set.
batchSize int
// decoder decodes the stream one message at a time. Implies streamengine.
decoder goduck.EndpointDecoder
// maxTimeout is the timeout when fetching messages from the stream.
maxTimeout time.Duration
// sink configuration:
//
// sink handles the endpoint responses. Could be publishing to a topic or saving into a database.
// It is required.
sink Sink
// sinkEncoder encodes an endpoint response into a SinkMessage. It is required.
sinkEncoder SinkEncoder
// dlq handles errors that have been retried and failed.
// The DLQ is optional: when brokers is nil, validation only logs a warning
// that all messages will be retried indefinitely.
dlq struct {
// brokers lists the DLQ brokers.
// NOTE(review): presumably broker addresses of a message broker such as Kafka - confirm.
brokers []string
// topic is the name of the DLQ topic.
topic string
// username and password are the credentials for the DLQ brokers.
// NOTE(review): the authentication mechanism is not visible here - confirm.
username string
password string
}
// middlewares are go-kit endpoint middlewares.
// NOTE(review): presumably wrapped around endpoint when the pipeline is built - confirm against the builder.
middlewares []endpoint.Middleware
}
// checkPipelineBuilderOptions validates a pipeline configuration.
//
// It returns the first violation found, wrapped with the operation name:
//   - ErrEndpointNil: no endpoint is set;
//   - ErrEmptyInputStreamOrMessagePool: neither a message pool nor any input
//     stream is set;
//   - ErrBothInputSet: a message pool and input streams are both set;
//   - ErrNilDecoders: neither decoder nor batchDecoder is set;
//   - ErrBothDecodersSet: decoder and batchDecoder are both set;
//   - ErrBatchSizeInvalid: batchDecoder is set with a batchSize below 1;
//   - ErrSinkNil / ErrSinkEncoderNil: the sink or its encoder is missing.
//
// A missing DLQ is not an error: a warning is logged and nil is returned.
func checkPipelineBuilderOptions(c pipelineBuilderOptions) error {
	const op = errors.Op("checkPipelineBuilderOptions")

	if c.endpoint == nil {
		return errors.E(op, ErrEndpointNil)
	}

	// Exactly one message source must be configured: either a message pool
	// or at least one input stream.
	if c.messagePool == nil && len(c.inputStreams) == 0 {
		return errors.E(op, ErrEmptyInputStreamOrMessagePool)
	}
	// Any input stream alongside a message pool means both inputs are set.
	// (The previous "> 1" check let a pool plus a single stream through.)
	if c.messagePool != nil && len(c.inputStreams) > 0 {
		return errors.E(op, ErrBothInputSet)
	}

	// Exactly one decoder must be configured.
	if c.decoder == nil && c.batchDecoder == nil {
		return errors.E(op, ErrNilDecoders)
	}
	if c.decoder != nil && c.batchDecoder != nil {
		return errors.E(op, ErrBothDecodersSet)
	}
	if c.batchDecoder != nil && c.batchSize < 1 {
		return errors.E(op, ErrBatchSizeInvalid)
	}

	if c.sink == nil {
		return errors.E(op, ErrSinkNil)
	}
	if c.sinkEncoder == nil {
		return errors.E(op, ErrSinkEncoderNil)
	}

	// Test emptiness with len so that an empty, non-nil broker list is also
	// reported as "no DLQ".
	if len(c.dlq.brokers) == 0 {
		log.Warn().Msg("[goduck][pipeline] No DLQ Topic is set, all messages will be retried indefinitely.")
	}

	return nil
}