Skip to content
This repository has been archived by the owner on Feb 21, 2023. It is now read-only.

Commit

Permalink
Merge pull request #176 from aio-libs/mpsc
Browse files Browse the repository at this point in the history
Multi-producers, single-consumer pub/sub queue; abc module;
  • Loading branch information
popravich authored Dec 28, 2016
2 parents 9144bc0 + b0966d2 commit bb2a66e
Show file tree
Hide file tree
Showing 7 changed files with 811 additions and 161 deletions.
2 changes: 1 addition & 1 deletion aioredis/__init__.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
from .connection import RedisConnection, create_connection
from .commands import Redis, create_redis, create_reconnecting_redis
from .pool import RedisPool, create_pool
from .util import Channel
from .pubsub import Channel
from .errors import (
ConnectionClosedError,
MultiExecError,
Expand Down
157 changes: 157 additions & 0 deletions aioredis/abc.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,157 @@
"""The module provides connection and connections pool interfaces.
These are intended to be used for implementing custom connection managers.
"""
import abc
import asyncio
try:
from abc import ABC
except ImportError:
class ABC(metaclass=abc.ABCMeta):
pass


__all__ = [
'AbcConnection',
'AbcPool',
'AbcChannel',
]


class AbcConnection(ABC):
"""Abstract connection interface."""

@abc.abstractmethod
def execute(self):
"""Execute redis command."""

@abc.abstractmethod
def execute_pubsub(self):
"""Execute Redis (p)subscribe/(p)unsubscribe commands."""

@abc.abstractmethod
def close(self):
"""Perform connection(s) close and resources cleanup."""

@asyncio.coroutine
@abc.abstractmethod
def wait_closed(self):
"""
Coroutine waiting until all resources are closed/released/cleaned up.
"""

@property
@abc.abstractmethod
def closed(self):
"""Flag indicating if connection is closing or already closed."""

@property
@abc.abstractmethod
def db(self):
"""Currently selected DB index."""

@property
@abc.abstractmethod
def encoding(self):
"""Current set connection codec."""

@property
@abc.abstractmethod
def in_pubsub(self):
"""Returns number of subscribed channels.
Can be tested as bool indicating Pub/Sub mode state.
"""

@property
@abc.abstractmethod
def pubsub_channels(self):
"""Read-only channels dict."""

@property
@abc.abstractmethod
def pubsub_patterns(self):
"""Read-only patterns dict."""

@property
@abc.abstractmethod
def address(self):
"""Connection address."""


class AbcPool(AbcConnection):
"""Abstract connections pool interface.
Inherited from AbcConnection so both common interface
for executing Redis commands.
"""

@abc.abstractmethod
def get_connection(self): # TODO: arguments
"""Gets free connection from pool in a sync way.
If no connection available — returns None
"""

@asyncio.coroutine
@abc.abstractmethod
def acquire(self): # TODO: arguments
"""Acquires connection from pool."""

@abc.abstractmethod
def release(self): # TODO: arguments
"""Releases connection to pool."""

@property
@abc.abstractmethod
def address(self):
"""Connection address or None."""


class AbcChannel(ABC):
"""Abstract Pub/Sub Channel interface."""

@property
@abc.abstractmethod
def name(self):
"""Encoded channel name or pattern."""

@property
@abc.abstractmethod
def is_pattern(self):
"""Boolean flag indicating if channel is pattern channel."""

@property
@abc.abstractmethod
def is_active(self):
"""Flag indicating that channel has unreceived messages
and not marked as closed."""

@asyncio.coroutine
@abc.abstractmethod
def get(self):
"""Wait and return new message.
Will raise ChannelClosedError if channel is not active.
"""

# wait_message is not required; details of implementation
# @abc.abstractmethod
# def wait_message(self):
# pass

@abc.abstractmethod
def put_nowait(self, data):
"""Send data to channel.
Called by RedisConnection when new message received.
For pattern subscriptions data will be a tuple of
channel name and message itself.
"""

@abc.abstractmethod
def close(self):
"""Marks Channel as closed, no more messages will be sent to it.
Called by RedisConnection when channel is unsubscribed
or connection is closed.
"""
5 changes: 3 additions & 2 deletions aioredis/connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@
_set_result,
_set_exception,
coerced_keys_dict,
Channel,
decode,
async_task,
create_future,
Expand All @@ -24,6 +23,8 @@
ReplyError,
WatchVariableError,
)
from .pubsub import Channel
from .abc import AbcChannel
from .log import logger


Expand Down Expand Up @@ -266,7 +267,7 @@ def execute_pubsub(self, command, *channels):
raise TypeError("No channels/patterns supplied")
is_pattern = len(command) in (10, 12)
mkchannel = partial(Channel, is_pattern=is_pattern, loop=self._loop)
channels = [ch if isinstance(ch, Channel) else mkchannel(ch)
channels = [ch if isinstance(ch, AbcChannel) else mkchannel(ch)
for ch in channels]
if not all(ch.is_pattern == is_pattern for ch in channels):
raise ValueError("Not all channels {} match command {}"
Expand Down
Loading

0 comments on commit bb2a66e

Please sign in to comment.