Skip to content

Commit

Permalink
Subscriptions-related options partially described.
Browse files Browse the repository at this point in the history
  • Loading branch information
idlesign committed Aug 11, 2017
1 parent c65eba1 commit 6dd27b5
Show file tree
Hide file tree
Showing 9 changed files with 339 additions and 0 deletions.
1 change: 1 addition & 0 deletions CHANGELOG
Expand Up @@ -4,6 +4,7 @@ uwsgiconf changelog

Unreleased
----------
+ Subscriptions-related options partially described.
+ More worker-related options described.
+ More Python-related options described.
+ rsyslog logger made separate from syslog.
Expand Down
12 changes: 12 additions & 0 deletions docs/source/grp_subscriptions.rst
@@ -0,0 +1,12 @@
Subscriptions
=============


.. toctree::
:maxdepth: 3

grp_subscriptions_algos


.. automodule:: uwsgiconf.options.subscriptions
:members:
6 changes: 6 additions & 0 deletions docs/source/grp_subscriptions_algos.rst
@@ -0,0 +1,6 @@
Balancing Algorithms
====================


.. automodule:: uwsgiconf.options.subscriptions_algos
:members:
1 change: 1 addition & 0 deletions docs/source/index_api.rst
Expand Up @@ -19,6 +19,7 @@ API
grp_routing
grp_spooler
grp_statics
grp_subscriptions
grp_workers
grp_python
formatters
61 changes: 61 additions & 0 deletions tests/options/test_subscriptions.py
@@ -0,0 +1,61 @@
import pytest

from uwsgiconf import Section
from uwsgiconf.exceptions import ConfigurationError


def test_subscriptions_basics(assert_lines):

assert_lines([
'subscription-mountpoint = 2',
], Section().subscriptions.set_server_params(mountpoints_depth=2))

assert_lines([
'subscriptions-sign-check',
], Section().subscriptions.set_server_verification_params(digest_algo='SHA1'), assert_in=False)

assert_lines([
'subscriptions-sign-check = SHA1:/here',
'subscriptions-sign-skip-uid = 1001',
'subscriptions-sign-skip-uid = 1002',
], Section().subscriptions.set_server_verification_params(
digest_algo='SHA1', dir_cert='/here', no_check_uid=[1001, 1002]))

assert_lines([
'start-unsubscribed = true',
], Section().subscriptions.set_client_params(start_unsubscribed=True))

# Subscribing:

with pytest.raises(ConfigurationError): # both key and server omitted
Section().subscriptions.subscribe(balancing_weight=2)

assert_lines([
'key=pythonz.net',
'server=127.0.0.1:4040',
], Section().subscriptions.subscribe('127.0.0.1:4040', 'pythonz.net'))

# SNI
assert_lines([
'socket=0',
'key=mydomain.it',
'sni_crt=/foo/bar.crt,sni_key=/foo/bar.key',
], Section().subscriptions.subscribe(key='mydomain.it', address=0, sni_cert='/foo/bar.crt', sni_key='/foo/bar.key'))

# algos
algo = Section.subscriptions.algorithms.weighted_least_reference_count(2)
assert_lines([
'backup=2',
'algo=wlrc',
], Section().subscriptions.subscribe('127.0.0.1:4040', balancing_algo=algo))

# modifiers
assert_lines([
'modifier1=31',
'modifier2=42',
], Section().subscriptions.subscribe('127.0.0.1:4040', modifier=Section.routing.modifiers.message(42)))

# signing
assert_lines([
'sign=SHA1:myssh001',
], Section().subscriptions.subscribe('127.0.0.1:4040', signing=('SHA1', 'myssh001')))
3 changes: 3 additions & 0 deletions uwsgiconf/config.py
Expand Up @@ -86,6 +86,9 @@ class Section(OptionsGroup):
statics = Options(Statics) # type: Statics
"""Static file serving options group."""

subscriptions = Options(Subscriptions) # type: Subscriptions
"""Subscription services options group."""

workers = Options(Workers) # type: Workers
"""Workers options group."""

Expand Down
1 change: 1 addition & 0 deletions uwsgiconf/options/__init__.py
Expand Up @@ -13,4 +13,5 @@
from .routing import Routing
from .spooler import Spooler
from .statics import Statics
from .subscriptions import Subscriptions
from .workers import Workers, Cheapening
214 changes: 214 additions & 0 deletions uwsgiconf/options/subscriptions.py
@@ -0,0 +1,214 @@
from ..base import OptionsGroup
from ..exceptions import ConfigurationError
from ..utils import make_key_val_string, filter_locals
from .subscriptions_algos import *


