Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
66 changes: 46 additions & 20 deletions devcycle_python_sdk/managers/config_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,44 +58,59 @@ def is_initialized(self) -> bool:

def _recreate_sse_connection(self):
"""Recreate the SSE connection with the current config."""
# Acquire lock to check state and save references to old connection
with self._sse_reconnect_lock:
if self._config is None or self._options.disable_realtime_updates:
logger.debug(
"Devcycle: Skipping SSE recreation - no config or updates disabled"
"DevCycle: Skipping SSE recreation - no config or updates disabled"
)
return

try:
# Close existing connection if present
if (
self._sse_manager is not None
and self._sse_manager.client is not None
):
self._sse_manager.client.close()
if self._sse_manager.read_thread.is_alive():
self._sse_manager.read_thread.join(timeout=1.0)
# Save reference to old SSE manager and clear it to prevent concurrent access
old_sse_manager = self._sse_manager
self._sse_manager = None

# Create new SSE manager
# Perform potentially blocking operations outside the lock to avoid deadlock
# The SSE read thread may call sse_error/sse_state which need the lock
try:
if old_sse_manager is not None and old_sse_manager.client is not None:
old_sse_manager.client.close()
if old_sse_manager.read_thread.is_alive():
old_sse_manager.read_thread.join(timeout=1.0)
except Exception as e:
logger.debug(f"DevCycle: Error closing old SSE connection: {e}")

# Re-acquire lock to create new connection and update state
try:
with self._sse_reconnect_lock:
# Re-read config to ensure we use the latest version
if self._config is None:
logger.debug(
"DevCycle: Config was cleared during SSE reconnection, skipping"
)
return

# Create new SSE manager with the current config
self._sse_manager = SSEManager(
self.sse_state,
self.sse_error,
self.sse_message,
Copy link

Copilot AI Jan 27, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Typo in log message: "Devcyle" should be "DevCycle" to match the spelling used in most other log messages in this file (e.g., lines 131, 150, 165, 168, 193, 202, 255). While this appears to be a pre-existing issue, since this line was moved as part of the refactoring, it would be a good opportunity to fix the spelling for consistency.

Copilot uses AI. Check for mistakes.
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed in eb7844f (already corrected before these changes).

)
self._sse_manager.update(self._config)
logger.info("Devcyle: SSE connection created successfully")
except Exception as e:
logger.debug(f"Devcycle: Failed to recreate SSE connection: {e}")
logger.info("DevCycle: SSE connection created successfully")
except Exception as e:
logger.debug(f"DevCycle: Failed to recreate SSE connection: {e}")

def _delayed_sse_reconnect(self, delay_seconds: float):
"""Delayed SSE reconnection with configurable backoff."""
try:
logger.debug(
f"Devcycle: Waiting {delay_seconds}s before reconnecting SSE..."
f"DevCycle: Waiting {delay_seconds}s before reconnecting SSE..."
)
time.sleep(delay_seconds)
self._recreate_sse_connection()
except Exception as e:
logger.error(f"Devcycle: Error during delayed SSE reconnection: {e}")
logger.error(f"DevCycle: Error during delayed SSE reconnection: {e}")
finally:
with self._sse_reconnect_lock:
self._sse_reconnecting = False
Expand Down Expand Up @@ -138,10 +153,21 @@ def _get_config(self, last_modified: Optional[float] = None):
or self._sse_manager.client is None
or not self._sse_manager.read_thread.is_alive()
):
logger.info(
"DevCycle: SSE connection not active, creating new connection"
)
self._recreate_sse_connection()
# Only recreate if not already reconnecting from error handler
with self._sse_reconnect_lock:
if not self._sse_reconnecting:
logger.info(
"DevCycle: SSE connection not active, creating new connection"
)
should_recreate = True
else:
logger.debug(
"DevCycle: SSE reconnection already scheduled, skipping"
)
should_recreate = False

if should_recreate:
self._recreate_sse_connection()

