Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

errorType:'READER_FIELD_MISSING_DEFAULT_VALUE' while checking compatibility #3144

Open
ThinhLe30 opened this issue Jun 6, 2024 · 0 comments

Comments

@ThinhLe30
Copy link

In the official document with BACKWARD compatibility, we can add an optional field to an old schema, which is ignored by default value,
here is my code for the client to check schema compatibility:

  • client.go
package compatibility

import (
	"bytes"
	"context"
	"fmt"
	"github.com/hamba/avro/v2"
	"github.com/hamba/avro/v2/registry"
	jsoniter "github.com/json-iterator/go"
	"io"
	"net"
	"net/http"
	"net/url"
	"path"
	"strconv"
	"strings"
	"time"
)

// credentials are used to store the basic auth credentials.
type credentials struct {
	username string
	password string
}

// Client is a client for the schema registry.
type Client struct {
	client *http.Client
	base   *url.URL
	creds  credentials
}

// Error is returned by the registry when there is an error.
type Error struct {
	StatusCode int    `json:"-"`
	Code       int    `json:"error_code"`
	Message    string `json:"message"`
}

type ClientFunc func(*Client)

// Request is the request to test compatibility.
type Request struct {
	Schema     string                     `json:"schema"`
	References []registry.SchemaReference `json:"references"`
}

// Response is the response from the compatibility check.
type Response struct {
	IsCompatible bool     `json:"is_compatible"`
	Messages     []string `json:"messages"`
}

// defaultClient is the default HTTP client.
var defaultClient = &http.Client{
	Transport: &http.Transport{
		Proxy: http.ProxyFromEnvironment,
		DialContext: (&net.Dialer{
			Timeout:   15 * time.Second,
			KeepAlive: 90 * time.Second,
		}).DialContext,
		TLSHandshakeTimeout: 3 * time.Second,
		IdleConnTimeout:     90 * time.Second,
	},
	Timeout: 10 * time.Second,
}

const contentType = "application/vnd.schemaregistry.v1+json"

// WithBasicAuth sets the basic auth credentials for the client.
func WithBasicAuth(username, password string) ClientFunc {
	return func(c *Client) {
		c.creds = credentials{username: username, password: password}
	}
}

// WithHTTPClient sets the HTTP client for the client.
func WithHTTPClient(client *http.Client) ClientFunc {
	return func(c *Client) {
		c.client = client
	}
}

// NewClient creates a new schema registry client.
func NewClient(baseURL string, opts ...ClientFunc) (*Client, error) {
	u, err := url.Parse(baseURL)
	if err != nil {
		return nil, err
	}
	if !strings.HasSuffix(u.Path, "/") {
		u.Path += "/"
	}

	c := &Client{
		client: defaultClient,
		base:   u,
	}

	for _, opt := range opts {
		opt(c)
	}

	return c, nil
}

// request performs a request to the schema registry.
func (c *Client) request(ctx context.Context, method, path string, in, out any) error {
	var body io.Reader
	if in != nil {
		b, _ := jsoniter.Marshal(in)
		body = bytes.NewReader(b)
	}

	// These errors are not possible as we have already parse the base URL.
	u, _ := c.base.Parse(path)
	req, _ := http.NewRequestWithContext(ctx, method, u.String(), body)
	req.Header.Set("Content-Type", contentType)

	if len(c.creds.username) > 0 || len(c.creds.password) > 0 {
		req.SetBasicAuth(c.creds.username, c.creds.password)
	}

	resp, err := c.client.Do(req)
	if err != nil {
		return fmt.Errorf("could not perform request: %w", err)
	}
	defer func() {
		_, _ = io.Copy(io.Discard, resp.Body)
		_ = resp.Body.Close()
	}()

	if resp.StatusCode >= http.StatusBadRequest {
		err := Error{StatusCode: resp.StatusCode}
		_ = jsoniter.NewDecoder(resp.Body).Decode(&err)
		return err
	}

	if out != nil {
		return jsoniter.NewDecoder(resp.Body).Decode(out)
	}
	return nil
}

// TestCompatibility tests the compatibility of a schema with the schema registry.
//
//   - The schema is tested against the subject and version provided.
//
//   - If the schema is compatible with the subject and version, the response will be true.
//
//   - If the schema is not compatible with the subject and version, the response will be false
//     and the reason will be provided in the messages.
func (c *Client) TestCompatibility(
	ctx context.Context,
	subject,
	version string,
	schema avro.Schema,
	references []registry.SchemaReference,
	verbose bool,
) (response Response, err error) {
	p := path.Join("compatibility", "subjects", subject, "versions", version)
	if verbose {
		p += "?verbose=true"
	}
	requestBody := Request{
		Schema:     schema.String(),
		References: references,
	}
	if err := c.request(ctx, http.MethodPost, p, requestBody, &response); err != nil {
		return response, err
	}
	return response, nil
}

// Error returns the error message.
func (e Error) Error() string {
	if e.Message != "" {
		return e.Message
	}
	return "registry error: " + strconv.Itoa(e.StatusCode)
}
  • before suit test setup:
package compatibility

import (
	"github.com/hamba/avro/v2/registry"
	"github.com/testcontainers/testcontainers-go/network"
	"source.vtvlive.vn/cellutions/testutils/containers/common"
	"source.vtvlive.vn/cellutions/testutils/containers/kafka"
	"source.vtvlive.vn/cellutions/testutils/containers/schemaregistry"
	"source.vtvlive.vn/cellutions/testutils/containers/zookeeper"
	"testing"
	"time"

	"context"
	"github.com/docker/docker/client"
	. "github.com/onsi/ginkgo/v2"
	. "github.com/onsi/gomega"
)

