
[SPARK-34652][AVRO] Support SchemaRegistry in from_avro method #31771

Closed
wants to merge 1 commit into from

Conversation


@blcksrx blcksrx commented Mar 7, 2021

What changes were proposed in this pull request?

Provides a simple overload of the from_avro method that fetches the topic's schema from the Schema Registry,
similar to the method that Databricks implemented:
https://docs.databricks.com/spark/latest/structured-streaming/avro-dataframe.html#example-with-schema-registry
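
For reference, a minimal usage sketch of the overload this PR proposes (the subject name, registry URI, and the kafkaDf DataFrame are placeholders, not part of the PR itself):

    import org.apache.spark.sql.avro.functions.from_avro
    import org.apache.spark.sql.functions.col

    // kafkaDf is assumed to be a DataFrame read from Kafka with a binary "value" column.
    val parsed = kafkaDf.select(
      from_avro(
        col("value"),
        "my-topic-value",                // subject registered in the Schema Registry
        "http://schema-registry:8081"    // Schema Registry base URI
      ).as("event"))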

Why are the changes needed?

Does this PR introduce any user-facing change?

How was this patch tested?

I didn't write any test for this method, as you can verify :D
But feel free to ping me to implement one. A unit test for this method would be a dummy test case: I would have to mock the return value of the HTTP request to the Schema Registry, and an integration test would take a long time to execute because it needs Kafka, ZooKeeper, and the Schema Registry embedded!

@blcksrx blcksrx changed the title [SPARK-34652] Support SchemaRegistry in from_avro method [SPARK-34652][AVRO] Support SchemaRegistry in from_avro method Mar 7, 2021
@AmplabJenkins

Can one of the admins verify this patch?

def from_avro(
    data: Column,
    subject: String,
    schemaRegistryUri: String): Column = {
Member

I would avoid adding this as an API. Can we combine both avroSchema and avroSchemaUrl? You can try Schema.Parser().parse first and fall back to parsing it as a URL if that fails. That way, we wouldn't have to add another API.
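
A rough sketch of that fallback, assuming the single string argument is either an inline Avro schema or a URL pointing at one (the helper name is illustrative, not actual Spark code):

    import java.net.URL
    import scala.io.Source
    import scala.util.Try
    import org.apache.avro.Schema

    def resolveAvroSchema(schemaOrUrl: String): Schema = {
      // Try to parse the argument as an inline Avro schema first.
      Try(new Schema.Parser().parse(schemaOrUrl)).getOrElse {
        // Fall back to treating it as a URL and parse the fetched content.
        val source = Source.fromURL(new URL(schemaOrUrl))
        try new Schema.Parser().parse(source.mkString) finally source.close()
      }
    }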

Author

What are your reasons for avoiding it? In my opinion, they are different things, and it's not really complicated to maintain this new API. Also, as I mentioned, Databricks already implemented this API, so don't you agree it is better to keep these the same? It would provide a better user experience for developers.
Anyway, you are the boss here, I'll follow you. What's your idea?

Member

The reason for avoiding it is to minimize the number of APIs exposed to end users, and for better usability. Arguably, using a URL is not a very common usage. Adding an option is fine, but I would like to avoid adding an API dedicated to it.

This is Apache Spark, not a fork maintained by another company. I wasn't involved in that API design at Databricks. No one is a boss in Apache projects. I am suggesting and reviewing from my own perspective. It's up to you to reject this and wait for other committers' opinions, and let them merge it unless somebody explicitly objects.

What's the reason for avoiding doing it with one option? Isn't it simpler and easier to reason about?

cc @gengliangwang too.

Author

Take it easy, man. Yes, your reasons about minimizing the number of APIs make sense. I'm going to adjust my PR.
Thank you.

Member

That's fine, but maybe let's wait for @gengliangwang's opinion too. He's more of an expert in this area and might have a different idea.

Author

Cool, Okay.

Member

@HyukjinKwon Thanks for the ping.
I am fine with either way.
For the current design, I think we need to add another API with a parameter "options: java.util.Map[String, String]", as the other from_avro APIs do.
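
A sketch of the signature shape being suggested here, mirroring the existing options-taking from_avro overload (parameter names are illustrative, not a committed design):

    import org.apache.spark.sql.Column

    // Hypothetical additional overload; the body is intentionally omitted.
    def from_avro(
        data: Column,
        subject: String,
        schemaRegistryUri: String,
        options: java.util.Map[String, String]): Column = ???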

Author

@gengliangwang So this new API is fine, and I just have to write a unit test?

Member

@blcksrx From my side, yes.

Member

Okay, I am fine with it too. I would follow @gengliangwang's judgement in this area.

@HyukjinKwon
Member

cc @dongjoon-hyun FYI

@dongjoon-hyun
Member

Thank you for pinging me, @HyukjinKwon.

@gengliangwang
Member

@blcksrx please add unit tests for adding such an API, thanks!

@cloud-fan
Contributor

Why do we need to add such an API? It just looks like a variant of the avroSchemaUrl option and isn't worth a new API.

You mentioned https://docs.databricks.com/spark/latest/structured-streaming/avro-dataframe.html#example-with-schema-registry , but it's a totally different thing. The schema registry integration needs to not only get the Avro schema from the schema registry server, but also recognize the special Avro binary data that has the schema registry's schema ID encoded in it.

This requires adding the schema registry client dependency; that's why the API is not in upstream Spark, as it changes dependencies.
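
For context, the records cloud-fan describes follow the Confluent wire format: a magic byte and a 4-byte schema ID precede the Avro payload, so a plain Avro reader cannot decode the bytes directly. A minimal sketch of splitting off that prefix (illustrative only, not Spark code):

    import java.nio.ByteBuffer

    final case class ConfluentRecord(schemaId: Int, avroPayload: Array[Byte])

    def splitConfluentValue(bytes: Array[Byte]): ConfluentRecord = {
      val buf = ByteBuffer.wrap(bytes)
      require(buf.get() == 0, "missing Confluent magic byte")
      val schemaId = buf.getInt()               // 4-byte big-endian schema ID
      val payload = new Array[Byte](buf.remaining())
      buf.get(payload)                          // the rest is the Avro-encoded body
      ConfluentRecord(schemaId, payload)
    }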

@blcksrx
Author

blcksrx commented Mar 8, 2021

If you pay attention to this in the link:
"If your cluster has a Schema Registry service, from_avro can work with it so that you don't need to specify the Avro schema manually."

So basically it's a wrapper around the from_avro function so that developers don't have to specify the schema manually. That's it. By the way, I use this function to stream Avro data from our CDC pipeline.

@cloud-fan
Contributor

I'm the developer of that feature in Databricks and I'm sorry that I didn't make the document very clear.

I'm very sure that it's different from what this PR is doing. The key is to recognize the Avro binary data stored in Kafka, which has the schema-registry schema ID encoded in the data. It's very different from the avroSchemaUrl option or any other API that specifies the Avro schema.

@blcksrx
Author

blcksrx commented Mar 8, 2021

@gengliangwang Given that, could you tell me what I should do now, please?

@cloud-fan
Contributor

If your goal is to have a better from_avro function that doesn't require specifying the Avro schema and gets it from the schema registry, can we extend avroSchemaUrl? If the passed URL is a schema registry URL, get the schema from it, with a new option to specify the subject.

@cloud-fan
Contributor

cloud-fan commented Mar 8, 2021

And are you sure that's the common way to use schema registry? I'm not a streaming expert but AFAIK the key advantage of schema registry is to support schema evolution. It's not like the Hive catalog that just stores the metadata. The data should also contain the schema to properly support schema evolution. It's an integrated solution (Kafka + Avro + schema registry).

@gengliangwang
Member

gengliangwang commented Mar 8, 2021

@blcksrx I don't think the implementation in this PR can work with Kafka + Avro + schema registry as https://docs.databricks.com/spark/latest/structured-streaming/avro-dataframe.html#example-with-schema-registry describes. I was hoping you would realize the problem after creating unit tests for the use case.

As for what this PR does, it can be done by using the avroSchemaUrl option anyway. If your purpose is to get the latest Avro schema from the schema registry and use it on a stream of Avro data without the schema-registry schema ID, there is no need to add a new API.

@HeartSaVioR
Contributor

From what I understand, the real problem is that Confluent SR requires using their deserializer as well, which requires adding the Confluent SR client as a dependency. I can easily find articles noting that the built-in from_avro doesn't work with records associated with Confluent SR. Being aware of this, you can probably understand the comments from @cloud-fan a bit better.

@blcksrx
Author

blcksrx commented Mar 9, 2021

@HeartSaVioR @gengliangwang @cloud-fan Thank you guys for the detailed explanation. I'm going to change my PR as you suggested, but I have a question: do you have any plan to add the Confluent SR client or not?

@blcksrx blcksrx requested a review from HyukjinKwon March 9, 2021 19:10
@blcksrx
Author

blcksrx commented Mar 9, 2021

@HyukjinKwon I did it; would you mind rechecking it, please?

@blcksrx
Author

blcksrx commented Mar 9, 2021

@dongjoon-hyun Thanks for the comments; would you like to recheck it again, please?

@HeartSaVioR
Contributor

HeartSaVioR commented Mar 9, 2021

I guess we haven't decided the important things yet. Let's stop making progress and decide before continuing.

  1. Do we want to pull in the dependency on the Confluent Schema Registry? There's no issue with the license, but I also see we are trying to be "vendor-neutral" by avoiding vendor-related things directly. No preference on this, as I might not know about some decisions taken before.
  2. If the answer to 1 is yes (meaningless if the answer is no), given that SR only works with Kafka topics, are we sure doing it via from_avro/to_avro is the right way? I'd rather say Spark SQL Kafka should be smart enough to support schemas based on SR instead, but I agree that involves a bunch of hurdles, so it's OK if we just want to go through from_avro/to_avro.
  3. If the answer to 1 is yes (again, meaningless if the answer is no), where would we like to add the dependency? We've been concerned about adding external artifacts into core/sql, so my assumption is to add it to external. That said, from_avro/to_avro wouldn't be the place to extend.

@dongjoon-hyun
Member

@HeartSaVioR Are you suggesting banning someone else's comments? Actually, I only commented on a few editorial changes.

I guess we haven't decided the important things yet. Let's stop making progress and decide before continuing.

@HeartSaVioR
Contributor

HeartSaVioR commented Mar 9, 2021

I'm not sure about your point. I see points raised by @cloud-fan and @gengliangwang which have not been addressed (please look through the previous comments), so I'd just like to make it clear to the author that, without addressing them, I don't expect this can continue. (And I'm also asking reviewers whether we want to allow more changes to unblock this.) Is there something I'm missing?

@HeartSaVioR
Contributor

Say, suppose we don't make a decision on pulling in Confluent SR but just add the schema URL anyway. While it might be able to retrieve the schema from SR, deserializing the records will fail, so it will turn out to be of no use. The main discussion here should be whether we want to support Confluent SR officially or not, which is a completely different perspective from allowing a small code change. That's why I suggest we stop making changes until we decide.

@dongjoon-hyun
Member

dongjoon-hyun commented Mar 9, 2021

@HeartSaVioR. As you see, I was here from the beginning and followed all the discussion. That's the reason why I didn't express any intention of merging this PR. So, please don't get me wrong this time. I respect your opinion and the others'.

@dongjoon-hyun
Member

Here is an example: even for a PR which I had approved before, I didn't merge it.

@HeartSaVioR
Contributor

As you're following the discussion here, I guess you have an opinion on my comment about the direction. Could you please share your view instead? Otherwise, may I interpret your reviews as agreement with the current direction as it is?

@dongjoon-hyun
Member

dongjoon-hyun commented Mar 9, 2021

I was the one who closed https://issues.apache.org/jira/browse/SPARK-26314 (Support Confluent encoded Avro in Spark Structured Streaming) in December 2018. I've been negative about having a direct Confluent dependency.

@HeartSaVioR
Contributor

HeartSaVioR commented Mar 10, 2021

For me, the API URL path and the knowledge of how to read the response are also dependencies on Confluent SR. The change is just there to support Confluent SR, and I don't see any difference compared to having a direct dependency. Such a dependency would even be added to the SQL module.

That said, if our consensus on supporting Confluent SR is not positive, I'm not positive on this change. I'm waiting for more voices, as someone might argue for supporting Confluent SR out of the box.

If we are positive about supporting Confluent SR, there should be a much better way to support it that respects schema evolution.

@dongjoon-hyun
Member

dongjoon-hyun commented Mar 10, 2021

BTW, back to the original comment, I believe you don't need to rant like "Let's stop making progress and decide before continuing." in the community based on your presumption. "Stop making progress" is a bit too strong towards the contributor (the author), isn't it?

@HeartSaVioR
Contributor

I'm sorry, but I disagree. Based on my long experience of being a contributor (2+ years), I think the worst situation for contributors is when reviewers keep guiding them to make changes, and then all of a sudden the reviewers say it's not going to work and the PR is not acceptable. The signal is better given sooner rather than later if the direction is not the way to go. Reviewers have a responsibility to get the direction right.

@dongjoon-hyun
Member

Well, if you disagree, I'm fine with that.

@HeartSaVioR
Contributor

Btw, I'm sorry, my bad. There should probably be a question 4 in addition to 1~3.

  4. Do we think that only pulling the schema from SR, without using the deserializer from there, is a valid use case we'd like to support?

@dongjoon-hyun
Member

dongjoon-hyun commented Mar 10, 2021

Yes, I thought there might be some beneficial use cases that we missed and the author suggested, because I'm also open to the non-schema-evolution cases if there is a clear benefit. However, it depends on the perspective, of course. Since you clearly claim, as below, that a logical dependency is also a dependency we should avoid, there is no room for us to find some potential use cases from that perspective.

I don't see any difference compared to having a direct dependency.

@HeartSaVioR
Contributor

HeartSaVioR commented Mar 10, 2021

We still need to hear more voices, but if we are OK with 4, I'm fine with it. We could probably take the "full URL" from outside to avoid a soft dependency on the API path. Parsing the response is another story; that has to be coupled in anyway, or we probably end up adding a new signature.

I feel I wasn't clear about my position. Sorry about this. Actually, I'm some sort of +0 (I'm supportive if others want it) on supporting Confluent SR, via an external module instead of making changes to the SQL module directly. (Just roughly noting that Flink supports it.) That said, the options were written from a neutral position - I didn't/don't answer no to any of the options, but my honest preference is that if we are OK with 4, then I hope we can reconsider 1 and evaluate the others. Just doing 4 sounds a bit limited.

This doc describes Confluent SR support in Flink:
https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/connectors/formats/avro-confluent.html

@cloud-fan
Contributor

I think supporting the Confluent SR integration is a good addition; the question is how. Making it built-in looks like a bit of overkill; putting it in "external" or a third-party library sounds better.

I think Confluent SR + Kafka + Avro can be treated as an advanced Kafka data source, and it should be possible to implement it outside of Spark with the data source APIs. We have several projects which were implemented outside of Spark and later got merged into Spark. We can do the same here.

@dongjoon-hyun
Member

cc @viirya, too, since this is SS area.

@viirya
Member

viirya commented Mar 10, 2021

Thanks for pinging me, @dongjoon-hyun.

I quickly went through the previous comments and hope I didn't miss anything. If I missed or misunderstood anything, please let me know.

I think I can understand the comments from @cloud-fan, @gengliangwang, and @HeartSaVioR. I agree that what this PR currently does is far from the Avro + Kafka + SR integration. It seems to me this only wants to fetch the latest schema from SR, so it does nothing for schema evolution here. I also agree that avroSchemaUrl looks like an option to achieve the goal, although I also worry that it might be a bit confusing for users who wrongly think of it as SR integration support. It sounds like @HeartSaVioR's option 4? I'm fine if others think it is okay to do.

I guess @cloud-fan's idea about an advanced Kafka data source is another story. Actually, I am not sure if the author expects a full integration with SR, or just wants something like option 4.

@cloud-fan
Contributor

Do we have real use cases behind option 4? If no one uses Confluent SR like this, then option 4 can be confusing.

@viirya
Member

viirya commented Mar 10, 2021

Do we have real use cases behind option 4? If no one uses Confluent SR like this, then option 4 can be confusing.

Yeah, that is what I meant; that's what I worry about.

@HeartSaVioR
Contributor

Looks like option 4 isn't something we prefer.

I guess below is one known way to leverage Confluent SR (found in the SPARK-26314 JIRA issue): using the latest schema as the DataFrame's schema, but deserializing each record with Confluent's deserializer:

https://github.com/xebia-france/spark-structured-streaming-blog/blob/master/src/main/scala/AvroConsumer.scala

That is something I could live with - so if we go with option 1 and are willing to support it as a built-in, it could be supported in various ways: 1) simply a UDF implementation, 2) another Kafka data source integrating the serializer/deserializer at the data source level, 3) etc. The point is simply that we'd like to support Confluent SR out of the box by pulling in the artifacts. It looks like there isn't much support for pulling this into the Spark codebase either, unfortunately.
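
Roughly, that linked consumer keeps the latest registry schema for the DataFrame's shape but decodes each value with Confluent's deserializer so the per-record schema ID is honored. A hedged sketch of the UDF flavor (the Confluent classes come from their client library, which Spark does not depend on; the topic, URL, and kafkaDf names are placeholders):

    import io.confluent.kafka.serializers.KafkaAvroDeserializer
    import org.apache.avro.generic.GenericRecord
    import org.apache.spark.sql.functions.{col, udf}

    val registryUrl = "http://schema-registry:8081"   // placeholder

    // Decode one Kafka value to a JSON string; creating the deserializer per call
    // keeps the sketch simple, a real implementation would cache it per executor.
    val decodeAvro = udf { (bytes: Array[Byte]) =>
      val deserializer = new KafkaAvroDeserializer()
      deserializer.configure(java.util.Collections.singletonMap("schema.registry.url", registryUrl), false)
      Option(bytes)
        .map(b => deserializer.deserialize("my-topic", b).asInstanceOf[GenericRecord].toString)
        .orNull
    }

    val decoded = kafkaDf.select(decodeAvro(col("value")).as("json"))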

@LeonardoZV

LeonardoZV commented Apr 1, 2021

My humble opinion: I think Spark should fully support Confluent SR.

The use of event-driven architecture is skyrocketing, and big companies like to control event metadata so they don't lose control of what is being shared on the message bus (data governance), and Confluent SR is the main player here. I think not supporting it will make big companies go elsewhere. Projects like Apache Camel and AWS products (e.g., Glue) already support it due to its importance. I work for the biggest private Brazilian bank, and I don't see us using Spark Streaming without it.

I just started learning Spark, but I know Kafka very well, and I think there's one more thing to discuss here: Confluent SR supports multiple schemas per topic (TopicRecordNameStrategy), so if Spark is to fully support the Schema Registry, it needs a way to deal with that in addition to schema evolution. Something like a DynamicFrame (https://docs.aws.amazon.com/glue/latest/dg/aws-glue-api-crawler-pyspark-extensions-dynamic-frame.html).

To do that today, we would need different consumers (one for each schema) with filters, right? That's not good. Tell me if there's another way, because I'm still learning Spark.

@github-actions

We're closing this PR because it hasn't been updated in a while. This isn't a judgement on the merit of the PR in any way. It's just a way of keeping the PR queue manageable.
If you'd like to revive this PR, please reopen it and ask a committer to remove the Stale tag!
