Skip to content

Commit

Permalink
Refactoring consumers
Browse files Browse the repository at this point in the history
  • Loading branch information
akadlec committed Aug 8, 2020
1 parent 059cf74 commit 999ebd6
Show file tree
Hide file tree
Showing 14 changed files with 450 additions and 663 deletions.
18 changes: 9 additions & 9 deletions composer.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

10 changes: 4 additions & 6 deletions config/common.neon
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,10 @@ parameters:
rabbitmq:
queueName: fb.triggers.node.exchange
routing:
- fb.bus.node.entity.*.channel # Channel entities
- fb.bus.node.entity.*.channel.* # Channel entities
- fb.bus.node.entity.*.device # Device entities
- fb.bus.node.entity.*.device.* # Device entities
- fb.bus.node.entity.deleted.channel # Channel entities
- fb.bus.node.entity.*.channel.property # Channel property entities
- fb.bus.node.entity.deleted.device # Device entities
- fb.bus.node.entity.*.device.property # Device property entities

#########################
# Used Nette extensions #
Expand Down Expand Up @@ -184,8 +184,6 @@ services:

- {factory: FastyBird\TriggersNode\Consumers\DevicePropertyMessageHandler}

- {factory: FastyBird\TriggersNode\Consumers\PropertyDataMessageHandler}

# Node events
#############
- {factory: FastyBird\TriggersNode\Events\ServerBeforeStartHandler}
Expand Down
2 changes: 0 additions & 2 deletions src/Constants.php
Original file line number Diff line number Diff line change
Expand Up @@ -33,14 +33,12 @@ final class Constants
// Devices
public const RABBIT_MQ_DEVICES_DELETED_ENTITY_ROUTING_KEY = 'fb.bus.node.entity.deleted.device';

public const RABBIT_MQ_DEVICES_PROPERTY_CREATED_ENTITY_ROUTING_KEY = 'fb.bus.node.entity.created.device.property';
public const RABBIT_MQ_DEVICES_PROPERTY_UPDATED_ENTITY_ROUTING_KEY = 'fb.bus.node.entity.updated.device.property';
public const RABBIT_MQ_DEVICES_PROPERTY_DELETED_ENTITY_ROUTING_KEY = 'fb.bus.node.entity.deleted.device.property';

// Channels
public const RABBIT_MQ_CHANNELS_DELETED_ENTITY_ROUTING_KEY = 'fb.bus.node.entity.deleted.device.channel';

public const RABBIT_MQ_CHANNELS_PROPERTY_CREATED_ENTITY_ROUTING_KEY = 'fb.bus.node.entity.created.channel.property';
public const RABBIT_MQ_CHANNELS_PROPERTY_UPDATED_ENTITY_ROUTING_KEY = 'fb.bus.node.entity.updated.channel.property';
public const RABBIT_MQ_CHANNELS_PROPERTY_DELETED_ENTITY_ROUTING_KEY = 'fb.bus.node.entity.deleted.channel.property';

Expand Down
59 changes: 10 additions & 49 deletions src/Consumers/ChannelMessageHandler.php
Original file line number Diff line number Diff line change
Expand Up @@ -128,72 +128,33 @@ private function clearChannels(string $device, string $channel): void
$findQuery = new Queries\FindChannelPropertyTriggersQuery();
$findQuery->forChannel($device, $channel);

$this->clearTriggers($findQuery);

/** @var Queries\FindActionsQuery<Entities\Actions\ChannelPropertyAction> $findQuery */
$findQuery = new Queries\FindActionsQuery();
$findQuery->forChannel($device, $channel);

$this->clearActions($findQuery);

/** @var Queries\FindConditionsQuery<Entities\Conditions\ChannelPropertyCondition> $findQuery */
$findQuery = new Queries\FindConditionsQuery();
$findQuery->forChannel($device, $channel);

$this->clearConditions($findQuery);

$this->logger->info('[CONSUMER] Successfully consumed channel entity message');
}

/**
* @param Queries\FindChannelPropertyTriggersQuery $findQuery
*
* @return void
*
* @phpstan-template T of Entities\Triggers\ChannelPropertyTrigger
* @phpstan-param Queries\FindChannelPropertyTriggersQuery<T> $findQuery
*/
private function clearTriggers(Queries\FindChannelPropertyTriggersQuery $findQuery): void
{
$triggers = $this->triggerRepository->findAllBy($findQuery, Entities\Triggers\ChannelPropertyTrigger::class);

foreach ($triggers as $trigger) {
$this->triggersManager->delete($trigger);
}
}