if (
trigger_on_client_initialized
Expand Down
2 changes: 1 addition & 1 deletion example/cloud_client_example.py
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ def main():
client.track(user, event)

except Exception as e:
logger.exception(f"Exception when calling Devcycle API: {e}\n")
logger.exception(f"Exception when calling DevCycle API: {e}\n")


if __name__ == "__main__":
Expand Down
233 changes: 232 additions & 1 deletion test/managers/test_config_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,11 @@
import time
import unittest
import uuid
import threading
from datetime import datetime
from email.utils import formatdate
from time import mktime
from unittest.mock import patch, MagicMock
from unittest.mock import patch, MagicMock, Mock, PropertyMock

from devcycle_python_sdk import DevCycleLocalOptions
from devcycle_python_sdk.managers.config_manager import EnvironmentConfigManager
Expand Down Expand Up @@ -151,6 +152,236 @@ def test_get_config_unchanged(self, mock_get_config):
self.assertDictEqual(config_manager._config, self.test_config_json)
self.test_local_bucketing.store_config.assert_not_called()

@patch("devcycle_python_sdk.api.config_client.ConfigAPIClient.get_config")
@patch("devcycle_python_sdk.managers.config_manager.SSEManager")
def test_recreate_sse_connection_clears_old_manager(self, mock_sse_manager_class, mock_get_config):
"""Test that _recreate_sse_connection sets _sse_manager to None before blocking operations."""
mock_get_config.return_value = (
self.test_config_json,
self.test_etag,
self.test_lastmodified,
)

# Enable realtime updates for this test
self.test_options.disable_realtime_updates = False

config_manager = EnvironmentConfigManager(
self.sdk_key, self.test_options, self.test_local_bucketing
)
time.sleep(0.1)

# Create a mock SSE manager with a thread
mock_old_sse = MagicMock()
mock_old_sse.client = MagicMock()
mock_thread = MagicMock()
mock_thread.is_alive.return_value = True

# Track when join is called and what _sse_manager is at that point
manager_during_join = []

def track_join(timeout=None):
manager_during_join.append(config_manager._sse_manager)

mock_thread.join = track_join
mock_old_sse.read_thread = mock_thread

config_manager._sse_manager = mock_old_sse

# Call _recreate_sse_connection
config_manager._recreate_sse_connection()

# Verify that _sse_manager was None during the join (blocking operation)
self.assertEqual(len(manager_during_join), 1)
self.assertIsNone(manager_during_join[0])

# Verify old manager was closed
mock_old_sse.client.close.assert_called_once()

config_manager.close()

@patch("devcycle_python_sdk.api.config_client.ConfigAPIClient.get_config")
@patch("devcycle_python_sdk.managers.config_manager.SSEManager")
def test_recreate_sse_connection_uses_latest_config(self, mock_sse_manager_class, mock_get_config):
"""Test that _recreate_sse_connection uses the latest config when re-acquiring lock."""
initial_config = self.test_config_json.copy()
mock_get_config.return_value = (
initial_config,
self.test_etag,
self.test_lastmodified,
)

# Enable realtime updates
self.test_options.disable_realtime_updates = False

config_manager = EnvironmentConfigManager(
self.sdk_key, self.test_options, self.test_local_bucketing
)
time.sleep(0.1)

# Create a mock SSE manager
mock_old_sse = MagicMock()
mock_old_sse.client = MagicMock()
mock_thread = MagicMock()
mock_thread.is_alive.return_value = True

# Update config during the blocking operation
updated_config = initial_config.copy()
updated_config["updated"] = True

def delayed_config_update(timeout=None):
# Simulate config update happening during join
config_manager._config = updated_config
time.sleep(0.05)

mock_thread.join = delayed_config_update
mock_old_sse.read_thread = mock_thread

config_manager._sse_manager = mock_old_sse

# Reset the mock after initialization
mock_sse_manager_class.reset_mock()

# Create a new mock SSE manager for the recreation
mock_new_sse = MagicMock()
mock_sse_manager_class.return_value = mock_new_sse

# Call _recreate_sse_connection
config_manager._recreate_sse_connection()

# Verify the new SSE manager was created and updated with the latest config
mock_sse_manager_class.assert_called_once()
mock_new_sse.update.assert_called_once()

# The config passed to update should be the updated one
call_args = mock_new_sse.update.call_args[0][0]
self.assertIn("updated", call_args)
self.assertTrue(call_args["updated"])

config_manager.close()

@patch("devcycle_python_sdk.api.config_client.ConfigAPIClient.get_config")
@patch("devcycle_python_sdk.managers.config_manager.SSEManager")
def test_recreate_sse_connection_concurrent_calls(self, mock_sse_manager_class, mock_get_config):
"""Test that concurrent calls to _recreate_sse_connection are handled safely."""
mock_get_config.return_value = (
self.test_config_json,
self.test_etag,
self.test_lastmodified,
)

# Enable realtime updates
self.test_options.disable_realtime_updates = False

config_manager = EnvironmentConfigManager(
self.sdk_key, self.test_options, self.test_local_bucketing
)
time.sleep(0.1)

# Create a mock SSE manager with slow close/join
mock_old_sse = MagicMock()
mock_old_sse.client = MagicMock()
mock_thread = MagicMock()
mock_thread.is_alive.return_value = True

# Make join slow to allow concurrent calls
def slow_join(timeout=None):
time.sleep(0.2)

mock_thread.join = slow_join
mock_old_sse.read_thread = mock_thread

config_manager._sse_manager = mock_old_sse

# Reset the mock after initialization
mock_sse_manager_class.reset_mock()

# Mock new SSE managers
mock_new_sse_1 = MagicMock()
mock_new_sse_2 = MagicMock()
mock_sse_manager_class.side_effect = [mock_new_sse_1, mock_new_sse_2]

# Track completion
results = []

def call_recreate(index):
try:
config_manager._recreate_sse_connection()
results.append(f"completed_{index}")
except Exception as e:
results.append(f"error_{index}: {e}")

# Start two concurrent recreate calls
thread1 = threading.Thread(target=call_recreate, args=(1,))
thread2 = threading.Thread(target=call_recreate, args=(2,))

thread1.start()
time.sleep(0.05) # Small delay to ensure thread1 starts first
thread2.start()

thread1.join(timeout=2.0)
thread2.join(timeout=2.0)

# Both should complete without errors
self.assertEqual(len(results), 2)
self.assertIn("completed_1", results)
self.assertIn("completed_2", results)

# At least one SSE manager should be created
self.assertGreaterEqual(mock_sse_manager_class.call_count, 1)

config_manager.close()

@patch("devcycle_python_sdk.api.config_client.ConfigAPIClient.get_config")
@patch("devcycle_python_sdk.managers.config_manager.SSEManager")
def test_recreate_sse_connection_skips_if_config_cleared(self, mock_sse_manager_class, mock_get_config):
"""Test that _recreate_sse_connection skips if config is cleared during reconnection."""
mock_get_config.return_value = (
self.test_config_json,
self.test_etag,
self.test_lastmodified,
)

# Enable realtime updates
self.test_options.disable_realtime_updates = False

config_manager = EnvironmentConfigManager(
self.sdk_key, self.test_options, self.test_local_bucketing
)
time.sleep(0.1)

# Create a mock SSE manager
mock_old_sse = MagicMock()
mock_old_sse.client = MagicMock()
mock_thread = MagicMock()
mock_thread.is_alive.return_value = True

# Clear config during join
def clear_config_during_join(timeout=None):
config_manager._config = None
time.sleep(0.05)

mock_thread.join = clear_config_during_join
mock_old_sse.read_thread = mock_thread

config_manager._sse_manager = mock_old_sse

# Reset the mock after initialization
mock_sse_manager_class.reset_mock()

# Call _recreate_sse_connection
config_manager._recreate_sse_connection()

# Verify old manager was closed
mock_old_sse.client.close.assert_called_once()

# Verify no new SSE manager was created (because config was None)
mock_sse_manager_class.assert_not_called()

# _sse_manager should still be None
self.assertIsNone(config_manager._sse_manager)

config_manager.close()


if __name__ == "__main__":
unittest.main()