
oauth token not refreshing #1485

Open · sms1190 opened this issue Dec 22, 2022 · 14 comments

@sms1190 commented Dec 22, 2022

Description

In my project, I am using confluent-kafka-python 1.9.2 to consume and produce messages on a Kafka topic. The OAuth provider is managed by another team, and the token expires after 30 minutes. In my code I have used this configuration for the consumer:

config = {
    'bootstrap.servers': '<server-url>:9093',
    'group.id': 'consumer-group',
    'auto.offset.reset': 'latest',
    'queued.max.messages.kbytes': 100000,
    'enable.auto.commit': False,
    'fetch.min.bytes': 100000,
    'security.protocol': 'SASL_SSL',
    'sasl.mechanism': 'OAUTHBEARER',
    'sasl.oauthbearer.config': 'oauth_cb',
    'oauth_cb': functools.partial(_get_token, <client_id>, <client_secret>),
}
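(Note for readers following along: the client invokes oauth_cb with a single argument, the value of sasl.oauthbearer.config, and expects back a (token, expiry-unix-timestamp) tuple. That is why functools.partial pre-binds client_id and client_secret, leaving only the trailing oauth_config parameter for the client to fill in.)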

_get_token function:

import time

from keycloak.realm import KeycloakRealm  # python-keycloak-client (assumed import path)


def _get_token(client_id, client_secret, oauth_config):
    realm = KeycloakRealm(
        server_url="<auth_server_url>",
        realm_name="<realm_name>",
    )
    oidc_client = realm.open_id_connect(
        client_id=client_id,
        client_secret=client_secret,
    )
    client_credentials = oidc_client.client_credentials()
    access_token = client_credentials["access_token"]
    expires_in = client_credentials["expires_in"]
    print(client_credentials)
    return access_token, time.time() + expires_in

Consumer code:

# .....
config.update({
    "key.deserializer": key_deserializer,
    "value.deserializer": value_deserializer,
})
consumer = DeserializingConsumer(config)
while True:
    message = consumer.poll(timeout) if timeout else consumer.poll()
    if message is None:
        logger.debug("No messages found")
        continue
    message_error = message.error()
    if message_error:
        logger.error(message_error)
    # processing message code

While running the consumer, it fetches the token the first time and consumes messages without any issue, but once the token expires after 30 minutes, I start getting the following error:

confluent_kafka.error.ConsumeError: KafkaError{code=TOPIC_AUTHORIZATION_FAILED,val=29,str="Fetch from broker 31 failed: Broker: Topic authorization failed"}

Even if I set expires_in to 30 seconds or 1 minute, I still get the above error. That is what I don't understand: _get_token is then called every minute, yet after 30 minutes I still get the error above.

I also tried to set oauthbearer_token_refresh_cb but got this error:

cimpl.KafkaException: KafkaError{code=_INVALID_ARG,val=-186,str="Property "oauthbearer_token_refresh_cb" must be set through dedicated .._set_..() function"}

So I would like to know: how can I refresh the token?

How to reproduce

Checklist

Please provide the following information:

  • confluent-kafka-python and librdkafka version (confluent_kafka.version() and confluent_kafka.libversion()):
  • Apache Kafka broker version:
  • Client configuration: {...}
  • Operating system:
  • Provide client logs (with 'debug': '..' as necessary)
  • Provide broker log excerpts
  • Critical issue
@bradsnay

I have a similar issue when using the oauth_cb configuration on version 1.9.2.

I start my application and get a new token with an expiration of 15:42:05:

2023-01-10 15:12:05 [1680] [WARNING]  Fetched new Secure Kafka Token: expiration_date=2023-01-10 15:42:05

I consume some messages successfully. Then at 15:36:06 my token is refreshed with an expiration of 16:06:06:

2023-01-10 15:36:06 [1680] [WARNING]  Fetched new Secure Kafka Token: expiration_date=2023-01-10 16:06:06

But at 15:42:05, right when the first token expires, I get:

2023-01-10 15:42:05 [1680] [ERROR] KafkaError{code=TOPIC_AUTHORIZATION_FAILED,val=29,str="Fetch from broker 31 failed: Broker: Topic authorization failed"}

So something appears to be happening where the new token is not being used after it is refreshed.

@bradsnay

I confirmed that this is also happening on version 1.9.0.

@bradsnay

Went all the way back to version 1.6.1 and it continued to have this issue. I'm having a hard time believing that this would be a bug in the library for almost 2 years. I wonder if there's something else going on.

@bradsnay

Something I noticed that may be contributing to this is that librdkafka copies the token value from the handle into another state object: https://github.com/confluentinc/librdkafka/blob/master/src/rdkafka_sasl_oauthbearer.c#L1256

If that copying only happens the first time we connect, and the library only uses the state's token value, I could see how the first token fetched would be the only one ever passed to the broker. I haven't been able to prove this, though, as I haven't set up the project to debug locally.

@bradsnay

Confirmed that the issue also happens without the oauth_cb configuration option, so it's not confined to that configuration:

"security.protocol": "SASL_SSL",
 "sasl.mechanism": "OAUTHBEARER",
"sasl.oauthbearer.method": "oidc",
"sasl.oauthbearer.client.id": <client_id>,
"sasl.oauthbearer.client.secret": <secret_name>,
"sasl.oauthbearer.token.endpoint.url": <token_url>,

@bradsnay

Confirmed this is still happening in version 2.0.2.

@bradsnay commented Apr 7, 2023

Confirmed that this can be fixed by setting connections.max.reauth.ms on the broker to a non-zero number. Without it set to a positive integer, the broker will not allow re-authentication, so even if a new token is presented, the session will end once the original token expires.

The value of connections.max.reauth.ms should be greater than the typical token expiration time for your use case, so that the client has enough time to re-authenticate before the connection is closed by the broker. In librdkafka, a token refresh occurs at 80% of the token expiration time, so it would be best to set connections.max.reauth.ms to be >= 80% of the typical token expiration time.
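For reference, a broker-side sketch of that change (the value is illustrative and assumes tokens that live roughly 30 minutes):

# server.properties on each broker (illustrative value: 30 minutes)
connections.max.reauth.ms=1800000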

Circling back here: the above change didn't actually work in the production environment. Both our production and development broker configurations are identical. We've also confirmed that the newly generated tokens in the oauth_cb function are valid. As of right now, everything points to either confluent_kafka or librdkafka passing the old token value to the broker, as evidenced by the connection to the broker being severed as soon as our initial JWT expires, even after a new token has been fetched via oauth_cb.

@bradsnay commented May 9, 2023

Providing some logs here. You can see that the library reports the token was refreshed at 15:01:51.976:

%7|1683658911.976|WAKEUP|rdkafka#consumer-1| [thrd:app]: Wake-up sent to 2 broker threads in state >= TRY_CONNECT: OAUTHBEARER token update

You then see the group authorization errors starting at 15:07:51 (1683659271.105), which is exactly the time the first token expires:

%7|1683659271.105|REQERR|rdkafka#consumer-1| [thrd:main]: GroupCoordinator/11: OffsetCommitRequest failed: Broker: Group authorization failed: explicit actions Permanent

After a while of this, the session times out and the broker hangs up the connection:

Consumer group session timed out (in join-state steady) after 150083 ms without a successful response from the group coordinator (broker 11, last error was Success): revoking assignment and rejoining group
%7|1683659570.840|NODENAME|rdkafka#consumer-1| [thrd:main]: GroupCoordinator/11: Broker nodename changed from "<redacted hostname>" to ""
%7|1683659570.840|NODEID|rdkafka#consumer-1| [thrd:main]: GroupCoordinator/11: Broker nodeid changed from 11 to -1
%7|1683659570.840|DESTROY|rdkafka#consumer-1| [thrd:app]: Terminating instance (destroy flags none (0x0))
%7|1683659570.840|TERMINATE|rdkafka#consumer-1| [thrd:app]: Terminating consumer group handler
%7|1683659570.840|TERMINATE|rdkafka#consumer-1| [thrd:app]: Interrupting timers
%7|1683659570.840|TERMINATE|rdkafka#consumer-1| [thrd:main]: Internal main thread terminating
%7|1683659570.840|TERMINATE|rdkafka#consumer-1| [thrd:app]: Sending TERMINATE to internal main thread
%7|1683659570.840|DESTROY|rdkafka#consumer-1| [thrd:main]: Destroy internal
%7|1683659570.840|BROADCAST|rdkafka#consumer-1| [thrd:main]: Broadcasting state change
%7|1683659570.840|TERMINATE|rdkafka#consumer-1| [thrd:app]: Joining internal main thread
%7|1683659570.840|DESTROY|rdkafka#consumer-1| [thrd:main]: Removing all topics
%7|1683659570.840|TOPBRK|rdkafka#consumer-1| [thrd:sasl_ssl://<redacted hostname>]: sasl_ssl://<redacted hostname>: Topic <redacted topic name> [0]: leaving broker (0 messages in xmitq, next broker (none), rktp 0x7f9bf800a190)
%7|1683659570.840|FAIL|rdkafka#consumer-1| [thrd:GroupCoordinator]: GroupCoordinator: Closing connection due to nodename change (after 2099653ms in state UP) (_TRANSPORT)
%7|1683659570.840|DESTROY|rdkafka#consumer-1| [thrd:main]: Sending TERMINATE to sasl_ssl://<redacted hostname>
%7|1683659570.840|STATE|rdkafka#consumer-1| [thrd:GroupCoordinator]: GroupCoordinator: Broker changed state UP -> DOWN

@bradsnay commented May 9, 2023

@edenhill I apologize for the direct mention, but I could use some help. Is it possible that there is a bug in which the old token is used instead of the refreshed token fetched from the oauth_cb function? That seems to be my only lead, but I could use some of your expertise with this codebase to figure it out a bit faster.

confluent-kafka-python and librdkafka version (confluent_kafka.version() and confluent_kafka.libversion()): 2.0.2
Apache Kafka broker version: '2.5.1-1.el7'
Client configuration:

{
    "security.protocol": "SASL_SSL",
    "sasl.mechanism": "OAUTHBEARER",
    "oauth_cb": functools.partial(fetch_token),
    "bootstrap.servers": <redacted brokers>,
    "sasl.oauthbearer.token.endpoint.url": <redacted endpoint>,
    "ssl.ca.location": <redacted ssl ca location>,
    "debug": "generic,security,broker",
    "group.id": <redacted consumer group>,
    "auto.offset.reset": "earliest",
    "queued.max.messages.kbytes": 100000,
    "enable.auto.commit": False,
    "session.timeout.ms": 150000,
    "max.poll.interval.ms": 600000,
}

Operating system: Rocky Linux 8.5 (Green Obsidian)
Provide client logs: See above comment
Provide broker log excerpts: N/A
Critical issue: yes

@bradsnay

I've also seen instances where the consumer stops consuming messages altogether with the following error after the initial token expires:

%7|1683724659.379|REQERR|rdkafka#consumer-1| [thrd:main]: sasl_ssl://<redacted hostname>: FindCoordinatorRequest failed: Broker: Group authorization failed: actions Permanent
%7|1683724659.379|BROADCAST|rdkafka#consumer-1| [thrd:main]: Broadcasting state change

@bradsnay

It appears that, according to KIP-255, the initial token remains in use as long as the initial connection remains intact. Does this mean that, in order to maintain a long-lived connection, we need to explicitly track token expiration and restart consumers/producers, or is there some other way to trigger a reconnection? The token refresh callback doesn't seem particularly useful if the new token is not used and we need to track our own expiration dates. It's also quite misleading from an API perspective.
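For anyone stuck on an affected version, a rough stopgap along those lines could look like the sketch below. It reuses config and DeserializingConsumer from the original report; TOKEN_LIFETIME_S is an assumption about the IdP's token lifetime, not something the client reports.

import time

TOKEN_LIFETIME_S = 30 * 60   # assumed token lifetime granted by the IdP
RESTART_MARGIN_S = 60        # reconnect a minute before the token expires

while True:
    # Recreate the consumer before the current token expires so the new
    # connection authenticates with a freshly fetched token.
    consumer = DeserializingConsumer(config)
    deadline = time.time() + TOKEN_LIFETIME_S - RESTART_MARGIN_S
    try:
        while time.time() < deadline:
            message = consumer.poll(1.0)
            if message is None:
                continue
            if message.error():
                logger.error(message.error())
                continue
            # process the message, then commit explicitly
            # (enable.auto.commit is False in the config above)
            consumer.commit(message=message)
    finally:
        consumer.close()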

@alexanderschapelt commented Jun 16, 2023

I have the same issue with broker version 3.4.0 and confluent-kafka-python 2.1.1. Are there any updates on this?

EDIT:
After some more analysis, I think I found the root cause. I just want to share it in case someone else has similar issues.

To sum up: older Kafka versions masked the issue that librdkafka did not refresh the token automatically. Let's hope the new version is released soon, so we get rid of the error logs :)

@nwmiller commented Aug 3, 2023

It looks like librdkafka 2.2.0 included a possible fix for this issue:

KIP-368: Allow SASL Connections to Periodically Re-Authenticate (confluentinc/librdkafka#4301, started by @vctoriawu).
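A quick way to check whether an installed client bundles a librdkafka at or past that release, using the same version helpers the issue template asks for:

import confluent_kafka

print(confluent_kafka.version())     # confluent-kafka-python version
print(confluent_kafka.libversion())  # bundled librdkafka version; 2.2.0+ includes the KIP-368 re-auth change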

@bradsnay commented Nov 27, 2023

We've seen evidence that the issue lies within the oauth_cb configuration setting. That callback does not always get called at the right time to update the token used to authenticate with the broker, especially in the case of the Producer class, since applications may not call poll consistently (see the sketch below). This results in the client continuing to send the old, expired token to the broker even after a new token was fetched via the callback function.
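A minimal sketch of the polling pattern that keeps the callback serviced on the producer side (the topic name is a placeholder, and config stands for a producer-appropriate subset of the OAUTHBEARER settings shown earlier in the thread):

import time

from confluent_kafka import Producer

producer = Producer(config)

for i in range(10):
    producer.produce("<topic>", value=b"payload")
    # Client callbacks, including oauth_cb, are serviced from poll()/flush(),
    # so even an infrequent producer must call poll() regularly for the
    # token refresh to have a chance to run.
    producer.poll(0)
    time.sleep(1)

producer.flush()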

A working alternative on version 2.2.0 is to use the "sasl.oauthbearer.method": "oidc", "sasl.oauthbearer.client.id", "sasl.oauthbearer.client.secret", and "sasl.oauthbearer.token.endpoint.url" configuration settings to authenticate. This configuration does not exhibit the authentication issues seen when using the oauth_cb function. However, dropping oauth_cb gives up all the flexibility it offers, such as locally caching tokens for multiple consumers on the same machine, controlling access to authentication servers via preferred libraries, etc. This can result in unnecessary and excessive load on the authentication servers when there are many (hundreds of) consumers running in parallel.

Ideally, the bug with oauth_cb can be identified and fixed so that it works as intended. I've tried checking the source code in this repository as well as librdkafka, but since I am not familiar with the inner workings of the code, I haven't been able to efficiently locate issues in the authentication code. Some help here would be greatly appreciated!
