Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
32 commits
Select commit Hold shift + click to select a range
37ec282
Added driver connection params
nikhilsuri-db Oct 24, 2025
2504053
Added model fields for chunk/result latency
nikhilsuri-db Oct 24, 2025
ef41f4c
fixed linting issues
nikhilsuri-db Oct 24, 2025
2f54be8
lint issue fixing
nikhilsuri-db Oct 27, 2025
db93974
circuit breaker changes using pybreaker
nikhilsuri-db Sep 26, 2025
1f9c4d3
Added interface layer top of http client to use circuit rbeaker
nikhilsuri-db Sep 30, 2025
939b548
Added test cases to validate ciruit breaker
nikhilsuri-db Sep 30, 2025
6c72f86
fixing broken tests
nikhilsuri-db Sep 30, 2025
ac845a5
fixed linting issues
nikhilsuri-db Sep 30, 2025
a602c39
fixed failing test cases
nikhilsuri-db Sep 30, 2025
c1b6e25
fixed urllib3 issue
nikhilsuri-db Sep 30, 2025
e3d85f4
added more test cases for telemetry
nikhilsuri-db Sep 30, 2025
9dfb623
simplified CB config
nikhilsuri-db Oct 6, 2025
e7e8b4b
poetry lock
nikhilsuri-db Nov 4, 2025
dab4b38
fix minor issues & improvement
nikhilsuri-db Nov 5, 2025
e1e08b0
improved circuit breaker for handling only 429/503
nikhilsuri-db Nov 7, 2025
b527e7c
linting issue fixed
nikhilsuri-db Nov 7, 2025
2b45814
raise CB only for 429/503
nikhilsuri-db Nov 11, 2025
1193af7
fix broken test cases
nikhilsuri-db Nov 11, 2025
aa459e9
fixed untyped references
nikhilsuri-db Nov 12, 2025
4cb87b1
Merge remote-tracking branch 'origin/main' into PECOBLR-993
nikhilsuri-db Nov 12, 2025
7cbc4c8
added more test to verify the changes
nikhilsuri-db Nov 13, 2025
c646335
description changed
nikhilsuri-db Nov 13, 2025
bcd6760
remove cb congig class to constants
nikhilsuri-db Nov 17, 2025
4376b6d
removed mocked reponse and use a new exlucded exception in CB
nikhilsuri-db Nov 17, 2025
d9e7c89
fixed broken test
nikhilsuri-db Nov 17, 2025
1b8e47c
added e2e test to verify circuit breaker
nikhilsuri-db Nov 19, 2025
4c75963
Merge remote-tracking branch 'origin/main' into PECOBLR-993
nikhilsuri-db Nov 19, 2025
172e03f
lower log level for telemetry
nikhilsuri-db Nov 19, 2025
dbd915f
Merge remote-tracking branch 'origin/main' into PECOBLR-993
nikhilsuri-db Nov 21, 2025
35b7459
fixed broken test, removed tests on log assertions
nikhilsuri-db Nov 21, 2025
5cfde8c
modified unit to reduce the noise and follow dry principle
nikhilsuri-db Nov 26, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 34 additions & 2 deletions poetry.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ pyarrow = [
{ version = ">=18.0.0", python = ">=3.13", optional=true }
]
pyjwt = "^2.0.0"
pybreaker = "^1.0.0"
requests-kerberos = {version = "^0.15.0", optional = true}


