Skip to content

Commit

Permalink
Feature/configurable discovery (#67)
Browse files Browse the repository at this point in the history
This commit adds a `selector` parameter to the connect function. The selector is a function that chooses a node from the list returned by gossip. By default we `select_random` but we also provide functions for `prefer_master` and `prefer_replica`
  • Loading branch information
bobthemighty committed Sep 17, 2018
1 parent f45ecdb commit 4361adf
Show file tree
Hide file tree
Showing 7 changed files with 141 additions and 24 deletions.
8 changes: 8 additions & 0 deletions CHANGES.md
@@ -1,3 +1,9 @@
## [0.6-alpha-2] - 2018-09-17
Discovery now supports "selectors" to control how we pick a node from gossip

## [0.6-alpha-1] - 2018-09-14
Added support for catch-up subscriptions.

## [0.5] - 2018-04-27
### Breaking changes
- Dropped the ConnectionContextManager class.
Expand All @@ -16,6 +22,8 @@
- `published_event` reversed order of type and stream


[0.6.0-alpha-2]: https://github.com/madecom/photon-pump/compare/v0.6.0-alpha-1...v0.6.0-alpha-2
[0.6.0-alpha-1]: https://github.com/madecom/photon-pump/compare/v0.5.0...v0.6.0-alpha-1
[0.5]: https://github.com/madecom/photon-pump/compare/v0.4.0...v0.5.0
[0.4]: https://github.com/madecom/photon-pump/compare/v0.3.0...v0.4.0
[0.3]: https://github.com/madecom/photon-pump/compare/v0.2.5...v0.3
Expand Down
8 changes: 4 additions & 4 deletions README.rst
Expand Up @@ -20,7 +20,7 @@ Photon pump is available on the `cheese shop`_. ::

You will need to install lib-protobuf 3.2.0 or above.

Documentation is available on `Read the docs`_. ::
Documentation is available on `Read the docs`_.

Basic Usage
-----------
Expand Down Expand Up @@ -207,12 +207,12 @@ Sometimes we want to watch a stream continuously and be notified when a new even
A persistent subscription stores its state on the server. When your application restarts, you can connect to the subscription again and continue where you left off. Multiple clients can connect to the same persistent subscription to support competing consumer scenarios. To support these features, persistent subscriptions have to run against the master node of an Eventstore cluster.
Firstly, we need to create the subscription.
Firstly, we need to :meth:`create the subscription <photonpump.connection.Client.create_subscription>`.
>>> async def create_subscription(subscription_name, stream_name, conn):
>>> await conn.create_subscription(subscription_name, stream_name)
Once we have a subscription, we can connect to it to begin receiving events. A persistent subscription exposes an `events` property, which acts like an asynchronous iterator.
Once we have a subscription, we can :meth:`connect to it <photonpump.connection.Client.connect_subscription>` to begin receiving events. A persistent subscription exposes an `events` property, which acts like an asynchronous iterator.
>>> async def read_events_from_subscription(subscription_name, stream_name, conn):
>>> subscription = await conn.connect_subscription(subscription_name, stream_name)
Expand All @@ -238,7 +238,7 @@ Volatile subsciptions do not support event acknowledgement.
High-Availability Scenarios
~~~~~~~~~~~~~~~~~~~~~~~~~~~
Eventstore supports an HA-cluster deployment topology. In this scenario, Eventstore runs a master node and multiple slaves. Some operations, particularly subscriptions and projections, are handled only by the master node. To connect to an HA-cluster and automatically find the master node, photonpump supports cluster discovery.
Eventstore supports an HA-cluster deployment topology. In this scenario, Eventstore runs a master node and multiple slaves. Some operations, particularly persistent subscriptions and projections, are handled only by the master node. To connect to an HA-cluster and automatically find the master node, photonpump supports cluster discovery.
The cluster discovery interrogates eventstore gossip to find the active master. You can provide the IP of a maching in the cluster, or a DNS name that resolves to some members of the cluster, and photonpump will discover the others.
Expand Down
3 changes: 3 additions & 0 deletions docs/api.rst
Expand Up @@ -10,3 +10,6 @@ Photonpump API Reference

.. automodule:: photonpump.messages
:members:

.. automodule:: photonpump.discovery
:members:
38 changes: 35 additions & 3 deletions photonpump/connection.py
Expand Up @@ -8,7 +8,7 @@

from . import conversations as convo
from . import messages as msg
from .discovery import DiscoveryRetryPolicy, NodeService, get_discoverer
from .discovery import DiscoveryRetryPolicy, NodeService, get_discoverer, select_random

HEADER_LENGTH = 1 + 1 + 16
SIZE_UINT_32 = 4
Expand Down Expand Up @@ -972,7 +972,6 @@ async def stop(self):
self.dispatch_loop,
self.heartbeat_loop,
loop=self.loop,

return_exceptions=True,
)
self.transport.close()
Expand All @@ -990,6 +989,7 @@ def connect(
password=None,
loop=None,
name=None,
selector=select_random,
) -> Client:
""" Create a new client.
Expand All @@ -1015,6 +1015,36 @@ def connect(
>>> async with connect(discovery_host="eventstore.test") as c:
>>> await c.ping()
The discovery host returns gossip data about the cluster. We use the
gossip to select a node at random from the avaialble cluster members.
If you're using
:meth:`persistent subscriptions <photonpump.connection.Client.create_subscription>`
you will always want to connect to the master node of the cluster.
The selector parameter is a function that chooses an available node from
the gossip result. To select the master node, use the
:func:`photonpump.discovery.prefer_master` function. This function will return
the master node if there is a live master, and a random replica otherwise.
All requests to the server can be made with the require_master flag which
will raise an error if the current node is not a master.
>>> async with connect(
>>> discovery_host="eventstore.test",
>>> selector=discovery.prefer_master,
>>> ) as c:
>>> await c.ping(require_master=True)
Conversely, you might want to avoid connecting to the master node for reasons
of scalability. For this you can use the
:func:`photonpump.discovery.prefer_replica` function.
>>> async with connect(
>>> discovery_host="eventstore.test",
>>> selector=discovery.prefer_replica,
>>> ) as c:
>>> await c.ping()
For some operations, you may need to authenticate your requests by
providing a username and password to the client.
Expand All @@ -1040,9 +1070,11 @@ def connect(
username: The username to use when communicating with eventstore.
password: The password to use when communicating with eventstore.
loop:An Asyncio event loop.
selector: An optional function that selects one element from a list of
:class:`photonpump.disovery.DiscoveredNode` elements.
"""
discovery = get_discoverer(host, port, discovery_host, discovery_port)
discovery = get_discoverer(host, port, discovery_host, discovery_port, selector)
dispatcher = MessageDispatcher(name=name, loop=loop)
connector = Connector(discovery, dispatcher, name=name)

