This repository has been archived by the owner on Jun 19, 2022. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 75
/
options.go
109 lines (93 loc) · 2.95 KB
/
options.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
/*
Copyright 2020 Google LLC
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package handler
import (
"runtime"
"time"
"cloud.google.com/go/pubsub"
)
var (
defaultHandlerConcurrency = runtime.NumCPU()
defaultMaxConcurrencyPerEvent = 1
defaultTimeout = 10 * time.Minute
// This is the pubsub default MaxExtension.
// It would not make sense for handler timeout per event be greater
// than this value because the message would be nacked before the handler
// timeouts.
// TODO: consider allow changing this value?
maxTimeout = 10 * time.Minute
)
// Options holds all the options for create handler pool.
type Options struct {
// HandlerConcurrency is the number of goroutines
// will be spawned in each handler.
HandlerConcurrency int
// MaxConcurrencyPerEvent is the max number of goroutines
// will be spawned to handle an event.
MaxConcurrencyPerEvent int
// TimeoutPerEvent is the timeout for handling an event.
TimeoutPerEvent time.Duration
// DeliveryTimeout is the timeout for delivering an event to a consumer.
DeliveryTimeout time.Duration
// PubsubReceiveSettings is the pubsub receive settings.
PubsubReceiveSettings pubsub.ReceiveSettings
}
// NewOptions creates a Options.
func NewOptions(opts ...Option) (*Options, error) {
opt := &Options{
HandlerConcurrency: defaultHandlerConcurrency,
MaxConcurrencyPerEvent: defaultMaxConcurrencyPerEvent,
TimeoutPerEvent: defaultTimeout,
PubsubReceiveSettings: pubsub.DefaultReceiveSettings,
}
for _, o := range opts {
o(opt)
}
return opt, nil
}
// Option is for providing individual option.
type Option func(*Options)
// WithHandlerConcurrency sets HandlerConcurrency.
func WithHandlerConcurrency(c int) Option {
return func(o *Options) {
o.HandlerConcurrency = c
}
}
// WithMaxConcurrentPerEvent sets MaxConcurrencyPerEvent.
func WithMaxConcurrentPerEvent(c int) Option {
return func(o *Options) {
o.MaxConcurrencyPerEvent = c
}
}
// WithTimeoutPerEvent sets TimeoutPerEvent.
func WithTimeoutPerEvent(t time.Duration) Option {
return func(o *Options) {
if t > maxTimeout {
o.TimeoutPerEvent = maxTimeout
} else {
o.TimeoutPerEvent = t
}
}
}
// WithPubsubReceiveSettings sets PubsubReceiveSettings.
func WithPubsubReceiveSettings(s pubsub.ReceiveSettings) Option {
return func(o *Options) {
o.PubsubReceiveSettings = s
}
}
// WithDeliveryTimeout sets the DeliveryTimeout.
func WithDeliveryTimeout(t time.Duration) Option {
return func(o *Options) {
o.DeliveryTimeout = t
}
}