/
serde_json.go
66 lines (51 loc) · 1.81 KB
/
serde_json.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
package confluent
import (
"context"
"fmt"
"github.com/xeipuuv/gojsonschema"
"github.com/benthosdev/benthos/v4/public/service"
)
func resolveJSONSchema(ctx context.Context, client *schemaRegistryClient, info SchemaInfo) (*gojsonschema.Schema, error) {
sl := gojsonschema.NewSchemaLoader()
if len(info.References) == 0 {
if err := sl.AddSchemas(); err != nil {
return nil, fmt.Errorf("failed to parse root schema: %w", err)
}
return sl.Compile(gojsonschema.NewStringLoader(info.Schema))
}
if err := client.WalkReferences(ctx, info.References, func(ctx context.Context, name string, info SchemaInfo) error {
return sl.AddSchemas(gojsonschema.NewStringLoader(info.Schema))
}); err != nil {
return nil, err
}
return sl.Compile(gojsonschema.NewStringLoader(info.Schema))
}
func (s *schemaRegistryEncoder) getJSONEncoder(ctx context.Context, info SchemaInfo) (schemaEncoder, error) {
return getJSONTranscoder(ctx, s.client, info)
}
func (s *schemaRegistryDecoder) getJSONDecoder(ctx context.Context, info SchemaInfo) (schemaDecoder, error) {
return getJSONTranscoder(ctx, s.client, info)
}
func getJSONTranscoder(ctx context.Context, cl *schemaRegistryClient, info SchemaInfo) (func(m *service.Message) error, error) {
sch, err := resolveJSONSchema(ctx, cl, info)
if err != nil {
return nil, err
}
// -- we only need to verify if the message is valid since the input format which benthos uses (json) is the same
// -- as the output format
return func(m *service.Message) error {
b, err := m.AsBytes()
if err != nil {
return err
}
// -- verify the json message against the schema
res, err := sch.Validate(gojsonschema.NewBytesLoader(b))
if err != nil {
return err
}
if !res.Valid() {
return fmt.Errorf("json message does not conform to schema: %v", res.Errors())
}
return nil
}, nil
}