Skip to content

Commit

Permalink
Simplify worker and strategy handling
Browse files Browse the repository at this point in the history
  • Loading branch information
MrHash committed Aug 22, 2020
1 parent 119c157 commit a3dc518
Show file tree
Hide file tree
Showing 4 changed files with 37 additions and 79 deletions.
18 changes: 9 additions & 9 deletions composer.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

29 changes: 15 additions & 14 deletions src/Job/RabbitMq3Worker.php
Expand Up @@ -10,12 +10,13 @@

use Daikon\AsyncJob\Job\JobDefinitionInterface;
use Daikon\AsyncJob\Job\JobDefinitionMap;
use Daikon\AsyncJob\Metadata\JobMetadataEnricher;
use Daikon\AsyncJob\Worker\WorkerInterface;
use Daikon\Interop\Assertion;
use Daikon\Interop\RuntimeException;
use Daikon\MessageBus\Channel\ChannelInterface;
use Daikon\MessageBus\Envelope;
use Daikon\MessageBus\MessageBusInterface;
use Daikon\Metadata\MetadataInterface;
use Daikon\RabbitMq3\Connector\RabbitMq3Connector;
use PhpAmqpLib\Channel\AMQPChannel;
use PhpAmqpLib\Message\AMQPMessage;
Expand Down Expand Up @@ -74,27 +75,27 @@ private function execute(AMQPMessage $amqpMessage): void

$envelope = Envelope::fromNative(json_decode($amqpMessage->body, true));
$metadata = $envelope->getMetadata();
$jobName = (string)$metadata->get('job');

Assertion::notBlank($jobName, 'Worker job name must not be blank.');
/** @var JobDefinitionInterface $job */
$job = $this->jobDefinitionMap->get($jobName);
Assertion::isInstanceOf($job, JobDefinitionInterface::class, "Job definition '$jobName' not found.");
$jobKey = (string)$metadata->get(JobMetadataEnricher::JOB);
Assertion::notBlank($jobKey, 'Job key must not be blank.');
if (!$this->jobDefinitionMap->has($jobKey)) {
throw new RuntimeException("Job definition '$jobKey' not found.");
}

try {
$this->messageBus->receive($envelope);
} catch (RuntimeException $error) {
$message = $envelope->getMessage();
/** @var JobDefinitionInterface $job */
$job = $this->jobDefinitionMap->get($jobKey);
if ($job->getStrategy()->canRetry($envelope)) {
$retries = $metadata->get('_retries', 0);
/** @var MetadataInterface $metadata */
$metadata = $metadata
->with('_retries', ++$retries)
->with('_expiration', $job->getStrategy()->getRetryInterval($envelope));
$this->messageBus->publish($message, (string)$metadata->get('_channel'), $metadata);
$this->messageBus->publish(
$envelope->getMessage(),
(string)$metadata->get(ChannelInterface::METADATA_KEY),
$metadata
);
} else {
//@todo add message/metadata to error context
$this->logger->error($error->getMessage(), ['trace' => $error->getTrace()]);
$this->logger->error($error->getMessage(), ['exception' => $error->getTrace()]);
}
}

Expand Down
50 changes: 0 additions & 50 deletions src/Migration/RabbitMq3Migration.php
Expand Up @@ -18,56 +18,6 @@ protected function createMigrationList(string $exchange): void
$this->declareExchange($exchange, AMQPExchangeType::TOPIC, false, true, false, true);
}

