
Async External Task Handler Function in run_topic_handlers #11

Open
IdemenB opened this issue Apr 14, 2021 · 12 comments

IdemenB commented Apr 14, 2021

Hi all,

In addition to the camunda_client crate, I'm using this library of yours too. Thank you very much! I've run into a problem that is rather advanced for my level in Rust; I'd appreciate it if you could take a look.

The use case is as follows:

  1. I have tens of external tasks to be handled from numerous different processes. Hence, I have more than one external task "Handler".
  2. Because there are multiple Handlers, I'd like to run them all together through run_topic_handlers(). To make this work, we have to wrap the main() function with a runtime, e.g. tokio's #[tokio::main].
  3. Some of my external task handlers run on runtimes themselves. One particular example is a handler that uses the SQLx library to connect to a Postgres DB for queries and inserts; SQLx itself requires a runtime.
  4. This leads to the "'main' panicked at 'Cannot start a runtime from within a runtime'" panic.

On Stack Overflow, some users suggested using tokio::task::spawn_blocking, so I converted run_topic_handlers() into the following, but it still panics where indicated :/

pub async fn run_topic_handlers<F>(config: &Config, handlers: Vec<Handler<F>>)
where
    F: Fn(LockedExternalTaskDto) -> HandlerResult + Send + 'static,
{
    let mut task_handles = Vec::new();

    for handler in handlers {
       
        let config = config.clone();
        // spawn_blocking is used here
        let task_handle =
            tokio::task::spawn_blocking(|| async move { run_topic_handler(&config, handler) })
                .await
                .expect("Spawning 'handler functions' in tokio thread pool panicked!");
        task_handles.push(task_handle);
    }

    for task_handle in task_handles {
        // panics here
        task_handle.await;
    }
}
@Dansvidania (Owner)

Hi @IdemenB, I'd like to take a look at this issue. Do you have a minimal example of what you are trying to accomplish with SQLx so that I can reproduce the error and start from there?


IdemenB commented Apr 17, 2021

Hi @Dansvidania, if you clone the master branch from here https://github.com/IdemenB/camunda_lib_tests/blob/master/src/main.rs you'll see two handler functions initiated from main.rs. One of those functions sleeps asynchronously for 2 seconds, the other for 3. I believe this will reproduce the runtime-within-runtime error.

There is also a commented-out SQLx example in the handlers.rs file. You can switch from the sleep to that, though you'll need to install a sample DB for it.

@Dansvidania (Owner)

The app in the repo doesn't seem to be using this library, so I got confused.

So, if I understand correctly, the issue is reproduced by having two different handlers sleep for different amounts of time? I'll start from that.


IdemenB commented Apr 17, 2021

Oh, I'm sorry, this is a sandbox project I originally created from the library code. I'm constantly trying and testing many different things, I'm very new to Rust (and not a professional programmer), and I don't know Git workflows very well, which is why I did it that way.

By the way, I may have found the solution. The problem is that in run_handler_functions we spawned a new (async) task for each run_handler_function, and those in turn spawn their own (async) tasks. But in essence, each run_handler_function is blocking.

This is not allowed by the tokio runtime. If you cannot find another, better solution, you can try initiating blocking spawns in run_handler_functions (tokio::task::spawn_blocking) first, and then the rest should work.


IdemenB commented Apr 17, 2021

One other thing I believe should be fixed is the following loop in worker.rs:

    for task_handle in task_handles {
        task_handle.await.unwrap();
    }

The spawned blocking threads should not be awaited: once spawned, they run indefinitely, because run_handler_function() is an endless loop.

@Dansvidania (Owner)

I agree. The code running multiple topic handlers needs changing. I will try to play around with this for a while and keep you posted.

@Dansvidania (Owner)

I am considering focusing the library's functionality on worker::run_topic_handler, scrapping the concurrent/parallel run_topic_handlers, and basically leaving it to consumers of the library to decide if and how to parallelize the execution of different topic handlers in the same worker.

My initial objective was to handle and hide the complexity from the consumers, but it seems to be creating more issues than anything else. This would simplify the architecture of the library itself while also giving more control to the consumers.

What do you think?


IdemenB commented Apr 18, 2021

