Skip to content

Commit 73580fe

Browse files
Circuit breaker changes using pybreaker (#705)
* Added driver connection params Signed-off-by: Nikhil Suri <nikhil.suri@databricks.com> * Added model fields for chunk/result latency Signed-off-by: Nikhil Suri <nikhil.suri@databricks.com> * fixed linting issues Signed-off-by: Nikhil Suri <nikhil.suri@databricks.com> * lint issue fixing Signed-off-by: Nikhil Suri <nikhil.suri@databricks.com> * circuit breaker changes using pybreaker Signed-off-by: Nikhil Suri <nikhil.suri@databricks.com> * Added interface layer on top of http client to use circuit breaker Signed-off-by: Nikhil Suri <nikhil.suri@databricks.com> * Added test cases to validate circuit breaker Signed-off-by: Nikhil Suri <nikhil.suri@databricks.com> * fixing broken tests Signed-off-by: Nikhil Suri <nikhil.suri@databricks.com> * fixed linting issues Signed-off-by: Nikhil Suri <nikhil.suri@databricks.com> * fixed failing test cases Signed-off-by: Nikhil Suri <nikhil.suri@databricks.com> * fixed urllib3 issue Signed-off-by: Nikhil Suri <nikhil.suri@databricks.com> * added more test cases for telemetry Signed-off-by: Nikhil Suri <nikhil.suri@databricks.com> * simplified CB config Signed-off-by: Nikhil Suri <nikhil.suri@databricks.com> * poetry lock Signed-off-by: Nikhil Suri <nikhil.suri@databricks.com> * fix minor issues & improvement Signed-off-by: Nikhil Suri <nikhil.suri@databricks.com> * improved circuit breaker for handling only 429/503 Signed-off-by: Nikhil Suri <nikhil.suri@databricks.com> * linting issue fixed Signed-off-by: Nikhil Suri <nikhil.suri@databricks.com> * raise CB only for 429/503 Signed-off-by: Nikhil Suri <nikhil.suri@databricks.com> * fix broken test cases Signed-off-by: Nikhil Suri <nikhil.suri@databricks.com> * fixed untyped references Signed-off-by: Nikhil Suri <nikhil.suri@databricks.com> * added more test to verify the changes Signed-off-by: Nikhil Suri <nikhil.suri@databricks.com> * description changed Signed-off-by: Nikhil Suri <nikhil.suri@databricks.com> * remove cb config class to constants Signed-off-by: Nikhil Suri 
<nikhil.suri@databricks.com> * removed mocked response and use a new excluded exception in CB Signed-off-by: Nikhil Suri <nikhil.suri@databricks.com> * fixed broken test Signed-off-by: Nikhil Suri <nikhil.suri@databricks.com> * added e2e test to verify circuit breaker Signed-off-by: Nikhil Suri <nikhil.suri@databricks.com> * lower log level for telemetry Signed-off-by: Nikhil Suri <nikhil.suri@databricks.com> * fixed broken test, removed tests on log assertions Signed-off-by: Nikhil Suri <nikhil.suri@databricks.com> * modified unit to reduce the noise and follow dry principle Signed-off-by: Nikhil Suri <nikhil.suri@databricks.com> --------- Signed-off-by: Nikhil Suri <nikhil.suri@databricks.com>
1 parent b8494ff commit 73580fe

16 files changed

+1512
-20
lines changed

poetry.lock

Lines changed: 34 additions & 2 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

pyproject.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ pyarrow = [
2626
{ version = ">=18.0.0", python = ">=3.13", optional=true }
2727
]
2828
pyjwt = "^2.0.0"
29+
pybreaker = "^1.0.0"
2930
requests-kerberos = {version = "^0.15.0", optional = true}
3031

3132

src/databricks/sql/auth/common.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,7 @@ def __init__(
5151
pool_connections: Optional[int] = None,
5252
pool_maxsize: Optional[int] = None,
5353
user_agent: Optional[str] = None,
54+
telemetry_circuit_breaker_enabled: Optional[bool] = None,
5455
):
5556
self.hostname = hostname
5657
self.access_token = access_token
@@ -83,6 +84,7 @@ def __init__(
8384
self.pool_connections = pool_connections or 10
8485
self.pool_maxsize = pool_maxsize or 20
8586
self.user_agent = user_agent
87+
self.telemetry_circuit_breaker_enabled = bool(telemetry_circuit_breaker_enabled)
8688

8789

8890
def get_effective_azure_login_app_id(hostname) -> str:

src/databricks/sql/common/unified_http_client.py

Lines changed: 46 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,42 @@
2828
logger = logging.getLogger(__name__)
2929

3030

31+
def _extract_http_status_from_max_retry_error(e: MaxRetryError) -> Optional[int]:
32+
"""
33+
Extract HTTP status code from MaxRetryError if available.
34+
35+
urllib3 structures MaxRetryError in different ways depending on the failure scenario:
36+
- e.reason.response.status: Most common case when retries are exhausted
37+
- e.response.status: Alternate structure in some scenarios
38+
39+
Args:
40+
e: MaxRetryError exception from urllib3
41+
42+
Returns:
43+
HTTP status code as int if found, None otherwise
44+
"""
45+
# Try primary structure: e.reason.response.status
46+
if (
47+
hasattr(e, "reason")
48+
and e.reason is not None
49+
and hasattr(e.reason, "response")
50+
and e.reason.response is not None
51+
):
52+
http_code = getattr(e.reason.response, "status", None)
53+
if http_code is not None:
54+
return http_code
55+
56+
# Try alternate structure: e.response.status
57+
if (
58+
hasattr(e, "response")
59+
and e.response is not None
60+
and hasattr(e.response, "status")
61+
):
62+
return e.response.status
63+
64+
return None
65+
66+
3167
class UnifiedHttpClient:
3268
"""
3369
Unified HTTP client for all Databricks SQL connector HTTP operations.
@@ -264,7 +300,16 @@ def request_context(
264300
yield response
265301
except MaxRetryError as e:
266302
logger.error("HTTP request failed after retries: %s", e)
267-
raise RequestError(f"HTTP request failed: {e}")
303+
304+
# Extract HTTP status code from MaxRetryError if available
305+
http_code = _extract_http_status_from_max_retry_error(e)
306+
307+
context = {}
308+
if http_code is not None:
309+
context["http-code"] = http_code
310+
logger.error("HTTP request failed with status code: %d", http_code)
311+
312+
raise RequestError(f"HTTP request failed: {e}", context=context)
268313
except Exception as e:
269314
logger.error("HTTP request error: %s", e)
270315
raise RequestError(f"HTTP request error: {e}")

src/databricks/sql/exc.py

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -143,3 +143,24 @@ class SessionAlreadyClosedError(RequestError):
143143

144144
class CursorAlreadyClosedError(RequestError):
145145
"""Thrown if CancelOperation receives a code 404. ThriftBackend should gracefully proceed as this is expected."""
146+
147+
148+
class TelemetryRateLimitError(Exception):
    """Signals that the telemetry endpoint answered with HTTP 429 or 503.

    Those statuses indicate rate limiting or an unavailable service. This
    exception type exists solely so the telemetry circuit breaker can track
    rate limiting events.
    """
151+
152+
153+
class TelemetryNonRateLimitError(Exception):
    """Envelope for telemetry failures that must NOT trip the circuit breaker.

    Non-rate-limiting problems (network errors, timeouts, server errors, etc.)
    are wrapped in this type, which is excluded from circuit breaker failure
    counting. Only TelemetryRateLimitError is meant to open the circuit breaker.

    Attributes:
        original_exception: The wrapped exception instance
    """

    def __init__(self, original_exception: Exception):
        # The exception message embeds the string form of the wrapped error.
        message = f"Non-rate-limit telemetry error: {original_exception}"
        super().__init__(message)
        self.original_exception = original_exception
Lines changed: 112 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,112 @@
1+
"""
2+
Circuit breaker implementation for telemetry requests.
3+
4+
This module provides circuit breaker functionality to prevent telemetry failures
5+
from impacting the main SQL operations. It uses pybreaker library to implement
6+
the circuit breaker pattern.
7+
"""
8+
9+
import logging
10+
import threading
11+
from typing import Dict
12+
13+
import pybreaker
14+
from pybreaker import CircuitBreaker, CircuitBreakerError, CircuitBreakerListener
15+
16+
from databricks.sql.exc import TelemetryNonRateLimitError
17+
18+
logger = logging.getLogger(__name__)
19+
20+
# Circuit breaker tuning values
MINIMUM_CALLS = 20  # Failure count at which the circuit opens
RESET_TIMEOUT = 30  # Seconds to wait before attempting to close the circuit
NAME_PREFIX = "telemetry-circuit-breaker"

# State names as compared against (and printed) by the logging code
CIRCUIT_BREAKER_STATE_OPEN = "open"
CIRCUIT_BREAKER_STATE_CLOSED = "closed"
CIRCUIT_BREAKER_STATE_HALF_OPEN = "half-open"

# %-style log message templates
LOG_CIRCUIT_BREAKER_STATE_CHANGED = (
    "Circuit breaker state changed from %s to %s for %s"
)
LOG_CIRCUIT_BREAKER_OPENED = (
    "Circuit breaker opened for %s"
    " - telemetry requests will be blocked"
)
LOG_CIRCUIT_BREAKER_CLOSED = (
    "Circuit breaker closed for %s"
    " - telemetry requests will be allowed"
)
LOG_CIRCUIT_BREAKER_HALF_OPEN = (
    "Circuit breaker half-open for %s"
    " - testing telemetry requests"
)
41+
42+
43+
class CircuitBreakerStateListener(CircuitBreakerListener):
    """Listener that logs circuit breaker state transitions.

    Only state_change does any work; the remaining hooks are no-ops.
    """

    def before_call(self, cb: CircuitBreaker, func, *args, **kwargs) -> None:
        """Hook for the moment before the breaker calls a function; does nothing."""

    def failure(self, cb: CircuitBreaker, exc: BaseException) -> None:
        """Hook for a failed call made through the breaker; does nothing."""

    def success(self, cb: CircuitBreaker) -> None:
        """Hook for a successful call made through the breaker; does nothing."""

    def state_change(self, cb: CircuitBreaker, old_state, new_state) -> None:
        """Log a state transition of the circuit breaker.

        A generic "state changed" line is always written at info level. When
        the new state's name is one of the known state constants, a second,
        state-specific line follows: warning level for "open", info level for
        "closed" and "half-open".

        Args:
            cb: The circuit breaker whose state changed (its name is logged)
            old_state: Previous state object; a falsy value is logged as "None"
            new_state: New state object; a falsy value is logged as "None"
        """
        previous = old_state.name if old_state else "None"
        current = new_state.name if new_state else "None"

        logger.info(LOG_CIRCUIT_BREAKER_STATE_CHANGED, previous, current, cb.name)

        # State name -> (logging call, message template) for the follow-up line.
        follow_ups = {
            CIRCUIT_BREAKER_STATE_OPEN: (logger.warning, LOG_CIRCUIT_BREAKER_OPENED),
            CIRCUIT_BREAKER_STATE_CLOSED: (logger.info, LOG_CIRCUIT_BREAKER_CLOSED),
            CIRCUIT_BREAKER_STATE_HALF_OPEN: (
                logger.info,
                LOG_CIRCUIT_BREAKER_HALF_OPEN,
            ),
        }
        selected = follow_ups.get(current)
        if selected is None:
            # Unrecognised state name: the generic line above is all we log.
            return

        emit, template = selected
        emit(template, cb.name)
73+
74+
75+
class CircuitBreakerManager:
    """
    Registry of circuit breaker instances used for telemetry requests.

    One circuit breaker is created per host on first request and then reused,
    so that telemetry failures are tracked separately for each host.
    """

    # Host name -> circuit breaker; shared by the whole class.
    _instances: Dict[str, CircuitBreaker] = {}
    # Held for the full lookup-or-create sequence in get_circuit_breaker.
    _lock = threading.RLock()

    @classmethod
    def get_circuit_breaker(cls, host: str) -> CircuitBreaker:
        """
        Return the circuit breaker for a host, creating it on first use.

        Args:
            host: The hostname for which to get the circuit breaker

        Returns:
            CircuitBreaker instance for the host
        """
        with cls._lock:
            # Fast path: a breaker was already registered for this host.
            if host in cls._instances:
                return cls._instances[host]

            created = CircuitBreaker(
                fail_max=MINIMUM_CALLS,
                reset_timeout=RESET_TIMEOUT,
                name=f"{NAME_PREFIX}-{host}",
                # Errors of this type are not counted as failures
                exclude=[TelemetryNonRateLimitError],
            )
            # Attach the listener that logs state changes
            created.add_listener(CircuitBreakerStateListener())
            cls._instances[host] = created
            logger.debug("Created circuit breaker for host: %s", host)
            return created

src/databricks/sql/telemetry/telemetry_client.py

Lines changed: 25 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,11 @@
4141
from databricks.sql.common.feature_flag import FeatureFlagsContextFactory
4242
from databricks.sql.common.unified_http_client import UnifiedHttpClient
4343
from databricks.sql.common.http import HttpMethod
44+
from databricks.sql.telemetry.telemetry_push_client import (
45+
ITelemetryPushClient,
46+
TelemetryPushClient,
47+
CircuitBreakerTelemetryPushClient,
48+
)
4449

4550
if TYPE_CHECKING:
4651
from databricks.sql.client import Connection
@@ -166,21 +171,21 @@ class TelemetryClient(BaseTelemetryClient):
166171

167172
def __init__(
168173
self,
169-
telemetry_enabled,
170-
session_id_hex,
174+
telemetry_enabled: bool,
175+
session_id_hex: str,
171176
auth_provider,
172-
host_url,
177+
host_url: str,
173178
executor,
174-
batch_size,
179+
batch_size: int,
175180
client_context,
176-
):
181+
) -> None:
177182
logger.debug("Initializing TelemetryClient for connection: %s", session_id_hex)
178183
self._telemetry_enabled = telemetry_enabled
179184
self._batch_size = batch_size
180185
self._session_id_hex = session_id_hex
181186
self._auth_provider = auth_provider
182187
self._user_agent = None
183-
self._events_batch = []
188+
self._events_batch: list = []
184189
self._lock = threading.RLock()
185190
self._driver_connection_params = None
186191
self._host_url = host_url
@@ -189,6 +194,19 @@ def __init__(
189194
# Create own HTTP client from client context
190195
self._http_client = UnifiedHttpClient(client_context)
191196

197+
# Create telemetry push client based on circuit breaker enabled flag
198+
if client_context.telemetry_circuit_breaker_enabled:
199+
# Create circuit breaker telemetry push client (circuit breakers created on-demand)
200+
self._telemetry_push_client: ITelemetryPushClient = (
201+
CircuitBreakerTelemetryPushClient(
202+
TelemetryPushClient(self._http_client),
203+
host_url,
204+
)
205+
)
206+
else:
207+
# Circuit breaker disabled - use direct telemetry push client
208+
self._telemetry_push_client = TelemetryPushClient(self._http_client)
209+
192210
def _export_event(self, event):
193211
"""Add an event to the batch queue and flush if batch is full"""
194212
logger.debug("Exporting event for connection %s", self._session_id_hex)
@@ -254,7 +272,7 @@ def _send_telemetry(self, events):
254272
def _send_with_unified_client(self, url, data, headers, timeout=900):
255273
"""Helper method to send telemetry using the unified HTTP client."""
256274
try:
257-
response = self._http_client.request(
275+
response = self._telemetry_push_client.request(
258276
HttpMethod.POST, url, body=data, headers=headers, timeout=timeout
259277
)
260278
return response

0 commit comments

Comments
 (0)