From 6dd27b5c4982a28a6a68b9c1ecd56bef0749d9b5 Mon Sep 17 00:00:00 2001 From: idle sign Date: Fri, 11 Aug 2017 20:46:10 +0700 Subject: [PATCH] Subscriptions-related options partially described. --- CHANGELOG | 1 + docs/source/grp_subscriptions.rst | 12 ++ docs/source/grp_subscriptions_algos.rst | 6 + docs/source/index_api.rst | 1 + tests/options/test_subscriptions.py | 61 +++++++ uwsgiconf/config.py | 3 + uwsgiconf/options/__init__.py | 1 + uwsgiconf/options/subscriptions.py | 214 +++++++++++++++++++++++ uwsgiconf/options/subscriptions_algos.py | 40 +++++ 9 files changed, 339 insertions(+) create mode 100644 docs/source/grp_subscriptions.rst create mode 100644 docs/source/grp_subscriptions_algos.rst create mode 100644 tests/options/test_subscriptions.py create mode 100644 uwsgiconf/options/subscriptions.py create mode 100644 uwsgiconf/options/subscriptions_algos.py diff --git a/CHANGELOG b/CHANGELOG index c28cf7c..17dc07f 100644 --- a/CHANGELOG +++ b/CHANGELOG @@ -4,6 +4,7 @@ uwsgiconf changelog Unreleased ---------- ++ Subscriptions-related options partially described. + More worker-related options described. + More Python-related options described. + rsyslog logger made separate from syslog. diff --git a/docs/source/grp_subscriptions.rst b/docs/source/grp_subscriptions.rst new file mode 100644 index 0000000..23967ed --- /dev/null +++ b/docs/source/grp_subscriptions.rst @@ -0,0 +1,12 @@ +Subscriptions +============= + + +.. toctree:: + :maxdepth: 3 + + grp_subscriptions_algos + + +.. automodule:: uwsgiconf.options.subscriptions + :members: diff --git a/docs/source/grp_subscriptions_algos.rst b/docs/source/grp_subscriptions_algos.rst new file mode 100644 index 0000000..3409cbd --- /dev/null +++ b/docs/source/grp_subscriptions_algos.rst @@ -0,0 +1,6 @@ +Balancing Algorithms +==================== + + +.. automodule:: uwsgiconf.options.subscriptions_algos + :members: diff --git a/docs/source/index_api.rst b/docs/source/index_api.rst index 4764bbe..fa75d4d 100644 --- a/docs/source/index_api.rst +++ b/docs/source/index_api.rst @@ -19,6 +19,7 @@ API grp_routing grp_spooler grp_statics + grp_subscriptions grp_workers grp_python formatters diff --git a/tests/options/test_subscriptions.py b/tests/options/test_subscriptions.py new file mode 100644 index 0000000..d1acdd1 --- /dev/null +++ b/tests/options/test_subscriptions.py @@ -0,0 +1,61 @@ +import pytest + +from uwsgiconf import Section +from uwsgiconf.exceptions import ConfigurationError + + +def test_subscriptions_basics(assert_lines): + + assert_lines([ + 'subscription-mountpoint = 2', + ], Section().subscriptions.set_server_params(mountpoints_depth=2)) + + assert_lines([ + 'subscriptions-sign-check', + ], Section().subscriptions.set_server_verification_params(digest_algo='SHA1'), assert_in=False) + + assert_lines([ + 'subscriptions-sign-check = SHA1:/here', + 'subscriptions-sign-skip-uid = 1001', + 'subscriptions-sign-skip-uid = 1002', + ], Section().subscriptions.set_server_verification_params( + digest_algo='SHA1', dir_cert='/here', no_check_uid=[1001, 1002])) + + assert_lines([ + 'start-unsubscribed = true', + ], Section().subscriptions.set_client_params(start_unsubscribed=True)) + + # Subscribing: + + with pytest.raises(ConfigurationError): # both key and server omitted + Section().subscriptions.subscribe(balancing_weight=2) + + assert_lines([ + 'key=pythonz.net', + 'server=127.0.0.1:4040', + ], Section().subscriptions.subscribe('127.0.0.1:4040', 'pythonz.net')) + + # SNI + assert_lines([ + 'socket=0', + 'key=mydomain.it', + 'sni_crt=/foo/bar.crt,sni_key=/foo/bar.key', + ], Section().subscriptions.subscribe(key='mydomain.it', address=0, sni_cert='/foo/bar.crt', sni_key='/foo/bar.key')) + + # algos + algo = Section.subscriptions.algorithms.weighted_least_reference_count(2) + assert_lines([ + 'backup=2', + 'algo=wlrc', + ], Section().subscriptions.subscribe('127.0.0.1:4040', balancing_algo=algo)) + + # modifiers + assert_lines([ + 'modifier1=31', + 'modifier2=42', + ], Section().subscriptions.subscribe('127.0.0.1:4040', modifier=Section.routing.modifiers.message(42))) + + # signing + assert_lines([ + 'sign=SHA1:myssh001', + ], Section().subscriptions.subscribe('127.0.0.1:4040', signing=('SHA1', 'myssh001'))) diff --git a/uwsgiconf/config.py b/uwsgiconf/config.py index 2d6e0d8..776c49a 100644 --- a/uwsgiconf/config.py +++ b/uwsgiconf/config.py @@ -86,6 +86,9 @@ class Section(OptionsGroup): statics = Options(Statics) # type: Statics """Static file serving options group.""" + subscriptions = Options(Subscriptions) # type: Subscriptions + """Subscription services options group.""" + workers = Options(Workers) # type: Workers """Workers options group.""" diff --git a/uwsgiconf/options/__init__.py b/uwsgiconf/options/__init__.py index 9e153b9..f39838d 100644 --- a/uwsgiconf/options/__init__.py +++ b/uwsgiconf/options/__init__.py @@ -13,4 +13,5 @@ from .routing import Routing from .spooler import Spooler from .statics import Statics +from .subscriptions import Subscriptions from .workers import Workers, Cheapening diff --git a/uwsgiconf/options/subscriptions.py b/uwsgiconf/options/subscriptions.py new file mode 100644 index 0000000..1b7c5ff --- /dev/null +++ b/uwsgiconf/options/subscriptions.py @@ -0,0 +1,214 @@ +from ..base import OptionsGroup +from ..exceptions import ConfigurationError +from ..utils import make_key_val_string, filter_locals +from .subscriptions_algos import * + + +class Subscriptions(OptionsGroup): + """ + This allows some uWSGI instances to announce their presence to subscriptions managing server, + which in its turn can address those nodes (e.g. delegate request processing to them) + and automatically remove dead nodes from the pool. + + .. note:: Subscription system in many ways relies on Master Process. + + .. warning:: The subscription system is meant for "trusted" networks. + All of the nodes in your network can potentially make a total mess with it. + + * http://uwsgi.readthedocs.io/en/latest/SubscriptionServer.html + + """ + + class algorithms(object): + """Balancing algorithms available to use with ``subscribe``.""" + + ip_hash = IpHash + least_reference_count = LeastReferenceCount + weighted_least_reference_count = WeightedLeastReferenceCount + weighted_round_robin = WeightedRoundRobin + + def set_server_params( + self, client_notify_address=None, mountpoints_depth=None, require_vassal=None, + tolerance=None, tolerance_inactive=None, key_dot_split=None): + """Sets subscription server related params. + + :param str|unicode client_notify_address: Set the notification socket for subscriptions. + When you subscribe to a server, you can ask it to "acknowledge" the acceptance of your request. + pointing address (Unix socket or UDP), on which your instance will bind and + the subscription server will send acknowledgements to. + + :param int mountpoints_depth: Enable support of mountpoints of certain depth for subscription system. + + * http://uwsgi-docs.readthedocs.io/en/latest/SubscriptionServer.html#mountpoints-uwsgi-2-1 + + :param bool require_vassal: Require a vassal field (see ``subscribe``) from each subscription. + + :param int tolerance: Subscription reclaim tolerance (seconds). + + :param int tolerance_inactive: Subscription inactivity tolerance (seconds). + + :param bool key_dot_split: Try to fallback to the next part in (dot based) subscription key. + Used, for example, in SNI. + + """ + # todo notify-socket (fallback) relation + self._set('subscription-notify-socket', client_notify_address) + self._set('subscription-mountpoint', mountpoints_depth) + self._set('subscription-vassal-required', require_vassal, cast=bool) + self._set('subscription-tolerance', tolerance) + self._set('subscription-tolerance-inactive', tolerance_inactive) + self._set('subscription-dotsplit', key_dot_split, cast=bool) + + return self._section + + def set_server_verification_params( + self, digest_algo=None, dir_cert=None, tolerance=None, no_check_uid=None, + dir_credentials=None, pass_unix_credentials=None): + """Sets peer verification params for subscription server. + + These are for secured subscriptions. + + :param str|unicode digest_algo: Digest algorithm. Example: SHA1 + + .. note:: Also requires ``dir_cert`` to be set. + + :param str|unicode dir_cert: Certificate directory. + + .. note:: Also requires ``digest_algo`` to be set. + + :param int tolerance: Maximum tolerance (in seconds) of clock skew for secured subscription system. + Default: 24h. + + :param str|unicode|int|list[str|unicode|int] no_check_uid: Skip signature check for the specified uids + when using unix sockets credentials. + + :param str|unicode|list[str|unicode] dir_credentials: Directories to search for subscriptions + key credentials. + + :param bool pass_unix_credentials: Enable management of SCM_CREDENTIALS in subscriptions UNIX sockets. + + """ + if digest_algo and dir_cert: + self._set('subscriptions-sign-check', '%s:%s' % (digest_algo, dir_cert)) + + self._set('subscriptions-sign-check-tolerance', tolerance) + self._set('subscriptions-sign-skip-uid', no_check_uid, multi=True) + self._set('subscriptions-credentials-check', dir_credentials, multi=True) + self._set('subscriptions-use-credentials', pass_unix_credentials, cast=bool) + + return self._section + + def set_client_params( + self, start_unsubscribed=None, clear_on_exit=None, unsubscribe_on_reload=None, + announce_interval=None): + """Sets subscribers related params. + + :param bool start_unsubscribed: Configure subscriptions but do not send them. + .. note:: Useful with master FIFO. + + :param bool clear_on_exit: Force clear instead of unsubscribe during shutdown. + + :param bool unsubscribe_on_reload: Force unsubscribe request even during graceful reload. + + :param int announce_interval: Send subscription announce at the specified interval. Default: 10 master cycles. + + """ + self._set('start-unsubscribed', start_unsubscribed, cast=bool) + self._set('subscription-clear-on-shutdown', clear_on_exit, cast=bool) + self._set('unsubscribe-on-graceful-reload', unsubscribe_on_reload, cast=bool) + self._set('subscribe-freq', announce_interval) + + return self._section + + def subscribe( + self, server=None, key=None, address=None, address_vassal=None, + balancing_weight=None, balancing_algo=None, modifier=None, signing=None, check_file=None, protocol=None, + sni_cert=None, sni_key=None, sni_client_ca=None): + """Registers a subscription intent. + + :param str|unicode server: Subscription server address (UDP or UNIX socket). + + Examples: + * 127.0.0.1:7171 + + :param str|unicode key: Key to subscribe. Generally the domain name (+ optional '/< mountpoint>'). + Examples: + * mydomain.it/foo + * mydomain.it/foo/bar (requires ``mountpoints_depth=2``) + * mydomain.it + * ubuntu64.local:9090 + + :param str|unicode|int address: Address to subscribe (the value for the key) + or zero-based internal socket number (integer). + + :param str|unicode address: Vassal node address. + + :param int balancing_weight: Load balancing value. Default: 1. + + :param balancing_algo: Load balancing algorithm to use. See ``balancing_algorithms`` + .. note:: Since 2.1 + + :param Modifier modifier: Routing modifier object. See ``.routing.modifiers`` + + :param list|tuple signing: Signing basics, expects two elements list/tuple: + (signing_algorithm, key). + + Examples: + * SHA1:idlessh001 + + :param str|unicode check_file: If this file exists the subscription packet is sent, + otherwise it is skipped. + + :param str|unicode protocol: the protocol to use, by default it is ``uwsgi``. + See ``.networking.socket_types``. + + .. note:: Since 2.1 + + :param str|unicode sni_cert: Certificate file to use for SNI proxy management. + * http://uwsgi.readthedocs.io/en/latest/SNI.html#subscription-system-and-sni + + :param str|unicode sni_key: sni_key Key file to use for SNI proxy management. + * http://uwsgi.readthedocs.io/en/latest/SNI.html#subscription-system-and-sni + + :param str|unicode sni_client_ca: Ca file to use for SNI proxy management. + * http://uwsgi.readthedocs.io/en/latest/SNI.html#subscription-system-and-sni + + """ + # todo params: inactive (inactive slot activation) + + if not any((server, key)): + raise ConfigurationError('Subscription requires `server` or `key` to be set.') + + address_key = 'addr' + if isinstance(address, int): + address_key = 'socket' + + if balancing_algo: + backup = getattr(balancing_algo, 'backup_level', None) + + if signing: + signing = ':'.join(signing) + + if modifier: + modifier1 = modifier + if modifier.submod: + modifier2 = modifier.submod + + rule = make_key_val_string( + filter_locals(locals(), drop=['address_key', 'modifier']), + aliases={ + 'address': address_key, + 'address_vassal': 'vassal', + 'signing': 'sign', + 'check_file': 'check', + 'balancing_weight': 'weight', + 'balancing_algo': 'algo', + 'protocol': 'proto', + 'sni_cert': 'sni_crt', + 'sni_client_ca': 'sni_ca', + }, + ) + + self._set('subscribe2', rule) + + return self._section diff --git a/uwsgiconf/options/subscriptions_algos.py b/uwsgiconf/options/subscriptions_algos.py new file mode 100644 index 0000000..cc40d22 --- /dev/null +++ b/uwsgiconf/options/subscriptions_algos.py @@ -0,0 +1,40 @@ +from ..base import ParametrizedValue + + +class BalancingAlgorithm(ParametrizedValue): + + name_separator = '' + + +class BalancingAlgorithmWithBackup(BalancingAlgorithm): + + def __init__(self, backup_level=None): + self.backup_level = backup_level + super(BalancingAlgorithmWithBackup, self).__init__() + + +class WeightedRoundRobin(BalancingAlgorithmWithBackup): + """Weighted round robin algorithm with backup support. + The default algorithm. + + """ + + name = 'wrr' + + +class LeastReferenceCount(BalancingAlgorithmWithBackup): + """Least reference count algorithm with backup support.""" + + name = 'lrc' + + +class WeightedLeastReferenceCount(BalancingAlgorithmWithBackup): + """Weighted least reference count algorithm with backup support.""" + + name = 'wlrc' + + +class IpHash(BalancingAlgorithmWithBackup): + """IP hash algorithm with backup support.""" + + name = 'iphash'