forked from go-kit/kit
/
request_response_func.go
182 lines (160 loc) · 6.1 KB
/
request_response_func.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
package amqp
import (
"context"
"time"
"github.com/streadway/amqp"
)
// RequestFunc may extract information from a publisher request and store it
// in the request context. In Subscribers, RequestFuncs run before the
// endpoint is invoked.
type RequestFunc func(ctx context.Context, pub *amqp.Publishing, deliv *amqp.Delivery) context.Context
// SubscriberResponseFunc may read information from a request context and use
// it to manipulate a Publisher. SubscriberResponseFuncs run only in
// subscribers, after the endpoint has been invoked but before a reply is
// published.
type SubscriberResponseFunc func(ctx context.Context, deliv *amqp.Delivery, ch Channel, pub *amqp.Publishing) context.Context
// PublisherResponseFunc may read information from an AMQP request and make
// the response available for consumption. PublisherResponseFuncs run only in
// publishers, after a request has been made but before it is decoded.
type PublisherResponseFunc func(ctx context.Context, deliv *amqp.Delivery) context.Context
// SetPublishExchange returns a RequestFunc that records publishExchange in the
// context under ContextKeyExchange, for use as the Exchange of an AMQP Publish
// call. The Publishing and Delivery arguments are not touched.
func SetPublishExchange(publishExchange string) RequestFunc {
	return func(parent context.Context, _ *amqp.Publishing, _ *amqp.Delivery) context.Context {
		withExchange := context.WithValue(parent, ContextKeyExchange, publishExchange)
		return withExchange
	}
}
// SetPublishKey returns a RequestFunc that records publishKey in the context
// under ContextKeyPublishKey, for use as the Key of an AMQP Publish call.
// The Publishing and Delivery arguments are not touched.
func SetPublishKey(publishKey string) RequestFunc {
	return func(parent context.Context, _ *amqp.Publishing, _ *amqp.Delivery) context.Context {
		withKey := context.WithValue(parent, ContextKeyPublishKey, publishKey)
		return withKey
	}
}
// SetPublishDeliveryMode returns a RequestFunc that assigns dmode to the
// DeliveryMode field of the Publishing and returns the context unchanged.
// Please refer to the AMQP delivery mode constants in the AMQP package.
func SetPublishDeliveryMode(dmode uint8) RequestFunc {
	applyMode := func(ctx context.Context, publishing *amqp.Publishing, _ *amqp.Delivery) context.Context {
		publishing.DeliveryMode = dmode
		return ctx
	}
	return applyMode
}
// SetNackSleepDuration returns a RequestFunc that records, under
// ContextKeyNackSleepDuration, how long to sleep in the event of a Nack.
// It must be used together with an error encoder that Nacks and sleeps;
// one example is the SingleNackRequeueErrorEncoder.
// It is designed to be used by Subscribers.
func SetNackSleepDuration(duration time.Duration) RequestFunc {
	return func(parent context.Context, _ *amqp.Publishing, _ *amqp.Delivery) context.Context {
		withSleep := context.WithValue(parent, ContextKeyNackSleepDuration, duration)
		return withSleep
	}
}
// SetConsumeAutoAck returns a RequestFunc that records, under
// ContextKeyAutoAck, whether messages should be auto-acked when consuming.
// When set to false, the publisher will Ack the first message it receives
// with a matching correlationId.
// It is designed to be used by Publishers.
func SetConsumeAutoAck(autoAck bool) RequestFunc {
	return func(parent context.Context, _ *amqp.Publishing, _ *amqp.Delivery) context.Context {
		withAutoAck := context.WithValue(parent, ContextKeyAutoAck, autoAck)
		return withAutoAck
	}
}
// SetConsumeArgs returns a RequestFunc that records args in the context under
// ContextKeyConsumeArgs, to be used as the arguments for the amqp Consume
// function.
// It is designed to be used by Publishers.
func SetConsumeArgs(args amqp.Table) RequestFunc {
	return func(parent context.Context, _ *amqp.Publishing, _ *amqp.Delivery) context.Context {
		withArgs := context.WithValue(parent, ContextKeyConsumeArgs, args)
		return withArgs
	}
}
// SetContentType returns a RequestFunc that assigns contentType to the
// ContentType field of an AMQP Publishing and returns the context unchanged.
func SetContentType(contentType string) RequestFunc {
	applyContentType := func(ctx context.Context, publishing *amqp.Publishing, _ *amqp.Delivery) context.Context {
		publishing.ContentType = contentType
		return ctx
	}
	return applyContentType
}
// SetContentEncoding returns a RequestFunc that assigns contentEncoding to the
// ContentEncoding field of an AMQP Publishing and returns the context
// unchanged.
func SetContentEncoding(contentEncoding string) RequestFunc {
	applyEncoding := func(ctx context.Context, publishing *amqp.Publishing, _ *amqp.Delivery) context.Context {
		publishing.ContentEncoding = contentEncoding
		return ctx
	}
	return applyEncoding
}
// SetCorrelationID returns a RequestFunc that assigns cid to the
// CorrelationId field of an AMQP Publishing and returns the context unchanged.
func SetCorrelationID(cid string) RequestFunc {
	applyCorrelationID := func(ctx context.Context, publishing *amqp.Publishing, _ *amqp.Delivery) context.Context {
		publishing.CorrelationId = cid
		return ctx
	}
	return applyCorrelationID
}
// SetAckAfterEndpoint returns a SubscriberResponseFunc that Acks the Delivery
// object, passing multiple through to Ack. It prompts the service to Ack after
// the endpoint has been evaluated successfully and before the response is
// encoded.
// It is designed to be used by Subscribers.
func SetAckAfterEndpoint(multiple bool) SubscriberResponseFunc {
	return func(ctx context.Context, delivery *amqp.Delivery, _ Channel, _ *amqp.Publishing) context.Context {
		// Whatever Ack returns is not inspected here; this func type only
		// hands back a context.
		delivery.Ack(multiple)
		return ctx
	}
}
// getPublishExchange returns the exchange name stored in ctx under
// ContextKeyExchange. It returns "" when the key is absent or when the stored
// value is not a string.
func getPublishExchange(ctx context.Context) string {
	// ContextKeyExchange is exported, so code outside this file can store any
	// value under it; the comma-ok assertion avoids a panic on a mistyped one.
	exchange, ok := ctx.Value(ContextKeyExchange).(string)
	if !ok {
		return ""
	}
	return exchange
}
// getPublishKey returns the publish key stored in ctx under
// ContextKeyPublishKey. It returns "" when the key is absent or when the
// stored value is not a string.
func getPublishKey(ctx context.Context) string {
	// ContextKeyPublishKey is exported, so code outside this file can store
	// any value under it; the comma-ok assertion avoids a panic on a mistyped
	// one.
	publishKey, ok := ctx.Value(ContextKeyPublishKey).(string)
	if !ok {
		return ""
	}
	return publishKey
}
// getNackSleepDuration returns the duration stored in ctx under
// ContextKeyNackSleepDuration. It returns 0 when the key is absent or when
// the stored value is not a time.Duration.
func getNackSleepDuration(ctx context.Context) time.Duration {
	// ContextKeyNackSleepDuration is exported, so code outside this file can
	// store any value under it; the comma-ok assertion avoids a panic on a
	// mistyped one.
	duration, ok := ctx.Value(ContextKeyNackSleepDuration).(time.Duration)
	if !ok {
		return 0
	}
	return duration
}
// getConsumeAutoAck returns the autoAck flag stored in ctx under
// ContextKeyAutoAck. It returns false when the key is absent or when the
// stored value is not a bool.
func getConsumeAutoAck(ctx context.Context) bool {
	// ContextKeyAutoAck is exported, so code outside this file can store any
	// value under it; the comma-ok assertion avoids a panic on a mistyped one.
	autoAck, ok := ctx.Value(ContextKeyAutoAck).(bool)
	if !ok {
		return false
	}
	return autoAck
}
// getConsumeArgs returns the amqp.Table stored in ctx under
// ContextKeyConsumeArgs. It returns nil when the key is absent or when the
// stored value is not an amqp.Table.
func getConsumeArgs(ctx context.Context) amqp.Table {
	// ContextKeyConsumeArgs is exported, so code outside this file can store
	// any value under it; the comma-ok assertion avoids a panic on a mistyped
	// one.
	args, ok := ctx.Value(ContextKeyConsumeArgs).(amqp.Table)
	if !ok {
		return nil
	}
	return args
}
// contextKey is the type of the keys under which the Set* RequestFuncs in
// this file store values in a context.Context.
type contextKey int

const (
	// ContextKeyExchange is the key for the reply Exchange used in
	// amqp.Publish; SetPublishExchange stores a string under it.
	ContextKeyExchange = contextKey(iota)

	// ContextKeyPublishKey is the key for the ReplyTo field used in
	// amqp.Publish; SetPublishKey stores a string under it.
	ContextKeyPublishKey

	// ContextKeyNackSleepDuration is the key for the duration to sleep for
	// when the service Nacks and requeues a message. The sleep is there to
	// prevent sporadic send-resending of a message that is constantly Nack'd
	// and requeued.
	ContextKeyNackSleepDuration

	// ContextKeyAutoAck is the key for the autoAck argument passed when
	// calling amqp.Channel.Consume; SetConsumeAutoAck stores a bool under it.
	ContextKeyAutoAck

	// ContextKeyConsumeArgs is the key for the consumeArgs argument passed
	// when calling amqp.Channel.Consume; SetConsumeArgs stores an amqp.Table
	// under it.
	ContextKeyConsumeArgs
)