diff --git a/README.md b/README.md index fbe2085..6d95ebb 100644 --- a/README.md +++ b/README.md @@ -149,6 +149,40 @@ if client.initialize: > Note that if evaluation called before Go SDK client initialized, you set the wrong flag key/user for the evaluation or the related feature flag is not found, SDK will return the default value you set. The `fbclient.common_types.EvalDetail` will explain the details of the latest evaluation including error raison. +### Flag Tracking + +`fbclient.client.FBClient.flag_tracker` registers a listener to be notified of feature flag changes in general. + +Note that a flag value change listener is bound to a specific user and flag key. + +The flag value change listener will be notified whenever the SDK receives any change to any feature flag's configuration, +or to a user segment that is referenced by a feature flag. To register a flag value change listener, use `add_flag_value_may_changed_listener` or `add_flag_value_changed_listener` + +When you track a flag change using `add_flag_value_may_changed_listener`, this does not necessarily mean the flag's value has changed for any particular flag, only that some part of the flag configuration was changed so that it *_MAY_* return a different value than it previously returned for some user. + +If you want to track a flag whose value *_MUST_* be changed, `add_flag_value_changed_listener` will register a listener that will be notified if and only if the flag value changes. + +Change notices only work if the SDK is actually connecting to FeatBit feature flag center. +If the SDK is in offline mode, then it cannot know when there is a change, because flags are read on an as-needed basis. + +```python +if client.initialize: + # flag value may be changed + client.flag_tracker.add_flag_value_may_changed_listener(flag_key, user, callback_fn) + # flag value must be changed + client.flag_tracker.add_flag_value_changed_listener(flag_key, user, callback_fn) + +``` +`flag_key`: the key of the feature flag to track + +`user`: the user to evaluate the flag value + +`callback_fn`: the function to be called for the flag value change +* the first argument is the flag key +* the second argument is the latest flag value + + + ### Offline Mode In some situations, you might want to stop making remote calls to FeatBit. Here is how: diff --git a/fbclient/client.py b/fbclient/client.py index 281d3ce..257b4bf 100644 --- a/fbclient/client.py +++ b/fbclient/client.py @@ -12,7 +12,9 @@ REASON_USER_NOT_SPECIFIED, Evaluator) from fbclient.event_processor import DefaultEventProcessor, NullEventProcessor from fbclient.event_types import FlagEvent, Metric, MetricEvent, UserEvent +from fbclient.flag_change_notification import FlagTracker from fbclient.interfaces import DataUpdateStatusProvider +from fbclient.notice_broadcaster import NoticeBroadcater from fbclient.status import DataUpdateStatusProviderImpl from fbclient.streaming import Streaming, _data_to_dict from fbclient.update_processor import NullUpdateProcessor @@ -71,6 +73,9 @@ def __init__(self, config: Config, start_wait: float = 15.): else: self._config.validate() + self._broadcaster = NoticeBroadcater() + self._flag_tracker = FlagTracker(self._broadcaster, self.variation) + # init components # event processor self._event_processor = self._build_event_processor(config) @@ -84,8 +89,7 @@ def __init__(self, config: Config, start_wait: float = 15.): self._update_status_provider = DataUpdateStatusProviderImpl(config.data_storage) # update processor update_processor_ready = threading.Event() - self._update_processor = self._build_update_processor(config, self._update_status_provider, - update_processor_ready) + self._update_processor = self._build_update_processor(config, self._broadcaster, self._update_status_provider, update_processor_ready) self._update_processor.start() if start_wait > 0: @@ -111,7 +115,7 @@ def _build_event_processor(self, config: Config): return DefaultEventProcessor(config, DefaultSender('insight', config, max_size=10)) - def _build_update_processor(self, config: Config, update_status_provider, update_processor_event): + def _build_update_processor(self, config: Config, broadcaster: NoticeBroadcater, update_status_provider, update_processor_event): if config.update_processor_imp: log.debug("Using user-specified update processor: %s" % str(config.update_processor_imp)) return config.update_processor_imp(config, update_status_provider, update_processor_event) @@ -120,7 +124,7 @@ def _build_update_processor(self, config: Config, update_status_provider, update log.debug("Offline mode, SDK disable streaming data updating") return NullUpdateProcessor(config, update_status_provider, update_processor_event) - return Streaming(config, update_status_provider, update_processor_event) + return Streaming(config, broadcaster, update_status_provider, update_processor_event) @property def initialize(self) -> bool: @@ -136,6 +140,15 @@ def initialize(self) -> bool: def update_status_provider(self) -> DataUpdateStatusProvider: return self._update_status_provider + @property + def flag_tracker(self) -> FlagTracker: + """ + Returns an object for tracking changes in feature flag configurations. + The :class:`FlagTracker` contains methods for requesting notifications about feature flag changes using + an event listener model. + """ + return self._flag_tracker + def stop(self): """Releases all threads and network connections used by SDK. @@ -145,6 +158,7 @@ def stop(self): self._data_storage.stop() self._update_processor.stop() self._event_processor.stop() + self._broadcaster.stop() def __enter__(self): return self diff --git a/fbclient/evaluator.py b/fbclient/evaluator.py index da8ad69..89c5781 100644 --- a/fbclient/evaluator.py +++ b/fbclient/evaluator.py @@ -146,7 +146,7 @@ def _match_default_user_variation(self, flag: dict, user: FBUser) -> Optional[_E def _match_any_rule(self, user: FBUser, rule: dict) -> bool: # conditions cannot be empty - return all(self._process_condition(user, condiction) for condiction in rule['conditions']) + return all(self._process_condition(user, condition) for condition in rule['conditions']) def _process_condition(self, user: FBUser, condition: dict) -> bool: op = condition['op'] diff --git a/fbclient/flag_change_notification.py b/fbclient/flag_change_notification.py new file mode 100644 index 0000000..01bd658 --- /dev/null +++ b/fbclient/flag_change_notification.py @@ -0,0 +1,186 @@ + +from abc import ABC, abstractmethod +from typing import Any, Callable +from fbclient.common_types import FBUser +from fbclient.interfaces import Notice +from fbclient.notice_broadcaster import NoticeBroadcater + +FLAG_CHANGE_NOTICE_TYPE = 'flag_change_notice' + + +class FlagChangedNotice(Notice): + def __init__(self, flag_key: str): + self.__flag_key = flag_key + + @property + def notice_type(self) -> str: + return FLAG_CHANGE_NOTICE_TYPE + + @property + def flag_key(self) -> str: + return self.__flag_key + + +class FlagChangedListener(ABC): + """ + A notice listener that is notified when a feature flag's configuration has changed. + + This is an abstract class. You need to implement your own listener by overriding the :func:`on_flag_change` method. + + """ + @abstractmethod + def on_flag_change(self, notice: FlagChangedNotice): + pass + + +class FlagValueChangedListener(FlagChangedListener): + def __init__(self, + flag_key: str, + user: dict, + evaluate_fn: Callable[[str, dict, Any], Any], + flag_value_changed_fn: Callable[[str, Any], None]): + self.__flag_key = flag_key + self.__user = user + self.__evaluate_fn = evaluate_fn + self.__fn = flag_value_changed_fn + # record the flag value when the listener is created + self.__prvious_flag_value = self.__evaluate_fn(self.__flag_key, self.__user, None) + + def on_flag_change(self, notice: FlagChangedNotice): + if notice.flag_key == self.__flag_key: + prev_flag_value = self.__prvious_flag_value + curr_flag_value = self.__evaluate_fn(self.__flag_key, self.__user, None) + if prev_flag_value != curr_flag_value: + self.__fn(self.__flag_key, curr_flag_value) + self.__prvious_flag_value = curr_flag_value + + +class FlagValueMayChangedListener(FlagChangedListener): + def __init__(self, + flag_key: str, + user: dict, + evaluate_fn: Callable[[str, dict, Any], Any], + flag_value_changed_fn: Callable[[str, Any], None]): + self.__flag_key = flag_key + self.__user = user + self.__evaluate_fn = evaluate_fn + self.__fn = flag_value_changed_fn + + def on_flag_change(self, notice: FlagChangedNotice): + if notice.flag_key == self.__flag_key: + curr_flag_value = self.__evaluate_fn(self.__flag_key, self.__user, None) + self.__fn(self.__flag_key, curr_flag_value) + + +class FlagTracker: + """ + A registry to register the flag change listeners in order to track changes in feature flag configurations. + + The registered listerners only work if the SDK is actually connecting to FeatBit feature flag center. + If the SDK is only in offline mode then it cannot know when there is a change, because flags are read on an as-needed basis. + + Application code never needs to initialize or extend this class directly. + """ + + def __init__(self, + flag_change_broadcaster: NoticeBroadcater, + evaluate_fn: Callable[[str, dict, Any], Any]): + """ + :param flag_change_broadcaster: The broadcaster that broadcasts the flag change notices + :param evaluate_fn: The function to evaluate the flag value + """ + self.__broadcater = flag_change_broadcaster + self.__evaluate_fn = evaluate_fn + + def add_flag_value_changed_listener(self, + flag_key: str, + user: dict, + flag_value_changed_fn: Callable[[str, Any], None]) -> FlagValueChangedListener: + """ + Registers a listener to be notified of a change in a specific feature flag's value for a specific FeatBit user. + + The listener will be notified whenever the SDK receives any change to any feature flag's configuration, + or to a user segment that is referenced by a feature flag. + + When you call this method, it first immediately evaluates the feature flag. It then uses :class:`FlagChangeListener` to start listening for feature flag configuration + changes, and whenever the specified feature flag changes, it re-evaluates the flag for the same user. It then calls :class:`FlagValueChangeListener` + if and only if the resulting value has changed. + + :param flag_key: The key of the feature flag to track + :param user: The user to evaluate the flag value + :param flag_value_changed_fn: The function to be called only when this flag value changes + * the first argument is the flag key + * the second argument is the latest flag value, this value must be different from the previous value + + :return: A listener object that can be used to remove it later on. + """ + + # check flag key + if not isinstance(flag_key, str) or not flag_key: + raise ValueError('flag_key must be a non-empty string') + # check user + FBUser.from_dict(user) + # check flag_value_changed_fn + if not isinstance(flag_value_changed_fn, Callable) or not flag_value_changed_fn: + raise ValueError('flag_value_changed_fn must be a callable function') + + listener = FlagValueChangedListener(flag_key, user, self.__evaluate_fn, flag_value_changed_fn) + self.add_flag_changed_listener(listener) + return listener + + def add_flag_value_may_changed_listener(self, + flag_key: str, + user: dict, + flag_value_changed_fn: Callable[[str, Any], None]) -> FlagValueMayChangedListener: + """ + Registers a listener to be notified of a change in a specific feature flag's value for a specific FeatBit user. + + The listener will be notified whenever the SDK receives any change to any feature flag's configuration, + or to a user segment that is referenced by a feature flag. + + Note that this does not necessarily mean the flag's value has changed for any particular flag, + only that some part of the flag configuration was changed so that it may return a different value than it previously returned for some user. + + If you want to track flag value changes,use :func:`add_flag_value_changed_listener instead. + + :param flag_key: The key of the feature flag to track + :param user: The user to evaluate the flag value + :param flag_value_changed_fn: The function to be called only if any changes to a specific flag + * the first argument is the flag key + * the second argument is the latest flag value, this value may be same as the previous value + + :return: A listener object that can be used to remove it later on. + + """ + + # check flag key + if not isinstance(flag_key, str) or not flag_key: + raise ValueError('flag_key must be a non-empty string') + # check user + FBUser.from_dict(user) + # check flag_value_changed_fn + if not isinstance(flag_value_changed_fn, Callable) or not flag_value_changed_fn: + raise ValueError('flag_value_changed_fn must be a callable function') + + listener = FlagValueMayChangedListener(flag_key, user, self.__evaluate_fn, flag_value_changed_fn) + self.add_flag_changed_listener(listener) + return listener + + def add_flag_changed_listener(self, listener: FlagChangedListener): + """ + Registers a listener to be notified of feature flag changes in general. + + The listener will be notified whenever the SDK receives any change to any feature flag's configuration, + or to a user segment that is referenced by a feature flag. + + :param listener: The listener to be registered. The :class:`FlagChangedListner` is an abstract class. You need to implement your own listener. + """ + self.__broadcater.add_listener(FLAG_CHANGE_NOTICE_TYPE, listener.on_flag_change) # type: ignore + + def remove_flag_change_notifier(self, listener: FlagChangedListener): + """ + Unregisters a listener so that it will no longer be notified of feature flag changes. + + :param listener: The listener to be unregistered. The listener must be the same object that was passed to :func:`add_flag_changed_listner` or :func:`add_flag_value_changed_listerner` + """ + self.__broadcater.remove_listener(FLAG_CHANGE_NOTICE_TYPE, listener.on_flag_change) # type: ignore diff --git a/fbclient/interfaces.py b/fbclient/interfaces.py index 20adcdb..084ca1c 100644 --- a/fbclient/interfaces.py +++ b/fbclient/interfaces.py @@ -265,3 +265,17 @@ def stop(self): Shuts down the connection to feature flag center """ pass + + +class Notice(ABC): + """ + This is not an insight event to be sent to FeatBit Flag Center; it is a notice to notify the SDK that something has happened, + such as flag values updated + """ + @property + @abstractmethod + def notice_type(self) -> str: + """ + Returns the type of this notice + """ + pass diff --git a/fbclient/notice_broadcaster.py b/fbclient/notice_broadcaster.py new file mode 100644 index 0000000..725b1c3 --- /dev/null +++ b/fbclient/notice_broadcaster.py @@ -0,0 +1,57 @@ + +from queue import Empty, Queue +from threading import Thread +from typing import Callable +from fbclient.interfaces import Notice + +from fbclient.utils import log + + +class NoticeBroadcater: + def __init__(self): + self.__notice_queue = Queue() + self.__closed = False + self.__listeners = {} + self.__thread = Thread(daemon=True, target=self.__run) + log.debug('notice broadcaster starting...') + self.__thread.start() + + def add_listener(self, notice_type: str, listener: Callable[[Notice], None]): + if isinstance(notice_type, str) and notice_type.strip() and listener is not None: + log.debug('add a listener for notice type %s' % notice_type) + if notice_type not in self.__listeners: + self.__listeners[notice_type] = [] + self.__listeners[notice_type].append(listener) + + def remove_listener(self, notice_type: str, listener: Callable[[Notice], None]): + if notice_type in self.__listeners and listener is not None: + log.debug('remove a listener for notice type %s' % notice_type) + notifiers = self.__listeners[notice_type] + if not notifiers: + del self.__listeners[notice_type] + else: + notifiers.remove(listener) + + def broadcast(self, notice: Notice): + self.__notice_queue.put(notice) + + def stop(self): + log.debug('notice broadcaster stopping...') + self.__closed = True + self.__thread.join() + + def __run(self): + while not self.__closed: + try: + notice = self.__notice_queue.get(block=True, timeout=1) + self.__notice_process(notice) + except Empty: + pass + + def __notice_process(self, notice: Notice): + if notice.notice_type in self.__listeners: + for listerner in self.__listeners[notice.notice_type]: + try: + listerner(notice) + except Exception as e: + log.exception('FB Python SDK: unexpected error in handle notice %s: %s' % (notice.notice_type, str(e))) diff --git a/fbclient/status.py b/fbclient/status.py index 01602b7..93242da 100644 --- a/fbclient/status.py +++ b/fbclient/status.py @@ -36,6 +36,9 @@ def __handle_exception(self, error: Exception, error_type: str, message: str): log.exception('FB Python SDK: Data Storage error: %s, UpdateProcessor will attempt to receive the data' % str(error)) self.update_state(State.interrupted_state(error_type, message)) + def get_all(self, kind: Category) -> Mapping[str, dict]: + return self.__storage.get_all(kind) + @property def initialized(self) -> bool: return self.__storage.initialized diff --git a/fbclient/streaming.py b/fbclient/streaming.py index 3ee2b27..ba7081c 100644 --- a/fbclient/streaming.py +++ b/fbclient/streaming.py @@ -8,7 +8,10 @@ from fbclient.category import FEATURE_FLAGS, SEGMENTS from fbclient.config import Config -from fbclient.interfaces import DataUpdateStatusProvider, UpdateProcessor +from fbclient.flag_change_notification import FlagChangedNotice +from fbclient.interfaces import UpdateProcessor +from fbclient.notice_broadcaster import NoticeBroadcater +from fbclient.status import DataUpdateStatusProviderImpl from fbclient.status_types import (DATA_INVALID_ERROR, NETWORK_ERROR, REQUEST_INVALID_ERROR, RUNTIME_ERROR, SYSTEM_QUIT, UNKNOWN_CLOSE_CODE, @@ -51,10 +54,12 @@ def _data_to_dict(data: dict) -> Tuple[int, dict]: flag['variationMap'] = dict((var['id'], var['value']) for var in flag['variations']) flag['_id'] = flag['id'] flag['id'] = flag['key'] + flag['cat'] = FEATURE_FLAGS flags[flag['id']] = {'id': flag['id'], 'timestamp': flag['timestamp'], 'isArchived': True} if flag['isArchived'] else flag version = max(version, flag['timestamp']) for segment in data['segments']: segment['timestamp'] = from_str_datetime_to_millis(segment['updatedAt']) + segment['cat'] = SEGMENTS segments[segment['id']] = {'id': segment['id'], 'timestamp': segment['timestamp'], 'isArchived': True} if segment['isArchived'] else segment version = max(version, segment['timestamp']) return version, all_data @@ -75,9 +80,10 @@ def _handle_ws_error(error: BaseException) -> Tuple[bool, bool, State]: class Streaming(Thread, UpdateProcessor): __ping_interval = 10.0 - def __init__(self, config: Config, dataUpdateStatusProvider: DataUpdateStatusProvider, ready: Event): + def __init__(self, config: Config, broadcaster: NoticeBroadcater, dataUpdateStatusProvider: DataUpdateStatusProviderImpl, ready: Event): super().__init__(daemon=True) self.__config = config + self.__broadcaster = broadcaster self.__storage = dataUpdateStatusProvider self.__ready = ready self.__running = True @@ -188,6 +194,38 @@ def _on_open(self, wsapp: websocket.WebSocketApp): def _on_process_data(self, data): + def get_feature_flag_keys_from_segment(flags, segment_id): + flag_keys = [] + for flag in flags.values(): + for rule in flag['rules']: # type: ignore + for condition in rule['conditions']: # type: ignore + op = condition['op'] # type: ignore + if not op: + try: + segment_ids = json.loads(condition['value']) # type: ignore + if segment_id in segment_ids: # type: ignore + flag_keys.append(flag['id']) # type: ignore + except: + pass + return set(flag_keys) + + def broacasting(): + flags = self.__storage.get_all(FEATURE_FLAGS) + for items in all_data.values() : + for item in sorted(items.values(), key=lambda x: x['timestamp']): + if item['cat'] == FEATURE_FLAGS: + # listen flag changes + if item['id'] not in broacasting_ingnored_flag_keys: + broacasting_ingnored_flag_keys.append(item['id']) + self.__broadcaster.broadcast(FlagChangedNotice(item['id'])) + elif item['cat'] == SEGMENTS: + # listen segment changes + flag_keys = get_feature_flag_keys_from_segment(flags, item['id']) + for flag_key in flag_keys: + if flag_key not in broacasting_ingnored_flag_keys: + broacasting_ingnored_flag_keys.append(flag_key) + self.__broadcaster.broadcast(FlagChangedNotice(flag_key)) + log.debug('Streaming WebSocket is processing data') version, all_data = _data_to_dict(data) op_ok = False @@ -201,6 +239,9 @@ def _on_process_data(self, data): # set ready when the initialization is complete. self.__ready.set() self.__storage.update_state(State.ok_state()) + # broadcast the flag change notices + broacasting_ingnored_flag_keys = [] + broacasting() log.debug("processing data is well done") return op_ok diff --git a/fbclient/version.py b/fbclient/version.py index 3c37171..6886913 100644 --- a/fbclient/version.py +++ b/fbclient/version.py @@ -1 +1 @@ -VERSION = "1.1.3" +VERSION = "1.1.4" diff --git a/tests/test_fbclient.py b/tests/test_fbclient.py index 031b673..74e8e59 100644 --- a/tests/test_fbclient.py +++ b/tests/test_fbclient.py @@ -1,7 +1,6 @@ import base64 from datetime import datetime from pathlib import Path -from time import sleep from unittest.mock import patch import pytest diff --git a/tests/test_flag_change_notification.py b/tests/test_flag_change_notification.py new file mode 100644 index 0000000..723cc1b --- /dev/null +++ b/tests/test_flag_change_notification.py @@ -0,0 +1,113 @@ + + +from queue import Empty, Queue + +import pytest + +from fbclient.flag_change_notification import (FlagChangedListener, + FlagChangedNotice, FlagTracker) +from fbclient.interfaces import Notice +from fbclient.notice_broadcaster import NoticeBroadcater + +TEST_NOTICE_TYPE = 'test_notice_type' + + +class TestNotice(Notice): + def __init__(self, notice_type: str, content: str): + self.__notice_type = notice_type + self.__content = content + + @property + def notice_type(self) -> str: + return self.__notice_type + + @property + def content(self) -> str: + return self.__content + + +class FakeFlagChangedListener(FlagChangedListener): + def __init__(self, queue: Queue): + self.__queue = queue + + def on_flag_change(self, notice: FlagChangedNotice): + self.__queue.put(notice) + + +@pytest.fixture +def queue(): + return Queue() + + +def test_register_a_listener(queue): + notice_broadcaster = NoticeBroadcater() + notice_broadcaster.add_listener(TEST_NOTICE_TYPE, queue.put) + notice_broadcaster.add_listener(TEST_NOTICE_TYPE, queue.put) + notice_broadcaster.add_listener(TEST_NOTICE_TYPE, queue.put) + + notice = TestNotice(TEST_NOTICE_TYPE, 'test content') + notice_broadcaster.broadcast(notice) + + assert queue.get() == notice + assert queue.get() == notice + assert queue.get() == notice + + notice_broadcaster.stop() + + +def test_unregister_a_listener(queue): + notice_broadcaster = NoticeBroadcater() + notice_broadcaster.add_listener(TEST_NOTICE_TYPE, queue.put) + notice_broadcaster.add_listener(TEST_NOTICE_TYPE, queue.put) + notice_broadcaster.add_listener(TEST_NOTICE_TYPE, queue.put) + notice_broadcaster.remove_listener(TEST_NOTICE_TYPE, queue.put) + + notice = TestNotice(TEST_NOTICE_TYPE, 'test content') + notice_broadcaster.broadcast(notice) + + assert queue.get() == notice + assert queue.get() == notice + + with pytest.raises(Empty): + queue.get(timeout=0.01) + + notice_broadcaster.stop() + + +def test_register_a_flag_changed_listener(queue): + notice_broadcaster = NoticeBroadcater() + flag_changed_listener = FakeFlagChangedListener(queue) + flag_tracker = FlagTracker(notice_broadcaster, None) # type: ignore + flag_tracker.add_flag_changed_listener(flag_changed_listener) + flag_tracker.add_flag_changed_listener(flag_changed_listener) + flag_tracker.add_flag_changed_listener(flag_changed_listener) + + notice = FlagChangedNotice('test_flag_key') + notice_broadcaster.broadcast(notice) + + assert queue.get() == notice + assert queue.get() == notice + assert queue.get() == notice + + notice_broadcaster.stop() + + +def test_unregister_a_flag_changed_listener(queue): + notice_broadcaster = NoticeBroadcater() + flag_changed_listener = FakeFlagChangedListener(queue) + flag_tracker = FlagTracker(notice_broadcaster, None) # type: ignore + flag_tracker.add_flag_changed_listener(flag_changed_listener) + flag_tracker.add_flag_changed_listener(flag_changed_listener) + flag_tracker.add_flag_changed_listener(flag_changed_listener) + flag_tracker.remove_flag_change_notifier(flag_changed_listener) + + notice = FlagChangedNotice('test_flag_key') + notice_broadcaster.broadcast(notice) + + assert queue.get() == notice + assert queue.get() == notice + + with pytest.raises(Empty): + queue.get(timeout=0.01) + + notice_broadcaster.stop()