forked from alebabai/go-kit-kafka
/
producer.go
137 lines (114 loc) · 3.41 KB
/
producer.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
package transport
import (
"context"
"encoding/json"
"github.com/go-kit/kit/endpoint"
"github.com/SoftSwiss/go-kit-kafka/kafka"
)
// Producer wraps a single Kafka topic for message producing.
// It does not itself satisfy endpoint.Endpoint; call its Endpoint method
// to obtain an endpoint.Endpoint that encodes a request into a
// kafka.Message and passes it to the configured handler.
type Producer struct {
// handler receives the encoded message; Endpoint calls handler.Handle.
handler kafka.Handler
// topic is copied into the Topic field of every produced message.
topic string
// response is the value the endpoint returns on success
// (successResponse{} unless overridden with ProducerResponse).
response interface{}
// enc encodes the endpoint request into the outgoing message.
enc EncodeRequestFunc
// before funcs run after encoding and before the handler is invoked.
before []RequestFunc
// after funcs run only when the handler returned no error.
after []ProducerResponseFunc
// finalizer funcs run in a deferred call at the end of every endpoint invocation.
finalizer []ProducerFinalizerFunc
}
// successResponse is the empty placeholder value that NewProducer installs as
// the default successful response; ProducerResponse can replace it.
type successResponse struct{}
// NewProducer builds a Producer bound to a single Kafka topic.
//
// handler is the kafka.Handler that receives each encoded message, topic is
// stamped onto every message, and enc turns an endpoint request into the
// message. The successful response defaults to successResponse{}; the given
// options are then applied in order and may override any default.
// Use the returned producer's Endpoint method to obtain an endpoint.Endpoint.
func NewProducer(
	handler kafka.Handler,
	topic string,
	enc EncodeRequestFunc,
	options ...ProducerOption,
) *Producer {
	producer := new(Producer)
	producer.handler = handler
	producer.topic = topic
	producer.enc = enc
	producer.response = successResponse{}

	// Options run last so they can replace the defaults set above.
	for i := range options {
		options[i](producer)
	}

	return producer
}
// ProducerOption sets an optional parameter for producers.
// NewProducer applies each option, in order, to the Producer it is building.
type ProducerOption func(producer *Producer)
// ProducerResponse returns an option that replaces the value the producer
// endpoint returns after a message has been produced successfully.
func ProducerResponse(response interface{}) ProducerOption {
	override := func(target *Producer) {
		target.response = response
	}
	return override
}
// ProducerBefore returns an option that appends the given RequestFuncs to the
// producer. They are applied to the outgoing message, in the order added,
// after it has been encoded and before the handler is invoked.
func ProducerBefore(before ...RequestFunc) ProducerOption {
	return ProducerOption(func(target *Producer) {
		target.before = append(target.before, before...)
	})
}
// ProducerAfter returns an option that appends one or more
// ProducerResponseFuncs to the producer. They are applied to the context,
// in the order added, once the message has been produced successfully,
// which makes them useful for context-manipulation operations.
func ProducerAfter(after ...ProducerResponseFunc) ProducerOption {
	return ProducerOption(func(target *Producer) {
		target.after = append(target.after, after...)
	})
}
// ProducerFinalizer returns an option that appends one or more
// ProducerFinalizerFuncs to the producer. Finalizers run at the end of
// producing a Kafka message, in the order in which they were added.
// By default, no finalizer is registered.
func ProducerFinalizer(f ...ProducerFinalizerFunc) ProducerOption {
	return ProducerOption(func(target *Producer) {
		target.finalizer = append(target.finalizer, f...)
	})
}
// Endpoint returns a usable endpoint that invokes message producing.
//
// Each call of the returned endpoint:
//  1. derives a cancellable context that is cancelled when the call returns;
//  2. encodes the request into a new kafka.Message for the producer's topic;
//  3. runs the "before" funcs, each of which may replace the context;
//  4. passes the message to the handler;
//  5. on success runs the "after" funcs and returns the configured response.
//
// An error from the encoder or the handler is returned as-is with a nil
// response. Registered finalizers always run, receiving the latest context
// and the error being returned (nil on success).
func (p Producer) Endpoint() endpoint.Endpoint {
	return func(ctx context.Context, request interface{}) (response interface{}, err error) {
		ctx, cancel := context.WithCancel(ctx)
		defer cancel()

		// Deferred calls run last-in-first-out, so the finalizers execute
		// before cancel and observe a context that is not yet cancelled by us.
		// They read the named result err, which holds the final error value.
		if len(p.finalizer) > 0 {
			defer func() {
				for _, finalize := range p.finalizer {
					finalize(ctx, err)
				}
			}()
		}

		message := new(kafka.Message)
		message.Topic = p.topic

		if err = p.enc(ctx, message, request); err != nil {
			return nil, err
		}

		for _, prepare := range p.before {
			ctx = prepare(ctx, message)
		}

		if err = p.handler.Handle(ctx, message); err != nil {
			return nil, err
		}

		for _, complete := range p.after {
			ctx = complete(ctx)
		}

		return p.response, nil
	}
}
// ProducerFinalizerFunc can be used to perform work at the end of producing a
// Kafka message, once the endpoint's result has been determined. It receives
// the call's context and the error being returned (nil on success).
// The principal intended use is for error logging.
type ProducerFinalizerFunc func(ctx context.Context, err error)
// EncodeJSONRequest is an EncodeRequestFunc that marshals the request to JSON
// and stores the resulting bytes in the message's Value field.
// If marshalling fails, the error is returned and the message is left
// untouched. The context argument is ignored.
// Many services can use it as a sensible default.
func EncodeJSONRequest(_ context.Context, msg *kafka.Message, request interface{}) error {
	payload, err := json.Marshal(request)
	if err == nil {
		msg.Value = payload
	}
	return err
}