
Commit 26daf2d

fix(examples): Rewrite examples using the chain style API (#147)
Rewrite the examples to use the chain style API. The billing example is migrated in a separate PR. Remove some of the helper files since they weren't being used outside of the example and weren't very complicated. Also skip the `alerts.py` file since it uses a FlatMap, and that has to be implemented in a separate PR.
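For orientation, the chain style threads each stage off the source instead of declaring nodes individually and wiring them through ctx/inputs. The snippet below is condensed from the span pipeline diff further down (the Reducer and Map stages are elided here), so the imports, stage names, and arguments all come from this commit:

from sentry_kafka_schemas.schema_types.snuba_spans_v1 import SpanEvent

from sentry_streams.pipeline import Parser, StreamSink, streaming_source

# Source -> parse raw bytes into typed SpanEvent messages -> sink.
pipeline = (
    streaming_source(name="myinput", stream_name="events")
    .apply("mymap", Parser(msg_type=SpanEvent))
    .sink("kafkasink", StreamSink(stream_name="transformed-events"))
)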
1 parent c219813 commit 26daf2d

File tree

7 files changed: +613 / -655 lines
Lines changed: 8 additions & 6 deletions

@@ -1,5 +1,7 @@
 import json
 
+from sentry_streams.pipeline.message import Message
+
 
 class BroadcastFunctions:
     """
@@ -11,15 +13,15 @@ class BroadcastFunctions:
     """
 
     @staticmethod
-    def no_op_map(value: str) -> str:
-        return value
+    def no_op_map(value: Message[bytes]) -> str:
+        return value.payload.decode("utf-8")
 
     @staticmethod
-    def hello_map(value: str) -> str:
-        name = json.loads(value)["name"]
+    def hello_map(value: Message[str]) -> str:
+        name = json.loads(value.payload)["name"]
         return f"Hello, {name}!"
 
     @staticmethod
-    def goodbye_map(value: str) -> str:
-        name = json.loads(value)["name"]
+    def goodbye_map(value: Message[str]) -> str:
+        name = json.loads(value.payload)["name"]
        return f"Goodbye, {name}."
sentry_streams/sentry_streams/examples/span_helpers.py (path not shown in this extract; inferred from the import in the span pipeline diff below)

Lines changed: 55 additions & 0 deletions

@@ -0,0 +1,55 @@
+import json
+from dataclasses import dataclass
+from typing import Self
+
+from sentry_kafka_schemas.schema_types.snuba_spans_v1 import SpanEvent
+
+from sentry_streams.pipeline.function_template import (
+    Accumulator,
+)
+from sentry_streams.pipeline.message import Message
+
+
+@dataclass
+class Segment:
+    total_duration: int
+    spans: list[SpanEvent]
+
+
+def build_segment_json(message: Message[Segment]) -> str:
+    """
+    Build a JSON str from a Segment
+    """
+    value = message.payload
+    d = {"segment": value.spans, "total_duration": value.total_duration}
+
+    return json.dumps(d)
+
+
+class SpansBuffer(Accumulator[Message[SpanEvent], Segment]):
+    """
+    Ingests spans into a window. Builds a Segment from each
+    window, which contains the list of SpanEvents seen as well
+    as the total duration across SpanEvents.
+
+    TODO: Group by trace_id
+    """
+
+    def __init__(self) -> None:
+        self.spans_list: list[SpanEvent] = []
+        self.total_duration = 0
+
+    def add(self, value: Message[SpanEvent]) -> Self:
+        self.spans_list.append(value.payload)
+        self.total_duration += value.payload["duration_ms"]
+
+        return self
+
+    def get_value(self) -> Segment:
+        return Segment(self.total_duration, self.spans_list)
+
+    def merge(self, other: Self) -> Self:
+        self.spans_list = self.spans_list + other.spans_list
+        self.total_duration = self.total_duration + other.total_duration
+
+        return self
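A short sketch of the Accumulator contract this class implements: add folds one span in, merge combines partial aggregates, and get_value emits the Segment. FakeMessage is a hypothetical stand-in for the library's Message wrapper (only .payload is modeled), and the bare dicts carry just the duration_ms field the buffer actually reads; real SpanEvent payloads have many more fields:

from dataclasses import dataclass
from typing import Generic, TypeVar

from sentry_streams.examples.span_helpers import SpansBuffer

T = TypeVar("T")


@dataclass
class FakeMessage(Generic[T]):
    payload: T  # hypothetical stand-in; only .payload is modeled


left = SpansBuffer()
left.add(FakeMessage({"duration_ms": 100}))

right = SpansBuffer()
right.add(FakeMessage({"duration_ms": 250}))

# Combine two partial aggregates, then read out the Segment.
segment = left.merge(right).get_value()
assert segment.total_duration == 350 and len(segment.spans) == 2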

sentry_streams/sentry_streams/examples/spans.py

Lines changed: 0 additions & 85 deletions
This file was deleted.
Lines changed: 26 additions & 43 deletions

@@ -1,29 +1,10 @@
 from datetime import timedelta
 
-from sentry_streams.examples.spans import SpansBuffer, build_segment_json, build_span
-from sentry_streams.pipeline.pipeline import (
-    Aggregate,
-    Map,
-    Pipeline,
-    StreamSink,
-    StreamSource,
-)
-from sentry_streams.pipeline.window import TumblingWindow
-
-pipeline = Pipeline()
+from sentry_kafka_schemas.schema_types.snuba_spans_v1 import SpanEvent
 
-source = StreamSource(
-    name="myinput",
-    ctx=pipeline,
-    stream_name="events",
-)
-
-map = Map(
-    name="mymap",
-    ctx=pipeline,
-    inputs=[source],
-    function=build_span,
-)
+from sentry_streams.examples.span_helpers import SpansBuffer, build_segment_json
+from sentry_streams.pipeline import Map, Parser, Reducer, StreamSink, streaming_source
+from sentry_streams.pipeline.window import TumblingWindow
 
 # A sample window.
 # Windows are open for 5 seconds max
@@ -35,24 +16,26 @@
 # Make the trigger and closing windows synonymous, both
 # apparent in the API and as part of implementation
 
-reduce = Aggregate(
-    name="myreduce",
-    ctx=pipeline,
-    inputs=[map],
-    window=reduce_window,
-    aggregate_func=SpansBuffer,
-)
-
-map_str = Map(
-    name="map_str",
-    ctx=pipeline,
-    inputs=[reduce],
-    function=build_segment_json,
-)
-
-sink = StreamSink(
-    name="kafkasink",
-    ctx=pipeline,
-    inputs=[map_str],
-    stream_name="transformed-events",
+pipeline = (
+    streaming_source(name="myinput", stream_name="events")
+    .apply("mymap", Parser(msg_type=SpanEvent))
+    .apply(
+        "myreduce",
+        Reducer(
+            window=reduce_window,
+            aggregate_func=SpansBuffer,
+        ),
+    )
+    .apply(
+        "map_str",
+        Map(
+            function=build_segment_json,
+        ),
+    )
+    .sink(
+        "kafkasink",
+        StreamSink(
+            stream_name="transformed-events",
+        ),
+    )
 )
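To see what the "myreduce" and "map_str" stages compute for one closed window, here is a hand-run sketch outside the streaming runtime. As above, FakeMessage is a hypothetical stand-in for the runtime's Message wrapper (only .payload is modeled), and the bare duration_ms dicts stand in for full SpanEvent payloads:

from dataclasses import dataclass
from typing import Generic, TypeVar

from sentry_streams.examples.span_helpers import SpansBuffer, build_segment_json

T = TypeVar("T")


@dataclass
class FakeMessage(Generic[T]):
    payload: T  # hypothetical stand-in; only .payload is modeled


window = SpansBuffer()                 # what "myreduce" folds per window
for duration in (120, 80, 300):
    window.add(FakeMessage({"duration_ms": duration}))

# When the window closes, "map_str" renders the accumulated Segment:
print(build_segment_json(FakeMessage(window.get_value())))
# {"segment": [...], "total_duration": 500}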
Lines changed: 37 additions & 48 deletions

@@ -1,59 +1,48 @@
-from sentry_streams.examples.word_counter_fn import (
-    EventsPipelineFilterFunctions,
-    EventsPipelineMapFunction,
+from sentry_streams.examples.word_counter_helpers import (
     GroupByWord,
     WordCounter,
+    simple_filter,
+    simple_map,
 )
-from sentry_streams.pipeline.pipeline import (
-    Aggregate,
-    Filter,
-    Map,
-    Pipeline,
-    StreamSink,
-    StreamSource,
-)
+from sentry_streams.pipeline import Filter, Map, Reducer, streaming_source
+from sentry_streams.pipeline.chain import StreamSink
 from sentry_streams.pipeline.window import TumblingWindow
 
-# pipeline: special name
-pipeline = Pipeline()
-
-source = StreamSource(
-    name="myinput",
-    ctx=pipeline,
-    stream_name="events",
-)
-
-filter = Filter(
-    name="myfilter",
-    ctx=pipeline,
-    inputs=[source],
-    function=EventsPipelineFilterFunctions.simple_filter,
-)
-
-map = Map(
-    name="mymap",
-    ctx=pipeline,
-    inputs=[filter],
-    function=EventsPipelineMapFunction.simple_map,
-)
-
 # A sample window.
 # Windows are assigned 3 elements.
 # TODO: Get the parameters for window in pipeline configuration.
 reduce_window = TumblingWindow(window_size=3)
 
-reduce: Aggregate[int, tuple[str, int], str] = Aggregate(
-    name="myreduce",
-    ctx=pipeline,
-    inputs=[map],
-    window=reduce_window,
-    aggregate_func=WordCounter,
-    group_by_key=GroupByWord(),
-)
-
-sink = StreamSink(
-    name="kafkasink",
-    ctx=pipeline,
-    inputs=[reduce],
-    stream_name="transformed-events",
+# pipeline: special name
+pipeline = (
+    streaming_source(
+        name="myinput",
+        stream_name="events",
+    )
+    .apply(
+        "myfilter",
+        Filter(
+            function=simple_filter,
+        ),
+    )
+    .apply(
+        "mymap",
+        Map(
+            function=simple_map,
+        ),
+    )
+    .apply(
+        "myreduce",
+        Reducer(
+            window=reduce_window,
+            aggregate_func=WordCounter,
+            group_by_key=GroupByWord(),
+        ),
+    )
+    .sink(
+        "kafkasink",
+        StreamSink(
+            stream_name="transformed-events",
+        ),
+    )
 )

sentry_streams/sentry_streams/examples/word_counter_fn.py renamed to sentry_streams/sentry_streams/examples/word_counter_helpers.py

Lines changed: 16 additions & 3 deletions

@@ -2,6 +2,7 @@
 from typing import Self
 
 from sentry_streams.pipeline.function_template import Accumulator, GroupBy
+from sentry_streams.pipeline.message import Message
 
 
 class EventsPipelineMapFunction:
@@ -55,13 +56,25 @@ def get_group_by_key(self, payload: tuple[str, int]) -> str:
         return payload[0]
 
 
-class WordCounter(Accumulator[tuple[str, int], str]):
+def simple_filter(value: Message[bytes]) -> bool:
+    d = json.loads(value.payload)
+    return True if "name" in d else False
+
+
+def simple_map(value: Message[bytes]) -> tuple[str, int]:
+    d = json.loads(value.payload)
+    word: str = d.get("word", "null_word")
+
+    return (word, 1)
+
+
+class WordCounter(Accumulator[Message[tuple[str, int]], str]):
 
     def __init__(self) -> None:
         self.tup = ("", 0)
 
-    def add(self, value: tuple[str, int]) -> Self:
-        self.tup = (value[0], self.tup[1] + value[1])
+    def add(self, value: Message[tuple[str, int]]) -> Self:
+        self.tup = (value.payload[0], self.tup[1] + value.payload[1])
 
         return self
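A hand-run trace of the helpers above, mirroring the filter → map → group-by → reduce order of the word counter pipeline. FakeMessage is again a hypothetical stand-in for the library's Message wrapper (only .payload is modeled); the exact string get_value() returns comes from unchanged lines this diff does not show:

from dataclasses import dataclass
from typing import Generic, TypeVar

from sentry_streams.examples.word_counter_helpers import (
    GroupByWord,
    WordCounter,
    simple_filter,
    simple_map,
)

T = TypeVar("T")


@dataclass
class FakeMessage(Generic[T]):
    payload: T  # hypothetical stand-in; only .payload is modeled


raw = FakeMessage(b'{"name": "x", "word": "hello"}')
assert simple_filter(raw)                        # kept: payload has a "name" key
pair = simple_map(raw)                           # ("hello", 1)
assert GroupByWord().get_group_by_key(pair) == "hello"

counter = WordCounter()
counter.add(FakeMessage(pair))
counter.add(FakeMessage(("hello", 1)))           # internal tuple is now ("hello", 2)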