/**
* @param Queries\FindActionsQuery $findQuery
*
* @return void
*
* @phpstan-template T of Entities\Actions\ChannelPropertyAction
* @phpstan-param Queries\FindActionsQuery<T> $findQuery
*/
private function clearActions(Queries\FindActionsQuery $findQuery): void
{
/** @var Queries\FindActionsQuery<Entities\Actions\ChannelPropertyAction> $findQuery */
$findQuery = new Queries\FindActionsQuery();
$findQuery->forChannel($device, $channel);

$actions = $this->actionRepository->findAllBy($findQuery, Entities\Actions\ChannelPropertyAction::class);

foreach ($actions as $action) {
$this->actionsManager->delete($action);
}
}

/**
* @param Queries\FindConditionsQuery $findQuery
*
* @return void
*
* @phpstan-template T of Entities\Conditions\ChannelPropertyCondition
* @phpstan-param Queries\FindConditionsQuery<T> $findQuery
*/
private function clearConditions(Queries\FindConditionsQuery $findQuery): void
{
/** @var Queries\FindConditionsQuery<Entities\Conditions\ChannelPropertyCondition> $findQuery */
$findQuery = new Queries\FindConditionsQuery();
$findQuery->forChannel($device, $channel);

$conditions = $this->conditionRepository->findAllBy($findQuery, Entities\Conditions\ChannelPropertyCondition::class);

foreach ($conditions as $condition) {
$this->conditionsManager->delete($condition);
}

$this->logger->info('[CONSUMER] Successfully consumed channel entity message');
}

}
133 changes: 89 additions & 44 deletions src/Consumers/ChannelPropertyMessageHandler.php
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,15 @@
namespace FastyBird\TriggersNode\Consumers;

use FastyBird\NodeExchange\Consumers as NodeExchangeConsumers;
use FastyBird\NodeExchange\Publishers as NodeExchangePublishers;
use FastyBird\NodeMetadata;
use FastyBird\NodeMetadata\Loaders as NodeMetadataLoaders;
use FastyBird\TriggersNode;
use FastyBird\TriggersNode\Entities;
use FastyBird\TriggersNode\Exceptions;
use FastyBird\TriggersNode\Models;
use FastyBird\TriggersNode\Queries;
use FastyBird\TriggersNode\Types;
use Nette\Utils;
use Psr\Log;

