Impossible to send a FutureRecord without key #370

Closed
sycured opened this issue Jun 6, 2021 · 3 comments

sycured commented Jun 6, 2021

Hi, I'm trying to send a payload without a key to Kafka and I'm getting this error:

cannot infer type for type parameter K

My IDE shows me this error:

type inside async fn body must be known in this context [E0698]
cannot infer type for type parameter K declared on the associated function send
Note: the type is part of the async fn body because of this await

Without the key:

async fn create_json(data: String, producer: Data<FutureProducer>) {
    producer
        .send(
            FutureRecord::to("test").payload(&format!("{}", Json(AddResp { result: data }).result)),
            Timeout::Never,
        )
        .await;
}

But it's working with the key:

async fn create_json(data: String, producer: Data<FutureProducer>) {
    producer
        .send(
            FutureRecord::to("test")
                .payload(&format!(
                    "{}",
                    Json(AddResp {
                        result: data.clone()
                    })
                    .result
                ))
                .key(&data),
            Timeout::Never,
        )
        .await;
}

Versions:

  • rdkafka: 0.26 (cmake-build)
  • rustc: 1.52.1 (9bc8c42bb 2021-05-09)

Do you have any idea?
Thanks

@Zarathustra2

Look at the definition of FutureRecord:

pub struct FutureRecord<'a, K: ToBytes + ?Sized, P: ToBytes + ?Sized> {
    /// Required destination topic.
    pub topic: &'a str,
    /// Optional destination partition.
    pub partition: Option<i32>,
    /// Optional payload.
    pub payload: Option<&'a P>,
    /// Optional key.
    pub key: Option<&'a K>,
    /// Optional timestamp.
    pub timestamp: Option<i64>,
    /// Optional message headers.
    pub headers: Option<OwnedHeaders>,
}
[...]

    /// Sets the destination payload of the record.
    pub fn payload(mut self, payload: &'a P) -> FutureRecord<'a, K, P> {
        self.payload = Some(payload);
        self
    }

    /// Sets the destination key of the record.
    pub fn key(mut self, key: &'a K) -> FutureRecord<'a, K, P> {
        self.key = Some(key);
        self
    }

Because you never set the key, the compiler has nothing from which to infer the type K. One way to solve this is to annotate the type when you bind the record to a variable.

So in your case you could solve it via:

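async fn create_json(data: String, producer: Data<FutureProducer>) {
    // Bind the payload first so it outlives the record that borrows it.
    let payload = format!("{}", Json(AddResp { result: data }).result);
    // The annotation fixes K (the key type) even though no key is set.
    let record: FutureRecord<String, String> = FutureRecord::to("test").payload(&payload);
    producer
        .send(
            record,
            Timeout::Never,
        )
        .await;
}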

Now K has a type when compiling.
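
For anyone who wants to reproduce the inference problem without a Kafka setup, here is a minimal sketch using only the standard library. The ToBytes trait, Record struct, and the send, send_annotated, and send_turbofish functions are hypothetical stand-ins modelled on the definitions quoted above, not rdkafka's actual code. It shows two ways to pin K: a type annotation on the binding, and a turbofish on the constructor.

```rust
// Hypothetical stand-ins for rdkafka's ToBytes and FutureRecord, reduced to
// what is needed to reproduce the inference problem.
trait ToBytes {
    fn to_bytes(&self) -> &[u8];
}

impl ToBytes for str {
    fn to_bytes(&self) -> &[u8] {
        self.as_bytes()
    }
}

impl ToBytes for String {
    fn to_bytes(&self) -> &[u8] {
        self.as_bytes()
    }
}

// A unit key type: a convenient placeholder when no key is ever set.
impl ToBytes for () {
    fn to_bytes(&self) -> &[u8] {
        &[]
    }
}

struct Record<'a, K: ToBytes + ?Sized, P: ToBytes + ?Sized> {
    topic: &'a str,
    payload: Option<&'a P>,
    key: Option<&'a K>,
}

impl<'a, K: ToBytes + ?Sized, P: ToBytes + ?Sized> Record<'a, K, P> {
    fn to(topic: &'a str) -> Self {
        Record { topic, payload: None, key: None }
    }

    fn payload(mut self, payload: &'a P) -> Self {
        self.payload = Some(payload);
        self
    }
}

// Generic over K and P, so K must be known at every call site.
// Returns (key length, payload length) in bytes.
fn send<K: ToBytes + ?Sized, P: ToBytes + ?Sized>(record: Record<'_, K, P>) -> (usize, usize) {
    let key_len = record.key.map_or(0, |k| k.to_bytes().len());
    let payload_len = record.payload.map_or(0, |p| p.to_bytes().len());
    println!("topic={} key={}B payload={}B", record.topic, key_len, payload_len);
    (key_len, payload_len)
}

// Fix 1: annotate the binding, as in the reply above (K = String).
fn send_annotated(payload: &str) -> (usize, usize) {
    let record: Record<String, str> = Record::to("test").payload(payload);
    send(record)
}

// Fix 2: name K with a turbofish and let the compiler infer P.
fn send_turbofish(payload: &str) -> (usize, usize) {
    send(Record::<(), _>::to("test").payload(payload))
}

fn main() {
    // send(Record::to("test").payload("hello")); // rejected: K cannot be inferred
    send_annotated("hello");
    send_turbofish("hello");
}
```

With rdkafka itself the turbofish form would presumably be FutureRecord::<(), _>::to("test"), assuming your version implements ToBytes for (); check the crate documentation before relying on it.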


sycured commented Jun 10, 2021

Ah, so that was all it was 😕 I can see I'm really new to Rust… Thanks a lot

sycured closed this as completed Jun 10, 2021

rdcm commented Nov 19, 2023

Hi, I have the following method and expect messages to be distributed randomly across the consumers in a consumer group, but all produced messages are consumed by a single consumer in the group. How can I fix this? Removing .key("") does not solve the issue.

    async fn produce(&self, message: &T) -> Option<()> {
        let json = serde_json::to_string(&message).ok()?;
        let record: FutureRecord<String, String> = FutureRecord::to(&self.topic).payload(&json);

        self.producer
            .send(record, Duration::from_secs(0))
            .await
            .ok()?;

        Some(())
    }
