Skip to content

Commit

Permalink
1. Fixes for PR comments. 2. PUBSUB Mock implementation - tests are p…
Browse files Browse the repository at this point in the history
…assing. 3. Python DEVELOPER.md fixes to ignore .env folder when running linters
  • Loading branch information
ikolomi committed Feb 11, 2024
1 parent 4480f76 commit 4153354
Show file tree
Hide file tree
Showing 4 changed files with 750 additions and 335 deletions.
4 changes: 2 additions & 2 deletions python/DEVELOPER.md
Original file line number Diff line number Diff line change
Expand Up @@ -193,8 +193,8 @@ Run from the main `/python` folder
> Note: make sure to [generate protobuf with interface files]("#protobuf-interface-files") before running mypy linter
```bash
pip install -r dev_requirements.txt
isort . --profile black --skip-glob python/glide/protobuf
black . --exclude python/glide/protobuf
isort . --profile black --skip-glob python/glide/protobuf --extend-skip-glob .env
black . --exclude "python/glide/protobuf|.env"
flake8 . --count --select=E9,F63,F7,F82 --show-source --statistics --exclude=python/glide/protobuf,.env/* --extend-ignore=E230
flake8 . --count --exit-zero --max-complexity=12 --max-line-length=127 --statistics --exclude=python/glide/protobuf,.env/* --extend-ignore=E230
# run type check
Expand Down
108 changes: 59 additions & 49 deletions python/python/glide/async_commands/core.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,11 @@
from __future__ import annotations

from dataclasses import dataclass
from datetime import datetime, timedelta
from enum import Enum
from typing import (
Any,
Callable,
Dict,
List,
Mapping,
Expand All @@ -12,8 +17,6 @@
Union,
cast,
get_args,
Callable,
Any,
)

from glide.constants import TOK, TResult
Expand Down Expand Up @@ -1237,101 +1240,108 @@ async def zrem(

@dataclass
class PubSubMsg:
""" Describes incomming message """
message: str
"""
Incomming message
"""Describes incoming message
Attributes:
message (str): Incoming message.
channel (str): Name of an channel that triggered the message.
pattern (Optional[str]): Pattern that triggered the message.
context (Optional[str]): User-provided context for this subscription.
"""

message: str
channel: str
"""
Name of an channel that triggered the message
"""
pattern: Optional[str]
"""
Pattern that triggered the message
"""
context: Optional[Any]

class ChannelModes(Enum):
""" Describes subsciption modes. """
"""Describes subsciption modes."""

Exact = 1
""" Use exact channel names """
Pattern = 2
""" Use channel name patterns """
Sharded = 3
""" Use sharded PUBSUB. See https://redis.io/docs/interact/pubsub/#sharded-pubsub for more details """
""" Use sharded PUBSUB. See https://redis.io/docs/interact/pubsub/#sharded-pubsub for more details """

async def subscribe(self,
channels_or_patterns: Set[str],
channel_mode: ChannelModes,
callback: Callable[[PubSubMsg, Optional[Any]]],
context: Optional[Any]) -> None:
async def subscribe(
self,
channels_or_patterns: Set[str],
channel_mode: CoreCommands.ChannelModes,
callback: Callable[[CoreCommands.PubSubMsg], None],
context: Optional[Any],
) -> None:
"""
Add specific channels or set of channels defined by a pattern to active subscriptions.
The callback is associated with each channel/pattern,
meaning subsequent subsciption for the same channel/pattern will override the previous association, dereferencing the original callback object
meaning subsequent subsciption for the same channel/pattern will override the previous association,
dereferencing the original callback object.
Note1: Overlapping subscriptions produced by combinations of Exact and Pattern modes
count as distinc subscriptions and will produce duplicated messages.
Note2: Patterns are not applicable in Sharded mode.
Note2: Patterns are not applicable in Sharded mode. That is, it is not possible to create a sharded subscription with a pattern,
and pattern/exact subscriptions won't receive messages which were published with using Sharded mode.
See https://redis.io/docs/interact/pubsub for more details.
Args:
channels_or_patterns: Set of channels or patterns to subscibe to.
channel_mode: Mode of operation. See ChannelModes
callback: Callback to be async-triggered for each new message accompanied with a context object. For each channel or pattern there is exactly one active callback.
channels_or_patterns (Set[str]): Set of channels or patterns to subscibe to.
channel_mode (ChannelModes): Mode of operation. See ChannelModes.
callback (Callable[[CoreCommands.PubSubMsg], None]): Callback to be called for each new message.
For each channel or pattern there is exactly one active callback.
Subsequent calls to subscribe() will reset callbacks for existing channels or patterns.
context: User-provided context for this subscription
context (Optional[Any]): User-provided context for this subscription. Will be passed to callback as part of PubSubMsg.
Examples:
>>> await client.subscribe({"local-news"}, ChannelModes.Exact, process_news_message, None)
>>> def process_news_message(message: PubSubMsg) -> None:
>>> print(f"'{message.context}' received new message '{message.message}' from channel '{message.channel}')
>>> await client.subscribe({"local-news"}, ChannelModes.Exact, process_news_message, "example-client")
1 # Subscribes to local-news channel with process_news_message callback function
>>> await client.subscribe({"*"}, ChannelModes.Pattern, process_all_messages, None)
>>> def process_all_messages(message: PubSubMsg) -> None:
>>> print(f"'{message.context}' received new message '{message.message}' from channel '{message.channel}' by pattern '{message.pattern}')
>>> await client.subscribe({"*"}, ChannelModes.Pattern, process_all_messages, "example-client")
2 # Subscribes to all channels with process_all_message callback function
"""
pass
...

async def publish(self,
message: str,
channels: Set[str],
sharded: bool = False) -> int:
async def publish(self, message: str, channel: str, sharded: bool = False) -> int:
"""
Publish message on channels.
See https://redis.io/docs/interact/pubsub for more details.
Args:
message: Message to publish
channels: Set of channels or patterns to publish the message on.
sharded: Use sharded PUBSUB mode.
message (str): Message to publish.
channel (str): Channel to publish the message on.
sharded (bool): Use sharded PUBSUB mode.
Returns:
int: Number of clients that received the message. //TODO: Consider None, do we really want it?
int: Number of clients that received the message.
Examples:
>>> await client.publish("Hi all!", {"global-channel"}, True)
>>> await client.publish("Hi all!", "global-channel", True)
1 # Publishes "Hi all!" message on global-channel channel using sharded mode
>>> await client.publish("Hi to sharded channel1 and channel2!", {"channel1", "channel2"}, True)
2 # Publishes "Hi to sharded channel1 and channel2!" message on channel1 and channel2 using sharded mode
"""
pass

# async def get_pubsub_message(self) -> PubSubMsg:
# pass
...

async def unsubscribe(self,
channels_or_patterns: Set[str],
channel_mode: ChannelModes):
async def unsubscribe(
self, channels_or_patterns: Set[str], channel_mode: CoreCommands.ChannelModes
) -> Set[str]:
"""
Remove specific channels or channel patterns from active susbsciptions.
Note that channels and patterns must be removed separately.
See https://redis.io/docs/interact/pubsub for more details.
Args:
channels_or_patterns: Set of channels or patterns to unsubscibe from.
channel_mode: Mode of operation. See ChannelModes
channels_or_patterns (Set[str]): Set of channels or patterns to unsubscibe from.
Empty set unsubscribes from all channels.
channel_mode (ChannelModes): Mode of operation. See ChannelModes
Returns:
Set[str]: Channles or patterns that where successfully unsubscribed from
Examples:
>>> await client.unsubscribe({"local-news"}, ChannelModes.Exact)
1 # Removes "local-news" channel from active exact subscriptions
>>> await client.unsubscribe({"*"}, ChannelModes.Pattern)
2 # Removes channel pattern "*" from active pattern subsciptions
"""
pass
...
7 changes: 7 additions & 0 deletions python/python/tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,13 @@ def pytest_addoption(parser):
default=[],
)

parser.addoption(
"--mock-pubsub",
default=True,
action="store_true",
help="Use PUBUSB mock implementation, defaults to `%(default)s`",
)


@pytest.fixture(autouse=True, scope="session")
def call_before_all_pytests(request):
Expand Down
Loading

0 comments on commit 4153354

Please sign in to comment.