This repository has been archived by the owner on Jun 19, 2022. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 75
/
adapter.go
254 lines (208 loc) · 7.88 KB
/
adapter.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
/*
Copyright 2019 Google LLC
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package adapter
import (
"context"
"fmt"
"github.com/cloudevents/sdk-go/pkg/cloudevents/observability"
cloudevents "github.com/cloudevents/sdk-go"
"github.com/cloudevents/sdk-go/pkg/cloudevents/transport"
"github.com/cloudevents/sdk-go/pkg/cloudevents/transport/http"
cepubsub "github.com/cloudevents/sdk-go/pkg/cloudevents/transport/pubsub"
"go.uber.org/zap"
"knative.dev/pkg/logging"
"github.com/google/knative-gcp/pkg/kncloudevents"
"github.com/google/knative-gcp/pkg/pubsub/adapter/converters"
"github.com/google/knative-gcp/pkg/reconciler/decorator/resources"
)
// Adapter implements the Pub/Sub adapter that delivers Pub/Sub messages from a
// pre-existing topic/subscription to a Sink.
//
// Exported fields are populated from environment variables (see the
// `envconfig` tags); unexported fields are filled in by Start unless they
// were already set.
type Adapter struct {
	// Project is the project id, read from PROJECT_ID.
	Project string `envconfig:"PROJECT_ID"`
	// Sink is the URI events are sent to, read from SINK_URI.
	Sink string `envconfig:"SINK_URI" required:"true"`
	// Transformer is the optional transformer URI, read from TRANSFORMER_URI.
	// When empty, Start does not create a transformer client.
	Transformer string `envconfig:"TRANSFORMER_URI"`
	// Topic is the name of the Pub/Sub topic being subscribed to, read from
	// PUBSUB_TOPIC_ID. It is the short form that is unique within the project,
	// e.g. 'laconia', not 'projects/my-gcp-project/topics/laconia'.
	Topic string `envconfig:"PUBSUB_TOPIC_ID" required:"true"`
	// Subscription is the name of the subscription to use, read from
	// PUBSUB_SUBSCRIPTION_ID.
	Subscription string `envconfig:"PUBSUB_SUBSCRIPTION_ID" required:"true"`
	// ExtensionsBased64 is a base64 encoded JSON string of a map of
	// CloudEvents extensions (key-value pairs) that are overridden onto the
	// outbound event. Read from K_CE_EXTENSIONS.
	ExtensionsBased64 string `envconfig:"K_CE_EXTENSIONS" required:"true"`
	// extensions is the decoded form of ExtensionsBased64; Start populates it.
	extensions map[string]string
	// SendMode describes how the adapter sends events.
	// One of [binary, structured, push]. Default: binary
	SendMode converters.ModeType `envconfig:"SEND_MODE" default:"binary" required:"true"`
	// MetricsDomain holds the metrics domain to use for surfacing metrics,
	// read from METRICS_DOMAIN.
	MetricsDomain string `envconfig:"METRICS_DOMAIN" required:"true"`
	// inbound is the cloudevents client used to receive events. Start creates
	// a Pub/Sub backed client when this is nil.
	inbound cloudevents.Client
	// outbound is the cloudevents client used to send events. Start creates
	// an HTTP client targeting Sink when this is nil.
	outbound cloudevents.Client
	// transformer is the cloudevents client used to transform received events
	// before sending. It stays nil unless Transformer is set or a client was
	// injected beforehand.
	transformer cloudevents.Client
}
// Start starts the adapter. Note: Only call once, not thread safe.
//
// It fills in the default send mode when none is set, decodes the extension
// overrides, lazily creates any client that has not been injected (inbound
// Pub/Sub, outbound HTTP, and the transformer when a transformer URI is
// configured), and finally hands control to the inbound client's receiver
// with a.receive as the callback.
//
// Errors from client construction are wrapped with %w so callers can inspect
// the underlying cause via errors.Is / errors.As. The error returned by
// StartReceiver is passed through unchanged.
func (a *Adapter) Start(ctx context.Context) error {
	var err error

	if a.SendMode == "" {
		a.SendMode = converters.DefaultSendMode
	}

	// Convert base64 encoded json map to extensions map.
	// This implementation comes from the Decorator object.
	a.extensions = resources.MakeDecoratorExtensionsMap(a.ExtensionsBased64)

	// Receive Events on Pub/Sub.
	if a.inbound == nil {
		if a.inbound, err = a.newPubSubClient(ctx); err != nil {
			return fmt.Errorf("failed to create inbound cloudevent client: %w", err)
		}
	}

	// Send events on HTTP.
	if a.outbound == nil {
		if a.outbound, err = a.newHTTPClient(ctx, a.Sink); err != nil {
			return fmt.Errorf("failed to create outbound cloudevent client: %w", err)
		}
	}

	// Make the transformer client only when a transformer URI has been set
	// and no client was injected beforehand.
	if a.Transformer != "" && a.transformer == nil {
		if a.transformer, err = kncloudevents.NewDefaultClient(a.Transformer); err != nil {
			return fmt.Errorf("failed to create transformer cloudevent client: %w", err)
		}
	}

	return a.inbound.StartReceiver(ctx, a.receive)
}
// receive is the receiver callback handed to the inbound client. It wraps
// obsReceive with an observability reporter keyed on reportReceive and marks
// the observation as OK or Error depending on the outcome.
func (a *Adapter) receive(ctx context.Context, event cloudevents.Event, resp *cloudevents.EventResponse) error {
	ctx, reporter := observability.NewReporter(ctx, CodecObserved{o: reportReceive})

	if err := a.obsReceive(ctx, event, resp); err != nil {
		reporter.Error()
		return err
	}

	reporter.OK()
	return nil
}
// obsReceive processes one received event: it optionally runs the event
// through the transformer, converts it to the Pub/Sub push payload style when
// the send mode is Push, applies the configured extension overrides, and
// sends it with the outbound client. A non-nil reply from the outbound send
// is written to resp with status 200.
//
// It returns the transformer or outbound send error, if any. When the
// transformer yields no event, the event is dropped and nil is returned.
func (a *Adapter) obsReceive(ctx context.Context, event cloudevents.Event, resp *cloudevents.EventResponse) error {
	lg := logging.FromContext(ctx).With(zap.Any("event.id", event.ID()), zap.Any("sink", a.Sink))

	if a.transformer != nil {
		// TODO: I do not like the transformer as it is. It would be better to pass the transport context and the
		// message to the transformer function as a transform request. Better yet, only do it for conversion issues?
		transformed, err := a.transformer.Send(ctx, event)
		switch {
		case err != nil:
			lg.Errorf("error transforming cloud event %q", event.ID())
			return err
		case transformed == nil:
			lg.Warnf("cloud event %q was not transformed", event.ID())
			return nil
		}
		// Continue with the transformed event in place of the original.
		event = *transformed
	}

	// Push mode reshapes the event into the Pub/Sub Push payload style.
	if a.SendMode == converters.Push {
		event = ConvertToPush(ctx, event)
	}

	// Override extensions on the outbound event.
	for name, value := range a.extensions {
		event.SetExtension(name, value)
	}

	// Deliver the event and relay any reply to the caller.
	reply, err := a.outbound.Send(ctx, event)
	if err != nil {
		return err
	}
	if reply != nil {
		resp.RespondWith(200, reply)
	}
	return nil
}
// convert is the converter function registered on the Pub/Sub client. It
// wraps obsConvert with an observability reporter and marks the observation
// as OK or Error depending on whether conversion returned an error.
//
// NOTE(review): the reporter is keyed on reportReceive, the same key used by
// receive — confirm this is intended rather than a dedicated convert key.
func (a *Adapter) convert(ctx context.Context, m transport.Message, err error) (*cloudevents.Event, error) {
	ctx, reporter := observability.NewReporter(ctx, CodecObserved{o: reportReceive})

	converted, convErr := a.obsConvert(ctx, m, err)
	if convErr == nil {
		reporter.OK()
	} else {
		reporter.Error()
	}
	return converted, convErr
}
// obsConvert turns a transport message into a CloudEvent. Pub/Sub messages
// are handed to converters.Convert together with the adapter's send mode; for
// any other message type it returns a nil event and the incoming err
// unchanged.
func (a *Adapter) obsConvert(ctx context.Context, m transport.Message, err error) (*cloudevents.Event, error) {
	logging.FromContext(ctx).Debug("Converting event from transport.")

	pubsubMsg, isPubSub := m.(*cepubsub.Message)
	if !isPubSub {
		return nil, err
	}
	return converters.Convert(ctx, pubsubMsg, a.SendMode)
}
// newPubSubClient builds the inbound Pub/Sub client via obsNewPubSubClient,
// recording the outcome with an observability reporter keyed on
// reportNewPubSubClient.
func (a *Adapter) newPubSubClient(ctx context.Context) (cloudevents.Client, error) {
	ctx, reporter := observability.NewReporter(ctx, CodecObserved{o: reportNewPubSubClient})

	client, err := a.obsNewPubSubClient(ctx)
	if err == nil {
		reporter.OK()
	} else {
		reporter.Error()
	}
	return client, err
}
// obsNewPubSubClient creates a CloudEvents client backed by a Pub/Sub
// transport configured with the adapter's project, topic and subscription.
// The client uses a.convert as its converter function. It returns the
// transport construction error, or whatever cloudevents.NewClient returns.
func (a *Adapter) obsNewPubSubClient(ctx context.Context) (cloudevents.Client, error) {
	// Build the Pub/Sub transport first.
	pst, err := cepubsub.New(ctx,
		cepubsub.WithProjectID(a.Project),
		cepubsub.WithTopicID(a.Topic),
		cepubsub.WithSubscriptionID(a.Subscription),
	)
	if err != nil {
		return nil, err
	}

	// Wrap the transport in a CloudEvents client.
	return cloudevents.NewClient(pst, cloudevents.WithConverterFn(a.convert))
}
// newHTTPClient builds an HTTP CloudEvents client for target via
// obsNewHTTPClient, recording the outcome with an observability reporter
// keyed on reportNewHTTPClient. The context derived by the reporter is
// discarded; the caller's ctx is passed on.
func (a *Adapter) newHTTPClient(ctx context.Context, target string) (cloudevents.Client, error) {
	_, reporter := observability.NewReporter(ctx, CodecObserved{o: reportNewHTTPClient})

	client, err := a.obsNewHTTPClient(ctx, target)
	switch err {
	case nil:
		reporter.OK()
	default:
		reporter.Error()
	}
	return client, err
}
// obsNewHTTPClient creates a CloudEvents client backed by an HTTP transport
// that sends to target. The encoding option follows the adapter's send mode:
// binary encoding for Binary and Push, structured encoding for Structured.
// For any other send mode no encoding option is added.
func (a *Adapter) obsNewHTTPClient(ctx context.Context, target string) (cloudevents.Client, error) {
	opts := []http.Option{cloudevents.WithTarget(target)}

	if a.SendMode == converters.Binary || a.SendMode == converters.Push {
		opts = append(opts, cloudevents.WithBinaryEncoding())
	} else if a.SendMode == converters.Structured {
		opts = append(opts, cloudevents.WithStructuredEncoding())
	}

	// Build the HTTP transport first.
	httpTransport, err := cloudevents.NewHTTPTransport(opts...)
	if err != nil {
		return nil, err
	}

	// Wrap the transport in a CloudEvents client.
	return cloudevents.NewClient(httpTransport)
}