From 228d13bb1786ab847a2471b02ec5d1967b0a0f43 Mon Sep 17 00:00:00 2001 From: Gagan Trivedi Date: Thu, 28 May 2026 16:26:02 +0530 Subject: [PATCH 01/11] feat!: introduce EventProcessor, drop flag-evaluation autocapture Rename PipelineAnalyticsProcessor to EventProcessor and surface a single `track_event` method on it. Remove `record_evaluation_event` and the per-flag autocapture wiring in `Flags.get_flag`; flag-evaluation telemetry is no longer emitted automatically. The PipelineAnalyticsConfig dataclass is renamed to EventProcessorConfig and the matching client kwarg becomes `event_processor_config`. BREAKING CHANGE: PipelineAnalyticsProcessor/PipelineAnalyticsConfig are renamed to EventProcessor/EventProcessorConfig. The `pipeline_analytics_config` kwarg on `Flagsmith` is renamed to `event_processor_config`. Automatic flag-evaluation events are no longer recorded by the event processor. --- flagsmith/__init__.py | 4 +- flagsmith/analytics.py | 51 ++------- flagsmith/flagsmith.py | 42 +++---- flagsmith/models.py | 38 +------ tests/conftest.py | 18 +-- tests/test_event_processor.py | 104 ++++++++++++++++++ tests/test_flagsmith.py | 78 ++----------- tests/test_pipeline_analytics.py | 182 ------------------------------- 8 files changed, 148 insertions(+), 369 deletions(-) create mode 100644 tests/test_event_processor.py delete mode 100644 tests/test_pipeline_analytics.py diff --git a/flagsmith/__init__.py b/flagsmith/__init__.py index d36feb8..92e8b9a 100644 --- a/flagsmith/__init__.py +++ b/flagsmith/__init__.py @@ -1,6 +1,6 @@ from flagsmith import webhooks -from flagsmith.analytics import PipelineAnalyticsConfig +from flagsmith.analytics import EventProcessorConfig from flagsmith.flagsmith import Flagsmith from flagsmith.version import __version__ -__all__ = ("Flagsmith", "PipelineAnalyticsConfig", "webhooks", "__version__") +__all__ = ("Flagsmith", "EventProcessorConfig", "webhooks", "__version__") diff --git a/flagsmith/analytics.py b/flagsmith/analytics.py index f0d0bab..dc94a70 100644 --- a/flagsmith/analytics.py +++ b/flagsmith/analytics.py @@ -72,24 +72,22 @@ def track_feature(self, feature_name: str) -> None: @dataclass -class PipelineAnalyticsConfig: +class EventProcessorConfig: analytics_server_url: str max_buffer_items: int = 1000 flush_interval_seconds: float = 10.0 -class PipelineAnalyticsProcessor: +class EventProcessor: """ - Buffered analytics processor that sends per-evaluation and custom events - to the Flagsmith pipeline analytics endpoint in batches. - - Evaluation events are deduplicated within each flush window. Events are - flushed periodically via a background timer or when the buffer is full. + Buffered event processor that batches custom events and POSTs them to the + Flagsmith event endpoint. Flushes on a background timer or when the buffer + fills. """ def __init__( self, - config: PipelineAnalyticsConfig, + config: EventProcessorConfig, environment_key: str, ) -> None: url = config.analytics_server_url @@ -101,44 +99,10 @@ def __init__( self._flush_interval_seconds = config.flush_interval_seconds self._buffer: typing.List[typing.Dict[str, typing.Any]] = [] - self._dedup_keys: typing.Dict[str, str] = {} self._lock = threading.Lock() self._timer: typing.Optional[threading.Timer] = None - def record_evaluation_event( - self, - flag_key: str, - enabled: bool, - value: typing.Any, - identity_identifier: typing.Optional[str] = None, - traits: typing.Optional[typing.Dict[str, typing.Any]] = None, - ) -> None: - fingerprint = f"{identity_identifier or 'none'}|{enabled}|{value}" - should_flush = False - - with self._lock: - if self._dedup_keys.get(flag_key) == fingerprint: - return - self._dedup_keys[flag_key] = fingerprint - self._buffer.append( - { - "event_id": flag_key, - "event_type": "flag_evaluation", - "evaluated_at": int(time.time() * 1000), - "identity_identifier": identity_identifier, - "enabled": enabled, - "value": value, - "traits": dict(traits) if traits else None, - "metadata": {"sdk_version": __version__}, - } - ) - if len(self._buffer) >= self._max_buffer: - should_flush = True - - if should_flush: - self.flush() - - def record_custom_event( + def track_event( self, event_name: str, identity_identifier: typing.Optional[str] = None, @@ -172,7 +136,6 @@ def flush(self) -> None: return events = self._buffer self._buffer = [] - self._dedup_keys.clear() payload = json.dumps( {"events": events, "environment_key": self._environment_key} diff --git a/flagsmith/flagsmith.py b/flagsmith/flagsmith.py index 8242948..ea8fadc 100644 --- a/flagsmith/flagsmith.py +++ b/flagsmith/flagsmith.py @@ -10,8 +10,8 @@ from flagsmith.analytics import ( AnalyticsProcessor, - PipelineAnalyticsConfig, - PipelineAnalyticsProcessor, + EventProcessor, + EventProcessorConfig, ) from flagsmith.exceptions import FlagsmithAPIError, FlagsmithClientError from flagsmith.mappers import ( @@ -74,7 +74,7 @@ def __init__( environment_refresh_interval_seconds: typing.Union[int, float] = 60, retries: typing.Optional[Retry] = None, enable_analytics: bool = False, - pipeline_analytics_config: typing.Optional[PipelineAnalyticsConfig] = None, + event_processor_config: typing.Optional[EventProcessorConfig] = None, default_flag_handler: typing.Optional[ typing.Callable[[str], DefaultFlag] ] = None, @@ -120,9 +120,7 @@ def __init__( self.default_flag_handler = default_flag_handler self.enable_realtime_updates = enable_realtime_updates self._analytics_processor: typing.Optional[AnalyticsProcessor] = None - self._pipeline_analytics_processor: typing.Optional[ - PipelineAnalyticsProcessor - ] = None + self._event_processor: typing.Optional[EventProcessor] = None self.__evaluation_context: typing.Optional[SDKEvaluationContext] = None self._segment_overrides_index: SegmentOverridesIndex = {} self._environment_updated_at: typing.Optional[datetime] = None @@ -189,25 +187,25 @@ def __init__( self._initialise_analytics( environment_key=environment_key, enable_analytics=enable_analytics, - pipeline_analytics_config=pipeline_analytics_config, + event_processor_config=event_processor_config, ) def _initialise_analytics( self, environment_key: str, enable_analytics: bool, - pipeline_analytics_config: typing.Optional[PipelineAnalyticsConfig], + event_processor_config: typing.Optional[EventProcessorConfig], ) -> None: if enable_analytics: self._analytics_processor = AnalyticsProcessor( environment_key, self.api_url, timeout=self.request_timeout_seconds ) - if pipeline_analytics_config: - self._pipeline_analytics_processor = PipelineAnalyticsProcessor( - config=pipeline_analytics_config, + if event_processor_config: + self._event_processor = EventProcessor( + config=event_processor_config, environment_key=environment_key, ) - self._pipeline_analytics_processor.start() + self._event_processor.start() def _initialise_local_evaluation(self) -> None: # To ensure that the environment is set before allowing subsequent @@ -331,12 +329,12 @@ def track_event( traits: typing.Optional[TraitMapping] = None, metadata: typing.Optional[typing.Dict[str, typing.Any]] = None, ) -> None: - if not self._pipeline_analytics_processor: + if not self._event_processor: raise ValueError( - "Pipeline analytics is not configured. " - "Provide pipeline_analytics_config to use track_event." + "Event processor is not configured. " + "Provide event_processor_config to use track_event." ) - self._pipeline_analytics_processor.record_custom_event( + self._event_processor.track_event( event_name=event_name, identity_identifier=identity_identifier, traits=resolve_trait_values(traits), @@ -418,7 +416,6 @@ def _get_environment_flags_from_document(self) -> Flags: evaluation_result=evaluation_result, analytics_processor=self._analytics_processor, default_flag_handler=self.default_flag_handler, - pipeline_analytics_processor=self._pipeline_analytics_processor, ) def _get_identity_flags_from_document( @@ -442,9 +439,6 @@ def _get_identity_flags_from_document( overrides_index=self._segment_overrides_index, analytics_processor=self._analytics_processor, default_flag_handler=self.default_flag_handler, - pipeline_analytics_processor=self._pipeline_analytics_processor, - identity_identifier=identifier, - traits=resolve_trait_values(traits), ) def _get_environment_flags_from_api(self) -> Flags: @@ -456,7 +450,6 @@ def _get_environment_flags_from_api(self) -> Flags: api_flags=json_response, analytics_processor=self._analytics_processor, default_flag_handler=self.default_flag_handler, - pipeline_analytics_processor=self._pipeline_analytics_processor, ) except FlagsmithAPIError: if self.offline_handler: @@ -489,9 +482,6 @@ def _get_identity_flags_from_api( api_flags=json_response["flags"], analytics_processor=self._analytics_processor, default_flag_handler=self.default_flag_handler, - pipeline_analytics_processor=self._pipeline_analytics_processor, - identity_identifier=identifier, - traits=resolve_trait_values(traits), ) except FlagsmithAPIError: if self.offline_handler: @@ -525,5 +515,5 @@ def __del__(self) -> None: if hasattr(self, "event_stream_thread"): self.event_stream_thread.stop() - if self._pipeline_analytics_processor: - self._pipeline_analytics_processor.stop() + if self._event_processor: + self._event_processor.stop() diff --git a/flagsmith/models.py b/flagsmith/models.py index 9e48724..cfb5ff4 100644 --- a/flagsmith/models.py +++ b/flagsmith/models.py @@ -6,7 +6,7 @@ from flag_engine import engine from flag_engine.context.types import SegmentContext -from flagsmith.analytics import AnalyticsProcessor, PipelineAnalyticsProcessor +from flagsmith.analytics import AnalyticsProcessor from flagsmith.exceptions import FlagsmithFeatureDoesNotExistError from flagsmith.types import ( FeatureMetadata, @@ -85,9 +85,6 @@ class Flags: flags: typing.Dict[str, Flag] = field(default_factory=dict) default_flag_handler: typing.Optional[typing.Callable[[str], DefaultFlag]] = None _analytics_processor: typing.Optional[AnalyticsProcessor] = None - _pipeline_analytics_processor: typing.Optional[PipelineAnalyticsProcessor] = None - _identity_identifier: typing.Optional[str] = None - _traits: typing.Optional[typing.Dict[str, typing.Any]] = None # Lazy-evaluation state. When `_context` is set, `flags` is a # per-feature memo rather than a fully-materialised snapshot; unseen # features are resolved on demand via the engine primitives and @@ -103,11 +100,6 @@ def from_evaluation_result( evaluation_result: SDKEvaluationResult, analytics_processor: typing.Optional[AnalyticsProcessor], default_flag_handler: typing.Optional[typing.Callable[[str], DefaultFlag]], - pipeline_analytics_processor: typing.Optional[ - PipelineAnalyticsProcessor - ] = None, - identity_identifier: typing.Optional[str] = None, - traits: typing.Optional[typing.Dict[str, typing.Any]] = None, ) -> Flags: return cls( flags={ @@ -117,9 +109,6 @@ def from_evaluation_result( }, default_flag_handler=default_flag_handler, _analytics_processor=analytics_processor, - _pipeline_analytics_processor=pipeline_analytics_processor, - _identity_identifier=identity_identifier, - _traits=traits, ) @classmethod @@ -129,11 +118,6 @@ def from_evaluation_context( overrides_index: SegmentOverridesIndex, analytics_processor: typing.Optional[AnalyticsProcessor], default_flag_handler: typing.Optional[typing.Callable[[str], DefaultFlag]], - pipeline_analytics_processor: typing.Optional[ - PipelineAnalyticsProcessor - ] = None, - identity_identifier: typing.Optional[str] = None, - traits: typing.Optional[typing.Dict[str, typing.Any]] = None, ) -> Flags: """Build a lazy `Flags` backed by an evaluation context. @@ -146,9 +130,6 @@ def from_evaluation_context( flags={}, default_flag_handler=default_flag_handler, _analytics_processor=analytics_processor, - _pipeline_analytics_processor=pipeline_analytics_processor, - _identity_identifier=identity_identifier, - _traits=traits, _context=context, _overrides_index=overrides_index, ) @@ -159,11 +140,6 @@ def from_api_flags( api_flags: typing.Sequence[typing.Mapping[str, typing.Any]], analytics_processor: typing.Optional[AnalyticsProcessor], default_flag_handler: typing.Optional[typing.Callable[[str], DefaultFlag]], - pipeline_analytics_processor: typing.Optional[ - PipelineAnalyticsProcessor - ] = None, - identity_identifier: typing.Optional[str] = None, - traits: typing.Optional[typing.Dict[str, typing.Any]] = None, ) -> Flags: flags = { flag_data["feature"]["name"]: Flag.from_api_flag(flag_data) @@ -174,9 +150,6 @@ def from_api_flags( flags=flags, default_flag_handler=default_flag_handler, _analytics_processor=analytics_processor, - _pipeline_analytics_processor=pipeline_analytics_processor, - _identity_identifier=identity_identifier, - _traits=traits, ) def all_flags(self) -> typing.List[Flag]: @@ -252,15 +225,6 @@ def get_flag(self, feature_name: str) -> typing.Union[DefaultFlag, Flag]: if self._analytics_processor and hasattr(flag, "feature_name"): self._analytics_processor.track_feature(flag.feature_name) - if self._pipeline_analytics_processor and hasattr(flag, "feature_name"): - self._pipeline_analytics_processor.record_evaluation_event( - flag_key=flag.feature_name, - enabled=flag.enabled, - value=flag.value, - identity_identifier=self._identity_identifier, - traits=self._traits, - ) - return flag def _resolve_flag(self, feature_name: str) -> Flag: diff --git a/tests/conftest.py b/tests/conftest.py index 9c58c96..f2f6173 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -13,8 +13,8 @@ from flagsmith import Flagsmith from flagsmith.analytics import ( AnalyticsProcessor, - PipelineAnalyticsConfig, - PipelineAnalyticsProcessor, + EventProcessor, + EventProcessorConfig, ) from flagsmith.api.types import EnvironmentModel from flagsmith.mappers import map_environment_document_to_context @@ -31,16 +31,16 @@ def analytics_processor() -> AnalyticsProcessor: @pytest.fixture() -def pipeline_analytics_config() -> PipelineAnalyticsConfig: - return PipelineAnalyticsConfig(analytics_server_url="http://test_analytics/") +def event_processor_config() -> EventProcessorConfig: + return EventProcessorConfig(analytics_server_url="http://test_analytics/") @pytest.fixture() -def pipeline_analytics_processor( - pipeline_analytics_config: PipelineAnalyticsConfig, -) -> PipelineAnalyticsProcessor: - return PipelineAnalyticsProcessor( - config=pipeline_analytics_config, +def event_processor( + event_processor_config: EventProcessorConfig, +) -> EventProcessor: + return EventProcessor( + config=event_processor_config, environment_key="test_key", ) diff --git a/tests/test_event_processor.py b/tests/test_event_processor.py new file mode 100644 index 0000000..178623d --- /dev/null +++ b/tests/test_event_processor.py @@ -0,0 +1,104 @@ +import json +from concurrent.futures import Future +from unittest import mock + +from flagsmith.analytics import ( + EventProcessor, + EventProcessorConfig, +) + + +def test_track_event_buffers_event(event_processor: EventProcessor) -> None: + event_processor.track_event( + event_name="purchase", + identity_identifier="user1", + traits={"plan": "premium"}, + metadata={"amount": 99}, + ) + # Events are never deduplicated + event_processor.track_event( + event_name="purchase", + identity_identifier="user1", + ) + + assert len(event_processor._buffer) == 2 + event = event_processor._buffer[0] + assert event["event_id"] == "purchase" + assert event["event_type"] == "custom_event" + assert event["identity_identifier"] == "user1" + assert event["enabled"] is None + assert event["value"] is None + assert event["traits"] == {"plan": "premium"} + assert event["metadata"]["amount"] == 99 + assert "sdk_version" in event["metadata"] + assert isinstance(event["evaluated_at"], int) + + +def test_auto_flush_on_buffer_full() -> None: + config = EventProcessorConfig( + analytics_server_url="http://test/", max_buffer_items=5 + ) + processor = EventProcessor(config=config, environment_key="key") + + with mock.patch("flagsmith.analytics.session"): + for i in range(5): + processor.track_event(event_name=f"event_{i}") + + assert len(processor._buffer) == 0 + + +def test_flush_sends_correct_http_request(event_processor: EventProcessor) -> None: + with mock.patch("flagsmith.analytics.session") as mock_session: + event_processor.track_event(event_name="purchase", identity_identifier="user1") + event_processor.flush() + + mock_session.post.assert_called_once() + call_kwargs = mock_session.post.call_args + assert call_kwargs[0][0] == "http://test_analytics/v1/analytics/batch" + + headers = call_kwargs[1]["headers"] + assert headers["X-Environment-Key"] == "test_key" + assert headers["Content-Type"] == "application/json; charset=utf-8" + assert "flagsmith-python-client/" in headers["Flagsmith-SDK-User-Agent"] + + body = json.loads(call_kwargs[1]["data"]) + assert body["environment_key"] == "test_key" + assert len(body["events"]) == 1 + assert body["events"][0]["event_id"] == "purchase" + + +def test_flush_noop_when_empty(event_processor: EventProcessor) -> None: + with mock.patch("flagsmith.analytics.session") as mock_session: + event_processor.flush() + + mock_session.post.assert_not_called() + + +def test_failed_flush_requeues_events(event_processor: EventProcessor) -> None: + future: Future[None] = Future() + future.set_exception(Exception("connection error")) + + with mock.patch("flagsmith.analytics.session") as mock_session: + mock_session.post.return_value = future + event_processor.track_event(event_name="purchase") + event_processor.flush() + + assert len(event_processor._buffer) == 1 + assert event_processor._buffer[0]["event_id"] == "purchase" + + +def test_start_stop_lifecycle() -> None: + config = EventProcessorConfig( + analytics_server_url="http://test/", flush_interval_seconds=100 + ) + processor = EventProcessor(config=config, environment_key="key") + + processor.start() + assert processor._timer is not None + assert processor._timer.is_alive() + + with mock.patch("flagsmith.analytics.session"): + processor.track_event(event_name="purchase") + processor.stop() + + assert len(processor._buffer) == 0 diff --git a/tests/test_flagsmith.py b/tests/test_flagsmith.py index b5891ed..0a4f2c5 100644 --- a/tests/test_flagsmith.py +++ b/tests/test_flagsmith.py @@ -10,7 +10,7 @@ from responses import matchers from flagsmith import Flagsmith, __version__ -from flagsmith.analytics import PipelineAnalyticsConfig +from flagsmith.analytics import EventProcessorConfig from flagsmith.api.types import EnvironmentModel from flagsmith.exceptions import ( FlagsmithAPIError, @@ -953,88 +953,28 @@ def test_flagsmith__init__expected_headers_sent( def test_track_event_raises_without_config(api_key: str) -> None: flagsmith = Flagsmith(environment_key=api_key) - with pytest.raises(ValueError, match="Pipeline analytics is not configured"): + with pytest.raises(ValueError, match="Event processor is not configured"): flagsmith.track_event("purchase") -@responses.activate() -def test_pipeline_analytics_records_events( - mocker: MockerFixture, api_key: str, flags_json: str +def test_track_event_delegates_to_event_processor( + mocker: MockerFixture, api_key: str ) -> None: - config = PipelineAnalyticsConfig(analytics_server_url="http://test/") - flagsmith = Flagsmith(environment_key=api_key, pipeline_analytics_config=config) - - mock_eval = mocker.patch.object( - flagsmith._pipeline_analytics_processor, "record_evaluation_event" - ) - mock_custom = mocker.patch.object( - flagsmith._pipeline_analytics_processor, "record_custom_event" - ) + config = EventProcessorConfig(analytics_server_url="http://test/") + flagsmith = Flagsmith(environment_key=api_key, event_processor_config=config) - responses.add(method="GET", url=flagsmith.environment_flags_url, body=flags_json) - flags = flagsmith.get_environment_flags() - flags.get_flag("some_feature") - - mock_eval.assert_called_once_with( - flag_key="some_feature", - enabled=True, - value="some-value", - identity_identifier=None, - traits=None, - ) + mock_track = mocker.patch.object(flagsmith._event_processor, "track_event") flagsmith.track_event( "purchase", identity_identifier="user1", - traits={"plan": "premium"}, + traits={"plan": {"value": "premium", "transient": True}}, metadata={"amount": 99}, ) - mock_custom.assert_called_once_with( + mock_track.assert_called_once_with( event_name="purchase", identity_identifier="user1", traits={"plan": "premium"}, metadata={"amount": 99}, ) - - -@responses.activate() -def test_identity_flags_records_evaluation_with_resolved_traits( - mocker: MockerFixture, api_key: str, identities_json: str -) -> None: - config = PipelineAnalyticsConfig(analytics_server_url="http://test/") - flagsmith = Flagsmith(environment_key=api_key, pipeline_analytics_config=config) - - mock_record = mocker.patch.object( - flagsmith._pipeline_analytics_processor, "record_evaluation_event" - ) - - responses.add(method="POST", url=flagsmith.identities_url, body=identities_json) - responses.add(method="POST", url=flagsmith.identities_url, body=identities_json) - - flags = flagsmith.get_identity_flags("user123", traits={"plan": "premium"}) - flags.get_flag("some_feature") - - mock_record.assert_called_once_with( - flag_key="some_feature", - enabled=True, - value="some-value", - identity_identifier="user123", - traits={"plan": "premium"}, - ) - - mock_record.reset_mock() - - flags = flagsmith.get_identity_flags( - "user123", - traits={"plan": {"value": "premium", "transient": True}}, - ) - flags.get_flag("some_feature") - - mock_record.assert_called_once_with( - flag_key="some_feature", - enabled=True, - value="some-value", - identity_identifier="user123", - traits={"plan": "premium"}, - ) diff --git a/tests/test_pipeline_analytics.py b/tests/test_pipeline_analytics.py deleted file mode 100644 index 2a45899..0000000 --- a/tests/test_pipeline_analytics.py +++ /dev/null @@ -1,182 +0,0 @@ -import json -from concurrent.futures import Future -from unittest import mock - -import pytest - -from flagsmith.analytics import ( - PipelineAnalyticsConfig, - PipelineAnalyticsProcessor, -) - - -def test_record_evaluation_event_buffers_event( - pipeline_analytics_processor: PipelineAnalyticsProcessor, -) -> None: - pipeline_analytics_processor.record_evaluation_event( - flag_key="my_flag", - enabled=True, - value="variant_a", - identity_identifier="user123", - traits={"plan": "premium"}, - ) - - assert len(pipeline_analytics_processor._buffer) == 1 - event = pipeline_analytics_processor._buffer[0] - assert event["event_id"] == "my_flag" - assert event["event_type"] == "flag_evaluation" - assert event["identity_identifier"] == "user123" - assert event["enabled"] is True - assert event["value"] == "variant_a" - assert event["traits"] == {"plan": "premium"} - assert "sdk_version" in event["metadata"] - assert isinstance(event["evaluated_at"], int) - - -@pytest.mark.parametrize( - "second_enabled, expected_count", - [ - (True, 1), # same fingerprint -> deduplicated - (False, 2), # different fingerprint -> both kept - ], -) -def test_evaluation_event_deduplication( - pipeline_analytics_processor: PipelineAnalyticsProcessor, - second_enabled: bool, - expected_count: int, -) -> None: - pipeline_analytics_processor.record_evaluation_event( - flag_key="my_flag", enabled=True, value="v1", identity_identifier="user1" - ) - pipeline_analytics_processor.record_evaluation_event( - flag_key="my_flag", - enabled=second_enabled, - value="v1", - identity_identifier="user1", - ) - - assert len(pipeline_analytics_processor._buffer) == expected_count - - -def test_dedup_keys_cleared_after_flush( - pipeline_analytics_processor: PipelineAnalyticsProcessor, -) -> None: - with mock.patch("flagsmith.analytics.session"): - pipeline_analytics_processor.record_evaluation_event( - flag_key="my_flag", enabled=True, value="v1", identity_identifier="user1" - ) - pipeline_analytics_processor.flush() - - pipeline_analytics_processor.record_evaluation_event( - flag_key="my_flag", enabled=True, value="v1", identity_identifier="user1" - ) - - assert len(pipeline_analytics_processor._buffer) == 1 - - -def test_auto_flush_on_buffer_full() -> None: - config = PipelineAnalyticsConfig( - analytics_server_url="http://test/", max_buffer_items=5 - ) - processor = PipelineAnalyticsProcessor(config=config, environment_key="key") - - with mock.patch("flagsmith.analytics.session"): - for i in range(5): - processor.record_evaluation_event( - flag_key=f"flag_{i}", enabled=True, value=str(i) - ) - - assert len(processor._buffer) == 0 - - -def test_flush_sends_correct_http_request( - pipeline_analytics_processor: PipelineAnalyticsProcessor, -) -> None: - with mock.patch("flagsmith.analytics.session") as mock_session: - pipeline_analytics_processor.record_evaluation_event( - flag_key="my_flag", enabled=True, value="v1", identity_identifier="user1" - ) - pipeline_analytics_processor.flush() - - mock_session.post.assert_called_once() - call_kwargs = mock_session.post.call_args - assert call_kwargs[0][0] == "http://test_analytics/v1/analytics/batch" - - headers = call_kwargs[1]["headers"] - assert headers["X-Environment-Key"] == "test_key" - assert headers["Content-Type"] == "application/json; charset=utf-8" - assert "flagsmith-python-client/" in headers["Flagsmith-SDK-User-Agent"] - - body = json.loads(call_kwargs[1]["data"]) - assert body["environment_key"] == "test_key" - assert len(body["events"]) == 1 - assert body["events"][0]["event_id"] == "my_flag" - - -def test_flush_noop_when_empty( - pipeline_analytics_processor: PipelineAnalyticsProcessor, -) -> None: - with mock.patch("flagsmith.analytics.session") as mock_session: - pipeline_analytics_processor.flush() - - mock_session.post.assert_not_called() - - -def test_failed_flush_requeues_events( - pipeline_analytics_processor: PipelineAnalyticsProcessor, -) -> None: - future: Future[None] = Future() - future.set_exception(Exception("connection error")) - - with mock.patch("flagsmith.analytics.session") as mock_session: - mock_session.post.return_value = future - pipeline_analytics_processor.record_evaluation_event( - flag_key="my_flag", enabled=True, value="v1" - ) - pipeline_analytics_processor.flush() - - assert len(pipeline_analytics_processor._buffer) == 1 - assert pipeline_analytics_processor._buffer[0]["event_id"] == "my_flag" - - -def test_record_custom_event( - pipeline_analytics_processor: PipelineAnalyticsProcessor, -) -> None: - pipeline_analytics_processor.record_custom_event( - event_name="purchase", - identity_identifier="user1", - traits={"plan": "premium"}, - metadata={"amount": 99}, - ) - # Custom events are never deduplicated - pipeline_analytics_processor.record_custom_event( - event_name="purchase", - identity_identifier="user1", - ) - - assert len(pipeline_analytics_processor._buffer) == 2 - event = pipeline_analytics_processor._buffer[0] - assert event["event_id"] == "purchase" - assert event["event_type"] == "custom_event" - assert event["enabled"] is None - assert event["value"] is None - assert event["traits"] == {"plan": "premium"} - assert event["metadata"]["amount"] == 99 - assert "sdk_version" in event["metadata"] - - -def test_start_stop_lifecycle() -> None: - config = PipelineAnalyticsConfig( - analytics_server_url="http://test/", flush_interval_seconds=100 - ) - processor = PipelineAnalyticsProcessor(config=config, environment_key="key") - - processor.start() - assert processor._timer is not None - assert processor._timer.is_alive() - - with mock.patch("flagsmith.analytics.session"): - processor.record_evaluation_event(flag_key="my_flag", enabled=True, value="v1") - processor.stop() - - assert len(processor._buffer) == 0 From ad8c89142662b2f93a53c505d0b331380160c08b Mon Sep 17 00:00:00 2001 From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com> Date: Thu, 28 May 2026 10:58:28 +0000 Subject: [PATCH 02/11] [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci --- tests/test_event_processor.py | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/tests/test_event_processor.py b/tests/test_event_processor.py index 178623d..1468826 100644 --- a/tests/test_event_processor.py +++ b/tests/test_event_processor.py @@ -2,10 +2,7 @@ from concurrent.futures import Future from unittest import mock -from flagsmith.analytics import ( - EventProcessor, - EventProcessorConfig, -) +from flagsmith.analytics import EventProcessor, EventProcessorConfig def test_track_event_buffers_event(event_processor: EventProcessor) -> None: From 53d19f1e19f79807ab00cee3bef7b8cf0a225718 Mon Sep 17 00:00:00 2001 From: Gagan Trivedi Date: Fri, 29 May 2026 12:55:40 +0530 Subject: [PATCH 03/11] feat: add get_experiment_flag and track_exposure_event MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Add `Flagsmith.track_exposure_event(feature_name, identifier, value, traits, metadata, timestamp)` and `EventProcessor.track_exposure_event` for emitting "flag_exposure" events. Value is narrowed to Optional[str] since variants are named. Add `Flagsmith.get_experiment_flag(feature_name, identifier, traits)` that calls `get_identity_flags` + `get_flag`, fires `track_exposure_event` (skipped for DefaultFlag results to keep experiment data clean), and returns the Flag/DefaultFlag. Reshape `track_event` and `track_exposure_event` signatures: rename `event_name` → `event`, `identity_identifier` → `identifier`; add `value`, `timestamp` params. Extract a private `_buffer_event` helper to share buffer + auto-flush logic between the two event types. --- flagsmith/analytics.py | 59 ++++++++++++++++--- flagsmith/flagsmith.py | 59 +++++++++++++++++-- tests/test_event_processor.py | 78 +++++++++++++++++++----- tests/test_flagsmith.py | 108 +++++++++++++++++++++++++++++++++- 4 files changed, 274 insertions(+), 30 deletions(-) diff --git a/flagsmith/analytics.py b/flagsmith/analytics.py index dc94a70..eca88e8 100644 --- a/flagsmith/analytics.py +++ b/flagsmith/analytics.py @@ -2,7 +2,6 @@ import json import logging import threading -import time import typing from dataclasses import dataclass from datetime import datetime @@ -104,22 +103,64 @@ def __init__( def track_event( self, - event_name: str, - identity_identifier: typing.Optional[str] = None, + event: str, + identifier: typing.Optional[str] = None, + value: typing.Optional[typing.Union[str, int, float]] = None, traits: typing.Optional[typing.Dict[str, typing.Any]] = None, metadata: typing.Optional[typing.Dict[str, typing.Any]] = None, + timestamp: typing.Optional[datetime] = None, ) -> None: - should_flush = False + self._buffer_event( + event_id=event, + event_type="custom_event", + identifier=identifier, + value=value, + traits=traits, + metadata=metadata, + timestamp=timestamp, + ) + def track_exposure_event( + self, + feature_name: str, + identifier: typing.Optional[str] = None, + value: typing.Optional[str] = None, + traits: typing.Optional[typing.Dict[str, typing.Any]] = None, + metadata: typing.Optional[typing.Dict[str, typing.Any]] = None, + timestamp: typing.Optional[datetime] = None, + ) -> None: + self._buffer_event( + event_id=feature_name, + event_type="flag_exposure", + identifier=identifier, + value=value, + traits=traits, + metadata=metadata, + timestamp=timestamp, + ) + + def _buffer_event( + self, + event_id: str, + event_type: str, + identifier: typing.Optional[str], + value: typing.Optional[typing.Union[str, int, float]], + traits: typing.Optional[typing.Dict[str, typing.Any]], + metadata: typing.Optional[typing.Dict[str, typing.Any]], + timestamp: typing.Optional[datetime], + ) -> None: + should_flush = False with self._lock: self._buffer.append( { - "event_id": event_name, - "event_type": "custom_event", - "evaluated_at": int(time.time() * 1000), - "identity_identifier": identity_identifier, + "event_id": event_id, + "event_type": event_type, + "evaluated_at": int( + (timestamp or datetime.now()).timestamp() * 1000 + ), + "identity_identifier": identifier, "enabled": None, - "value": None, + "value": value, "traits": dict(traits) if traits else None, "metadata": {**(metadata or {}), "sdk_version": __version__}, } diff --git a/flagsmith/flagsmith.py b/flagsmith/flagsmith.py index ea8fadc..6bdb455 100644 --- a/flagsmith/flagsmith.py +++ b/flagsmith/flagsmith.py @@ -23,6 +23,7 @@ ) from flagsmith.models import ( DefaultFlag, + Flag, Flags, Segment, SegmentOverridesIndex, @@ -322,12 +323,37 @@ def get_identity_segments( return map_segment_results_to_identity_segments(evaluation_result["segments"]) + def get_experiment_flag( + self, + feature_name: str, + identifier: str, + traits: typing.Optional[TraitMapping] = None, + ) -> typing.Union[DefaultFlag, Flag]: + """ + Resolve a flag for an identity and record an exposure event. + + Skips the exposure event when the resolved flag is a `DefaultFlag` + (i.e. the feature was not present and was served via the + `default_flag_handler`), to keep experimentation data clean. + """ + flag = self.get_identity_flags(identifier, traits).get_flag(feature_name) + if not flag.is_default: + self.track_exposure_event( + feature_name=feature_name, + identifier=identifier, + value=str(flag.value) if flag.value is not None else None, + traits=traits, + ) + return flag + def track_event( self, - event_name: str, - identity_identifier: typing.Optional[str] = None, + event: str, + identifier: typing.Optional[str] = None, + value: typing.Optional[typing.Union[str, int, float]] = None, traits: typing.Optional[TraitMapping] = None, metadata: typing.Optional[typing.Dict[str, typing.Any]] = None, + timestamp: typing.Optional[datetime] = None, ) -> None: if not self._event_processor: raise ValueError( @@ -335,10 +361,35 @@ def track_event( "Provide event_processor_config to use track_event." ) self._event_processor.track_event( - event_name=event_name, - identity_identifier=identity_identifier, + event=event, + identifier=identifier, + value=value, + traits=resolve_trait_values(traits), + metadata=metadata, + timestamp=timestamp, + ) + + def track_exposure_event( + self, + feature_name: str, + identifier: typing.Optional[str] = None, + value: typing.Optional[str] = None, + traits: typing.Optional[TraitMapping] = None, + metadata: typing.Optional[typing.Dict[str, typing.Any]] = None, + timestamp: typing.Optional[datetime] = None, + ) -> None: + if not self._event_processor: + raise ValueError( + "Event processor is not configured. " + "Provide event_processor_config to use track_exposure_event." + ) + self._event_processor.track_exposure_event( + feature_name=feature_name, + identifier=identifier, + value=value, traits=resolve_trait_values(traits), metadata=metadata, + timestamp=timestamp, ) def update_environment(self) -> None: diff --git a/tests/test_event_processor.py b/tests/test_event_processor.py index 1468826..277312e 100644 --- a/tests/test_event_processor.py +++ b/tests/test_event_processor.py @@ -1,5 +1,6 @@ import json from concurrent.futures import Future +from datetime import datetime, timezone from unittest import mock from flagsmith.analytics import EventProcessor, EventProcessorConfig @@ -7,16 +8,13 @@ def test_track_event_buffers_event(event_processor: EventProcessor) -> None: event_processor.track_event( - event_name="purchase", - identity_identifier="user1", + event="purchase", + identifier="user1", + value=99.5, traits={"plan": "premium"}, - metadata={"amount": 99}, - ) - # Events are never deduplicated - event_processor.track_event( - event_name="purchase", - identity_identifier="user1", + metadata={"sku": "abc"}, ) + event_processor.track_event(event="purchase", identifier="user1") assert len(event_processor._buffer) == 2 event = event_processor._buffer[0] @@ -24,13 +22,65 @@ def test_track_event_buffers_event(event_processor: EventProcessor) -> None: assert event["event_type"] == "custom_event" assert event["identity_identifier"] == "user1" assert event["enabled"] is None - assert event["value"] is None + assert event["value"] == 99.5 assert event["traits"] == {"plan": "premium"} - assert event["metadata"]["amount"] == 99 + assert event["metadata"]["sku"] == "abc" assert "sdk_version" in event["metadata"] assert isinstance(event["evaluated_at"], int) +def test_track_event_defaults_timestamp_to_now( + event_processor: EventProcessor, +) -> None: + before = int(datetime.now().timestamp() * 1000) + event_processor.track_event(event="ping") + after = int(datetime.now().timestamp() * 1000) + + assert before <= event_processor._buffer[0]["evaluated_at"] <= after + + +def test_track_event_uses_explicit_timestamp( + event_processor: EventProcessor, +) -> None: + ts = datetime(2026, 1, 1, 12, 0, 0, tzinfo=timezone.utc) + + event_processor.track_event(event="ping", timestamp=ts) + + assert event_processor._buffer[0]["evaluated_at"] == int(ts.timestamp() * 1000) + + +def test_track_exposure_event_buffers_with_flag_exposure_type( + event_processor: EventProcessor, +) -> None: + event_processor.track_exposure_event( + feature_name="checkout_v2", + identifier="user1", + value="variant_b", + traits={"plan": "premium"}, + metadata={"source": "homepage"}, + ) + + assert len(event_processor._buffer) == 1 + event = event_processor._buffer[0] + assert event["event_id"] == "checkout_v2" + assert event["event_type"] == "flag_exposure" + assert event["identity_identifier"] == "user1" + assert event["value"] == "variant_b" + assert event["traits"] == {"plan": "premium"} + assert event["metadata"]["source"] == "homepage" + assert "sdk_version" in event["metadata"] + + +def test_track_exposure_event_uses_explicit_timestamp( + event_processor: EventProcessor, +) -> None: + ts = datetime(2026, 1, 1, 12, 0, 0, tzinfo=timezone.utc) + + event_processor.track_exposure_event(feature_name="checkout_v2", timestamp=ts) + + assert event_processor._buffer[0]["evaluated_at"] == int(ts.timestamp() * 1000) + + def test_auto_flush_on_buffer_full() -> None: config = EventProcessorConfig( analytics_server_url="http://test/", max_buffer_items=5 @@ -39,14 +89,14 @@ def test_auto_flush_on_buffer_full() -> None: with mock.patch("flagsmith.analytics.session"): for i in range(5): - processor.track_event(event_name=f"event_{i}") + processor.track_event(event=f"event_{i}") assert len(processor._buffer) == 0 def test_flush_sends_correct_http_request(event_processor: EventProcessor) -> None: with mock.patch("flagsmith.analytics.session") as mock_session: - event_processor.track_event(event_name="purchase", identity_identifier="user1") + event_processor.track_event(event="purchase", identifier="user1") event_processor.flush() mock_session.post.assert_called_once() @@ -77,7 +127,7 @@ def test_failed_flush_requeues_events(event_processor: EventProcessor) -> None: with mock.patch("flagsmith.analytics.session") as mock_session: mock_session.post.return_value = future - event_processor.track_event(event_name="purchase") + event_processor.track_event(event="purchase") event_processor.flush() assert len(event_processor._buffer) == 1 @@ -95,7 +145,7 @@ def test_start_stop_lifecycle() -> None: assert processor._timer.is_alive() with mock.patch("flagsmith.analytics.session"): - processor.track_event(event_name="purchase") + processor.track_event(event="purchase") processor.stop() assert len(processor._buffer) == 0 diff --git a/tests/test_flagsmith.py b/tests/test_flagsmith.py index 0a4f2c5..18ae622 100644 --- a/tests/test_flagsmith.py +++ b/tests/test_flagsmith.py @@ -967,14 +967,116 @@ def test_track_event_delegates_to_event_processor( flagsmith.track_event( "purchase", - identity_identifier="user1", + identifier="user1", + value=99.5, traits={"plan": {"value": "premium", "transient": True}}, metadata={"amount": 99}, ) mock_track.assert_called_once_with( - event_name="purchase", - identity_identifier="user1", + event="purchase", + identifier="user1", + value=99.5, traits={"plan": "premium"}, metadata={"amount": 99}, + timestamp=None, ) + + +def test_track_exposure_event_raises_without_config(api_key: str) -> None: + flagsmith = Flagsmith(environment_key=api_key) + with pytest.raises(ValueError, match="Event processor is not configured"): + flagsmith.track_exposure_event("checkout_v2") + + +def test_track_exposure_event_delegates_to_event_processor( + mocker: MockerFixture, api_key: str +) -> None: + config = EventProcessorConfig(analytics_server_url="http://test/") + flagsmith = Flagsmith(environment_key=api_key, event_processor_config=config) + + mock_track = mocker.patch.object( + flagsmith._event_processor, "track_exposure_event" + ) + + flagsmith.track_exposure_event( + "checkout_v2", + identifier="user1", + value="variant_b", + traits={"plan": {"value": "premium", "transient": True}}, + metadata={"source": "homepage"}, + ) + + mock_track.assert_called_once_with( + feature_name="checkout_v2", + identifier="user1", + value="variant_b", + traits={"plan": "premium"}, + metadata={"source": "homepage"}, + timestamp=None, + ) + + +@responses.activate() +def test_get_experiment_flag_returns_flag_and_tracks_exposure( + mocker: MockerFixture, api_key: str, identities_json: str +) -> None: + config = EventProcessorConfig(analytics_server_url="http://test/") + flagsmith = Flagsmith(environment_key=api_key, event_processor_config=config) + + mock_track = mocker.patch.object( + flagsmith._event_processor, "track_exposure_event" + ) + responses.add(method="POST", url=flagsmith.identities_url, body=identities_json) + + result = flagsmith.get_experiment_flag( + feature_name="some_feature", + identifier="user1", + traits={"plan": "premium"}, + ) + + assert isinstance(result, Flag) + assert result.is_default is False + assert result.feature_name == "some_feature" + assert result.value == "some-value" + mock_track.assert_called_once_with( + feature_name="some_feature", + identifier="user1", + value="some-value", + traits={"plan": "premium"}, + metadata=None, + timestamp=None, + ) + + +@responses.activate() +def test_get_experiment_flag_skips_exposure_for_default_flag( + mocker: MockerFixture, api_key: str +) -> None: + config = EventProcessorConfig(analytics_server_url="http://test/") + + def default_flag_handler(feature_name: str) -> DefaultFlag: + return DefaultFlag(enabled=True, value="default-variant") + + flagsmith = Flagsmith( + environment_key=api_key, + event_processor_config=config, + default_flag_handler=default_flag_handler, + ) + mock_track = mocker.patch.object( + flagsmith._event_processor, "track_exposure_event" + ) + responses.add( + method="POST", + url=flagsmith.identities_url, + body=json.dumps({"flags": [], "traits": []}), + ) + + result = flagsmith.get_experiment_flag( + feature_name="missing_feature", identifier="user1" + ) + + assert isinstance(result, DefaultFlag) + assert result.is_default is True + assert result.value == "default-variant" + mock_track.assert_not_called() From d63a6ab4081e424bab4f59666372110ec27a5aca Mon Sep 17 00:00:00 2001 From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com> Date: Fri, 29 May 2026 07:28:08 +0000 Subject: [PATCH 04/11] [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci --- tests/test_flagsmith.py | 12 +++--------- 1 file changed, 3 insertions(+), 9 deletions(-) diff --git a/tests/test_flagsmith.py b/tests/test_flagsmith.py index 18ae622..6247a17 100644 --- a/tests/test_flagsmith.py +++ b/tests/test_flagsmith.py @@ -995,9 +995,7 @@ def test_track_exposure_event_delegates_to_event_processor( config = EventProcessorConfig(analytics_server_url="http://test/") flagsmith = Flagsmith(environment_key=api_key, event_processor_config=config) - mock_track = mocker.patch.object( - flagsmith._event_processor, "track_exposure_event" - ) + mock_track = mocker.patch.object(flagsmith._event_processor, "track_exposure_event") flagsmith.track_exposure_event( "checkout_v2", @@ -1024,9 +1022,7 @@ def test_get_experiment_flag_returns_flag_and_tracks_exposure( config = EventProcessorConfig(analytics_server_url="http://test/") flagsmith = Flagsmith(environment_key=api_key, event_processor_config=config) - mock_track = mocker.patch.object( - flagsmith._event_processor, "track_exposure_event" - ) + mock_track = mocker.patch.object(flagsmith._event_processor, "track_exposure_event") responses.add(method="POST", url=flagsmith.identities_url, body=identities_json) result = flagsmith.get_experiment_flag( @@ -1063,9 +1059,7 @@ def default_flag_handler(feature_name: str) -> DefaultFlag: event_processor_config=config, default_flag_handler=default_flag_handler, ) - mock_track = mocker.patch.object( - flagsmith._event_processor, "track_exposure_event" - ) + mock_track = mocker.patch.object(flagsmith._event_processor, "track_exposure_event") responses.add( method="POST", url=flagsmith.identities_url, From 322dab1a42c939c93ca22d99205290298769cbab Mon Sep 17 00:00:00 2001 From: Gagan Trivedi Date: Fri, 29 May 2026 13:35:17 +0530 Subject: [PATCH 05/11] feat!: update event payload to flat schema and switch endpoint to /v1/events Align wire format with the analytics-pipeline RFC (Flagsmith/flagsmith-analytics-pipeline#9): - Fields: event, feature_name, identifier, value, traits, metadata, timestamp. - Fold event_type discriminator into the event field; use reserved literal "$flag_exposure" for exposure events (FLAG_EXPOSURE_EVENT constant). - Drop event_id, event_type, identity_identifier, enabled, evaluated_at, and the duplicated top-level environment_key from the POST body. - Switch endpoint path from /v1/analytics/batch to /v1/events. --- flagsmith/analytics.py | 33 +++++++++++++++---------------- tests/test_event_processor.py | 37 +++++++++++++++++++---------------- 2 files changed, 36 insertions(+), 34 deletions(-) diff --git a/flagsmith/analytics.py b/flagsmith/analytics.py index eca88e8..5371314 100644 --- a/flagsmith/analytics.py +++ b/flagsmith/analytics.py @@ -17,6 +17,8 @@ # Used to control how often we send data(in seconds) ANALYTICS_TIMER: typing.Final[int] = 10 +FLAG_EXPOSURE_EVENT: typing.Final[str] = "$flag_exposure" + session = FuturesSession(max_workers=4) @@ -92,7 +94,7 @@ def __init__( url = config.analytics_server_url if not url.endswith("/"): url = f"{url}/" - self._batch_endpoint = f"{url}v1/analytics/batch" + self._batch_endpoint = f"{url}v1/events" self._environment_key = environment_key self._max_buffer = config.max_buffer_items self._flush_interval_seconds = config.flush_interval_seconds @@ -111,8 +113,8 @@ def track_event( timestamp: typing.Optional[datetime] = None, ) -> None: self._buffer_event( - event_id=event, - event_type="custom_event", + event=event, + feature_name=None, identifier=identifier, value=value, traits=traits, @@ -130,8 +132,8 @@ def track_exposure_event( timestamp: typing.Optional[datetime] = None, ) -> None: self._buffer_event( - event_id=feature_name, - event_type="flag_exposure", + event=FLAG_EXPOSURE_EVENT, + feature_name=feature_name, identifier=identifier, value=value, traits=traits, @@ -141,8 +143,8 @@ def track_exposure_event( def _buffer_event( self, - event_id: str, - event_type: str, + event: str, + feature_name: typing.Optional[str], identifier: typing.Optional[str], value: typing.Optional[typing.Union[str, int, float]], traits: typing.Optional[typing.Dict[str, typing.Any]], @@ -153,16 +155,15 @@ def _buffer_event( with self._lock: self._buffer.append( { - "event_id": event_id, - "event_type": event_type, - "evaluated_at": int( - (timestamp or datetime.now()).timestamp() * 1000 - ), - "identity_identifier": identifier, - "enabled": None, + "event": event, + "feature_name": feature_name, + "identifier": identifier, "value": value, "traits": dict(traits) if traits else None, "metadata": {**(metadata or {}), "sdk_version": __version__}, + "timestamp": int( + (timestamp or datetime.now()).timestamp() * 1000 + ), } ) if len(self._buffer) >= self._max_buffer: @@ -178,9 +179,7 @@ def flush(self) -> None: events = self._buffer self._buffer = [] - payload = json.dumps( - {"events": events, "environment_key": self._environment_key} - ) + payload = json.dumps({"events": events}) try: future = session.post( self._batch_endpoint, diff --git a/tests/test_event_processor.py b/tests/test_event_processor.py index 277312e..4bdedee 100644 --- a/tests/test_event_processor.py +++ b/tests/test_event_processor.py @@ -3,7 +3,11 @@ from datetime import datetime, timezone from unittest import mock -from flagsmith.analytics import EventProcessor, EventProcessorConfig +from flagsmith.analytics import ( + FLAG_EXPOSURE_EVENT, + EventProcessor, + EventProcessorConfig, +) def test_track_event_buffers_event(event_processor: EventProcessor) -> None: @@ -18,15 +22,14 @@ def test_track_event_buffers_event(event_processor: EventProcessor) -> None: assert len(event_processor._buffer) == 2 event = event_processor._buffer[0] - assert event["event_id"] == "purchase" - assert event["event_type"] == "custom_event" - assert event["identity_identifier"] == "user1" - assert event["enabled"] is None + assert event["event"] == "purchase" + assert event["feature_name"] is None + assert event["identifier"] == "user1" assert event["value"] == 99.5 assert event["traits"] == {"plan": "premium"} assert event["metadata"]["sku"] == "abc" assert "sdk_version" in event["metadata"] - assert isinstance(event["evaluated_at"], int) + assert isinstance(event["timestamp"], int) def test_track_event_defaults_timestamp_to_now( @@ -36,7 +39,7 @@ def test_track_event_defaults_timestamp_to_now( event_processor.track_event(event="ping") after = int(datetime.now().timestamp() * 1000) - assert before <= event_processor._buffer[0]["evaluated_at"] <= after + assert before <= event_processor._buffer[0]["timestamp"] <= after def test_track_event_uses_explicit_timestamp( @@ -46,10 +49,10 @@ def test_track_event_uses_explicit_timestamp( event_processor.track_event(event="ping", timestamp=ts) - assert event_processor._buffer[0]["evaluated_at"] == int(ts.timestamp() * 1000) + assert event_processor._buffer[0]["timestamp"] == int(ts.timestamp() * 1000) -def test_track_exposure_event_buffers_with_flag_exposure_type( +def test_track_exposure_event_buffers_with_flag_exposure_event_name( event_processor: EventProcessor, ) -> None: event_processor.track_exposure_event( @@ -62,9 +65,9 @@ def test_track_exposure_event_buffers_with_flag_exposure_type( assert len(event_processor._buffer) == 1 event = event_processor._buffer[0] - assert event["event_id"] == "checkout_v2" - assert event["event_type"] == "flag_exposure" - assert event["identity_identifier"] == "user1" + assert event["event"] == FLAG_EXPOSURE_EVENT == "$flag_exposure" + assert event["feature_name"] == "checkout_v2" + assert event["identifier"] == "user1" assert event["value"] == "variant_b" assert event["traits"] == {"plan": "premium"} assert event["metadata"]["source"] == "homepage" @@ -78,7 +81,7 @@ def test_track_exposure_event_uses_explicit_timestamp( event_processor.track_exposure_event(feature_name="checkout_v2", timestamp=ts) - assert event_processor._buffer[0]["evaluated_at"] == int(ts.timestamp() * 1000) + assert event_processor._buffer[0]["timestamp"] == int(ts.timestamp() * 1000) def test_auto_flush_on_buffer_full() -> None: @@ -101,7 +104,7 @@ def test_flush_sends_correct_http_request(event_processor: EventProcessor) -> No mock_session.post.assert_called_once() call_kwargs = mock_session.post.call_args - assert call_kwargs[0][0] == "http://test_analytics/v1/analytics/batch" + assert call_kwargs[0][0] == "http://test_analytics/v1/events" headers = call_kwargs[1]["headers"] assert headers["X-Environment-Key"] == "test_key" @@ -109,9 +112,9 @@ def test_flush_sends_correct_http_request(event_processor: EventProcessor) -> No assert "flagsmith-python-client/" in headers["Flagsmith-SDK-User-Agent"] body = json.loads(call_kwargs[1]["data"]) - assert body["environment_key"] == "test_key" + assert "environment_key" not in body assert len(body["events"]) == 1 - assert body["events"][0]["event_id"] == "purchase" + assert body["events"][0]["event"] == "purchase" def test_flush_noop_when_empty(event_processor: EventProcessor) -> None: @@ -131,7 +134,7 @@ def test_failed_flush_requeues_events(event_processor: EventProcessor) -> None: event_processor.flush() assert len(event_processor._buffer) == 1 - assert event_processor._buffer[0]["event_id"] == "purchase" + assert event_processor._buffer[0]["event"] == "purchase" def test_start_stop_lifecycle() -> None: From 629ae1253cda3143912c2a6a7d4e6243f1cdf280 Mon Sep 17 00:00:00 2001 From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com> Date: Fri, 29 May 2026 08:05:30 +0000 Subject: [PATCH 06/11] [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci --- flagsmith/analytics.py | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/flagsmith/analytics.py b/flagsmith/analytics.py index 5371314..7c9bc27 100644 --- a/flagsmith/analytics.py +++ b/flagsmith/analytics.py @@ -161,9 +161,7 @@ def _buffer_event( "value": value, "traits": dict(traits) if traits else None, "metadata": {**(metadata or {}), "sdk_version": __version__}, - "timestamp": int( - (timestamp or datetime.now()).timestamp() * 1000 - ), + "timestamp": int((timestamp or datetime.now()).timestamp() * 1000), } ) if len(self._buffer) >= self._max_buffer: From 5dd2265ace97cdb4b99feed79534a0af222eede8 Mon Sep 17 00:00:00 2001 From: Gagan Trivedi Date: Fri, 29 May 2026 13:46:58 +0530 Subject: [PATCH 07/11] feat!: rename EventProcessorConfig.analytics_server_url to events_api_url and default it Default points to https://events.api.flagsmith.com/ so callers don't have to specify it for cloud Flagsmith. Self-hosted users override via the events_api_url field. BREAKING CHANGE: EventProcessorConfig field analytics_server_url renamed to events_api_url. --- flagsmith/analytics.py | 6 ++++-- tests/conftest.py | 2 +- tests/test_event_processor.py | 4 ++-- tests/test_flagsmith.py | 8 ++++---- 4 files changed, 11 insertions(+), 9 deletions(-) diff --git a/flagsmith/analytics.py b/flagsmith/analytics.py index 7c9bc27..6ec4a13 100644 --- a/flagsmith/analytics.py +++ b/flagsmith/analytics.py @@ -19,6 +19,8 @@ FLAG_EXPOSURE_EVENT: typing.Final[str] = "$flag_exposure" +DEFAULT_EVENT_API_URL: typing.Final[str] = "https://events.api.flagsmith.com/" + session = FuturesSession(max_workers=4) @@ -74,7 +76,7 @@ def track_feature(self, feature_name: str) -> None: @dataclass class EventProcessorConfig: - analytics_server_url: str + events_api_url: str = DEFAULT_EVENT_API_URL max_buffer_items: int = 1000 flush_interval_seconds: float = 10.0 @@ -91,7 +93,7 @@ def __init__( config: EventProcessorConfig, environment_key: str, ) -> None: - url = config.analytics_server_url + url = config.events_api_url if not url.endswith("/"): url = f"{url}/" self._batch_endpoint = f"{url}v1/events" diff --git a/tests/conftest.py b/tests/conftest.py index f2f6173..a71c3de 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -32,7 +32,7 @@ def analytics_processor() -> AnalyticsProcessor: @pytest.fixture() def event_processor_config() -> EventProcessorConfig: - return EventProcessorConfig(analytics_server_url="http://test_analytics/") + return EventProcessorConfig(events_api_url="http://test_analytics/") @pytest.fixture() diff --git a/tests/test_event_processor.py b/tests/test_event_processor.py index 4bdedee..54dd478 100644 --- a/tests/test_event_processor.py +++ b/tests/test_event_processor.py @@ -86,7 +86,7 @@ def test_track_exposure_event_uses_explicit_timestamp( def test_auto_flush_on_buffer_full() -> None: config = EventProcessorConfig( - analytics_server_url="http://test/", max_buffer_items=5 + events_api_url="http://test/", max_buffer_items=5 ) processor = EventProcessor(config=config, environment_key="key") @@ -139,7 +139,7 @@ def test_failed_flush_requeues_events(event_processor: EventProcessor) -> None: def test_start_stop_lifecycle() -> None: config = EventProcessorConfig( - analytics_server_url="http://test/", flush_interval_seconds=100 + events_api_url="http://test/", flush_interval_seconds=100 ) processor = EventProcessor(config=config, environment_key="key") diff --git a/tests/test_flagsmith.py b/tests/test_flagsmith.py index 6247a17..4eade38 100644 --- a/tests/test_flagsmith.py +++ b/tests/test_flagsmith.py @@ -960,7 +960,7 @@ def test_track_event_raises_without_config(api_key: str) -> None: def test_track_event_delegates_to_event_processor( mocker: MockerFixture, api_key: str ) -> None: - config = EventProcessorConfig(analytics_server_url="http://test/") + config = EventProcessorConfig(events_api_url="http://test/") flagsmith = Flagsmith(environment_key=api_key, event_processor_config=config) mock_track = mocker.patch.object(flagsmith._event_processor, "track_event") @@ -992,7 +992,7 @@ def test_track_exposure_event_raises_without_config(api_key: str) -> None: def test_track_exposure_event_delegates_to_event_processor( mocker: MockerFixture, api_key: str ) -> None: - config = EventProcessorConfig(analytics_server_url="http://test/") + config = EventProcessorConfig(events_api_url="http://test/") flagsmith = Flagsmith(environment_key=api_key, event_processor_config=config) mock_track = mocker.patch.object(flagsmith._event_processor, "track_exposure_event") @@ -1019,7 +1019,7 @@ def test_track_exposure_event_delegates_to_event_processor( def test_get_experiment_flag_returns_flag_and_tracks_exposure( mocker: MockerFixture, api_key: str, identities_json: str ) -> None: - config = EventProcessorConfig(analytics_server_url="http://test/") + config = EventProcessorConfig(events_api_url="http://test/") flagsmith = Flagsmith(environment_key=api_key, event_processor_config=config) mock_track = mocker.patch.object(flagsmith._event_processor, "track_exposure_event") @@ -1049,7 +1049,7 @@ def test_get_experiment_flag_returns_flag_and_tracks_exposure( def test_get_experiment_flag_skips_exposure_for_default_flag( mocker: MockerFixture, api_key: str ) -> None: - config = EventProcessorConfig(analytics_server_url="http://test/") + config = EventProcessorConfig(events_api_url="http://test/") def default_flag_handler(feature_name: str) -> DefaultFlag: return DefaultFlag(enabled=True, value="default-variant") From 7f4b8e8038febdbfed3f4b4565d580ea54736717 Mon Sep 17 00:00:00 2001 From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com> Date: Fri, 29 May 2026 08:17:11 +0000 Subject: [PATCH 08/11] [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci --- tests/test_event_processor.py | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/tests/test_event_processor.py b/tests/test_event_processor.py index 54dd478..620577c 100644 --- a/tests/test_event_processor.py +++ b/tests/test_event_processor.py @@ -85,9 +85,7 @@ def test_track_exposure_event_uses_explicit_timestamp( def test_auto_flush_on_buffer_full() -> None: - config = EventProcessorConfig( - events_api_url="http://test/", max_buffer_items=5 - ) + config = EventProcessorConfig(events_api_url="http://test/", max_buffer_items=5) processor = EventProcessor(config=config, environment_key="key") with mock.patch("flagsmith.analytics.session"): From b064ae7f827a04d70db6f3810338036afffad74f Mon Sep 17 00:00:00 2001 From: Gagan Trivedi Date: Fri, 29 May 2026 14:03:34 +0530 Subject: [PATCH 09/11] feat!: replace event_processor_config kwarg with enable_events flag MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Introduce `enable_events: bool = False` on Flagsmith() as the on/off gate for the event processor. `event_processor_config` stays as an optional tuning knob (URL override for self-hosted, buffer/flush settings) and raises ValueError if supplied without `enable_events=True`. Split the formerly-overloaded `_initialise_analytics` into two narrow methods: `_initialise_analytics` (legacy AnalyticsProcessor only) and `_initialise_events` (EventProcessor only). Update track_event / track_exposure_event error messages to point at `enable_events=True`. BREAKING CHANGE: passing `event_processor_config` no longer enables the event processor on its own — also pass `enable_events=True`. --- flagsmith/flagsmith.py | 32 +++++++++++++++++++++++++++----- tests/test_flagsmith.py | 35 ++++++++++++++++++++++++++++++++--- 2 files changed, 59 insertions(+), 8 deletions(-) diff --git a/flagsmith/flagsmith.py b/flagsmith/flagsmith.py index 6bdb455..da4fb53 100644 --- a/flagsmith/flagsmith.py +++ b/flagsmith/flagsmith.py @@ -75,6 +75,7 @@ def __init__( environment_refresh_interval_seconds: typing.Union[int, float] = 60, retries: typing.Optional[Retry] = None, enable_analytics: bool = False, + enable_events: bool = False, event_processor_config: typing.Optional[EventProcessorConfig] = None, default_flag_handler: typing.Optional[ typing.Callable[[str], DefaultFlag] @@ -101,6 +102,12 @@ def __init__( Flagsmith API :param enable_analytics: if enabled, sends additional requests to the Flagsmith API to power flag analytics charts + :param enable_events: if enabled, starts an event processor that buffers + and sends events (custom and flag-exposure) to the Flagsmith events + API, powering experimentation analytics. + :param event_processor_config: optional configuration for the event + processor (URL override for self-hosted, buffer/flush tuning). Only + valid when ``enable_events=True``. :param default_flag_handler: callable which will be used in the case where flags cannot be retrieved from the API or a non-existent feature is requested @@ -139,6 +146,11 @@ def __init__( "Can only use realtime updates when running in local evaluation mode." ) + if event_processor_config is not None and not enable_events: + raise ValueError( + "event_processor_config can only be set when enable_events=True." + ) + if self.offline_handler: self._evaluation_context = map_environment_document_to_context( self.offline_handler.get_environment() @@ -188,6 +200,10 @@ def __init__( self._initialise_analytics( environment_key=environment_key, enable_analytics=enable_analytics, + ) + self._initialise_events( + environment_key=environment_key, + enable_events=enable_events, event_processor_config=event_processor_config, ) @@ -195,15 +211,21 @@ def _initialise_analytics( self, environment_key: str, enable_analytics: bool, - event_processor_config: typing.Optional[EventProcessorConfig], ) -> None: if enable_analytics: self._analytics_processor = AnalyticsProcessor( environment_key, self.api_url, timeout=self.request_timeout_seconds ) - if event_processor_config: + + def _initialise_events( + self, + environment_key: str, + enable_events: bool, + event_processor_config: typing.Optional[EventProcessorConfig], + ) -> None: + if enable_events: self._event_processor = EventProcessor( - config=event_processor_config, + config=event_processor_config or EventProcessorConfig(), environment_key=environment_key, ) self._event_processor.start() @@ -358,7 +380,7 @@ def track_event( if not self._event_processor: raise ValueError( "Event processor is not configured. " - "Provide event_processor_config to use track_event." + "Set enable_events=True to use track_event." ) self._event_processor.track_event( event=event, @@ -381,7 +403,7 @@ def track_exposure_event( if not self._event_processor: raise ValueError( "Event processor is not configured. " - "Provide event_processor_config to use track_exposure_event." + "Set enable_events=True to use track_exposure_event." ) self._event_processor.track_exposure_event( feature_name=feature_name, diff --git a/tests/test_flagsmith.py b/tests/test_flagsmith.py index 4eade38..9126b38 100644 --- a/tests/test_flagsmith.py +++ b/tests/test_flagsmith.py @@ -957,11 +957,35 @@ def test_track_event_raises_without_config(api_key: str) -> None: flagsmith.track_event("purchase") +def test_event_processor_config_without_enable_events_raises(api_key: str) -> None: + config = EventProcessorConfig(events_api_url="http://test/") + with pytest.raises( + ValueError, + match="event_processor_config can only be set when enable_events=True", + ): + Flagsmith(environment_key=api_key, event_processor_config=config) + + +def test_enable_events_without_config_uses_default(api_key: str) -> None: + flagsmith = Flagsmith(environment_key=api_key, enable_events=True) + try: + assert flagsmith._event_processor is not None + assert ( + flagsmith._event_processor._batch_endpoint + == "https://events.api.flagsmith.com/v1/events" + ) + finally: + if flagsmith._event_processor: + flagsmith._event_processor.stop() + + def test_track_event_delegates_to_event_processor( mocker: MockerFixture, api_key: str ) -> None: config = EventProcessorConfig(events_api_url="http://test/") - flagsmith = Flagsmith(environment_key=api_key, event_processor_config=config) + flagsmith = Flagsmith( + environment_key=api_key, enable_events=True, event_processor_config=config + ) mock_track = mocker.patch.object(flagsmith._event_processor, "track_event") @@ -993,7 +1017,9 @@ def test_track_exposure_event_delegates_to_event_processor( mocker: MockerFixture, api_key: str ) -> None: config = EventProcessorConfig(events_api_url="http://test/") - flagsmith = Flagsmith(environment_key=api_key, event_processor_config=config) + flagsmith = Flagsmith( + environment_key=api_key, enable_events=True, event_processor_config=config + ) mock_track = mocker.patch.object(flagsmith._event_processor, "track_exposure_event") @@ -1020,7 +1046,9 @@ def test_get_experiment_flag_returns_flag_and_tracks_exposure( mocker: MockerFixture, api_key: str, identities_json: str ) -> None: config = EventProcessorConfig(events_api_url="http://test/") - flagsmith = Flagsmith(environment_key=api_key, event_processor_config=config) + flagsmith = Flagsmith( + environment_key=api_key, enable_events=True, event_processor_config=config + ) mock_track = mocker.patch.object(flagsmith._event_processor, "track_exposure_event") responses.add(method="POST", url=flagsmith.identities_url, body=identities_json) @@ -1056,6 +1084,7 @@ def default_flag_handler(feature_name: str) -> DefaultFlag: flagsmith = Flagsmith( environment_key=api_key, + enable_events=True, event_processor_config=config, default_flag_handler=default_flag_handler, ) From ef42a70a25ef4631ea96d1623dd61e4b510302c5 Mon Sep 17 00:00:00 2001 From: Gagan Trivedi Date: Fri, 29 May 2026 17:11:15 +0530 Subject: [PATCH 10/11] fix: guard get_experiment_flag against events being disabled before resolving flag Move the event-processor presence check to the top of get_experiment_flag so the method fails immediately when events are disabled, instead of issuing an identity-flag request only to then raise from track_exposure_event. Matches the early-guard pattern in track_event and track_exposure_event. Also unify the three error messages to a single terse declarative sentence each, matching the existing codebase style. Addresses review feedback on #220. --- flagsmith/flagsmith.py | 12 ++++-------- tests/test_flagsmith.py | 12 ++++++++++-- 2 files changed, 14 insertions(+), 10 deletions(-) diff --git a/flagsmith/flagsmith.py b/flagsmith/flagsmith.py index da4fb53..45606a3 100644 --- a/flagsmith/flagsmith.py +++ b/flagsmith/flagsmith.py @@ -358,6 +358,8 @@ def get_experiment_flag( (i.e. the feature was not present and was served via the `default_flag_handler`), to keep experimentation data clean. """ + if not self._event_processor: + raise ValueError("Events must be enabled to use experiment flags.") flag = self.get_identity_flags(identifier, traits).get_flag(feature_name) if not flag.is_default: self.track_exposure_event( @@ -378,10 +380,7 @@ def track_event( timestamp: typing.Optional[datetime] = None, ) -> None: if not self._event_processor: - raise ValueError( - "Event processor is not configured. " - "Set enable_events=True to use track_event." - ) + raise ValueError("Events must be enabled to track events.") self._event_processor.track_event( event=event, identifier=identifier, @@ -401,10 +400,7 @@ def track_exposure_event( timestamp: typing.Optional[datetime] = None, ) -> None: if not self._event_processor: - raise ValueError( - "Event processor is not configured. " - "Set enable_events=True to use track_exposure_event." - ) + raise ValueError("Events must be enabled to track exposure events.") self._event_processor.track_exposure_event( feature_name=feature_name, identifier=identifier, diff --git a/tests/test_flagsmith.py b/tests/test_flagsmith.py index 9126b38..9a10257 100644 --- a/tests/test_flagsmith.py +++ b/tests/test_flagsmith.py @@ -953,7 +953,7 @@ def test_flagsmith__init__expected_headers_sent( def test_track_event_raises_without_config(api_key: str) -> None: flagsmith = Flagsmith(environment_key=api_key) - with pytest.raises(ValueError, match="Event processor is not configured"): + with pytest.raises(ValueError, match="Events must be enabled"): flagsmith.track_event("purchase") @@ -1009,7 +1009,7 @@ def test_track_event_delegates_to_event_processor( def test_track_exposure_event_raises_without_config(api_key: str) -> None: flagsmith = Flagsmith(environment_key=api_key) - with pytest.raises(ValueError, match="Event processor is not configured"): + with pytest.raises(ValueError, match="Events must be enabled"): flagsmith.track_exposure_event("checkout_v2") @@ -1041,6 +1041,14 @@ def test_track_exposure_event_delegates_to_event_processor( ) +def test_get_experiment_flag_raises_without_events_enabled(api_key: str) -> None: + flagsmith = Flagsmith(environment_key=api_key) + with pytest.raises(ValueError, match="Events must be enabled"): + flagsmith.get_experiment_flag( + feature_name="some_feature", identifier="user1" + ) + + @responses.activate() def test_get_experiment_flag_returns_flag_and_tracks_exposure( mocker: MockerFixture, api_key: str, identities_json: str From eff19a4869f8230a4dd6af46fa845cf3ef2ba31e Mon Sep 17 00:00:00 2001 From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com> Date: Fri, 29 May 2026 11:41:28 +0000 Subject: [PATCH 11/11] [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci --- tests/test_flagsmith.py | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/tests/test_flagsmith.py b/tests/test_flagsmith.py index 9a10257..ad5d9b2 100644 --- a/tests/test_flagsmith.py +++ b/tests/test_flagsmith.py @@ -1044,9 +1044,7 @@ def test_track_exposure_event_delegates_to_event_processor( def test_get_experiment_flag_raises_without_events_enabled(api_key: str) -> None: flagsmith = Flagsmith(environment_key=api_key) with pytest.raises(ValueError, match="Events must be enabled"): - flagsmith.get_experiment_flag( - feature_name="some_feature", identifier="user1" - ) + flagsmith.get_experiment_flag(feature_name="some_feature", identifier="user1") @responses.activate()