diff --git a/devcycle_python_sdk/managers/config_manager.py b/devcycle_python_sdk/managers/config_manager.py index d29d39a..f428772 100644 --- a/devcycle_python_sdk/managers/config_manager.py +++ b/devcycle_python_sdk/managers/config_manager.py @@ -58,44 +58,59 @@ def is_initialized(self) -> bool: def _recreate_sse_connection(self): """Recreate the SSE connection with the current config.""" + # Acquire lock to check state and save references to old connection with self._sse_reconnect_lock: if self._config is None or self._options.disable_realtime_updates: logger.debug( - "Devcycle: Skipping SSE recreation - no config or updates disabled" + "DevCycle: Skipping SSE recreation - no config or updates disabled" ) return - try: - # Close existing connection if present - if ( - self._sse_manager is not None - and self._sse_manager.client is not None - ): - self._sse_manager.client.close() - if self._sse_manager.read_thread.is_alive(): - self._sse_manager.read_thread.join(timeout=1.0) + # Save reference to old SSE manager and clear it to prevent concurrent access + old_sse_manager = self._sse_manager + self._sse_manager = None - # Create new SSE manager + # Perform potentially blocking operations outside the lock to avoid deadlock + # The SSE read thread may call sse_error/sse_state which need the lock + try: + if old_sse_manager is not None and old_sse_manager.client is not None: + old_sse_manager.client.close() + if old_sse_manager.read_thread.is_alive(): + old_sse_manager.read_thread.join(timeout=1.0) + except Exception as e: + logger.debug(f"DevCycle: Error closing old SSE connection: {e}") + + # Re-acquire lock to create new connection and update state + try: + with self._sse_reconnect_lock: + # Re-read config to ensure we use the latest version + if self._config is None: + logger.debug( + "DevCycle: Config was cleared during SSE reconnection, skipping" + ) + return + + # Create new SSE manager with the current config self._sse_manager = SSEManager( self.sse_state, self.sse_error, self.sse_message, ) self._sse_manager.update(self._config) - logger.info("Devcyle: SSE connection created successfully") - except Exception as e: - logger.debug(f"Devcycle: Failed to recreate SSE connection: {e}") + logger.info("DevCycle: SSE connection created successfully") + except Exception as e: + logger.debug(f"DevCycle: Failed to recreate SSE connection: {e}") def _delayed_sse_reconnect(self, delay_seconds: float): """Delayed SSE reconnection with configurable backoff.""" try: logger.debug( - f"Devcycle: Waiting {delay_seconds}s before reconnecting SSE..." + f"DevCycle: Waiting {delay_seconds}s before reconnecting SSE..." ) time.sleep(delay_seconds) self._recreate_sse_connection() except Exception as e: - logger.error(f"Devcycle: Error during delayed SSE reconnection: {e}") + logger.error(f"DevCycle: Error during delayed SSE reconnection: {e}") finally: with self._sse_reconnect_lock: self._sse_reconnecting = False @@ -138,10 +153,21 @@ def _get_config(self, last_modified: Optional[float] = None): or self._sse_manager.client is None or not self._sse_manager.read_thread.is_alive() ): - logger.info( - "DevCycle: SSE connection not active, creating new connection" - ) - self._recreate_sse_connection() + # Only recreate if not already reconnecting from error handler + with self._sse_reconnect_lock: + if not self._sse_reconnecting: + logger.info( + "DevCycle: SSE connection not active, creating new connection" + ) + should_recreate = True + else: + logger.debug( + "DevCycle: SSE reconnection already scheduled, skipping" + ) + should_recreate = False + + if should_recreate: + self._recreate_sse_connection() if ( trigger_on_client_initialized diff --git a/example/cloud_client_example.py b/example/cloud_client_example.py index 0a72f70..1310f82 100644 --- a/example/cloud_client_example.py +++ b/example/cloud_client_example.py @@ -61,7 +61,7 @@ def main(): client.track(user, event) except Exception as e: - logger.exception(f"Exception when calling Devcycle API: {e}\n") + logger.exception(f"Exception when calling DevCycle API: {e}\n") if __name__ == "__main__": diff --git a/test/managers/test_config_manager.py b/test/managers/test_config_manager.py index e8a2a47..b3a3c90 100644 --- a/test/managers/test_config_manager.py +++ b/test/managers/test_config_manager.py @@ -3,10 +3,11 @@ import time import unittest import uuid +import threading from datetime import datetime from email.utils import formatdate from time import mktime -from unittest.mock import patch, MagicMock +from unittest.mock import patch, MagicMock, Mock, PropertyMock from devcycle_python_sdk import DevCycleLocalOptions from devcycle_python_sdk.managers.config_manager import EnvironmentConfigManager @@ -151,6 +152,236 @@ def test_get_config_unchanged(self, mock_get_config): self.assertDictEqual(config_manager._config, self.test_config_json) self.test_local_bucketing.store_config.assert_not_called() + @patch("devcycle_python_sdk.api.config_client.ConfigAPIClient.get_config") + @patch("devcycle_python_sdk.managers.config_manager.SSEManager") + def test_recreate_sse_connection_clears_old_manager(self, mock_sse_manager_class, mock_get_config): + """Test that _recreate_sse_connection sets _sse_manager to None before blocking operations.""" + mock_get_config.return_value = ( + self.test_config_json, + self.test_etag, + self.test_lastmodified, + ) + + # Enable realtime updates for this test + self.test_options.disable_realtime_updates = False + + config_manager = EnvironmentConfigManager( + self.sdk_key, self.test_options, self.test_local_bucketing + ) + time.sleep(0.1) + + # Create a mock SSE manager with a thread + mock_old_sse = MagicMock() + mock_old_sse.client = MagicMock() + mock_thread = MagicMock() + mock_thread.is_alive.return_value = True + + # Track when join is called and what _sse_manager is at that point + manager_during_join = [] + + def track_join(timeout=None): + manager_during_join.append(config_manager._sse_manager) + + mock_thread.join = track_join + mock_old_sse.read_thread = mock_thread + + config_manager._sse_manager = mock_old_sse + + # Call _recreate_sse_connection + config_manager._recreate_sse_connection() + + # Verify that _sse_manager was None during the join (blocking operation) + self.assertEqual(len(manager_during_join), 1) + self.assertIsNone(manager_during_join[0]) + + # Verify old manager was closed + mock_old_sse.client.close.assert_called_once() + + config_manager.close() + + @patch("devcycle_python_sdk.api.config_client.ConfigAPIClient.get_config") + @patch("devcycle_python_sdk.managers.config_manager.SSEManager") + def test_recreate_sse_connection_uses_latest_config(self, mock_sse_manager_class, mock_get_config): + """Test that _recreate_sse_connection uses the latest config when re-acquiring lock.""" + initial_config = self.test_config_json.copy() + mock_get_config.return_value = ( + initial_config, + self.test_etag, + self.test_lastmodified, + ) + + # Enable realtime updates + self.test_options.disable_realtime_updates = False + + config_manager = EnvironmentConfigManager( + self.sdk_key, self.test_options, self.test_local_bucketing + ) + time.sleep(0.1) + + # Create a mock SSE manager + mock_old_sse = MagicMock() + mock_old_sse.client = MagicMock() + mock_thread = MagicMock() + mock_thread.is_alive.return_value = True + + # Update config during the blocking operation + updated_config = initial_config.copy() + updated_config["updated"] = True + + def delayed_config_update(timeout=None): + # Simulate config update happening during join + config_manager._config = updated_config + time.sleep(0.05) + + mock_thread.join = delayed_config_update + mock_old_sse.read_thread = mock_thread + + config_manager._sse_manager = mock_old_sse + + # Reset the mock after initialization + mock_sse_manager_class.reset_mock() + + # Create a new mock SSE manager for the recreation + mock_new_sse = MagicMock() + mock_sse_manager_class.return_value = mock_new_sse + + # Call _recreate_sse_connection + config_manager._recreate_sse_connection() + + # Verify the new SSE manager was created and updated with the latest config + mock_sse_manager_class.assert_called_once() + mock_new_sse.update.assert_called_once() + + # The config passed to update should be the updated one + call_args = mock_new_sse.update.call_args[0][0] + self.assertIn("updated", call_args) + self.assertTrue(call_args["updated"]) + + config_manager.close() + + @patch("devcycle_python_sdk.api.config_client.ConfigAPIClient.get_config") + @patch("devcycle_python_sdk.managers.config_manager.SSEManager") + def test_recreate_sse_connection_concurrent_calls(self, mock_sse_manager_class, mock_get_config): + """Test that concurrent calls to _recreate_sse_connection are handled safely.""" + mock_get_config.return_value = ( + self.test_config_json, + self.test_etag, + self.test_lastmodified, + ) + + # Enable realtime updates + self.test_options.disable_realtime_updates = False + + config_manager = EnvironmentConfigManager( + self.sdk_key, self.test_options, self.test_local_bucketing + ) + time.sleep(0.1) + + # Create a mock SSE manager with slow close/join + mock_old_sse = MagicMock() + mock_old_sse.client = MagicMock() + mock_thread = MagicMock() + mock_thread.is_alive.return_value = True + + # Make join slow to allow concurrent calls + def slow_join(timeout=None): + time.sleep(0.2) + + mock_thread.join = slow_join + mock_old_sse.read_thread = mock_thread + + config_manager._sse_manager = mock_old_sse + + # Reset the mock after initialization + mock_sse_manager_class.reset_mock() + + # Mock new SSE managers + mock_new_sse_1 = MagicMock() + mock_new_sse_2 = MagicMock() + mock_sse_manager_class.side_effect = [mock_new_sse_1, mock_new_sse_2] + + # Track completion + results = [] + + def call_recreate(index): + try: + config_manager._recreate_sse_connection() + results.append(f"completed_{index}") + except Exception as e: + results.append(f"error_{index}: {e}") + + # Start two concurrent recreate calls + thread1 = threading.Thread(target=call_recreate, args=(1,)) + thread2 = threading.Thread(target=call_recreate, args=(2,)) + + thread1.start() + time.sleep(0.05) # Small delay to ensure thread1 starts first + thread2.start() + + thread1.join(timeout=2.0) + thread2.join(timeout=2.0) + + # Both should complete without errors + self.assertEqual(len(results), 2) + self.assertIn("completed_1", results) + self.assertIn("completed_2", results) + + # At least one SSE manager should be created + self.assertGreaterEqual(mock_sse_manager_class.call_count, 1) + + config_manager.close() + + @patch("devcycle_python_sdk.api.config_client.ConfigAPIClient.get_config") + @patch("devcycle_python_sdk.managers.config_manager.SSEManager") + def test_recreate_sse_connection_skips_if_config_cleared(self, mock_sse_manager_class, mock_get_config): + """Test that _recreate_sse_connection skips if config is cleared during reconnection.""" + mock_get_config.return_value = ( + self.test_config_json, + self.test_etag, + self.test_lastmodified, + ) + + # Enable realtime updates + self.test_options.disable_realtime_updates = False + + config_manager = EnvironmentConfigManager( + self.sdk_key, self.test_options, self.test_local_bucketing + ) + time.sleep(0.1) + + # Create a mock SSE manager + mock_old_sse = MagicMock() + mock_old_sse.client = MagicMock() + mock_thread = MagicMock() + mock_thread.is_alive.return_value = True + + # Clear config during join + def clear_config_during_join(timeout=None): + config_manager._config = None + time.sleep(0.05) + + mock_thread.join = clear_config_during_join + mock_old_sse.read_thread = mock_thread + + config_manager._sse_manager = mock_old_sse + + # Reset the mock after initialization + mock_sse_manager_class.reset_mock() + + # Call _recreate_sse_connection + config_manager._recreate_sse_connection() + + # Verify old manager was closed + mock_old_sse.client.close.assert_called_once() + + # Verify no new SSE manager was created (because config was None) + mock_sse_manager_class.assert_not_called() + + # _sse_manager should still be None + self.assertIsNone(config_manager._sse_manager) + + config_manager.close() + if __name__ == "__main__": unittest.main()