Skip to content

Commit 212acf9

Browse files
committed
Preserve DLQ producer override_params from config
1 parent 06820c9 commit 212acf9

3 files changed

Lines changed: 56 additions & 14 deletions

File tree

sentry_streams/sentry_streams/adapters/arroyo/rust_arroyo.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -186,7 +186,7 @@ def build_dlq_config(
186186
topic=step.dlq_config.topic,
187187
producer_config=PyKafkaProducerConfig(
188188
bootstrap_servers=step.dlq_config.bootstrap_servers,
189-
override_params=None,
189+
override_params=step.dlq_config.override_params,
190190
),
191191
)
192192

sentry_streams/sentry_streams/pipeline/pipeline.py

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22

33
from abc import ABC, abstractmethod
44
from collections import defaultdict
5-
from dataclasses import dataclass
5+
from dataclasses import dataclass, field
66
from datetime import timedelta
77
from enum import Enum
88
from functools import partial
@@ -235,6 +235,7 @@ class DlqConfig:
235235

236236
topic: str
237237
bootstrap_servers: Sequence[str]
238+
override_params: Mapping[str, str] = field(default_factory=dict)
238239

239240

240241
@dataclass
@@ -267,13 +268,17 @@ def override_config(self, loaded_config: Mapping[str, Any]) -> None:
267268
servers = producer_config.get("bootstrap_servers") or (
268269
self.dlq_config.bootstrap_servers if self.dlq_config else None
269270
)
271+
override_params = producer_config.get("override_params")
272+
if override_params is None:
273+
override_params = self.dlq_config.override_params if self.dlq_config else {}
270274

271275
if topic is None or servers is None:
272276
raise ValueError("DLQ config requires both 'topic' and 'bootstrap_servers'")
273277

274278
self.dlq_config = DlqConfig(
275279
topic=topic,
276280
bootstrap_servers=servers,
281+
override_params=cast(Mapping[str, str], override_params),
277282
)
278283

279284

sentry_streams/tests/test_dlq.py

Lines changed: 49 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -68,21 +68,28 @@ def test_consumer_creation(
6868

6969

7070
@pytest.mark.parametrize(
71-
"dlq_config, expected_topic, expected_bootstrap_servers",
71+
"dlq_config, expected_topic, expected_bootstrap_servers, expected_override_params",
7272
[
73-
pytest.param(None, None, None, id="no_dlq_config"),
73+
pytest.param(None, None, None, None, id="no_dlq_config"),
7474
pytest.param(
75-
DlqConfig(topic="test-dlq", bootstrap_servers=["localhost:9092"]),
75+
DlqConfig(
76+
topic="test-dlq",
77+
bootstrap_servers=["localhost:9092"],
78+
override_params={"sasl.username": "test"},
79+
),
7680
"test-dlq",
7781
["localhost:9092"],
82+
{"sasl.username": "test"},
7883
id="single_bootstrap_server",
7984
),
8085
pytest.param(
8186
DlqConfig(
82-
topic="my-dlq", bootstrap_servers=["broker1:9092", "broker2:9092", "broker3:9092"]
87+
topic="my-dlq",
88+
bootstrap_servers=["broker1:9092", "broker2:9092", "broker3:9092"],
8389
),
8490
"my-dlq",
8591
["broker1:9092", "broker2:9092", "broker3:9092"],
92+
{},
8693
id="multiple_bootstrap_servers",
8794
),
8895
],
@@ -91,6 +98,7 @@ def test_build_dlq_config(
9198
dlq_config: DlqConfig | None,
9299
expected_topic: str | None,
93100
expected_bootstrap_servers: list[str] | None,
101+
expected_override_params: dict[str, str] | None,
94102
) -> None:
95103
"""Test build_dlq_config returns correct PyDlqConfig for various inputs."""
96104
source = StreamSource(
@@ -109,47 +117,75 @@ def test_build_dlq_config(
109117
assert result.topic == expected_topic
110118
assert result.producer_config is not None
111119
assert result.producer_config.bootstrap_servers == expected_bootstrap_servers
112-
assert result.producer_config.override_params is None
120+
assert result.producer_config.override_params == expected_override_params
113121

114122

115123
@pytest.mark.parametrize(
116-
"initial_dlq_config, override_dlq, expected_topic, expected_bootstrap_servers",
124+
"initial_dlq_config, override_dlq, expected_topic, expected_bootstrap_servers, expected_override_params",
117125
[
118126
pytest.param(
119127
None,
120-
{"topic": "new-dlq", "producer_config": {"bootstrap_servers": ["broker1:9092"]}},
128+
{
129+
"topic": "new-dlq",
130+
"producer_config": {
131+
"bootstrap_servers": ["broker1:9092"],
132+
"override_params": {"security.protocol": "SASL_SSL"},
133+
},
134+
},
121135
"new-dlq",
122136
["broker1:9092"],
137+
{"security.protocol": "SASL_SSL"},
123138
id="create_new_config",
124139
),
125140
pytest.param(
126-
DlqConfig(topic="old-dlq", bootstrap_servers=["old-broker:9092"]),
141+
DlqConfig(
142+
topic="old-dlq",
143+
bootstrap_servers=["old-broker:9092"],
144+
override_params={"old.param": "old-value"},
145+
),
127146
{"topic": "new-dlq"},
128147
"new-dlq",
129148
["old-broker:9092"],
149+
{"old.param": "old-value"},
130150
id="override_topic_only",
131151
),
132152
pytest.param(
133-
DlqConfig(topic="old-dlq", bootstrap_servers=["old-broker:9092"]),
134-
{"producer_config": {"bootstrap_servers": ["new-broker:9092", "new-broker2:9092"]}},
153+
DlqConfig(
154+
topic="old-dlq",
155+
bootstrap_servers=["old-broker:9092"],
156+
override_params={"old.param": "old-value"},
157+
),
158+
{
159+
"producer_config": {
160+
"bootstrap_servers": ["new-broker:9092", "new-broker2:9092"],
161+
"override_params": {"new.param": "new-value"},
162+
}
163+
},
135164
"old-dlq",
136165
["new-broker:9092", "new-broker2:9092"],
166+
{"new.param": "new-value"},
137167
id="override_bootstrap_servers_only",
138168
),
139169
pytest.param(
140-
DlqConfig(topic="old-dlq", bootstrap_servers=["old-broker:9092"]),
170+
DlqConfig(
171+
topic="old-dlq",
172+
bootstrap_servers=["old-broker:9092"],
173+
override_params={"old.param": "old-value"},
174+
),
141175
{"topic": "new-dlq", "producer_config": {"bootstrap_servers": ["new-broker:9092"]}},
142176
"new-dlq",
143177
["new-broker:9092"],
178+
{"old.param": "old-value"},
144179
id="override_both_fields",
145180
),
146181
],
147182
)
148183
def test_stream_source_override_config_dlq(
149184
initial_dlq_config: DlqConfig | None,
150-
override_dlq: dict[str, str | list[str]],
185+
override_dlq: dict[str, object],
151186
expected_topic: str,
152187
expected_bootstrap_servers: list[str],
188+
expected_override_params: dict[str, str],
153189
) -> None:
154190
"""Test that StreamSource.override_config correctly overrides DLQ settings."""
155191
source = StreamSource(
@@ -163,3 +199,4 @@ def test_stream_source_override_config_dlq(
163199
assert source.dlq_config is not None
164200
assert source.dlq_config.topic == expected_topic
165201
assert source.dlq_config.bootstrap_servers == expected_bootstrap_servers
202+
assert source.dlq_config.override_params == expected_override_params

0 commit comments

Comments
 (0)