Skip to content

Commit

Permalink
Refactoring connector factory (#3)
Browse files Browse the repository at this point in the history
* Refactored connector command
  • Loading branch information
akadlec committed Jul 5, 2022
1 parent a97f835 commit 9288466
Show file tree
Hide file tree
Showing 9 changed files with 179 additions and 298 deletions.
4 changes: 2 additions & 2 deletions composer.json
Expand Up @@ -43,9 +43,9 @@
"ext-pcntl": "*",
"contributte/flysystem": "^0.3.0",
"cweagans/composer-patches": "^1.7",
"fastybird/exchange": "^0.47",
"fastybird/exchange": "^0.48",
"fastybird/json-api": "^0.10",
"fastybird/metadata": "^0.65",
"fastybird/metadata": "^0.66",
"fastybird/simple-auth": "^0.3",
"ipub/doctrine-dynamic-discriminator-map": "^1.4",
"ipub/doctrine-timestampable": "^1.5",
Expand Down
2 changes: 1 addition & 1 deletion fastybird_devices_module/__init__.py
Expand Up @@ -18,4 +18,4 @@
Devices module
"""

__version__ = "0.67.0"
__version__ = "0.68.0"
2 changes: 1 addition & 1 deletion package.json
@@ -1,6 +1,6 @@
{
"name": "@fastybird/devices-module",
"version": "0.66.0",
"version": "0.68.0",
"description": "Devices module data model plugin",
"keywords": [
"devices",
Expand Down
183 changes: 175 additions & 8 deletions src/Commands/ConnectorCommand.php
Expand Up @@ -15,20 +15,28 @@

namespace FastyBird\DevicesModule\Commands;

use FastyBird\DateTimeFactory;
use FastyBird\DevicesModule\Connectors;
use FastyBird\DevicesModule\DataStorage;
use FastyBird\DevicesModule\Entities;
use FastyBird\DevicesModule\Events;
use FastyBird\DevicesModule\Exceptions;
use FastyBird\DevicesModule\Models;
use FastyBird\DevicesModule\Queries;
use FastyBird\Metadata\Entities as MetadataEntities;
use FastyBird\Metadata\Types as MetadataTypes;
use League\Flysystem;
use Nette\Localization;
use Nette\Utils;
use Psr\EventDispatcher as PsrEventDispatcher;
use Psr\Log;
use Ramsey\Uuid;
use React\EventLoop;
use Symfony\Component\Console;
use Symfony\Component\Console\Input;
use Symfony\Component\Console\Output;
use Symfony\Component\Console\Style;
use Throwable;

/**
* Module connector command
Expand All @@ -41,12 +49,29 @@
class ConnectorCommand extends Console\Command\Command
{

/** @var Connectors\Connector */
private Connectors\Connector $connector;
private const SHUTDOWN_WAITING_DELAY = 3;

/** @var Connectors\ConnectorFactory */
private Connectors\ConnectorFactory $factory;

/** @var Models\DataStorage\IConnectorsRepository */
private Models\DataStorage\IConnectorsRepository $connectorsRepository;

/** @var Models\Connectors\Properties\IPropertiesRepository */
private Models\Connectors\Properties\IPropertiesRepository $connectorPropertiesRepository;

/** @var Models\Connectors\Properties\IPropertiesManager */
private Models\Connectors\Properties\IPropertiesManager $connectorPropertiesManager;

/** @var Models\States\ConnectorPropertiesRepository */
private Models\States\ConnectorPropertiesRepository $connectorPropertiesStateRepository;

/** @var Models\States\ConnectorPropertiesManager */
private Models\States\ConnectorPropertiesManager $connectorPropertiesStateManager;

/** @var DateTimeFactory\DateTimeFactory */
private DateTimeFactory\DateTimeFactory $dateTimeFactory;

/** @var DataStorage\Reader */
private DataStorage\Reader $reader;

Expand All @@ -56,26 +81,43 @@ class ConnectorCommand extends Console\Command\Command
/** @var Log\LoggerInterface */
private Log\LoggerInterface $logger;

/** @var PsrEventDispatcher\EventDispatcherInterface|null */
private ?PsrEventDispatcher\EventDispatcherInterface $dispatcher;

/** @var EventLoop\LoopInterface */
private EventLoop\LoopInterface $eventLoop;

public function __construct(
Connectors\Connector $connector,
Connectors\ConnectorFactory $factory,
Models\DataStorage\IConnectorsRepository $connectorsRepository,
Models\Connectors\Properties\IPropertiesRepository $connectorPropertiesRepository,
Models\Connectors\Properties\IPropertiesManager $connectorPropertiesManager,
Models\States\ConnectorPropertiesRepository $connectorPropertiesStateRepository,
Models\States\ConnectorPropertiesManager $connectorPropertiesStateManager,
DataStorage\Reader $reader,
DateTimeFactory\DateTimeFactory $dateTimeFactory,
?PsrEventDispatcher\EventDispatcherInterface $dispatcher,
Localization\Translator $translator,
EventLoop\LoopInterface $eventLoop,
?Log\LoggerInterface $logger = null,
?string $name = null
) {
parent::__construct($name);

$this->connector = $connector;
$this->factory = $factory;
$this->connectorsRepository = $connectorsRepository;
$this->connectorPropertiesRepository = $connectorPropertiesRepository;
$this->connectorPropertiesManager = $connectorPropertiesManager;
$this->connectorPropertiesStateRepository = $connectorPropertiesStateRepository;
$this->connectorPropertiesStateManager = $connectorPropertiesStateManager;

$this->reader = $reader;

$this->dateTimeFactory = $dateTimeFactory;

$this->translator = $translator;
$this->eventLoop = $eventLoop;
$this->dispatcher = $dispatcher;

$this->logger = $logger ?? new Log\NullLogger();
}
Expand Down Expand Up @@ -139,13 +181,83 @@ protected function execute(Input\InputInterface $input, Output\OutputInterface $
return 0;
}

$service = $this->factory->create($connector);

try {
$this->eventLoop->futureTick(function () use ($connector): void {
$this->connector->execute($connector);
$this->eventLoop->futureTick(function () use ($connector, $service): void {
if ($this->dispatcher !== null) {
$this->dispatcher->dispatch(new Events\BeforeConnectorStartEvent($connector));
}

$this->logger->debug('Starting connector...', [
'source' => 'devices-module',
'type' => 'connector',
]);

try {
// Start connector service
$service->execute();

$this->setConnectorState(
$connector,
MetadataTypes\ConnectionStateType::get(MetadataTypes\ConnectionStateType::STATE_RUNNING)
);
} catch (Throwable $ex) {
throw new Exceptions\TerminateException('Connector can\'t be started');
}

if ($this->dispatcher !== null) {
$this->dispatcher->dispatch(new Events\AfterConnectorStartEvent($connector));
}
});

$this->eventLoop->addSignal(SIGINT, function (int $signal): void {
$this->connector->terminate();
$this->eventLoop->addSignal(SIGINT, function (int $signal) use ($connector, $service): void {
$this->logger->debug('Stopping connector...', [
'source' => 'devices-module',
'type' => 'connector',
]);

try {
if ($this->dispatcher !== null) {
$this->dispatcher->dispatch(new Events\BeforeConnectorTerminateEvent($service));
}

$service->terminate();

$now = $this->dateTimeFactory->getNow();

$waitingForClosing = true;

// Wait until connector is fully terminated
while (
$waitingForClosing
&& (
$this->dateTimeFactory->getNow()->getTimestamp() - $now->getTimestamp()
) < self::SHUTDOWN_WAITING_DELAY
) {
if (!$service->hasUnfinishedTasks()) {
$waitingForClosing = false;
}
}

if ($this->dispatcher !== null) {
$this->dispatcher->dispatch(new Events\AfterConnectorTerminateEvent($service));
}

$this->setConnectorState(
$connector,
MetadataTypes\ConnectionStateType::get(MetadataTypes\ConnectionStateType::STATE_STOPPED)
);
} catch (Throwable $ex) {
$this->logger->error('Connector couldn\'t be stopped. An unexpected error occurred', [
'source' => 'devices-module',
'type' => 'connector',
'exception' => [
'message' => $ex->getMessage(),
'code' => $ex->getCode(),
],
]);
}
});

$this->eventLoop->run();
Expand All @@ -156,4 +268,59 @@ protected function execute(Input\InputInterface $input, Output\OutputInterface $
return 0;
}

/**
* @param MetadataEntities\Modules\DevicesModule\IConnectorEntity $connector
* @param MetadataTypes\ConnectionStateType $state
*
* @return void
*/
private function setConnectorState(
MetadataEntities\Modules\DevicesModule\IConnectorEntity $connector,
MetadataTypes\ConnectionStateType $state
): void {
$findProperty = new Queries\FindConnectorPropertiesQuery();
$findProperty->byConnectorId($connector->getId());
$findProperty->byIdentifier(MetadataTypes\ConnectorPropertyNameType::NAME_STATE);

$property = $this->connectorPropertiesRepository->findOneBy($findProperty);

if ($property === null) {
$property = $this->connectorPropertiesManager->create(Utils\ArrayHash::from([
'connector' => $connector->getId(),
'entity' => Entities\Connectors\Properties\DynamicProperty::class,
'identifier' => MetadataTypes\ConnectorPropertyNameType::NAME_STATE,
'data_type' => MetadataTypes\DataTypeType::get(MetadataTypes\DataTypeType::DATA_TYPE_ENUM),
'unit' => null,
'format' => [
MetadataTypes\ConnectionStateType::STATE_RUNNING,
MetadataTypes\ConnectionStateType::STATE_STOPPED,
MetadataTypes\ConnectionStateType::STATE_UNKNOWN,
MetadataTypes\ConnectionStateType::STATE_SLEEPING,
MetadataTypes\ConnectionStateType::STATE_ALERT,
],
'settable' => false,
'queryable' => false,
]));
}

$propertyState = $this->connectorPropertiesStateRepository->findOne($property);

if ($propertyState === null) {
$this->connectorPropertiesStateManager->create($property, Utils\ArrayHash::from([
'actualValue' => $state->getValue(),
'expectedValue' => null,
'pending' => false,
'valid' => true,
]));

} else {
$this->connectorPropertiesStateManager->update($property, $propertyState, Utils\ArrayHash::from([
'actualValue' => $state->getValue(),
'expectedValue' => null,
'pending' => false,
'valid' => true,
]));
}
}

}

0 comments on commit 9288466

Please sign in to comment.