
Ideas for structuring consumer definitions and isolating independent components #156

Open
cmanallen opened this issue Dec 7, 2022 · 5 comments
Labels: enhancement (New feature or request), help wanted (Extra attention is needed)

Comments

cmanallen commented Dec 7, 2022

Arroyo Ideas

Intro

I’m thinking of this as more of a discussion point and not a definitive answer to the question: “how should we structure pipelines in our consumer?” If you like it, great. If you want changes, great. If you think it’s useless, that’s fine too! I’m happy with the current Arroyo.

Goal

Users of the library should rarely define strategies. Not because defining them is bad, but because it's unnecessary. We should be able to expose 99% of the range of behavior as primitives. The user can then mix and match primitives to achieve a result.

Another goal is to remove all thinking about Kafka. Allow the maintainers of Arroyo primitives (in Sentry or Arroyo) to define sane interactions with the consumer that can be blindly reused by junior developers.

Problems with the current implementation

“Problem” is used loosely here. The current implementation is really good. This refactor changes how steps are composed and normalizes some naming to “XYZ Step” or “XYZ Pipeline”. I think doing so gives better reuse and clearer semantics for new users trying to implement their consumers.

Naming can be updated. These are just examples. Any naming outside of the “Step” and “Pipeline” suffixes should be ignored. The definition language is the most important aspect.

Thoughts About the Data Model

The current implementation uses the decorator pattern: behavior encloses other behavior. A pipeline, by contrast, is generally thought of as "my output is your input": behavior is not enclosed, and components are distinct.

I will illustrate this. Consider a pipeline of three functions A, B, and C. The way a developer would mentally conceive of this pipeline is illustrated below:

Input => A => B => C => Output

However, Arroyo defines pipelines that generate this call stack:

Input => A => B => C => B => A => Output

This difference can be described in code. Notice the inversion of the function ordering, which is required to satisfy the decorator pattern: a component must be defined after, and execute before, its dependency.

# Pipeline Pattern: independent components invoked in order; the output
# of one step is the input of the next.
A = lambda msg: ...
B = lambda msg: ...
C = lambda msg: ...

msg = A(msg)
msg = B(msg)
msg = C(msg)

# Decorator Pattern: each component calls the next, so the definition
# order is the reverse of the execution order.
C = lambda msg: ...
B = lambda msg: C(...)
A = lambda msg: B(...)

A(msg)

This is not a problem per se. It could cause confusion in certain circumstances but I think with the right interface much of that confusion could be limited without changes to the underlying data model.

Desired implementation

Consider the following pipeline. A consumer receives a payload containing type and value fields. We want to multiply the message’s value field by 2 if it is of the multiply type. Otherwise we drop the message.

steps = [
    FilterStep(lambda msg: msg["type"] == "multiply"),
    # A lambda cannot contain an assignment, so return a copy of the
    # message with the mutated value instead.
    ApplyStep(lambda msg: {**msg, "value": msg["value"] * 2}),
]

The steps are defined as a list and not a linked-list. The steps function identically to the current steps and implement the submit, poll workflow.

FilterStep removes items from processing. ApplyStep applies functions to a message and returns the output. These are provided by Arroyo. The lambdas are user-defined. Notice the steps do not enclose one another. They are totally independent and can be composed in any order.
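
To make "totally independent" concrete, here is a minimal sketch of what these two step primitives could look like. The class names come from the proposal above; the submit/poll signatures and the "return None to drop" convention are assumptions for illustration, not the actual Arroyo API.

from typing import Any, Callable, Optional

class ApplyStep:
    """Hypothetical step: applies a function to a message and returns the result."""

    def __init__(self, function: Callable[[Any], Any]) -> None:
        self.function = function

    def submit(self, message: Any) -> Any:
        return self.function(message)

    def poll(self) -> None:
        pass


class FilterStep:
    """Hypothetical step: returns the message if the predicate passes, else None."""

    def __init__(self, function: Callable[[Any], bool]) -> None:
        self.function = function

    def submit(self, message: Any) -> Optional[Any]:
        # Dropped messages simply stop flowing; nothing downstream sees them.
        return message if self.function(message) else None

    def poll(self) -> None:
        pass

Note that neither step knows about a next_step; ordering lives entirely in whatever runs them.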

However, this is not a complete pipeline. How are the steps called? For this we need another layer.

steps = [
    FilterStep(lambda m: m["type"] == "multiply"),
    ApplyStep(lambda m: {**m, "value": m["value"] * 2}),
]

pipeline = SynchronousPipeline(steps=steps)
pipeline.submit(message)
pipeline.poll()

The SynchronousPipeline would be provided by Arroyo in this hypothetical. It works how you would think: it calls submit and poll on all of its steps. At this layer we can do things like catch exceptions and manage backpressure.

Almost done but not quite. We need to commit the message.

pipeline = SynchronousPipeline(
    steps=[
        FilterStep(lambda m: m["type"] == "multiply"),
        ApplyStep(lambda m: {**m, "value": m["value"] * 2}),
    ],
    next_step=CommitStep(commit),
)

pipeline.submit(message)
pipeline.poll()

Pipelines implement a linked list pattern for accessing next steps. They function almost identically to the current Arroyo implementation. Pipelines decorate their next steps. Pipeline steps (the sub-steps defined as a list) do not decorate one another and are independent components.

Why isn’t CommitStep in the steps list? It could be, but it makes more sense to call it as a linked next_step; this will become clearer in the next example. next_step could also be a nested pipeline.
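
For instance, nesting another pipeline as the next_step might look like the sketch below, reusing the hypothetical classes from above. The serialize function and the overall shape are purely illustrative assumptions.

pipeline = SynchronousPipeline(
    steps=[
        FilterStep(lambda m: m["type"] == "multiply"),
        ApplyStep(lambda m: {**m, "value": m["value"] * 2}),
    ],
    # The next step is itself a pipeline; it only sees messages that
    # survived the steps above.
    next_step=SynchronousPipeline(
        steps=[ApplyStep(serialize)],  # hypothetical serialization step
        next_step=CommitStep(commit),
    ),
)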

Real world example (Replays)

Replays is like attachments. Chunked messages come in, then a capstone comes in, and the capstone commits. If the capstone fails, the chunks can be retried because they were never committed. If a message raises an exception, it should log but NOT commit.

# Catches exceptions and logs them. Exception prevents commit.
pipeline = LogExceptionPipeline(
    message="Recording pipeline failed.",
    steps=[
        # Unpack the message.
        TransformStep(msgpack.unpackb),
        # Cache it if it's the right type.
        TransformStep(cache_if_chunk),
        # Filter out the messages we just cached.  They should not be committed.
        FilterStep(filter_chunk_message),
        # In a thread store the chunks in GCS (chunks are fetched from cache).
        RunTaskInThreads(
            store_chunks,
            concurrency=16,
            max_pending_futures=32,
        ),
    ],

    # Next step is composed the same way Arroyo composes now.
    next_step=CommitStep(commit),  # Can be a pipeline or a step.
)
return pipeline

With the CommitStep defined as a next_step, it exists outside of the try/except pipeline and is unaffected by its behavior. In other words, the CommitStep is free to raise.
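
As a rough sketch of that mechanism (the class body is hypothetical; only the submit/poll shape from this proposal is assumed), the try/except wraps the inner steps but not the next_step:

import logging

logger = logging.getLogger(__name__)

class LogExceptionPipeline:
    """Hypothetical pipeline: logs exceptions from its steps and never commits failed messages."""

    def __init__(self, message, steps, next_step=None):
        self.message = message
        self.steps = steps
        self.next_step = next_step

    def submit(self, msg):
        try:
            for step in self.steps:
                msg = step.submit(msg)
                if msg is None:
                    return  # filtered out (e.g. a chunk message)
        except Exception:
            # Log and stop: the next_step (the CommitStep) is never reached.
            logger.exception(self.message)
            return

        # Outside the try/except, so the commit is free to raise.
        if self.next_step:
            self.next_step.submit(msg)

    def poll(self):
        for step in self.steps:
            step.poll()
        if self.next_step:
            self.next_step.poll()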

Compare this to the current Arroyo implementation:

# Commits are made in batches.
commit_strategy = CommitOffsets(commit)
# In a thread store the chunks in GCS (chunks are fetched from cache).
store_strategy = RunTaskInThreads(
    processing_function=store_chunks,
    concurrency=16,
    max_pending_futures=32,
    next_step=commit_strategy,
)
# Filter out the messages we just cached.  They should not be committed.
filter_chunks_strategy = FilterStep(filter_chunks, next_step=store_strategy)
# Cache it if it's the right type.
cache_strategy = TransformStep(cache_if_chunk, next_step=filter_chunks_strategy)
# Unpack the message.
deserialize_strategy = TransformStep(deserialize, next_step=cache_strategy)
# Catches exceptions and logs them.  Pipeline stops.  Commit is never reached.
log_error_strategy = LogExceptionStep(next_step=deserialize_strategy)

return log_error_strategy

It's less clear how things are composed and how steps interact with one another. By refactoring the steps to be more isolated, we can clearly understand how all of the components interact. The proposed implementation also has the benefit of presenting the pipeline steps in order.

Internals of SynchronousPipeline

# THIS IS PSEUDO CODE :D

class SynchronousPipeline:
    def __init__(self, steps, next_step=None):
        self.steps = steps
        self.next_step = next_step

    def submit(self, message):
        for step in self.steps:
            # Each step returns the (possibly transformed) message, or None
            # if the message was filtered out of the pipeline.
            message = step.submit(message)
            if message is None:
                return

        if self.next_step:
            self.next_step.submit(message)

    def poll(self):
        for step in self.steps:
            step.poll()

        if self.next_step:
            self.next_step.poll()

Random Notes

  • Step types do not have a next_step property (they do, but it's always None). Only pipelines can move to a next step.
cmanallen commented Dec 8, 2022

This is a functioning partial implementation of the idea. https://github.com/getsentry/sentry/blob/replays-use-run-task-strategy/src/sentry/replays/consumers/recording/factory.py

return Pipeline(
    steps=[
        # Catch and log any exceptions that occur during processing.
        Partial(LogExceptionStep, message="Invalid recording specified."),
        # Deserialize the msgpack payload.
        Apply(deserialize),
        # Initialize a sentry transaction.
        Apply(init_context),
        # Cache chunk messages.
        Apply(cache_chunks),
        # Remove chunk messages from pipeline.  They should never be committed.
        Filter(filter_chunks),
        # Run the capstone messages in a thread-pool.
        Partial(
            RunTaskInThreads,
            processing_function=store,
            concurrency=16,
            max_pending_futures=32,
        ),
    ],
    # Batch capstone messages and commit when called.
    next_pipeline=CommitOffsets(commit),
)

And this is the supporting library code.

import functools
from typing import Any, Callable, List, Optional

# Message, ProcessingStrategy, FilterStep, TransformStep, and the
# TPayload / TReplaced type variables are imported from arroyo.

def Apply(
    function: Callable[[Message[TPayload]], TReplaced]
) -> Callable[[ProcessingStrategy[TReplaced]], TransformStep[TPayload]]:
    return lambda next_step: TransformStep(function=function, next_step=next_step)


def Filter(
    function: Callable[[Message[TPayload]], bool]
) -> Callable[[ProcessingStrategy[TPayload]], FilterStep[TPayload]]:
    return lambda next_step: FilterStep(function=function, next_step=next_step)


def Partial(
    strategy: Callable[[ProcessingStrategy[TReplaced]], ProcessingStrategy[TPayload]],
    **kwargs: Any,
) -> Callable[[ProcessingStrategy[TReplaced]], ProcessingStrategy[TPayload]]:
    return lambda next_step: strategy(next_step=next_step, **kwargs)


def Pipeline(
    steps: List[Callable[[ProcessingStrategy[TPayload]], ProcessingStrategy[TReplaced]]],
    next_pipeline: Optional[ProcessingStrategy[TPayload]] = None,
) -> ProcessingStrategy[TPayload]:
    if not steps:
        raise ValueError("Pipeline misconfigured.  Missing required step functions.")

    # functools.reduce takes positional arguments: (function, iterable, initializer).
    return functools.reduce(
        lambda prev_step, step_fn: step_fn(prev_step),
        reversed(steps),
        next_pipeline,
    )

I'm currying the __init__ method.
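
Concretely, a call like Apply(deserialize) just defers construction, and the Pipeline function supplies the next step later. Roughly (illustrative only, using the names from the snippet above):

# Apply(deserialize) returns a one-argument factory...
make_step = Apply(deserialize)

# ...which Pipeline later calls with the downstream strategy:
step = make_step(CommitOffsets(commit))
# equivalent to: TransformStep(function=deserialize, next_step=CommitOffsets(commit))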

Having done this, I think we can simplify the ask in the original issue significantly. This system could be supported without library code if we allowed the next_step property to be defined after initialization. Then we could initialize these classes directly in the steps list and have the Pipeline function call a set_next_step(ns) method.
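
A rough sketch of that alternative, assuming strategies grew a set_next_step method (which does not exist today):

def Pipeline(steps, next_pipeline=None):
    if not steps:
        raise ValueError("Pipeline misconfigured.  Missing required step functions.")

    # Wire each already-constructed strategy to the one that follows it,
    # instead of currying construction through closures.
    for step, next_step in zip(steps, steps[1:]):
        step.set_next_step(next_step)

    if next_pipeline is not None:
        steps[-1].set_next_step(next_pipeline)

    return steps[0]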

untitaker commented Jun 6, 2023

I'm curious how this could be made type-safe, so that a strategy can transform T1 into T2 and we ensure the next step actually accepts T2 as its payload (that step potentially returns a T3). In your examples, every step in Pipeline takes the same input and output message type.

An API like this would probably work:

return (
    Pipeline()
    .and_then(lambda next_step: FilterStep(next_step, function=filter_chunks))
    .and_then(lambda next_step: RunTask(next_step, function=preprocess_chunks))
    .and_then(lambda next_step: RunTask(next_step, function=process_chunks))
    .finish(CommitOffsets(commit))
)

You want to somehow ensure that the first RunTask returns a payload type that is compatible with the second RunTask, and that you are not finishing with a strategy that takes the wrong type.

I am not sure type inference works properly even with the builder pattern.

The closures/lambdas are probably necessary because, when forwarding **kwargs, the type information about the parameters disappears.
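
One way the payload type could be threaded through the builder, as a rough sketch: none of these classes exist in Arroyo, the builder is seeded with the first factory rather than starting from an empty Pipeline() so the types stay trackable, and whether mypy actually infers the lambdas cleanly is exactly the open question.

from __future__ import annotations

from typing import Callable, Generic, Protocol, TypeVar

TIn = TypeVar("TIn", contravariant=True)
T1 = TypeVar("T1")
T2 = TypeVar("T2")
T3 = TypeVar("T3")


class Strategy(Protocol[TIn]):
    def submit(self, message: TIn) -> None: ...
    def poll(self) -> None: ...


class Builder(Generic[T1, T2]):
    """A chain that accepts T1 messages and hands T2 messages to whatever comes next."""

    def __init__(self, build: Callable[[Strategy[T2]], Strategy[T1]]) -> None:
        self._build = build

    def and_then(self, factory: Callable[[Strategy[T3]], Strategy[T2]]) -> Builder[T1, T3]:
        # The new factory sits closest to the end of the chain, so the
        # chain's output type becomes the new step's output type T3.
        return Builder(lambda last: self._build(factory(last)))

    def finish(self, last: Strategy[T2]) -> Strategy[T1]:
        return self._build(last)


def pipeline(factory: Callable[[Strategy[T2]], Strategy[T1]]) -> Builder[T1, T2]:
    return Builder(factory)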

@untitaker

Also @cmanallen, I see that the file in your OP no longer exists. Have you stopped pursuing this style altogether, and for what reason?

@untitaker added the enhancement and help wanted labels on Jun 6, 2023
cmanallen commented Jun 6, 2023

@untitaker I adopted the existing formula and eventually the consumer was so simplified that the pipeline concept was not needed.

cmanallen commented Jun 6, 2023

@untitaker I have stopped pursuing this. In general, the less code, frameworks, and boilerplate there is to learn, the better. I'd much rather duplicate simple work than have to learn highly abstracted library code. Having lots of abstractions between the developer and Kafka is just going to slow people down.
