From b0e7b6c39205be390c2ce82f639d7073fa8ff3e4 Mon Sep 17 00:00:00 2001 From: Adam Kadlec Date: Sun, 19 Jun 2022 20:59:11 +0200 Subject: [PATCH] Data storage refresh --- src/Connectors/Connector.php | 2 +- src/Consumers/ConnectorConsumer.php | 12 ++++++++++++ 2 files changed, 13 insertions(+), 1 deletion(-) diff --git a/src/Connectors/Connector.php b/src/Connectors/Connector.php index 5d676068..582b77dc 100644 --- a/src/Connectors/Connector.php +++ b/src/Connectors/Connector.php @@ -173,7 +173,7 @@ public function terminate(): void $waitingForClosing = true; - # Wait until connector is fully terminated + // Wait until connector is fully terminated while ( $waitingForClosing && ($this->dateTimeFactory->getNow()->getTimestamp() - $now->getTimestamp()) < self::SHUTDOWN_WAITING_DELAY diff --git a/src/Consumers/ConnectorConsumer.php b/src/Consumers/ConnectorConsumer.php index 5a967ed2..a47f0ba0 100644 --- a/src/Consumers/ConnectorConsumer.php +++ b/src/Consumers/ConnectorConsumer.php @@ -16,9 +16,11 @@ namespace FastyBird\DevicesModule\Consumers; use FastyBird\DevicesModule\Connectors; +use FastyBird\DevicesModule\DataStorage; use FastyBird\Exchange\Consumer as ExchangeConsumer; use FastyBird\Metadata\Entities as MetadataEntities; use FastyBird\Metadata\Types as MetadataTypes; +use League\Flysystem; use Nette; use Psr\Log; @@ -38,20 +40,28 @@ final class ConnectorConsumer implements ExchangeConsumer\IConsumer /** @var Connectors\Connector */ private Connectors\Connector $connector; + /** @var DataStorage\Reader */ + private DataStorage\Reader $dataStorageReader; + /** @var Log\LoggerInterface */ private Log\LoggerInterface $logger; public function __construct( Connectors\Connector $connector, + DataStorage\Reader $dataStorageReader, ?Log\LoggerInterface $logger = null ) { $this->connector = $connector; + $this->dataStorageReader = $dataStorageReader; $this->logger = $logger ?? new Log\NullLogger(); } /** * {@inheritDoc} + * + * @throws Nette\Utils\JsonException + * @throws Flysystem\FilesystemException */ public function consume( $source, @@ -59,6 +69,8 @@ public function consume( ?MetadataEntities\IEntity $entity ): void { if ($entity !== null) { + $this->dataStorageReader->read(); + $this->connector->handleMessage(new Connectors\Messages\ExchangeMessage($routingKey, $entity)); } else {