Enable user-defined context structs for async UDFs #487

Merged
merged 1 commit into from Jan 16, 2024


jbeisen
Collaborator

@jbeisen jbeisen commented Jan 12, 2024

Define an async trait, UdfContext, that users can implement on a struct in their UDF definition. The struct can hold resources that are shared across invocations of the UDF, such as a database connection. An Arc pointer to the context is passed as the first argument to the UDF.

Here's how the example async UDF from #483 can be modified to use the context:

/*
[dependencies]
tokio = { version = "1", features = ["full"] }
tokio-postgres = "0.7"
async-trait = "0.1.68"

[udfs]
async_results_ordered = true
async_max_concurrency = 10
async_timeout_seconds = 5
*/

use tokio_postgres::{Client, NoTls};
use async_trait::async_trait;
use arroyo_types::UdfContext;
use std::sync::Arc;
use tokio::sync::RwLock;

pub struct Context {
    client: RwLock<Option<Client>>,
}

impl Context {
    pub fn new() -> Self {
        Self {
            client: RwLock::new(None),
        }
    }
}

#[async_trait]
impl UdfContext for Context {
    async fn init(&self) {
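        // Connection parameters are hard-coded for this example.
        let conn_str = "host=localhost user=arroyo password=arroyo dbname=my_db";

        // Panics if the database is unreachable.
        let (client, connection) = tokio_postgres::connect(conn_str, NoTls).await.unwrap();

        // Store the client so every UDF invocation can share it.
        let mut c = self.client.write().await;
        *c = Some(client);

        // The connection object drives the socket I/O, so it runs on its own task.
        tokio::spawn(async move {
            if let Err(e) = connection.await {
                eprintln!("connection error: {}", e);
            }
        });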
    }
}

pub async fn user_name_from_id(context: Arc<Context>, id: i64) -> String {
    let client = context.client.read().await;
    let rows = client
        .as_ref()
        .unwrap()
        .query("SELECT name FROM users WHERE id = $1", &[&id])
        .await
        .unwrap();

    // Fall back to a placeholder name when no row matches the id.
    if let Some(row) = rows.first() {
        row.get(0)
    } else {
        format!("Unknown-{}", id)
    }
}

Note that the struct that implements UdfContext must be called Context. I considered making this flexible but didn't think of a good reason to do so.
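
For readers who want to see the sharing pattern in isolation, here is a minimal, std-only sketch. It is not Arroyo's implementation: the trait below is a synchronous stand-in for the async UdfContext, an in-memory map stands in for the Postgres client, and run_udf is a hypothetical driver that plays the role of the runtime (create the context once, call init once, then hand a clone of the Arc to each invocation).

```rust
use std::collections::HashMap;
use std::sync::{Arc, RwLock};
use std::thread;

// Hypothetical, synchronous stand-in for arroyo_types::UdfContext (the real trait is async).
pub trait UdfContext: Send + Sync {
    fn init(&self);
}

pub struct Context {
    // Starts as None and is filled in by init(), like RwLock<Option<Client>> in the example above.
    users: RwLock<Option<HashMap<i64, String>>>,
}

impl Context {
    pub fn new() -> Self {
        Self {
            users: RwLock::new(None),
        }
    }
}

impl UdfContext for Context {
    fn init(&self) {
        // An in-memory table replaces the database connection in this sketch.
        let mut table = HashMap::new();
        table.insert(1, "alice".to_string());
        table.insert(2, "bob".to_string());
        *self.users.write().unwrap() = Some(table);
    }
}

// The UDF takes the shared context as its first argument.
pub fn user_name_from_id(context: Arc<Context>, id: i64) -> String {
    let guard = context.users.read().unwrap();
    let users = guard
        .as_ref()
        .expect("init() must run before the first invocation");
    users
        .get(&id)
        .cloned()
        .unwrap_or_else(|| format!("Unknown-{}", id))
}

// Hypothetical driver: one context, one init() call, one thread per invocation.
pub fn run_udf(ids: Vec<i64>) -> Vec<String> {
    let context = Arc::new(Context::new());
    context.init();

    let handles: Vec<_> = ids
        .into_iter()
        .map(|id| {
            let ctx = Arc::clone(&context);
            thread::spawn(move || user_name_from_id(ctx, id))
        })
        .collect();

    // Joining in spawn order keeps results aligned with the input ids.
    handles.into_iter().map(|h| h.join().unwrap()).collect()
}
```

Calling run_udf(vec![1, 2, 42]) returns "alice", "bob" and "Unknown-42". Each invocation only takes a read lock, so concurrent calls do not block one another once init has finished.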

@jbeisen jbeisen force-pushed the async-udfs branch 3 times, most recently from 665094f to 9085f50 on January 12, 2024 22:45
@jbeisen jbeisen force-pushed the async-udf-context branch 2 times, most recently from 95791ce to 41cfe7a on January 13, 2024 00:52
@jbeisen jbeisen marked this pull request as ready for review January 15, 2024 17:39
@jbeisen jbeisen force-pushed the async-udf-context branch 3 times, most recently from 547ea35 to f1a30c6 on January 15, 2024 23:41
Base automatically changed from async-udfs to master January 16, 2024 01:39
@jbeisen jbeisen enabled auto-merge (rebase) January 16, 2024 01:40
@jbeisen jbeisen merged commit 4222981 into master Jan 16, 2024
8 checks passed