Skip to content

Commit

Permalink
SR: Encoder and decoder (#1096)
Browse files Browse the repository at this point in the history
* set compatibility level of schema if new subject is created

* implement schema registry encoder and decoder

* make fake schema registry server routing more readable
  • Loading branch information
lovromazgon committed Jul 14, 2023
1 parent 2e102a9 commit d036f1f
Show file tree
Hide file tree
Showing 9 changed files with 521 additions and 75 deletions.
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ require (
github.com/jackc/pgx/v5 v5.4.2
github.com/jinzhu/copier v0.3.5
github.com/jpillora/backoff v1.0.0
github.com/lovromazgon/franz-go/pkg/sr v0.0.0-20230605121418-82e53767f0ac
github.com/lovromazgon/franz-go/pkg/sr v0.0.0-20230630140346-bb9ce3f90f4a
github.com/matryer/is v1.4.1
github.com/modern-go/reflect2 v1.0.2
github.com/piotrkowalczuk/promgrpc/v4 v4.1.0
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -484,8 +484,8 @@ github.com/lib/pq v1.1.0/go.mod h1:5WUZQaWbwv1U+lTReE5YruASi9Al49XbQIvNi/34Woo=
github.com/lib/pq v1.2.0/go.mod h1:5WUZQaWbwv1U+lTReE5YruASi9Al49XbQIvNi/34Woo=
github.com/lib/pq v1.10.2/go.mod h1:AlVN5x4E4T544tWzH6hKfbfQvm3HdbOxrmggDNAPY9o=
github.com/lib/pq v1.10.4 h1:SO9z7FRPzA03QhHKJrH5BXA6HU1rS4V2nIVrrNC1iYk=
github.com/lovromazgon/franz-go/pkg/sr v0.0.0-20230605121418-82e53767f0ac h1:f0RCTaThW3/D5xByrGxfvR3o95UZsrkXFVkKSY+s89w=
github.com/lovromazgon/franz-go/pkg/sr v0.0.0-20230605121418-82e53767f0ac/go.mod h1:iz9EnaFViALD6sVqxYHs8BPC0ZEQtfhTpN7SG5b0Nqo=
github.com/lovromazgon/franz-go/pkg/sr v0.0.0-20230630140346-bb9ce3f90f4a h1:TrxQUmJBE1pZsnTW3rqG5Fsx3Xz0wGm5xgqLDV/mMGk=
github.com/lovromazgon/franz-go/pkg/sr v0.0.0-20230630140346-bb9ce3f90f4a/go.mod h1:iz9EnaFViALD6sVqxYHs8BPC0ZEQtfhTpN7SG5b0Nqo=
github.com/matryer/is v1.4.1 h1:55ehd8zaGABKLXQUe2awZ99BD/PTc2ls+KV/dXphgEQ=
github.com/matryer/is v1.4.1/go.mod h1:8I/i5uYgLzgsgEloJE1U6xx5HkBQpAZvepWuujKwMRU=
github.com/mattn/go-colorable v0.1.1/go.mod h1:FuOcm+DKB9mbwrcAfNl7/TZVBZ6rcnceauSikq3lYCQ=
Expand Down
17 changes: 16 additions & 1 deletion pkg/processor/schemaregistry/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,22 @@ func (c *Client) CreateSchema(ctx context.Context, subject string, schema sr.Sch
ss, err := c.cache.GetBySubjectText(subject, schema.Schema, func() (sr.SubjectSchema, error) {
logEvent.Msg("schema cache miss")
logEvent = nil // disable output for hit
return c.client.CreateSchema(ctx, subject, schema)

// Check if the subject exists. Ignore the error as this is not critical
// for creating a schema, we assume the subject exists in case of an error.
versions, _ := c.client.SubjectVersions(ctx, subject, sr.ShowDeleted)
subjectExists := len(versions) > 0

ss, err := c.client.CreateSchema(ctx, subject, schema)
if err != nil {
return ss, err
}

if !subjectExists {
// if we are created the schema we need to disable compatibility checks
c.client.SetCompatibilityLevel(ctx, sr.CompatNone, subject)
}
return ss, nil
})
if err != nil {
return sr.SubjectSchema{}, cerrors.Errorf("failed to create schema with subject %q: %w", subject, err)
Expand Down
158 changes: 90 additions & 68 deletions pkg/processor/schemaregistry/client_fake_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -198,7 +198,6 @@ func (fr *fakeRegistry) findBySubjectVersion(subject string, version int) (sr.Su

// fakeServer is a fake schema registry server.
type fakeServer struct {
mux http.ServeMux
fr fakeRegistry
logf func(format string, args ...any)
}
Expand All @@ -210,43 +209,75 @@ func newFakeServer(logf func(format string, args ...any)) *fakeServer {
if logf != nil {
fs.logf = logf
}
fs.mux.Handle("/schemas/ids/", http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
tokens := strings.Split(r.URL.EscapedPath(), "/")
switch {
case len(tokens) == 4:
fs.schemaByID(w, r)
case len(tokens) == 5 && tokens[4] == "versions":
fs.subjectVersionsByID(w, r)
default:
http.NotFound(w, r)
}
}))
fs.mux.Handle("/subjects/", http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
tokens := strings.Split(r.URL.EscapedPath(), "/")
switch {
case len(tokens) == 4 && tokens[3] == "versions":
fs.createSchema(w, r)
case len(tokens) == 5 && tokens[3] == "versions":
fs.schemaBySubjectVersion(w, r)
default:
http.NotFound(w, r)
}
}))
return fs
}

func (fs *fakeServer) ServeHTTP(w http.ResponseWriter, r *http.Request) {
fs.logf("%s %s", r.Method, r.RequestURI)
fs.mux.ServeHTTP(w, r)
}

func (fs *fakeServer) createSchema(w http.ResponseWriter, r *http.Request) {
// POST /subjects/{subject}/versions => returns ID
if r.Method != http.MethodPost {
var (
id int
subject string
version int
)
p := r.URL.Path
switch {
case fs.match(p, "/schemas/ids/+", &id) && r.Method == http.MethodGet:
fs.schemaByID(w, r, id)
case fs.match(p, "/schemas/ids/+/versions", &id) && r.Method == http.MethodGet:
fs.subjectVersionsByID(w, r, id)
case fs.match(p, "/subjects/+/versions", &subject) && r.Method == http.MethodPost:
fs.createSchema(w, r, subject)
case fs.match(p, "/subjects/+/versions/+", &subject, &version) && r.Method == http.MethodGet:
fs.schemaBySubjectVersion(w, r, subject, version)
case fs.match(p, "/config/+", &subject) && r.Method == http.MethodPut:
fs.updateConfig(w, r)
default:
http.NotFound(w, r)
return
}
}

// match reports whether path matches the given pattern, which is a
// path with '+' wildcards wherever you want to use a parameter. Path
// parameters are assigned to the pointers in vars (len(vars) must be
// the number of wildcards), which must be of type *string or *int.
// Source: https://github.com/benhoyt/go-routing/blob/master/match/route.go
func (*fakeServer) match(path, pattern string, vars ...interface{}) bool {
for ; pattern != "" && path != ""; pattern = pattern[1:] {
switch pattern[0] {
case '+':
// '+' matches till next slash in path
slash := strings.IndexByte(path, '/')
if slash < 0 {
slash = len(path)
}
segment := path[:slash]
path = path[slash:]
switch p := vars[0].(type) {
case *string:
*p = segment
case *int:
n, err := strconv.Atoi(segment)
if err != nil || n < 0 {
return false
}
*p = n
default:
panic("vars must be *string or *int")
}
vars = vars[1:]
case path[0]:
// non-'+' pattern byte must match path byte
path = path[1:]
default:
return false
}
}
return path == "" && pattern == ""
}

func (fs *fakeServer) createSchema(w http.ResponseWriter, r *http.Request, subject string) {
// POST /subjects/{subject}/versions => returns ID
defer r.Body.Close()
var s sr.Schema
err := json.NewDecoder(r.Body).Decode(&s)
Expand All @@ -255,47 +286,22 @@ func (fs *fakeServer) createSchema(w http.ResponseWriter, r *http.Request) {
return
}

tokens := strings.Split(r.URL.EscapedPath(), "/")
ss := fs.fr.CreateSchema(tokens[2], s)
ss := fs.fr.CreateSchema(subject, s)
fs.json(w, map[string]any{"id": ss.ID})
}

func (fs *fakeServer) schemaBySubjectVersion(w http.ResponseWriter, r *http.Request) {
func (fs *fakeServer) schemaBySubjectVersion(w http.ResponseWriter, _ *http.Request, subject string, version int) {
// GET /subjects/{subject}/versions/{version}
if r.Method != http.MethodGet {
http.NotFound(w, r)
return
}

tokens := strings.Split(r.URL.EscapedPath(), "/")
version, err := strconv.Atoi(tokens[4])
if err != nil {
fs.error(w, http.StatusInternalServerError, cerrors.Errorf("invalid schema version: %w", err))
return
}

ss, ok := fs.fr.SchemaBySubjectVersion(tokens[2], version)
ss, ok := fs.fr.SchemaBySubjectVersion(subject, version)
if !ok {
fs.errorWithCode(w, http.StatusNotFound, errorCodeSubjectNotFound, cerrors.New("subject not found"))
return
}
fs.json(w, ss)
}

func (fs *fakeServer) schemaByID(w http.ResponseWriter, r *http.Request) {
func (fs *fakeServer) schemaByID(w http.ResponseWriter, _ *http.Request, id int) {
// GET /schemas/ids/{id}
if r.Method != http.MethodGet {
http.NotFound(w, r)
return
}

tokens := strings.Split(r.URL.EscapedPath(), "/")
id, err := strconv.Atoi(tokens[3])
if err != nil {
fs.error(w, http.StatusInternalServerError, cerrors.Errorf("invalid schema ID: %w", err))
return
}

s, ok := fs.fr.SchemaByID(id)
if !ok {
fs.errorWithCode(w, http.StatusNotFound, errorCodeSchemaNotFound, cerrors.New("schema not found"))
Expand All @@ -304,22 +310,38 @@ func (fs *fakeServer) schemaByID(w http.ResponseWriter, r *http.Request) {
fs.json(w, s)
}

func (fs *fakeServer) subjectVersionsByID(w http.ResponseWriter, r *http.Request) {
func (fs *fakeServer) subjectVersionsByID(w http.ResponseWriter, _ *http.Request, id int) {
// GET /schemas/ids/{id}/versions
if r.Method != http.MethodGet {
http.NotFound(w, r)
return
}
sss := fs.fr.SubjectVersionsByID(id)
fs.json(w, sss)
}

tokens := strings.Split(r.URL.EscapedPath(), "/")
id, err := strconv.Atoi(tokens[3])
func (fs *fakeServer) updateConfig(w http.ResponseWriter, r *http.Request) {
// PUT /config/{subject}
defer r.Body.Close()
var c struct {
Compatibility string `json:"compatibility"`
}
err := json.NewDecoder(r.Body).Decode(&c)
if err != nil {
fs.error(w, http.StatusInternalServerError, cerrors.Errorf("invalid schema ID: %w", err))
fs.error(w, http.StatusInternalServerError, err)
return
}

sss := fs.fr.SubjectVersionsByID(id)
fs.json(w, sss)
valid := map[string]bool{
"BACKWARD": true,
"BACKWARD_TRANSITIVE": true,
"FORWARD": true,
"FORWARD_TRANSITIVE": true,
"FULL": true,
"FULL_TRANSITIVE": true,
"NONE": true,
}[c.Compatibility]
if !valid {
fs.errorWithCode(w, 42203, http.StatusUnprocessableEntity, cerrors.New("invalid compatibility level"))
return
}
fs.json(w, c)
}

func (fs *fakeServer) json(w http.ResponseWriter, v any) {
Expand Down
18 changes: 15 additions & 3 deletions pkg/processor/schemaregistry/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -190,25 +190,37 @@ func TestClient_CacheHit(t *testing.T) {
})
is.NoErr(err)

is.Equal(len(rtr.Records()), 3)
is.Equal(len(rtr.Records()), 5)
rtr.AssertRecord(is, 0,
assertMethod("GET"),
assertRequestURI("/subjects/test-cache-hit/versions?deleted=true"),
assertResponseStatus(404),
assertError(nil),
)
rtr.AssertRecord(is, 1,
assertMethod("POST"),
assertRequestURI("/subjects/test-cache-hit/versions"),
assertResponseStatus(200),
assertError(nil),
)
rtr.AssertRecord(is, 1,
rtr.AssertRecord(is, 2,
assertMethod("GET"),
assertRequestURI(fmt.Sprintf("/schemas/ids/%d/versions", want.ID)),
assertResponseStatus(200),
assertError(nil),
)
rtr.AssertRecord(is, 2,
rtr.AssertRecord(is, 3,
assertMethod("GET"),
assertRequestURI("/subjects/test-cache-hit/versions/1"),
assertResponseStatus(200),
assertError(nil),
)
rtr.AssertRecord(is, 4,
assertMethod("PUT"),
assertRequestURI("/config/test-cache-hit?defaultToGlobal=true"),
assertResponseStatus(200),
assertError(nil),
)

rtr.Clear() // clear requests before subtests

Expand Down
90 changes: 90 additions & 0 deletions pkg/processor/schemaregistry/decoder.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
// Copyright © 2023 Meroxa, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package schemaregistry

import (
"context"

"github.com/conduitio/conduit/pkg/foundation/cerrors"
"github.com/conduitio/conduit/pkg/foundation/log"
"github.com/conduitio/conduit/pkg/record"
"github.com/lovromazgon/franz-go/pkg/sr"
)

type Decoder struct {
client *Client
serde *sr.Serde
logger log.CtxLogger
}

func NewDecoder(client *Client, logger log.CtxLogger, serde *sr.Serde) *Decoder {
return &Decoder{
client: client,
serde: serde,
logger: logger.WithComponent("schemaregistry.Decoder"),
}
}

func (d *Decoder) Decode(ctx context.Context, b record.RawData) (record.StructuredData, error) {
var out record.StructuredData
err := d.serde.Decode(b.Raw, &out)
if cerrors.Is(err, sr.ErrNotRegistered) {
err = d.findAndRegisterSchema(ctx, b)
if err != nil {
return nil, err
}
// retry decoding
err = d.serde.Decode(b.Raw, &out)
}
if err != nil {
return nil, cerrors.Errorf("failed to decode raw data: %w", err)
}

return out, nil
}

func (d *Decoder) findAndRegisterSchema(ctx context.Context, b record.RawData) error {
id, _, _ := d.serde.Header().DecodeID(b.Raw) // we know this won't throw an error since Decode didn't return ErrBadHeader
s, err := d.client.SchemaByID(ctx, id)
if err != nil {
return cerrors.Errorf("failed to get schema: %w", err)
}
sf, ok := DefaultSchemaFactories[s.Type]
if !ok {
return cerrors.Errorf("unknown schema type %q (%d)", s.Type.String(), s.Type)
}
schema, err := sf.Parse(s.Schema)
if err != nil {
return cerrors.Errorf("failed to parse schema: %w", err)
}

d.serde.Register(
id,
record.StructuredData{},
sr.EncodeFn(encodeFn(schema, sr.SubjectSchema{ID: id})),
sr.DecodeFn(decodeFn(schema, sr.SubjectSchema{ID: id})),
)
return nil
}

func decodeFn(schema Schema, ss sr.SubjectSchema) func(b []byte, a any) error {
return func(b []byte, a any) error {
err := schema.Unmarshal(b, a)
if err != nil {
return cerrors.Errorf("failed to unmarshal data with schema (ID: %v, subject: %v, version: %v): %w", ss.ID, ss.Subject, ss.Version, err)
}
return nil
}
}

0 comments on commit d036f1f

Please sign in to comment.