Expand Down
68 changes: 55 additions & 13 deletions photonpump/discovery.py
Expand Up @@ -4,7 +4,7 @@
import socket
from enum import IntEnum
from operator import attrgetter
from typing import Iterable, List, NamedTuple, Optional
from typing import Callable, Iterable, List, NamedTuple, Optional

import aiodns
import aiohttp
Expand All @@ -26,7 +26,7 @@ class NodeState(IntEnum):
Shutdown = 10


INELIGIBLE_STATE = [NodeState.Manager, NodeState.ShuttingDown, NodeState.Shutdown]
ELIGIBLE_STATE = [NodeState.Clone, NodeState.Slave, NodeState.Master]


class NodeService(NamedTuple):
Expand All @@ -47,24 +47,60 @@ class DiscoveredNode(NamedTuple):
external_http: NodeService


Selector = Callable[[List[DiscoveredNode]], Optional[DiscoveredNode]]


def first(elems: Iterable):
LOG.info(elems)

for elem in elems:
return elem


def select(gossip: List[DiscoveredNode]) -> Optional[DiscoveredNode]:
def prefer_master(nodes: List[DiscoveredNode]) -> Optional[DiscoveredNode]:
"""
Select the master if available, otherwise fall back to a replica.
"""
return max(nodes, key=attrgetter("state"))


def prefer_replica(nodes: List[DiscoveredNode]) -> Optional[DiscoveredNode]:
"""
Select a random replica if any are available or fall back to the master.
"""
masters = [node for node in nodes if node.state == NodeState.Master]
replicas = [node for node in nodes if node.state != NodeState.Master]

if replicas:
return random.choice(replicas)
else:
# if you have more than one master then you're on your own, bud.

return masters[0]


def select_random(nodes: List[DiscoveredNode]) -> Optional[DiscoveredNode]:
"""
Return a random node.
"""
return random.choice(nodes)


def select(
gossip: List[DiscoveredNode], selector: Optional[Selector] = None
) -> Optional[DiscoveredNode]:
eligible_nodes = [
node for node in gossip if node.is_alive and node.state not in INELIGIBLE_STATE
node for node in gossip if node.is_alive and node.state in ELIGIBLE_STATE
]

LOG.debug("Selecting node from gossip members: %r" % eligible_nodes)
LOG.debug("Selecting node from gossip members: %r", eligible_nodes)

if not eligible_nodes:
return None

return max(eligible_nodes, key=attrgetter("state"))
selector = selector or prefer_master

return selector(eligible_nodes)


def read_gossip(data):
Expand All @@ -73,7 +109,7 @@ def read_gossip(data):

return []

LOG.debug(f"Received gossip for { len(data['members']) } nodes")
LOG.debug("Received gossip for {%s} nodes", len(data["members"]))

