Skip to content

Commit

Permalink
Updating package
Browse files Browse the repository at this point in the history
  • Loading branch information
akadlec committed Jan 9, 2022
1 parent 0570980 commit eca23fd
Show file tree
Hide file tree
Showing 4 changed files with 14 additions and 22 deletions.
2 changes: 1 addition & 1 deletion composer.json
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@
"cweagans/composer-patches": "^1.7",
"contributte/console": "^0.9",
"fastybird/datetime-factory": "^0.2",
"fastybird/exchange-plugin": "^0.4",
"fastybird/exchange-plugin": "^0.5",
"fastybird/modules-metadata": "^0.22",
"fastybird/socket-server-factory": "^0.1",
"nette/bootstrap": "^3.0",
Expand Down
23 changes: 8 additions & 15 deletions redisdb_exchange_plugin/exchange.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@

# Library dependencies
import modules_metadata.exceptions as metadata_exceptions
from exchange_plugin.consumer import IConsumer
from exchange_plugin.consumer import Consumer
from kink import inject
from modules_metadata.loader import load_schema_by_routing_key
from modules_metadata.routing import RoutingKey
Expand All @@ -52,7 +52,7 @@ class RedisExchange(Thread):

__redis_client: RedisClient

__exchange_consumer: Optional[IConsumer] = None
__exchange_consumer: Consumer

__logger: Logger

Expand All @@ -64,7 +64,7 @@ def __init__(
self,
redis_client: RedisClient,
logger: Logger,
exchange_consumer: Optional[IConsumer] = None,
exchange_consumer: Consumer,
) -> None:
super().__init__(name="Redis DB exchange client thread", daemon=True)

Expand Down Expand Up @@ -125,12 +125,6 @@ def is_healthy(self) -> bool:

# -----------------------------------------------------------------------------

def register_consumer(self, consumer: IConsumer) -> None:
"""Register exchange consumer"""
self.__exchange_consumer = consumer

# -----------------------------------------------------------------------------

def __receive(self, data: Dict) -> None:
try:
origin = self.__validate_origin(origin=data.get("origin", None))
Expand All @@ -150,12 +144,11 @@ def __receive(self, data: Dict) -> None:
data=data.get("data", None),
)

if self.__exchange_consumer is not None:
self.__exchange_consumer.consume(
origin=origin,
routing_key=routing_key,
data=data,
)
self.__exchange_consumer.consume(
origin=origin,
routing_key=routing_key,
data=data,
)

else:
self.__logger.warning("Received exchange message is not valid")
Expand Down
2 changes: 1 addition & 1 deletion requirements.txt
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
redis>=3.5
fastybird-exchange-plugin>=0.4
fastybird-exchange-plugin>=0.5
fastybird-modules-metadata>=0.22
kink>=0.6
setuptools>=57.4
9 changes: 4 additions & 5 deletions src/Subscribers/AsyncClientSubscriber.php
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@
class AsyncClientSubscriber implements EventDispatcher\EventSubscriberInterface
{

/** @var ExchangePluginConsumer\IConsumer|null */
/** @var ExchangePluginConsumer\IConsumer */
private ?ExchangePluginConsumer\IConsumer $consumer;

/** @var ModulesMetadataLoaders\ISchemaLoader */
Expand All @@ -58,7 +58,7 @@ public function __construct(
ModulesMetadataLoaders\ISchemaLoader $schemaLoader,
ModulesMetadataSchemas\IValidator $validator,
PsrEventDispatcher\EventDispatcherInterface $dispatcher,
?ExchangePluginConsumer\IConsumer $consumer = null,
?ExchangePluginConsumer\IConsumer $consumer,
?Log\LoggerInterface $logger = null
) {
$this->schemaLoader = $schemaLoader;
Expand Down Expand Up @@ -146,9 +146,8 @@ private function handle(
}

try {
if ($this->consumer !== null) {
$this->consumer->consume($origin, $routingKey, $data);
}
$this->consumer->consume($origin, $routingKey, $data);

} catch (Exceptions\UnprocessableMessageException $ex) {
// Log error consume reason
$this->logger->error('[FB:PLUGIN:REDISDB_EXCHANGE] Message could not be handled', [
Expand Down

0 comments on commit eca23fd

Please sign in to comment.