Skip to content

Commit

Permalink
Merge 1e04b09 into 0fcfb6f
Browse files Browse the repository at this point in the history
  • Loading branch information
ivanklee86 committed Sep 28, 2022
2 parents 0fcfb6f + 1e04b09 commit 9f359e5
Show file tree
Hide file tree
Showing 3 changed files with 81 additions and 8 deletions.
42 changes: 36 additions & 6 deletions UnleashClient/__init__.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,14 @@
# pylint: disable=invalid-name
import warnings
import random
import string
from datetime import datetime, timezone
from typing import Callable, Optional
from apscheduler.job import Job
from apscheduler.schedulers.base import BaseScheduler
from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.triggers.interval import IntervalTrigger
from apscheduler.executors.pool import ThreadPoolExecutor
from UnleashClient.api import register_client
from UnleashClient.periodic_tasks import fetch_and_load_features, aggregate_and_send_metrics
from UnleashClient.strategies import ApplicationHostname, Default, GradualRolloutRandom, \
Expand Down Expand Up @@ -35,6 +39,8 @@ class UnleashClient:
:param cache_directory: Location of the cache directory. When unset, FCache will determine the location.
:param verbose_log_level: Numerical log level (https://docs.python.org/3/library/logging.html#logging-levels) for cases where checking a feature flag fails.
:param cache: Custom cache implementation that extends UnleashClient.cache.BaseCache. When unset, UnleashClient will use Fcache.
:param scheduler: Custom APScheduler object. Use this if you want to customize jobstore or executors. When unset, UnleashClient will create it's own scheduler.
:param scheduler_executor: Name of APSCheduler executor to use if using a custom scheduler.
"""
def __init__(self,
url: str,
Expand All @@ -53,7 +59,9 @@ def __init__(self,
cache_directory: Optional[str] = None,
project_name: str = None,
verbose_log_level: int = 30,
cache: Optional[BaseCache] = None) -> None:
cache: Optional[BaseCache] = None,
scheduler: Optional[BaseScheduler] = None,
scheduler_executor: Optional[str] = None) -> None:
custom_headers = custom_headers or {}
custom_options = custom_options or {}
custom_strategies = custom_strategies or {}
Expand All @@ -80,7 +88,6 @@ def __init__(self,

# Class objects
self.features: dict = {}
self.scheduler = BackgroundScheduler()
self.fl_job: Job = None
self.metric_job: Job = None

Expand All @@ -91,6 +98,27 @@ def __init__(self,
})
self.unleash_bootstrapped = self.cache.bootstrapped

# Scheduler bootstrapping
# - Figure out the Unleash executor name.
if scheduler and scheduler_executor:
self.unleash_executor_name = scheduler_executor
elif scheduler and not scheduler_executor:
raise ValueError("If using a custom scheduler, you must specify a executor.")
else:
if not scheduler:
LOGGER.warning("scheduler_executor should only be used with a custom scheduler.")

self.unleash_executor_name = f"unleash_executor_{''.join(random.choices(string.ascii_uppercase + string.digits, k=6))}"

# Set up the scheduler.
if scheduler:
self.unleash_scheduler = scheduler
else:
executors = {
self.unleash_executor_name: ThreadPoolExecutor()
}
self.unleash_scheduler = BackgroundScheduler(executors=executors)

# Mappings
default_strategy_mapping = {
"applicationHostname": ApplicationHostname,
Expand Down Expand Up @@ -184,20 +212,22 @@ def initialize_client(self, fetch_toggles: bool = True) -> None:

job_func(**job_args) # type: ignore
# Start periodic jobs
self.scheduler.start()
self.fl_job = self.scheduler.add_job(job_func,
self.unleash_scheduler.start()
self.fl_job = self.unleash_scheduler.add_job(job_func,
trigger=IntervalTrigger(
seconds=int(self.unleash_refresh_interval),
jitter=self.unleash_refresh_jitter,
),
executor=self.unleash_executor_name,
kwargs=job_args)

if not self.unleash_disable_metrics:
self.metric_job = self.scheduler.add_job(aggregate_and_send_metrics,
self.metric_job = self.unleash_scheduler.add_job(aggregate_and_send_metrics,
trigger=IntervalTrigger(
seconds=int(self.unleash_metrics_interval),
jitter=self.unleash_metrics_jitter,
),
executor=self.unleash_executor_name,
kwargs=metrics_args)
except Exception as excep:
# Log exceptions during initialization. is_initialized will remain false.
Expand All @@ -218,7 +248,7 @@ def destroy(self) -> None:
self.fl_job.remove()
if self.metric_job:
self.metric_job.remove()
self.scheduler.shutdown()
self.unleash_scheduler.shutdown()
self.cache.destroy()

@staticmethod
Expand Down
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ disable = [
]
max-attributes = 25
max-args = 25
max-locals = 20
max-locals = 25
extension-pkg-allow-list = ["mmh3"]

[tool.setuptools_scm]
Expand Down
45 changes: 44 additions & 1 deletion tests/unit_tests/test_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@

import pytest
import responses
from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.executors.pool import ThreadPoolExecutor
from UnleashClient import UnleashClient
from UnleashClient.strategies import Strategy
from tests.utilities.testing_constants import URL, ENVIRONMENT, APP_NAME, INSTANCE_ID, REFRESH_INTERVAL, REFRESH_JITTER, \
Expand Down Expand Up @@ -228,7 +230,7 @@ def test_uc_dirty_cache(unleash_client_nodestroy):
unleash_client.initialize_client()
time.sleep(5)
assert unleash_client.is_enabled("testFlag")
unleash_client.scheduler.shutdown()
unleash_client.unleash_scheduler.shutdown()

# Check that everything works if previous cache exists.
unleash_client.initialize_client()
Expand Down Expand Up @@ -492,3 +494,44 @@ def test_uc_cache_bootstrap_url(cache):
)
assert len(unleash_client.features) >= 4
assert unleash_client.is_enabled("testFlag")


@responses.activate
def test_uc_custom_scheduler():
# Set up API
responses.add(responses.POST, URL + REGISTER_URL, json={}, status=202)
responses.add(responses.GET, URL + FEATURES_URL, json=MOCK_FEATURE_RESPONSE, status=200, headers={'etag': ETAG_VALUE})
responses.add(responses.POST, URL + METRICS_URL, json={}, status=202)

# Set up UnleashClient
custom_executors = {
'hamster_executor': ThreadPoolExecutor()
}

custom_scheduler = BackgroundScheduler(
executors=custom_executors
)

unleash_client = UnleashClient(
URL,
APP_NAME,
refresh_interval=REFRESH_INTERVAL,
metrics_interval=METRICS_INTERVAL,
scheduler=custom_scheduler,
scheduler_executor='hamster_executor'
)

# Create Unleash client and check initial load
unleash_client.initialize_client()
time.sleep(1)
assert unleash_client.is_initialized
assert len(unleash_client.features) >= 4

# Simulate caching
responses.add(responses.GET, URL + FEATURES_URL, json={}, status=304, headers={'etag': ETAG_VALUE})
time.sleep(16)

# Simulate server provisioning change
responses.add(responses.GET, URL + FEATURES_URL, json=MOCK_ALL_FEATURES, status=200, headers={'etag': 'W/somethingelse'})
time.sleep(30)
assert len(unleash_client.features) >= 9

0 comments on commit 9f359e5

Please sign in to comment.