Add DynamicConfigProvider for Schema Registry #11362
Conversation
Hi @clintropolis, happy weekend! Following your suggestion, I've integrated the schema registry with DynamicConfigProvider.
👍
@@ -26,7 +26,7 @@ title: "Apache Avro"

This Apache Druid extension enables Druid to ingest and understand the Apache Avro data format. This extension provides
two Avro Parsers for stream ingestion and Hadoop batch ingestion.
- See [Avro Hadoop Parser](../../ingestion/data-formats.md#avro-hadoop-parser) and [Avro Stream Parser](../../ingestion/data-formats.md#avro-stream-parser)
+ See [Avro Hadoop Parser](../../ingestion/data-formats.md#avro-hadoop-parser) and [Avro Stream Parser](../../ingestion/c.md#avro-stream-parser)
accidental change?
Fixed.
docs/ingestion/data-formats.md
Outdated
@@ -380,8 +380,8 @@ For details, see the Schema Registry [documentation](http://docs.confluent.io/cu
| url | String | Specifies the url endpoint of the Schema Registry. | yes |
| capacity | Integer | Specifies the max size of the cache (default = Integer.MAX_VALUE). | no |
| urls | Array<String> | Specifies the url endpoints of the multiple Schema Registry instances. | yes (if `url` is not provided) |
- | config | Json | To send additional configurations, configured for Schema Registry | no |
- | headers | Json | To send headers to the Schema Registry | no |
+ | config | Json | To send additional configurations, configured for Schema Registry. User can implement a `DynamicConfigProvider` to supply some properties at runtime, by adding `"druid.dynamic.config.provider"`:`{"type": "<registered_dynamic_config_provider_name>", ...}` in json. | no |
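To make the documented `config` key concrete, here is a hedged sketch of what a decoder spec fragment might look like. The `basic.auth.*` keys are illustrative Confluent client settings, and provider-specific properties (omitted here) would sit alongside `"type"`; none of this is taken verbatim from the PR.

```
{
  "type": "schema_registry",
  "url": "http://localhost:8081",
  "config": {
    "basic.auth.credentials.source": "USER_INFO",
    "druid.dynamic.config.provider": {
      "type": "<registered_dynamic_config_provider_name>"
    }
  }
}
```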
I suggest linking to operations/dynamic-config-provider.md instead of providing an example inline, maybe something like

- | config | Json | To send additional configurations, configured for Schema Registry. User can implement a `DynamicConfigProvider` to supply some properties at runtime, by adding `"druid.dynamic.config.provider"`:`{"type": "<registered_dynamic_config_provider_name>", ...}` in json. | no |
+ | config | Json | To send additional configurations, configured for Schema Registry. This can be supplied via a [DynamicConfigProvider](../../operations/dynamic-config-provider.md). | no |

or something similar (and for the other sections that have been updated)
Good suggestion! I'll change it.
)
{
  this.url = url;
  this.capacity = capacity == null ? Integer.MAX_VALUE : capacity;
  this.urls = urls;
  this.config = config;
  this.headers = headers;
  this.mapper = new ObjectMapper();
I think instead of creating a new ObjectMapper this should be @JacksonInject as a constructor argument.
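A hedged sketch of the reviewer's suggestion, assuming Jackson's `@JacksonInject`: instead of calling `new ObjectMapper()` inside the constructor, the framework-configured mapper is injected at deserialization time. The class and field names here are illustrative, not the PR's actual code.

```java
import com.fasterxml.jackson.annotation.JacksonInject;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.databind.ObjectMapper;

// Illustrative config class: the ObjectMapper arrives via @JacksonInject
// rather than being constructed locally.
class SchemaRegistryConfig
{
  private final String url;
  private final ObjectMapper mapper;

  @JsonCreator
  SchemaRegistryConfig(
      @JsonProperty("url") String url,
      @JacksonInject ObjectMapper mapper
  )
  {
    this.url = url;
    this.mapper = mapper;
  }

  String getUrl()
  {
    return url;
  }

  ObjectMapper getMapper()
  {
    return mapper;
  }
}
```

The injected value is registered on the reading mapper via `InjectableValues.Std().addValue(ObjectMapper.class, sharedMapper)`, so every deserialized instance reuses the shared, fully configured mapper.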
)
{
  this.url = url;
  this.capacity = capacity == null ? Integer.MAX_VALUE : capacity;
  this.urls = urls;
  this.config = config;
  this.headers = headers;
  this.mapper = new ObjectMapper();
same comment about injecting instead of making a new ObjectMapper
OK, I'll change it.
{
  return headers;
}

protected Map<String, String> createRegistryHeader()
These methods all look basically the same for both avro and protobuf (and kafka). I suggest making a static method or two, perhaps in a new class, maybe DynamicConfigProviderUtils or something similar in druid-core, that takes a Map<String, Object> and performs this logic.
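A minimal sketch of what such a shared utility could look like, under the assumption that it merges the static entries of a `Map<String, Object>` with properties resolved at runtime from a provider registered under `"druid.dynamic.config.provider"`. All names here are illustrative, not the utility actually merged in the PR.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical DynamicConfigProviderUtils: combine static config entries with
// runtime-resolved properties, dropping the provider's own spec entry.
class DynamicConfigProviderUtils
{
  static final String DYNAMIC_CONFIG_PROVIDER_KEY = "druid.dynamic.config.provider";

  // Stand-in for Druid's DynamicConfigProvider interface.
  interface ConfigProvider
  {
    Map<String, String> getConfig();
  }

  static Map<String, String> extraConfigAndSetStringMap(
      Map<String, Object> config,
      ConfigProvider provider // in Druid this would be deserialized from the map entry
  )
  {
    Map<String, String> result = new HashMap<>();
    if (config != null) {
      for (Map.Entry<String, Object> entry : config.entrySet()) {
        // The provider spec itself is not forwarded to the registry client.
        if (!DYNAMIC_CONFIG_PROVIDER_KEY.equals(entry.getKey())) {
          result.put(entry.getKey(), String.valueOf(entry.getValue()));
        }
      }
    }
    if (provider != null) {
      // Runtime-resolved properties (e.g. credentials) override static ones.
      result.putAll(provider.getConfig());
    }
    return result;
  }
}
```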
Thanks for reviewing! I added a DynamicConfigProviderUtils which provides functions like createRegistryHeader(), but it cannot be adapted to kafka. Should I adapt it to kafka in this PR?
oops, it looks like my suggestion made the coverage check fail because the tests that cover DynamicConfigProviderUtils live in a different module. If it isn't too much trouble, could you add a test for it? That should make the failure go away.
I think it's ok to do the changes to kafka in a follow-up if you would prefer.
Unit tests added.
lgtm thanks! fix up the conflicts and I think this is ready to merge 👍
Force-pushed from 006d1c7 to 9a6d0bd
Hi @clintropolis. I've resolved the conflicts, but the integration tests failed. I see some 503 errors in the tests and I think they were caused by network errors. Can you help me rerun them? Thanks.
@bananaaggle I've re-triggered what appear to be flaky tests 🤞 it passes on this run
Thanks!
Hi @clintropolis. I've resolved the conflicts. Can you help me merge it? I want to change DynamicConfigProviderUtils.java in #11372 to adapt it to kafka properties; do you think that's the right approach? Thanks!
Very sorry for the delay! I took a much needed couple-week break and am just getting back to this 😅, will merge now 👍
Ahhhhhhh, welcome back!
DynamicConfigProvider is an interface that lets users obtain secure configuration in various places in an extensible way. This feature allows users of the Confluent schema registry to pass config and headers through this interface.
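To sketch the idea in the PR description: a DynamicConfigProvider is a hook that resolves sensitive properties at runtime instead of hard-coding them in the ingestion spec. The interface shape and the environment-variable implementation below are illustrative assumptions, not Druid's exact API.

```java
import java.util.HashMap;
import java.util.Map;

// Illustrative stand-in for Druid's DynamicConfigProvider interface.
interface DynamicConfigProvider
{
  Map<String, String> getConfig();
}

// Example implementation that resolves each configured property from an
// environment variable (config key -> env var name), so credentials never
// appear in the spec itself.
class EnvDynamicConfigProvider implements DynamicConfigProvider
{
  private final Map<String, String> variables;

  EnvDynamicConfigProvider(Map<String, String> variables)
  {
    this.variables = variables;
  }

  @Override
  public Map<String, String> getConfig()
  {
    Map<String, String> result = new HashMap<>();
    for (Map.Entry<String, String> e : variables.entrySet()) {
      String value = System.getenv(e.getValue());
      if (value != null) {
        result.put(e.getKey(), value);
      }
    }
    return result;
  }
}
```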
This PR has: