Skip to content

Conversation

@fpacifici
Copy link
Collaborator

@fpacifici fpacifici commented May 27, 2025

Based on #119

While buildign a RustOperatorDelegate for the RunTaskInMultiProcess
I noticed that the delegating logic was very similar to the Reduce
step so I make the OutputRetriever and the ReduceDelegate reusable.

This PR:

  • replaces the OutputRetriever and ReduceDelegate custom logic with
    functions passed to the two classes
  • Moves the generic version of the two into rust_step.py
  • Fixes the return type iof the RustOperatorDeelgate. We were returning
    Message, but that is a type Rust cannot understand.

@fpacifici fpacifici changed the base branch from main to fpacifici/actually_port_reduce May 27, 2025 23:59
@fpacifici fpacifici changed the title Generalize ReuceDelegate Generalize ReduceDelegate May 29, 2025
@fpacifici fpacifici force-pushed the fpacifici/actually_port_reduce branch 2 times, most recently from b0c89f5 to be45209 Compare May 30, 2025 23:04
@fpacifici fpacifici changed the base branch from fpacifici/actually_port_reduce to main May 31, 2025 06:23
@fpacifici fpacifici force-pushed the fpacifici/support_parallelism branch from 0e26379 to b10c52e Compare May 31, 2025 06:29
Comment on lines 266 to 268
- The Arroyo Reduce strategy is an Arroyo strategy, so it needs to be
fed with Arroyo messages, thus the adaptation logic from the
new Streaming platform message that the Rust code deals with.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is the idea here, to make it simpler to adapt existing consumers built as arroyo tasks into the streaming platform? That seems like a great way to make the conversion process simpler.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, it can be used for that as well.
Here the goal was to adapt Arroyo strategies, but a consumer in the end is a chain of strategies so that works as well.

TStrategyOut = TypeVar("TStrategyOut")


class OutputRetriever(ProcessingStrategy[TStrategyOut], Generic[TStrategyOut]):
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In a nutshell, how is this OutputRetriever different from the old one?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actually, I think I got it

make them available to rust.
The message flow looks like this:
1. The Rust Arroyo Strategy receives a streaming message to process.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for outlining the steps here. It is much clearer now

Copy link
Contributor

@ayirr7 ayirr7 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This overall looks good to me. I'll wait for any others' who might have feedback before I approve

@fpacifici fpacifici merged commit e91cfcb into main Jun 15, 2025
17 checks passed
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

4 participants