Skip to content

Commit

Permalink
Renaming services
Browse files Browse the repository at this point in the history
  • Loading branch information
akadlec committed Jan 10, 2022
1 parent 5944f4e commit 737927b
Show file tree
Hide file tree
Showing 4 changed files with 64 additions and 54 deletions.
19 changes: 12 additions & 7 deletions redisdb_exchange_plugin/bootstrap.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,10 @@
# Library dependencies
from kink import di

from redisdb_exchange_plugin.client import Client

# Library libs
from redisdb_exchange_plugin.connection import RedisClient
from redisdb_exchange_plugin.exchange import RedisExchange
from redisdb_exchange_plugin.connection import Connection
from redisdb_exchange_plugin.logger import Logger
from redisdb_exchange_plugin.publisher import Publisher

Expand All @@ -42,7 +43,7 @@ def create_container(
di[Logger] = Logger(logger=logger)
di["fb-redisdb-exchange-plugin_logger"] = di[Logger]

di[RedisClient] = RedisClient(
di[Connection] = Connection(
host=str(settings.get("host", "127.0.0.1")) if settings.get("host", None) is not None else "127.0.0.1",
port=int(str(settings.get("port", 6379))),
channel_name=str(settings.get("channel_name", "fb_exchange"))
Expand All @@ -52,10 +53,14 @@ def create_container(
password=str(settings.get("password", None)) if settings.get("password", None) is not None else None,
logger=di[Logger],
)
di["fb-redisdb-exchange-plugin_redis-client"] = di[RedisClient]
di["fb-redisdb-exchange-plugin_redis-connection"] = di[Connection]

di[Publisher] = Publisher(redis_client=di[RedisClient])
di[Publisher] = Publisher(
channel_name=str(settings.get("channel_name", "fb_exchange")),
connection=di[Connection],
logger=di[Logger],
)
di["fb-redisdb-exchange-plugin_publisher"] = di[Publisher]

di[RedisExchange] = RedisExchange(redis_client=di[RedisClient], logger=di[Logger])
di["fb-redisdb-exchange-plugin_exchange"] = di[RedisExchange]
di[Client] = Client(connection=di[Connection], logger=di[Logger])
di["fb-redisdb-exchange-plugin_client"] = di[Client]
Original file line number Diff line number Diff line change
Expand Up @@ -33,17 +33,20 @@
from metadata.validator import validate

# Library libs
from redisdb_exchange_plugin.connection import RedisClient
from redisdb_exchange_plugin.exceptions import HandleDataException, HandleRequestException
from redisdb_exchange_plugin.connection import Connection
from redisdb_exchange_plugin.exceptions import (
HandleDataException,
HandleRequestException,
)
from redisdb_exchange_plugin.logger import Logger


class IConsumer(ABC): # pylint: disable=too-few-public-methods
"""
Redis exchange consumer interface
Redis client consumer interface
@package FastyBird:RedisDbExchangePlugin!
@module consumer
@module client
@author Adam Kadlec <adam.kadlec@fastybird.com>
"""
Expand All @@ -58,17 +61,17 @@ def consume(


@inject
class RedisExchange:
class Client:
"""
Redis data exchange
Redis exchange client
@package FastyBird:RedisDbExchangePlugin!
@module redis
@module client
@author Adam Kadlec <adam.kadlec@fastybird.com>
"""

__redis_client: RedisClient
__connection: Connection

__consumer: Optional[IConsumer]

Expand All @@ -78,11 +81,11 @@ class RedisExchange:

def __init__(
self,
redis_client: RedisClient,
connection: Connection,
logger: Logger,
consumer: IConsumer = None, # type: ignore[assignment]
) -> None:
self.__redis_client = redis_client
self.__connection = connection
self.__logger = logger

self.__consumer = consumer
Expand All @@ -91,16 +94,16 @@ def __init__(

def start(self) -> None:
"""Start exchange services"""
self.__redis_client.subscribe()
self.__connection.subscribe()

self.__logger.info("Starting Redis DB exchange client")

# -----------------------------------------------------------------------------

def stop(self) -> None:
"""Close all opened connections & stop exchange thread"""
self.__redis_client.unsubscribe()
self.__redis_client.close()
self.__connection.unsubscribe()
self.__connection.close()

self.__logger.info("Closing Redis DB exchange client")

Expand All @@ -109,7 +112,7 @@ def stop(self) -> None:
def handle(self) -> None:
"""Process Redis exchange messages"""
try:
data = self.__redis_client.receive()
data = self.__connection.receive()

if data is not None:
self.__receive(data)
Expand Down
33 changes: 5 additions & 28 deletions redisdb_exchange_plugin/connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,6 @@
from typing import Dict, Optional, Union

# Library dependencies
from metadata.routing import RoutingKey
from metadata.types import ModuleOrigin
from redis import Redis
from redis.client import PubSub

Expand All @@ -34,7 +32,7 @@
from redisdb_exchange_plugin.logger import Logger


class RedisClient:
class Connection(Redis):
"""
Redis client
Expand All @@ -44,8 +42,6 @@ class RedisClient:
@author Adam Kadlec <adam.kadlec@fastybird.com>
"""

__redis_client: Redis

__pub_sub: Optional[PubSub] = None

__identifier: str
Expand All @@ -64,7 +60,7 @@ def __init__( # pylint: disable=too-many-arguments
username: Optional[str] = None,
password: Optional[str] = None,
) -> None:
self.__redis_client = Redis(
super().__init__(
host=host,
port=port,
username=username,
Expand All @@ -78,32 +74,13 @@ def __init__( # pylint: disable=too-many-arguments

# -----------------------------------------------------------------------------

def publish(self, origin: ModuleOrigin, routing_key: RoutingKey, data: Optional[Dict]) -> None:
"""Publish message to default exchange channel"""
message = {
"routing_key": routing_key.value,
"origin": origin.value,
"sender_id": self.__identifier,
"data": data,
}

result: int = self.__redis_client.publish(channel=self.__channel_name, message=json.dumps(message))

self.__logger.debug(
"Successfully published message to: %d consumers via RedisDB exchange plugin with key: %s",
result,
routing_key,
)

# -----------------------------------------------------------------------------

def subscribe(self) -> None:
"""Subscribe to default exchange channel"""
if self.__pub_sub is not None:
raise InvalidStateException("Exchange is already subscribed to exchange")

# Connect to pub sub exchange
self.__pub_sub = self.__redis_client.pubsub()
self.__pub_sub = super().pubsub()
# Subscribe to channel
self.__pub_sub.subscribe(self.__channel_name)

Expand Down Expand Up @@ -164,7 +141,7 @@ def close(self) -> None:
"""Close opened connection to Redis database"""
self.unsubscribe()

self.__redis_client.close()
super().close()

# -----------------------------------------------------------------------------

Expand All @@ -176,4 +153,4 @@ def identifier(self) -> str:
# -----------------------------------------------------------------------------

def __del__(self) -> None:
self.__redis_client.close()
super().close()
35 changes: 30 additions & 5 deletions redisdb_exchange_plugin/publisher.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,16 @@
"""

# Python base dependencies
import json
from typing import Dict, Optional

# Library dependencies
from metadata.routing import RoutingKey
from metadata.types import ModuleOrigin

# Library libs
from redisdb_exchange_plugin.connection import RedisClient
from redisdb_exchange_plugin.connection import Connection
from redisdb_exchange_plugin.logger import Logger


class Publisher: # pylint: disable=too-few-public-methods
Expand All @@ -39,18 +41,41 @@ class Publisher: # pylint: disable=too-few-public-methods
@author Adam Kadlec <adam.kadlec@fastybird.com>
"""

__redis_client: RedisClient
__channel_name: str

__connection: Connection

__logger: Logger

# -----------------------------------------------------------------------------

def __init__(
self,
redis_client: RedisClient,
channel_name: str,
connection: Connection,
logger: Logger,
) -> None:
self.__redis_client = redis_client
self.__channel_name = channel_name

self.__connection = connection

self.__logger = logger

# -----------------------------------------------------------------------------

def publish(self, origin: ModuleOrigin, routing_key: RoutingKey, data: Optional[Dict]) -> None:
"""Publish message to Redis exchange"""
self.__redis_client.publish(origin=origin, routing_key=routing_key, data=data)
message = {
"routing_key": routing_key.value,
"origin": origin.value,
"sender_id": self.__connection.identifier,
"data": data,
}

result: int = self.__connection.publish(channel=self.__channel_name, message=json.dumps(message))

self.__logger.debug(
"Successfully published message to: %d consumers via RedisDB exchange plugin with key: %s",
result,
routing_key,
)

0 comments on commit 737927b

Please sign in to comment.