return [
DiscoveredNode(
Expand Down Expand Up @@ -142,7 +178,7 @@ async def reset_to_dns(self):
random.shuffle(result)

if result:
LOG.debug(f"Found { len(result) } hosts for name {self.name}")
LOG.debug("Found %s hosts for name %s", len(result), self.name)
current_attempt = 0
self.seeds = [
NodeService(address=node.host, port=self.port, secure_port=None)
Expand Down Expand Up @@ -182,15 +218,15 @@ async def fetch_new_gossip(session, seed):
if not seed:
return []

LOG.debug(f"Fetching gossip from http://{seed.address}:{seed.port}/gossip")
LOG.debug("Fetching gossip from http://%s:%s/gossip", seed.address, seed.port)
try:
resp = await session.get(f"http://{seed.address}:{seed.port}/gossip")
data = await resp.json()

return read_gossip(data)
except aiohttp.ClientError:
LOG.exception(
f"Failed loading gossip from http://{seed.address}:{seed.port}/gossip"
"Failed loading gossip from http://%s:%s/gossip", seed.address, seed.port
)

return None
Expand All @@ -209,6 +245,7 @@ async def discover(self):
if self.failed:
raise DiscoveryFailed()
LOG.debug("SingleNodeDiscovery returning node %s", self.node)

return self.node


Expand Down Expand Up @@ -245,12 +282,13 @@ def record_failure(self, node):


class ClusterDiscovery:
def __init__(self, seed_finder, http_session, retry_policy):
def __init__(self, seed_finder, http_session, retry_policy, selector):
self.session = http_session
self.seeds = seed_finder
self.last_gossip = []
self.best_node = None
self.retry_policy = retry_policy
self.selector = selector

def close(self):
self.session.close()
Expand All @@ -263,7 +301,7 @@ def record_gossip(self, node, gossip):

for member in gossip:
self.seeds.add_node(member.external_http)
self.best_node = select(gossip)
self.best_node = select(gossip, self.selector)
self.retry_policy.record_success(node)

async def get_gossip(self):
Expand Down Expand Up @@ -343,7 +381,9 @@ def record_failure(self, node):
self.stats.record_failure(node)


def get_discoverer(host, port, discovery_host, discovery_port):
def get_discoverer(
host, port, discovery_host, discovery_port, selector: Optional[Selector] = None
):
if discovery_host is None:
LOG.info("Using single-node discoverer")

Expand All @@ -358,6 +398,7 @@ def get_discoverer(host, port, discovery_host, discovery_port):
StaticSeedFinder([NodeService(discovery_host, discovery_port, None)]),
session,
DiscoveryRetryPolicy(),
selector,
)
except socket.error:
LOG.info("Using cluster node discovery with DNS")
Expand All @@ -367,4 +408,5 @@ def get_discoverer(host, port, discovery_host, discovery_port):
DnsSeedFinder(discovery_host, resolver, discovery_port),
session,
DiscoveryRetryPolicy(),
selector,
)
2 changes: 1 addition & 1 deletion setup.py
Expand Up @@ -33,7 +33,7 @@ def run_tests(self):

setup(
name="photon-pump",
version="0.5.0",
version="0.6.0-alpha-2",
url="http://github.com/madedotcom/photon-pump/",
license="MIT",
author="Bob Gregory",
Expand Down
38 changes: 35 additions & 3 deletions test/discovery_test.py
Expand Up @@ -18,6 +18,8 @@
get_discoverer,
read_gossip,
select,
prefer_master,
prefer_replica,
)

from . import data
Expand Down Expand Up @@ -214,7 +216,7 @@ async def wait(self, seed):
session = aiohttp.ClientSession()
with aioresponses() as mock:
successful_discoverer = ClusterDiscovery(
StaticSeedFinder([seed]), session, retry
StaticSeedFinder([seed]), session, retry, None
)

mock.get("http://1.2.3.4:2113/gossip", status=500)
Expand Down Expand Up @@ -254,7 +256,7 @@ async def wait(self, seed):
gossip = data.make_gossip("2.3.4.5")
with aioresponses() as mock:
successful_discoverer = ClusterDiscovery(
StaticSeedFinder([seed]), aiohttp.ClientSession(), retry
StaticSeedFinder([seed]), aiohttp.ClientSession(), retry, None
)

mock.get("http://1.2.3.4:2113/gossip", status=500)
Expand Down Expand Up @@ -302,8 +304,38 @@ def mark_failed(self, node):

node = NodeService("2.3.4.5", 1234, None)
finder = spy_seed_finder()
discoverer = ClusterDiscovery(finder, None, None)
discoverer = ClusterDiscovery(finder, None, None, None)

discoverer.mark_failed(node)

assert finder == [node]


@pytest.mark.asyncio
async def test_prefer_replica():
"""
If we ask the discoverer to prefer_replica it should return a replica node
before returning a master.
"""

discoverer = get_discoverer(None, None, "10.0.0.1", 2113, prefer_replica)
gossip = data.make_gossip("10.0.0.1", "10.0.0.2")
with aioresponses() as mock:
mock.get("http://10.0.0.1:2113/gossip", payload=gossip)

assert await discoverer.discover() == NodeService("10.0.0.2", 1113, None)


@pytest.mark.asyncio
async def test_prefer_master():
"""
If we ask the discoverer to prefer_master it should return a master node
before returning a replica.
"""

discoverer = get_discoverer(None, None, "10.0.0.1", 2113, prefer_master)
gossip = data.make_gossip("10.0.0.1", "10.0.0.2")
with aioresponses() as mock:
mock.get("http://10.0.0.1:2113/gossip", payload=gossip)

assert await discoverer.discover() == NodeService("10.0.0.1", 1113, None)

0 comments on commit 4361adf

Please sign in to comment.