My suggestion would be the opposite :), and here is why. I can't think of a production scenario where there is only a single type of external task in the universe of a company's business processes. Anyone dealing with these matters, especially in Rust, has a lot of work and some concurrency in mind. So this library should focus on easy management of concurrent workers. My tests so far, done the way I described in my previous message, work well with multiple handlers. I even instantiated the same handler twice, and that worked too.

In short, if we run the run_topic_handler() functions from run_topic_handlers() with tokio::spawn_blocking(), it seems to work fine.

My sample:

pub async fn run_topic_handlers<F>(config: &Config, handlers: Vec<Handler<F>>)
where
    F: Fn(LockedExternalTaskDto) -> HandlerResult + Send + 'static,
{
    for handler in handlers {
        let config = config.clone();
        // spawn each blocking topic handler and detach it; the JoinHandles
        // are intentionally not awaited (awaiting them here was the panic
        // point of the "runtime within a runtime" error, and the handlers
        // loop forever anyway)
        spawn_blocking(move || run_topic_handler(&config, handler));
    }
}
pub fn run_topic_handler<F>(config: &Config, handler: Handler<F>)
where
    F: Fn(LockedExternalTaskDto) -> HandlerResult,
{
    let engine = Engine::new(config);

    // a unique worker id is generated by using the current time and thread id.
    let topic_name = &handler.topic;
    let current_thread_id = thread_id::get().to_string();

    let worker_id = format!("Worker-{}-{}", current_thread_id, topic_name);
    loop {
        // TODO lock duration should be configured per each worker
        match &engine.lock_task(&handler.topic, &worker_id, Some(1i32), Some(3000i64)) {
            Ok(tasks) => {
                if tasks.is_empty() {
                    // wait(config.wait_interval);
                    println!("No task to fetch and lock for {:?}", worker_id);
                    continue;
                }
                // println!("No. of received locked tasks :{:?}", tasks.len());
                // println!("Tasks:{:?}", tasks);
                //TODO we will be processing multiple tasks by locking multiple external tasks. this should be made a loop.
                let task = &tasks[0];
                // println!("Received Locked External Task: {:?}", task);
                let result = (handler.handler_function)(task.to_owned());

                match result {
                    Ok(variables) => {
                        complete_task(&engine, &task, &worker_id, variables);
                        println!("handler function is completed");
                    }
                    Err(topic_handler_error) => {
                        handle_topic_handler_failure(&engine, &task, topic_handler_error)
                    }
                }
            }

            Err(lock_task_error) => {
                handle_lock_task_error(lock_task_error);
                // print the error message
                println!("Error: {:?}", lock_task_error);
            }
        }

        //wait not needed, see asyncResponseTimeout in the Camunda documentation

        // wait(config.wait_interval);
    }
}

And one sample handler function from the SQLx example (this function is itself async):

#[tokio::main]
pub async fn query_orders_handler_function(_locked_task: LockedExternalTaskDto) -> HandlerResult {
    //TODO pool should be moved outside and centralized
    let pool = PgPoolOptions::new()
        .max_connections(5)
        .connect("postgres://postgres:postgres@192.168.1.35:5532/dellstore")
        .await?;

    let now: DateTime<Local> = Local::now();
    println!("query_orders_handler_function is triggered at {:?}", now);

    // println!("{:?}", pool);

    // fetch all orders
    #[derive(Serialize, Debug)]
    struct Order {
        customerid: Option<i32>,
        firstname: Option<String>,
        lastname: Option<String>,
        orderid: Option<i32>,
        totalamount: Option<Decimal>,
        orderdate: Option<NaiveDate>,
        invoice: Option<i32>,
    }
    // select orders without invoice
    let cursor1 = sqlx::query_as_unchecked!(
        Order,
        "
        SELECT c.customerid, c.firstname, c.lastname, o.orderid, o.totalamount, o.orderdate, o.invoice  from orders o
        inner join customers c on o.customerid = c.customerid
        full outer join invoices i on o.invoice = i.invoice_id
        WHERE o.invoice is null or i.processing_result = 's'
        LIMIT 50
        ",
    )
    .fetch_all(&pool).await?;

    //process fetched data into arrays such that a single batch insert can be
    //executed, instead of going to the DB one by one for inserting records

    // println!("cursor1: {:?}", cursor1);
    let mut orderid_array = Vec::new();

    let mut orderdate_array = Vec::new();
    let mut invoice_array = Vec::new();

    for row in &cursor1 {
        orderid_array.push(row.orderid);
        orderdate_array.push(row.orderdate);
        invoice_array.push(row.invoice);
    }

    // copy orders into your own DB (this is to simulate some business need)
    let _cursor2 = sqlx::query_unchecked!(
        "INSERT INTO fetched_orders (orderid, orderdate, invoice) VALUES (
        UNNEST($1::int[]),
        UNNEST($2::date[]),
        UNNEST($3::int[])
    )",
        &orderid_array,
        &orderdate_array,
        &invoice_array
    )
    .execute(&pool)
    .await?;

    let mut fetched_orders_object = Vec::new();

    for row in cursor1 {
        fetched_orders_object.push((row.orderid.unwrap(), row.totalamount.unwrap()));
    }

    let process_variables = utils::generate_process_variables(vec![(
        "fetched_orders".to_owned(),
        json!(&fetched_orders_object),
    )]);


    let now2: DateTime<Local> = Local::now();
    println!("query_orders_handler_function is ended at {:?}", now2);

    Ok(process_variables)
}

