Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions common/component/kafka/kafka.go
Original file line number Diff line number Diff line change
Expand Up @@ -226,6 +226,7 @@ func (k *Kafka) Init(ctx context.Context, metadata map[string]string) error {
if meta.SchemaRegistryURL != "" {
k.logger.Infof("Schema registry URL '%s' provided. Configuring the Schema Registry client.", meta.SchemaRegistryURL)
k.srClient = srclient.CreateSchemaRegistryClient(meta.SchemaRegistryURL)
k.srClient.CodecCreationEnabled(true)
k.srClient.CodecJsonEnabled(!meta.UseAvroJSON)
// Empty password is a possibility
if meta.SchemaRegistryAPIKey != "" {
Expand Down
80 changes: 77 additions & 3 deletions common/component/kafka/kafka_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -207,18 +207,20 @@ func formatByteRecord(schemaID int, valueBytes []byte) []byte {
return recordValue
}

func assertValueSerialized(t *testing.T, act []byte, valJSON []byte, schema *srclient.Schema) {
require.NotEqual(t, act, valJSON)
func assertValueSerialized(t *testing.T, act []byte, expJSON []byte, schema *srclient.Schema) {
require.NotEqual(t, expJSON, act)

actSchemaID := int(binary.BigEndian.Uint32(act[1:5]))
codec, _ := goavro.NewCodecForStandardJSONFull(schema.Schema())
native, _, _ := codec.NativeFromBinary(act[5:])
actJSON, _ := codec.TextualFromNative(nil, native)
var actMap map[string]any
json.Unmarshal(actJSON, &actMap)
var expMap map[string]any
json.Unmarshal(expJSON, &expMap)

require.Equal(t, schema.ID(), actSchemaID)
require.Equal(t, testValue1, actMap)
require.Equal(t, expMap, actMap)
}

func TestSerializeValueCachingDisabled(t *testing.T) {
Expand Down Expand Up @@ -340,6 +342,78 @@ func TestSerializeValueCachingEnabled(t *testing.T) {
assertValueSerialized(t, act, valJSON, schema)
require.NoError(t, err)
})

t.Run("serialize with complex avro schema", func(t *testing.T) {
testSchemaOcr := `{
"type": "record",
"name": "ocr_requested",
"namespace": "foo.cmd.image_processing",
"fields": [
{
"name": "id",
"type": "string",
"doc": "Idempotency key"
},
{
"name": "document_metadata",
"type": {
"type": "record",
"name": "DocumentMetadata",
"fields": [
{
"name": "content_type",
"type": "string"
},
{
"name": "original_filename",
"type": ["null", "string"],
"default": null
},
{
"name": "source",
"type": {
"type": "enum",
"name": "DocumentSource",
"symbols": [
"Unknown",
"Import",
"PatientUpload",
"UserUpload",
"UserUploadFax"
]
}
},
{
"name": "type",
"type": {
"type": "enum",
"name": "DocumentType",
"symbols": ["Unknown", "InsuranceCard", "MiscReport"]
}
}
]
}
},
{
"name": "s3_path",
"type": {
"type": "record",
"name": "S3Path",
"fields": [
{ "name": "bucket", "type": "string" },
{ "name": "key", "type": "string" }
]
}
}
]
}`
schemaOcr, _ := registry.CreateSchema("my-ocr-topic-value", testSchemaOcr, srclient.Avro)
valueOcr := map[string]any{"id": "123", "document_metadata": map[string]any{"content_type": "application/pdf", "original_filename": nil, "source": "UserUpload", "type": "InsuranceCard"}, "s3_path": map[string]any{"bucket": "test-bucket", "key": "test-key"}}
valJSONOcr, _ := json.Marshal(valueOcr)
act, err := k.SerializeValue("my-ocr-topic", valJSONOcr, map[string]string{"valueSchemaType": "Avro"})
assertValueSerialized(t, act, valJSONOcr, schemaOcr)
require.NoError(t, err)
})
}

func TestLatestSchemaCaching(t *testing.T) {
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ require (
github.com/labd/commercetools-go-sdk v1.3.1
github.com/lestrrat-go/httprc v1.0.5
github.com/lestrrat-go/jwx/v2 v2.0.21
github.com/linkedin/goavro/v2 v2.13.1
github.com/linkedin/goavro/v2 v2.14.0
github.com/machinebox/graphql v0.2.2
github.com/matoous/go-nanoid/v2 v2.0.0
github.com/microsoft/go-mssqldb v1.6.0
Expand Down
3 changes: 2 additions & 1 deletion go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -1186,8 +1186,9 @@ github.com/lestrrat/go-file-rotatelogs v0.0.0-20180223000712-d3151e2a480f/go.mod
github.com/lestrrat/go-strftime v0.0.0-20180220042222-ba3bf9c1d042/go.mod h1:TPpsiPUEh0zFL1Snz4crhMlBe60PYxRHr5oFF3rRYg0=
github.com/lightstep/lightstep-tracer-common/golang/gogo v0.0.0-20190605223551-bc2310a04743/go.mod h1:qklhhLq1aX+mtWk9cPHPzaBjWImj5ULL6C7HFJtXQMM=
github.com/lightstep/lightstep-tracer-go v0.18.1/go.mod h1:jlF1pusYV4pidLvZ+XD0UBX0ZE6WURAspgAczcDHrL4=
github.com/linkedin/goavro/v2 v2.13.1 h1:4qZ5M0QzQFDRqccsroJlgOJznqAS/TpdvXg55h429+I=
github.com/linkedin/goavro/v2 v2.13.1/go.mod h1:KXx+erlq+RPlGSPmLF7xGo6SAbh8sCQ53x064+ioxhk=
github.com/linkedin/goavro/v2 v2.14.0 h1:aNO/js65U+Mwq4yB5f1h01c3wiM458qtRad1DN0CMUI=
github.com/linkedin/goavro/v2 v2.14.0/go.mod h1:KXx+erlq+RPlGSPmLF7xGo6SAbh8sCQ53x064+ioxhk=
github.com/lufia/plan9stats v0.0.0-20211012122336-39d0f177ccd0 h1:6E+4a0GO5zZEnZ81pIr0yLvtUWk2if982qA3F3QD6H4=
github.com/lufia/plan9stats v0.0.0-20211012122336-39d0f177ccd0/go.mod h1:zJYVVT2jmtg6P3p1VtQj7WsuWi/y4VnjVBn7F8KPB3I=
github.com/lyft/protoc-gen-validate v0.0.13/go.mod h1:XbGvPuh87YZc5TdIa2/I4pLk0QoUACkjt2znoq26NVQ=
Expand Down
2 changes: 1 addition & 1 deletion tests/certification/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -226,7 +226,7 @@ require (
github.com/lestrrat-go/httprc v1.0.5 // indirect
github.com/lestrrat-go/iter v1.0.2 // indirect
github.com/lestrrat-go/option v1.0.1 // indirect
github.com/linkedin/goavro/v2 v2.13.1 // indirect
github.com/linkedin/goavro/v2 v2.14.0 // indirect
github.com/lufia/plan9stats v0.0.0-20211012122336-39d0f177ccd0 // indirect
github.com/magiconair/properties v1.8.7 // indirect
github.com/mailru/easyjson v0.7.7 // indirect
Expand Down
3 changes: 2 additions & 1 deletion tests/certification/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -1009,8 +1009,9 @@ github.com/lestrrat/go-file-rotatelogs v0.0.0-20180223000712-d3151e2a480f/go.mod
github.com/lestrrat/go-strftime v0.0.0-20180220042222-ba3bf9c1d042/go.mod h1:TPpsiPUEh0zFL1Snz4crhMlBe60PYxRHr5oFF3rRYg0=
github.com/lightstep/lightstep-tracer-common/golang/gogo v0.0.0-20190605223551-bc2310a04743/go.mod h1:qklhhLq1aX+mtWk9cPHPzaBjWImj5ULL6C7HFJtXQMM=
github.com/lightstep/lightstep-tracer-go v0.18.1/go.mod h1:jlF1pusYV4pidLvZ+XD0UBX0ZE6WURAspgAczcDHrL4=
github.com/linkedin/goavro/v2 v2.13.1 h1:4qZ5M0QzQFDRqccsroJlgOJznqAS/TpdvXg55h429+I=
github.com/linkedin/goavro/v2 v2.13.1/go.mod h1:KXx+erlq+RPlGSPmLF7xGo6SAbh8sCQ53x064+ioxhk=
github.com/linkedin/goavro/v2 v2.14.0 h1:aNO/js65U+Mwq4yB5f1h01c3wiM458qtRad1DN0CMUI=
github.com/linkedin/goavro/v2 v2.14.0/go.mod h1:KXx+erlq+RPlGSPmLF7xGo6SAbh8sCQ53x064+ioxhk=
github.com/lufia/plan9stats v0.0.0-20211012122336-39d0f177ccd0 h1:6E+4a0GO5zZEnZ81pIr0yLvtUWk2if982qA3F3QD6H4=
github.com/lufia/plan9stats v0.0.0-20211012122336-39d0f177ccd0/go.mod h1:zJYVVT2jmtg6P3p1VtQj7WsuWi/y4VnjVBn7F8KPB3I=
github.com/lyft/protoc-gen-validate v0.0.13/go.mod h1:XbGvPuh87YZc5TdIa2/I4pLk0QoUACkjt2znoq26NVQ=
Expand Down
Loading