protected function createMessagePipeline(string $exchange, int $repubInterval = 30000): void
{
$waitExchange = $exchange.'.waiting';
$waitQueue = $waitExchange;
$unroutedExchange = $exchange.'.unrouted';
$unroutedQueue = $unroutedExchange;
$repubExchange = $exchange.'.repub';
$repubQueue = $repubExchange;

// Setup the default exchange and queue pipelines
$this->declareExchange($unroutedExchange, AMQPExchangeType::FANOUT, false, true, false, true); //internal
$this->declareExchange($repubExchange, AMQPExchangeType::FANOUT, false, true, false, true); //internal
$this->declareExchange($waitExchange, AMQPExchangeType::FANOUT, false, true, false);
$this->declareExchange($exchange, AMQPExchangeType::TOPIC, false, true, false, false, false, [
'alternate-exchange' => $unroutedExchange
]);
$this->declareQueue($waitQueue, false, true, false, false, false, [
'x-dead-letter-exchange' => $exchange
]);
$this->bindQueue($waitQueue, $waitExchange);
$this->declareQueue($unroutedQueue, false, true, false, false, false, [
'x-dead-letter-exchange' => $repubExchange,
'x-message-ttl' => $repubInterval
]);
$this->bindQueue($unroutedQueue, $unroutedExchange);
$this->declareQueue($repubQueue, false, true, false, false);
$this->bindQueue($repubQueue, $repubExchange);

$this->createShovel($repubExchange, $exchange, $repubQueue);
}

protected function deleteMessagePipeline(string $exchange): void
{
$waitExchange = $exchange.'.waiting';
$waitQueue = $waitExchange;
$unroutedExchange = $exchange.'.unrouted';
$unroutedQueue = $unroutedExchange;
$repubExchange = $exchange.'.repub';
$repubQueue = $repubExchange;

$this->deleteShovel($repubExchange);
$this->deleteExchange($waitExchange);
$this->deleteExchange($unroutedExchange);
$this->deleteExchange($repubExchange);
$this->deleteExchange($exchange);
$this->deleteQueue($waitQueue);
$this->deleteQueue($unroutedQueue);
$this->deleteQueue($repubQueue);
}

protected function declareExchange(
string $exchange,
string $type,
Expand Down
19 changes: 13 additions & 6 deletions src/Transport/RabbitMq3Transport.php
Expand Up @@ -13,11 +13,17 @@
use Daikon\MessageBus\EnvelopeInterface;
use Daikon\MessageBus\MessageBusInterface;
use Daikon\RabbitMq3\Connector\RabbitMq3Connector;
use PhpAmqpLib\Channel\AMQPChannel;
use PhpAmqpLib\Message\AMQPMessage;
use PhpAmqpLib\Wire\AMQPTable;

final class RabbitMq3Transport implements TransportInterface
{
public const EXCHANGE = 'exchange';
public const ROUTING_KEY = 'routing_key';
public const APPLICATION_HEADERS = 'application_headers';
public const EXPIRATION = 'expiration';

private string $key;

private RabbitMq3Connector $connector;
Expand All @@ -31,22 +37,23 @@ public function __construct(string $key, RabbitMq3Connector $connector)
public function send(EnvelopeInterface $envelope, MessageBusInterface $messageBus): void
{
$metadata = $envelope->getMetadata();
$exchange = $metadata->get('exchange');
$routingKey = $metadata->get('routing_key', '');
$exchange = $metadata->get(self::EXCHANGE);
$routingKey = $metadata->get(self::ROUTING_KEY, '');

Assertion::notBlank($exchange, 'Exchange name must not be blank.');
Assertion::string($routingKey, 'Routing key must be a string.');

$payload = json_encode($envelope->toNative());
$properties = ['delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT];
if ($metadata->has('headers')) {
$properties['application_headers'] = new AMQPTable($metadata->get('headers'));
if ($metadata->has(self::APPLICATION_HEADERS)) {
$properties['application_headers'] = new AMQPTable($metadata->get(self::APPLICATION_HEADERS));
}
if ($metadata->has('_expiration')) {
$properties['expiration'] = $metadata->get('_expiration');
if ($metadata->has(self::EXPIRATION)) {
$properties['expiration'] = $metadata->get(self::EXPIRATION);
}
$message = new AMQPMessage($payload, $properties);

/** @var AMQPChannel $channel */
$channel = $this->connector->getConnection()->channel();
$channel->basic_publish($message, $exchange, $routingKey);
}
Expand Down

0 comments on commit a3dc518

Please sign in to comment.