main.rs


#[tokio::main]
async fn main() {
    // TODO should be parsed from a config file or environment. worker_id should not be named here
    // but in run_topic_handlers per each spawned handler
    let config = Config::new(
        "http://localhost:8080/engine-rest".to_owned(),
        "demo".to_owned(),
        "demo".to_owned(),
    );

    let query_orders_handler: Handler<fn(LockedExternalTaskDto) -> HandlerResult> = Handler {
        topic: "queryOrders".to_owned(),
        handler_function: handlers::query_orders_handler_function,
    };

    let process_invoice_handler: Handler<fn(LockedExternalTaskDto) -> HandlerResult> = Handler {
        topic: "processInvoice".to_owned(),
        handler_function: handlers::process_invoice_handler_function,
    };

    let relate_invoice_with_order_handler: Handler<fn(LockedExternalTaskDto) -> HandlerResult> =
        Handler {
            topic: "relateInvoiceWithOrder".to_owned(),
            handler_function: handlers::relate_invoice_with_order_handler_function,
        };

    let send_invoice_order_handler: Handler<fn(LockedExternalTaskDto) -> HandlerResult> = Handler {
        topic: "sendInvoice".to_owned(),
        handler_function: handlers::send_email_handler_function,
    };

    let send_invoice_order_handler2: Handler<fn(LockedExternalTaskDto) -> HandlerResult> =
        Handler {
            topic: "sendInvoice".to_owned(),
            handler_function: handlers::send_email_handler_function,
        };

    //TODO make the number of workers for each topic parametric
    let handlers_vec = vec![
        query_orders_handler,
        process_invoice_handler,
        relate_invoice_with_order_handler,
        send_invoice_order_handler,
        send_invoice_order_handler2,
    ];

    worker::run_topic_handlers(&config, handlers_vec).await;
}


Dansvidania commented Apr 18, 2021

I can't think of a production scenario where there will be only a single type of external task in the universe of the company's business processes.

I agree with your premise, but my point is that it does not seem very complex to parallelize the execution of run_topic_handler in the consumer application itself, leaving consumers free to use their own runtime and compose the libraries as they prefer.

The documentation of spawn_blocking and this issue seem to hint that it should be used with caution, and I don't feel I know enough at the moment to judge when and how it is appropriate. The fact that it works in your use case is great, but it doesn't guarantee it'll work as expected in other use cases. Again, this stems mostly from my ignorance.

What I think I will end up doing is moving tokio and the parallel topic handler executor to a separate crate/feature, so that it can be opted into by users and experimented with during the development of camunda-worker.


IdemenB commented Apr 18, 2021

Yes, you're right. These are delicate areas, and I'm far from being an expert in them either.

Could you think of an implementation where there is no handlers() but just a single handler, where both it and the handler_function it triggers are async? If you can set that baseline, the library would be a really helpful starting point.

@Dansvidania (Owner)

I will try to configure the library so that it is easy for consumers to use tokio::spawn to run multiple run_topic_handler calls in parallel. I'm not sure whether that means they need to be async, though.

I need to study a bit more and find the proper way to do that.