class Subscriptions(OptionsGroup):
"""
This allows some uWSGI instances to announce their presence to subscriptions managing server,
which in its turn can address those nodes (e.g. delegate request processing to them)
and automatically remove dead nodes from the pool.
.. note:: Subscription system in many ways relies on Master Process.
.. warning:: The subscription system is meant for "trusted" networks.
All of the nodes in your network can potentially make a total mess with it.
* http://uwsgi.readthedocs.io/en/latest/SubscriptionServer.html
"""

class algorithms(object):
"""Balancing algorithms available to use with ``subscribe``."""

ip_hash = IpHash
least_reference_count = LeastReferenceCount
weighted_least_reference_count = WeightedLeastReferenceCount
weighted_round_robin = WeightedRoundRobin

def set_server_params(
self, client_notify_address=None, mountpoints_depth=None, require_vassal=None,
tolerance=None, tolerance_inactive=None, key_dot_split=None):
"""Sets subscription server related params.
:param str|unicode client_notify_address: Set the notification socket for subscriptions.
When you subscribe to a server, you can ask it to "acknowledge" the acceptance of your request.
pointing address (Unix socket or UDP), on which your instance will bind and
the subscription server will send acknowledgements to.
:param int mountpoints_depth: Enable support of mountpoints of certain depth for subscription system.
* http://uwsgi-docs.readthedocs.io/en/latest/SubscriptionServer.html#mountpoints-uwsgi-2-1
:param bool require_vassal: Require a vassal field (see ``subscribe``) from each subscription.
:param int tolerance: Subscription reclaim tolerance (seconds).
:param int tolerance_inactive: Subscription inactivity tolerance (seconds).
:param bool key_dot_split: Try to fallback to the next part in (dot based) subscription key.
Used, for example, in SNI.
"""
# todo notify-socket (fallback) relation
self._set('subscription-notify-socket', client_notify_address)
self._set('subscription-mountpoint', mountpoints_depth)
self._set('subscription-vassal-required', require_vassal, cast=bool)
self._set('subscription-tolerance', tolerance)
self._set('subscription-tolerance-inactive', tolerance_inactive)
self._set('subscription-dotsplit', key_dot_split, cast=bool)

return self._section

def set_server_verification_params(
self, digest_algo=None, dir_cert=None, tolerance=None, no_check_uid=None,
dir_credentials=None, pass_unix_credentials=None):
"""Sets peer verification params for subscription server.
These are for secured subscriptions.
:param str|unicode digest_algo: Digest algorithm. Example: SHA1
.. note:: Also requires ``dir_cert`` to be set.
:param str|unicode dir_cert: Certificate directory.
.. note:: Also requires ``digest_algo`` to be set.
:param int tolerance: Maximum tolerance (in seconds) of clock skew for secured subscription system.
Default: 24h.
:param str|unicode|int|list[str|unicode|int] no_check_uid: Skip signature check for the specified uids
when using unix sockets credentials.
:param str|unicode|list[str|unicode] dir_credentials: Directories to search for subscriptions
key credentials.
:param bool pass_unix_credentials: Enable management of SCM_CREDENTIALS in subscriptions UNIX sockets.
"""
if digest_algo and dir_cert:
self._set('subscriptions-sign-check', '%s:%s' % (digest_algo, dir_cert))

self._set('subscriptions-sign-check-tolerance', tolerance)
self._set('subscriptions-sign-skip-uid', no_check_uid, multi=True)
self._set('subscriptions-credentials-check', dir_credentials, multi=True)
self._set('subscriptions-use-credentials', pass_unix_credentials, cast=bool)

return self._section

def set_client_params(
self, start_unsubscribed=None, clear_on_exit=None, unsubscribe_on_reload=None,
announce_interval=None):
"""Sets subscribers related params.
:param bool start_unsubscribed: Configure subscriptions but do not send them.
.. note:: Useful with master FIFO.
:param bool clear_on_exit: Force clear instead of unsubscribe during shutdown.
:param bool unsubscribe_on_reload: Force unsubscribe request even during graceful reload.
:param int announce_interval: Send subscription announce at the specified interval. Default: 10 master cycles.
"""
self._set('start-unsubscribed', start_unsubscribed, cast=bool)
self._set('subscription-clear-on-shutdown', clear_on_exit, cast=bool)
self._set('unsubscribe-on-graceful-reload', unsubscribe_on_reload, cast=bool)
self._set('subscribe-freq', announce_interval)

