Skip to content

Commit

Permalink
F/exchange to exchange binding (#48)
Browse files Browse the repository at this point in the history
* added ExchangeBindingSchema
  • Loading branch information
Mararok committed Jun 18, 2018
1 parent ed33fd2 commit d996c2b
Show file tree
Hide file tree
Showing 10 changed files with 303 additions and 96 deletions.
54 changes: 42 additions & 12 deletions src/SAREhub/Client/Amqp/AmqpEnvironmentManager.php
Original file line number Diff line number Diff line change
Expand Up @@ -9,27 +9,27 @@ class AmqpEnvironmentManager
/**
* @var AmqpQueueManager
*/
private $amqpQueueManager;
private $queueManager;

/**
* @var AmqpQueueBindingManager
*/
private $amqpQueueBindingManager;
private $queueBindingManager;

/**
* @var AmqpExchangeManager
*/
private $amqpExchangeManager;
private $exchangeManager;

public function __construct(
AmqpQueueManager $queueManager,
AmqpQueueBindingManager $queueBindingManager,
AmqpExchangeManager $exchangeManager
)
{
$this->amqpQueueManager = $queueManager;
$this->amqpQueueBindingManager = $queueBindingManager;
$this->amqpExchangeManager = $exchangeManager;
$this->queueManager = $queueManager;
$this->queueBindingManager = $queueBindingManager;
$this->exchangeManager = $exchangeManager;
}

/**
Expand All @@ -38,16 +38,46 @@ public function __construct(
*/
public function create(AmqpEnvironmentSchema $environmentSchema)
{
foreach ($environmentSchema->getQueueSchemas() as $queueSchema) {
$this->amqpQueueManager->create($queueSchema);
$this->createQueues($environmentSchema);
$this->createExchanges($environmentSchema);
$this->createBindings($environmentSchema);
}

/**
* @param AmqpEnvironmentSchema $environmentSchema
* @throws AmqpSchemaException
*/
private function createQueues(AmqpEnvironmentSchema $environmentSchema)
{
foreach ($environmentSchema->getQueueSchemas() as $schema) {
$this->queueManager->create($schema);
}
}

/**
* @param AmqpEnvironmentSchema $environmentSchema
* @throws AmqpSchemaException
*/
private function createExchanges(AmqpEnvironmentSchema $environmentSchema)
{
foreach ($environmentSchema->getExchangeSchemas() as $schema) {
$this->exchangeManager->create($schema);
}
}

foreach ($environmentSchema->getExchangeSchemas() as $exchangeSchema) {
$this->amqpExchangeManager->create($exchangeSchema);
/**
* @param AmqpEnvironmentSchema $environmentSchema
* @throws AmqpSchemaException
*/
private function createBindings(AmqpEnvironmentSchema $environmentSchema)
{
foreach ($environmentSchema->getQueueBindingSchemas() as $schema) {
$this->queueBindingManager->create($schema);
}

foreach ($environmentSchema->getQueueBindingSchemas() as $queueBindingSchema) {
$this->amqpQueueBindingManager->create($queueBindingSchema);
foreach ($environmentSchema->getExchangeBindingSchemas() as $schema) {
$this->exchangeManager->bindToExchange($schema);
}
}

}
40 changes: 29 additions & 11 deletions src/SAREhub/Client/Amqp/AmqpEnvironmentSchema.php
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@
namespace SAREhub\Client\Amqp;


use SAREhub\Client\Amqp\Schema\AmqpExchangeBindingSchema;

class AmqpEnvironmentSchema
{
/**
Expand All @@ -21,41 +23,57 @@ class AmqpEnvironmentSchema
*/
private $exchangeSchemas = [];

/**
* @var AmqpExchangeBindingSchema[]
*/
private $exchangeBindingSchemas = [];

public static function newInstance(): self
{
return new self();
}

public function getQueueSchemas(): array
public function addQueueSchema(AmqpQueueSchema $schema): self
{
return $this->queueSchemas;
$this->queueSchemas[] = $schema;
return $this;
}

public function addQueueSchema(AmqpQueueSchema $queueSchema): AmqpEnvironmentSchema
public function addExchangeSchema(AmqpExchangeSchema $schema): self
{
$this->queueSchemas[] = $queueSchema;
$this->exchangeSchemas[] = $schema;
return $this;
}

public function getQueueBindingSchemas(): array
public function addQueueBindingSchema(AmqpQueueBindingSchema $schema): self
{
return $this->queueBindingSchemas;
$this->queueBindingSchemas[] = $schema;
return $this;
}

public function addQueueBindingSchema(AmqpQueueBindingSchema $queueBindingSchema): self
public function addExchangeBindingSchema(AmqpExchangeBindingSchema $schema): self
{
$this->queueBindingSchemas[] = $queueBindingSchema;
$this->exchangeBindingSchemas[] = $schema;
return $this;
}

public function getQueueSchemas(): array
{
return $this->queueSchemas;
}

public function getExchangeSchemas(): array
{
return $this->exchangeSchemas;
}

public function addExchangeSchema(AmqpExchangeSchema $exchangeSchema): AmqpEnvironmentSchema
public function getQueueBindingSchemas(): array
{
return $this->queueBindingSchemas;
}

public function getExchangeBindingSchemas(): array
{
$this->exchangeSchemas[] = $exchangeSchema;
return $this;
return $this->exchangeBindingSchemas;
}
}
28 changes: 24 additions & 4 deletions src/SAREhub/Client/Amqp/AmqpExchangeManager.php
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,12 @@


use PhpAmqpLib\Channel\AMQPChannel;
use SAREhub\Client\Amqp\Schema\AmqpExchangeBindingSchema;

class AmqpExchangeManager
{
const EXCEPTION_MESSAGE_FORMAT = "AmqpExchangeManager occurred error when creating exchange (name: %s).";
const CREATE_ERROR = "Creating exchange error: %s";
const BINDING_ERROR = "Creating exchange to exchange binding error(destination: %s; source: %s; routing_key: %s)";

private $channel;

Expand Down Expand Up @@ -37,12 +39,30 @@ public function create(AmqpExchangeSchema $schema)
);
return true;
} catch (\Exception $e) {
throw new AmqpSchemaException($this->getExceptionMessage($schema->getName()), $e);
$message = sprintf(self::CREATE_ERROR, $schema->getName());
throw new AmqpSchemaException($message, $e);
}
}

private function getExceptionMessage(string $exchangeName): string
public function bindToExchange(AmqpExchangeBindingSchema $schema)
{
return sprintf(self::EXCEPTION_MESSAGE_FORMAT, $exchangeName);
try {
$this->channel->exchange_bind(
$schema->getDestination(),
$schema->getSource(),
$schema->getRoutingKey(),
false,
$schema->getArguments()
);
return true;
} catch (\Exception $e) {
$message = sprintf(
self::BINDING_ERROR,
$schema->getDestination(),
$schema->getSource(),
$schema->getRoutingKey()
);
throw new AmqpSchemaException($message, $e);
}
}
}
13 changes: 7 additions & 6 deletions src/SAREhub/Client/Amqp/AmqpQueueBindingManager.php
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,11 @@

