Merged
4 changes: 4 additions & 0 deletions CHANGELOG.md
@@ -1,3 +1,7 @@
# 6.0.3 - 2025-07-07

- feat: add a feature flag evaluation cache (in-memory or Redis) to support returning flag evaluations when the service is down

# 6.0.2 - 2025-07-02

- fix: send_feature_flags changed to default to false in `Client::capture_exception`
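For reference, a minimal sketch of enabling the cache introduced in this release, using the `flag_fallback_cache_url` argument and URL format documented in the `Client` docstring further down this diff; the API key and flag name are placeholders:

```python
# Minimal sketch: enable the in-memory fallback cache added in 6.0.3.
# "your-api-key" and "my-flag" are placeholders.
from posthog.client import Client

client = Client(
    "your-api-key",
    flag_fallback_cache_url="memory://local/?ttl=300&size=10000",
)

# Evaluations are cached as they happen; if the PostHog API becomes
# unreachable, the client serves the cached value instead of returning None.
value = client.get_feature_flag("my-flag", "user123")
```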
2 changes: 1 addition & 1 deletion README.md
@@ -32,7 +32,7 @@ We recommend using [uv](https://docs.astral.sh/uv/). It's super fast.
```bash
uv python install 3.9.19
uv python pin 3.9.19
uv venv env
uv venv
source env/bin/activate
uv sync --extra dev --extra test
pre-commit install
2 changes: 2 additions & 0 deletions mypy-baseline.txt
@@ -42,3 +42,5 @@ posthog/ai/utils.py:0: error: Function "builtins.any" is not valid as a type [v
posthog/ai/utils.py:0: note: Perhaps you meant "typing.Any" instead of "any"?
posthog/ai/utils.py:0: error: Function "builtins.any" is not valid as a type [valid-type]
posthog/ai/utils.py:0: note: Perhaps you meant "typing.Any" instead of "any"?
posthog/client.py:0: error: Name "urlparse" already defined (possibly by an import) [no-redef]
posthog/client.py:0: error: Name "parse_qs" already defined (possibly by an import) [no-redef]
162 changes: 161 additions & 1 deletion posthog/client.py
@@ -50,6 +50,8 @@
to_values,
)
from posthog.utils import (
FlagCache,
RedisFlagCache,
SizeLimitedDict,
clean,
guess_timezone,
@@ -95,7 +97,30 @@ def add_context_tags(properties):


class Client(object):
"""Create a new PostHog client."""
"""Create a new PostHog client.

Examples:
Basic usage:
>>> client = Client("your-api-key")

With memory-based feature flag fallback cache:
>>> client = Client(
... "your-api-key",
... flag_fallback_cache_url="memory://local/?ttl=300&size=10000"
... )

With Redis fallback cache for high-scale applications:
>>> client = Client(
... "your-api-key",
... flag_fallback_cache_url="redis://localhost:6379/0/?ttl=300"
... )

With Redis authentication:
>>> client = Client(
... "your-api-key",
... flag_fallback_cache_url="redis://username:password@localhost:6379/0/?ttl=300"
... )
"""
Comment on lines +100 to +123
Contributor Author:

Is this overkill to include in the docstring? It makes it seem like this is the only thing worth passing to the Client, but I didn't know where else to put the docs.

Contributor:

I think it's fine, for now at least. Documenting the behaviour of these args on posthog.com seems like the generally accepted approach.


log = logging.getLogger("posthog")

@@ -126,6 +151,7 @@ def __init__(
project_root=None,
privacy_mode=False,
before_send=None,
flag_fallback_cache_url=None,
):
self.queue = queue.Queue(max_queue_size)

@@ -151,6 +177,8 @@
)
self.poller = None
self.distinct_ids_feature_flags_reported = SizeLimitedDict(MAX_DICT_SIZE, set)
self.flag_cache = self._initialize_flag_cache(flag_fallback_cache_url)
self.flag_definition_version = 0
self.disabled = disabled
self.disable_geoip = disable_geoip
self.historical_migration = historical_migration
@@ -707,6 +735,9 @@ def shutdown(self):

def _load_feature_flags(self):
try:
# Store old flags to detect changes
old_flags_by_key: dict[str, dict] = self.feature_flags_by_key or {}

response = get(
self.personal_api_key,
f"/api/feature_flag/local_evaluation/?token={self.api_key}&send_cohorts",
@@ -718,6 +749,14 @@
self.group_type_mapping = response["group_type_mapping"] or {}
self.cohorts = response["cohorts"] or {}

# Check if flag definitions changed and update version
if self.flag_cache and old_flags_by_key != (
self.feature_flags_by_key or {}
):
old_version = self.flag_definition_version
self.flag_definition_version += 1
self.flag_cache.invalidate_version(old_version)

except APIError as e:
if e.status == 401:
self.log.error(
@@ -739,6 +778,10 @@
self.group_type_mapping = {}
self.cohorts = {}

# Clear flag cache when quota limited
if self.flag_cache:
self.flag_cache.clear()

if self.debug:
raise APIError(
status=402,
@@ -889,6 +932,12 @@ def _get_feature_flag_result(
flag_result = FeatureFlagResult.from_value_and_payload(
key, lookup_match_value, payload
)

# Cache successful local evaluation
if self.flag_cache and flag_result:
self.flag_cache.set_cached_flag(
distinct_id, key, flag_result, self.flag_definition_version
)
elif not only_evaluate_locally:
try:
flag_details, request_id = self._get_feature_flag_details_from_decide(
@@ -902,12 +951,30 @@
flag_result = FeatureFlagResult.from_flag_details(
flag_details, override_match_value
)

# Cache successful remote evaluation
if self.flag_cache and flag_result:
self.flag_cache.set_cached_flag(
distinct_id, key, flag_result, self.flag_definition_version
)

self.log.debug(
f"Successfully computed flag remotely: #{key} -> #{flag_result}"
)
except Exception as e:
self.log.exception(f"[FEATURE FLAGS] Unable to get flag remotely: {e}")

# Fallback to cached value if remote evaluation fails
if self.flag_cache:
stale_result = self.flag_cache.get_stale_cached_flag(
distinct_id, key
)
if stale_result:
self.log.info(
f"[FEATURE FLAGS] Using stale cached value for flag {key}"
)
flag_result = stale_result

if send_feature_flag_events:
self._capture_feature_flag_called(
distinct_id,
@@ -1278,6 +1345,99 @@ def _get_all_flags_and_payloads_locally(
"featureFlagPayloads": payloads,
}, fallback_to_decide

def _initialize_flag_cache(self, cache_url):
"""Initialize feature flag cache for graceful degradation during service outages.

When enabled, the cache stores flag evaluation results and serves them as fallback
when the PostHog API is unavailable. This ensures your application continues to
receive flag values even during outages.

Args:
cache_url: Cache configuration URL. Examples:
- None: Disable caching
- "memory://local/?ttl=300&size=10000": Memory cache with TTL and size
- "redis://localhost:6379/0/?ttl=300": Redis cache with TTL
- "redis://username:password@host:port/?ttl=300": Redis with auth

Example usage:
# Memory cache
client = Client(
"your-api-key",
flag_fallback_cache_url="memory://local/?ttl=300&size=10000"
)

# Redis cache
client = Client(
"your-api-key",
flag_fallback_cache_url="redis://localhost:6379/0/?ttl=300"
)

# Normal evaluation - cache is populated
flag_value = client.get_feature_flag("my-flag", "user123")

# During API outage - returns cached value instead of None
flag_value = client.get_feature_flag("my-flag", "user123") # Uses cache
"""
if not cache_url:
return None

try:
from urllib.parse import urlparse, parse_qs
except ImportError:
from urlparse import urlparse, parse_qs

try:
parsed = urlparse(cache_url)
scheme = parsed.scheme.lower()
query_params = parse_qs(parsed.query)
ttl = int(query_params.get("ttl", [300])[0])

if scheme == "memory":
size = int(query_params.get("size", [10000])[0])
return FlagCache(size, ttl)

elif scheme == "redis":
try:
# Not worth importing redis if we're not using it
import redis

redis_url = f"{parsed.scheme}://"
if parsed.username or parsed.password:
redis_url += f"{parsed.username or ''}:{parsed.password or ''}@"
redis_url += (
f"{parsed.hostname or 'localhost'}:{parsed.port or 6379}"
)
if parsed.path:
redis_url += parsed.path

client = redis.from_url(redis_url)

# Test connection before using it
client.ping()

return RedisFlagCache(client, default_ttl=ttl)

except ImportError:
self.log.warning(
"[FEATURE FLAGS] Redis not available, flag caching disabled"
)
return None
except Exception as e:
self.log.warning(
f"[FEATURE FLAGS] Redis connection failed: {e}, flag caching disabled"
)
return None
Comment on lines +1420 to +1429
Contributor Author:

Originally I was falling back to "memory" if they couldn't connect to Redis, but that was a bad idea because I don't want to silently create an in-memory cache if they can't initialize a Redis connection. Better to just warn and move on.

Contributor:

Makes sense I think - do what's asked, or not at all.

else:
raise ValueError(
f"Unknown cache URL scheme: {scheme}. Supported schemes: memory, redis"
)

except Exception as e:
self.log.warning(
f"[FEATURE FLAGS] Failed to parse cache URL '{cache_url}': {e}"
)
return None

def feature_flag_definitions(self):
return self.feature_flags

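Following the review thread above about not silently downgrading to an in-memory cache: `_initialize_flag_cache` logs a warning and returns `None` when Redis is unreachable, so an application that depends on the fallback cache can check the client's `flag_cache` attribute after construction. A rough sketch under that assumption; `flag_cache` is the attribute assigned in `__init__`, not a documented public API:

```python
# Rough sketch: detect whether the Redis-backed fallback cache initialized.
# Inspecting `flag_cache` is an assumption based on Client.__init__ in this
# diff, not a documented public API; "your-api-key" is a placeholder.
from posthog.client import Client

client = Client(
    "your-api-key",
    flag_fallback_cache_url="redis://localhost:6379/0/?ttl=300",
)

if client.flag_cache is None:
    # Redis was unreachable or the cache URL failed to parse; the client
    # logged a warning and disabled flag caching rather than silently
    # falling back to an in-memory cache.
    print("[startup] feature flag fallback cache is disabled")
```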
123 changes: 123 additions & 0 deletions posthog/test/test_utils.py
@@ -1,3 +1,4 @@
import time
import unittest
from dataclasses import dataclass
from datetime import date, datetime, timedelta
@@ -12,6 +13,7 @@
from pydantic.v1 import BaseModel as BaseModelV1

from posthog import utils
from posthog.types import FeatureFlagResult

TEST_API_KEY = "kOOlRy2QlMY9jHZQv0bKz0FZyazBUoY8Arj0lFVNjs4"
FAKE_TEST_API_KEY = "random_key"
@@ -173,3 +175,124 @@ class TestDataClass:
"inner_optional": None,
},
}


class TestFlagCache(unittest.TestCase):
def setUp(self):
self.cache = utils.FlagCache(max_size=3, default_ttl=1)
self.flag_result = FeatureFlagResult.from_value_and_payload(
"test-flag", True, None
)

def test_cache_basic_operations(self):
distinct_id = "user123"
flag_key = "test-flag"
flag_version = 1

# Test cache miss
result = self.cache.get_cached_flag(distinct_id, flag_key, flag_version)
assert result is None

# Test cache set and hit
self.cache.set_cached_flag(
distinct_id, flag_key, self.flag_result, flag_version
)
result = self.cache.get_cached_flag(distinct_id, flag_key, flag_version)
assert result is not None
assert result.get_value()

def test_cache_ttl_expiration(self):
distinct_id = "user123"
flag_key = "test-flag"
flag_version = 1

# Set flag in cache
self.cache.set_cached_flag(
distinct_id, flag_key, self.flag_result, flag_version
)

# Should be available immediately
result = self.cache.get_cached_flag(distinct_id, flag_key, flag_version)
assert result is not None

# Wait for TTL to expire (1 second + buffer)
time.sleep(1.1)

# Should be expired
result = self.cache.get_cached_flag(distinct_id, flag_key, flag_version)
assert result is None

def test_cache_version_invalidation(self):
distinct_id = "user123"
flag_key = "test-flag"
old_version = 1
new_version = 2

# Set flag with old version
self.cache.set_cached_flag(distinct_id, flag_key, self.flag_result, old_version)

# Should hit with old version
result = self.cache.get_cached_flag(distinct_id, flag_key, old_version)
assert result is not None

# Should miss with new version
result = self.cache.get_cached_flag(distinct_id, flag_key, new_version)
assert result is None

# Invalidate old version
self.cache.invalidate_version(old_version)

# Should miss even with old version after invalidation
result = self.cache.get_cached_flag(distinct_id, flag_key, old_version)
assert result is None

def test_stale_cache_functionality(self):
distinct_id = "user123"
flag_key = "test-flag"
flag_version = 1

# Set flag in cache
self.cache.set_cached_flag(
distinct_id, flag_key, self.flag_result, flag_version
)

# Wait for TTL to expire
time.sleep(1.1)

# Should not get fresh cache
result = self.cache.get_cached_flag(distinct_id, flag_key, flag_version)
assert result is None

# Should get stale cache (within 1 hour default)
stale_result = self.cache.get_stale_cached_flag(distinct_id, flag_key)
assert stale_result is not None
assert stale_result.get_value()

def test_lru_eviction(self):
# Cache has max_size=3, so adding 4 users should evict the LRU one
flag_version = 1

# Add 3 users
for i in range(3):
user_id = f"user{i}"
self.cache.set_cached_flag(
user_id, "test-flag", self.flag_result, flag_version
)

# Access user0 to make it recently used
self.cache.get_cached_flag("user0", "test-flag", flag_version)

# Add 4th user, should evict user1 (least recently used)
self.cache.set_cached_flag("user3", "test-flag", self.flag_result, flag_version)

# user0 should still be there (was recently accessed)
result = self.cache.get_cached_flag("user0", "test-flag", flag_version)
assert result is not None

# user2 should still be there (was recently added)
result = self.cache.get_cached_flag("user2", "test-flag", flag_version)
assert result is not None

# user3 should be there (just added)
result = self.cache.get_cached_flag("user3", "test-flag", flag_version)
assert result is not None