-
Notifications
You must be signed in to change notification settings - Fork 215
/
options.go
141 lines (126 loc) · 4.27 KB
/
options.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
/*
Copyright 2021 The CloudEvents Authors
SPDX-License-Identifier: Apache-2.0
*/
package client
import (
"context"
"fmt"
"github.com/cloudevents/sdk-go/v2/binding"
)
// Option is the function signature required to be considered a client.Option.
// An Option receives the value being configured as an interface{} and returns
// a non-nil error to report that it could not be applied. The options defined
// in this file act only when that value is a *ceClient.
type Option func(interface{}) error
// WithEventDefaulter returns an Option that appends fn to the end of the
// client's event defaulter chain.
//
// When the option is applied to a *ceClient and fn is nil, it returns an error
// and leaves the chain unchanged. Targets that are not a *ceClient are ignored
// and nil is returned.
func WithEventDefaulter(fn EventDefaulter) Option {
	return func(target interface{}) error {
		client, isClient := target.(*ceClient)
		if !isClient {
			return nil
		}
		if fn == nil {
			return fmt.Errorf("client option was given an nil event defaulter")
		}
		client.eventDefaulterFns = append(client.eventDefaulterFns, fn)
		return nil
	}
}
// WithForceBinary returns an Option that appends binding.WithForceBinary to
// the client's outbound context decorators. Targets that are not a *ceClient
// are ignored. The option never returns an error.
func WithForceBinary() Option {
	return func(target interface{}) error {
		client, isClient := target.(*ceClient)
		if !isClient {
			return nil
		}
		client.outboundContextDecorators = append(client.outboundContextDecorators, binding.WithForceBinary)
		return nil
	}
}
// WithForceStructured returns an Option that appends
// binding.WithForceStructured to the client's outbound context decorators.
// Targets that are not a *ceClient are ignored. The option never returns an
// error.
func WithForceStructured() Option {
	return func(target interface{}) error {
		client, isClient := target.(*ceClient)
		if !isClient {
			return nil
		}
		client.outboundContextDecorators = append(client.outboundContextDecorators, binding.WithForceStructured)
		return nil
	}
}
// WithUUIDs returns an Option that appends the DefaultIDToUUIDIfNotSet event
// defaulter to the end of the client's defaulter chain. Targets that are not a
// *ceClient are ignored. The option never returns an error.
func WithUUIDs() Option {
	return func(target interface{}) error {
		client, isClient := target.(*ceClient)
		if !isClient {
			return nil
		}
		client.eventDefaulterFns = append(client.eventDefaulterFns, DefaultIDToUUIDIfNotSet)
		return nil
	}
}
// WithTimeNow returns an Option that appends the DefaultTimeToNowIfNotSet
// event defaulter to the end of the client's defaulter chain. Targets that are
// not a *ceClient are ignored. The option never returns an error.
func WithTimeNow() Option {
	return func(target interface{}) error {
		client, isClient := target.(*ceClient)
		if !isClient {
			return nil
		}
		client.eventDefaulterFns = append(client.eventDefaulterFns, DefaultTimeToNowIfNotSet)
		return nil
	}
}
// WithTracePropagation formerly enabled trace propagation via the distributed
// tracing extension. The returned Option ignores its argument and always
// returns nil.
//
// Deprecated: this is now noop and will be removed in future releases.
// Don't use distributed tracing extension to propagate traces:
// https://github.com/cloudevents/spec/blob/v1.0.1/extensions/distributed-tracing.md#using-the-distributed-tracing-extension
func WithTracePropagation() Option {
	noop := func(interface{}) error { return nil }
	return noop
}
// WithPollGoroutines returns an Option that configures how many goroutines
// should be used to poll the Receiver/Responder/Protocol implementations.
// Default value is GOMAXPROCS.
//
// The value is stored on the client exactly as given; this option performs no
// range check. Targets that are not a *ceClient are ignored.
func WithPollGoroutines(pollGoroutines int) Option {
	return func(target interface{}) error {
		client, isClient := target.(*ceClient)
		if !isClient {
			return nil
		}
		client.pollGoroutines = pollGoroutines
		return nil
	}
}
// WithObservabilityService returns an Option that configures the
// observability service used to record traces and metrics.
//
// When applied to a *ceClient, the option stores service on the client and
// appends every decorator returned by service.InboundContextDecorators() to
// the client's inbound context decorators.
//
// A nil service is rejected with an error instead of being dereferenced:
// calling InboundContextDecorators on a nil service would otherwise panic
// while the option is being applied. Targets that are not a *ceClient are
// ignored and nil is returned.
func WithObservabilityService(service ObservabilityService) Option {
	return func(i interface{}) error {
		c, ok := i.(*ceClient)
		if !ok {
			return nil
		}
		// Fail with an error, like WithEventDefaulter does for a nil
		// defaulter, rather than panicking on the method call below.
		if service == nil {
			return fmt.Errorf("client option was given a nil observability service")
		}
		c.observabilityService = service
		c.inboundContextDecorators = append(c.inboundContextDecorators, service.InboundContextDecorators()...)
		return nil
	}
}
// WithInboundContextDecorator returns an Option that registers a new inbound
// context decorator. Inbound context decorators are invoked to extract
// additional information from the binding.Message and propagate it in the
// context passed to the event receiver.
//
// When applied to a *ceClient, dec is appended to the end of the client's
// inbound context decorators. A nil dec is rejected with an error: storing it
// would defer the failure to a nil function call when the decorators are
// invoked. Targets that are not a *ceClient are ignored and nil is returned.
func WithInboundContextDecorator(dec func(context.Context, binding.Message) context.Context) Option {
	return func(i interface{}) error {
		c, ok := i.(*ceClient)
		if !ok {
			return nil
		}
		// Reject nil up front, like WithEventDefaulter does for a nil
		// defaulter, so the problem surfaces when the client is configured.
		if dec == nil {
			return fmt.Errorf("client option was given a nil inbound context decorator")
		}
		c.inboundContextDecorators = append(c.inboundContextDecorators, dec)
		return nil
	}
}
// WithBlockingCallback returns an Option that makes the callback passed into
// StartReceiver execute as a blocking call, i.e. in each poll goroutine the
// next event will not be received until the callback on the current event
// completes. To make event processing serialized (no concurrency), use this
// option along with WithPollGoroutines(1).
//
// Targets that are not a *ceClient are ignored. The option never returns an
// error.
func WithBlockingCallback() Option {
	return func(target interface{}) error {
		client, isClient := target.(*ceClient)
		if !isClient {
			return nil
		}
		client.blockingCallback = true
		return nil
	}
}
// WithAckMalformedEvent returns an Option that causes malformed events
// received within StartReceiver to be acknowledged rather than being
// permanently not-acknowledged. This can be useful when a protocol does not
// provide a responder implementation and would otherwise cause the receiver to
// be partially or fully stuck.
//
// Targets that are not a *ceClient are ignored. The option never returns an
// error.
func WithAckMalformedEvent() Option {
	return func(target interface{}) error {
		client, isClient := target.(*ceClient)
		if !isClient {
			return nil
		}
		client.ackMalformedEvent = true
		return nil
	}
}