class AmqpQueueBindingManager
{
const ERROR_MESSAGE = "Creating queue binding error(queue: %s; exchange: %s; routing_key: %s)";

const EXCEPTION_MESSAGE_FORMAT = "AmqpQueueBindingManager occurred error when creating binding (from: %s; to: %s; routing key: %s).";


/**
* @var AMQPChannel
*/
private $channel;

public function __construct(AMQPChannel $channel)
Expand All @@ -34,14 +35,14 @@ public function create(AmqpQueueBindingSchema $schema)
);
return true;
} catch (\Exception $e) {
throw new AmqpSchemaException($this->getExceptionMessage($schema), $e);
throw new AmqpSchemaException($this->formatExceptionMessage($schema), $e);
}
}

private function getExceptionMessage(AmqpQueueBindingSchema $schema): string
private function formatExceptionMessage(AmqpQueueBindingSchema $schema): string
{
return sprintf(
self::EXCEPTION_MESSAGE_FORMAT,
self::ERROR_MESSAGE,
$schema->getQueueName(),
$schema->getExchangeName(),
$schema->getRoutingKey()
Expand Down
56 changes: 28 additions & 28 deletions src/SAREhub/Client/Amqp/AmqpQueueSchema.php
Original file line number Diff line number Diff line change
Expand Up @@ -38,19 +38,26 @@ class AmqpQueueSchema
*/
private $arguments;

public function __construct()
{
$this->arguments = new AMQPTable();
}

public static function newInstance(): self
{
return new self();
}

public function __construct()
public function withAutoDelete(bool $autoDelete): AmqpQueueSchema
{
$this->arguments = new AMQPTable();
$this->autoDelete = $autoDelete;
return $this;
}

public function getName(): string
public function withArguments(AMQPTable $arguments): AmqpQueueSchema
{
return $this->name;
$this->arguments = $arguments;
return $this;
}

public function withName(string $name): AmqpQueueSchema
Expand All @@ -59,58 +66,51 @@ public function withName(string $name): AmqpQueueSchema
return $this;
}

public function isPassive(): bool
{
return $this->passive;
}

public function withPassive(bool $passive): AmqpQueueSchema
{
$this->passive = $passive;
return $this;
}

public function isDurable(): bool
public function withDurable(bool $durable): AmqpQueueSchema
{
return $this->durable;
$this->durable = $durable;
return $this;
}

public function withDurable(bool $durable): AmqpQueueSchema
public function withExclusive(bool $exclusive): AmqpQueueSchema
{
$this->durable = $durable;
$this->exclusive = $exclusive;
return $this;
}

public function isExclusive(): bool
public function getName(): string
{
return $this->exclusive;
return $this->name;
}

public function withExclusive(bool $exclusive): AmqpQueueSchema
public function isPassive(): bool
{
$this->exclusive = $exclusive;
return $this;
return $this->passive;
}

public function isAutoDelete(): bool
public function isDurable(): bool
{
return $this->autoDelete;
return $this->durable;
}

public function withAutoDelete(bool $autoDelete): AmqpQueueSchema
public function isExclusive(): bool
{
$this->autoDelete = $autoDelete;
return $this;
return $this->exclusive;
}

public function getArguments(): AMQPTable
public function isAutoDelete(): bool
{
return $this->arguments;
return $this->autoDelete;
}

public function withArguments(AMQPTable $arguments): AmqpQueueSchema
public function getArguments(): AMQPTable
{
$this->arguments = $arguments;
return $this;
return $this->arguments;
}
}

0 comments on commit d996c2b

Please sign in to comment.