-
Notifications
You must be signed in to change notification settings - Fork 0
/
client_avro.go
66 lines (55 loc) · 1.71 KB
/
client_avro.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
package schema_registry
import (
"context"
"fmt"
"github.com/amient/avro"
)
// RegisterAvroType registers schema under subject and returns the id produced
// by registerSchemaUnderSubject.
//
// Ids are memoized in c.cacheAvro keyed by the schema fingerprint: when the
// fingerprint is already present the cached id is returned and neither the
// referenced-schema step nor registerSchemaUnderSubject is invoked. A failure
// of the referenced-schema step is wrapped with %w so callers can inspect the
// underlying cause via errors.Is / errors.As; all other errors are returned
// unchanged.
func (c *Client) RegisterAvroType(ctx context.Context, subject string, schema avro.Schema) (uint32, error) {
	fp, err := schema.Fingerprint()
	if err != nil {
		return 0, err
	}

	// NOTE(review): the cache key does not include subject, so a schema already
	// cached for one subject is never registered under a second subject.
	// Confirm this is intended.
	if id, ok := c.cacheAvro[*fp]; ok {
		return id, nil
	}

	refs, err := c.registerReferencedAvroSchemas(ctx, schema)
	if err != nil {
		return 0, fmt.Errorf("RegisterAvroType.registerReferencedAvroSchemas: %w", err)
	}

	// NOTE(review): schemaTypeProtobuf is passed for an Avro schema, which looks
	// like a copy-paste slip. The Avro schema-type constant is not visible from
	// this file, so the argument is left untouched; verify and switch if one
	// exists.
	id, err := c.registerSchemaUnderSubject(ctx, subject, schemaTypeProtobuf, schema.String(), refs)
	if err != nil {
		return 0, err
	}

	c.cacheAvro[*fp] = id
	return id, nil
}
// registerReferencedAvroSchemas is currently a placeholder: it ignores both of
// its arguments and always yields an empty, non-nil references value together
// with a nil error.
func (c *Client) registerReferencedAvroSchemas(_ context.Context, _ avro.Schema) (references, error) {
	//TODO #4
	return make(references, 0), nil
}
// deserializeAvro decodes data into a freshly allocated generic record that is
// built from, and read with, schema.avro.
//
// The first 5 bytes of data are skipped before decoding (presumably the
// registry wire-format header, a magic byte followed by a 4-byte schema id;
// confirm against the caller). A payload shorter than that prefix produces an
// error instead of a slice-bounds panic. Decoder errors are returned unchanged
// and the record is nil in every error case.
func (c *Client) deserializeAvro(_ context.Context, schema *AvroSchema, data []byte) (*avro.GenericRecord, error) {
	// Number of leading bytes that precede the Avro-encoded body.
	const headerLen = 5
	if len(data) < headerLen {
		return nil, fmt.Errorf("avro payload too short: got %d bytes, need at least %d", len(data), headerLen)
	}

	record := avro.NewGenericRecord(schema.avro)
	reader := avro.NewDatumReader(schema.avro)
	if err := reader.Read(record, avro.NewBinaryDecoder(data[headerLen:])); err != nil {
		return nil, err
	}
	return record, nil
}
// deserializeAvroInto fills value from payload using a datum projector created
// from value.Schema() and schema.avro.
//
// The payload is handed to the binary decoder as-is; no leading bytes are
// stripped here. Any error from building the projector or from reading is
// returned unchanged.
func (c *Client) deserializeAvroInto(_ context.Context, schema *AvroSchema, payload []byte, value avro.AvroRecord) error {
	projector, err := avro.NewDatumProjector(value.Schema(), schema.avro)
	if err != nil {
		return err
	}

	decoder := avro.NewBinaryDecoder(payload)
	return projector.Read(value, decoder)
}
// parseAvroSchema parses definition with avro.ParseSchema and wraps the result
// in an *AvroSchema. The context, references and trailing string arguments are
// ignored. On a parse failure the error is returned unchanged with a nil
// schema.
func (c *Client) parseAvroSchema(_ context.Context, definition string, _ references, _ *string) (*AvroSchema, error) {
	parsed, parseErr := avro.ParseSchema(definition)
	if parseErr != nil {
		return nil, parseErr
	}

	wrapped := &AvroSchema{avro: parsed}
	return wrapped, nil
}