New (key|value).multi.type option for Avro serialization #680

Merged
merged 5 commits on Jan 12, 2018

@ept
Contributor

ept commented Dec 1, 2017

In some situations, an application needs to store events of several different types in the same Kafka topic. In particular, when developing a data model in an Event Sourcing style, you might have several kinds of event that affect the state of an entity. For example, for a customer entity there may be customerCreated, customerAddressChanged, customerEnquiryReceived, customerInvoicePaid, etc. events, and the application may require that those events are always read in the same order. Thus, they need to go in the same Kafka partition (to maintain ordering).

The Avro schema registry currently assumes a 1:1 mapping between Kafka topics and Avro schemas, making it difficult to support scenarios like the one above. Users who want several event types in the same topic currently either have to put them in one big Avro union (which works, but gets unwieldy very quickly), or turn off the registry's schema compatibility checking (which would be unfortunate, since the compatibility check is very valuable).

This patch introduces two new boolean config settings, key.multi.type and value.multi.type. When set to true, they allow the key (or value, respectively) of a message to be any Avro record type. The schema of the type is stored in the schema registry as usual; however, instead of using <topic>-key or <topic>-value as the subject, the fully-qualified name of the record type is used as the subject.

This has the effect that a Kafka producer will happily accept any mixture of Avro record types and publish them to the same topic. Since the schema registry's ID for a schema is globally unique, the binary message encoding does not need to change, and consumers also handle the mixture of record types without change. When a schema is changed, the registry checks compatibility with previous schemas of the same fully-qualified type name; different record types can be evolved independently without any interference.
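To illustrate, a producer publishing mixed event types to one topic might be configured roughly like this (a sketch using the boolean config names proposed in this patch; hostnames are placeholders, and the serializer class names are the standard Confluent ones):

```properties
# Sketch: producer configuration with the proposed multi-type setting
bootstrap.servers=localhost:9092
schema.registry.url=http://localhost:8081
key.serializer=io.confluent.kafka.serializers.KafkaAvroSerializer
value.serializer=io.confluent.kafka.serializers.KafkaAvroSerializer
# Register each value schema under the record's fully-qualified name
# instead of <topic>-value, so several record types can share a topic
value.multi.type=true
```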

@ConfluentCLABot


ConfluentCLABot commented Dec 1, 2017

It looks like @ept hasn't signed our Contributor License Agreement, yet.

The purpose of a CLA is to ensure that the guardian of a project's outputs has the necessary ownership or grants of rights over all contributions to allow them to distribute under the chosen licence. (Wikipedia)

You can read and sign our full Contributor License Agreement here.

Once you've signed, reply with [clabot:check] to prove it.

Appreciation of efforts,

clabot

@ept


Contributor

ept commented Dec 1, 2017

[clabot:check]

@ConfluentCLABot


ConfluentCLABot commented Dec 1, 2017

@confluentinc It looks like @ept just signed our Contributor License Agreement. 👍

Always at your service,

clabot

@mageshn


Member

mageshn commented Dec 4, 2017

@ept thanks for your patch. While for the most part it looks good, I'm just thinking out loud here about whether it would help to generalize this. We have different scenarios where users would want to share the same schema across topics. Your solution can be used to address that scenario as well. So maybe we could call the configs something like key.subject.name.strategy and value.subject.name.strategy. The default strategy could always use topic-key and topic-value. Let me know your thoughts.

@ept


Contributor

ept commented Dec 11, 2017

@mageshn Thanks for the suggestion — I think that's a good idea, so I've implemented the key.subject.name.strategy and value.subject.name.strategy configs. They currently have three valid settings:

  • default: <topic>-key or <topic>-value as before
  • "type" setting: uses the fully-qualified Avro record name as subject
  • "topic-type" setting: uses the Kafka topic concatenated with the fully-qualified Avro record name as subject

Both the "type" and "topic-type" settings allow multiple event types in the same topic; the difference is just the scope at which the schema compatibility check is performed (per-topic per-type, or globally per-type).
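For illustration, the three settings might be written roughly like this (a sketch based on the descriptions above; the literal config values shown are assumptions, as the exact strings are not spelled out in this comment):

```properties
# Default: subject is <topic>-key / <topic>-value, as before
value.subject.name.strategy=topic-value

# "type": subject is the record's fully-qualified name;
# compatibility is checked globally per record type
value.subject.name.strategy=type

# "topic-type": subject combines topic and record name;
# compatibility is checked per topic, per record type
value.subject.name.strategy=topic-type
```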

@rhauch

This is a really great improvement. Thanks, @ept. I do have one suggestion below:

throw new SerializationException("Unknown value for "
+ AbstractKafkaAvroSerDeConfig.VALUE_SUBJECT_NAME_STRATEGY + ": "
+ valueSubjectNameStrategy);
}


@rhauch

rhauch Dec 20, 2017

Member

This method is called frequently, and doing all these string comparisons in each call is less than ideal. What do you think about creating a functional interface like:

public interface SubjectNamingStrategy {
    String getSubjectName(String topic, boolean isKey, Object value);
}

with a separate implementation for each strategy (see below). The strategy for keys and values could be instantiated once in configureClientProperties(...) above:

if ("topic-key".equals(keySubjectNameStrategy)) {
    keySubjectStrategy = new TopicNamingStrategy();
} else if ("type".equals(keySubjectNameStrategy)) {
    keySubjectStrategy = new RecordSchemaNamingStrategy("");
} else if ("topic-type".equals(keySubjectNameStrategy)) {
    keySubjectStrategy = new RecordSchemaNamingStrategy("topic-");
} else {
    throw new SerializationException("Unknown value for "
                + AbstractKafkaAvroSerDeConfig.KEY_SUBJECT_NAME_STRATEGY + ": "
                + keySubjectNameStrategy);
}
if ("topic-value".equals(valueSubjectNameStrategy)) {
    valueSubjectStrategy = new TopicNamingStrategy();
} else if ("type".equals(valueSubjectNameStrategy)) {
    valueSubjectStrategy = new RecordSchemaNamingStrategy("");
} else if ("topic-type".equals(valueSubjectNameStrategy)) {
    valueSubjectStrategy = new RecordSchemaNamingStrategy("topic-");
} else {
    throw new SerializationException("Unknown value for "
                + AbstractKafkaAvroSerDeConfig.VALUE_SUBJECT_NAME_STRATEGY + ": "
                + valueSubjectNameStrategy);
}

One benefit is that an incorrect value for the configuration property is detected right away. However, the big benefit is that the getSubjectName member method that is called frequently by subclasses can be far more efficient by delegating to the correct strategy and forgoing all of the string comparisons:

protected String getSubjectName(String topic, boolean isKey, Object value) {
    return isKey
        ? keySubjectStrategy.getSubjectName(topic, isKey, value)
        : valueSubjectStrategy.getSubjectName(topic, isKey, value);
}

Each SubjectNamingStrategy implementation would be quite straightforward. For example, the TopicNamingStrategy might be implemented as follows:

protected static class TopicNamingStrategy implements SubjectNamingStrategy {
    public String getSubjectName(String topic, boolean isKey, Object value) {
       if (isKey) {
           return topic + "-key";
       }
       return topic + "-value"; 
    }
}

while another RecordSchemaNamingStrategy implementation could handle both type and topic-type options by just using different prefixes:

protected static class RecordSchemaNamingStrategy implements SubjectNamingStrategy {
    private final String prefix;
    public RecordSchemaNamingStrategy(String prefix) {
        this.prefix = prefix != null ? prefix : "";
    }
    public String getSubjectName(String topic, boolean isKey, Object value) {
        // Null is passed through unserialized, since it has special meaning in
        // log-compacted Kafka topics.
        if (value == null) {
          return null;
        }

        if (value instanceof GenericContainer) {
          Schema schema = ((GenericContainer) value).getSchema();
          if (schema.getType() == Schema.Type.RECORD) {
            return prefix + schema.getFullName();
          }
        }

        // isKey is only used to produce a more helpful error message
        if (isKey) {
          throw new SerializationException("In configuration "
              + AbstractKafkaAvroSerDeConfig.KEY_SUBJECT_NAME_STRATEGY
              + ", the message key must only be an Avro record");
        } else {
          throw new SerializationException("In configuration "
              + AbstractKafkaAvroSerDeConfig.VALUE_SUBJECT_NAME_STRATEGY
              + ", the message value must only be an Avro record");
        }
    }
}


@mageshn

mageshn Dec 20, 2017

Member

IMO, I would prefer making the Strategy interface public so that users could choose to use their own topic to subject mapping strategy if they need to. Essentially, the config becomes a class name.

@ept


Contributor

ept commented Jan 5, 2018

@rhauch @mageshn Happy new year! I have updated the patch as you suggested, using different classes to implement the different subject-name choosing strategies. The configuration is now a fully-qualified Java classname, so that people can easily plug in their own strategies if desired. Could you let me know if it looks good now?
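Concretely, the config now names a strategy class, so a custom subject mapping is just another class on the classpath. A sketch (the built-in package and class name here are assumptions based on the strategy classes referenced in this review; the custom class name is invented for illustration):

```properties
# Built-in strategy: subject is the record's fully-qualified name
value.subject.name.strategy=io.confluent.kafka.serializers.subject.RecordNameStrategy

# Or a user-supplied implementation of the public strategy interface
# (hypothetical class name)
value.subject.name.strategy=com.example.MyCustomSubjectNameStrategy
```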

@rhauch

rhauch approved these changes Jan 5, 2018

@ept, happy new year to you! Thanks for the changes. I have one really minor question below -- otherwise this looks great!

Approving as is in case it's difficult to find succinct and clear text to add.

TopicNameStrategy.class.getName();
public static final String KEY_SUBJECT_NAME_STRATEGY_DOC =
"Determines how to construct the subject name under which the key schema is registered "
+ "with the schema registry";


@rhauch

rhauch Jan 5, 2018

Member

Minor nit: perhaps the key and value doc strings could say that by default the older topic naming behavior/strategy is used. It's not essential, but if it can be said clearly and succinctly it might help people understand that the behavior won't change if they use the default.


@ept

ept Jan 5, 2018

Contributor

Added clarification of default behaviour in bf574fe.

@rhauch


Member

rhauch commented Jan 5, 2018

BTW, not sure if these pass locally, but the build is failing with the NPEs in the following tests:

  • io.confluent.connect.avro.AvroConverterTest.testPrimitive
  • io.confluent.connect.avro.AvroConverterTest.testVersionExtracted
  • io.confluent.connect.avro.AvroConverterTest.testVersionMaintained
  • io.confluent.connect.avro.AvroConverterTest.testComplex

For example:

java.lang.NullPointerException
at io.confluent.connect.avro.AvroConverterTest.testVersionMaintained(AvroConverterTest.java:210)

@ept


Contributor

ept commented Jan 5, 2018

Whoops, sorry about the test failures. I had only run the tests for the kafka-avro-serializer module, not the rest. Pushed 6a9092a to fix the build.

@@ -41,6 +43,8 @@
private static final Map<String, Schema> primitiveSchemas;
protected SchemaRegistryClient schemaRegistry;
protected SubjectNameStrategy keySubjectNameStrategy = new TopicNameStrategy();


@mageshn

mageshn Jan 5, 2018

Member

nit: default is already specified in the config def, so this is not necessary.


@ept

ept Jan 6, 2018

Contributor

It is necessary: if the field is not initialised here, we get the NPEs that @rhauch complained about in tests for the kafka-connect-avro-converter module. It might be that those tests aren't properly configuring the SerDe, but I didn't want to get into debugging tests that are unrelated to the feature at hand.

public class RecordNameStrategy implements SubjectNameStrategy {
@Override
public void configure(Map<String, ?> config) {


@mageshn

mageshn Jan 5, 2018

Member

I'm not seeing this being invoked after instance creation. Since this is a public API, users would possibly expect it to work if they use it. We should either invoke it or not extend Configurable in the interface.


@rhauch

rhauch Jan 5, 2018

Member

This is invoked here, right?

I think it's useful to extend Configurable -- it's too hard w/ Java 7 to add it later when we need it.


@mageshn

mageshn Jan 5, 2018

Member

That's right. I missed that it is using getConfiguredInstance. LGTM

@ept


Contributor

ept commented Jan 9, 2018

Could someone at Confluent please tell me why the Jenkins check is failing? The branch builds fine for me locally, and jenkins.confluent.io is not accessible to the public.

@mageshn


Member

mageshn commented Jan 10, 2018

@ept It's failing due to some FindBugs errors that are already fixed in master. Can you try rebasing your branch?

ept added some commits Dec 1, 2017

New (key|value).multi.type option for Avro serialization

Generalize multi.type to (key|value).subject.name.strategy
Using a string value for the configuration allows more than two
settings. Added an option to use topic name + record name as subject.
@ept


Contributor

ept commented Jan 12, 2018

@mageshn Ok, rebased onto master.

@mageshn mageshn merged commit 10788d4 into confluentinc:master Jan 12, 2018

1 check passed

continuous-integration/jenkins/pr-merge This commit looks good
@arnaudbos


arnaudbos commented Feb 12, 2018

Hi,
What release of schema-registry (and which docker image tag) is this patch planned for?
Thanks for the hard work 🤓

@defpearlpilot


defpearlpilot commented Feb 13, 2018

I second @arnaudbos's question. What is the release timeline for this?

@mageshn


Member

mageshn commented Feb 15, 2018

This will be released with the upcoming 4.1 release. I don't have exact timelines but should be tentatively around end of March or early April.

@tPl0ch


tPl0ch commented Feb 18, 2018

@mageshn Are there any plans to port this behaviour to the REST proxy too? Currently the same restriction applies there, since only a single key/value schema is allowed per record batch.

@eventSourcerer


eventSourcerer commented Feb 20, 2018

This looks like a very nice improvement - Our event schema is getting HUGE ;-)

Will this integrate with Kafka Streams? Currently we provide a concrete keySerde and valueSerde per topic there...

@jxbes


jxbes commented Mar 14, 2018

I second @eventSourcerer's question. How will we provide the key/value serde when reading a topic into a stream?

@gphilipp


gphilipp commented Mar 23, 2018

Users who want several event types in the same topic currently either have to put them in one big Avro union (which works, but gets unwieldy very quickly).

Is there an example of this somewhere (with a union type as the root element of the schema) ?

@Yahampath


Yahampath commented May 24, 2018

Hi,
Is there any example of this? If so, please comment below.

@arnaudbos


arnaudbos commented May 24, 2018

I don't think Avro supports this out of the box.
What you'd have to do is have a union type field.
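For example, a root-level union of record types is written as a JSON array of schemas, along these lines (a hypothetical sketch; record and field names are invented for illustration):

```json
[
  {
    "type": "record",
    "name": "customerCreated",
    "namespace": "com.example.events",
    "fields": [{"name": "customerId", "type": "string"}]
  },
  {
    "type": "record",
    "name": "customerAddressChanged",
    "namespace": "com.example.events",
    "fields": [
      {"name": "customerId", "type": "string"},
      {"name": "newAddress", "type": "string"}
    ]
  }
]
```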

Now that 4.1 is out, can we use the multi-schema feature? I didn't see it mentioned in the release notes.

We have implemented our own version of schema-registry based on etcd (we had etcd at hand) because we needed this feature before it was released, though it doesn't support schema compatibility enforcements and rather than adding it on top of our own we'd like to migrate whenever possible.

Edit: Yes it has been released in 4.1.0, see changelog and docs.
