Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add support for OAuth #442

Merged
merged 3 commits into from
Mar 24, 2022
Merged

Conversation

jsurany-bloomberg
Copy link
Contributor

This PR is meant to provide the user a way to build a Client that uses the oauthbearer_refresh_token_cb functionality in rdkafka-sys. It also provides the plumbing to define their own callback through the ClientContext trait.

  • ClientConfig has a pub(crate) flag that tells the Client to set the callback function to that of the given <impl>ClientContext type.
  • Default implementation of the callback just logs a message at the error level, the user needs to create their own callback function.

Example

// Define a type that overrides the default refresh_oauth_token method
struct RefreshContext {
    // relevant fields
}

impl RefreshContext {
    // other impl details
}

impl ClientContext for RefreshContext {
    fn refresh_oauth_token(&self, client: &mut RDKafka, oauthbearer_config: *const i8) {
         // set the token with rdkafka_sys::rd_kafka_oauthbearer_set_token
    }
}

let config = ClientConfig::new();
config.use_oauth_token_refresh_cb();
// other settings

let context = RefreshContext::new();
let producer = FutureProducer<RefreshContext> = config
    .create_with_context(context)
    .expect("Producer creation failed");

// etc.

Let me know what you think. The prototyping I've done for this has been on an older version of this crate, so testing is needed for these changes, particularly for Consumer types.

@jsurany-bloomberg
Copy link
Contributor Author

Should resolve issue #415

@jsurany-bloomberg
Copy link
Contributor Author

cc @benesch

@jsurany-bloomberg jsurany-bloomberg changed the title add token refresh callback functionality add support for OAuth Feb 16, 2022
@benesch
Copy link
Collaborator

benesch commented Feb 22, 2022

Sorry, I don't have time to look at this at the moment! I'll ping some of the other maintainers.

src/client.rs Outdated Show resolved Hide resolved
src/client.rs Outdated
opaque: *mut c_void,
) {
let context = &mut *(opaque as *mut C);
context.refresh_oauth_token(client, oauthbearer_config);
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should it return a boolean so we can then call oauthbearer_set_token() or
oauthbearer_set_token_failure()?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hoping that I'm understanding what you want here...

It'd be cool to put the unsafe calls to those functions in this block, but I'm not sure it makes sense.

For instance, I don't think we should assume what the value of parameters like md_lifetime_ms should be, and I don't think we should complicate ClientContext so that the user has a way to provide that value through context by default.

Also, I don't really want to assume anything about how the user handles errors in this process. By moving the call to oauthbearer_set_token into this block, we make it so that the user has a lot of flexibility in handling errors in generating the token (since that's code that they are going to write), but they have no flexibility in handling errors that get reported back via the errstr value - they would be bound to whatever we decide to do in this block.

I can write a pseudo prototype of a callback function to make things a bit clearer if needed.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The callback could return the information needed for us to call oauthbearer_set_token here. One option would be to return Result<SetTokenData, Error>. The user can still have some custom error handling logic in the callback, but we'll take care of enforcing the contract that we must call oauthbearer_set_token_failure or oauthbearer_set_token in the callback.

Alternatively, if we really want the user to have all the flexibility in handling errors, we can return Option<SetTokenData>.

In any case, should this PR also include bindings for oauthbearer_set_token_failure or oauthbearer_set_token?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Take a look at the most recent commit and let me know what you think. The only thing that's missing right now is support for SASL extensions, for which I would be happy to add support in my free time after this PR.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks great!

@jsurany-bloomberg
Copy link
Contributor Author

@duarten Tested these changes and discovered some issues. I've fixed them in the latest commit, along with a small QoL change to the OAuthTokenError struct.

