This repository has been archived by the owner on Jan 17, 2022. It is now read-only.
/
factory.go
64 lines (54 loc) · 1.86 KB
/
factory.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
package kitkafka
import (
"github.com/go-kit/kit/endpoint"
"github.com/segmentio/kafka-go"
)
// MakeClient creates an Handler. This handler can write *kafka.Message to
// kafka broker. The Handler is mean to be consumed by NewPublisher.
func MakeClient(writer *kafka.Writer) (*writerHandle, error) {
return &writerHandle{
Writer: writer,
}, nil
}
type subscriberConfig struct {
parallelism int
syncCommit bool
}
// ReaderOpt are options that configures the kafka reader.
type ReaderOpt func(config *subscriberConfig)
// WithParallelism configures the parallelism of fan out workers.
func WithParallelism(parallelism int) ReaderOpt {
return func(config *subscriberConfig) {
config.parallelism = parallelism
}
}
// WithSyncCommit is an kafka option that when enabled, only commit the message
// synchronously if no error is returned from the endpoint.
func WithSyncCommit() ReaderOpt {
return func(config *subscriberConfig) {
config.syncCommit = true
}
}
// MakeSubscriberServer creates a *SubscriberServer. Subscriber is the go kit transport layer equivalent.
func MakeSubscriberServer(reader *kafka.Reader, subscriber Handler, opt ...ReaderOpt) (*SubscriberServer, error) {
var config = subscriberConfig{
parallelism: 1,
}
for _, o := range opt {
o(&config)
}
return &SubscriberServer{
reader: reader,
handler: subscriber,
parallelism: config.parallelism,
syncCommit: config.syncCommit,
}, nil
}
type publisherConfig struct{}
// A PublisherOpt is an option that configures publisher.
type PublisherOpt func(config *publisherConfig)
// MakePublisherService returns a *PublisherService that can publish user-domain messages to kafka brokers.
// in go kit analog, this is a service with one method, publish.
func MakePublisherService(endpoint endpoint.Endpoint, opt ...PublisherOpt) *PublisherService {
return &PublisherService{endpoint: endpoint}
}