
[SPARK-31805][SS] Exclude group.id from consumer settings when using the assign strategy #28623

Closed
tashoyan wants to merge 3 commits into apache:master from tashoyan:SPARK-31805-assign-no-group-id

Conversation

@tashoyan
Contributor

What changes were proposed in this pull request?

Fix for SPARK-31805: do not set the group.id consumer property when using the "assign" strategy.

Why are the changes needed?

With a secure Kafka broker, an application fails because the auto-generated group id is not allowed by the broker:

org.apache.kafka.common.errors.GroupAuthorizationException: Not authorized to access group: spark-kafka-relation-ecab045d-4ee6-425e-88a0-495d4100a013-driver-0

For the "assign" strategy, consumer group is not used. Therefore the best fix is to exclude the group.id property from the consumer config.

Does this PR introduce any user-facing change?

Yes. When using "assign" strategy:

  1. No need to reconfigure the broker to add the necessary group ids to the ACL
  2. (since Spark 3.0.0) No need to provide a custom group id (SPARK-26350) or a custom prefix (SPARK-26121)

How was this patch tested?

Manually:

  1. Rebuild the module:
    mvn install -pl :spark-sql-kafka-0-10_2.12
  2. Rebuild my application with the newly built spark-sql-kafka-0-10_2.12. My application does the following:
    val kafkaDf = spark.read
      .format("kafka")
      // "kafka.bootstrap.servers" and SASL-specific options
      .options(options)
      .option("assign", topicPartitionsJson)
      .option("startingOffsets", startingOffsetsJson)
      .option("endingOffsets", "latest")
      .load()
  3. Run my application with a secure Kafka broker and verify that it does not fail with "GroupAuthorizationException: ..." anymore.

I did not manage to add unit tests, because I do not know how to set up a secure Kafka broker in a test. Unit tests in spark-sql-kafka-0-10 use the KafkaTestUtils tool, and I am not sure whether this tool can simulate a secure Kafka broker with ACLs.

@AmplabJenkins

Can one of the admins verify this patch?

@HyukjinKwon
Member

cc @HeartSaVioR FYI

override def createConsumer(
    kafkaParams: ju.Map[String, Object]): Consumer[Array[Byte], Array[Byte]] = {
  val updatedKafkaParams = setAuthenticationConfigIfNeeded(kafkaParams)
  excludeGroupId(updatedKafkaParams)
Contributor


I'd rather not manipulate passed map, as we did in setAuthenticationConfigIfNeeded.

Contributor Author


Now using KafkaConfigUpdater

@HeartSaVioR
Contributor

If I'm not missing anything, the assignment is only effective for the driver-side consumer, so if we want to discard the group.id config for assignment, we should do the same for the executors' consumers as well. That can be applied to all strategies, since executors' consumers always leverage assignment.
(Please turn on DEBUG log for org.apache.spark.sql.kafka010.consumer.InternalKafkaConsumer and see how group ID is printed.)
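With Spark's default log4j 1.x configuration, a hedged example of enabling this in conf/log4j.properties:

    log4j.logger.org.apache.spark.sql.kafka010.consumer.InternalKafkaConsumer=DEBUG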

Btw, TBH I'm hesitant to agree to the change unless there's no way to deal with such an issue even with the changes from Spark 3.0.0, because the change would make an "exception" for a specific option, and that behavior may not be consistent across existing and future versions of Kafka.

What I have heard about group ID security issues from end users in the community is that most cases can be dealt with via a prefixed group ID. Doesn't that help your case?

@tashoyan
Contributor Author

tashoyan commented May 24, 2020

Consumers inside executors

Indeed, executors have their own Kafka consumers, and these consumers always have group.id set: see KafkaSourceProvider.kafkaParamsForExecutors(). We can see the group ids in executor logs; INFO level is enough.

However, the consumers inside executors always do assign() and never subscribe(), so there is no reason to set group.id for them. I would suggest removing this setting for executor consumers.
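For context, a paraphrased sketch of what kafkaParamsForExecutors() effectively does today (simplified, not the verbatim Spark source; only the config key is literal):

    import java.util.{HashMap => JHashMap, Map => JMap}
    import org.apache.kafka.clients.consumer.ConsumerConfig

    // Paraphrased sketch, not the verbatim Spark source: every executor-side
    // consumer currently gets a group.id derived from a unique per-query id.
    def kafkaParamsForExecutors(
        specifiedKafkaParams: Map[String, String],
        uniqueGroupId: String): JMap[String, Object] = {
      val params = new JHashMap[String, Object]()
      specifiedKafkaParams.foreach { case (k, v) => params.put(k, v) }
      params.put(ConsumerConfig.GROUP_ID_CONFIG, s"$uniqueGroupId-executor")
      params
    }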

Nevertheless, executors do not fail as the driver does. My investigation is below in this post.

Kafka consumer behavior

According to the KafkaConsumer specification, the assign mechanism does not use consumer groups at all. In contrast to subscribe, assign assumes manual assignment of topic partitions.
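A minimal illustration with plain kafka-clients (broker address and topic name are placeholders): with assign() there is no group coordination, and group.id can be omitted entirely.

    import java.time.Duration
    import java.util.{Arrays, Properties}
    import org.apache.kafka.clients.consumer.{ConsumerConfig, KafkaConsumer}
    import org.apache.kafka.common.TopicPartition
    import org.apache.kafka.common.serialization.ByteArrayDeserializer

    val props = new Properties()
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "broker:9092")
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, classOf[ByteArrayDeserializer].getName)
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, classOf[ByteArrayDeserializer].getName)
    // Note: no ConsumerConfig.GROUP_ID_CONFIG at all.

    val consumer = new KafkaConsumer[Array[Byte], Array[Byte]](props)
    consumer.assign(Arrays.asList(new TopicPartition("my-topic", 0)))  // manual assignment
    consumer.seekToBeginning(consumer.assignment())                    // explicit position
    val records = consumer.poll(Duration.ofSeconds(5))
    consumer.close()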

If I am not missing anything, Structured Streaming manually assigns Kafka partitions to Spark executors. We do not use consumer groups at all, so we do not need them. Therefore, I am not sure why Structured Streaming provides the "subscribe" option.

By not specifying group.id, we can avoid the authorization problem on a secure Kafka broker.

Weird behavior of consumers inside executors

A weird thing is that executors do not fail with GroupAuthorizationException as the driver does. The difference between the driver's consumer and the executor's consumer is the following:

These seek operations make the difference. I made a trivial application with KafkaConsumer: if I insert seekToBeginning() before calling poll(), the consumer successfully reads records without facing GroupAuthorizationException, hence bypassing authorization. Maybe it is a Kafka bug (I tried with Kafka 2.2.0).
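A hedged reconstruction of that trivial application (placeholder broker, topic, and group names; SASL options omitted; imports as in the previous sketch):

    // With a group.id the principal is NOT authorized for, poll() alone fails
    // with GroupAuthorizationException, because the consumer asks the group
    // coordinator for committed offsets. Seeking first sets explicit positions,
    // so the coordinator is never consulted and the read succeeds.
    val props = new Properties()
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "broker:9092")
    props.put(ConsumerConfig.GROUP_ID_CONFIG, "some-unauthorized-group")
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, classOf[ByteArrayDeserializer].getName)
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, classOf[ByteArrayDeserializer].getName)

    val consumer = new KafkaConsumer[Array[Byte], Array[Byte]](props)
    consumer.assign(Arrays.asList(new TopicPartition("my-topic", 0)))
    consumer.seekToBeginning(consumer.assignment())  // without this line: GroupAuthorizationException
    val records = consumer.poll(Duration.ofSeconds(5))
    consumer.close()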

@HeartSaVioR
Contributor

If I am not missing anything, Structured Streaming manually assigns Kafka partitions to Spark executors. We do not use consumer groups at all, so we do not need them. Therefore, I am not sure why Structured Streaming provides the "subscribe" option.

It's not true for the driver side. Spark still leverages subscription in the driver to receive metadata updates and to avoid having to retrieve the target topic partitions by itself. Spark interprets the topic partitions in startingOffsets / endingOffsets based on that information. The metadata is what we would need to handle manually with an admin client if we wanted to avoid subscription altogether; that is technically possible (if I'm not missing anything) but sub-optimal.

Also, the "assign" option leaves many considerations on the end users' side: end users should know the target topic partitions and fully understand that the query may not consume all messages in a topic once the topic expands its number of partitions. It is not simple to use and is error-prone.

Back to the original topic, my comment is not about the feasibility of doing this. I agree we can make the change technically; the point is how much value it brings to differentiate "assign" from the other strategies on the Spark side. Integrating with such details means it is going to be non-trivial to change later. If the workarounds provided by Spark 3.0.0 work for all cases, I'm not 100% sure about the value.
(Honestly, I don't see the value of an ACL on the group ID if anyone can ignore the permission by calling the assign API. The ACL should also have been set on the users.)

One interesting thing is that Kafka could ignore the group ID for the assign API, but apparently it does not, which is why you've encountered the issue.

@HeartSaVioR
Contributor

But that's only me. Let's hear more voices on this.

cc @zsxwing @gaborgsomogyi

@gaborgsomogyi
Contributor

I've taken a look at it and here are my thoughts:

  • According to KafkaConsumer specification, the consumer group in the "assign" strategy is not used.

This is simply not true. One can commit offsets back with a consumer, and in that case the group id is used (see the sketch at the end of this comment).
This is not used in Structured Streaming at the moment, but I don't want to close off that possibility.

  • We've already added a solution in Spark 3.0.0 to solve exactly this issue; I don't see the reason to solve it in a different way.
  • There is a configuration solution to this on the broker side:
    bin/kafka-acls --authorizer kafka.security.auth.SimpleAclAuthorizer \
      --authorizer-properties zookeeper.connect=zk:2181 \
      --add --allow-principal User:'Bon' --operation READ --topic topicName \
      --group='spark-kafka-source-' --resource-pattern-type prefixed
  • A personal view only, but I don't see the gain in adding this; on the other hand, additional code means more maintenance.

Overall I wouldn't merge it, but if somebody thinks it's worth it, I have a couple of further comments.
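To illustrate the offset-commit point from the first bullet, a hedged sketch with plain kafka-clients (placeholder names; this is not something Structured Streaming does today):

    import java.time.Duration
    import java.util.{Arrays, Properties}
    import org.apache.kafka.clients.consumer.{ConsumerConfig, KafkaConsumer}
    import org.apache.kafka.common.TopicPartition
    import org.apache.kafka.common.serialization.ByteArrayDeserializer

    val props = new Properties()
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "broker:9092")
    props.put(ConsumerConfig.GROUP_ID_CONFIG, "my-committing-group")  // required for commits
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, classOf[ByteArrayDeserializer].getName)
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, classOf[ByteArrayDeserializer].getName)

    val consumer = new KafkaConsumer[Array[Byte], Array[Byte]](props)
    consumer.assign(Arrays.asList(new TopicPartition("my-topic", 0)))
    consumer.seekToBeginning(consumer.assignment())
    consumer.poll(Duration.ofSeconds(5))
    // Even with manual assignment, offsets are stored under group.id;
    // without a group.id this call throws InvalidGroupIdException.
    consumer.commitSync()
    consumer.close()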

@gaborgsomogyi
Contributor

gaborgsomogyi commented Jul 31, 2020

I'm just revisiting this. At the moment I'm working on SPARK-32032, where AdminClient is planned to be used on the driver side, which makes group.id completely useless in the Kafka connector. If that's merged, then we can go in this direction, but not with the current implementation. I think we shouldn't remove group.id from the map but just remove the code references that use it (for instance in KafkaDataConsumer), plus the documentation of course.
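A hedged sketch of that direction, not the SPARK-32032 implementation (AdminClient.listOffsets needs Kafka clients 2.5+; broker and topic names are placeholders). Note that no group.id is involved at any point:

    import java.util.Properties
    import scala.collection.JavaConverters._
    import org.apache.kafka.clients.admin.{AdminClient, AdminClientConfig, OffsetSpec}
    import org.apache.kafka.common.TopicPartition

    val props = new Properties()
    props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "broker:9092")
    val admin = AdminClient.create(props)

    // Resolve the topic's partitions, then fetch their latest offsets,
    // all without ever configuring a consumer group.
    val description = admin.describeTopics(java.util.Arrays.asList("my-topic"))
      .all().get().get("my-topic")
    val partitions = description.partitions().asScala
      .map(info => new TopicPartition("my-topic", info.partition()))
    val latestOffsets = admin.listOffsets(
      partitions.map(tp => tp -> OffsetSpec.latest()).toMap.asJava).all().get()
    admin.close()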

@github-actions

github-actions bot commented Nov 9, 2020

We're closing this PR because it hasn't been updated in a while. This isn't a judgement on the merit of the PR in any way. It's just a way of keeping the PR queue manageable.
If you'd like to revive this PR, please reopen it and ask a committer to remove the Stale tag!

@github-actions github-actions bot added the Stale label Nov 9, 2020
@github-actions github-actions bot closed this Nov 10, 2020