var dockerClient *client.Client
var logFollower *common.LogFollower
var registryClient *registry.Client
var networkName string
var compatibilityClient *Client

func TestCompatibility(t *testing.T) {
	RegisterFailHandler(Fail)
	RunSpecs(t, "Compatibility Suite")
}

var _ = BeforeSuite(func() {
	ctx, _ := context.WithTimeout(context.Background(), 10*time.Minute)
	nw, err := network.New(ctx)
	Expect(err).ToNot(HaveOccurred())
	DeferCleanup(nw.Remove, context.Background())
	networkName = nw.Name

	dockerClient, err = common.NewDockerClient(ctx)
	Expect(err).ToNot(HaveOccurred())
	DeferCleanup(dockerClient.Close)

	logFollower, err = common.NewLogFollower(dockerClient, 2)
	Expect(err).ToNot(HaveOccurred())
	DeferCleanup(logFollower.Close)

	zookeeperC, err := zookeeper.ZooKeeperContainer(ctx, dockerClient, logFollower, networkName)
	Expect(err).ToNot(HaveOccurred())
	DeferCleanup(zookeeperC.Terminate, context.Background())

	kafkaC, err := kafka.NewKafkaCluster(
		ctx, dockerClient, logFollower, networkName,
		zookeeperC.InternalEndpoint(zookeeper.ZookeeperClientPort, ""), 1)
	Expect(err).ToNot(HaveOccurred())
	DeferCleanup(kafkaC.Terminate, context.Background())

	brokerEndpoint, err := kafkaC.InternalEndpoint(0, kafka.KafkaBrokerPort, "")
	Expect(err).ToNot(HaveOccurred())
	schemaRegistryC, err := schemaregistry.SchemaRegistryContainer(ctx, dockerClient, logFollower, networkName, brokerEndpoint)
	Expect(err).ToNot(HaveOccurred())
	DeferCleanup(schemaRegistryC.Terminate, context.Background())

	srEndpoint, err := schemaRegistryC.ExternalEndpoint("8085/tcp", "http")
	Expect(err).ToNot(HaveOccurred())
	registryClient, err = registry.NewClient(srEndpoint)
	Expect(err).ToNot(HaveOccurred())
	compatibilityClient, err = NewClient(srEndpoint)

	err = registryClient.SetGlobalCompatibilityLevel(ctx, "BACKWARD")
	Expect(err).ToNot(HaveOccurred())
})
  • client_test.go
package compatibility

import (
	"context"
	"github.com/brianvoe/gofakeit/v6"
	"github.com/hamba/avro/v2"
	"github.com/hamba/avro/v2/registry"
	. "github.com/onsi/ginkgo/v2"
	. "github.com/onsi/gomega"
	"source.vtvlive.vn/cellutions/cases"
)

var _ = Describe("Client", Ordered, func() {
	var subject string
	var references []registry.SchemaReference
	Describe("TestCompatibility", func() {
		When("the old schema does not use references", func() {
			BeforeAll(func() {
				subject = cases.ToKebab(gofakeit.LoremIpsumSentence(4))
				references = make([]registry.SchemaReference, 0)
				oldSchema := recordSchema(
					"seenow.dev.temp",
					`
						{
						  "type": "record",
						  "name": "SimpleRecord",
						  "fields": [
							{
							  "name": "name",
							  "type": "string"
							}
						  ]
						}`)
				_, _, err := registryClient.CreateSchema(context.Background(), subject, oldSchema.String())
				Expect(err).ToNot(HaveOccurred())
			})
			DescribeTable("should works",
				func(newSchema avro.Schema, isCompatible bool) {
					ctx := context.Background()
					res, err := compatibilityClient.TestCompatibility(ctx, subject, "latest", newSchema, references, true)
					Expect(err).ToNot(HaveOccurred())
					Expect(res.IsCompatible).To(Equal(isCompatible))
				},
				Entry("add an optional field to a record",
					recordSchema("seenow.dev.temp",
						`
						{
						  "type": "record",
						  "name": "SimpleRecord",
						  "fields": [
							{
							  "name": "name",
							  "type": "string"
							},
							{
							  "name": "age",
							  "type": ["int", "null"],
                              "default": 0
							}
						  ]
						}`), false,
				),
				Entry("delete a field from a record", Skip),
				Entry("add non-optional field to a record", Skip),
			)
		})

		When("the old schema is a union referencing records", func() {
			DescribeTable("should works",
				func() {

				},
				Entry("add an optional field to a record", Skip),
				Entry("delete a field from a record", Skip),
				Entry("add non-optional field to a record", Skip),
			)
		})
	})
})

func recordSchema(namespace, schemaString string) *avro.RecordSchema {
	GinkgoHelper()
	defer GinkgoRecover()
	schema, err := avro.ParseWithCache(schemaString, namespace, avro.DefaultSchemaCache)
	Expect(err).ToNot(HaveOccurred())
	return schema.(*avro.RecordSchema)
}

Im expect IsCompatible is return to true but the value is false and the error message is:

0 = {string} "{errorType:'READER_FIELD_MISSING_DEFAULT_VALUE', description:'The field 'age' at path '/fields/1' in the new schema has no default value and is missing in the old schema', additionalInfo:'age'}"
1 = {string} "{oldSchemaVersion: 1}"
2 = {string} "{oldSchema: '{"type":"record","name":"SimpleRecord","namespace":"seenow.dev.temp","fields":[{"name":"name","type":"string"}]}'}"
3 = {string} "{compatibility: 'BACKWARD'}"

Am I wrong?

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

1 participant