-
Notifications
You must be signed in to change notification settings - Fork 42
/
encode.go
235 lines (195 loc) · 7.55 KB
/
encode.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
// Copyright © 2024 Meroxa, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//go:generate paramgen -output=encode_paramgen.go encodeConfig
//go:generate mockgen -source encode.go -destination=mock_encoder.go -package=avro -mock_names=encoder=MockEncoder . encoder
package avro
import (
"context"
"crypto/tls"
"github.com/conduitio/conduit-commons/opencdc"
sdk "github.com/conduitio/conduit-processor-sdk"
"github.com/conduitio/conduit/pkg/foundation/cerrors"
"github.com/conduitio/conduit/pkg/foundation/log"
"github.com/conduitio/conduit/pkg/plugin/processor/builtin/impl/avro/schemaregistry"
"github.com/goccy/go-json"
"github.com/lovromazgon/franz-go/pkg/sr"
)
// encoder converts structured data into encoded raw bytes. encodeProcessor
// only depends on this interface; a MockEncoder implementation is generated
// from it by the mockgen go:generate directive at the top of this file.
type encoder interface {
	// Encode encodes sd and returns the result as raw data, or an error if
	// the data could not be encoded.
	Encode(ctx context.Context, sd opencdc.StructuredData) (opencdc.RawData, error)
}
// encodeConfig is the configuration of the avro.encode processor. It is
// populated and validated by parseEncodeConfig.
//
// NOTE(review): the Parameters() method used by parseEncodeConfig and
// Specification is presumably generated from this struct by paramgen (see the
// go:generate directive at the top of the file). Field comments and struct tags
// likely feed the generated parameter docs, so confirm before editing the
// exported fields' comments or tags.
type encodeConfig struct {
	// The field that will be encoded.
	//
	// For more information about the format, see [Referencing fields](https://conduit.io/docs/processors/referencing-fields).
	Field string `json:"field" default:".Payload.After"`
	// URL of the schema registry (e.g. http://localhost:8085)
	URL string `json:"url" validate:"required"`

	Schema schemaConfig `json:"schema"`
	Auth   authConfig   `json:"auth"`
	TLS    tlsConfig    `json:"tls"`

	// fieldResolver locates Field within a record. It is not a user-facing
	// parameter: parseEncodeConfig builds it from Field after parsing.
	fieldResolver sdk.ReferenceResolver
}
// ClientOptions assembles the options used to build the schema registry
// client. They always include the registry URL and schema normalization.
// Basic auth is added only when both a username and a password are set. A
// TLS dial configuration (TLS 1.2 minimum) is added only when a client
// certificate has been loaded; the CA pool, if any, is attached to that same
// configuration.
//
// NOTE(review): a loaded CA pool without a client certificate is not applied.
// Confirm that tlsConfig.parse never produces that combination.
func (c encodeConfig) ClientOptions() []sr.Opt {
	opts := []sr.Opt{sr.URLs(c.URL), sr.Normalize()}

	if user, pass := c.Auth.Username, c.Auth.Password; user != "" && pass != "" {
		opts = append(opts, sr.BasicAuth(user, pass))
	}

	clientCert := c.TLS.tlsClientCert
	if clientCert == nil {
		return opts
	}

	// A nil CA pool leaves RootCAs unset, i.e. the system roots are used.
	dialCfg := &tls.Config{
		MinVersion:   tls.VersionTLS12,
		Certificates: []tls.Certificate{*clientCert},
		RootCAs:      c.TLS.tlsCACert,
	}
	return append(opts, sr.DialTLSConfig(dialCfg))
}
// parseEncodeConfig turns the raw settings in m into an encodeConfig. It runs
// these steps in order:
//  1. parse the settings against the generated parameter specification;
//  2. validate the basic auth settings;
//  3. parse the TLS material;
//  4. parse the schema strategy;
//  5. build a reference resolver for the configured field.
//
// The first failing step aborts parsing. The zero encodeConfig is then
// returned with the error; every step except step 1 wraps its error with
// context.
func parseEncodeConfig(ctx context.Context, m map[string]string) (encodeConfig, error) {
	var cfg encodeConfig
	if err := sdk.ParseConfig(ctx, m, &cfg, cfg.Parameters()); err != nil {
		return encodeConfig{}, err
	}
	if err := cfg.Auth.validate(); err != nil {
		return encodeConfig{}, cerrors.Errorf("invalid basic auth: %w", err)
	}
	if err := cfg.TLS.parse(); err != nil {
		return encodeConfig{}, cerrors.Errorf("failed parsing TLS: %w", err)
	}
	if err := cfg.Schema.parse(); err != nil {
		return encodeConfig{}, cerrors.Errorf("failed parsing schema strategy: %w", err)
	}

	// Resolve the configured field reference once, up front, so each record
	// does not have to re-parse it.
	resolver, err := sdk.NewReferenceResolver(cfg.Field)
	if err != nil {
		return encodeConfig{}, cerrors.Errorf("failed parsing target field: %w", err)
	}
	cfg.fieldResolver = resolver

	return cfg, nil
}
// encodeProcessor implements the avro.encode builtin processor. Processor
// methods not defined on this type are supplied by the embedded
// sdk.UnimplementedProcessor.
type encodeProcessor struct {
	sdk.UnimplementedProcessor

	cfg     encodeConfig  // populated by Configure
	encoder encoder       // created by Open
	logger  log.CtxLogger // supplied to NewEncodeProcessor
}
// NewEncodeProcessor returns an avro.encode processor that logs through
// logger. The returned processor is not yet usable: Process relies on the
// configuration stored by Configure and the encoder created by Open.
func NewEncodeProcessor(logger log.CtxLogger) sdk.Processor {
	proc := &encodeProcessor{}
	proc.logger = logger
	return proc
}
// Specification returns the static description of the avro.encode processor:
// its name, summary, long-form documentation, version, author and parameters.
// The parameters come from encodeConfig.Parameters, the same specification
// that parseEncodeConfig validates settings against. It never returns an
// error.
func (p *encodeProcessor) Specification() (sdk.Specification, error) {
	const description = `The processor takes a record's field and encodes it using a schema into the [Avro format](https://avro.apache.org/).
It provides two strategies for determining the schema:
* **preRegistered** (recommended)
This strategy downloads an existing schema from the schema registry and uses it to encode the record.
This requires the schema to already be registered in the schema registry. The schema is downloaded
only once and cached locally.
* **autoRegister** (for development purposes)
This strategy infers the schema by inspecting the structured data and registers it in the schema
registry. If the record schema is known in advance it's recommended to use the preRegistered strategy
and manually register the schema, as this strategy comes with limitations.
The strategy uses reflection to traverse the structured data of each record and determine the type
of each field. If a specific field is set to nil the processor won't have enough information to determine
the type and will default to a nullable string. Because of this it is not guaranteed that two records
with the same structure produce the same schema or even a backwards compatible schema. The processor
registers each inferred schema in the schema registry with the same subject, therefore the schema compatibility
checks need to be disabled for this schema to prevent failures. If the schema subject does not exist before running
this processor, it will automatically set the correct compatibility settings in the schema registry.
This processor is the counterpart to [` + "`avro.decode`" + `](/docs/processors/builtin/avro.decode).`

	spec := sdk.Specification{
		Name:        "avro.encode",
		Summary:     "Encodes a record's field into the Avro format.",
		Description: description,
		Version:     "v0.1.0",
		Author:      "Meroxa, Inc.",
		Parameters:  encodeConfig{}.Parameters(),
	}
	return spec, nil
}
// Configure parses and validates the raw settings in m and stores the result
// on the processor. If parsing fails, the stored configuration is left
// untouched and the error is wrapped as "invalid config".
func (p *encodeProcessor) Configure(ctx context.Context, m map[string]string) error {
	parsed, err := parseEncodeConfig(ctx, m)
	if err == nil {
		p.cfg = parsed
		return nil
	}
	return cerrors.Errorf("invalid config: %w", err)
}
// Open creates the schema registry client from the stored configuration and
// builds the encoder used by Process. The encoder uses the configured schema
// strategy and a fresh sr.Serde. The context argument is not used.
func (p *encodeProcessor) Open(_ context.Context) error {
	opts := p.cfg.ClientOptions()
	client, err := schemaregistry.NewClient(p.logger, opts...)
	if err != nil {
		return cerrors.Errorf("could not create schema registry client: %w", err)
	}

	serde := &sr.Serde{}
	p.encoder = schemaregistry.NewEncoder(client, p.logger, serde, p.cfg.Schema.strategy)
	return nil
}
// Process encodes the configured field of each record, in order.
//
// Processing stops at the first record that fails. An sdk.ErrorRecord is
// appended for that record, and nothing is appended for the records after it.
// The result can therefore be shorter than the input.
func (p *encodeProcessor) Process(ctx context.Context, records []opencdc.Record) []sdk.ProcessedRecord {
	result := make([]sdk.ProcessedRecord, 0, len(records))
	for i := range records {
		processed, err := p.processRecord(ctx, records[i])
		if err != nil {
			result = append(result, sdk.ErrorRecord{Error: err})
			break
		}
		result = append(result, processed)
	}
	return result
}
// processRecord encodes a single record. It runs these steps:
//  1. resolve the configured field on its copy of rec;
//  2. turn the field's value into structured data;
//  3. encode that data;
//  4. write the encoded raw data back into the field.
//
// The updated record is returned as an sdk.SingleRecord. Each step's failure
// is wrapped with a message describing that step.
func (p *encodeProcessor) processRecord(ctx context.Context, rec opencdc.Record) (sdk.ProcessedRecord, error) {
	ref, err := p.cfg.fieldResolver.Resolve(&rec)
	if err != nil {
		return nil, cerrors.Errorf("failed resolving field: %w", err)
	}

	sd, err := p.structuredData(ref.Get())
	if err != nil {
		return nil, cerrors.Errorf("failed getting structured data: %w", err)
	}

	encoded, err := p.encoder.Encode(ctx, sd)
	if err != nil {
		return nil, cerrors.Errorf("failed encoding data: %w", err)
	}

	if err := ref.Set(encoded); err != nil {
		return nil, cerrors.Errorf("failed setting encoded value into the record: %w", err)
	}
	return sdk.SingleRecord(rec), nil
}
// Teardown does nothing and always returns nil.
//
// NOTE(review): the schema registry client created in Open is not released
// here. Confirm whether it holds resources that need explicit cleanup.
func (p *encodeProcessor) Teardown(_ context.Context) error {
	return nil
}
// structuredData converts a field value into opencdc.StructuredData.
//
// Structured data is returned unchanged. opencdc.RawData, string and []byte
// values are treated as JSON and unmarshalled. Any other type, including a
// nil value, is rejected with an "unexpected data type" error.
func (p *encodeProcessor) structuredData(data any) (opencdc.StructuredData, error) {
	var raw []byte
	switch v := data.(type) {
	case opencdc.StructuredData:
		return v, nil
	case opencdc.RawData:
		raw = v.Bytes()
	case string:
		raw = []byte(v)
	case []byte:
		raw = v
	default:
		return nil, cerrors.Errorf("unexpected data type %T", v)
	}

	// All byte-like inputs share one JSON decoding path.
	var sd opencdc.StructuredData
	if err := json.Unmarshal(raw, &sd); err != nil {
		return nil, cerrors.Errorf("failed unmarshalling JSON from raw data: %w", err)
	}
	return sd, nil
}