diff --git a/flagsmith/__init__.py b/flagsmith/__init__.py index d36feb8..92e8b9a 100644 --- a/flagsmith/__init__.py +++ b/flagsmith/__init__.py @@ -1,6 +1,6 @@ from flagsmith import webhooks -from flagsmith.analytics import PipelineAnalyticsConfig +from flagsmith.analytics import EventProcessorConfig from flagsmith.flagsmith import Flagsmith from flagsmith.version import __version__ -__all__ = ("Flagsmith", "PipelineAnalyticsConfig", "webhooks", "__version__") +__all__ = ("Flagsmith", "EventProcessorConfig", "webhooks", "__version__") diff --git a/flagsmith/analytics.py b/flagsmith/analytics.py index f0d0bab..6ec4a13 100644 --- a/flagsmith/analytics.py +++ b/flagsmith/analytics.py @@ -2,7 +2,6 @@ import json import logging import threading -import time import typing from dataclasses import dataclass from datetime import datetime @@ -18,6 +17,10 @@ # Used to control how often we send data(in seconds) ANALYTICS_TIMER: typing.Final[int] = 10 +FLAG_EXPOSURE_EVENT: typing.Final[str] = "$flag_exposure" + +DEFAULT_EVENT_API_URL: typing.Final[str] = "https://events.api.flagsmith.com/" + session = FuturesSession(max_workers=4) @@ -72,92 +75,95 @@ def track_feature(self, feature_name: str) -> None: @dataclass -class PipelineAnalyticsConfig: - analytics_server_url: str +class EventProcessorConfig: + events_api_url: str = DEFAULT_EVENT_API_URL max_buffer_items: int = 1000 flush_interval_seconds: float = 10.0 -class PipelineAnalyticsProcessor: +class EventProcessor: """ - Buffered analytics processor that sends per-evaluation and custom events - to the Flagsmith pipeline analytics endpoint in batches. - - Evaluation events are deduplicated within each flush window. Events are - flushed periodically via a background timer or when the buffer is full. + Buffered event processor that batches custom events and POSTs them to the + Flagsmith event endpoint. Flushes on a background timer or when the buffer + fills. """ def __init__( self, - config: PipelineAnalyticsConfig, + config: EventProcessorConfig, environment_key: str, ) -> None: - url = config.analytics_server_url + url = config.events_api_url if not url.endswith("/"): url = f"{url}/" - self._batch_endpoint = f"{url}v1/analytics/batch" + self._batch_endpoint = f"{url}v1/events" self._environment_key = environment_key self._max_buffer = config.max_buffer_items self._flush_interval_seconds = config.flush_interval_seconds self._buffer: typing.List[typing.Dict[str, typing.Any]] = [] - self._dedup_keys: typing.Dict[str, str] = {} self._lock = threading.Lock() self._timer: typing.Optional[threading.Timer] = None - def record_evaluation_event( + def track_event( self, - flag_key: str, - enabled: bool, - value: typing.Any, - identity_identifier: typing.Optional[str] = None, + event: str, + identifier: typing.Optional[str] = None, + value: typing.Optional[typing.Union[str, int, float]] = None, traits: typing.Optional[typing.Dict[str, typing.Any]] = None, + metadata: typing.Optional[typing.Dict[str, typing.Any]] = None, + timestamp: typing.Optional[datetime] = None, ) -> None: - fingerprint = f"{identity_identifier or 'none'}|{enabled}|{value}" - should_flush = False - - with self._lock: - if self._dedup_keys.get(flag_key) == fingerprint: - return - self._dedup_keys[flag_key] = fingerprint - self._buffer.append( - { - "event_id": flag_key, - "event_type": "flag_evaluation", - "evaluated_at": int(time.time() * 1000), - "identity_identifier": identity_identifier, - "enabled": enabled, - "value": value, - "traits": dict(traits) if traits else None, - "metadata": {"sdk_version": __version__}, - } - ) - if len(self._buffer) >= self._max_buffer: - should_flush = True - - if should_flush: - self.flush() + self._buffer_event( + event=event, + feature_name=None, + identifier=identifier, + value=value, + traits=traits, + metadata=metadata, + timestamp=timestamp, + ) - def record_custom_event( + def track_exposure_event( self, - event_name: str, - identity_identifier: typing.Optional[str] = None, + feature_name: str, + identifier: typing.Optional[str] = None, + value: typing.Optional[str] = None, traits: typing.Optional[typing.Dict[str, typing.Any]] = None, metadata: typing.Optional[typing.Dict[str, typing.Any]] = None, + timestamp: typing.Optional[datetime] = None, ) -> None: - should_flush = False + self._buffer_event( + event=FLAG_EXPOSURE_EVENT, + feature_name=feature_name, + identifier=identifier, + value=value, + traits=traits, + metadata=metadata, + timestamp=timestamp, + ) + def _buffer_event( + self, + event: str, + feature_name: typing.Optional[str], + identifier: typing.Optional[str], + value: typing.Optional[typing.Union[str, int, float]], + traits: typing.Optional[typing.Dict[str, typing.Any]], + metadata: typing.Optional[typing.Dict[str, typing.Any]], + timestamp: typing.Optional[datetime], + ) -> None: + should_flush = False with self._lock: self._buffer.append( { - "event_id": event_name, - "event_type": "custom_event", - "evaluated_at": int(time.time() * 1000), - "identity_identifier": identity_identifier, - "enabled": None, - "value": None, + "event": event, + "feature_name": feature_name, + "identifier": identifier, + "value": value, "traits": dict(traits) if traits else None, "metadata": {**(metadata or {}), "sdk_version": __version__}, + "timestamp": int((timestamp or datetime.now()).timestamp() * 1000), } ) if len(self._buffer) >= self._max_buffer: @@ -172,11 +178,8 @@ def flush(self) -> None: return events = self._buffer self._buffer = [] - self._dedup_keys.clear() - payload = json.dumps( - {"events": events, "environment_key": self._environment_key} - ) + payload = json.dumps({"events": events}) try: future = session.post( self._batch_endpoint, diff --git a/flagsmith/flagsmith.py b/flagsmith/flagsmith.py index 8242948..45606a3 100644 --- a/flagsmith/flagsmith.py +++ b/flagsmith/flagsmith.py @@ -10,8 +10,8 @@ from flagsmith.analytics import ( AnalyticsProcessor, - PipelineAnalyticsConfig, - PipelineAnalyticsProcessor, + EventProcessor, + EventProcessorConfig, ) from flagsmith.exceptions import FlagsmithAPIError, FlagsmithClientError from flagsmith.mappers import ( @@ -23,6 +23,7 @@ ) from flagsmith.models import ( DefaultFlag, + Flag, Flags, Segment, SegmentOverridesIndex, @@ -74,7 +75,8 @@ def __init__( environment_refresh_interval_seconds: typing.Union[int, float] = 60, retries: typing.Optional[Retry] = None, enable_analytics: bool = False, - pipeline_analytics_config: typing.Optional[PipelineAnalyticsConfig] = None, + enable_events: bool = False, + event_processor_config: typing.Optional[EventProcessorConfig] = None, default_flag_handler: typing.Optional[ typing.Callable[[str], DefaultFlag] ] = None, @@ -100,6 +102,12 @@ def __init__( Flagsmith API :param enable_analytics: if enabled, sends additional requests to the Flagsmith API to power flag analytics charts + :param enable_events: if enabled, starts an event processor that buffers + and sends events (custom and flag-exposure) to the Flagsmith events + API, powering experimentation analytics. + :param event_processor_config: optional configuration for the event + processor (URL override for self-hosted, buffer/flush tuning). Only + valid when ``enable_events=True``. :param default_flag_handler: callable which will be used in the case where flags cannot be retrieved from the API or a non-existent feature is requested @@ -120,9 +128,7 @@ def __init__( self.default_flag_handler = default_flag_handler self.enable_realtime_updates = enable_realtime_updates self._analytics_processor: typing.Optional[AnalyticsProcessor] = None - self._pipeline_analytics_processor: typing.Optional[ - PipelineAnalyticsProcessor - ] = None + self._event_processor: typing.Optional[EventProcessor] = None self.__evaluation_context: typing.Optional[SDKEvaluationContext] = None self._segment_overrides_index: SegmentOverridesIndex = {} self._environment_updated_at: typing.Optional[datetime] = None @@ -140,6 +146,11 @@ def __init__( "Can only use realtime updates when running in local evaluation mode." ) + if event_processor_config is not None and not enable_events: + raise ValueError( + "event_processor_config can only be set when enable_events=True." + ) + if self.offline_handler: self._evaluation_context = map_environment_document_to_context( self.offline_handler.get_environment() @@ -189,25 +200,35 @@ def __init__( self._initialise_analytics( environment_key=environment_key, enable_analytics=enable_analytics, - pipeline_analytics_config=pipeline_analytics_config, + ) + self._initialise_events( + environment_key=environment_key, + enable_events=enable_events, + event_processor_config=event_processor_config, ) def _initialise_analytics( self, environment_key: str, enable_analytics: bool, - pipeline_analytics_config: typing.Optional[PipelineAnalyticsConfig], ) -> None: if enable_analytics: self._analytics_processor = AnalyticsProcessor( environment_key, self.api_url, timeout=self.request_timeout_seconds ) - if pipeline_analytics_config: - self._pipeline_analytics_processor = PipelineAnalyticsProcessor( - config=pipeline_analytics_config, + + def _initialise_events( + self, + environment_key: str, + enable_events: bool, + event_processor_config: typing.Optional[EventProcessorConfig], + ) -> None: + if enable_events: + self._event_processor = EventProcessor( + config=event_processor_config or EventProcessorConfig(), environment_key=environment_key, ) - self._pipeline_analytics_processor.start() + self._event_processor.start() def _initialise_local_evaluation(self) -> None: # To ensure that the environment is set before allowing subsequent @@ -324,23 +345,69 @@ def get_identity_segments( return map_segment_results_to_identity_segments(evaluation_result["segments"]) + def get_experiment_flag( + self, + feature_name: str, + identifier: str, + traits: typing.Optional[TraitMapping] = None, + ) -> typing.Union[DefaultFlag, Flag]: + """ + Resolve a flag for an identity and record an exposure event. + + Skips the exposure event when the resolved flag is a `DefaultFlag` + (i.e. the feature was not present and was served via the + `default_flag_handler`), to keep experimentation data clean. + """ + if not self._event_processor: + raise ValueError("Events must be enabled to use experiment flags.") + flag = self.get_identity_flags(identifier, traits).get_flag(feature_name) + if not flag.is_default: + self.track_exposure_event( + feature_name=feature_name, + identifier=identifier, + value=str(flag.value) if flag.value is not None else None, + traits=traits, + ) + return flag + def track_event( self, - event_name: str, - identity_identifier: typing.Optional[str] = None, + event: str, + identifier: typing.Optional[str] = None, + value: typing.Optional[typing.Union[str, int, float]] = None, traits: typing.Optional[TraitMapping] = None, metadata: typing.Optional[typing.Dict[str, typing.Any]] = None, + timestamp: typing.Optional[datetime] = None, ) -> None: - if not self._pipeline_analytics_processor: - raise ValueError( - "Pipeline analytics is not configured. " - "Provide pipeline_analytics_config to use track_event." - ) - self._pipeline_analytics_processor.record_custom_event( - event_name=event_name, - identity_identifier=identity_identifier, + if not self._event_processor: + raise ValueError("Events must be enabled to track events.") + self._event_processor.track_event( + event=event, + identifier=identifier, + value=value, traits=resolve_trait_values(traits), metadata=metadata, + timestamp=timestamp, + ) + + def track_exposure_event( + self, + feature_name: str, + identifier: typing.Optional[str] = None, + value: typing.Optional[str] = None, + traits: typing.Optional[TraitMapping] = None, + metadata: typing.Optional[typing.Dict[str, typing.Any]] = None, + timestamp: typing.Optional[datetime] = None, + ) -> None: + if not self._event_processor: + raise ValueError("Events must be enabled to track exposure events.") + self._event_processor.track_exposure_event( + feature_name=feature_name, + identifier=identifier, + value=value, + traits=resolve_trait_values(traits), + metadata=metadata, + timestamp=timestamp, ) def update_environment(self) -> None: @@ -418,7 +485,6 @@ def _get_environment_flags_from_document(self) -> Flags: evaluation_result=evaluation_result, analytics_processor=self._analytics_processor, default_flag_handler=self.default_flag_handler, - pipeline_analytics_processor=self._pipeline_analytics_processor, ) def _get_identity_flags_from_document( @@ -442,9 +508,6 @@ def _get_identity_flags_from_document( overrides_index=self._segment_overrides_index, analytics_processor=self._analytics_processor, default_flag_handler=self.default_flag_handler, - pipeline_analytics_processor=self._pipeline_analytics_processor, - identity_identifier=identifier, - traits=resolve_trait_values(traits), ) def _get_environment_flags_from_api(self) -> Flags: @@ -456,7 +519,6 @@ def _get_environment_flags_from_api(self) -> Flags: api_flags=json_response, analytics_processor=self._analytics_processor, default_flag_handler=self.default_flag_handler, - pipeline_analytics_processor=self._pipeline_analytics_processor, ) except FlagsmithAPIError: if self.offline_handler: @@ -489,9 +551,6 @@ def _get_identity_flags_from_api( api_flags=json_response["flags"], analytics_processor=self._analytics_processor, default_flag_handler=self.default_flag_handler, - pipeline_analytics_processor=self._pipeline_analytics_processor, - identity_identifier=identifier, - traits=resolve_trait_values(traits), ) except FlagsmithAPIError: if self.offline_handler: @@ -525,5 +584,5 @@ def __del__(self) -> None: if hasattr(self, "event_stream_thread"): self.event_stream_thread.stop() - if self._pipeline_analytics_processor: - self._pipeline_analytics_processor.stop() + if self._event_processor: + self._event_processor.stop() diff --git a/flagsmith/models.py b/flagsmith/models.py index 9e48724..cfb5ff4 100644 --- a/flagsmith/models.py +++ b/flagsmith/models.py @@ -6,7 +6,7 @@ from flag_engine import engine from flag_engine.context.types import SegmentContext -from flagsmith.analytics import AnalyticsProcessor, PipelineAnalyticsProcessor +from flagsmith.analytics import AnalyticsProcessor from flagsmith.exceptions import FlagsmithFeatureDoesNotExistError from flagsmith.types import ( FeatureMetadata, @@ -85,9 +85,6 @@ class Flags: flags: typing.Dict[str, Flag] = field(default_factory=dict) default_flag_handler: typing.Optional[typing.Callable[[str], DefaultFlag]] = None _analytics_processor: typing.Optional[AnalyticsProcessor] = None - _pipeline_analytics_processor: typing.Optional[PipelineAnalyticsProcessor] = None - _identity_identifier: typing.Optional[str] = None - _traits: typing.Optional[typing.Dict[str, typing.Any]] = None # Lazy-evaluation state. When `_context` is set, `flags` is a # per-feature memo rather than a fully-materialised snapshot; unseen # features are resolved on demand via the engine primitives and @@ -103,11 +100,6 @@ def from_evaluation_result( evaluation_result: SDKEvaluationResult, analytics_processor: typing.Optional[AnalyticsProcessor], default_flag_handler: typing.Optional[typing.Callable[[str], DefaultFlag]], - pipeline_analytics_processor: typing.Optional[ - PipelineAnalyticsProcessor - ] = None, - identity_identifier: typing.Optional[str] = None, - traits: typing.Optional[typing.Dict[str, typing.Any]] = None, ) -> Flags: return cls( flags={ @@ -117,9 +109,6 @@ def from_evaluation_result( }, default_flag_handler=default_flag_handler, _analytics_processor=analytics_processor, - _pipeline_analytics_processor=pipeline_analytics_processor, - _identity_identifier=identity_identifier, - _traits=traits, ) @classmethod @@ -129,11 +118,6 @@ def from_evaluation_context( overrides_index: SegmentOverridesIndex, analytics_processor: typing.Optional[AnalyticsProcessor], default_flag_handler: typing.Optional[typing.Callable[[str], DefaultFlag]], - pipeline_analytics_processor: typing.Optional[ - PipelineAnalyticsProcessor - ] = None, - identity_identifier: typing.Optional[str] = None, - traits: typing.Optional[typing.Dict[str, typing.Any]] = None, ) -> Flags: """Build a lazy `Flags` backed by an evaluation context. @@ -146,9 +130,6 @@ def from_evaluation_context( flags={}, default_flag_handler=default_flag_handler, _analytics_processor=analytics_processor, - _pipeline_analytics_processor=pipeline_analytics_processor, - _identity_identifier=identity_identifier, - _traits=traits, _context=context, _overrides_index=overrides_index, ) @@ -159,11 +140,6 @@ def from_api_flags( api_flags: typing.Sequence[typing.Mapping[str, typing.Any]], analytics_processor: typing.Optional[AnalyticsProcessor], default_flag_handler: typing.Optional[typing.Callable[[str], DefaultFlag]], - pipeline_analytics_processor: typing.Optional[ - PipelineAnalyticsProcessor - ] = None, - identity_identifier: typing.Optional[str] = None, - traits: typing.Optional[typing.Dict[str, typing.Any]] = None, ) -> Flags: flags = { flag_data["feature"]["name"]: Flag.from_api_flag(flag_data) @@ -174,9 +150,6 @@ def from_api_flags( flags=flags, default_flag_handler=default_flag_handler, _analytics_processor=analytics_processor, - _pipeline_analytics_processor=pipeline_analytics_processor, - _identity_identifier=identity_identifier, - _traits=traits, ) def all_flags(self) -> typing.List[Flag]: @@ -252,15 +225,6 @@ def get_flag(self, feature_name: str) -> typing.Union[DefaultFlag, Flag]: if self._analytics_processor and hasattr(flag, "feature_name"): self._analytics_processor.track_feature(flag.feature_name) - if self._pipeline_analytics_processor and hasattr(flag, "feature_name"): - self._pipeline_analytics_processor.record_evaluation_event( - flag_key=flag.feature_name, - enabled=flag.enabled, - value=flag.value, - identity_identifier=self._identity_identifier, - traits=self._traits, - ) - return flag def _resolve_flag(self, feature_name: str) -> Flag: diff --git a/tests/conftest.py b/tests/conftest.py index 9c58c96..a71c3de 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -13,8 +13,8 @@ from flagsmith import Flagsmith from flagsmith.analytics import ( AnalyticsProcessor, - PipelineAnalyticsConfig, - PipelineAnalyticsProcessor, + EventProcessor, + EventProcessorConfig, ) from flagsmith.api.types import EnvironmentModel from flagsmith.mappers import map_environment_document_to_context @@ -31,16 +31,16 @@ def analytics_processor() -> AnalyticsProcessor: @pytest.fixture() -def pipeline_analytics_config() -> PipelineAnalyticsConfig: - return PipelineAnalyticsConfig(analytics_server_url="http://test_analytics/") +def event_processor_config() -> EventProcessorConfig: + return EventProcessorConfig(events_api_url="http://test_analytics/") @pytest.fixture() -def pipeline_analytics_processor( - pipeline_analytics_config: PipelineAnalyticsConfig, -) -> PipelineAnalyticsProcessor: - return PipelineAnalyticsProcessor( - config=pipeline_analytics_config, +def event_processor( + event_processor_config: EventProcessorConfig, +) -> EventProcessor: + return EventProcessor( + config=event_processor_config, environment_key="test_key", ) diff --git a/tests/test_event_processor.py b/tests/test_event_processor.py new file mode 100644 index 0000000..620577c --- /dev/null +++ b/tests/test_event_processor.py @@ -0,0 +1,152 @@ +import json +from concurrent.futures import Future +from datetime import datetime, timezone +from unittest import mock + +from flagsmith.analytics import ( + FLAG_EXPOSURE_EVENT, + EventProcessor, + EventProcessorConfig, +) + + +def test_track_event_buffers_event(event_processor: EventProcessor) -> None: + event_processor.track_event( + event="purchase", + identifier="user1", + value=99.5, + traits={"plan": "premium"}, + metadata={"sku": "abc"}, + ) + event_processor.track_event(event="purchase", identifier="user1") + + assert len(event_processor._buffer) == 2 + event = event_processor._buffer[0] + assert event["event"] == "purchase" + assert event["feature_name"] is None + assert event["identifier"] == "user1" + assert event["value"] == 99.5 + assert event["traits"] == {"plan": "premium"} + assert event["metadata"]["sku"] == "abc" + assert "sdk_version" in event["metadata"] + assert isinstance(event["timestamp"], int) + + +def test_track_event_defaults_timestamp_to_now( + event_processor: EventProcessor, +) -> None: + before = int(datetime.now().timestamp() * 1000) + event_processor.track_event(event="ping") + after = int(datetime.now().timestamp() * 1000) + + assert before <= event_processor._buffer[0]["timestamp"] <= after + + +def test_track_event_uses_explicit_timestamp( + event_processor: EventProcessor, +) -> None: + ts = datetime(2026, 1, 1, 12, 0, 0, tzinfo=timezone.utc) + + event_processor.track_event(event="ping", timestamp=ts) + + assert event_processor._buffer[0]["timestamp"] == int(ts.timestamp() * 1000) + + +def test_track_exposure_event_buffers_with_flag_exposure_event_name( + event_processor: EventProcessor, +) -> None: + event_processor.track_exposure_event( + feature_name="checkout_v2", + identifier="user1", + value="variant_b", + traits={"plan": "premium"}, + metadata={"source": "homepage"}, + ) + + assert len(event_processor._buffer) == 1 + event = event_processor._buffer[0] + assert event["event"] == FLAG_EXPOSURE_EVENT == "$flag_exposure" + assert event["feature_name"] == "checkout_v2" + assert event["identifier"] == "user1" + assert event["value"] == "variant_b" + assert event["traits"] == {"plan": "premium"} + assert event["metadata"]["source"] == "homepage" + assert "sdk_version" in event["metadata"] + + +def test_track_exposure_event_uses_explicit_timestamp( + event_processor: EventProcessor, +) -> None: + ts = datetime(2026, 1, 1, 12, 0, 0, tzinfo=timezone.utc) + + event_processor.track_exposure_event(feature_name="checkout_v2", timestamp=ts) + + assert event_processor._buffer[0]["timestamp"] == int(ts.timestamp() * 1000) + + +def test_auto_flush_on_buffer_full() -> None: + config = EventProcessorConfig(events_api_url="http://test/", max_buffer_items=5) + processor = EventProcessor(config=config, environment_key="key") + + with mock.patch("flagsmith.analytics.session"): + for i in range(5): + processor.track_event(event=f"event_{i}") + + assert len(processor._buffer) == 0 + + +def test_flush_sends_correct_http_request(event_processor: EventProcessor) -> None: + with mock.patch("flagsmith.analytics.session") as mock_session: + event_processor.track_event(event="purchase", identifier="user1") + event_processor.flush() + + mock_session.post.assert_called_once() + call_kwargs = mock_session.post.call_args + assert call_kwargs[0][0] == "http://test_analytics/v1/events" + + headers = call_kwargs[1]["headers"] + assert headers["X-Environment-Key"] == "test_key" + assert headers["Content-Type"] == "application/json; charset=utf-8" + assert "flagsmith-python-client/" in headers["Flagsmith-SDK-User-Agent"] + + body = json.loads(call_kwargs[1]["data"]) + assert "environment_key" not in body + assert len(body["events"]) == 1 + assert body["events"][0]["event"] == "purchase" + + +def test_flush_noop_when_empty(event_processor: EventProcessor) -> None: + with mock.patch("flagsmith.analytics.session") as mock_session: + event_processor.flush() + + mock_session.post.assert_not_called() + + +def test_failed_flush_requeues_events(event_processor: EventProcessor) -> None: + future: Future[None] = Future() + future.set_exception(Exception("connection error")) + + with mock.patch("flagsmith.analytics.session") as mock_session: + mock_session.post.return_value = future + event_processor.track_event(event="purchase") + event_processor.flush() + + assert len(event_processor._buffer) == 1 + assert event_processor._buffer[0]["event"] == "purchase" + + +def test_start_stop_lifecycle() -> None: + config = EventProcessorConfig( + events_api_url="http://test/", flush_interval_seconds=100 + ) + processor = EventProcessor(config=config, environment_key="key") + + processor.start() + assert processor._timer is not None + assert processor._timer.is_alive() + + with mock.patch("flagsmith.analytics.session"): + processor.track_event(event="purchase") + processor.stop() + + assert len(processor._buffer) == 0 diff --git a/tests/test_flagsmith.py b/tests/test_flagsmith.py index b5891ed..ad5d9b2 100644 --- a/tests/test_flagsmith.py +++ b/tests/test_flagsmith.py @@ -10,7 +10,7 @@ from responses import matchers from flagsmith import Flagsmith, __version__ -from flagsmith.analytics import PipelineAnalyticsConfig +from flagsmith.analytics import EventProcessorConfig from flagsmith.api.types import EnvironmentModel from flagsmith.exceptions import ( FlagsmithAPIError, @@ -953,88 +953,159 @@ def test_flagsmith__init__expected_headers_sent( def test_track_event_raises_without_config(api_key: str) -> None: flagsmith = Flagsmith(environment_key=api_key) - with pytest.raises(ValueError, match="Pipeline analytics is not configured"): + with pytest.raises(ValueError, match="Events must be enabled"): flagsmith.track_event("purchase") -@responses.activate() -def test_pipeline_analytics_records_events( - mocker: MockerFixture, api_key: str, flags_json: str -) -> None: - config = PipelineAnalyticsConfig(analytics_server_url="http://test/") - flagsmith = Flagsmith(environment_key=api_key, pipeline_analytics_config=config) +def test_event_processor_config_without_enable_events_raises(api_key: str) -> None: + config = EventProcessorConfig(events_api_url="http://test/") + with pytest.raises( + ValueError, + match="event_processor_config can only be set when enable_events=True", + ): + Flagsmith(environment_key=api_key, event_processor_config=config) - mock_eval = mocker.patch.object( - flagsmith._pipeline_analytics_processor, "record_evaluation_event" - ) - mock_custom = mocker.patch.object( - flagsmith._pipeline_analytics_processor, "record_custom_event" - ) - responses.add(method="GET", url=flagsmith.environment_flags_url, body=flags_json) - flags = flagsmith.get_environment_flags() - flags.get_flag("some_feature") +def test_enable_events_without_config_uses_default(api_key: str) -> None: + flagsmith = Flagsmith(environment_key=api_key, enable_events=True) + try: + assert flagsmith._event_processor is not None + assert ( + flagsmith._event_processor._batch_endpoint + == "https://events.api.flagsmith.com/v1/events" + ) + finally: + if flagsmith._event_processor: + flagsmith._event_processor.stop() - mock_eval.assert_called_once_with( - flag_key="some_feature", - enabled=True, - value="some-value", - identity_identifier=None, - traits=None, + +def test_track_event_delegates_to_event_processor( + mocker: MockerFixture, api_key: str +) -> None: + config = EventProcessorConfig(events_api_url="http://test/") + flagsmith = Flagsmith( + environment_key=api_key, enable_events=True, event_processor_config=config ) + mock_track = mocker.patch.object(flagsmith._event_processor, "track_event") + flagsmith.track_event( "purchase", - identity_identifier="user1", - traits={"plan": "premium"}, + identifier="user1", + value=99.5, + traits={"plan": {"value": "premium", "transient": True}}, metadata={"amount": 99}, ) - mock_custom.assert_called_once_with( - event_name="purchase", - identity_identifier="user1", + mock_track.assert_called_once_with( + event="purchase", + identifier="user1", + value=99.5, traits={"plan": "premium"}, metadata={"amount": 99}, + timestamp=None, + ) + + +def test_track_exposure_event_raises_without_config(api_key: str) -> None: + flagsmith = Flagsmith(environment_key=api_key) + with pytest.raises(ValueError, match="Events must be enabled"): + flagsmith.track_exposure_event("checkout_v2") + + +def test_track_exposure_event_delegates_to_event_processor( + mocker: MockerFixture, api_key: str +) -> None: + config = EventProcessorConfig(events_api_url="http://test/") + flagsmith = Flagsmith( + environment_key=api_key, enable_events=True, event_processor_config=config ) + mock_track = mocker.patch.object(flagsmith._event_processor, "track_exposure_event") + + flagsmith.track_exposure_event( + "checkout_v2", + identifier="user1", + value="variant_b", + traits={"plan": {"value": "premium", "transient": True}}, + metadata={"source": "homepage"}, + ) + + mock_track.assert_called_once_with( + feature_name="checkout_v2", + identifier="user1", + value="variant_b", + traits={"plan": "premium"}, + metadata={"source": "homepage"}, + timestamp=None, + ) + + +def test_get_experiment_flag_raises_without_events_enabled(api_key: str) -> None: + flagsmith = Flagsmith(environment_key=api_key) + with pytest.raises(ValueError, match="Events must be enabled"): + flagsmith.get_experiment_flag(feature_name="some_feature", identifier="user1") + @responses.activate() -def test_identity_flags_records_evaluation_with_resolved_traits( +def test_get_experiment_flag_returns_flag_and_tracks_exposure( mocker: MockerFixture, api_key: str, identities_json: str ) -> None: - config = PipelineAnalyticsConfig(analytics_server_url="http://test/") - flagsmith = Flagsmith(environment_key=api_key, pipeline_analytics_config=config) - - mock_record = mocker.patch.object( - flagsmith._pipeline_analytics_processor, "record_evaluation_event" + config = EventProcessorConfig(events_api_url="http://test/") + flagsmith = Flagsmith( + environment_key=api_key, enable_events=True, event_processor_config=config ) - responses.add(method="POST", url=flagsmith.identities_url, body=identities_json) + mock_track = mocker.patch.object(flagsmith._event_processor, "track_exposure_event") responses.add(method="POST", url=flagsmith.identities_url, body=identities_json) - flags = flagsmith.get_identity_flags("user123", traits={"plan": "premium"}) - flags.get_flag("some_feature") + result = flagsmith.get_experiment_flag( + feature_name="some_feature", + identifier="user1", + traits={"plan": "premium"}, + ) - mock_record.assert_called_once_with( - flag_key="some_feature", - enabled=True, + assert isinstance(result, Flag) + assert result.is_default is False + assert result.feature_name == "some_feature" + assert result.value == "some-value" + mock_track.assert_called_once_with( + feature_name="some_feature", + identifier="user1", value="some-value", - identity_identifier="user123", traits={"plan": "premium"}, + metadata=None, + timestamp=None, ) - mock_record.reset_mock() - flags = flagsmith.get_identity_flags( - "user123", - traits={"plan": {"value": "premium", "transient": True}}, +@responses.activate() +def test_get_experiment_flag_skips_exposure_for_default_flag( + mocker: MockerFixture, api_key: str +) -> None: + config = EventProcessorConfig(events_api_url="http://test/") + + def default_flag_handler(feature_name: str) -> DefaultFlag: + return DefaultFlag(enabled=True, value="default-variant") + + flagsmith = Flagsmith( + environment_key=api_key, + enable_events=True, + event_processor_config=config, + default_flag_handler=default_flag_handler, + ) + mock_track = mocker.patch.object(flagsmith._event_processor, "track_exposure_event") + responses.add( + method="POST", + url=flagsmith.identities_url, + body=json.dumps({"flags": [], "traits": []}), ) - flags.get_flag("some_feature") - mock_record.assert_called_once_with( - flag_key="some_feature", - enabled=True, - value="some-value", - identity_identifier="user123", - traits={"plan": "premium"}, + result = flagsmith.get_experiment_flag( + feature_name="missing_feature", identifier="user1" ) + + assert isinstance(result, DefaultFlag) + assert result.is_default is True + assert result.value == "default-variant" + mock_track.assert_not_called() diff --git a/tests/test_pipeline_analytics.py b/tests/test_pipeline_analytics.py deleted file mode 100644 index 2a45899..0000000 --- a/tests/test_pipeline_analytics.py +++ /dev/null @@ -1,182 +0,0 @@ -import json -from concurrent.futures import Future -from unittest import mock - -import pytest - -from flagsmith.analytics import ( - PipelineAnalyticsConfig, - PipelineAnalyticsProcessor, -) - - -def test_record_evaluation_event_buffers_event( - pipeline_analytics_processor: PipelineAnalyticsProcessor, -) -> None: - pipeline_analytics_processor.record_evaluation_event( - flag_key="my_flag", - enabled=True, - value="variant_a", - identity_identifier="user123", - traits={"plan": "premium"}, - ) - - assert len(pipeline_analytics_processor._buffer) == 1 - event = pipeline_analytics_processor._buffer[0] - assert event["event_id"] == "my_flag" - assert event["event_type"] == "flag_evaluation" - assert event["identity_identifier"] == "user123" - assert event["enabled"] is True - assert event["value"] == "variant_a" - assert event["traits"] == {"plan": "premium"} - assert "sdk_version" in event["metadata"] - assert isinstance(event["evaluated_at"], int) - - -@pytest.mark.parametrize( - "second_enabled, expected_count", - [ - (True, 1), # same fingerprint -> deduplicated - (False, 2), # different fingerprint -> both kept - ], -) -def test_evaluation_event_deduplication( - pipeline_analytics_processor: PipelineAnalyticsProcessor, - second_enabled: bool, - expected_count: int, -) -> None: - pipeline_analytics_processor.record_evaluation_event( - flag_key="my_flag", enabled=True, value="v1", identity_identifier="user1" - ) - pipeline_analytics_processor.record_evaluation_event( - flag_key="my_flag", - enabled=second_enabled, - value="v1", - identity_identifier="user1", - ) - - assert len(pipeline_analytics_processor._buffer) == expected_count - - -def test_dedup_keys_cleared_after_flush( - pipeline_analytics_processor: PipelineAnalyticsProcessor, -) -> None: - with mock.patch("flagsmith.analytics.session"): - pipeline_analytics_processor.record_evaluation_event( - flag_key="my_flag", enabled=True, value="v1", identity_identifier="user1" - ) - pipeline_analytics_processor.flush() - - pipeline_analytics_processor.record_evaluation_event( - flag_key="my_flag", enabled=True, value="v1", identity_identifier="user1" - ) - - assert len(pipeline_analytics_processor._buffer) == 1 - - -def test_auto_flush_on_buffer_full() -> None: - config = PipelineAnalyticsConfig( - analytics_server_url="http://test/", max_buffer_items=5 - ) - processor = PipelineAnalyticsProcessor(config=config, environment_key="key") - - with mock.patch("flagsmith.analytics.session"): - for i in range(5): - processor.record_evaluation_event( - flag_key=f"flag_{i}", enabled=True, value=str(i) - ) - - assert len(processor._buffer) == 0 - - -def test_flush_sends_correct_http_request( - pipeline_analytics_processor: PipelineAnalyticsProcessor, -) -> None: - with mock.patch("flagsmith.analytics.session") as mock_session: - pipeline_analytics_processor.record_evaluation_event( - flag_key="my_flag", enabled=True, value="v1", identity_identifier="user1" - ) - pipeline_analytics_processor.flush() - - mock_session.post.assert_called_once() - call_kwargs = mock_session.post.call_args - assert call_kwargs[0][0] == "http://test_analytics/v1/analytics/batch" - - headers = call_kwargs[1]["headers"] - assert headers["X-Environment-Key"] == "test_key" - assert headers["Content-Type"] == "application/json; charset=utf-8" - assert "flagsmith-python-client/" in headers["Flagsmith-SDK-User-Agent"] - - body = json.loads(call_kwargs[1]["data"]) - assert body["environment_key"] == "test_key" - assert len(body["events"]) == 1 - assert body["events"][0]["event_id"] == "my_flag" - - -def test_flush_noop_when_empty( - pipeline_analytics_processor: PipelineAnalyticsProcessor, -) -> None: - with mock.patch("flagsmith.analytics.session") as mock_session: - pipeline_analytics_processor.flush() - - mock_session.post.assert_not_called() - - -def test_failed_flush_requeues_events( - pipeline_analytics_processor: PipelineAnalyticsProcessor, -) -> None: - future: Future[None] = Future() - future.set_exception(Exception("connection error")) - - with mock.patch("flagsmith.analytics.session") as mock_session: - mock_session.post.return_value = future - pipeline_analytics_processor.record_evaluation_event( - flag_key="my_flag", enabled=True, value="v1" - ) - pipeline_analytics_processor.flush() - - assert len(pipeline_analytics_processor._buffer) == 1 - assert pipeline_analytics_processor._buffer[0]["event_id"] == "my_flag" - - -def test_record_custom_event( - pipeline_analytics_processor: PipelineAnalyticsProcessor, -) -> None: - pipeline_analytics_processor.record_custom_event( - event_name="purchase", - identity_identifier="user1", - traits={"plan": "premium"}, - metadata={"amount": 99}, - ) - # Custom events are never deduplicated - pipeline_analytics_processor.record_custom_event( - event_name="purchase", - identity_identifier="user1", - ) - - assert len(pipeline_analytics_processor._buffer) == 2 - event = pipeline_analytics_processor._buffer[0] - assert event["event_id"] == "purchase" - assert event["event_type"] == "custom_event" - assert event["enabled"] is None - assert event["value"] is None - assert event["traits"] == {"plan": "premium"} - assert event["metadata"]["amount"] == 99 - assert "sdk_version" in event["metadata"] - - -def test_start_stop_lifecycle() -> None: - config = PipelineAnalyticsConfig( - analytics_server_url="http://test/", flush_interval_seconds=100 - ) - processor = PipelineAnalyticsProcessor(config=config, environment_key="key") - - processor.start() - assert processor._timer is not None - assert processor._timer.is_alive() - - with mock.patch("flagsmith.analytics.session"): - processor.record_evaluation_event(flag_key="my_flag", enabled=True, value="v1") - processor.stop() - - assert len(processor._buffer) == 0