Add support for Confluent Schema Registry in the druid-avro-extension module#3529

Merged
himanshug merged 1 commit into apache:master from ncolomer:feature-confluent-schema-registry on Nov 8, 2016

Conversation

@ncolomer (Contributor) commented Oct 3, 2016

This PR adds support to the druid-avro-extension module for deserializing Avro encoded with Confluent's Schema Registry (see documentation).

Schema Registry's binary prefix differs from schemarepo's and contains only the schema ID (one null magic byte followed by a 4-byte integer ID).
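For illustration, the prefix described above can be parsed in a few lines. This is a language-agnostic sketch of the wire format (a 0x00 magic byte followed by a 4-byte big-endian schema ID), not the extension's actual Java code:

```python
import struct

def parse_confluent_prefix(payload: bytes):
    """Split a Confluent wire-format message into (schema_id, avro_body)."""
    if len(payload) < 5 or payload[0] != 0:
        raise ValueError("not a Confluent wire-format message")
    # Bytes 1..4 hold the registry's schema ID as a big-endian 32-bit int.
    (schema_id,) = struct.unpack(">i", payload[1:5])
    # The rest of the payload is the Avro-encoded record itself.
    return schema_id, payload[5:]

schema_id, body = parse_confluent_prefix(b"\x00\x00\x00\x00\x2a" + b"avro bytes here")
print(schema_id)  # 42
```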

This submission only adds the io.confluent:kafka-schema-registry-client:3.0.1 dependency to the druid-avro-extension module (no transitive ones).

It was tested on some of our Avro-encoded Kafka topics.

Ideally, we'd like to backport this to the 0.9.1.x branch, since we use Imply's Druid distribution (currently stuck on Druid 0.9.1.1). Is another PR necessary?

@fjy (Contributor) commented Oct 3, 2016

@himanshug can you take a look?

@gianm (Contributor) commented Oct 3, 2016

@ncolomer we (the Druid project) generally don't release patches of old versions unless there's some critical issue; so, this feature would be slated for 0.9.3. You could always build a custom Druid distro in the meantime though.

fjy added this to the 0.9.3 milestone on Oct 4, 2016
Contributor (inline review comment):

Does this bring in the kafka jars as well?
If yes, it doesn't look like the code here needs the kafka jars in any way... is it possible to depend on something else that brings in the SchemaRegistry stuff without the kafka jars?

Contributor Author (inline review comment):

Hola @himanshug, as seen with the command mvn -pl extensions-core/avro-extensions dependency:tree, kafka-schema-registry-client only pulls in the org.slf4j:slf4j-log4j12:jar:1.7.6 transitive dependency.

@himanshug (Contributor) commented:
Can you add some UTs? There are plenty of examples in the same module. Thanks.

Contributor (inline review comment):
The schema registry repo contains the Avro serializer/deserializer; is it possible to use those instead of us knowing the format of the message?

@ncolomer (Contributor Author) commented Oct 5, 2016

Following up on my previous comment: sure, I could have used another, higher-level Confluent lib (such as io.confluent:kafka-avro-serializer), but it would have required pulling in all the Kafka stuff... that's why I chose to rely on kafka-schema-registry-client and implement the deserialization logic myself (which is not that complicated in the end). Anyway, I'm open to any suggestion here :)
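The approach described here (read the schema ID from the prefix, fetch the writer's schema from the registry, then feed the remaining bytes to an Avro reader) can be sketched roughly as follows. This is a toy illustration: InMemoryRegistry is a hypothetical stand-in for the HTTP-backed kafka-schema-registry-client, and the actual Avro decoding step is elided.

```python
class InMemoryRegistry:
    """Hypothetical stand-in for Confluent's schema registry client."""
    def __init__(self, schemas_by_id):
        self.schemas_by_id = schemas_by_id  # id -> Avro schema (a JSON string here)

    def get_by_id(self, schema_id):
        return self.schemas_by_id[schema_id]

def decode(payload, registry):
    # 1 null magic byte + 4-byte big-endian schema ID, then the Avro body.
    if payload[0] != 0:
        raise ValueError("bad magic byte")
    schema_id = int.from_bytes(payload[1:5], "big")
    writer_schema = registry.get_by_id(schema_id)
    # A real decoder would now parse payload[5:] with an Avro reader configured
    # with writer_schema; this sketch just returns both parts.
    return writer_schema, payload[5:]

registry = InMemoryRegistry({7: '{"type": "string"}'})
schema, avro_body = decode(b"\x00\x00\x00\x00\x07rest", registry)
```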

@ncolomer (Contributor Author) commented Oct 5, 2016

Yup, I'll add some.

@ncolomer (Contributor Author) commented Nov 3, 2016

@himanshug tests added, see c597384 and the SchemaRegistryBasedAvroBytesDecoderTest.java file.
EDIT: rebased the branch on the latest master commit.

@fjy (Contributor) commented Nov 8, 2016

👍

@fjy (Contributor) commented Nov 8, 2016

@himanshug any more comments?

himanshug merged commit 37ecffb into apache:master on Nov 8, 2016
@himanshug (Contributor) commented:

@ncolomer thanks

@fjy (Contributor) commented Nov 8, 2016

@ncolomer did you sign the CLA?
http://druid.io/community/cla.html

@ncolomer (Contributor Author) commented Nov 9, 2016

@fjy done, thanks

@kosii commented Jan 4, 2017

Is this really supposed to work with flattenSpec, as stated in #3714?

@kosii commented Jan 10, 2017

Okay, for future reference: it only works for non-union nested fields.
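For future readers, a hedged illustration of the distinction: a nullable nested field is declared as a union in the Avro schema, roughly like the fragment below, and fields inside such a union were not reachable via flattenSpec path expressions at the time. The field and record names here are made up:

```json
{
  "name": "id",
  "type": ["null", {
    "type": "record",
    "name": "Id",
    "fields": [{"name": "instrument", "type": "string"}]
  }],
  "default": null
}
```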

@maxstreese commented:

Hi, apologies, but is there any example of how to set this up exactly? I have both the druid-kafka-indexing-service and the druid-avro-extensions extensions loaded and can create a spec that discovers my cluster and topic just fine, but in the UI there is no Avro parser to be found anywhere. As for defining the spec in JSON directly, I could not find any example or documentation so far.

@gianm (Contributor) commented Aug 12, 2020

> Hi, apologies but is there any example on how to set this up exactly? I have both the druid-kafka-indexing-service as well as the druid-avro-extensions loaded and can create a spec that discovers my cluster and topic just fine but with the UI there is no Avro parser to be found anywhere. And as for the defining the spec in JSON directly I could not find any example or documentation so far.

I think as of today (0.19.0) that streaming Avro isn't yet supported by the new inputFormat API or by the web console UI. You can still use the legacy parser API, though. It's documented here: https://druid.apache.org/docs/latest/ingestion/data-formats.html#avro-stream-parser

In the future, we'll need to add streaming Avro inputFormat and web console UI support.

@maxstreese commented:

Hi @gianm,

Thanks for the fast reply to my rather despairing comment. OK, with the background information that this is currently only supported by the legacy parser, I now understand the docs better. Personally, the docs don't seem to reflect this properly; at least I got quite confused. In any case, I was able to make it work, so let me share my final config for a dummy topic in case someone else stumbles across this:

{
  "type": "kafka",
  "dataSchema": {
    "dataSource": "ticks",
    "parser": {
      "type": "avro_stream",
      "avroBytesDecoder": {
        "type": "schema_registry",
        "url": "<schema registry url>"
      },
      "parseSpec": {
        "format": "avro",
        "flattenSpec": {
          "fields": [
            {"name": "instrument", "type": "path", "expr": "$.id.instrument"},
            {"name": "currency", "type": "path", "expr": "$.id.currency"}
          ]
        },
        "timestampSpec": {
          "column": "timestamp",
          "format": "millis"
        },
        "dimensionsSpec": {
          "dimensions": [
            "instrument",
            "currency",
            {"name": "value", "type": "double"}
          ]
        }
      }
    }
  },
  "ioConfig": {
    "type": "kafka",
    "topic": "ticks",
    "consumerProperties": {
      "bootstrap.servers": "<bootstrap server addresses>"
    }
  },
  "tuningConfig": {
    "type": "kafka",
    "logParseExceptions": true
  }
}

The above assumes that there is a topic named ticks in your cluster which contains data encoded with the following Avro schema:

{
    "type": "record",
    "name": "Tick",
    "namespace": "<some namespace>",
    "fields": [{
        "name": "id",
        "type": {
            "type": "record",
            "name": "Id",
            "fields": [{
                "name": "instrument",
                "type": "string"
            }, {
                "name": "currency",
                "type": "string"
            }]
        }
    }, {
        "name": "timestamp",
        "type": {
            "type": "long",
            "logicalType": "timestamp-millis"
        }
    }, {
        "name": "value",
        "type": "double"
    }]
}
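To illustrate what the flattenSpec path expressions above do, here is a rough, simplified sketch of "$.a.b"-style extraction against the decoded record viewed as a plain dict. The real flattening is performed by Druid's JSONPath support, not by user code, and the sample values are made up:

```python
def extract_path(record, expr):
    # Handles only the simple "$.a.b" dotted form used in the spec above.
    value = record
    for key in expr.lstrip("$.").split("."):
        value = value[key]
    return value

# Hypothetical decoded Tick record, as a plain dict.
tick = {"id": {"instrument": "AAPL", "currency": "USD"}, "timestamp": 0, "value": 1.5}
print(extract_path(tick, "$.id.instrument"))  # AAPL
```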

@gianm (Contributor) commented Aug 14, 2020

Thank you for sharing, @maxstreese! I, too, am looking forward to supporting this functionality in the new inputFormat API. It should make things simpler in the docs as well.

seoeun25 pushed a commit to seoeun25/incubator-druid that referenced this pull request Feb 25, 2022

6 participants