Skip to content

Commit

Permalink
feat: Automatic assignment tracking (#25)
Browse files Browse the repository at this point in the history
  • Loading branch information
tyiuhc committed Aug 21, 2023
1 parent 3591c14 commit f3988fd
Show file tree
Hide file tree
Showing 19 changed files with 382 additions and 5 deletions.
2 changes: 2 additions & 0 deletions .github/workflows/test-arm.yml
Expand Up @@ -17,6 +17,8 @@ jobs:
install: |
apt update
apt -y install python3
apt -y install pip
apt -y install ca-certificates
run: |
pip install -r requirements.txt
python3 -m unittest discover -s ./tests -p '*_test.py'
4 changes: 4 additions & 0 deletions .github/workflows/test.yml
Expand Up @@ -16,6 +16,10 @@ jobs:
uses: actions/setup-python@v3
with:
python-version: ${{ matrix.python-version }}
cache: 'pip'

- name: Install requirements
run: pip install -r requirements.txt

- name: Unit Test
run: python -m unittest discover -s ./tests -p '*_test.py'
1 change: 1 addition & 0 deletions requirements.txt
@@ -0,0 +1 @@
amplitude_analytics~=1.1.1
Empty file added src/__init__.py
Empty file.
1 change: 1 addition & 0 deletions src/amplitude_experiment/__init__.py
Expand Up @@ -12,3 +12,4 @@
from .cookie import AmplitudeCookie
from .local.client import LocalEvaluationClient
from .local.config import LocalEvaluationConfig
from .flagresult import FlagResult
4 changes: 4 additions & 0 deletions src/amplitude_experiment/assignment/__init__.py
@@ -0,0 +1,4 @@
from .assignment import Assignment, DAY_MILLIS
from .assignment_filter import AssignmentFilter
from .assignment_service import AssignmentService, to_event
from .assignment_config import AssignmentConfig
24 changes: 24 additions & 0 deletions src/amplitude_experiment/assignment/assignment.py
@@ -0,0 +1,24 @@
import time
from typing import Dict

from ..flagresult import FlagResult
from ..user import User

DAY_MILLIS = 24 * 60 * 60 * 1000


class Assignment:

def __init__(self, user: User, results: Dict[str, FlagResult]):
self.user = user
self.results = results
self.timestamp = time.time()

def canonicalize(self) -> str:
user = self.user.user_id.strip() if self.user.user_id else 'undefined'
device = self.user.device_id.strip() if self.user.device_id else 'undefined'
canonical = user + ' ' + device + ' '
for key in sorted(self.results):
value = self.results[key].value.strip() if self.results[key] else 'undefined'
canonical += key.strip() + ' ' + value + ' '
return canonical
8 changes: 8 additions & 0 deletions src/amplitude_experiment/assignment/assignment_config.py
@@ -0,0 +1,8 @@
import amplitude


class AssignmentConfig(amplitude.Config):
def __init__(self, api_key: str, cache_capacity: int = 65536, **kw):
self.api_key = api_key
self.cache_capacity = cache_capacity
super(AssignmentConfig, self).__init__(**kw)
17 changes: 17 additions & 0 deletions src/amplitude_experiment/assignment/assignment_filter.py
@@ -0,0 +1,17 @@
import time

from .assignment import Assignment
from .assignment import DAY_MILLIS
from ..util.cache import Cache


class AssignmentFilter:
def __init__(self, size: int, ttl_millis: int = DAY_MILLIS):
self.cache = Cache(size, ttl_millis)

def should_track(self, assignment: Assignment) -> bool:
canonical_assignment = assignment.canonicalize()
track = self.cache.get(canonical_assignment) is None
if track:
self.cache.put(canonical_assignment, object())
return track
42 changes: 42 additions & 0 deletions src/amplitude_experiment/assignment/assignment_service.py
@@ -0,0 +1,42 @@
from amplitude import Amplitude, BaseEvent
from ..assignment.assignment import Assignment
from ..assignment.assignment import DAY_MILLIS
from ..assignment.assignment_filter import AssignmentFilter

FLAG_TYPE_MUTUAL_EXCLUSION_GROUP = "mutual-exclusion-group"
FLAG_TYPE_HOLDOUT_GROUP = "holdout-group"


def to_event(assignment: Assignment) -> BaseEvent:
event = BaseEvent(event_type='[Experiment] Assignment', user_id=assignment.user.user_id,
device_id=assignment.user.device_id, event_properties={}, user_properties={})
for key in sorted(assignment.results):
event.event_properties[key + '.variant'] = assignment.results[key].value

set_props = {}
unset_props = {}

for key in sorted(assignment.results):
if assignment.results[key].type == FLAG_TYPE_MUTUAL_EXCLUSION_GROUP:
continue
elif assignment.results[key].is_default_variant:
unset_props[f'[Experiment] {key}'] = '-'
else:
set_props[f'[Experiment] {key}'] = assignment.results[key].value

event.user_properties['$set'] = set_props
event.user_properties['$unset'] = unset_props

event.insert_id = f'{event.user_id} {event.device_id} {hash(assignment.canonicalize())} {assignment.timestamp / DAY_MILLIS}'

return event


class AssignmentService:
def __init__(self, amplitude: Amplitude, assignment_filter: AssignmentFilter):
self.amplitude = amplitude
self.assignmentFilter = assignment_filter

def track(self, assignment: Assignment):
if self.assignmentFilter.should_track(assignment):
self.amplitude.track(to_event(assignment))
9 changes: 9 additions & 0 deletions src/amplitude_experiment/flagresult.py
@@ -0,0 +1,9 @@
class FlagResult:
def __init__(self, value: str, is_default_variant: bool, payload: str = None, exp_key: str = None,
deployed: bool = None, type: str = None):
self.value = value
self.payload = payload
self.is_default_variant = is_default_variant
self.exp_key = exp_key
self.deployed = deployed
self.type = type
28 changes: 24 additions & 4 deletions src/amplitude_experiment/local/client.py
Expand Up @@ -3,7 +3,11 @@
from threading import Lock
from typing import Any, List, Dict

from amplitude import Amplitude

from .config import LocalEvaluationConfig
from ..assignment import Assignment, AssignmentFilter, AssignmentService
from ..assignment.assignment_service import FLAG_TYPE_MUTUAL_EXCLUSION_GROUP, FLAG_TYPE_HOLDOUT_GROUP
from ..user import User
from ..connection_pool import HTTPConnectionPool
from .poller import Poller
Expand All @@ -25,10 +29,16 @@ def __init__(self, api_key: str, config: LocalEvaluationConfig = None):
Returns:
Experiment Client instance.
"""

if not api_key:
raise ValueError("Experiment API key is empty")
self.api_key = api_key
self.config = config or LocalEvaluationConfig()
self.assignment_service = None
if config and config.assignment_config:
instance = Amplitude(config.assignment_config.api_key, config.assignment_config)
self.assignment_service = AssignmentService(instance, AssignmentFilter(
config.assignment_config.cache_capacity))
self.logger = logging.getLogger("Amplitude")
self.logger.addHandler(logging.StreamHandler())
if self.config.debug:
Expand Down Expand Up @@ -58,17 +68,26 @@ def evaluate(self, user: User, flag_keys: List[str] = None) -> Dict[str, Variant
"""
variants = {}
if self.flags is None or len(self.flags) == 0:
if self.assignment_service:
self.assignment_service.track(Assignment(user, {}))
return variants
user_json = str(user)
self.logger.debug(f"[Experiment] Evaluate: User: {user_json} - Flags: {self.flags}")
result_json = evaluate(self.flags, user_json)
self.logger.debug(f"[Experiment] Evaluate Result: {result_json}")
evaluation_result = json.loads(result_json)
filter_result = flag_keys is not None
assignment_result = {}
for key, value in evaluation_result.items():
if value.get('isDefaultVariant') or (filter_result and key not in flag_keys):
continue
variants[key] = Variant(value['variant'].get('key'), value['variant'].get('payload'))
included = not filter_result or key in flag_keys
if not value.get('isDefaultVariant') and included:
variants[key] = Variant(value['variant'].get('key'), value['variant'].get('payload'))
if included or evaluation_result[key]['type'] == FLAG_TYPE_MUTUAL_EXCLUSION_GROUP or \
evaluation_result[key]['type'] == FLAG_TYPE_HOLDOUT_GROUP:
assignment_result[key] = evaluation_result[key]

if self.assignment_service:
self.assignment_service.track(Assignment(user, assignment_result))
return variants

def __do_flags(self):
Expand All @@ -84,7 +103,8 @@ def __do_flags(self):
response = conn.request('GET', '/sdk/v1/flags', body, headers)
response_body = response.read().decode("utf8")
if response.status != 200:
raise Exception(f"[Experiment] Get flagConfigs - received error response: ${response.status}: ${response_body}")
raise Exception(
f"[Experiment] Get flagConfigs - received error response: ${response.status}: ${response_body}")
self.logger.debug(f"[Experiment] Got flag configs: {response_body}")
self.lock.acquire()
self.flags = response_body
Expand Down
7 changes: 6 additions & 1 deletion src/amplitude_experiment/local/config.py
@@ -1,3 +1,6 @@
from src.amplitude_experiment.assignment.assignment_config import AssignmentConfig


class LocalEvaluationConfig:
"""Experiment Local Client Configuration"""

Expand All @@ -6,7 +9,8 @@ class LocalEvaluationConfig:
def __init__(self, debug: bool = False,
server_url: str = DEFAULT_SERVER_URL,
flag_config_polling_interval_millis: int = 30000,
flag_config_poller_request_timeout_millis: int = 10000):
flag_config_poller_request_timeout_millis: int = 10000,
assignment_config: AssignmentConfig = None):
"""
Initialize a config
Parameters:
Expand All @@ -25,3 +29,4 @@ def __init__(self, debug: bool = False,
self.server_url = server_url
self.flag_config_polling_interval_millis = flag_config_polling_interval_millis
self.flag_config_poller_request_timeout_millis = flag_config_poller_request_timeout_millis
self.assignment_config = assignment_config
1 change: 1 addition & 0 deletions src/amplitude_experiment/util/__init__.py
@@ -0,0 +1 @@
from .cache import Cache
67 changes: 67 additions & 0 deletions src/amplitude_experiment/util/cache.py
@@ -0,0 +1,67 @@
import time

class Cache:
class Node:
def __init__(self, key, value):
self.key = key
self.value = value
self.prev = None
self.next = None
self.last_accessed_time = time.time()

def __init__(self, capacity, ttl_millis):
self.capacity = capacity
self.ttl_millis = ttl_millis
self.cache = {}
self.head = self.Node(None, None)
self.tail = self.Node(None, None)
self.head.next = self.tail
self.tail.prev = self.head

def _add_node(self, node):
node.prev = self.head
node.next = self.head.next
self.head.next.prev = node
self.head.next = node

def _remove_node(self, node):
prev = node.prev
next_node = node.next
prev.next = next_node
next_node.prev = prev

def _move_to_head(self, node):
self._remove_node(node)
self._add_node(node)

def get(self, key):
if key in self.cache:
node = self.cache[key]
current_time = time.time()
if (current_time - node.last_accessed_time) * 1000 <= self.ttl_millis:
node.last_accessed_time = current_time # Update last accessed time
self._move_to_head(node)
return node.value
else:
# Node has expired, remove it from the cache
self._remove_node(node)
del self.cache[key]
return None

def put(self, key, value):
if key in self.cache:
node = self.cache[key]
node.value = value
node.last_accessed_time = time.time() # Update last accessed time
self._move_to_head(node)
else:
if len(self.cache) >= self.capacity:
# Evict the least recently used node (tail's prev)
tail_prev = self.tail.prev
self._remove_node(tail_prev)
del self.cache[tail_prev.key]

new_node = self.Node(key, value)
self._add_node(new_node)
self.cache[key] = new_node

Empty file.

0 comments on commit f3988fd

Please sign in to comment.