Skip to content
Merged
2 changes: 1 addition & 1 deletion .isort.cfg
Original file line number Diff line number Diff line change
Expand Up @@ -3,4 +3,4 @@ use_parentheses=true
multi_line_output=3
include_trailing_comma=true
line_length=79
known_third_party = requests,setuptools
known_third_party = pytest,requests,requests_futures,setuptools
60 changes: 60 additions & 0 deletions flagsmith/analytics.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
import json
from datetime import datetime

from requests_futures.sessions import FuturesSession

ANALYTICS_ENDPOINT = "analytics/flags/"

# Used to control how often we send data(in seconds)
ANALYTICS_TIMER = 10

session = FuturesSession(max_workers=4)


class AnalyticsProcessor:
"""
AnalyticsProcessor is used to track how often individual Flags are evaluated within
the Flagsmith SDK. Docs: https://docs.flagsmith.com/advanced-use/flag-analytics.
"""

def __init__(self, environment_key: str, base_api_url: str, timeout: int = 3):
"""
Initialise the AnalyticsProcessor to handle sending analytics on flag usage to
the Flagsmith API.

:param environment_key: environment key obtained from the Flagsmith UI
:param base_api_url: base api url to override when using self hosted version
:param timeout: used to tell requests to stop waiting for a response after a
given number of seconds
"""
self.analytics_endpoint = base_api_url + ANALYTICS_ENDPOINT
self.environment_key = environment_key
self._last_flushed = datetime.now()
self.analytics_data = {}
self.timeout = timeout
super().__init__()

def flush(self):
"""
Sends all the collected data to the api asynchronously and resets the timer
"""

if not self.analytics_data:
return
session.post(
self.analytics_endpoint,
data=json.dumps(self.analytics_data),
timeout=self.timeout,
headers={
"X-Environment-Key": self.environment_key,
"Content-Type": "application/json",
},
)

self.analytics_data.clear()
self._last_flushed = datetime.now()

def track_feature(self, feature_id: int):
self.analytics_data[feature_id] = self.analytics_data.get(feature_id, 0) + 1
if (datetime.now() - self._last_flushed).seconds > ANALYTICS_TIMER:
self.flush()
41 changes: 20 additions & 21 deletions flagsmith/flagsmith.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@

import requests

from .analytics import AnalyticsProcessor

logger = logging.getLogger(__name__)

SERVER_URL = "https://api.flagsmith.com/api/v1/"
Expand All @@ -19,12 +21,16 @@ def __init__(self, environment_id, api=SERVER_URL, request_timeout=None):
:param api: (optional) api url to override when using self hosted version
:param request_timeout: (optional) request timeout in seconds
"""

self.environment_id = environment_id
self.api = api
self.flags_endpoint = api + FLAGS_ENDPOINT
self.identities_endpoint = api + IDENTITY_ENDPOINT
self.traits_endpoint = api + TRAIT_ENDPOINT
self.request_timeout = request_timeout
self._analytics_processor = AnalyticsProcessor(
environment_id, api, self.request_timeout
)

def get_flags(self, identity=None):
"""
Expand Down Expand Up @@ -62,8 +68,9 @@ def has_feature(self, feature_name):
:return: True if exists, False if not.
"""
data = self._get_flags_response(feature_name)

if data:
feature_id = data["feature"]["id"]
self._analytics_processor.track_feature(feature_id)
return True

return False
Expand All @@ -72,7 +79,7 @@ def feature_enabled(self, feature_name, identity=None):
"""
Get enabled state of given feature for an environment.

:param feature_name: name of feature to determine if enabled (must match 'ID' on flagsmith.com)
:param feature_name: name of feature to determine if enabled
:param identity: (optional) application's unique identifier for the user to check feature state
:return: True / False if feature exists. None otherwise.
"""
Expand All @@ -81,21 +88,19 @@ def feature_enabled(self, feature_name, identity=None):

data = self._get_flags_response(feature_name, identity)

if data:
if data.get("flags"):
for flag in data.get("flags"):
if flag["feature"]["name"] == feature_name:
return flag["enabled"]
else:
return data["enabled"]
else:
if not data:
return None

feature_id = data["feature"]["id"]
self._analytics_processor.track_feature(feature_id)

return data["enabled"]

def get_value(self, feature_name, identity=None):
"""
Get value of given feature for an environment.

:param feature_name: name of feature to determine value of (must match 'ID' on flagsmith.com)
:param feature_name: name of feature to determine value of
:param identity: (optional) application's unique identifier for the user to check feature state
:return: value of the feature state if feature exists, None otherwise
"""
Expand All @@ -104,17 +109,11 @@ def get_value(self, feature_name, identity=None):

data = self._get_flags_response(feature_name, identity)

if data:
# using new endpoints means that the flags come back in a list, filter this for the one we want and
# return it's value
if data.get("flags"):
for flag in data.get("flags"):
if flag["feature"]["name"] == feature_name:
return flag["feature_state_value"]
else:
return data["feature_state_value"]
else:
if not data:
return None
feature_id = data["feature"]["id"]
self._analytics_processor.track_feature(feature_id)
return data["feature_state_value"]

def get_trait(self, trait_key, identity):
"""
Expand Down
3 changes: 2 additions & 1 deletion requirements.txt
Original file line number Diff line number Diff line change
@@ -1 +1,2 @@
requests>=2.19.1
requests>=2.19.1
requests-futures==1.0.0
10 changes: 10 additions & 0 deletions tests/conftest.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
import pytest

from flagsmith.analytics import AnalyticsProcessor


@pytest.fixture
def analytics_processor():
return AnalyticsProcessor(
environment_key="test_key", base_api_url="http://test_url"
)
63 changes: 63 additions & 0 deletions tests/test_analytics.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
import json
from datetime import datetime, timedelta
from unittest import mock

from flagsmith.analytics import ANALYTICS_TIMER, AnalyticsProcessor


def test_analytics_processor_track_feature_updates_analytics_data(analytics_processor):
# When
analytics_processor.track_feature(1)
assert analytics_processor.analytics_data[1] == 1

analytics_processor.track_feature(1)
assert analytics_processor.analytics_data[1] == 2


def test_analytics_processor_flush_clears_analytics_data(analytics_processor):
analytics_processor.track_feature(1)
analytics_processor.flush()
assert analytics_processor.analytics_data == {}


def test_analytics_processor_flush_post_request_data_match_ananlytics_data(
analytics_processor,
):
# Given
with mock.patch("flagsmith.analytics.session") as session:
# When
analytics_processor.track_feature(1)
analytics_processor.track_feature(2)
analytics_processor.flush()
# Then
session.post.assert_called()
post_call = session.mock_calls[0]
assert {"1": 1, "2": 1} == json.loads(post_call[2]["data"])


def test_analytics_processor_flush_early_exit_if_analytics_data_is_empty(
analytics_processor,
):
with mock.patch("flagsmith.analytics.session") as session:
analytics_processor.flush()

# Then
session.post.assert_not_called()


def test_analytics_processor_calling_track_feature_calls_flush_when_timer_runs_out(
analytics_processor,
):
# Given
with mock.patch("flagsmith.analytics.datetime") as mocked_datetime, mock.patch(
"flagsmith.analytics.session"
) as session:
# Let's move the time
mocked_datetime.now.return_value = datetime.now() + timedelta(
seconds=ANALYTICS_TIMER + 1
)
# When
analytics_processor.track_feature(1)

# Then
session.post.assert_called()