-
Notifications
You must be signed in to change notification settings - Fork 2
/
Copy pathpubsub.go
113 lines (95 loc) · 2.92 KB
/
pubsub.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
package input
import (
"context"
"encoding/json"
"os"
"sync"
"cloud.google.com/go/pubsub"
"github.com/devopsext/events/common"
sreCommon "github.com/devopsext/sre/common"
"github.com/devopsext/utils"
"google.golang.org/api/option"
)
// PubSubInputOptions holds the settings used to build a PubSubInput.
// NewPubSubInput returns nil when any of the three fields is empty.
type PubSubInputOptions struct {
// Credentials is either a path to a credentials file or the credentials
// JSON itself; NewPubSubInput picks the interpretation by checking
// whether os.Stat succeeds on the value.
Credentials string
// ProjectID is the project identifier passed to pubsub.NewClient.
ProjectID string
// Subscription is the name of the subscription to receive from; it is
// also used as the label value for the requests and errors counters.
Subscription string
}
// PubSubInput receives messages from a Pub/Sub subscription, decodes each
// one as a JSON common.Event and hands it to the processor registered for
// the event type.
type PubSubInput struct {
// options is the configuration the input was created with.
options PubSubInputOptions
// client is the Pub/Sub client created in NewPubSubInput.
client *pubsub.Client
// ctx is the context passed to the subscription's Receive call;
// NewPubSubInput sets it to context.Background().
ctx context.Context
// processors is used to look up a processor by event type.
processors *common.Processors
// eventer is taken from the observability bundle; it is not used by the
// code visible in this file.
eventer sreCommon.Eventer
// tracer starts one span per received message.
tracer sreCommon.Tracer
// logger receives start-up, debug and error output.
logger sreCommon.Logger
// requests is incremented once per received message, labelled with the
// subscription name.
requests sreCommon.Counter
// errors is incremented on JSON decode failures and on processor handler
// errors, labelled with the subscription name.
errors sreCommon.Counter
}
// Start launches the Pub/Sub receive loop in a background goroutine.
//
// The goroutine is registered on wg before it is spawned and marks itself
// done once the subscription's Receive call returns; an error returned by
// Receive is logged. Each received message is decoded as a JSON common.Event
// and passed to the processor found for its Type. Every message is
// acknowledged, including those that fail to decode, have no matching
// processor, or whose handler returns an error. The outputs argument is not
// used by this input.
func (ps *PubSubInput) Start(wg *sync.WaitGroup, outputs *common.Outputs) {
	wg.Add(1)

	go func() {
		defer wg.Done()

		ps.logger.Info("Start pubsub input...")

		name := ps.options.Subscription
		subscription := ps.client.Subscription(name)

		// onMessage handles a single delivery from the subscription.
		onMessage := func(_ context.Context, msg *pubsub.Message) {
			span := ps.tracer.StartSpan()
			// Deferred calls run last-in-first-out: the span is finished
			// first, then the message is acknowledged.
			defer msg.Ack()
			defer span.Finish()

			ps.requests.Inc(name)
			ps.logger.SpanDebug(span, string(msg.Data))

			var ev common.Event
			decodeErr := json.Unmarshal(msg.Data, &ev)
			if decodeErr != nil {
				ps.errors.Inc(name)
				ps.logger.SpanError(span, decodeErr)
				return
			}

			processor := ps.processors.Find(ev.Type)
			if processor == nil {
				ps.logger.SpanDebug(span, "PubSub processor is not found for %s", ev.Type)
				return
			}

			ev.SetLogger(ps.logger)
			ev.SetSpanContext(span.GetContext())

			// Handler failures are only counted here; they are not logged.
			if handleErr := processor.HandleEvent(&ev); handleErr != nil {
				ps.errors.Inc(name)
			}
		}

		ps.logger.Info("PubSub input is up. Listening...")

		if recvErr := subscription.Receive(ps.ctx, onMessage); recvErr != nil {
			ps.logger.Error(recvErr)
		}
	}()
}
// NewPubSubInput builds a PubSubInput from the given options.
//
// It returns nil, after logging a debug message, when the credentials,
// project ID or subscription is empty. It also returns nil, after logging
// the error, when the Pub/Sub client cannot be created. The Credentials
// option is used as a credentials file path if os.Stat succeeds on it;
// otherwise its contents are passed as credentials JSON.
func NewPubSubInput(options PubSubInputOptions, processors *common.Processors, observability *common.Observability) *PubSubInput {
	log := observability.Logs()

	incomplete := utils.IsEmpty(options.Credentials) ||
		utils.IsEmpty(options.ProjectID) ||
		utils.IsEmpty(options.Subscription)
	if incomplete {
		log.Debug("PubSub input credentials, project ID or subscription is not defined. Skipped")
		return nil
	}

	var auth option.ClientOption
	_, statErr := os.Stat(options.Credentials)
	if statErr != nil {
		// Not an accessible path: treat the value as inline JSON.
		auth = option.WithCredentialsJSON([]byte(options.Credentials))
	} else {
		auth = option.WithCredentialsFile(options.Credentials)
	}

	background := context.Background()
	pubsubClient, clientErr := pubsub.NewClient(background, options.ProjectID, auth)
	if clientErr != nil {
		log.Error(clientErr)
		return nil
	}

	metrics := observability.Metrics()

	input := &PubSubInput{
		options:    options,
		client:     pubsubClient,
		ctx:        background,
		processors: processors,
		eventer:    observability.Events(),
		tracer:     observability.Traces(),
		logger:     observability.Logs(),
	}
	// Both counters carry a single "subscription" label.
	input.requests = metrics.Counter("requests", "Count of all pubsub input requests", []string{"subscription"}, "pubsub", "input")
	input.errors = metrics.Counter("errors", "Count of all pubsub input errors", []string{"subscription"}, "pubsub", "input")

	return input
}