Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions flagsmith/__init__.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
from flagsmith import webhooks
from flagsmith.analytics import PipelineAnalyticsConfig
from flagsmith.analytics import EventProcessorConfig
from flagsmith.flagsmith import Flagsmith
from flagsmith.version import __version__

__all__ = ("Flagsmith", "PipelineAnalyticsConfig", "webhooks", "__version__")
__all__ = ("Flagsmith", "EventProcessorConfig", "webhooks", "__version__")
115 changes: 59 additions & 56 deletions flagsmith/analytics.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@
import json
import logging
import threading
import time
import typing
from dataclasses import dataclass
from datetime import datetime
Expand All @@ -18,6 +17,10 @@
# Used to control how often we send data(in seconds)
ANALYTICS_TIMER: typing.Final[int] = 10

FLAG_EXPOSURE_EVENT: typing.Final[str] = "$flag_exposure"

DEFAULT_EVENT_API_URL: typing.Final[str] = "https://events.api.flagsmith.com/"

session = FuturesSession(max_workers=4)


Expand Down Expand Up @@ -72,92 +75,95 @@ def track_feature(self, feature_name: str) -> None:


@dataclass
class PipelineAnalyticsConfig:
analytics_server_url: str
class EventProcessorConfig:
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I added this autoTrackEvaluations property in the JS sdk (naming completely open to discussion), to opt-in/out of auto flag evaluation but I see that you completely removed the flag evaluations

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'd prefer removing this entirely — I don't see a use case for it anytime soon. The less code, the better.

events_api_url: str = DEFAULT_EVENT_API_URL
max_buffer_items: int = 1000
flush_interval_seconds: float = 10.0


class PipelineAnalyticsProcessor:
class EventProcessor:
"""
Buffered analytics processor that sends per-evaluation and custom events
to the Flagsmith pipeline analytics endpoint in batches.

Evaluation events are deduplicated within each flush window. Events are
flushed periodically via a background timer or when the buffer is full.
Buffered event processor that batches custom events and POSTs them to the
Flagsmith event endpoint. Flushes on a background timer or when the buffer
fills.
"""

def __init__(
self,
config: PipelineAnalyticsConfig,
config: EventProcessorConfig,
environment_key: str,
) -> None:
url = config.analytics_server_url
url = config.events_api_url
if not url.endswith("/"):
url = f"{url}/"
self._batch_endpoint = f"{url}v1/analytics/batch"
self._batch_endpoint = f"{url}v1/events"
self._environment_key = environment_key
self._max_buffer = config.max_buffer_items
self._flush_interval_seconds = config.flush_interval_seconds

self._buffer: typing.List[typing.Dict[str, typing.Any]] = []
self._dedup_keys: typing.Dict[str, str] = {}
self._lock = threading.Lock()
self._timer: typing.Optional[threading.Timer] = None

def record_evaluation_event(
def track_event(
self,
flag_key: str,
enabled: bool,
value: typing.Any,
identity_identifier: typing.Optional[str] = None,
event: str,
identifier: typing.Optional[str] = None,
value: typing.Optional[typing.Union[str, int, float]] = None,
traits: typing.Optional[typing.Dict[str, typing.Any]] = None,
metadata: typing.Optional[typing.Dict[str, typing.Any]] = None,
timestamp: typing.Optional[datetime] = None,
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The only caller using the timestamp is in a test and I don't think this should be a parameter. We would be better owning the value entirely here https://github.com/Flagsmith/flagsmith-python-client/pull/220/changes#diff-6ab1fe663f72c6d402a35766b992df078f6402650191f09d3df831a3d9aeb439R166-R167 and dropping the parameter in all the track_* methods

) -> None:
fingerprint = f"{identity_identifier or 'none'}|{enabled}|{value}"
should_flush = False

with self._lock:
if self._dedup_keys.get(flag_key) == fingerprint:
return
self._dedup_keys[flag_key] = fingerprint
self._buffer.append(
{
"event_id": flag_key,
"event_type": "flag_evaluation",
"evaluated_at": int(time.time() * 1000),
"identity_identifier": identity_identifier,
"enabled": enabled,
"value": value,
"traits": dict(traits) if traits else None,
"metadata": {"sdk_version": __version__},
}
)
if len(self._buffer) >= self._max_buffer:
should_flush = True

if should_flush:
self.flush()
self._buffer_event(
event=event,
feature_name=None,
identifier=identifier,
value=value,
traits=traits,
metadata=metadata,
timestamp=timestamp,
)

def record_custom_event(
def track_exposure_event(
self,
event_name: str,
identity_identifier: typing.Optional[str] = None,
feature_name: str,
identifier: typing.Optional[str] = None,
value: typing.Optional[str] = None,
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it intentional? Should be the same as track_event Option[Union[str,int,float]]

traits: typing.Optional[typing.Dict[str, typing.Any]] = None,
metadata: typing.Optional[typing.Dict[str, typing.Any]] = None,
timestamp: typing.Optional[datetime] = None,
) -> None:
should_flush = False
self._buffer_event(
event=FLAG_EXPOSURE_EVENT,
feature_name=feature_name,
identifier=identifier,
value=value,
traits=traits,
metadata=metadata,
timestamp=timestamp,
)

def _buffer_event(
self,
event: str,
feature_name: typing.Optional[str],
identifier: typing.Optional[str],
value: typing.Optional[typing.Union[str, int, float]],
traits: typing.Optional[typing.Dict[str, typing.Any]],
metadata: typing.Optional[typing.Dict[str, typing.Any]],
timestamp: typing.Optional[datetime],
) -> None:
should_flush = False
with self._lock:
self._buffer.append(
{
"event_id": event_name,
"event_type": "custom_event",
"evaluated_at": int(time.time() * 1000),
"identity_identifier": identity_identifier,
"enabled": None,
"value": None,
"event": event,
"feature_name": feature_name,
"identifier": identifier,
"value": value,
"traits": dict(traits) if traits else None,
"metadata": {**(metadata or {}), "sdk_version": __version__},
"timestamp": int((timestamp or datetime.now()).timestamp() * 1000),
}
)
if len(self._buffer) >= self._max_buffer:
Expand All @@ -172,11 +178,8 @@ def flush(self) -> None:
return
events = self._buffer
self._buffer = []
self._dedup_keys.clear()

payload = json.dumps(
{"events": events, "environment_key": self._environment_key}
)
payload = json.dumps({"events": events})
try:
future = session.post(
self._batch_endpoint,
Expand Down
Loading
Loading