Skip to content

Commit

Permalink
Data storage refresh
Browse files Browse the repository at this point in the history
  • Loading branch information
akadlec committed Jun 19, 2022
1 parent 95f5429 commit b0e7b6c
Show file tree
Hide file tree
Showing 2 changed files with 13 additions and 1 deletion.
2 changes: 1 addition & 1 deletion src/Connectors/Connector.php
Original file line number Diff line number Diff line change
Expand Up @@ -173,7 +173,7 @@ public function terminate(): void

$waitingForClosing = true;

# Wait until connector is fully terminated
// Wait until connector is fully terminated
while (
$waitingForClosing
&& ($this->dateTimeFactory->getNow()->getTimestamp() - $now->getTimestamp()) < self::SHUTDOWN_WAITING_DELAY
Expand Down
12 changes: 12 additions & 0 deletions src/Consumers/ConnectorConsumer.php
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,11 @@
namespace FastyBird\DevicesModule\Consumers;

use FastyBird\DevicesModule\Connectors;
use FastyBird\DevicesModule\DataStorage;
use FastyBird\Exchange\Consumer as ExchangeConsumer;
use FastyBird\Metadata\Entities as MetadataEntities;
use FastyBird\Metadata\Types as MetadataTypes;
use League\Flysystem;
use Nette;
use Psr\Log;

Expand All @@ -38,27 +40,37 @@ final class ConnectorConsumer implements ExchangeConsumer\IConsumer
/** @var Connectors\Connector */
private Connectors\Connector $connector;

/** @var DataStorage\Reader */
private DataStorage\Reader $dataStorageReader;

/** @var Log\LoggerInterface */
private Log\LoggerInterface $logger;

public function __construct(
Connectors\Connector $connector,
DataStorage\Reader $dataStorageReader,
?Log\LoggerInterface $logger = null
) {
$this->connector = $connector;
$this->dataStorageReader = $dataStorageReader;

$this->logger = $logger ?? new Log\NullLogger();
}

/**
* {@inheritDoc}
*
* @throws Nette\Utils\JsonException
* @throws Flysystem\FilesystemException
*/
public function consume(
$source,
MetadataTypes\RoutingKeyType $routingKey,
?MetadataEntities\IEntity $entity
): void {
if ($entity !== null) {
$this->dataStorageReader->read();

$this->connector->handleMessage(new Connectors\Messages\ExchangeMessage($routingKey, $entity));

} else {
Expand Down

0 comments on commit b0e7b6c

Please sign in to comment.