Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

StreamConsumer commite message from tokio spawn. #522

Closed
BruAPAHE opened this issue Dec 9, 2022 · 2 comments
Closed

StreamConsumer commite message from tokio spawn. #522

BruAPAHE opened this issue Dec 9, 2022 · 2 comments

Comments

@BruAPAHE
Copy link

BruAPAHE commented Dec 9, 2022

Hi,
I have used the example provided for the asynchronous processing. and tried commited message.
But I can't commited borrow message.

tokio = { version = "1.22.0", features = ["full"] }

rdkafka = { version = "0.29.0", features = ["cmake-build"] }



 pub async fn run_async_processor(brokers: String, group_id: String, topic: String, worker: i32) {
    let consumer: Arc<StreamConsumer> = Arc::new(ClientConfig::new()
        .set("group.id", &group_id)
        .set("bootstrap.servers", &brokers)
        .set("enable.auto.commit", "false")
        .create()
        .expect("Consumer creation failed")
    );
    consumer
        .subscribe(&[&topic])
        .expect("Can't subscribe to specified topic");
    let stream_processor = consumer.stream().try_for_each(|borrowed_message| {
        async move {
            let _ = match borrowed_message.payload_view::<str>() {
                None => "",
                Some(Ok(s)) => s,
                Some(Err(e)) => {
                    println!("Error while deserializing message payload: {:?}", e);
                    ""
                }
            };
            consumer.commit_message(&borrowed_message, CommitMode::Async).unwrap();
            Ok(())
        }
    });
    println!("Starting daemon consumer #{}", worker);

    stream_processor.await.expect("stream processing failed");
}

i get error

error[E0507]: cannot move out of `consumer`, a captured variable in an `FnMut` closure
  --> src/broker/mod.rs:26:20
   |
14 |       let consumer: Arc<StreamConsumer> = Arc::new(ClientConfig::new()
   |           -------- captured outer variable
...
25 |       let stream_processor = consumer.stream().try_for_each(|borrowed_message| {
   |                                                             ------------------ captured by this `FnMut` closure
26 |           async move {
   |  ____________________^
27 | |             let _ = match borrowed_message.payload_view::<str>() {
28 | |                 None => "",
29 | |                 Some(Ok(s)) => s,
...  |
35 | |             consumer.commit_message(&borrowed_message, CommitMode::Async).unwrap();
   | |             --------
   | |             |
   | |             variable moved due to use in generator
   | |             move occurs because `consumer` has type `Arc<StreamConsumer>`, which does not implement the `Copy` trait
36 | |             Ok(())
37 | |         }
   | |_________^ move out of `consumer` occurs here

error[E0505]: cannot move out of `consumer` because it is borrowed
  --> src/broker/mod.rs:25:59
   |
25 |     let stream_processor = consumer.stream().try_for_each(|borrowed_message| {
   |                            ----------------- ------------ ^^^^^^^^^^^^^^^^^^ move out of `consumer` occurs here
   |                            |                 |
   |                            |                 borrow later used by call
   |                            borrow of `consumer` occurs here
...
35 |             consumer.commit_message(&borrowed_message, CommitMode::Async).unwrap();
   |             -------- move occurs due to use in closure

Help me please.

@dxt736652384
Copy link

You can try adding Mutex for decoration, Arc is an immutable smart reference.

@BruAPAHE
Copy link
Author

@dxt736652384 yes, I resolve this problem. Thx!

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

2 participants