// write.go
package nsq
import (
"context"
"fmt"
"github.com/nsqio/go-nsq"
"github.com/pkg/errors"
"github.com/batchcorp/plumber/util"
"github.com/batchcorp/plumber-schemas/build/go/protos/opts"
"github.com/batchcorp/plumber-schemas/build/go/protos/records"
)
// Write publishes each of the given messages to the NSQ topic named in
// writeOpts, using a producer that is created for this call and stopped before
// the call returns.
//
// A failure to publish an individual message does not abort the call: it is
// reported through util.WriteError (together with errorCh) and the loop moves
// on to the next message. A non-nil error is returned only when the write
// options fail validation, the producer cannot be created, or ctx is done
// before every message has been attempted. ctx must not be nil.
func (n *NSQ) Write(ctx context.Context, writeOpts *opts.WriteOptions, errorCh chan<- *records.ErrorRecord, messages ...*records.WriteRecord) error {
	if err := validateWriteOptions(writeOpts); err != nil {
		return errors.Wrap(err, "unable to validate write options")
	}

	producer, err := nsq.NewProducer(n.connArgs.NsqdAddress, n.config)
	if err != nil {
		return errors.Wrap(err, "unable to start NSQ producer")
	}
	defer producer.Stop()

	producer.SetLogger(n.log, nsq.LogLevelError)

	topic := writeOpts.GetNsq().Args.Topic

	for _, msg := range messages {
		// Stop publishing as soon as the caller cancels or the deadline
		// passes, rather than pushing out the rest of the batch regardless.
		if err := ctx.Err(); err != nil {
			return errors.Wrap(err, "write interrupted")
		}

		if err := producer.Publish(topic, []byte(msg.Input)); err != nil {
			// %w keeps the underlying publish error inspectable via
			// errors.Is/errors.As; the rendered message is unchanged.
			util.WriteError(n.log.Entry, errorCh, fmt.Errorf("unable to write message to '%s': %w", topic, err))
		}
	}

	return nil
}
// validateWriteOptions verifies that writeOpts carries a non-empty NSQ topic.
//
// It returns ErrMissingTopic when the topic is empty, and also when writeOpts,
// its NSQ option group, or that group's Args is nil, since no topic can be read
// in those cases. It returns nil otherwise.
func validateWriteOptions(writeOpts *opts.WriteOptions) error {
	if writeOpts == nil {
		return ErrMissingTopic
	}

	// Guard each level explicitly: dereferencing a nil option group or nil
	// Args would panic instead of producing a validation error.
	nsqOpts := writeOpts.GetNsq()
	if nsqOpts == nil || nsqOpts.Args == nil {
		return ErrMissingTopic
	}

	if nsqOpts.Args.Topic == "" {
		return ErrMissingTopic
	}

	return nil
}