@@ -515,7 +516,11 @@ pub(crate) unsafe extern "C" fn native_oauth_refresh_cb<C: ClientContext>(
) {
// generate the token using generate_oauth_token
let context = &mut *(opaque as *mut C);
let oauthbearer_config = CStr::from_ptr(oauthbearer_config).to_string_lossy();
let oauthbearer_config = match oauthbearer_config.is_null() {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

oauthbearer_config may be null if there is no configuration. This check protects against the resulting seg fault.

@@ -542,7 +547,7 @@ pub(crate) unsafe extern "C" fn native_oauth_refresh_cb<C: ClientContext>(
}
};

let errstr = match CString::new(vec![0_u8; token_info.errstr_size]) {
let errstr = match CString::new(vec![u8::MAX; token_info.errstr_size]) {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

from documentation CString::new returns an error if the bytes contain a 0, causing this part of the code to only error.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

from_vec_unchecked may be a better choice. I would think that we want the vector to have only zeros.

@duarten duarten merged commit 04b634a into fede1024:master Mar 24, 2022
benesch added a commit that referenced this pull request Apr 4, 2022
Tweak the support for the OAuth token refresh callback added in #442 to
avoid some memory unsafety and to be more in line with crate
conventions. In particular:

  * Don't pass `&str` to C code expecting a C string directly, as the
    Rust string slice is not guaranteed to be null terminated and so the
    C code might read off the end of a string. Instead always construct
    a CString.

  * Don't allow the user to specify an `errstr_size`. Crate convention
    is to use an `ErrBuf` with a fixed size of 512.

  * Rename `generate_oauth_token` to `refresh_oauth_token`, to hew
    closer to the librdkafka naming for the feature.

  * Move the `Config::use_token_refresh_cb` method to an associated
    constant on the `ClientContext`, so that the configuration of the
    token refresh callback is entirely contained within `ClientContext.`
benesch added a commit that referenced this pull request Apr 4, 2022
Tweak the support for the OAuth token refresh callback added in #442 to
avoid some memory unsafety and to be more in line with crate
conventions. In particular:

  * Don't pass `&str` to C code expecting a C string directly, as the
    Rust string slice is not guaranteed to be null terminated and so the
    C code might read off the end of a string. Instead always construct
    a CString.

  * Don't allow the user to specify an `errstr_size`. Crate convention
    is to use an `ErrBuf` with a fixed size of 512.

  * Rename `generate_oauth_token` to `refresh_oauth_token`, to hew
    closer to the librdkafka naming for the feature.

  * Move the `Config::use_token_refresh_cb` method to an associated
    constant on the `ClientContext`, so that the configuration of the
    token refresh callback is entirely contained within `ClientContext.`
dopuskh3 pushed a commit to DataDog/rust-rdkafka that referenced this pull request Apr 23, 2022
This PR is meant to provide the user a way to build a Client that uses the oauthbearer_refresh_token_cb functionality in rdkafka-sys. It also provides the plumbing to define their own callback through the ClientContext trait.

ClientConfig has a pub(crate) flag that tells the Client to set the callback function to that of the given <impl>ClientContext type.
Default implementation of the callback just logs a message at the error level, the user needs to create their own callback function.
Example

```rust
// Define a type that overrides the default refresh_oauth_token method
struct RefreshContext {
    // relevant fields
}

impl RefreshContext {
    // other impl details
}

impl ClientContext for RefreshContext {
    fn refresh_oauth_token(&self, client: &mut RDKafka, oauthbearer_config: *const i8) {
         // set the token with rdkafka_sys::rd_kafka_oauthbearer_set_token
    }
}

let config = ClientConfig::new();
config.use_oauth_token_refresh_cb();
// other settings

let context = RefreshContext::new();
let producer = FutureProducer<RefreshContext> = config
    .create_with_context(context)
    .expect("Producer creation failed");

// etc.
```

Let me know what you think. The prototyping I've done for this has been on an older version of this crate, so testing is needed for these changes, particularly for Consumer types.
benesch added a commit that referenced this pull request Oct 29, 2022
Tweak the support for the OAuth token refresh callback added in #442 to
avoid some memory unsafety and to be more in line with crate
conventions. In particular:

  * Don't pass `&str` to C code expecting a C string directly, as the
    Rust string slice is not guaranteed to be null terminated and so the
    C code might read off the end of a string. Instead always construct
    a CString.

  * Don't allow the user to specify an `errstr_size`. Crate convention
    is to use an `ErrBuf` with a fixed size of 512.

  * Move the `Config::use_token_refresh_cb` method to an associated
    constant on the `ClientContext`, so that the configuration of the
    token refresh callback is entirely contained within `ClientContext.`
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

None yet

3 participants