Expand All @@ -37,6 +39,14 @@
final class ChannelPropertyMessageHandler implements NodeExchangeConsumers\IMessageHandler
{

use TPropertyDataMessageHandler;

/** @var NodeExchangePublishers\IRabbitMqPublisher */
protected $rabbitMqPublisher;

/** @var Log\LoggerInterface */
protected $logger;

/** @var Models\Triggers\ITriggerRepository */
private $triggerRepository;

Expand All @@ -58,9 +68,6 @@ final class ChannelPropertyMessageHandler implements NodeExchangeConsumers\IMess
/** @var NodeMetadataLoaders\ISchemaLoader */
private $schemaLoader;

/** @var Log\LoggerInterface */
private $logger;

public function __construct(
Models\Triggers\ITriggerRepository $triggerRepository,
Models\Triggers\ITriggersManager $triggersManager,
Expand All @@ -69,6 +76,7 @@ public function __construct(
Models\Conditions\IConditionRepository $conditionRepository,
Models\Conditions\IConditionsManager $conditionsManager,
NodeMetadataLoaders\ISchemaLoader $schemaLoader,
NodeExchangePublishers\IRabbitMqPublisher $rabbitMqPublisher,
Log\LoggerInterface $logger
) {
$this->triggerRepository = $triggerRepository;
Expand All @@ -79,6 +87,7 @@ public function __construct(
$this->conditionsManager = $conditionsManager;

$this->schemaLoader = $schemaLoader;
$this->rabbitMqPublisher = $rabbitMqPublisher;
$this->logger = $logger;
}

Expand All @@ -96,6 +105,23 @@ public function process(
$message->offsetGet('property')
);

} elseif ($routingKey === TriggersNode\Constants::RABBIT_MQ_CHANNELS_PROPERTY_UPDATED_ENTITY_ROUTING_KEY) {
// Only not pending messages will be processed
if (
$message->offsetExists('pending')
&& $message->offsetGet('pending') === false
&& $message->offsetExists('value')
) {
$this->processChannelConditions(
$message->offsetGet('device'),
$message->offsetGet('channel'),
$message->offsetGet('property'),
$message->offsetGet('value'),
$message->offsetExists('previous_value') ? $message->offsetGet('previous_value') : null,
$message->offsetGet('datatype')
);
}

} else {
throw new Exceptions\InvalidStateException('Unknown routing key');
}
Expand All @@ -111,6 +137,7 @@ public function getSchema(string $routingKey, string $origin): ?string
if ($origin === TriggersNode\Constants::NODE_DEVICES_ORIGIN) {
switch ($routingKey) {
case TriggersNode\Constants::RABBIT_MQ_CHANNELS_PROPERTY_DELETED_ENTITY_ROUTING_KEY:
case TriggersNode\Constants::RABBIT_MQ_CHANNELS_PROPERTY_UPDATED_ENTITY_ROUTING_KEY:
return $this->schemaLoader->load(NodeMetadata\Constants::RESOURCES_FOLDER . '/schemas/devices-node/entity.channel.property.json');
}
}
Expand All @@ -130,71 +157,89 @@ private function clearProperties(string $device, string $channel, string $proper
$findQuery = new Queries\FindChannelPropertyTriggersQuery();
$findQuery->forProperty($device, $channel, $property);

$this->clearTriggers($findQuery);
$triggers = $this->triggerRepository->findAllBy($findQuery, Entities\Triggers\ChannelPropertyTrigger::class);

foreach ($triggers as $trigger) {
$this->triggersManager->delete($trigger);
}

/** @var Queries\FindActionsQuery<Entities\Actions\ChannelPropertyAction> $findQuery */
$findQuery = new Queries\FindActionsQuery();
$findQuery->forChannelProperty($device, $channel, $property);

$this->clearActions($findQuery);
$actions = $this->actionRepository->findAllBy($findQuery, Entities\Actions\ChannelPropertyAction::class);

foreach ($actions as $action) {
$this->actionsManager->delete($action);
}

/** @var Queries\FindConditionsQuery<Entities\Conditions\ChannelPropertyCondition> $findQuery */
$findQuery = new Queries\FindConditionsQuery();
$findQuery->forChannelProperty($device, $channel, $property);

$this->clearConditions($findQuery);
$conditions = $this->conditionRepository->findAllBy($findQuery, Entities\Conditions\ChannelPropertyCondition::class);

foreach ($conditions as $condition) {
$this->conditionsManager->delete($condition);
}

$this->logger->info('[CONSUMER] Successfully consumed channel property data message');
}

/**
* @param Queries\FindChannelPropertyTriggersQuery $findQuery
* @param string $device
* @param string $channel
* @param string $property
* @param mixed $value
* @param mixed|null $previousValue
* @param string|null $datatype
*
* @return void
*
* @phpstan-template T of Entities\Triggers\ChannelPropertyTrigger
* @phpstan-param Queries\FindChannelPropertyTriggersQuery<T> $findQuery
*/
private function clearTriggers(Queries\FindChannelPropertyTriggersQuery $findQuery): void
{
$triggers = $this->triggerRepository->findAllBy($findQuery, Entities\Triggers\ChannelPropertyTrigger::class);

foreach ($triggers as $trigger) {
$this->triggersManager->delete($trigger);
private function processChannelConditions(
string $device,
string $channel,
string $property,
$value,
$previousValue = null,
?string $datatype = null
): void {
$value = $this->formatValue($value, $datatype);
$previousValue = $this->formatValue($previousValue, $datatype);

// Previous value is same as current, skipping
if ($previousValue !== null && (string) $value === (string) $previousValue) {
return;
}
}

/**
* @param Queries\FindActionsQuery $findQuery
*
* @return void
*
* @phpstan-template T of Entities\Actions\ChannelPropertyAction
* @phpstan-param Queries\FindActionsQuery<T> $findQuery
*/
private function clearActions(Queries\FindActionsQuery $findQuery): void
{
$actions = $this->actionRepository->findAllBy($findQuery, Entities\Actions\ChannelPropertyAction::class);

foreach ($actions as $action) {
$this->actionsManager->delete($action);
}
}
$findQuery = new Queries\FindConditionsQuery();
$findQuery->forChannelProperty($device, $channel, $property);

/**
* @param Queries\FindConditionsQuery $findQuery
*
* @return void
*
* @phpstan-template T of Entities\Conditions\ChannelPropertyCondition
* @phpstan-param Queries\FindConditionsQuery<T> $findQuery
*/
private function clearConditions(Queries\FindConditionsQuery $findQuery): void
{
$conditions = $this->conditionRepository->findAllBy($findQuery, Entities\Conditions\ChannelPropertyCondition::class);

/** @var Entities\Conditions\ChannelPropertyCondition $condition */
foreach ($conditions as $condition) {
$this->conditionsManager->delete($condition);
if (
$condition->getOperator()->equalsValue(Types\ConditionOperatorType::STATE_VALUE_EQUAL)
&& $condition->getOperand() === (string) $value
) {
$this->processCondition($condition);
}
}

$findQuery = new Queries\FindChannelPropertyTriggersQuery();
$findQuery->forProperty($device, $channel, $property);

$triggers = $this->triggerRepository->findAllBy($findQuery, Entities\Triggers\ChannelPropertyTrigger::class);

/** @var Entities\Triggers\ChannelPropertyTrigger $trigger */
foreach ($triggers as $trigger) {
if (
$trigger->getOperator()->equalsValue(Types\ConditionOperatorType::STATE_VALUE_EQUAL)
&& $trigger->getOperand() === (string) $value
) {
$this->processTrigger($trigger);
}
}
}

Expand Down
Loading

0 comments on commit 999ebd6

Please sign in to comment.