return self._section

def subscribe(
self, server=None, key=None, address=None, address_vassal=None,
balancing_weight=None, balancing_algo=None, modifier=None, signing=None, check_file=None, protocol=None,
sni_cert=None, sni_key=None, sni_client_ca=None):
"""Registers a subscription intent.
:param str|unicode server: Subscription server address (UDP or UNIX socket).
Examples:
* 127.0.0.1:7171
:param str|unicode key: Key to subscribe. Generally the domain name (+ optional '/< mountpoint>').
Examples:
* mydomain.it/foo
* mydomain.it/foo/bar (requires ``mountpoints_depth=2``)
* mydomain.it
* ubuntu64.local:9090
:param str|unicode|int address: Address to subscribe (the value for the key)
or zero-based internal socket number (integer).
:param str|unicode address: Vassal node address.
:param int balancing_weight: Load balancing value. Default: 1.
:param balancing_algo: Load balancing algorithm to use. See ``balancing_algorithms``
.. note:: Since 2.1
:param Modifier modifier: Routing modifier object. See ``.routing.modifiers``
:param list|tuple signing: Signing basics, expects two elements list/tuple:
(signing_algorithm, key).
Examples:
* SHA1:idlessh001
:param str|unicode check_file: If this file exists the subscription packet is sent,
otherwise it is skipped.
:param str|unicode protocol: the protocol to use, by default it is ``uwsgi``.
See ``.networking.socket_types``.
.. note:: Since 2.1
:param str|unicode sni_cert: Certificate file to use for SNI proxy management.
* http://uwsgi.readthedocs.io/en/latest/SNI.html#subscription-system-and-sni
:param str|unicode sni_key: sni_key Key file to use for SNI proxy management.
* http://uwsgi.readthedocs.io/en/latest/SNI.html#subscription-system-and-sni
:param str|unicode sni_client_ca: Ca file to use for SNI proxy management.
* http://uwsgi.readthedocs.io/en/latest/SNI.html#subscription-system-and-sni
"""
# todo params: inactive (inactive slot activation)

if not any((server, key)):
raise ConfigurationError('Subscription requires `server` or `key` to be set.')

address_key = 'addr'
if isinstance(address, int):
address_key = 'socket'

if balancing_algo:
backup = getattr(balancing_algo, 'backup_level', None)

if signing:
signing = ':'.join(signing)

if modifier:
modifier1 = modifier
if modifier.submod:
modifier2 = modifier.submod

rule = make_key_val_string(
filter_locals(locals(), drop=['address_key', 'modifier']),
aliases={
'address': address_key,
'address_vassal': 'vassal',
'signing': 'sign',
'check_file': 'check',
'balancing_weight': 'weight',
'balancing_algo': 'algo',
'protocol': 'proto',
'sni_cert': 'sni_crt',
'sni_client_ca': 'sni_ca',
},
)

self._set('subscribe2', rule)

return self._section
40 changes: 40 additions & 0 deletions uwsgiconf/options/subscriptions_algos.py
@@ -0,0 +1,40 @@
from ..base import ParametrizedValue


class BalancingAlgorithm(ParametrizedValue):

name_separator = ''


class BalancingAlgorithmWithBackup(BalancingAlgorithm):

def __init__(self, backup_level=None):
self.backup_level = backup_level
super(BalancingAlgorithmWithBackup, self).__init__()


class WeightedRoundRobin(BalancingAlgorithmWithBackup):
"""Weighted round robin algorithm with backup support.
The default algorithm.
"""

name = 'wrr'


class LeastReferenceCount(BalancingAlgorithmWithBackup):
"""Least reference count algorithm with backup support."""

name = 'lrc'


class WeightedLeastReferenceCount(BalancingAlgorithmWithBackup):
"""Weighted least reference count algorithm with backup support."""

name = 'wlrc'


class IpHash(BalancingAlgorithmWithBackup):
"""IP hash algorithm with backup support."""

name = 'iphash'

0 comments on commit 6dd27b5

Please sign in to comment.