Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
106 changes: 98 additions & 8 deletions devcycle_python_sdk/managers/config_manager.py
100644 → 100755
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,12 @@ def __init__(
self._config_etag: Optional[str] = None
self._config_lastmodified: Optional[str] = None

# Exponential backoff configuration
self._sse_reconnect_attempts = 0
self._min_reconnect_interval = 5.0 # Start at 5 seconds
self._max_reconnect_interval = 300.0 # Cap at 5 minutes
self._last_reconnect_attempt_time: Optional[float] = None
self._sse_reconnecting = False
self._config_api_client = ConfigAPIClient(self._sdk_key, self._options)

self._polling_enabled = True
Expand All @@ -49,6 +55,48 @@ def __init__(
def is_initialized(self) -> bool:
return self._config is not None

def _recreate_sse_connection(self):
"""Recreate the SSE connection with the current config."""
if self._config is None or self._options.disable_realtime_updates:
logger.debug(
"DevCycle: Skipping SSE recreation - no config or updates disabled"
)
return

# Update timestamp right before attempting connection
self._last_reconnect_attempt_time = time.time()

try:
# Close existing connection if present
if self._sse_manager is not None and self._sse_manager.client is not None:
self._sse_manager.client.close()
if self._sse_manager.read_thread.is_alive():
self._sse_manager.read_thread.join(timeout=1.0)

# Create new SSE manager
self._sse_manager = SSEManager(
self.sse_state,
self.sse_error,
self.sse_message,
)
self._sse_manager.update(self._config)

except Exception as e:
logger.debug(f"DevCycle: Failed to recreate SSE connection: {e}")

def _delayed_sse_reconnect(self, delay_seconds: float):
"""Delayed SSE reconnection with configurable backoff."""
try:
logger.debug(
f"DevCycle: Waiting {delay_seconds}s before reconnecting SSE..."
)
time.sleep(delay_seconds)
self._recreate_sse_connection()
except Exception as e:
logger.error(f"DevCycle: Error during delayed SSE reconnection: {e}")
finally:
self._sse_reconnecting = False

def _get_config(self, last_modified: Optional[float] = None):
try:
lm_header = self._config_lastmodified
Expand Down Expand Up @@ -87,12 +135,10 @@ def _get_config(self, last_modified: Optional[float] = None):
or self._sse_manager.client is None
or not self._sse_manager.read_thread.is_alive()
):
self._sse_manager = SSEManager(
self.sse_state,
self.sse_error,
self.sse_message,
logger.info(
"DevCycle: SSE connection not active, creating new connection"
)
self._sse_manager.update(self._config)
self._recreate_sse_connection()

if (
trigger_on_client_initialized
Expand All @@ -101,7 +147,6 @@ def _get_config(self, last_modified: Optional[float] = None):
try:
self._options.on_client_initialized()
except Exception as e:
# consume any error
logger.warning(
f"DevCycle: Error received from on_client_initialized callback: {str(e)}"
)
Expand All @@ -122,7 +167,6 @@ def run(self):
self._get_config()
except Exception as e:
if self._polling_enabled:
# Only log a warning if we're still polling
logger.warning(
f"DevCycle: Error polling for config changes: {str(e)}"
)
Expand All @@ -137,6 +181,7 @@ def sse_message(self, message: ld_eventsource.actions.Event):
self.sse_state(None)
logger.info(f"DevCycle: Received message: {message.data}")
sse_message = json.loads(message.data)

dvc_data = json.loads(sse_message.get("data"))
if (
dvc_data.get("type") == "refetchConfig"
Expand All @@ -145,15 +190,60 @@ def sse_message(self, message: ld_eventsource.actions.Event):
):
logger.info("DevCycle: Received refetchConfig message - updating config")
self._get_config(dvc_data["lastModified"] / 1000.0)
# SSE connection healthy, reconnect attempts reset.
if dvc_data.get("type") == "ping" or dvc_data.get("type") == "refetchConfig":
self._sse_reconnect_attempts = 0

def sse_error(self, error: ld_eventsource.actions.Fault):
self._sse_connected = False
logger.debug(f"DevCycle: Received SSE error: {error}")
logger.debug(f"DevCycle: SSE connection error: {error.error}")
current_time = time.time()

if self._sse_reconnecting:
logger.debug("DevCycle: Reconnection already in progress, skipping")
return

# Calculate exponential backoff interval (capped at max)
backoff_interval = min(
self._min_reconnect_interval * (2**self._sse_reconnect_attempts),
self._max_reconnect_interval,
)

# Check if we need to wait for remaining backoff time
delay_seconds = backoff_interval
if self._last_reconnect_attempt_time is not None:
time_since_last_attempt = current_time - self._last_reconnect_attempt_time
if time_since_last_attempt < backoff_interval:
delay_seconds = backoff_interval - time_since_last_attempt
logger.debug(
f"DevCycle: Within backoff period, scheduling reconnection in {delay_seconds:.1f}s"
)

self._sse_reconnecting = True
self._sse_reconnect_attempts += 1

logger.debug(
f"DevCycle: Attempting SSE reconnection (attempt #{self._sse_reconnect_attempts}, "
f"backoff: {delay_seconds:.1f}s)"
)

reconnect_thread = threading.Thread(
target=self._delayed_sse_reconnect, args=(delay_seconds,), daemon=True
)
reconnect_thread.start()

def sse_state(self, state: Optional[ld_eventsource.actions.Start]):
if not self._sse_connected:
self._sse_connected = True
logger.info("DevCycle: Connected to SSE stream")

# Clear reconnection state
self._sse_reconnecting = False
self._last_reconnect_attempt_time = None
else:
logger.debug("DevCycle: SSE keepalive received")

def close(self):
self._polling_enabled = False
if self._sse_manager is not None and self._sse_manager.client is not None:
self._sse_manager.client.close()
Comment on lines 158 to +249
Copy link

Copilot AI Jan 27, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

close() currently only closes the SSE client, but it does not stop/join the SSE read thread (SSEManager.read_thread) and does not clear reconnection state. This can leave background threads running after close(). Consider joining the read thread with a timeout and preventing any further reconnect attempts as part of shutdown.

Copilot uses AI. Check for mistakes.
11 changes: 8 additions & 3 deletions devcycle_python_sdk/managers/sse_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,9 @@ def read_events(
handle_error: Callable[[ld_eventsource.actions.Fault], None],
handle_message: Callable[[ld_eventsource.actions.Event], None],
):
self.client.start()
try:
self.client.start()
logger.info("DevCycle: SSE connection created successfully")
for event in self.client.all:
if isinstance(event, ld_eventsource.actions.Start):
handle_state(event)
Expand All @@ -45,7 +46,11 @@ def read_events(
elif isinstance(event, ld_eventsource.actions.Comment):
handle_state(None)
except Exception as e:
logger.debug(f"DevCycle: failed to read SSE message: {e}")
logger.debug(f"DevCycle SSE: Error in read loop: {e}")
fault_event = ld_eventsource.actions.Fault(error=e)
handle_error(fault_event)
finally:
Comment on lines 48 to +52
Copy link

Copilot AI Jan 27, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

read_events() now converts any exception into a Fault and calls handle_error(). If the exception is caused by an intentional shutdown (client.close()), this will trigger the reconnection logic and can fight against EnvironmentConfigManager.close(). Consider detecting/ignoring expected shutdown exceptions (or relying on a shutdown flag in the error handler to suppress reconnects during close).

Copilot uses AI. Check for mistakes.
logger.debug("DevCycle SSE: Connection closed")

def update(self, config: dict):
if self.use_new_config(config["sse"]):
Expand All @@ -66,6 +71,6 @@ def update(self, config: dict):

def use_new_config(self, config: dict) -> bool:
new_url = config["hostname"] + config["path"]
if self.url == "" or self.url is None and new_url != "":
if (self.url == "" or self.url is None) and new_url != "":
return True
return self.url != new_url
2 changes: 1 addition & 1 deletion example/cloud_client_example.py
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ def main():
client.track(user, event)

except Exception as e:
logger.exception(f"Exception when calling Devcycle API: {e}\n")
logger.exception(f"Exception when calling DevCycle API: {e}\n")


if __name__ == "__main__":
Expand Down
Loading
Loading