Skip to content

Conversation

@evanh
Copy link
Member

@evanh evanh commented Jun 18, 2025

Rewrite the examples to use the chain style API. The billing example is migrated in a separate PR. Remove some of the helper files since they weren't being used outside of the example and weren't very complicated.

Also skip the alerts.py file since it uses a FlatMap, and that has to be implemented in a separate PR.

Rewrite the examples to use the chain style API. The billing example is migrated in a separate PR.
Remove some of the helper files since they weren't being used outside of the example and weren't
very complicated.
@evanh
Copy link
Member Author

evanh commented Jun 18, 2025

I couldn't get the typing to work for the FlatMap, I get these errors:

sentry_streams/sentry_streams/examples/alerts.py:28: error: Argument 2 to "apply" of "ExtensibleChain" has incompatible type "FlatMap[Never, Never]"; expected "Applier[Message[Event], Message[Never]]"  [arg-type]
sentry_streams/sentry_streams/examples/alerts.py:28: error: Argument "function" to "FlatMap" has incompatible type "Callable[[Message[Event]], list[TimeSeriesDataPoint]]"; expected "Callable[[Message[MutableSequence[Never]]], Never] | str"  [arg-type]
sentry_streams/sentry_streams/examples/alerts.py:34: error: Argument 2 to "apply" of "ExtensibleChain" has incompatible type "Reducer[int, list[TimeSeriesDataPoint], p95AlertData | CountAlertData]"; expected "Applier[Message[Never], Message[p95AlertData | CountAlertData]]"  [arg-type]
Found 3 errors in 1 file (checked 7 source files)

Comment on lines 14 to 37
@dataclass
class Span:
span_id: int
trace_id: int
duration: int
timestamp: int

def to_dict(self) -> dict[str, int]:
return {
"span_id": self.span_id,
"trace_id": self.trace_id,
"duration": self.duration,
"timestamp": self.timestamp,
}


def build_span(value: Message[bytes]) -> Span:
"""
Build a Span object from a JSON str
"""

d: dict[str, Any] = json.loads(value.payload)

return Span(d["span_id"], d["trace_id"], d["duration"], d["timestamp"])
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's use the schema from sentry-kafka-schemas and a Parser rather than a custom event type.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I thought we were trying to move away from sentry-kafka-schemas?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, but the replacement will have an interface that will be very close to that one. It should be a quick swap, we can use it till we replace it.

@fpacifici
Copy link
Collaborator

sentry_streams/sentry_streams/examples/alerts.py:28: error: Argument 2 to "apply" of "ExtensibleChain" has incompatible type "FlatMap[Never, Never]"; expected "Applier[Message[Event], Message[Never]]"  [arg-type]
sentry_streams/sentry_streams/examples/alerts.py:28: error: Argument "function" to "FlatMap" has incompatible type "Callable[[Message[Event]], list[TimeSeriesDataPoint]]"; expected "Callable[[Message[MutableSequence[Never]]], Never] | str"  [arg-type]
sentry_streams/sentry_streams/examples/alerts.py:34: error: Argument 2 to "apply" of "ExtensibleChain" has incompatible type "Reducer[int, list[TimeSeriesDataPoint], p95AlertData | CountAlertData]"; expected "Applier[Message[Never], Message[p95AlertData | CountAlertData]]"  [arg-type]
Found 3 errors in 1 file (checked 7 source files)

Yeah it seems the flatMap implementation is just plain broken and we are noticing it only now because the old Step api basically allows any type while the Chain api does some more reasonable type checking.

The issue is the following:

So I think there are two fixes to make. And I would do them in a separate PR:

  1. Change the return type of the function provided to the FlatMap to be iterable as we should get multiple messages out. Iterable seems a better idea than a Sequence.
  2. Make the input type more generic and do not impose a Batch structure.

What do you think ?

@evanh
Copy link
Member Author

evanh commented Jun 19, 2025

@fpacifici It mostly makes sense, just to clarify:

FlatMap takes a batch and produces the messages contained in the batch one by one. The "one by one" is correctly not captured in the output type.

This is confusing me a bit, I thought FlatMap was allowing a 1->N style of processing. So one message in can produce one or more messages out. If this is the case I agree that it shouldn't enforce a Batch style input.

I'm reluctant to have the output be a Generator however. I would think we'd want a more concrete data structure, in case the next step is written in Rust.

@evanh
Copy link
Member Author

evanh commented Jun 19, 2025

I removed the changes to FlatMap from this PR, I'll migrate that example in a separate PR.

@evanh evanh requested a review from fpacifici June 19, 2025 20:44
@fpacifici
Copy link
Collaborator

I'm reluctant to have the output be a Generator however. I would think we'd want a more concrete data structure, in case the next step is written in Rust.

We still generally wrap the function provided by the user into more python code before running rust. Example: https://github.com/getsentry/streams/blob/main/sentry_streams/sentry_streams/adapters/arroyo/rust_arroyo.py#L217-L235

This means we can still have an abstract structure like an Iterable and materializing it into something rust would understand.
This would give us a bit more flexibility in implementing optimizations.

@evanh evanh requested a review from a team as a code owner June 23, 2025 15:18
@evanh
Copy link
Member Author

evanh commented Jun 23, 2025

@fpacifici Could you take another look at this? The flatmap changes went into a separate PR, and I'd like to get this merged soon so it doesn't conflict too much with #152

@fpacifici
Copy link
Collaborator

@fpacifici Could you take another look at this? The flatmap changes went into a separate PR, and I'd like to get this merged soon so it doesn't conflict too much with #152

See my comment on broadcast.py. I don't think that pipeline works.

@evanh evanh requested a review from fpacifici June 24, 2025 19:22
@evanh evanh merged commit 26daf2d into main Jul 8, 2025
19 of 23 checks passed
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants