Add Async UDFs #483

Merged
merged 2 commits into from Jan 16, 2024

jbeisen
Collaborator

@jbeisen jbeisen commented Jan 10, 2024

Add a new 'AsyncMapOperator' that applies an async UDF to its input.

Async UDFs use the existing interface for defining the UDF and are called in SQL the same way as non-async UDFs. Options are specified in a [udfs] TOML table inside the special comment that also declares the dependencies.

Here's an example of an Async UDF:

/*
[dependencies]
tokio = { version = "1", features = ["full"] }
tokio-postgres = "0.7"

[udfs]
async_results_ordered = true
async_timeout_seconds = 5
async_max_concurrency = 10

*/

use tokio_postgres::NoTls;

pub async fn user_name_from_id(id: i64) -> String {
    let conn_str = "host=localhost user=arroyo password=arroyo dbname=my_db";

    // Opens a new connection on every invocation of the UDF.
    let (client, connection) = tokio_postgres::connect(conn_str, NoTls).await.unwrap();

    // The connection object drives the socket I/O, so it is polled on its own task.
    tokio::spawn(async move {
        if let Err(e) = connection.await {
            println!("connection error: {}", e);
        }
    });

    let rows = client
        .query("SELECT name FROM users WHERE id = $1", &[&id])
        .await
        .unwrap();

    // Fall back to a placeholder name when no matching user exists.
    if let Some(row) = rows.first() {
        row.get(0)
    } else {
        format!("Unknown-{}", id)
    }
}

which can be used in a query like this:

select user_name_from_id(person.id) as name from nexmark where person is not null;
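
A minimal sketch of what async_results_ordered = true could imply, assuming the option means that results are emitted in the same order as their inputs even when the underlying async calls finish out of order. The OrderedEmitter type and its sequence numbers are hypothetical and are not taken from the AsyncMapOperator in this PR; the sketch only illustrates the reorder-buffer idea using the standard library.

```rust
use std::collections::BTreeMap;

/// Hypothetical reorder buffer: holds results that finished early and
/// releases them only once every earlier input has also finished.
pub struct OrderedEmitter<T> {
    /// Sequence number of the next result that may be emitted.
    next_seq: u64,
    /// Results that completed before their predecessors.
    pending: BTreeMap<u64, T>,
}

impl<T> OrderedEmitter<T> {
    pub fn new() -> Self {
        Self {
            next_seq: 0,
            pending: BTreeMap::new(),
        }
    }

    /// Record the result for input number `seq` and return every result
    /// that can now be emitted without skipping an earlier input.
    pub fn complete(&mut self, seq: u64, value: T) -> Vec<T> {
        self.pending.insert(seq, value);
        let mut ready = Vec::new();
        while let Some(v) = self.pending.remove(&self.next_seq) {
            ready.push(v);
            self.next_seq += 1;
        }
        ready
    }
}
```

With ordering enabled, one slow lookup holds back every later result until it finishes or times out, which is the trade-off against lower latency in an unordered mode.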

I've left these things for a future commit:

  • Retries: if the UDF panics, we should allow retrying it a certain number of times
  • Async UDF Context: a trait that users can implement to perform setup and shutdown, and that each UDF invocation will have access to

@jbeisen jbeisen force-pushed the async-udfs branch 4 times, most recently from a510b56 to 0fb3132 January 10, 2024 23:26
@jbeisen jbeisen marked this pull request as ready for review January 10, 2024 23:36
@jbeisen jbeisen changed the title Add async UDFs Add Async UDFs Jan 11, 2024
Contributor

@jacksonrnewhouse jacksonrnewhouse left a comment

A couple of stylistic things I'd like to see addressed.

arroyo-datastream/src/lib.rs (outdated, resolved)
arroyo-sql/src/operators.rs (outdated, resolved)
arroyo-worker/src/operators/async_map.rs (outdated, resolved)
@mwylde
Member

mwylde commented Jan 11, 2024

This query is panicking in the planner:

create table logs (
    ip TEXT,
    city TEXT GENERATED ALWAYS AS (get_city(ip))
) with (
    connector = 'sse',
    endpoint = 'http://127.0.0.1:9563/sse',
    format = 'json'
);

select city
from logs;

with

2024-01-11T23:52:08.418511Z ERROR arroyo_server_common: panicked at arroyo-sql/src/expressions.rs:269:21:
async udf appeared in a non-projection context panic.file="arroyo-sql/src/expressions.rs" panic.line=269 panic.column=21
2024-01-11T23:52:08.418876Z  WARN arroyo_api::pipelines: failed to generate SQL program

It also panics (in a different way) if you move the computation into the query:

create table logs (
    ip TEXT
) with (
    connector = 'sse',
    endpoint = 'http://127.0.0.1:9563/sse',
    format = 'json'
);

select get_city(ip)
from logs;

2024-01-11T23:54:35.174447Z ERROR arroyo_server_common: panicked at /Users/mwylde/.cargo/registry/src/index.crates.io-6f17d22bba15001f/proc-macro2-1.0.67/src/fallback.rs:774:9:
"get_city(logs.ip)" is not a valid Ident panic.file="/Users/mwylde/.cargo/registry/src/index.crates.io-6f17d22bba15001f/proc-macro2-1.0.67/src/fallback.rs" panic.line=774 panic.column=9

It works if you alias it (get_city(ip) as city)
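
A minimal sketch of why the unaliased query trips the proc-macro2 check: the column name that reaches code generation is the expression text "get_city(logs.ip)", which contains characters that cannot appear in a Rust identifier. Both functions below are hypothetical illustrations written against the standard library only. They are not the fix applied in this PR, and the validity check is simplified (it ignores keywords, a lone underscore, and non-ASCII identifiers).

```rust
/// Simplified check: true if `s` looks like a plain ASCII Rust identifier.
/// Keywords and a lone `_` are not handled here.
pub fn is_plain_ident(s: &str) -> bool {
    let mut chars = s.chars();
    match chars.next() {
        Some(c) if c.is_ascii_alphabetic() || c == '_' => {}
        _ => return false,
    }
    chars.all(|c| c.is_ascii_alphanumeric() || c == '_')
}

/// Hypothetical helper: turn an arbitrary column name into
/// identifier-safe text by replacing every other character with `_`.
pub fn sanitize_column_name(name: &str) -> String {
    let mut out: String = name
        .chars()
        .map(|c| if c.is_ascii_alphanumeric() || c == '_' { c } else { '_' })
        .collect();
    // An identifier cannot be empty or start with a digit.
    if out.chars().next().map_or(true, |c| c.is_ascii_digit()) {
        out.insert(0, '_');
    }
    out
}
```

An explicit alias such as city already passes a check like is_plain_ident, which is consistent with the aliased query working.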

@mwylde
Member

mwylde commented Jan 12, 2024

Also panics if you try to group by an async udf:

select 
    get_city(logs.ip) as city, 
    count(*)
from logs
group by 1;

or use it in a WHERE clause:

select 
    get_city(logs.ip) as city
from logs
where get_city(logs.ip) = 'San Francisco';

2024-01-12T00:06:33.748816Z ERROR arroyo_server_common: panicked at arroyo-sql/src/expressions.rs:269:21:
async udf appeared in a non-projection context panic.file="arroyo-sql/src/expressions.rs" panic.line=269 panic.column=21
2024-01-12T00:06:33.749935Z  WARN arroyo_api::pipelines: failed to generate SQL program

output_assignments: String,
null_handlers: String,
return_nullable: bool,
timeout_seconds: u64,
Member

Could this be a Duration to avoid unit issues?

Collaborator Author

I don't think so, because it gets deserialized from the TOML in the UDF definition.
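
One possible middle ground, sketched under assumptions: keep the raw u64 field so the value can still be read directly from the TOML, and expose a typed accessor so that call sites never handle bare seconds. The AsyncOptions struct and its timeout method are hypothetical names for illustration; they do not come from this PR.

```rust
use std::time::Duration;

/// Hypothetical stand-in for the options struct under review.
/// The field stays a plain integer so it maps directly onto the TOML value.
pub struct AsyncOptions {
    pub timeout_seconds: u64,
}

impl AsyncOptions {
    /// Typed view of the timeout, so callers do not need to know the unit.
    pub fn timeout(&self) -> Duration {
        Duration::from_secs(self.timeout_seconds)
    }
}
```

Callers would then pass opts.timeout() to whatever timer API they use, and the unit is fixed in exactly one place.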

arroyo-macro/src/lib.rs (outdated, resolved)
@mwylde
Member

mwylde commented Jan 12, 2024

This is really great! I'm so excited to see how people use this.

arroyo-rpc/src/lib.rs (outdated, resolved)
jbeisen and others added 2 commits January 15, 2024 15:49
Add a new 'AsyncMapOperator' that applies an async UDF to its input.

Async UDFs use the existing interface for defining the UDF and are
called in SQL the same as non-async UDFs. Options are specified as a
TOML configuration block in the special comment with the dependencies.
@jbeisen jbeisen merged commit 9b813bf into master Jan 16, 2024
8 checks passed
@jbeisen jbeisen deleted the async-udfs branch January 16, 2024 01:39