Expand Down
2 changes: 2 additions & 0 deletions src/databricks/sql/auth/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ def __init__(
pool_connections: Optional[int] = None,
pool_maxsize: Optional[int] = None,
user_agent: Optional[str] = None,
telemetry_circuit_breaker_enabled: Optional[bool] = None,
):
self.hostname = hostname
self.access_token = access_token
Expand Down Expand Up @@ -83,6 +84,7 @@ def __init__(
self.pool_connections = pool_connections or 10
self.pool_maxsize = pool_maxsize or 20
self.user_agent = user_agent
self.telemetry_circuit_breaker_enabled = bool(telemetry_circuit_breaker_enabled)


def get_effective_azure_login_app_id(hostname) -> str:
Expand Down
47 changes: 46 additions & 1 deletion src/databricks/sql/common/unified_http_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,42 @@
logger = logging.getLogger(__name__)


def _extract_http_status_from_max_retry_error(e: MaxRetryError) -> Optional[int]:
"""
Extract HTTP status code from MaxRetryError if available.

urllib3 structures MaxRetryError in different ways depending on the failure scenario:
- e.reason.response.status: Most common case when retries are exhausted
- e.response.status: Alternate structure in some scenarios

Args:
e: MaxRetryError exception from urllib3

Returns:
HTTP status code as int if found, None otherwise
"""
# Try primary structure: e.reason.response.status
if (
hasattr(e, "reason")
and e.reason is not None
and hasattr(e.reason, "response")
and e.reason.response is not None
):
http_code = getattr(e.reason.response, "status", None)
if http_code is not None:
return http_code

# Try alternate structure: e.response.status
if (
hasattr(e, "response")
and e.response is not None
and hasattr(e.response, "status")
):
return e.response.status

return None


class UnifiedHttpClient:
"""
Unified HTTP client for all Databricks SQL connector HTTP operations.
Expand Down Expand Up @@ -264,7 +300,16 @@ def request_context(
yield response
except MaxRetryError as e:
logger.error("HTTP request failed after retries: %s", e)
raise RequestError(f"HTTP request failed: {e}")

# Extract HTTP status code from MaxRetryError if available
http_code = _extract_http_status_from_max_retry_error(e)

context = {}
if http_code is not None:
context["http-code"] = http_code
logger.error("HTTP request failed with status code: %d", http_code)

raise RequestError(f"HTTP request failed: {e}", context=context)
except Exception as e:
logger.error("HTTP request error: %s", e)
raise RequestError(f"HTTP request error: {e}")
Expand Down
21 changes: 21 additions & 0 deletions src/databricks/sql/exc.py
Original file line number Diff line number Diff line change
Expand Up @@ -143,3 +143,24 @@ class SessionAlreadyClosedError(RequestError):

class CursorAlreadyClosedError(RequestError):
"""Thrown if CancelOperation receives a code 404. ThriftBackend should gracefully proceed as this is expected."""


class TelemetryRateLimitError(Exception):
"""Raised when telemetry endpoint returns 429 or 503, indicating rate limiting or service unavailable.
This exception is used exclusively by the circuit breaker to track telemetry rate limiting events."""


class TelemetryNonRateLimitError(Exception):
"""Wrapper for telemetry errors that should NOT trigger circuit breaker.

This exception wraps non-rate-limiting errors (network errors, timeouts, server errors, etc.)
and is excluded from circuit breaker failure counting. Only TelemetryRateLimitError should
open the circuit breaker.

Attributes:
original_exception: The actual exception that occurred
"""

def __init__(self, original_exception: Exception):
self.original_exception = original_exception
super().__init__(f"Non-rate-limit telemetry error: {original_exception}")
112 changes: 112 additions & 0 deletions src/databricks/sql/telemetry/circuit_breaker_manager.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,112 @@
"""
Circuit breaker implementation for telemetry requests.
This module provides circuit breaker functionality to prevent telemetry failures
from impacting the main SQL operations. It uses pybreaker library to implement
the circuit breaker pattern.
"""

import logging
import threading
from typing import Dict

import pybreaker
from pybreaker import CircuitBreaker, CircuitBreakerError, CircuitBreakerListener

from databricks.sql.exc import TelemetryNonRateLimitError

logger = logging.getLogger(__name__)

# Circuit Breaker Constants
MINIMUM_CALLS = 20 # Number of failures before circuit opens
RESET_TIMEOUT = 30 # Seconds to wait before trying to close circuit
NAME_PREFIX = "telemetry-circuit-breaker"

# Circuit Breaker State Constants (used in logging)
CIRCUIT_BREAKER_STATE_OPEN = "open"
CIRCUIT_BREAKER_STATE_CLOSED = "closed"
CIRCUIT_BREAKER_STATE_HALF_OPEN = "half-open"

# Logging Message Constants
LOG_CIRCUIT_BREAKER_STATE_CHANGED = "Circuit breaker state changed from %s to %s for %s"
LOG_CIRCUIT_BREAKER_OPENED = (
"Circuit breaker opened for %s - telemetry requests will be blocked"
)
LOG_CIRCUIT_BREAKER_CLOSED = (
"Circuit breaker closed for %s - telemetry requests will be allowed"
)
LOG_CIRCUIT_BREAKER_HALF_OPEN = (
"Circuit breaker half-open for %s - testing telemetry requests"
)


class CircuitBreakerStateListener(CircuitBreakerListener):
"""Listener for circuit breaker state changes."""

def before_call(self, cb: CircuitBreaker, func, *args, **kwargs) -> None:
"""Called before the circuit breaker calls a function."""
pass

def failure(self, cb: CircuitBreaker, exc: BaseException) -> None:
"""Called when a function called by the circuit breaker fails."""
pass

def success(self, cb: CircuitBreaker) -> None:
"""Called when a function called by the circuit breaker succeeds."""
pass

def state_change(self, cb: CircuitBreaker, old_state, new_state) -> None:
"""Called when the circuit breaker state changes."""
old_state_name = old_state.name if old_state else "None"
new_state_name = new_state.name if new_state else "None"

logger.info(
LOG_CIRCUIT_BREAKER_STATE_CHANGED, old_state_name, new_state_name, cb.name
)

if new_state_name == CIRCUIT_BREAKER_STATE_OPEN:
logger.warning(LOG_CIRCUIT_BREAKER_OPENED, cb.name)
elif new_state_name == CIRCUIT_BREAKER_STATE_CLOSED:
logger.info(LOG_CIRCUIT_BREAKER_CLOSED, cb.name)
elif new_state_name == CIRCUIT_BREAKER_STATE_HALF_OPEN:
logger.info(LOG_CIRCUIT_BREAKER_HALF_OPEN, cb.name)


class CircuitBreakerManager:
"""
Manages circuit breaker instances for telemetry requests.
Creates and caches circuit breaker instances per host to ensure telemetry
failures don't impact main SQL operations.
"""

_instances: Dict[str, CircuitBreaker] = {}
_lock = threading.RLock()

@classmethod
def get_circuit_breaker(cls, host: str) -> CircuitBreaker:
"""
Get or create a circuit breaker instance for the specified host.
Args:
host: The hostname for which to get the circuit breaker
Returns:
CircuitBreaker instance for the host
"""
with cls._lock:
if host not in cls._instances:
breaker = CircuitBreaker(
fail_max=MINIMUM_CALLS,
reset_timeout=RESET_TIMEOUT,
name=f"{NAME_PREFIX}-{host}",
exclude=[
TelemetryNonRateLimitError
], # Don't count these as failures
)
# Add state change listener for logging
breaker.add_listener(CircuitBreakerStateListener())
cls._instances[host] = breaker
logger.debug("Created circuit breaker for host: %s", host)

return cls._instances[host]
32 changes: 25 additions & 7 deletions src/databricks/sql/telemetry/telemetry_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,11 @@
from databricks.sql.common.feature_flag import FeatureFlagsContextFactory
from databricks.sql.common.unified_http_client import UnifiedHttpClient
from databricks.sql.common.http import HttpMethod
from databricks.sql.telemetry.telemetry_push_client import (
ITelemetryPushClient,
TelemetryPushClient,
CircuitBreakerTelemetryPushClient,
)

if TYPE_CHECKING:
from databricks.sql.client import Connection
Expand Down Expand Up @@ -166,21 +171,21 @@ class TelemetryClient(BaseTelemetryClient):

def __init__(
self,
telemetry_enabled,
session_id_hex,
telemetry_enabled: bool,
session_id_hex: str,
auth_provider,
host_url,
host_url: str,
executor,
batch_size,
batch_size: int,
client_context,
):
) -> None:
logger.debug("Initializing TelemetryClient for connection: %s", session_id_hex)
self._telemetry_enabled = telemetry_enabled
self._batch_size = batch_size
self._session_id_hex = session_id_hex
self._auth_provider = auth_provider
self._user_agent = None
self._events_batch = []
self._events_batch: list = []
self._lock = threading.RLock()
self._driver_connection_params = None
self._host_url = host_url
Expand All @@ -189,6 +194,19 @@ def __init__(
# Create own HTTP client from client context
self._http_client = UnifiedHttpClient(client_context)

# Create telemetry push client based on circuit breaker enabled flag
if client_context.telemetry_circuit_breaker_enabled:
# Create circuit breaker telemetry push client (circuit breakers created on-demand)
self._telemetry_push_client: ITelemetryPushClient = (
CircuitBreakerTelemetryPushClient(
TelemetryPushClient(self._http_client),
host_url,
)
)
else:
# Circuit breaker disabled - use direct telemetry push client
self._telemetry_push_client = TelemetryPushClient(self._http_client)

def _export_event(self, event):
"""Add an event to the batch queue and flush if batch is full"""
logger.debug("Exporting event for connection %s", self._session_id_hex)
Expand Down Expand Up @@ -254,7 +272,7 @@ def _send_telemetry(self, events):
def _send_with_unified_client(self, url, data, headers, timeout=900):
"""Helper method to send telemetry using the unified HTTP client."""
try:
response = self._http_client.request(
response = self._telemetry_push_client.request(
HttpMethod.POST, url, body=data, headers=headers, timeout=timeout
)
return response
Expand Down
Loading
Loading