Building an RPC with Puff

Puff can distribute tasks among nodes. These tasks can be triggered from any puff instance, including your server or the command line.

This tutorial will show how to trigger tasks from the command line and strategies for returning the results.

Setup Puff Environment

Install and run Redis on your system or as part of Docker. Next, set up your Puff project.

cargo new my_puff_proj --bin
cd my_puff_proj
cargo add puff-rs
poetry new my_puff_proj_py
cd my_puff_proj_py
poetry add puff-py

and add the cargo plugin to my_puff_proj/my_puff_proj_py/pyproject.toml:

[tool.poetry.scripts]
run_cargo = "puff.poetry_plugins:run_cargo"
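
The run_cargo script lets poetry drive the Rust build: roughly speaking, it runs cargo in the parent Rust project and forwards its arguments to the resulting Puff program, so every command registered in main.rs can be launched from the Python project. A minimal sketch of the workflow (exact behaviour depends on your puff-py version):

cd my_puff_proj_py
poetry install
# Any command registered on the Puff Program in main.rs can be invoked like this:
poetry run run_cargo <command-name>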

Option 1: Wait for the Task Result

The first option is to use task_queue.wait_for_task_result to get the result from a task. This is the simplest built-in method.

Update my_puff_proj/src/main.rs

use puff_rs::prelude::*;
use puff_rs::program::commands::{WaitForever, PythonCommand};

fn main() -> ExitCode {
    let rc = RuntimeConfig::default()
        .set_asyncio(true)
        .add_default_task_queue();

    Program::new("my_first_app")
        .about("This is my first app")
        .runtime_config(rc)
        .command(PythonCommand::new(
            "queue_task",
            "task_queue_example.put_task_in_queue",
        ))
        .command(WaitForever)
        .run()
}

And update my_puff_proj/my_puff_proj_py/__init__.py

from puff.task_queue import global_task_queue

task_queue = global_task_queue


def put_task_in_queue():
    task = task_queue.schedule_function(my_awesome_task_async, {"hello": "world"})

    result = task_queue.wait_for_task_result(task)
    print(f"{task} returned {result}")


async def my_awesome_task_async(payload):
    print(f"In async task {payload}")
    return payload["hello"]

Now run poetry run run_cargo wait_forever in one or more terminals and poetry run run_cargo queue_task in another terminal repeatedly. Your tasks should be distributed across all listening nodes.
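
Laid out per terminal, that looks roughly like this:

# Terminal 1 (start as many of these as you like; each becomes a listening node):
poetry run run_cargo wait_forever

# Terminal 2 (run repeatedly; each invocation schedules one task):
poetry run run_cargo queue_task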

Option 2: Improving Responsiveness

wait_for_task_result polls for a result on an interval, which can add a small delay before the result comes back. By default, Puff can't know whether you will clean up your tasks, so it stores every task result as a Redis value with a timeout; that value can only be polled and cannot be read in a "blocking" way.

Instead, you can use the blpop pattern to build a more responsive RPC where you know you will consume the result.

First enable redis in Puff:

    let rc = RuntimeConfig::default()
        .set_asyncio(true)
        .add_default_task_queue()
        .add_default_redis();

Update your code so that it stores None as the task result and sends the real result with lpush instead.

import secrets
from puff.task_queue import global_task_queue as task_queue
from puff.redis import global_redis as redis
from puff.json import dumpb


def put_task_in_queue():
    # Generate a random channel name to send the task.
    this_channel = secrets.token_hex(6)
    task = task_queue.schedule_function(example_task_lpush_async, {"channel": this_channel, "hello": "world"}, scheduled_time_unix_ms=1)

    # Block forever waiting for the task result
    result = redis.blpop(this_channel, 0)
    print(f"{task} returned {result}")


async def example_task_lpush_async(payload):
    channel = payload["channel"]
    # Do some work here...
    await redis.lpush(channel.encode("utf8"), dumpb({"my_result": f"lpop-{payload['hello']}"}))
    return None
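
If you do not want the caller to block forever when a task never produces a result, blpop also takes a timeout. A minimal sketch of that variation, assuming the timeout argument follows the usual Redis BLPOP semantics (seconds to wait, with an empty result on timeout); the function name is just for illustration:

def put_task_in_queue_with_timeout():
    this_channel = secrets.token_hex(6)
    task = task_queue.schedule_function(example_task_lpush_async, {"channel": this_channel, "hello": "world"}, scheduled_time_unix_ms=1)

    # Wait at most 5 seconds for the result instead of blocking forever.
    result = redis.blpop(this_channel, 5)
    if not result:  # Assumes an empty/None result when the timeout expires.
        print(f"{task} produced no result in time")
    else:
        print(f"{task} returned {result}")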

Option 3: Utilize Pubsub Channels

Only one listener can retrieve the result with blpop, but sometimes multiple tasks need the result at the same time. You can use pubsub to handle this scenario.

Puff handles pubsub efficiently for you, using one reader connection per node, so multiple tasks listening to channels on the same node share the same Redis connection.

First enable pubsub in Puff:

    let rc = RuntimeConfig::default()
        .set_asyncio(true)
        .add_default_task_queue()
        .add_default_pubsub();

Now update your code to subscribe to channels:

import secrets
from puff.task_queue import global_task_queue as task_queue
from puff.pubsub import global_pubsub as pubsub


def put_task_in_queue():
    # Generate a random channel name to send the task.
    this_channel = secrets.token_hex(6)
    conn = pubsub.connection()
    conn.subscribe(this_channel)
    listener = task_queue.schedule_function(listen_for_result, {"channel": this_channel}, scheduled_time_unix_ms=1)
    task = task_queue.schedule_function(example_task_lpush_async, {"channel": this_channel, "hello": "world"})

    # Block forever waiting for the task result
    result = conn.receive()
    data = result.json()
    print(f"{task} returned {data}")
    task_queue.wait_for_task_result(listener)


async def listen_for_result(payload):
    channel = payload["channel"]
    conn = pubsub.connection()
    await conn.subscribe(channel)
    result = await conn.receive()
    data = result.json()
    print(f"listener got {data}")
    return None


async def example_task_lpush_async(payload):
    channel = payload["channel"]
    # Do some work here...
    await pubsub.publish_json_as(pubsub.new_connection_id(), channel, {"my_result": f"pubsub-{payload['hello']}"})
    return None
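
Because every subscriber on the channel receives the published message, the same result can fan out to several tasks at once. A small sketch reusing the functions above (the number of listeners is arbitrary and the function name is just for illustration):

def put_task_in_queue_fanout():
    this_channel = secrets.token_hex(6)
    conn = pubsub.connection()
    conn.subscribe(this_channel)

    # Several listeners subscribed to the same channel all receive the same published result.
    listeners = [
        task_queue.schedule_function(listen_for_result, {"channel": this_channel}, scheduled_time_unix_ms=1)
        for _ in range(3)
    ]
    task = task_queue.schedule_function(example_task_lpush_async, {"channel": this_channel, "hello": "world"})

    result = conn.receive()
    print(f"{task} returned {result.json()}")
    # Wait for all listener tasks to finish before returning.
    for listener in listeners:
        task_queue.wait_for_task_result(listener)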

Bonus: GraphQL Tasks

You can access all Puff resources from inside your tasks, including a configured GraphQL schema. This is useful if you want to make queries or reuse your logic from GraphQL in your tasks.

First add postgres and the GraphQL schema class to your config in Rust.

    let rc = RuntimeConfig::default()
        .set_asyncio(true)
        .add_default_redis()
        .add_default_pubsub()
        .add_default_postgres()
        .set_gql_schema("my_puff_proj_py.Schema")
        .add_default_task_queue();

Now set up the GraphQL schema and modify your task function to call GraphQL.

import secrets
from dataclasses import dataclass
from puff.task_queue import global_task_queue as task_queue
from puff.pubsub import global_pubsub as pubsub
from puff.graphql import global_graphql as gql
from puff import sleep_ms


@dataclass
class HelloWorldResult:
    message: str
    original_value: int


@dataclass
class Query:
    @classmethod
    def hello_world(cls, context, /, my_input: int) -> HelloWorldResult:
        return HelloWorldResult(
            message=f"You said {my_input}",
            original_value=my_input
        )


@dataclass
class Mutation:
    pass


@dataclass
class Subscription:
    pass


@dataclass
class Schema:
    query: Query
    mutation: Mutation
    subscription: Subscription


GQL_QUERY = """
query HelloWorld($my_input: Int!) {
    hello_world(my_input: $my_input) {
        message
        original_value
    }
}
"""

def put_task_in_queue():
    # Generate a random channel name to send the task.
    this_channel = secrets.token_hex(6)
    conn = pubsub.connection()
    conn.subscribe(this_channel)
    task = task_queue.schedule_function(example_task_lpush_async, {"channel": this_channel, "query": GQL_QUERY, "variables": {"my_input": 56}})

    # Block forever waiting for the task result
    result = conn.receive()
    data = result.json()
    print(f"{task} returned {data}")


async def example_task_lpush_async(payload):
    channel = payload["channel"]
    query = payload["query"]
    variables = payload["variables"]

    # Query the configured GQL schema.
    result = await gql.query(query, variables)

    await pubsub.publish_json_as(pubsub.new_connection_id(), channel, result)
    return None
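
As before, start a listening node in one terminal and schedule the task from another. One thing to keep in mind (an assumption about your environment, not something this tutorial configures): add_default_postgres expects a reachable Postgres instance, so Postgres must be running and configured for Puff before the commands will start.

# Terminal 1: listening node
poetry run run_cargo wait_forever

# Terminal 2: schedules the GraphQL task
poetry run run_cargo queue_task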