
Managing schema registry's schema references #49

Conversation

FrancescoPessina
Contributor

This PR enables Avro schema parsing with the Schema Registry's new schema references feature.

@Strech
Owner

Strech commented Nov 4, 2020

Hi @FrancescoPessina, thanks a lot for your PR, I appreciate it. I've checked the proposed solution and I think I discovered a bit more. First of all, to save space, references in the target schema will not be de-referenced by the registry; this means that if you apply the same approach as I did, you will inject the referenced schemas in, and the result can/will potentially be registered as a new schema version with a de-referenced schema.

I think, since schema references are being introduced and Avrora already has something similar, we should converge on a common behavior that works for both cases.

As a guess, we can keep the reference but internally store the schema as de-referenced, and in case we can't have a reference (old Confluent) we can inject it.

@FrancescoPessina
Contributor Author

@Strech thank you for your reply :) I'm sorry, but I don't understand what you are proposing. The approach in this PR takes the schema with the reference and de-references it only to parse messages read from Kafka.
If a schema does not contain a reference, the approach is the same as before.

How would you change this approach?

@Strech
Owner

Strech commented Nov 5, 2020

@FrancescoPessina The issue will happen with the following sequence:

  1. You load a schema with references from the registry
  2. You decode a message, so far so good
  3. You encode a message – auto-registration happens and the schema is different from the original (nested schemas were injected)
  4. A new ambiguous version will be registered, and that's the issue

My proposal is to keep track of references without de-referencing.

  1. We have the JSON schema and we also have the erlavro schema storage. To be able to decode a message you in fact need to have all types inside erlavro, so I suggest stuffing them in there instead of altering the original JSON schema
  2. We would need to slightly adjust registration to keep the original JSON schema and nested schemas in mind (should they also be registered?)
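
The first point above could be sketched roughly like this — a sketch of the idea, not the final design (`referenced_json`, `original_json`, and the overall flow are assumptions; only `:avro_schema_store.add_type/2` and `:avro_json_decoder.decode_schema/2` are the erlavro calls already used in this codebase):

```elixir
# Sketch: keep the original JSON untouched, but make referenced types
# decodable by adding them to the erlavro store directly.
store = :avro_schema_store.new()

# 1. Parse the referenced schema and add it, so its types become known
referenced = :avro_json_decoder.decode_schema(referenced_json, allow_bad_references: true)
store = :avro_schema_store.add_type(referenced, store)

# 2. Parse the referencing schema as-is — its JSON stays intact, and
#    decoding works because the store already knows the nested types
schema = :avro_json_decoder.decode_schema(original_json, allow_bad_references: true)
store = :avro_schema_store.add_type(schema, store)
```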

@FrancescoPessina
Contributor Author

FrancescoPessina commented Nov 5, 2020

Ok, I've understood a bit better :)
On the "Java" side, the problem of registration is "solved" by Confluent by setting the following properties of the Avro serializer:

  • auto.register.schemas=false
  • use.latest.version=true

This disables auto-registration and forces use of the latest version of the Avro schema. Could we implement something similar here in Avrora?

Still in Java, the Avro Maven plugin works the same way as this PR: to decode Avro messages it uses the de-referenced schema (via the import feature), but registration is managed by Confluent's Schema Registry Maven plugin, which registers schemas with references. This way schemas are registered during the CI pipeline and not automatically when a message is produced.

About your first point, how can I prevent this instruction from failing when a JSON with a reference is received?
:avro_json_decoder.decode_schema(payload, allow_bad_references: true)

The problem is that the decoder looks for a type (the referenced type) which is not a native type, so I think avro_json_decoder cannot properly decode the message.

On the registration side, yes, this should be fixed a bit. Both referenced and referencing schemas should be registered.

@Strech
Owner

Strech commented Nov 5, 2020

@FrancescoPessina I've read about references a bit more, and here is what I can confirm:

On the "Java" side, the problem of the registration is "solved" by Confluent setting the following properties of the Avro serializer:

auto.register.schemas=false
use.latest.version=true

This is already possible with the current setting registry_schemas_autoreg: false
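
For context, that switch lives in the application config; a minimal sketch of what disabling auto-registration might look like in `config/config.exs` (key names assumed to follow Avrora's documented configuration; the URL is a placeholder):

```elixir
# config/config.exs — a sketch, not a verified snippet
config :avrora,
  registry_url: "http://localhost:8081",
  # analogous to Confluent's auto.register.schemas=false on the Java side
  registry_schemas_autoreg: false
```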

About your first point, how can I prevent this instruction to fail when is received a Json with the reference?
:avro_json_decoder.decode_schema(payload, allow_bad_references: true)

You are right, and it's already used here:

avrora/lib/avrora/schema.ex

Lines 159 to 164 in 81e1831

defp do_parse(payload) do
  {:ok, :avro_json_decoder.decode_schema(payload, allow_bad_references: true)}
rescue
  error in ArgumentError -> {:error, error.message}
  error in ErlangError -> {:error, error.original}
end

The problem is that the decoder looks for a type (the type referenced) which is not a native type, so I think the avro_json_decoder cannot properly decode the message.

So, I refreshed my memory of how references are handled now. During parsing, the schema collects all references and then resolves them; I think this is a more seamless way to add the functionality you have built:

avrora/lib/avrora/schema.ex

Lines 125 to 133 in 81e1831

with {:ok, schema} <- do_parse(payload),
     {:ok, references} <- ReferenceCollector.collect(schema),
     lookup_table <- :avro_schema_store.add_type(schema, lookup_table) do
  payloads =
    references
    |> Enum.reject(&:avro_schema_store.lookup_type(&1, lookup_table))
    |> Enum.map(fn reference ->
      reference |> reference_lookup_fun.() |> unwrap!()
    end)

If we can collect references anyway, we can also resolve them via the registry, I guess. The solution is still not fully pieced together in my head, but I think we can leverage the existing mechanism or enhance it. Then it should work for both cases, maybe with some new (or existing) settings.


UPD1: Here Schema.parse is used, maybe a potential place for the reference lookup?

{:ok, schema} <- Schema.parse(schema) do

@FrancescoPessina
Contributor Author

@Strech ok, I dug a bit more into the code and understood how the reference lookup works.
There is one issue, which I'll try to explain to you, regarding this function:

defp do_collect({:avro_record_type, _, namespace, _, aliases, fields, fullname, _}) do

This collects the references contained in a schema by inspecting the schema itself. For example, from this schema:

{
  "type": "record",
  "name": "Account",
  "namespace": "io.confluent",
  "aliases": ["Profile"],
  "fields": [
    {
      "name": "payment_history",
      "type": "io.confluent.PaymentHistory"
    },
    {
      "name": "messenger",
      "type": "io.confluent.Messenger"
    },
    {
      "name": "emails",
      "type": {
        "type": "map",
        "values": "io.confluent.Email"
      }
    },
    {
      "name": "settings",
      "type": {
        "type": "map",
        "values": {
          "type": "record",
          "name": "Value",
          "fields": [
            {
              "name": "value",
              "type": "string"
            }
          ]
        }
      }
    }
  ]
}

the references extracted will be ["io.confluent.Email", "io.confluent.Messenger", "io.confluent.PaymentHistory"].
It seems fine, and with this approach we could ignore what the CSR passes us in the references attribute of the response; in the reference_lookup function we could call the schema registry and fetch the referenced schemas, like this:

def reference_lookup(r) do
  Logger.info("Called reference lookup with reference: " <> inspect(r))
  Avrora.Storage.Registry.get(r)
end

But there is one problem: the subject name could be slightly different from the Avro record name. For example, using the TopicRecordNameStrategy (see https://www.confluent.io/blog/put-several-event-types-kafka-topic/), the subject name would be something like <topic-name>-io.confluent.Email. In this case our reference_lookup function would fail, because the subject it searches for (io.confluent.Email) does not exist in the CSR (the actual subject name is <topic-name>-io.confluent.Email).

So, we have to pass down to Schema.parse, in some way, the data contained in the references attribute of the CSR response.
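
To make the naming mismatch concrete, a hypothetical sketch (the module, the `topic` argument, and the translation are illustrative, not Avrora's API) of how a reference fullname maps to a CSR subject under TopicRecordNameStrategy:

```elixir
# Hypothetical sketch: under TopicRecordNameStrategy the subject is
# <topic>-<record fullname>, so a bare fullname must be translated
# before querying the registry.
defmodule SubjectName do
  def for_reference(topic, fullname), do: "#{topic}-#{fullname}"
end

# SubjectName.for_reference("payments", "io.confluent.Email")
# yields "payments-io.confluent.Email" — the subject the registry
# actually knows, while the schema itself only says "io.confluent.Email"
```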

@Strech
Owner

Strech commented Nov 6, 2020

@RafaelCamarda I think I have an idea.

Since the reference lookup will be defined in the schema registry storage, we can use a closure to resolve the naming issue and not expose the references anywhere; the potential code might look like this:

def get(key) when is_binary(key) do
  with {:ok, schema_name} <- Name.parse(key),
       {name, version} <- {schema_name.name, schema_name.version || "latest"},
       {:ok, response} <- http_client_get("subjects/#{name}/versions/#{version}"),
       {:ok, id} <- Map.fetch(response, "id"),
       # Meta-code of extracting references
       {:ok, references} <- Map.fetch(response, "$ref"),
       {:ok, version} <- Map.fetch(response, "version"),
       {:ok, schema} <- Map.fetch(response, "schema") do

    # Meta-code of mapping a subject name to a reference name
    references = %{
      "io.confluent.Payment" => "topic-io.confluent.Payment"
    }

    lookup_function = fn r ->
      Logger.info("Called reference lookup with reference: " <> inspect(r))
      # Meta-code of getting the real reference subject name
      Avrora.Storage.Registry.get(Map.get(references, r))
    end

    {:ok, schema} = Schema.parse(schema)
    Logger.debug("obtaining schema `#{schema_name.name}` with version `#{version}`")

    {:ok, %{schema | id: id, version: version}}
  end
end

This approach allows us to keep the reference knowledge within Storage.SchemaRegistry (where it is supposed to be), and at the same time we re-use the existing referencing code.

@FrancescoPessina
Contributor Author

FrancescoPessina commented Nov 8, 2020

@Strech I pushed a new PR (#50) developing your idea :) I'll close this one.
