Skip to content

Commit

Permalink
try fix broken pipe
Browse files Browse the repository at this point in the history
  • Loading branch information
dhanielo committed Apr 2, 2017
1 parent 1fce208 commit c903a9b
Show file tree
Hide file tree
Showing 5 changed files with 81 additions and 118 deletions.
24 changes: 0 additions & 24 deletions Tests/Transport/Subscriber/PublisherTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -49,30 +49,6 @@ public function testGetExchangeType()
$this->assertEquals(ExchangeType::DIRECT, $publisher->getExchangeType());
}

public function testRefreshChannel()
{
$publisher = new Publisher(
'exch_test',
ExchangeType::FANOUT,
$this->getConnectionManagerMock(),
'test'
);

$this->assertInstanceOf(CmobiAMQPChannel::class, $publisher->refreshChannel());
}

public function testGetChannel()
{
$publisher = new Publisher(
'exch_test',
ExchangeType::FANOUT,
$this->getConnectionManagerMock(),
'test'
);

$this->assertInstanceOf(CmobiAMQPChannel::class, $publisher->refreshChannel());
}

public function testGetFromName()
{
$publisher = new Publisher(
Expand Down
14 changes: 0 additions & 14 deletions Tests/Transport/Worker/TaskTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -18,20 +18,6 @@ public function testGetQueueName()
$this->assertEquals('test', $taskClient->getQueueName());
}

public function testRefreshChannel()
{
$taskClient = new Task('test', $this->getConnectionManagerMock(), 'test');

$this->assertInstanceOf(CmobiAMQPChannel::class, $taskClient->refreshChannel());
}

public function testGetChannel()
{
$taskClient = new Task('test', $this->getConnectionManagerMock(), 'test');

$this->assertInstanceOf(CmobiAMQPChannel::class, $taskClient->refreshChannel());
}

public function testGetFromName()
{
$taskClient = new Task('test', $this->getConnectionManagerMock(), 'caller_test');
Expand Down
81 changes: 63 additions & 18 deletions Transport/Rpc/RpcClient.php
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

use Cmobi\RabbitmqBundle\Connection\CmobiAMQPChannel;
use Cmobi\RabbitmqBundle\Connection\CmobiAMQPConnection;
use Cmobi\RabbitmqBundle\Connection\CmobiAMQPConnectionInterface;
use Cmobi\RabbitmqBundle\Connection\ConnectionManager;
use Cmobi\RabbitmqBundle\Queue\CmobiAMQPMessage;
use Cmobi\RabbitmqBundle\Queue\QueueProducerInterface;
Expand All @@ -19,6 +20,7 @@ class RpcClient implements QueueProducerInterface
private $queueName;
private $response;
private $logOutput;
private $errOutput;
private $correlationId;
private $callbackQueue;

Expand All @@ -29,7 +31,7 @@ public function __construct($queueName, ConnectionManager $manager, $fromName, $
$this->fromName = $fromName;
$this->connectionManager = $manager;
$this->logOutput = fopen('php://stdout', 'a+');
//$this->connection = $this->connectionManager->getConnection();
$this->errOutput = fopen('php://stderr', 'a+');
}

/**
Expand All @@ -42,23 +44,9 @@ public function onResponse(AMQPMessage $rep)
}
}

/**
* @param $data
* @param int $expire
* @param int $priority
* @throws QueueNotFoundException
* @throws \Cmobi\RabbitmqBundle\Connection\Exception\NotFoundAMQPConnectionFactoryException
*/
public function publish($data, $expire = self::DEFAULT_TTL, $priority = self::PRIORITY_LOW)
public function createCallbackQueue(CmobiAMQPChannel $channel, $expire, $corralationId = null)
{
$this->response = null;
$connection = $this->connectionManager->getConnection($this->connectionName);
$channel = $connection->channel();

if (! $this->queueHasExists($channel)) {
throw new QueueNotFoundException("Queue $this->queueName not declared.");
}
$this->correlationId = $this->generateCorrelationId();
$this->correlationId = is_null($corralationId) ? $this->generateCorrelationId() : $corralationId;
$queueBag = new RpcQueueBag(
sprintf(
'callback_to_%s_from_%s_%s',
Expand All @@ -73,12 +61,34 @@ public function publish($data, $expire = self::DEFAULT_TTL, $priority = self::PR
]);
list($callbackQueue) = $channel->queueDeclare($queueBag->getQueueDeclare());
$this->callbackQueue = $callbackQueue;

$callbackQueue = $this->createCallbackQueue($channel, $expire);
$consumeQueueBag = new RpcQueueBag($callbackQueue);

$channel->basicConsume(
$consumeQueueBag->getQueueConsume(),
[$this, 'onResponse']
);

return $callbackQueue;
}

/**
* @param $data
* @param int $expire
* @param int $priority
* @throws QueueNotFoundException
* @throws \Cmobi\RabbitmqBundle\Connection\Exception\NotFoundAMQPConnectionFactoryException
*/
public function publish($data, $expire = self::DEFAULT_TTL, $priority = self::PRIORITY_LOW)
{
$this->response = null;
$connection = $this->connectionManager->getConnection($this->connectionName);
$channel = $connection->channel();

if (! $this->queueHasExists($channel)) {
throw new QueueNotFoundException("Queue $this->queueName not declared.");
}
$msg = new CmobiAMQPMessage(
(string) $data,
[
Expand All @@ -90,7 +100,15 @@ public function publish($data, $expire = self::DEFAULT_TTL, $priority = self::PR
$channel->basic_publish($msg, '', $this->getQueueName());

while (! $this->response) {
$channel->wait(null, 0, ($expire / 1000));
try {
$channel->wait(null, 0, ($expire / 1000));
} catch (\Exception $e) {
fwrite($this->errOutput, $e->getMessage());
$connection = $this->forceReconnect($connection, $expire, $this->correlationId);
$channel = $connection->channel();

continue;
}
}
$channel->close();
$connection->close();
Expand Down Expand Up @@ -185,4 +203,31 @@ public function getConnectionManager()
{
return $this->connectionManager;
}

/**
* @param CmobiAMQPConnectionInterface $connection
* @param $expire
* @param $corralationId
* @return CmobiAMQPConnectionInterface
*/
public function forceReconnect(CmobiAMQPConnectionInterface $connection, $expire, $corralationId)
{
do {
try {
$connection->close();
$failed = false;
fwrite($this->logOutput, 'start RpcClient::forceReconnect() - trying connect...' . PHP_EOL);
$connection = $this->getConnectionManager()->getConnection($this->connectionName);
$channel = $connection->channel();
$this->createCallbackQueue($channel, $expire, $corralationId);
} catch (\Exception $e) {
$failed = true;
sleep(3);
fwrite($this->errOutput, 'failed RpcClient::forceReconnect() - ' . $e->getMessage() . PHP_EOL);
}
} while ($failed);
fwrite($this->logOutput, 'RpcClient::forceReconnect() - connected!' . PHP_EOL);

return $connection;
}
}
36 changes: 7 additions & 29 deletions Transport/Subscriber/Publisher.php
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@
class Publisher implements QueueProducerInterface
{
private $connectionManager;
private $channel;
private $fromName;
private $queueName;
private $exchange;
Expand All @@ -38,38 +37,17 @@ public function __construct(
*/
public function publish($data, $expire = self::DEFAULT_TTL, $priority = self::PRIORITY_LOW)
{
$this->refreshChannel();
$queueBag = new SubscriberQueueBag($this->getExchange(), $this->getExchangeType(), $this->getQueueName());
$this->getChannel()->exchangeDeclare($queueBag->getExchangeDeclare());
$msg = new CmobiAMQPMessage((string) $data);
$this->getChannel()->basic_publish($msg, $queueBag->getExchange());

$this->getChannel()->close();
$this->connectionManager->getConnection()->close();
}

/**
* @return CmobiAMQPChannel
*/
public function refreshChannel()
{
/** @var CmobiAMQPConnectionInterface $connection */
$connection = $this->connectionManager->getConnection();
$channel = $connection->channel();
$queueBag = new SubscriberQueueBag($this->getExchange(), $this->getExchangeType(), $this->getQueueName());
$channel->exchangeDeclare($queueBag->getExchangeDeclare());
$msg = new CmobiAMQPMessage((string) $data);
$channel->basic_publish($msg, $queueBag->getExchange());

if (!$connection->isConnected()) {
$connection->reconnect();
}
$this->channel = $connection->channel();

return $this->channel;
}

/**
* @return CmobiAMQPChannel
*/
public function getChannel()
{
return $this->channel;
$channel->close();
$connection->close();
}

/**
Expand Down
44 changes: 11 additions & 33 deletions Transport/Worker/Task.php
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@
class Task implements QueueProducerInterface
{
private $connectionManager;
private $channel;
private $fromName;
private $queueName;

Expand All @@ -32,56 +31,43 @@ public function __construct($queueName, ConnectionManager $manager, $fromName)
*/
public function publish($data, $expire = self::DEFAULT_TTL, $priority = self::PRIORITY_LOW)
{
$this->refreshChannel();
/** @var CmobiAMQPConnectionInterface $connection */
$connection = $this->connectionManager->getConnection();
$channel = $connection->channel();

if (! $this->queueHasExists()) {
if (! $this->queueHasExists($channel)) {
throw new QueueNotFoundException("Queue $this->queueName not declared.");
}
$queueBag = new WorkerQueueBag($this->getQueueName());
$this->getChannel()->queueDeclare($queueBag->getQueueDeclare());
$channel->queueDeclare($queueBag->getQueueDeclare());
$msg = new CmobiAMQPMessage(
(string) $data,
[
'delivery_mode' => 2, // make message persistent
'priority' => $priority,
]
);
$this->getChannel()->basic_publish($msg, '', $this->getQueueName());
$channel->basic_publish($msg, '', $this->getQueueName());

$this->getChannel()->close();
$this->connectionManager->getConnection()->close();
$channel->close();
$connection->close();
}

/**
* @param CmobiAMQPChannel $channel
* @return bool
*/
public function queueHasExists()
public function queueHasExists(CmobiAMQPChannel $channel)
{
try {
$this->getChannel()->queue_declare($this->queueName, true);
$channel->queue_declare($this->queueName, true);
} catch (\Exception $e) {
return false;
}

return true;
}

/**
* @return CmobiAMQPChannel
*/
public function refreshChannel()
{
/** @var CmobiAMQPConnectionInterface $connection */
$connection = $this->connectionManager->getConnection();

if (!$connection->isConnected()) {
$connection->reconnect();
}
$this->channel = $connection->channel();

return $this->channel;
}

/**
* @return string
*/
Expand All @@ -90,14 +76,6 @@ public function getQueueName()
return $this->queueName;
}

/**
* @return CmobiAMQPChannel
*/
public function getChannel()
{
return $this->channel;
}

/**
* @return string
*/
Expand Down

0 comments on commit c903a9b

Please sign in to comment.