-
Notifications
You must be signed in to change notification settings - Fork 71
/
pulsar.go
127 lines (102 loc) · 3.03 KB
/
pulsar.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
package pulsar
import (
"context"
"crypto/tls"
"time"
"github.com/apache/pulsar-client-go/pulsar"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
"github.com/batchcorp/plumber-schemas/build/go/protos/args"
"github.com/batchcorp/plumber-schemas/build/go/protos/opts"
"github.com/batchcorp/plumber/types"
"github.com/batchcorp/plumber/util"
"github.com/batchcorp/plumber/validate"
)
const BackendName = "pulsar"
var (
ErrEmptyTopic = errors.New("topic cannot be empty")
ErrEmptySubscriptionName = errors.New("subscription name cannot be empty")
)
type Pulsar struct {
connOpts *opts.ConnectionOptions
connArgs *args.PulsarConn
client pulsar.Client
log *logrus.Entry
}
func New(connOpts *opts.ConnectionOptions) (*Pulsar, error) {
if err := validateBaseConnOpts(connOpts); err != nil {
return nil, errors.Wrap(err, "invalid connection options")
}
client, err := pulsar.NewClient(*getClientOptions(connOpts))
if err != nil {
return nil, errors.Wrap(err, "Could not instantiate Pulsar client")
}
return &Pulsar{
connOpts: connOpts,
connArgs: connOpts.GetPulsar(),
client: client,
log: logrus.WithField("backend", BackendName),
}, nil
}
func getClientOptions(connOpts *opts.ConnectionOptions) *pulsar.ClientOptions {
args := connOpts.GetPulsar()
clientOpts := &pulsar.ClientOptions{
URL: args.Dsn,
OperationTimeout: 30 * time.Second,
ConnectionTimeout: util.DurationSec(args.ConnectTimeoutSeconds),
TLSAllowInsecureConnection: args.TlsSkipVerify,
}
if len(args.TlsClientCert) > 0 && len(args.TlsClientKey) > 0 {
if util.FileExists(args.TlsClientCert) {
// Certs inputted as files
clientOpts.Authentication = pulsar.NewAuthenticationTLS(
string(args.TlsClientCert),
string(args.TlsClientKey),
)
} else {
// Certs inputted as strings
clientOpts.Authentication = pulsar.NewAuthenticationFromTLSCertSupplier(func() (*tls.Certificate, error) {
return &tls.Certificate{
Certificate: [][]byte{[]byte(args.TlsClientCert)},
PrivateKey: args.TlsClientKey,
}, nil
})
}
}
return clientOpts
}
func (p *Pulsar) Name() string {
return BackendName
}
func (p *Pulsar) Close(_ context.Context) error {
p.client.Close() // no return value
return nil
}
func (p *Pulsar) Test(_ context.Context) error {
return types.NotImplementedErr
}
func validateBaseConnOpts(connOpts *opts.ConnectionOptions) error {
if connOpts == nil {
return validate.ErrMissingConnOpts
}
if connOpts.Conn == nil {
return validate.ErrMissingConnCfg
}
pulsarOpts := connOpts.GetPulsar()
if pulsarOpts == nil {
return validate.ErrMissingConnArgs
}
if pulsarOpts.Dsn == "" {
return validate.ErrMissingDSN
}
if pulsarOpts.ConnectTimeoutSeconds <= 0 {
return validate.ErrInvalidConnTimeout
}
if len(pulsarOpts.TlsClientCert) > 0 && len(pulsarOpts.TlsClientKey) == 0 {
return validate.ErrMissingClientKey
}
if len(pulsarOpts.TlsClientKey) > 0 && len(pulsarOpts.TlsClientCert) == 0 {
return validate.ErrMissingClientCert
}
return nil
}